OpenSDN source code
bfd_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 CodiLime, Inc. All rights reserved.
3  */
4 
5  #include <atomic>
6 
8 #include "bfd/bfd_server.h"
9 #include "bfd/bfd_session.h"
10 #include "bfd/bfd_connection.h"
11 #include "bfd/bfd_control_packet.h"
12 #include "bfd/bfd_state_machine.h"
13 #include "bfd/bfd_common.h"
14 
15 #include <boost/foreach.hpp>
16 #include <boost/random/mersenne_twister.hpp>
17 
18 #include "base/logging.h"
19 #include "io/event_manager.h"
20 
21 namespace BFD {
22 
24  evm_(evm),
25  communicator_(communicator),
26  session_manager_(evm),
27  event_queue_(new WorkQueue<Event *>(
28  TaskScheduler::GetInstance()->GetTaskId("BFD"), 0,
29  boost::bind(&Server::EventCallback, this, _1))) {
30  communicator->SetServer(this);
31 }
32 
34 }
35 
36 void Server::AddSession(const SessionKey &key, const SessionConfig &config,
37  ChangeCb cb) {
38  EnqueueEvent(new Event(ADD_CONNECTION, key, config, cb));
39 }
40 
41 void Server::AddSession(Event *event) {
42  CHECK_CONCURRENCY("BFD");
43  Discriminator discriminator;
44  ConfigureSession(event->key, event->config, &discriminator);
45  sessions_.insert(event->key);
46  Session *session = SessionByKey(event->key);
47  if (session) {
48  session->RegisterChangeCallback(0, event->cb);
49  event->cb(session->key(), session->local_state());
50  }
51 }
52 
55 }
56 
58  CHECK_CONCURRENCY("BFD");
59  int erase_size = sessions_.erase(event->key);
60  if (erase_size) {
62  } else {
63  LOG(ERROR, __func__ << "Cannot find session: " <<
64  event->key.to_string());
65  }
66 }
67 
70 }
71 
73  CHECK_CONCURRENCY("BFD");
74  for (Sessions::iterator it = sessions_.begin(), next;
75  it != sessions_.end(); it = next) {
76  SessionKey key = *it;
77  next = ++it;
78  sessions_.erase(key);
80  }
81 }
82 
84  event_queue_->Enqueue(event);
85 }
86 
88  switch (event->type) {
89  case ADD_CONNECTION:
90  AddSession(event);
91  break;
92  case DELETE_CONNECTION:
93  DeleteSession(event);
94  break;
96  DeleteClientSessions(event);
97  break;
98  case PROCESS_PACKET:
99  ProcessControlPacket(event);
100  break;
101  }
102  delete event;
103  return true;
104 }
105 
107  CHECK_CONCURRENCY("BFD");
108 
109  SessionIndex session_index;
110  if (packet->local_endpoint.port() == kSingleHop) {
111  session_index.if_index = packet->session_index.if_index;
112  } else {
113  session_index.vrf_index = packet->session_index.vrf_index;
114  }
115 
116  if (packet->receiver_discriminator) {
118  (packet->receiver_discriminator);
119  if (session_bydesc && session_bydesc->Stats().rx_count == 0) {
120  Session *session_bykey = session_manager_.SessionByKey(
121  SessionKey(packet->remote_endpoint.address(), session_index,
122  packet->local_endpoint.port(),
123  packet->local_endpoint.address()));
124  if (!session_bykey) {
125  if ((session_bydesc->key()).local_address ==
126  packet->local_endpoint.address()) {
127  LOG(ERROR, __func__ << " SRC-ADDR Changed, on Peer:" <<
128  packet->remote_endpoint.address().to_string() <<
129  "on Agent:" << (session_bydesc->key()).remote_address.to_string());
130  //Accept pkt to support src-addr change at peer
131  } else {
132  LOG(ERROR, __func__ << " Session from Key NULL: " <<
133  packet->remote_endpoint.address().to_string() <<
134  " -> " <<
135  packet->local_endpoint.address().to_string());
136  return NULL;
137  }
138  } else if (session_bydesc->local_discriminator() !=
139  session_bykey->local_discriminator()) {
140  LOG(ERROR, __func__ << " DISC -> Session MISMATCH: " <<
141  packet->remote_endpoint.address().to_string() <<
142  " -> " <<
143  packet->local_endpoint.address().to_string());
144  LOG(ERROR, "PACKET Your_Discriminator mapped session: " <<
145  session_bydesc->toString());
146  return NULL;
147  }
148  }
149  return session_bydesc;
150  }
151 
152  // Use ifindex for single hop and vrfindex for multihop sessions.
154  SessionKey(packet->remote_endpoint.address(), session_index,
155  packet->local_endpoint.port(),
156  packet->local_endpoint.address()));
157 
158  // Try with 0.0.0.0 local address
159  if (!session) {
160  session = session_manager_.SessionByKey(
161  SessionKey(packet->remote_endpoint.address(), session_index,
162  packet->local_endpoint.port()));
163  }
164  return session;
165 }
166 
167 Session *Server::SessionByKey(const boost::asio::ip::address &address,
168  const SessionIndex &index) {
169  return session_manager_.SessionByKey(SessionKey(address, index));
170 }
171 
173  return session_manager_.SessionByKey(key);
174 }
175 
177  return session_manager_.SessionByKey(key);
178 }
179 
181  const boost::asio::ip::udp::endpoint &local_endpoint,
182  const boost::asio::ip::udp::endpoint &remote_endpoint,
183  const SessionIndex &session_index,
184  const boost::asio::const_buffer &recv_buffer,
185  std::size_t bytes_transferred, const boost::system::error_code& error) {
186  EnqueueEvent(new Event(PROCESS_PACKET, local_endpoint, remote_endpoint,
187  session_index, recv_buffer, bytes_transferred));
188 }
189 
191  if (event->bytes_transferred != (std::size_t) kMinimalPacketLength) {
192  LOG(ERROR, __func__ << "Wrong packet size: " <<
193  event->bytes_transferred);
194  return;
195  }
196 
197  boost::scoped_ptr<ControlPacket> packet(ParseControlPacket(
198  boost::asio::buffer_cast<const uint8_t *>(event->recv_buffer),
199  event->bytes_transferred));
200  if (packet == NULL) {
201  LOG(ERROR, __func__ << "Unable to parse packet");
202  return;
203  }
204  packet->local_endpoint = event->local_endpoint;
205  packet->remote_endpoint = event->remote_endpoint;
206  packet->session_index = event->session_index;
207  ProcessControlPacketActual(packet.get());
208  delete[] boost::asio::buffer_cast<const uint8_t *>(event->recv_buffer);
209 }
210 
212  ResultCode result;
213  result = packet->Verify();
214  if (result != kResultCode_Ok) {
215  LOG(ERROR, "Wrong packet: " << result);
216  return result;
217  }
218  Session *session = NULL;
219  session = GetSession(packet);
220  if (session == NULL) {
221  LOG(ERROR, "Unknown session: " <<
222  packet->remote_endpoint.address().to_string() << "/" <<
223  packet->receiver_discriminator);
225  }
226  session->Stats().rx_count++;
227  result = session->ProcessControlPacket(packet);
228  if (result != kResultCode_Ok) {
229  LOG(ERROR, "Unable to process session: result " << result
230  << ", session: " << session->toString());
231  session->Stats().rx_error_count++;
232  return result;
233  }
234  return kResultCode_Ok;
235 }
236 
238  const SessionConfig &config,
239  Discriminator *assignedDiscriminator) {
240  return session_manager_.ConfigureSession(key, config, communicator_,
241  assignedDiscriminator);
242 }
243 
246 }
247 
249  Discriminator discriminator) {
250  DiscriminatorSessionMap::const_iterator it =
251  by_discriminator_.find(discriminator);
252  if (it == by_discriminator_.end())
253  return NULL;
254  return it->second;
255 }
256 
258  KeySessionMap::const_iterator it = by_key_.find(key);
259  return it != by_key_.end() ? it->second : NULL;
260 }
261 
263  KeySessionMap::const_iterator it = by_key_.find(key);
264  return it != by_key_.end() ? it->second : NULL;
265 }
266 
268  const SessionKey &key) {
269  Session *session = SessionByKey(key);
270  if (session == NULL) {
271  LOG(DEBUG, __FUNCTION__ << " No such session: " << key.to_string());
273  }
274 
275  if (!--refcounts_[session]) {
276  by_discriminator_.erase(session->local_discriminator());
277  by_key_.erase(key);
278  delete session;
279  }
280 
281  return kResultCode_Ok;
282 }
283 
285  const SessionConfig &config, Connection *communicator,
286  Discriminator *assignedDiscriminator) {
287  Session *session = SessionByKey(key);
288  if (session) {
289  session->UpdateConfig(config);
290 
291  LOG(INFO, __func__ << ": UpdateConfig : "
292  << session->key().to_string() << "/"
293  << session->local_discriminator());
294 
295  return kResultCode_Ok;
296  }
297 
298  *assignedDiscriminator = GenerateUniqueDiscriminator();
299  session = new Session(*assignedDiscriminator, key, evm_, config,
300  communicator);
301 
302  by_discriminator_[*assignedDiscriminator] = session;
303  by_key_[key] = session;
304  refcounts_[session] = 1;
305 
306  LOG(INFO, __func__ << ": New session configured: " << key.to_string() << "/"
307  << *assignedDiscriminator);
308 
309  return kResultCode_Ok;
310 }
311 
313  class DiscriminatorGenerator {
314  public:
315  DiscriminatorGenerator() {
316  next_ = gen()%0x1000000 + 1;
317  }
318 
319  Discriminator Next() {
320  return next_.fetch_add(1);
321  }
322 
323  private:
324  std::atomic<Discriminator> next_;
325  boost::random::mt19937 gen;
326  };
327 
328  static DiscriminatorGenerator generator;
329 
330  return generator.Next();
331 }
332 
334  for (DiscriminatorSessionMap::iterator it = by_discriminator_.begin();
335  it != by_discriminator_.end(); ++it) {
336  it->second->Stop();
337  delete it->second;
338  }
339 }
340 } // namespace BFD
virtual void SetServer(Server *server)=0
Session * SessionByKey(const SessionKey &key)
Definition: bfd_server.cc:257
ResultCode ConfigureSession(const SessionKey &key, const SessionConfig &config, Connection *communicator, Discriminator *assignedDiscriminator)
Definition: bfd_server.cc:284
Discriminator GenerateUniqueDiscriminator()
Definition: bfd_server.cc:312
Session * SessionByDiscriminator(Discriminator discriminator)
Definition: bfd_server.cc:248
DiscriminatorSessionMap by_discriminator_
Definition: bfd_server.h:89
ResultCode RemoveSessionReference(const SessionKey &key)
Definition: bfd_server.cc:267
ResultCode ProcessControlPacketActual(const ControlPacket *packet)
Definition: bfd_server.cc:211
bool EventCallback(Event *event)
Definition: bfd_server.cc:87
SessionManager session_manager_
Definition: bfd_server.h:145
void DeleteClientSessions()
Definition: bfd_server.cc:68
virtual ~Server()
Definition: bfd_server.cc:33
Sessions sessions_
Definition: bfd_server.h:147
boost::scoped_ptr< WorkQueue< Event * > > event_queue_
Definition: bfd_server.h:146
void AddSession(const SessionKey &key, const SessionConfig &config, ChangeCb cb)
Definition: bfd_server.cc:36
ResultCode ConfigureSession(const SessionKey &key, const SessionConfig &config, Discriminator *assignedDiscriminator)
Definition: bfd_server.cc:237
Session * SessionByKey(const boost::asio::ip::address &address, const SessionIndex &index=SessionIndex())
Definition: bfd_server.cc:167
Server(EventManager *evm, Connection *communicator)
Definition: bfd_server.cc:23
Session * GetSession(const ControlPacket *packet)
Definition: bfd_server.cc:106
Connection * communicator_
Definition: bfd_server.h:144
void EnqueueEvent(Event *event)
Definition: bfd_server.cc:83
void DeleteSession(const SessionKey &key)
Definition: bfd_server.cc:53
@ DELETE_CONNECTION
Definition: bfd_server.h:97
@ PROCESS_PACKET
Definition: bfd_server.h:99
@ DELETE_CLIENT_CONNECTIONS
Definition: bfd_server.h:98
@ ADD_CONNECTION
Definition: bfd_server.h:96
Connection * communicator() const
Definition: bfd_server.h:55
EventManager * evm_
Definition: bfd_server.h:143
ResultCode RemoveSessionReference(const SessionKey &key)
Definition: bfd_server.cc:244
void ProcessControlPacket(const boost::asio::ip::udp::endpoint &local_endpoint, const boost::asio::ip::udp::endpoint &remote_endpoint, const SessionIndex &session_index, const boost::asio::const_buffer &recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: bfd_server.cc:180
ResultCode ProcessControlPacket(const ControlPacket *packet)
Definition: bfd_session.cc:173
void UpdateConfig(const SessionConfig &config)
Definition: bfd_session.cc:313
void RegisterChangeCallback(ClientId client_id, ChangeCb cb)
Definition: bfd_session.cc:305
BFDState local_state() const
Definition: bfd_session.cc:143
std::string toString() const
Definition: bfd_session.cc:80
BFDStats & Stats()
Definition: bfd_session.h:73
Discriminator local_discriminator() const
Definition: bfd_session.cc:293
const SessionKey & key() const
Definition: bfd_session.cc:271
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
static EventManager evm
Event
Definition: http_client.h:29
#define LOG(_Level, _Msg)
Definition: logging.h:34
Definition: bfd_client.h:9
ControlPacket * ParseControlPacket(const uint8_t *data, size_t size)
boost::function< void(const SessionKey &key, const BFD::BFDState &state)> ChangeCb
Definition: bfd_common.h:131
ResultCode
Definition: bfd_common.h:41
@ kResultCode_Ok
Definition: bfd_common.h:42
@ kResultCode_UnknownSession
Definition: bfd_common.h:43
uint32_t Discriminator
Definition: bfd_common.h:17
const int kMinimalPacketLength
Definition: bfd_common.cc:41
@ kSingleHop
Definition: bfd_common.h:26
int rx_error_count
Definition: bfd_session.h:46
boost::asio::ip::udp::endpoint local_endpoint
boost::asio::ip::udp::endpoint remote_endpoint
BFD::Discriminator receiver_discriminator
SessionIndex session_index
ResultCode Verify() const
SessionConfig config
Definition: bfd_server.h:125
SessionKey key
Definition: bfd_server.h:124
std::size_t bytes_transferred
Definition: bfd_server.h:131
const boost::asio::const_buffer recv_buffer
Definition: bfd_server.h:130
uint32_t vrf_index
Definition: bfd_common.h:82
uint32_t if_index
Definition: bfd_common.h:81
const std::string to_string() const
Definition: bfd_common.h:107
#define CHECK_CONCURRENCY(...)