OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
bfd_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 CodiLime, Inc. All rights reserved.
3  */
4 
6 #include "bfd/bfd_server.h"
7 #include "bfd/bfd_session.h"
8 #include "bfd/bfd_connection.h"
10 #include "bfd/bfd_state_machine.h"
11 #include "bfd/bfd_common.h"
12 
13 #include <boost/foreach.hpp>
14 #include <boost/random/mersenne_twister.hpp>
15 
16 #include "base/logging.h"
17 #include "io/event_manager.h"
18 
19 namespace BFD {
20 
22  evm_(evm),
23  communicator_(communicator),
24  session_manager_(evm),
25  event_queue_(new WorkQueue<Event *>(
26  TaskScheduler::GetInstance()->GetTaskId("BFD"), 0,
27  boost::bind(&Server::EventCallback, this, _1))) {
28  communicator->SetServer(this);
29 }
30 
32 }
33 
34 void Server::AddSession(const SessionKey &key, const SessionConfig &config,
35  ChangeCb cb) {
36  EnqueueEvent(new Event(ADD_CONNECTION, key, config, cb));
37 }
38 
39 void Server::AddSession(Event *event) {
40  CHECK_CONCURRENCY("BFD");
41  Discriminator discriminator;
42  ConfigureSession(event->key, event->config, &discriminator);
43  sessions_.insert(event->key);
44  Session *session = SessionByKey(event->key);
45  if (session) {
46  session->RegisterChangeCallback(0, event->cb);
47  event->cb(session->key(), session->local_state());
48  }
49 }
50 
53 }
54 
56  CHECK_CONCURRENCY("BFD");
57  int erase_size = sessions_.erase(event->key);
58  if (erase_size) {
60  } else {
61  LOG(ERROR, __func__ << "Cannot find session: " <<
62  event->key.to_string());
63  }
64 }
65 
68 }
69 
71  CHECK_CONCURRENCY("BFD");
72  for (Sessions::iterator it = sessions_.begin(), next;
73  it != sessions_.end(); it = next) {
74  SessionKey key = *it;
75  next = ++it;
76  sessions_.erase(key);
78  }
79 }
80 
82  event_queue_->Enqueue(event);
83 }
84 
86  switch (event->type) {
87  case ADD_CONNECTION:
88  AddSession(event);
89  break;
90  case DELETE_CONNECTION:
91  DeleteSession(event);
92  break;
94  DeleteClientSessions(event);
95  break;
96  case PROCESS_PACKET:
97  ProcessControlPacket(event);
98  break;
99  }
100  delete event;
101  return true;
102 }
103 
105  CHECK_CONCURRENCY("BFD");
106 
107  SessionIndex session_index;
108  if (packet->local_endpoint.port() == kSingleHop) {
109  session_index.if_index = packet->session_index.if_index;
110  } else {
111  session_index.vrf_index = packet->session_index.vrf_index;
112  }
113 
114  if (packet->receiver_discriminator) {
116  (packet->receiver_discriminator);
117  if (session_bydesc && session_bydesc->Stats().rx_count == 0) {
118  Session *session_bykey = session_manager_.SessionByKey(
119  SessionKey(packet->remote_endpoint.address(), session_index,
120  packet->local_endpoint.port(),
121  packet->local_endpoint.address()));
122  if (!session_bykey) {
123  if ((session_bydesc->key()).local_address ==
124  packet->local_endpoint.address()) {
125  LOG(ERROR, __func__ << " SRC-ADDR Changed, on Peer:" <<
126  packet->remote_endpoint.address().to_string() <<
127  "on Agent:" << (session_bydesc->key()).remote_address.to_string());
128  //Accept pkt to support src-addr change at peer
129  } else {
130  LOG(ERROR, __func__ << " Session from Key NULL: " <<
131  packet->remote_endpoint.address().to_string() <<
132  " -> " <<
133  packet->local_endpoint.address().to_string());
134  return NULL;
135  }
136  } else if (session_bydesc->local_discriminator() !=
137  session_bykey->local_discriminator()) {
138  LOG(ERROR, __func__ << " DISC -> Session MISMATCH: " <<
139  packet->remote_endpoint.address().to_string() <<
140  " -> " <<
141  packet->local_endpoint.address().to_string());
142  LOG(ERROR, "PACKET Your_Discriminator mapped session: " <<
143  session_bydesc->toString());
144  return NULL;
145  }
146  }
147  return session_bydesc;
148  }
149 
150  // Use ifindex for single hop and vrfindex for multihop sessions.
152  SessionKey(packet->remote_endpoint.address(), session_index,
153  packet->local_endpoint.port(),
154  packet->local_endpoint.address()));
155 
156  // Try with 0.0.0.0 local address
157  if (!session) {
158  session = session_manager_.SessionByKey(
159  SessionKey(packet->remote_endpoint.address(), session_index,
160  packet->local_endpoint.port()));
161  }
162  return session;
163 }
164 
165 Session *Server::SessionByKey(const boost::asio::ip::address &address,
166  const SessionIndex &index) {
167  return session_manager_.SessionByKey(SessionKey(address, index));
168 }
169 
171  return session_manager_.SessionByKey(key);
172 }
173 
175  return session_manager_.SessionByKey(key);
176 }
177 
179  const boost::asio::ip::udp::endpoint &local_endpoint,
180  const boost::asio::ip::udp::endpoint &remote_endpoint,
181  const SessionIndex &session_index,
182  const boost::asio::const_buffer &recv_buffer,
183  std::size_t bytes_transferred, const boost::system::error_code& error) {
184  EnqueueEvent(new Event(PROCESS_PACKET, local_endpoint, remote_endpoint,
185  session_index, recv_buffer, bytes_transferred));
186 }
187 
189  if (event->bytes_transferred != (std::size_t) kMinimalPacketLength) {
190  LOG(ERROR, __func__ << "Wrong packet size: " <<
191  event->bytes_transferred);
192  return;
193  }
194 
195  boost::scoped_ptr<ControlPacket> packet(ParseControlPacket(
196  boost::asio::buffer_cast<const uint8_t *>(event->recv_buffer),
197  event->bytes_transferred));
198  if (packet == NULL) {
199  LOG(ERROR, __func__ << "Unable to parse packet");
200  return;
201  }
202  packet->local_endpoint = event->local_endpoint;
203  packet->remote_endpoint = event->remote_endpoint;
204  packet->session_index = event->session_index;
205  ProcessControlPacketActual(packet.get());
206  delete[] boost::asio::buffer_cast<const uint8_t *>(event->recv_buffer);
207 }
208 
210  ResultCode result;
211  result = packet->Verify();
212  if (result != kResultCode_Ok) {
213  LOG(ERROR, "Wrong packet: " << result);
214  return result;
215  }
216  Session *session = NULL;
217  session = GetSession(packet);
218  if (session == NULL) {
219  LOG(ERROR, "Unknown session: " <<
220  packet->remote_endpoint.address().to_string() << "/" <<
221  packet->receiver_discriminator);
223  }
224  session->Stats().rx_count++;
225  result = session->ProcessControlPacket(packet);
226  if (result != kResultCode_Ok) {
227  LOG(ERROR, "Unable to process session: result " << result
228  << ", session: " << session->toString());
229  session->Stats().rx_error_count++;
230  return result;
231  }
232  return kResultCode_Ok;
233 }
234 
236  const SessionConfig &config,
237  Discriminator *assignedDiscriminator) {
238  return session_manager_.ConfigureSession(key, config, communicator_,
239  assignedDiscriminator);
240 }
241 
244 }
245 
247  Discriminator discriminator) {
248  DiscriminatorSessionMap::const_iterator it =
249  by_discriminator_.find(discriminator);
250  if (it == by_discriminator_.end())
251  return NULL;
252  return it->second;
253 }
254 
256  KeySessionMap::const_iterator it = by_key_.find(key);
257  return it != by_key_.end() ? it->second : NULL;
258 }
259 
261  KeySessionMap::const_iterator it = by_key_.find(key);
262  return it != by_key_.end() ? it->second : NULL;
263 }
264 
266  const SessionKey &key) {
267  Session *session = SessionByKey(key);
268  if (session == NULL) {
269  LOG(DEBUG, __FUNCTION__ << " No such session: " << key.to_string());
271  }
272 
273  if (!--refcounts_[session]) {
274  by_discriminator_.erase(session->local_discriminator());
275  by_key_.erase(key);
276  delete session;
277  }
278 
279  return kResultCode_Ok;
280 }
281 
283  const SessionConfig &config, Connection *communicator,
284  Discriminator *assignedDiscriminator) {
285  Session *session = SessionByKey(key);
286  if (session) {
287  session->UpdateConfig(config);
288 
289  LOG(INFO, __func__ << ": UpdateConfig : "
290  << session->key().to_string() << "/"
291  << session->local_discriminator());
292 
293  return kResultCode_Ok;
294  }
295 
296  *assignedDiscriminator = GenerateUniqueDiscriminator();
297  session = new Session(*assignedDiscriminator, key, evm_, config,
298  communicator);
299 
300  by_discriminator_[*assignedDiscriminator] = session;
301  by_key_[key] = session;
302  refcounts_[session] = 1;
303 
304  LOG(INFO, __func__ << ": New session configured: " << key.to_string() << "/"
305  << *assignedDiscriminator);
306 
307  return kResultCode_Ok;
308 }
309 
311  class DiscriminatorGenerator {
312  public:
313  DiscriminatorGenerator() {
314  next_ = gen()%0x1000000 + 1;
315  }
316 
317  Discriminator Next() {
318  return next_.fetch_and_increment();
319  }
320 
321  private:
322  tbb::atomic<Discriminator> next_;
323  boost::random::mt19937 gen;
324  };
325 
326  static DiscriminatorGenerator generator;
327 
328  return generator.Next();
329 }
330 
332  for (DiscriminatorSessionMap::iterator it = by_discriminator_.begin();
333  it != by_discriminator_.end(); ++it) {
334  it->second->Stop();
335  delete it->second;
336  }
337 }
338 } // namespace BFD
uint32_t Discriminator
Definition: bfd_common.h:17
void AddSession(const SessionKey &key, const SessionConfig &config, ChangeCb cb)
Definition: bfd_server.cc:34
const SessionKey & key() const
Definition: bfd_session.cc:271
SessionKey key
Definition: bfd_server.h:124
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
Session * SessionByKey(const SessionKey &key)
Definition: bfd_server.cc:255
void ProcessControlPacket(const boost::asio::ip::udp::endpoint &local_endpoint, const boost::asio::ip::udp::endpoint &remote_endpoint, const SessionIndex &session_index, const boost::asio::const_buffer &recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: bfd_server.cc:178
Discriminator GenerateUniqueDiscriminator()
Definition: bfd_server.cc:310
Session * SessionByKey(const boost::asio::ip::address &address, const SessionIndex &index=SessionIndex())
Definition: bfd_server.cc:165
ResultCode Verify() const
uint32_t vrf_index
Definition: bfd_common.h:82
Session * SessionByDiscriminator(Discriminator discriminator)
Definition: bfd_server.cc:246
SessionIndex session_index
std::string toString() const
Definition: bfd_session.cc:80
const boost::asio::const_buffer recv_buffer
Definition: bfd_server.h:130
virtual ~Server()
Definition: bfd_server.cc:31
SessionManager session_manager_
Definition: bfd_server.h:145
ResultCode ConfigureSession(const SessionKey &key, const SessionConfig &config, Connection *communicator, Discriminator *assignedDiscriminator)
Definition: bfd_server.cc:282
ResultCode RemoveSessionReference(const SessionKey &key)
Definition: bfd_server.cc:265
bool EventCallback(Event *event)
Definition: bfd_server.cc:85
boost::scoped_ptr< WorkQueue< Event * > > event_queue_
Definition: bfd_server.h:146
BFD::Discriminator receiver_discriminator
Event
Definition: http_client.h:27
boost::asio::ip::udp::endpoint remote_endpoint
ResultCode ProcessControlPacketActual(const ControlPacket *packet)
Definition: bfd_server.cc:209
int rx_error_count
Definition: bfd_session.h:46
void RegisterChangeCallback(ClientId client_id, ChangeCb cb)
Definition: bfd_session.cc:305
#define CHECK_CONCURRENCY(...)
SessionConfig config
Definition: bfd_server.h:125
Connection * communicator() const
Definition: bfd_server.h:55
void EnqueueEvent(Event *event)
Definition: bfd_server.cc:81
ResultCode
Definition: bfd_common.h:41
const int kMinimalPacketLength
Definition: bfd_common.cc:41
DiscriminatorSessionMap by_discriminator_
Definition: bfd_server.h:89
boost::function< void(const SessionKey &key, const BFD::BFDState &state)> ChangeCb
Definition: bfd_common.h:131
Session * GetSession(const ControlPacket *packet)
Definition: bfd_server.cc:104
boost::asio::ip::udp::endpoint local_endpoint
ResultCode RemoveSessionReference(const SessionKey &key)
Definition: bfd_server.cc:242
EventManager * evm_
Definition: bfd_server.h:143
BFDState local_state() const
Definition: bfd_session.cc:143
Connection * communicator_
Definition: bfd_server.h:144
#define LOG(_Level, _Msg)
Definition: logging.h:33
void DeleteSession(const SessionKey &key)
Definition: bfd_server.cc:51
ResultCode ProcessControlPacket(const ControlPacket *packet)
Definition: bfd_session.cc:173
BFDStats & Stats()
Definition: bfd_session.h:73
Discriminator local_discriminator() const
Definition: bfd_session.cc:293
std::size_t bytes_transferred
Definition: bfd_server.h:131
virtual void SetServer(Server *server)=0
Server(EventManager *evm, Connection *communicator)
Definition: bfd_server.cc:21
const std::string to_string() const
Definition: bfd_common.h:107
ResultCode ConfigureSession(const SessionKey &key, const SessionConfig &config, Discriminator *assignedDiscriminator)
Definition: bfd_server.cc:235
void DeleteClientSessions()
Definition: bfd_server.cc:66
uint32_t if_index
Definition: bfd_common.h:81
ControlPacket * ParseControlPacket(const uint8_t *data, size_t size)
static EventManager evm
Sessions sessions_
Definition: bfd_server.h:147
void UpdateConfig(const SessionConfig &config)
Definition: bfd_session.cc:313