13 #include <boost/foreach.hpp>
14 #include <boost/random/mersenne_twister.hpp>
23 communicator_(communicator),
24 session_manager_(evm),
27 boost::bind(&
Server::EventCallback, this, _1))) {
61 LOG(ERROR, __func__ <<
"Cannot find session: " <<
72 for (Sessions::iterator it =
sessions_.begin(), next;
86 switch (event->
type) {
117 if (session_bydesc && session_bydesc->
Stats().
rx_count == 0) {
122 if (!session_bykey) {
123 if ((session_bydesc->
key()).local_address ==
125 LOG(ERROR, __func__ <<
" SRC-ADDR Changed, on Peer:" <<
127 "on Agent:" << (session_bydesc->
key()).remote_address.to_string());
130 LOG(ERROR, __func__ <<
" Session from Key NULL: " <<
138 LOG(ERROR, __func__ <<
" DISC -> Session MISMATCH: " <<
142 LOG(ERROR,
"PACKET Your_Discriminator mapped session: " <<
147 return session_bydesc;
179 const boost::asio::ip::udp::endpoint &local_endpoint,
180 const boost::asio::ip::udp::endpoint &remote_endpoint,
182 const boost::asio::const_buffer &recv_buffer,
183 std::size_t bytes_transferred,
const boost::system::error_code& error) {
185 session_index, recv_buffer, bytes_transferred));
190 LOG(ERROR, __func__ <<
"Wrong packet size: " <<
196 boost::asio::buffer_cast<const uint8_t *>(event->
recv_buffer),
198 if (packet == NULL) {
199 LOG(ERROR, __func__ <<
"Unable to parse packet");
202 packet->local_endpoint =
event->local_endpoint;
203 packet->remote_endpoint =
event->remote_endpoint;
204 packet->session_index =
event->session_index;
206 delete[] boost::asio::buffer_cast<
const uint8_t *>(
event->recv_buffer);
211 result = packet->
Verify();
213 LOG(ERROR,
"Wrong packet: " << result);
218 if (session == NULL) {
219 LOG(ERROR,
"Unknown session: " <<
227 LOG(ERROR,
"Unable to process session: result " << result
228 <<
", session: " << session->
toString());
239 assignedDiscriminator);
248 DiscriminatorSessionMap::const_iterator it =
256 KeySessionMap::const_iterator it = by_key_.find(key);
257 return it != by_key_.end() ? it->second : NULL;
261 KeySessionMap::const_iterator it = by_key_.find(key);
262 return it != by_key_.end() ? it->second : NULL;
268 if (session == NULL) {
269 LOG(DEBUG, __FUNCTION__ <<
" No such session: " << key.
to_string());
273 if (!--refcounts_[session]) {
274 by_discriminator_.erase(session->local_discriminator());
289 LOG(INFO, __func__ <<
": UpdateConfig : "
296 *assignedDiscriminator = GenerateUniqueDiscriminator();
297 session =
new Session(*assignedDiscriminator, key,
evm_, config,
300 by_discriminator_[*assignedDiscriminator] = session;
301 by_key_[key] = session;
302 refcounts_[session] = 1;
304 LOG(INFO, __func__ <<
": New session configured: " << key.
to_string() <<
"/"
305 << *assignedDiscriminator);
311 class DiscriminatorGenerator {
313 DiscriminatorGenerator() {
314 next_ = gen()%0x1000000 + 1;
318 return next_.fetch_and_increment();
322 tbb::atomic<Discriminator> next_;
323 boost::random::mt19937 gen;
326 static DiscriminatorGenerator generator;
328 return generator.Next();
332 for (DiscriminatorSessionMap::iterator it = by_discriminator_.begin();
333 it != by_discriminator_.end(); ++it) {
void AddSession(const SessionKey &key, const SessionConfig &config, ChangeCb cb)
const SessionKey & key() const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Session * SessionByKey(const SessionKey &key)
void ProcessControlPacket(const boost::asio::ip::udp::endpoint &local_endpoint, const boost::asio::ip::udp::endpoint &remote_endpoint, const SessionIndex &session_index, const boost::asio::const_buffer &recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
Discriminator GenerateUniqueDiscriminator()
Session * SessionByKey(const boost::asio::ip::address &address, const SessionIndex &index=SessionIndex())
ResultCode Verify() const
Session * SessionByDiscriminator(Discriminator discriminator)
SessionIndex session_index
std::string toString() const
const boost::asio::const_buffer recv_buffer
SessionManager session_manager_
ResultCode ConfigureSession(const SessionKey &key, const SessionConfig &config, Connection *communicator, Discriminator *assignedDiscriminator)
ResultCode RemoveSessionReference(const SessionKey &key)
bool EventCallback(Event *event)
boost::scoped_ptr< WorkQueue< Event * > > event_queue_
BFD::Discriminator receiver_discriminator
boost::asio::ip::udp::endpoint remote_endpoint
ResultCode ProcessControlPacketActual(const ControlPacket *packet)
void RegisterChangeCallback(ClientId client_id, ChangeCb cb)
#define CHECK_CONCURRENCY(...)
Connection * communicator() const
void EnqueueEvent(Event *event)
const int kMinimalPacketLength
DiscriminatorSessionMap by_discriminator_
boost::function< void(const SessionKey &key, const BFD::BFDState &state)> ChangeCb
Session * GetSession(const ControlPacket *packet)
boost::asio::ip::udp::endpoint local_endpoint
ResultCode RemoveSessionReference(const SessionKey &key)
BFDState local_state() const
Connection * communicator_
#define LOG(_Level, _Msg)
void DeleteSession(const SessionKey &key)
ResultCode ProcessControlPacket(const ControlPacket *packet)
Discriminator local_discriminator() const
std::size_t bytes_transferred
virtual void SetServer(Server *server)=0
Server(EventManager *evm, Connection *communicator)
const std::string to_string() const
ResultCode ConfigureSession(const SessionKey &key, const SessionConfig &config, Discriminator *assignedDiscriminator)
void DeleteClientSessions()
ControlPacket * ParseControlPacket(const uint8_t *data, size_t size)
void UpdateConfig(const SessionConfig &config)