OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sandesh_state_machine.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_state_machine.cc
7 //
8 // Sandesh State Machine Implementation
9 //
10 
11 #include <typeinfo>
12 #include <boost/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
21 
22 #include <base/logging.h>
23 #include <base/task_annotations.h>
24 #include <io/event_manager.h>
25 #include <io/tcp_session.h>
26 #include <io/tcp_server.h>
27 
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_session.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_message_builder.h>
34 #include "sandesh_statistics.h"
35 #include "sandesh_connection.h"
36 #include "sandesh_state_machine.h"
37 
38 using namespace std;
39 using boost::system::error_code;
40 
41 namespace mpl = boost::mpl;
42 namespace sc = boost::statechart;
43 
// Log _Msg at log4cplus level _Level through the Sandesh logger, prefixed
// with the state machine's prefix().
//
// Nothing is emitted when LoggingDisabled() is true or when the logger is not
// enabled for the requested level; in both cases _Msg is never evaluated.
// The macro calls context<SandeshStateMachine>() at the expansion site, so it
// can only be used where that expression is valid.
#define SM_LOG(_Level, _Msg)                                                \
    do {                                                                    \
        if (LoggingDisabled()) break;                                       \
        log4cplus::Logger _Xlogger = Sandesh::logger();                     \
        if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) {         \
            log4cplus::tostringstream _Xbuf;                                \
            SandeshStateMachine *_sm = &context<SandeshStateMachine>();     \
            _Xbuf << _sm->prefix() << _Msg;                                 \
            _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL,               \
                               _Xbuf.str());                                \
        }                                                                   \
    } while (false)
56 
// Debug-log "<session>:<event name>". A null session is printed as "*".
// Name() is resolved at the expansion site, i.e. inside the event struct
// whose constructor uses this macro.
#define SESSION_LOG(session)                                                \
    SANDESH_LOG(DEBUG,                                                      \
        ((session) ? (session)->ToString() : "*") << ":" << Name())
59 
60 
61 namespace ssm {
62 
63 // events
64 struct EvStart : sc::event<EvStart> {
65  static const char * Name() {
66  return "EvStart";
67  }
68 };
69 
70 struct EvStop : sc::event<EvStop> {
71  static const char * Name() {
72  return "EvStop";
73  }
74 };
75 
76 struct EvIdleHoldTimerExpired : sc::event<EvIdleHoldTimerExpired> {
77  EvIdleHoldTimerExpired(Timer *timer) : timer_(timer) {
78  }
79  static const char * Name() {
80  return "EvIdleHoldTimerExpired";
81  }
82  bool validate() const {
83  return !timer_->cancelled();
84  }
86 };
87 
88 struct EvTcpPassiveOpen : sc::event<EvTcpPassiveOpen> {
89  EvTcpPassiveOpen(SandeshSession *session) : session(session) {
90  SESSION_LOG(session);
91  };
92  static const char * Name() {
93  return "EvTcpPassiveOpen";
94  }
96 };
97 
98 struct EvTcpClose : sc::event<EvTcpClose> {
99  EvTcpClose(SandeshSession *session) : session(session) {
100  SESSION_LOG(session);
101  };
102  static const char * Name() {
103  return "EvTcpClose";
104  }
105  bool validate(SandeshStateMachine *state_machine) const {
106  return ((state_machine->connection()->session() == session) ||
107  (state_machine->session() == session));
108  }
110 };
111 
112 // Used to defer the session delete after all events currently on the queue.
113 struct EvTcpDeleteSession : sc::event<EvTcpDeleteSession> {
114  EvTcpDeleteSession(SandeshSession *session) : session(session) {
115  SESSION_LOG(session);
116  }
117  static const char *Name() {
118  return "EvTcpDeleteSession";
119  }
121 };
122 
123 struct EvSandeshMessageRecv : sc::event<EvSandeshMessageRecv> {
125  msg(msg) {
126  };
127  static const char * Name() {
128  return "EvSandeshMessageRecv";
129  }
130  boost::shared_ptr<const SandeshMessage> msg;
131 };
132 
133 struct EvSandeshCtrlMessageRecv : sc::event<EvSandeshCtrlMessageRecv> {
134  EvSandeshCtrlMessageRecv(const std::string &msg,
135  const SandeshHeader& header,
136  const std::string &msg_type, const uint32_t &header_offset) :
137  msg(msg), header(header), msg_type(msg_type),
138  header_offset(header_offset) {
139  };
140  static const char * Name() {
141  return "EvSandeshCtrlMessageRecv";
142  }
143  const std::string msg;
144  const SandeshHeader header;
145  const std::string msg_type;
146  const uint32_t header_offset;
147 };
148 
149 struct EvResourceUpdate : sc::event<EvResourceUpdate> {
150  EvResourceUpdate(bool rsc) :
151  rsc(rsc) {
152  };
153  static const char * Name() {
154  return "EvResourceUpdate";
155  }
156  bool rsc;
157 };
158 
159 // states
160 struct Idle;
161 struct Active;
162 struct Established;
163 struct ServerInit;
164 
165 template <class Ev>
167  typedef sc::transition<Ev, Idle, SandeshStateMachine,
168  &SandeshStateMachine::OnIdle<Ev> > reaction;
169 };
170 
171 template <class Ev>
173  typedef sc::in_state_reaction<Ev, SandeshStateMachine,
174  &SandeshStateMachine::ReleaseSandesh<Ev> > reaction;
175 };
176 
177 template <class Ev>
179  typedef sc::in_state_reaction<Ev, SandeshStateMachine,
180  &SandeshStateMachine::DeleteTcpSession<Ev> > reaction;
181 };
182 
183 template <class Ev>
185  typedef sc::in_state_reaction<Ev, SandeshStateMachine,
186  &SandeshStateMachine::ProcessMessage<Ev> > reaction;
187 };
188 
189 struct Idle : public sc::state<Idle, SandeshStateMachine> {
190  typedef mpl::list<
191  sc::custom_reaction<EvStart>,
192  sc::custom_reaction<EvStop>,
193  sc::custom_reaction<EvTcpPassiveOpen>,
194  sc::custom_reaction<EvIdleHoldTimerExpired>,
197 
198  Idle(my_context ctx) : my_base(ctx) {
199  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
200  state_machine->set_state(ssm::IDLE);
201  SM_LOG(DEBUG, state_machine->StateName());
202  }
203 
204  ~Idle() {
205  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
206  state_machine->CancelIdleHoldTimer();
207  }
208 
209  sc::result react(const EvStart &event) {
210  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
211  if (state_machine->idle_hold_time()) {
212  state_machine->StartIdleHoldTimer();
213  } else {
214  return transit<Active>();
215  }
216  return discard_event();
217  }
218 
219  sc::result react(const EvStop &event) {
220  SandeshStateMachine *state_mahine = &context<SandeshStateMachine>();
221  state_mahine->CancelIdleHoldTimer();
222  return discard_event();
223  }
224 
225  sc::result react(const EvIdleHoldTimerExpired &event) {
226  return transit<Active>();
227  }
228 
229  // Close the session and ignore event
230  sc::result react(const EvTcpPassiveOpen &event) {
231  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
232  SandeshSession *session = event.session;
233  state_machine->DeleteSession(session);
234  return discard_event();
235  }
236 };
237 
238 struct Active : public sc::state<Active, SandeshStateMachine> {
239  typedef mpl::list<
241  sc::custom_reaction<EvTcpPassiveOpen>,
242  sc::custom_reaction<EvTcpClose>,
245 
246  Active(my_context ctx) : my_base(ctx) {
247  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
248  state_machine->set_state(ssm::ACTIVE);
249  SM_LOG(DEBUG, state_machine->StateName());
250  }
251 
253  }
254 
255  sc::result react(const EvTcpPassiveOpen &event) {
256  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
257  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
258  state_machine->set_session(event.session);
259  event.session->set_observer(
261  state_machine, _1, _2));
262  return transit<ServerInit>();
263  }
264 
265  sc::result react(const EvTcpClose &event) {
266  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
267  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
268  state_machine->set_session(NULL);
269  return transit<Idle>();
270  }
271 };
272 
273 struct ServerInit : public sc::state<ServerInit, SandeshStateMachine> {
274  typedef mpl::list<
276  sc::custom_reaction<EvTcpClose>,
277  sc::custom_reaction<EvSandeshCtrlMessageRecv>,
278  sc::custom_reaction<EvSandeshMessageRecv>,
281 
282  ServerInit(my_context ctx) : my_base(ctx) {
283  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
284  state_machine->set_state(ssm::SERVER_INIT);
285  SM_LOG(DEBUG, state_machine->StateName());
286  }
287 
289  }
290 
291  sc::result react(const EvTcpClose &event) {
292  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
293  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
294  state_machine->set_session(NULL);
295  return transit<Idle>();
296  }
297 
298  sc::result react(const EvSandeshCtrlMessageRecv &event) {
299  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
300  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
301  SandeshConnection *connection = state_machine->connection();
302  if (!connection->ProcessSandeshCtrlMessage(event.msg, event.header,
303  event.msg_type, event.header_offset)) {
304  state_machine->set_session(NULL);
305  return transit<Idle>();
306  }
307  return transit<Established>();
308  }
309 
310  sc::result react(const EvSandeshMessageRecv &event) {
311  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
312  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
313  SandeshConnection *connection = state_machine->connection();
314  if (!connection->ProcessSandeshMessage(event.msg.get(), true)) {
315  state_machine->set_session(NULL);
316  return transit<Idle>();
317  }
318  return discard_event();
319  }
320 };
321 
322 struct Established : public sc::state<Established, SandeshStateMachine> {
323  typedef mpl::list<
325  sc::custom_reaction<EvTcpClose>,
326  sc::custom_reaction<EvSandeshMessageRecv>,
328  sc::custom_reaction<EvResourceUpdate>
330 
331  Established(my_context ctx) : my_base(ctx) {
332  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
333  state_machine->set_state(ssm::ESTABLISHED);
334  state_machine->set_resource(true);
335  SM_LOG(DEBUG, state_machine->StateName());
336  }
337 
339  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
340  state_machine->set_resource(false);
341  }
342 
343  sc::result react(const EvTcpClose &event) {
344  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
345  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
346  // Process disconnect
347  SandeshConnection *connection = state_machine->connection();
348  connection->ProcessDisconnect(state_machine->session());
349  // Reset the session
350  state_machine->set_session(NULL);
351  return transit<Idle>();
352  }
353 
354  sc::result react(const EvResourceUpdate &event) {
355  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
356  SandeshConnection *connection = state_machine->connection();
357 
358  state_machine->set_resource(event.rsc);
359 
360  if (!connection->ProcessResourceUpdate(event.rsc)) {
361  state_machine->set_session(NULL);
362  return transit<Idle>();
363  }
364  return discard_event();
365  }
366 
367  sc::result react(const EvSandeshMessageRecv &event) {
368  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
369  SandeshConnection *connection = state_machine->connection();
370  if (!connection->ProcessSandeshMessage(event.msg.get(),
371  state_machine->get_resource())) {
372  state_machine->set_session(NULL);
373  return transit<Idle>();
374  }
375  return discard_event();
376  }
377 };
378 
379 } // namespace ssm
380 
382  : prefix_(prefix),
383  work_queue_(connection->GetTaskId(),
384  connection->GetTaskInstance(),
385  boost::bind(&SandeshStateMachine::DequeueEvent, this, _1),
386  kQueueSize),
387  connection_(connection),
388  session_(),
389  idle_hold_timer_(TimerManager::CreateTimer(
390  *connection->server()->event_manager()->io_service(),
391  "Idle hold timer",
392  connection->GetTaskId(),
393  connection->GetTaskInstance())),
394  idle_hold_time_(0),
395  deleted_(false),
396  resource_(false),
397  builder_(SandeshMessageBuilder::GetInstance(SandeshMessageBuilder::XML)),
398  message_drop_level_(SandeshLevel::INVALID) {
399  state_ = ssm::IDLE;
400  initiate();
401 }
402 
404  assert(!deleted_);
405  deleted_ = true;
406 
410 
411  assert(session() == NULL);
412 
413  //
414  // Explicitly call the state destructor before the state machine itself.
415  // This is needed because some of the destructors access the state machine
416  // context.
417  //
418  terminate();
419 
420  //
421  // Delete timer after state machine is terminated so that there is no
422  // possible reference to the timers being deleted any more
423  //
425 }
426 
429 }
430 
432  Enqueue(ssm::EvStop());
433 }
434 
436  if (down) {
437  Enqueue(ssm::EvStop());
438  } else {
440  // On fresh restart of state machine, all previous state should be reset
441  reset_last_info();
443  }
444 }
445 
446 // Note this api does not enqueue the deletion of TCP session
448  if (session_ != NULL) {
449  session_->set_observer(NULL);
450  session_->SetReceiveMsgCb(NULL);
451  session_->SetConnection(NULL);
452  session_->Close();
453  session_->Shutdown();
454  connection_->set_session(NULL);
455  session_ = NULL;
456  }
457 }
458 
460  if (session_ != NULL) {
462  }
463  connection_->set_session(session);
464  session_ = session;
465 }
466 
468  session->set_observer(NULL);
469  session->SetReceiveMsgCb(NULL);
470  session->SetConnection(NULL);
471  session->Close();
472  session->Shutdown();
474 }
475 
477  return session_;
478 }
479 
480 template <class Ev>
481 void SandeshStateMachine::OnIdle(const Ev &event) {
482  // Release all resources
484 
486 
487  set_session(NULL);
488 }
489 
490 template <class Ev>
492  event.session->server()->DeleteSession(event.session);
494  if (connection) {
495  connection->ManagedDelete();
496  }
497 }
498 
500  if (idle_hold_time_ <= 0)
501  return;
502 
504  boost::bind(&SandeshStateMachine::IdleHoldTimerExpired, this),
505  boost::bind(&SandeshStateMachine::TimerErrorHandler, this, _1,
506  _2));
507 }
508 
511 }
512 
514  return idle_hold_timer_->running();
515 }
516 
517 //
518 // Test Only API : Start
519 //
522 }
523 //
524 // Test Only API : End
525 //
526 
527 void SandeshStateMachine::TimerErrorHandler(std::string name, std::string error) {
528  SM_LOG(ERROR, name + " error: " + error);
529 }
530 
531 bool SandeshStateMachine::GetQueueCount(uint64_t &queue_count) const {
532  if (deleted_ || generator_key_.empty()) {
533  return false;
534  }
535  queue_count = work_queue_.Length();
536  return true;
537 }
538 
540  std::string &drop_level) const {
541  if (deleted_ || generator_key_.empty()) {
542  return false;
543  }
545  return true;
546 }
547 
549  SandeshStateMachineStats *sm_stats) {
550  std::vector<SandeshStateMachineEvStats> ev_stats;
551  tbb::mutex::scoped_lock elock(smutex_);
552  // State machine event statistics
553  event_stats_.Get(&ev_stats);
554  elock.release();
555  sm_stats->set_ev_stats(ev_stats);
556  sm_stats->set_state(StateName());
557  sm_stats->set_last_state(LastStateName());
558  sm_stats->set_last_event(last_event());
559  sm_stats->set_state_since(state_since_);
560  sm_stats->set_last_event_at(last_event_at_);
561 }
562 
564  SandeshGeneratorStats *detail_msg_stats) {
565  // Detail message statistics
566  SandeshMessageStatistics::DetailStatsList v_detail_type_stats;
567  SandeshMessageStats detail_agg_stats;
568  tbb::mutex::scoped_lock mlock(smutex_);
569  message_stats_.Get(&v_detail_type_stats, &detail_agg_stats);
570  mlock.release();
571  detail_msg_stats->set_type_stats(v_detail_type_stats);
572  detail_msg_stats->set_aggregate_stats(detail_agg_stats);
573 }
574 
576  SandeshGeneratorBasicStats *basic_msg_stats) {
577  // Basic message statistics
578  SandeshMessageStatistics::BasicStatsList v_basic_type_stats;
579  SandeshMessageBasicStats basic_agg_stats;
580  tbb::mutex::scoped_lock mlock(smutex_);
581  message_stats_.Get(&v_basic_type_stats, &basic_agg_stats);
582  mlock.release();
583  basic_msg_stats->set_type_stats(v_basic_type_stats);
584  basic_msg_stats->set_aggregate_stats(basic_agg_stats);
585 }
586 
588  return !deleted_ && !generator_key_.empty();
589 }
590 
592  SandeshStateMachineStats *sm_stats,
593  SandeshGeneratorStats *detail_msg_stats) {
594  if (!IsValid()) {
595  return false;
596  }
597  // State machine event statistics
598  GetEventStatistics(sm_stats);
599  // Detail message statistics
600  GetDetailMessageStatistics(detail_msg_stats);
601  return true;
602 }
603 
604 bool SandeshStateMachine::GetStatistics(SandeshStateMachineStats &sm_stats,
605  SandeshGeneratorStats &detail_msg_stats) {
606  return GetDetailStatistics(&sm_stats, &detail_msg_stats);
607 }
608 
610  SandeshStateMachineStats *sm_stats,
611  SandeshGeneratorBasicStats *basic_msg_stats) {
612  if (!IsValid()) {
613  return false;
614  }
615  // State machine event statistics
616  GetEventStatistics(sm_stats);
617  // Basic message statistics
618  GetBasicMessageStatistics(basic_msg_stats);
619  return true;
620 }
621 
622 bool SandeshStateMachine::GetStatistics(SandeshStateMachineStats &sm_stats,
623  SandeshGeneratorBasicStats &basic_msg_stats) {
624  return GetBasicStatistics(&sm_stats, &basic_msg_stats);
625 }
626 
629  return false;
630 }
631 
633  TcpSession *session, TcpSession::Event event) {
634  SandeshSession *sandesh_session = dynamic_cast<SandeshSession *>(session);
635  assert((session != NULL) == (sandesh_session != NULL));
636  std::string session_s = session ? session->ToString() : "*";
637  switch (event) {
638  case TcpSession::CLOSE:
639  SM_LOG(DEBUG, session_s << " " << __func__ <<
640  " " << "TCP Connection Closed");
641  Enqueue(ssm::EvTcpClose(sandesh_session));
642  break;
643  case TcpSession::ACCEPT:
644  break;
645  default:
646  SM_LOG(DEBUG, session_s << " " << "Unknown event: " <<
647  event);
648  break;
649  }
650 }
651 
654  this, _1, _2));
655  SM_LOG(DEBUG, session->ToString() << " " << "PassiveOpen");
656  Enqueue(ssm::EvTcpPassiveOpen(session));
657 }
658 
659 void SandeshStateMachine::UpdateRxMsgStats(const std::string &msg_name,
660  size_t msg_size) {
661  tbb::mutex::scoped_lock lock(smutex_);
662  message_stats_.UpdateRecv(msg_name, msg_size);
663 }
664 
665 void SandeshStateMachine::UpdateRxMsgFailStats(const std::string &msg_name,
666  size_t msg_size, SandeshRxDropReason::type dreason) {
667  tbb::mutex::scoped_lock lock(smutex_);
668  message_stats_.UpdateRecvFailed(msg_name, msg_size, dreason);
669 }
670 
672  const std::string &msg) {
673  // Demux based on Sandesh message type
674  SandeshMessage *xmessage = builder_->Create(
675  reinterpret_cast<const uint8_t *>(msg.c_str()), msg.size());
676  if (xmessage == NULL) {
677  // Update message statistics
678  UpdateRxMsgFailStats(std::string(), msg.size(),
679  SandeshRxDropReason::DecodingFailed);
680  return false;
681  }
682  const SandeshHeader &header(xmessage->GetHeader());
683  const std::string &message_type(xmessage->GetMessageType());
684  // Drop ?
686  // Update message statistics
687  UpdateRxMsgFailStats(message_type, msg.size(),
688  SandeshRxDropReason::QueueLevel);
689  delete xmessage;
690  return true;
691  }
692  if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
693  SandeshHeader ctrl_header;
694  std::string ctrl_message_type;
695  uint32_t ctrl_xml_offset = 0;
696  // Extract the header and message type
697  int ret = SandeshReader::ExtractMsgHeader(msg, ctrl_header,
698  ctrl_message_type, ctrl_xml_offset);
699  if (ret) {
700  SM_LOG(ERROR, "OnMessage control in state: " << StateName() <<
701  " session " << session->ToString() << ": Extract FAILED ("
702  << ret << ")");
703  // Update message statistics
704  UpdateRxMsgFailStats(message_type, msg.size(),
705  SandeshRxDropReason::ControlMsgFailed);
706  delete xmessage;
707  return false;
708  }
709  if (header != ctrl_header ||
710  message_type != ctrl_message_type) {
711  SM_LOG(ERROR, "OnMessage control in state: " << StateName() <<
712  " session " << session->ToString() << ": Header or message "
713  << "type (" << ctrl_message_type << ") MISMATCH");
714  // Update message statistics
715  UpdateRxMsgFailStats(message_type, msg.size(),
716  SandeshRxDropReason::ControlMsgFailed);
717  delete xmessage;
718  return false;
719  }
720  SM_LOG(DEBUG, "OnMessage control in state: " << StateName() <<
721  " session " << session->ToString());
722  // Update message statistics
723  UpdateRxMsgStats(message_type, msg.size());
724  Enqueue(ssm::EvSandeshCtrlMessageRecv(msg, ctrl_header,
725  ctrl_message_type, ctrl_xml_offset));
726  delete xmessage;
727  } else {
728  // Update message statistics
729  UpdateRxMsgStats(message_type, msg.size());
731  }
732  return true;
733 }
734 
737 }
738 
// Printable state names. StateName() and LastStateName() index this table
// with the numeric state value, so the order here must match that numbering.
static const std::string state_names[] = {
    "Idle", "Active", "Established", "ServerInit"
};
745 
746 const string &SandeshStateMachine::StateName() const {
747  return state_names[state_];
748 }
749 
750 const string &SandeshStateMachine::LastStateName() const {
751  return state_names[last_state_];
752 }
753 
754 bool SandeshStateMachine::LogEvent(const sc::event_base *event) {
755  if (state_ == ssm::ESTABLISHED) {
756  const ssm::EvSandeshMessageRecv *snh_rcv =
757  dynamic_cast<const ssm::EvSandeshMessageRecv *>(event);
758  if (snh_rcv != NULL) {
759  return false;
760  }
761  }
762  return true;
763 }
764 
765 void SandeshStateMachine::UpdateEventEnqueue(const sc::event_base &event) {
766  UpdateEventStats(event, true, false);
767 }
768 
769 void SandeshStateMachine::UpdateEventDequeue(const sc::event_base &event) {
770  UpdateEventStats(event, false, false);
771 }
772 
773 void SandeshStateMachine::UpdateEventEnqueueFail(const sc::event_base &event) {
774  UpdateEventStats(event, true, true);
775 }
776 
777 void SandeshStateMachine::UpdateEventDequeueFail(const sc::event_base &event) {
778  UpdateEventStats(event, false, true);
779 }
780 
781 void SandeshStateMachine::UpdateEventStats(const sc::event_base &event,
782  bool enqueue, bool fail) {
783  if (!deleted_) {
784  std::string event_name(TYPE_NAME(event));
785  tbb::mutex::scoped_lock lock(smutex_);
786  event_stats_.Update(event_name, enqueue, fail);
787  }
788 }
789 
791  if (deleted_) {
792  return true;
793  }
795  if (ec.validate.empty() || ec.validate(this)) {
796  // Log only relevant events and states
797  if (LogEvent(ec.event.get())) {
798  SM_LOG(DEBUG, "Processing " << TYPE_NAME(*ec.event) << " in state "
799  << StateName() << " Key " << generator_key());
800  }
801  // Update event stats
803  process_event(*ec.event);
804  } else {
805  SM_LOG(DEBUG, "Discarding " << TYPE_NAME(*ec.event) << " in state "
806  << StateName() << " Key " << generator_key());
807  // Update event stats
809  }
810  ec.event.reset();
811  return true;
812 }
813 
814 void SandeshStateMachine::unconsumed_event(const sc::event_base &event) {
815  SM_LOG(DEBUG, "Unconsumed " << TYPE_NAME(event) << " in state "
816  << StateName());
817 }
818 
819 // This class determines whether a given class has a method called 'validate'
820 template<typename Ev>
821 struct HasValidate
822 {
823  template<typename T, bool (T::*)(SandeshStateMachine *) const> struct SFINAE {};
824  template<typename T> static char Test(SFINAE<T, &T::validate>*);
825  template<typename T> static int Test(...);
826  static const bool Has = sizeof(Test<Ev>(0)) == sizeof(char);
827 };
828 
829 template <typename Ev, bool has_validate>
830 struct ValidateFn {
831  EvValidate operator()(const Ev *event) { return NULL; }
832 };
833 
834 template <typename Ev>
835 struct ValidateFn<Ev, true> {
836  EvValidate operator()(const Ev *event) {
837  return boost::bind(&Ev::validate, event, _1);
838  }
839 };
840 
841 template <typename Ev>
842 void SandeshStateMachine::Enqueue(const Ev &event) {
843  if (deleted_) {
844  return;
845  }
846  EventContainer ec;
847  ec.event = event.intrusive_from_this();
848  ec.validate = ValidateFn<Ev, HasValidate<Ev>::Has>()(static_cast<const Ev *>(ec.event.get()));
849  if (!work_queue_.Enqueue(ec)) {
850  // Update event stats
851  // XXX Disable till we implement bounded work queues
852  //UpdateEventEnqueueFail(event);
853  //return;
854  }
855  // Update event stats
856  UpdateEventEnqueue(event);
857  return;
858 }
859 
861  SandeshLevel::type level, boost::function<void (void)> cb) {
862  if (message_drop_level_ != level) {
863  SM_LOG(INFO, "SANDESH MESSAGE DROP LEVEL: [" <<
865  Sandesh::LevelToString(level) << "], SM QUEUE COUNT: " <<
866  queue_count);
867  message_drop_level_ = level;
868  }
869  // Always invoke the callback
870  if (!cb.empty()) {
871  cb();
872  }
873 }
874 
876  if (session_ != NULL) {
877  SM_LOG(INFO, "SANDESH Session Reader Defer : " << defer_reader);
878  session_->SetDeferReader(defer_reader);
879  }
880 }
881 
884  bool high(boost::get<2>(wm));
885  size_t queue_count(boost::get<0>(wm));
886  bool defer_undefer(boost::get<3>(wm));
887  boost::function<void (void)> cb;
888  if (high) {
889  if (defer_undefer) {
891  this, true);
892  }
893  WaterMarkInfo wmi(queue_count,
895  this, _1, boost::get<1>(wm), cb));
897  } else {
898  if (defer_undefer) {
900  this, false);
901  }
902  WaterMarkInfo wmi(queue_count,
904  this, _1, boost::get<1>(wm), cb));
906  }
907 }
908 
912 }
913 
915  SandeshStateMachine::EventContainer *ec, size_t *msg_size) {
916  const ssm::EvSandeshMessageRecv *snh_rcv(
917  dynamic_cast<const ssm::EvSandeshMessageRecv *>(ec->event.get()));
918  if (snh_rcv == NULL) {
919  return false;
920  }
921  const SandeshMessage *msg(snh_rcv->msg.get());
922  *msg_size = msg->GetSize();
923  return true;
924 }
925 
926 template<>
929  size_t msg_size;
930  if (GetEvSandeshMessageRecvSize(ec, &msg_size)) {
931  return count_.fetch_and_add(msg_size) + msg_size;
932  } else {
933  return count_.fetch_and_increment() + 1;
934  }
935 }
936 
937 template<>
940  size_t msg_size;
941  if (GetEvSandeshMessageRecvSize(ec, &msg_size)) {
942  return count_.fetch_and_add((size_t)(0-msg_size)) - msg_size;
943  } else {
944  return count_.fetch_and_decrement() - 1;
945  }
946 }
947 
948 void SandeshStateMachine::SetDeferDequeue(bool defer_dequeue) {
949  SM_LOG(INFO, "SANDESH Set Defer Dequeue: " << defer_dequeue);
950  work_queue_.set_disable(defer_dequeue);
951 }
virtual SandeshMessage * Create(const uint8_t *data, size_t size) const =0
const std::string & generator_key() const
const std::string & StateName() const
sc::result react(const EvTcpClose &event)
EvSandeshCtrlMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
void Close()
Definition: tcp_session.cc:354
void SetSandeshMessageDropLevel(size_t queue_count, SandeshLevel::type level, boost::function< void(void)> cb)
void DeleteSession(SandeshSession *session)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshCtrlMessageRecv >, sc::custom_reaction< EvSandeshMessageRecv >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
static const char * Name()
const std::string last_event() const
bool GetEvSandeshMessageRecvSize(SandeshStateMachine::EventContainer *ec, size_t *msg_size)
EvTcpClose(SandeshSession *session)
EvValidate operator()(const Ev *event)
void UpdateEventEnqueueFail(const sc::event_base &event)
void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats)
EvTcpPassiveOpen(SandeshSession *session)
sc::result react(const EvSandeshMessageRecv &event)
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvTcpPassiveOpen >, sc::custom_reaction< EvIdleHoldTimerExpired >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void set_state(ssm::SsmState state)
sc::result react(const EvStop &event)
sc::result react(const EvSandeshCtrlMessageRecv &event)
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
SandeshMessageBuilder * builder_
bool GetBasicStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorBasicStats *basic_msg_stats)
void UpdateEventDequeueFail(const sc::event_base &event)
void unconsumed_event(const sc::event_base &event)
void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats)
const size_t GetSize() const
sc::transition< Ev, Idle, SandeshStateMachine,&SandeshStateMachine::OnIdle< Ev > > reaction
void Fire()
Definition: timer.h:118
void SetHighWaterMark(const WaterMarkInfos &high_water)
Definition: queue_task.h:208
SandeshSession * session() const
EvValidate operator()(const Ev *event)
virtual std::string ToString() const
Definition: tcp_session.h:83
void set_idle_hold_time(int idle_hold_time)
std::vector< SandeshMessageTypeBasicStats > BasicStatsList
void DeleteTcpSession(const Ev &event)
static const char * Name()
bool DequeueEvent(EventContainer ec)
void TimerErrorHandler(std::string name, std::string error)
sc::result react(const EvSandeshMessageRecv &event)
boost::shared_ptr< const SandeshMessage > msg
virtual void SetDeferReader(bool defer_reader)
Definition: tcp_session.cc:182
virtual void Shutdown()
std::vector< SandeshMessageTypeStats > DetailStatsList
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
SandeshMessageStatistics message_stats_
void ResetLowWaterMark()
Definition: queue_task.h:238
static const char * Name()
bool GetQueueCount(uint64_t &queue_count) const
boost::function< bool(StateMachine *)> EvValidate
Definition: state_machine.h:33
Active(my_context ctx)
void UpdateRecv(const std::string &msg_name, uint64_t bytes)
SandeshSession * session()
static const char * Name()
void set_last_event(const std::string &event)
virtual void ProcessDisconnect(SandeshSession *sess)=0
void set_observer(EventObserver observer)
Definition: tcp_session.cc:218
static const bool Has
void GetEventStatistics(SandeshStateMachineStats *sm_stats)
sc::in_state_reaction< Ev, SandeshStateMachine,&SandeshStateMachine::DeleteTcpSession< Ev > > reaction
virtual void ManagedDelete()=0
void set_session(SandeshSession *session)
uint8_t type
Definition: load_balance.h:109
bool GetStatistics(SandeshStateMachineStats &sm_stats, SandeshGeneratorBasicStats &basic_msg_stats)
static const char * LevelToString(SandeshLevel::type level)
Definition: sandesh.cc:852
size_t Length() const
Definition: queue_task.h:356
bool validate(SandeshStateMachine *state_machine) const
static const char * Name()
bool GetMessageDropLevel(std::string &drop_level) const
static const int kIdleHoldTime
sc::result react(const EvResourceUpdate &event)
const std::string & LastStateName() const
bool OnSandeshMessage(SandeshSession *session, const std::string &msg)
static const char * Name()
virtual std::string ToString() const
SandeshSession * session
void set_session(SandeshSession *session)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void SetQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, DeleteTcpSession< EvTcpDeleteSession >::reaction, sc::custom_reaction< EvResourceUpdate > > reactions
EvSandeshMessageRecv(const SandeshMessage *msg)
void SetConnection(SandeshConnection *connection)
bool DoDropSandeshMessage(const SandeshHeader &header, const SandeshLevel::type drop_level)
Definition: sandesh.cc:927
sc::result react(const EvTcpClose &event)
sc::result react(const EvStart &event)
void UpdateRxMsgStats(const std::string &msg_name, size_t msg_size)
void UpdateRxMsgFailStats(const std::string &msg_name, size_t msg_size, SandeshRxDropReason::type dreason)
SandeshConnection * connection_
void UpdateRecvFailed(const std::string &msg_name, uint64_t bytes, SandeshRxDropReason::type dreason)
void SetDeferDequeue(bool defer)
bool Cancel()
Definition: timer.cc:150
void set_disable(bool disabled)
Definition: queue_task.h:319
void Enqueue(const Ev &event)
EvTcpDeleteSession(SandeshSession *session)
#define TYPE_NAME(_type)
Definition: logging.h:31
void PassiveOpen(SandeshSession *session)
boost::intrusive_ptr< const sc::event_base > event
size_t AtomicIncrementQueueCount(EventContainer *entry)
Definition: queue_task.h:431
static char Test(SFINAE< T,&T::validate > *)
ServerInit(my_context ctx)
void UpdateEventEnqueue(const sc::event_base &event)
Established(my_context ctx)
size_t AtomicDecrementQueueCount(EventContainer *entry)
Definition: queue_task.h:435
sc::result react(const EvIdleHoldTimerExpired &event)
void SetLowWaterMark(const WaterMarkInfos &low_water)
Definition: queue_task.h:228
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
SandeshEventStatistics event_stats_
void Get(DetailStatsList *v_detail_type_stats, SandeshMessageStats *detail_agg_stats) const
bool GetDetailStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorStats *detail_msg_stats)
void UpdateEventDequeue(const sc::event_base &event)
void ResetHighWaterMark()
Definition: queue_task.h:218
sc::result react(const EvTcpPassiveOpen &event)
Idle(my_context ctx)
SandeshStateMachine(const char *prefix, SandeshConnection *connection)
SandeshLevel::type message_drop_level_
bool running() const
Definition: timer.h:86
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
#define SM_LOG(_Level, _Msg)
void SetDeferSessionReader(bool defer_reader)
#define SESSION_LOG(session)
sc::result react(const EvTcpClose &event)
Definition: timer.h:54
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpPassiveOpen >, sc::custom_reaction< EvTcpClose >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::in_state_reaction< Ev, SandeshStateMachine,&SandeshStateMachine::ProcessMessage< Ev > > reaction
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: p/sandesh.h:147
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
static const std::string state_names[]
void OnIdle(const Ev &event)
const std::string & GetMessageType() const
const SandeshHeader & GetHeader() const
void Update(std::string &event_name, bool enqueue, bool fail)
sc::in_state_reaction< Ev, SandeshStateMachine,&SandeshStateMachine::ReleaseSandesh< Ev > > reaction
sc::result react(const EvTcpPassiveOpen &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
bool LogEvent(const sc::event_base *event)
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
tbb::atomic< size_t > count_
Definition: queue_task.h:514
tbb::atomic< ssm::SsmState > state_
SandeshConnection * connection()