OpenSDN source code
sandesh_state_machine.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_state_machine.cc
7 //
8 // Sandesh State Machine Implementation
9 //
10 
11 #include <typeinfo>
12 #include <boost/bind/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
21 
22 #include <base/logging.h>
23 #include <base/task_annotations.h>
24 #include <io/event_manager.h>
25 #include <io/tcp_session.h>
26 #include <io/tcp_server.h>
27 
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_session.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_message_builder.h>
34 #include "sandesh_statistics.h"
35 #include "sandesh_connection.h"
36 #include "sandesh_state_machine.h"
37 
38 using namespace std;
39 using boost::system::error_code;
40 using namespace boost::placeholders;
41 
42 namespace mpl = boost::mpl;
43 namespace sc = boost::statechart;
44 
// SM_LOG(_Level, _Msg): emit one state-machine log line through log4cplus.
//  - Expands to nothing observable when LoggingDisabled() returns true.
//  - Otherwise fetches Sandesh::logger() and, only if that logger is enabled
//    for log4cplus::<_Level>_LOG_LEVEL, streams the state machine's prefix()
//    followed by _Msg into a temporary buffer and passes it to forcedLog().
//  - _Msg is spliced directly after a `<<`, so callers may pass a whole
//    `a << b << c` chain; it is evaluated only when the level check passes.
//  - Must be expanded at a point where context<SandeshStateMachine>() can be
//    called, since that is how the owning state machine is located.
45 #define SM_LOG(_Level, _Msg) \
46  do { \
47  if (LoggingDisabled()) break; \
48  log4cplus::Logger _Xlogger = Sandesh::logger(); \
49  if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) { \
50  log4cplus::tostringstream _Xbuf; \
51  SandeshStateMachine *_sm = &context<SandeshStateMachine>(); \
52  _Xbuf << _sm->prefix() << _Msg; \
53  _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL, \
54  _Xbuf.str()); \
55  } \
56  } while (false)
57 
// SESSION_LOG(session): DEBUG-log "<session>:<event name>" via SANDESH_LOG.
// Prints session->ToString() when the pointer is non-null and "*" otherwise.
// Name() is looked up at the expansion site; the event constructors in this
// file use it from inside structs that define a static Name().
58 #define SESSION_LOG(session) \
59  SANDESH_LOG(DEBUG, ((session) ? (session)->ToString() : "*") << ":" << Name())
60 
61 
62 namespace ssm {
63 
64 // events
65 struct EvStart : sc::event<EvStart> {
66  static const char * Name() {
67  return "EvStart";
68  }
69 };
70 
71 struct EvStop : sc::event<EvStop> {
72  static const char * Name() {
73  return "EvStop";
74  }
75 };
76 
77 struct EvIdleHoldTimerExpired : sc::event<EvIdleHoldTimerExpired> {
78  EvIdleHoldTimerExpired(Timer *timer) : timer_(timer) {
79  }
80  static const char * Name() {
81  return "EvIdleHoldTimerExpired";
82  }
83  bool validate() const {
84  return !timer_->cancelled();
85  }
87 };
88 
89 struct EvTcpPassiveOpen : sc::event<EvTcpPassiveOpen> {
90  EvTcpPassiveOpen(SandeshSession *session) : session(session) {
91  SESSION_LOG(session);
92  };
93  static const char * Name() {
94  return "EvTcpPassiveOpen";
95  }
97 };
98 
99 struct EvTcpClose : sc::event<EvTcpClose> {
100  EvTcpClose(SandeshSession *session) : session(session) {
101  SESSION_LOG(session);
102  };
103  static const char * Name() {
104  return "EvTcpClose";
105  }
106  bool validate(SandeshStateMachine *state_machine) const {
107  return ((state_machine->connection()->session() == session) ||
108  (state_machine->session() == session));
109  }
111 };
112 
113 // Used to defer the session delete after all events currently on the queue.
114 struct EvTcpDeleteSession : sc::event<EvTcpDeleteSession> {
115  EvTcpDeleteSession(SandeshSession *session) : session(session) {
116  SESSION_LOG(session);
117  }
118  static const char *Name() {
119  return "EvTcpDeleteSession";
120  }
122 };
123 
124 struct EvSandeshMessageRecv : sc::event<EvSandeshMessageRecv> {
126  msg(msg) {
127  };
128  static const char * Name() {
129  return "EvSandeshMessageRecv";
130  }
131  boost::shared_ptr<const SandeshMessage> msg;
132 };
133 
134 struct EvSandeshCtrlMessageRecv : sc::event<EvSandeshCtrlMessageRecv> {
135  EvSandeshCtrlMessageRecv(const std::string &msg,
136  const SandeshHeader& header,
137  const std::string &msg_type, const uint32_t &header_offset) :
138  msg(msg), header(header), msg_type(msg_type),
139  header_offset(header_offset) {
140  };
141  static const char * Name() {
142  return "EvSandeshCtrlMessageRecv";
143  }
144  const std::string msg;
145  const SandeshHeader header;
146  const std::string msg_type;
147  const uint32_t header_offset;
148 };
149 
150 struct EvResourceUpdate : sc::event<EvResourceUpdate> {
151  EvResourceUpdate(bool rsc) :
152  rsc(rsc) {
153  };
154  static const char * Name() {
155  return "EvResourceUpdate";
156  }
157  bool rsc;
158 };
159 
160 // states
161 struct Idle;
162 struct Active;
163 struct Established;
164 struct ServerInit;
165 
166 template <class Ev>
168  typedef sc::transition<Ev, Idle, SandeshStateMachine,
169  &SandeshStateMachine::OnIdle<Ev> > reaction;
170 };
171 
172 template <class Ev>
174  typedef sc::in_state_reaction<Ev, SandeshStateMachine,
175  &SandeshStateMachine::ReleaseSandesh<Ev> > reaction;
176 };
177 
178 template <class Ev>
180  typedef sc::in_state_reaction<Ev, SandeshStateMachine,
181  &SandeshStateMachine::DeleteTcpSession<Ev> > reaction;
182 };
183 
184 template <class Ev>
186  typedef sc::in_state_reaction<Ev, SandeshStateMachine,
187  &SandeshStateMachine::ProcessMessage<Ev> > reaction;
188 };
189 
190 struct Idle : public sc::state<Idle, SandeshStateMachine> {
191  typedef mpl::list<
192  sc::custom_reaction<EvStart>,
193  sc::custom_reaction<EvStop>,
194  sc::custom_reaction<EvTcpPassiveOpen>,
195  sc::custom_reaction<EvIdleHoldTimerExpired>,
198 
199  Idle(my_context ctx) : my_base(ctx) {
200  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
201  state_machine->set_state(ssm::IDLE);
202  SM_LOG(DEBUG, state_machine->StateName());
203  }
204 
205  ~Idle() {
206  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
207  state_machine->CancelIdleHoldTimer();
208  }
209 
210  sc::result react(const EvStart &event) {
211  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
212  if (state_machine->idle_hold_time()) {
213  state_machine->StartIdleHoldTimer();
214  } else {
215  return transit<Active>();
216  }
217  return discard_event();
218  }
219 
220  sc::result react(const EvStop &event) {
221  SandeshStateMachine *state_mahine = &context<SandeshStateMachine>();
222  state_mahine->CancelIdleHoldTimer();
223  return discard_event();
224  }
225 
226  sc::result react(const EvIdleHoldTimerExpired &event) {
227  return transit<Active>();
228  }
229 
230  // Close the session and ignore event
231  sc::result react(const EvTcpPassiveOpen &event) {
232  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
233  SandeshSession *session = event.session;
234  state_machine->DeleteSession(session);
235  return discard_event();
236  }
237 };
238 
239 struct Active : public sc::state<Active, SandeshStateMachine> {
240  typedef mpl::list<
242  sc::custom_reaction<EvTcpPassiveOpen>,
243  sc::custom_reaction<EvTcpClose>,
246 
247  Active(my_context ctx) : my_base(ctx) {
248  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
249  state_machine->set_state(ssm::ACTIVE);
250  SM_LOG(DEBUG, state_machine->StateName());
251  }
252 
254  }
255 
256  sc::result react(const EvTcpPassiveOpen &event) {
257  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
258  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
259  state_machine->set_session(event.session);
260  event.session->set_observer(
262  state_machine, _1, _2));
263  return transit<ServerInit>();
264  }
265 
266  sc::result react(const EvTcpClose &event) {
267  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
268  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
269  state_machine->set_session(NULL);
270  return transit<Idle>();
271  }
272 };
273 
274 struct ServerInit : public sc::state<ServerInit, SandeshStateMachine> {
275  typedef mpl::list<
277  sc::custom_reaction<EvTcpClose>,
278  sc::custom_reaction<EvSandeshCtrlMessageRecv>,
279  sc::custom_reaction<EvSandeshMessageRecv>,
282 
283  ServerInit(my_context ctx) : my_base(ctx) {
284  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
285  state_machine->set_state(ssm::SERVER_INIT);
286  SM_LOG(DEBUG, state_machine->StateName());
287  }
288 
290  }
291 
292  sc::result react(const EvTcpClose &event) {
293  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
294  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
295  state_machine->set_session(NULL);
296  return transit<Idle>();
297  }
298 
299  sc::result react(const EvSandeshCtrlMessageRecv &event) {
300  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
301  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
302  SandeshConnection *connection = state_machine->connection();
303  if (!connection->ProcessSandeshCtrlMessage(event.msg, event.header,
304  event.msg_type, event.header_offset)) {
305  state_machine->set_session(NULL);
306  return transit<Idle>();
307  }
308  return transit<Established>();
309  }
310 
311  sc::result react(const EvSandeshMessageRecv &event) {
312  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
313  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
314  SandeshConnection *connection = state_machine->connection();
315  if (!connection->ProcessSandeshMessage(event.msg.get(), true)) {
316  state_machine->set_session(NULL);
317  return transit<Idle>();
318  }
319  return discard_event();
320  }
321 };
322 
323 struct Established : public sc::state<Established, SandeshStateMachine> {
324  typedef mpl::list<
326  sc::custom_reaction<EvTcpClose>,
327  sc::custom_reaction<EvSandeshMessageRecv>,
329  sc::custom_reaction<EvResourceUpdate>
331 
332  Established(my_context ctx) : my_base(ctx) {
333  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
334  state_machine->set_state(ssm::ESTABLISHED);
335  state_machine->set_resource(true);
336  SM_LOG(DEBUG, state_machine->StateName());
337  }
338 
340  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
341  state_machine->set_resource(false);
342  }
343 
344  sc::result react(const EvTcpClose &event) {
345  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
346  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
347  // Process disconnect
348  SandeshConnection *connection = state_machine->connection();
349  connection->ProcessDisconnect(state_machine->session());
350  // Reset the session
351  state_machine->set_session(NULL);
352  return transit<Idle>();
353  }
354 
355  sc::result react(const EvResourceUpdate &event) {
356  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
357  SandeshConnection *connection = state_machine->connection();
358 
359  state_machine->set_resource(event.rsc);
360 
361  if (!connection->ProcessResourceUpdate(event.rsc)) {
362  state_machine->set_session(NULL);
363  return transit<Idle>();
364  }
365  return discard_event();
366  }
367 
368  sc::result react(const EvSandeshMessageRecv &event) {
369  SandeshStateMachine *state_machine = &context<SandeshStateMachine>();
370  SandeshConnection *connection = state_machine->connection();
371  if (!connection->ProcessSandeshMessage(event.msg.get(),
372  state_machine->get_resource())) {
373  state_machine->set_session(NULL);
374  return transit<Idle>();
375  }
376  return discard_event();
377  }
378 };
379 
380 } // namespace ssm
381 
383  : prefix_(prefix),
384  work_queue_(connection->GetTaskId(),
385  connection->GetTaskInstance(),
386  boost::bind(&SandeshStateMachine::DequeueEvent, this, _1),
387  kQueueSize),
388  connection_(connection),
389  session_(),
390  idle_hold_timer_(TimerManager::CreateTimer(
391  *connection->server()->event_manager()->io_service(),
392  "Idle hold timer",
393  connection->GetTaskId(),
394  connection->GetTaskInstance())),
395  idle_hold_time_(0),
396  deleted_(false),
397  resource_(false),
398  builder_(SandeshMessageBuilder::GetInstance(SandeshMessageBuilder::XML)),
399  message_drop_level_(SandeshLevel::INVALID) {
400  state_ = ssm::IDLE;
401  initiate();
402 }
403 
405  assert(!deleted_);
406  deleted_ = true;
407 
411 
412  assert(session() == NULL);
413 
414  //
415  // Explicitly call the state destructor before the state machine itself.
416  // This is needed because some of the destructors access the state machine
417  // context.
418  //
419  terminate();
420 
421  //
422  // Delete timer after state machine is terminated so that there is no
423  // possible reference to the timers being deleted any more
424  //
426 }
427 
430 }
431 
433  Enqueue(ssm::EvStop());
434 }
435 
437  if (down) {
438  Enqueue(ssm::EvStop());
439  } else {
441  // On fresh restart of state machine, all previous state should be reset
442  reset_last_info();
444  }
445 }
446 
447 // Note this api does not enqueue the deletion of TCP session
449  if (session_ != NULL) {
450  session_->set_observer(NULL);
451  session_->SetReceiveMsgCb(NULL);
452  session_->SetConnection(NULL);
453  session_->Close();
454  session_->Shutdown();
455  connection_->set_session(NULL);
456  session_ = NULL;
457  }
458 }
459 
461  if (session_ != NULL) {
463  }
465  session_ = session;
466 }
467 
469  session->set_observer(NULL);
470  session->SetReceiveMsgCb(NULL);
471  session->SetConnection(NULL);
472  session->Close();
473  session->Shutdown();
475 }
476 
478  return session_;
479 }
480 
481 template <class Ev>
482 void SandeshStateMachine::OnIdle(const Ev &event) {
483  // Release all resources
485 
487 
488  set_session(NULL);
489 }
490 
491 template <class Ev>
493  event.session->server()->DeleteSession(event.session);
495  if (connection) {
497  }
498 }
499 
501  if (idle_hold_time_ <= 0)
502  return;
503 
505  boost::bind(&SandeshStateMachine::IdleHoldTimerExpired, this),
506  boost::bind(&SandeshStateMachine::TimerErrorHandler, this, _1,
507  _2));
508 }
509 
512 }
513 
515  return idle_hold_timer_->running();
516 }
517 
518 //
519 // Test Only API : Start
520 //
523 }
524 //
525 // Test Only API : End
526 //
527 
528 void SandeshStateMachine::TimerErrorHandler(std::string name, std::string error) {
529  SM_LOG(ERROR, name + " error: " + error);
530 }
531 
532 bool SandeshStateMachine::GetQueueCount(uint64_t &queue_count) const {
533  if (deleted_ || generator_key_.empty()) {
534  return false;
535  }
536  queue_count = work_queue_.Length();
537  return true;
538 }
539 
541  std::string &drop_level) const {
542  if (deleted_ || generator_key_.empty()) {
543  return false;
544  }
546  return true;
547 }
548 
550  SandeshStateMachineStats *sm_stats) {
551  std::vector<SandeshStateMachineEvStats> ev_stats;
552  {
553  std::scoped_lock elock(smutex_);
554  // State machine event statistics
555  event_stats_.Get(&ev_stats);
556  }
557  sm_stats->set_ev_stats(ev_stats);
558  sm_stats->set_state(StateName());
559  sm_stats->set_last_state(LastStateName());
560  sm_stats->set_last_event(last_event());
561  sm_stats->set_state_since(state_since_);
562  sm_stats->set_last_event_at(last_event_at_);
563 }
564 
566  SandeshGeneratorStats *detail_msg_stats) {
567  // Detail message statistics
568  SandeshMessageStatistics::DetailStatsList v_detail_type_stats;
569  SandeshMessageStats detail_agg_stats;
570  {
571  std::scoped_lock mlock(smutex_);
572  message_stats_.Get(&v_detail_type_stats, &detail_agg_stats);
573  }
574  detail_msg_stats->set_type_stats(v_detail_type_stats);
575  detail_msg_stats->set_aggregate_stats(detail_agg_stats);
576 }
577 
579  SandeshGeneratorBasicStats *basic_msg_stats) {
580  // Basic message statistics
581  SandeshMessageStatistics::BasicStatsList v_basic_type_stats;
582  SandeshMessageBasicStats basic_agg_stats;
583  {
584  std::scoped_lock mlock(smutex_);
585  message_stats_.Get(&v_basic_type_stats, &basic_agg_stats);
586  }
587  basic_msg_stats->set_type_stats(v_basic_type_stats);
588  basic_msg_stats->set_aggregate_stats(basic_agg_stats);
589 }
590 
592  return !deleted_ && !generator_key_.empty();
593 }
594 
596  SandeshStateMachineStats *sm_stats,
597  SandeshGeneratorStats *detail_msg_stats) {
598  if (!IsValid()) {
599  return false;
600  }
601  // State machine event statistics
602  GetEventStatistics(sm_stats);
603  // Detail message statistics
604  GetDetailMessageStatistics(detail_msg_stats);
605  return true;
606 }
607 
608 bool SandeshStateMachine::GetStatistics(SandeshStateMachineStats &sm_stats,
609  SandeshGeneratorStats &detail_msg_stats) {
610  return GetDetailStatistics(&sm_stats, &detail_msg_stats);
611 }
612 
614  SandeshStateMachineStats *sm_stats,
615  SandeshGeneratorBasicStats *basic_msg_stats) {
616  if (!IsValid()) {
617  return false;
618  }
619  // State machine event statistics
620  GetEventStatistics(sm_stats);
621  // Basic message statistics
622  GetBasicMessageStatistics(basic_msg_stats);
623  return true;
624 }
625 
626 bool SandeshStateMachine::GetStatistics(SandeshStateMachineStats &sm_stats,
627  SandeshGeneratorBasicStats &basic_msg_stats) {
628  return GetBasicStatistics(&sm_stats, &basic_msg_stats);
629 }
630 
633  return false;
634 }
635 
637  TcpSession *session, TcpSession::Event event) {
638  SandeshSession *sandesh_session = dynamic_cast<SandeshSession *>(session);
639  assert((session != NULL) == (sandesh_session != NULL));
640  std::string session_s = session ? session->ToString() : "*";
641  switch (event) {
642  case TcpSession::CLOSE:
643  SM_LOG(DEBUG, session_s << " " << __func__ <<
644  " " << "TCP Connection Closed");
645  Enqueue(ssm::EvTcpClose(sandesh_session));
646  break;
647  case TcpSession::ACCEPT:
648  break;
649  default:
650  SM_LOG(DEBUG, session_s << " " << "Unknown event: " <<
651  event);
652  break;
653  }
654 }
655 
658  this, _1, _2));
659  SM_LOG(DEBUG, session->ToString() << " " << "PassiveOpen");
661 }
662 
663 void SandeshStateMachine::UpdateRxMsgStats(const std::string &msg_name,
664  size_t msg_size) {
665  std::scoped_lock lock(smutex_);
666  message_stats_.UpdateRecv(msg_name, msg_size);
667 }
668 
669 void SandeshStateMachine::UpdateRxMsgFailStats(const std::string &msg_name,
670  size_t msg_size, SandeshRxDropReason::type dreason) {
671  std::scoped_lock lock(smutex_);
672  message_stats_.UpdateRecvFailed(msg_name, msg_size, dreason);
673 }
674 
676  const std::string &msg) {
677  // Demux based on Sandesh message type
678  SandeshMessage *xmessage = builder_->Create(
679  reinterpret_cast<const uint8_t *>(msg.c_str()), msg.size());
680  if (xmessage == NULL) {
681  // Update message statistics
682  UpdateRxMsgFailStats(std::string(), msg.size(),
683  SandeshRxDropReason::DecodingFailed);
684  return false;
685  }
686  const SandeshHeader &header(xmessage->GetHeader());
687  const std::string &message_type(xmessage->GetMessageType());
688  // Drop ?
690  // Update message statistics
691  UpdateRxMsgFailStats(message_type, msg.size(),
692  SandeshRxDropReason::QueueLevel);
693  delete xmessage;
694  return true;
695  }
696  if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
697  SandeshHeader ctrl_header;
698  std::string ctrl_message_type;
699  uint32_t ctrl_xml_offset = 0;
700  // Extract the header and message type
701  int ret = SandeshReader::ExtractMsgHeader(msg, ctrl_header,
702  ctrl_message_type, ctrl_xml_offset);
703  if (ret) {
704  SM_LOG(ERROR, "OnMessage control in state: " << StateName() <<
705  " session " << session->ToString() << ": Extract FAILED ("
706  << ret << ")");
707  // Update message statistics
708  UpdateRxMsgFailStats(message_type, msg.size(),
709  SandeshRxDropReason::ControlMsgFailed);
710  delete xmessage;
711  return false;
712  }
713  if (header != ctrl_header ||
714  message_type != ctrl_message_type) {
715  SM_LOG(ERROR, "OnMessage control in state: " << StateName() <<
716  " session " << session->ToString() << ": Header or message "
717  << "type (" << ctrl_message_type << ") MISMATCH");
718  // Update message statistics
719  UpdateRxMsgFailStats(message_type, msg.size(),
720  SandeshRxDropReason::ControlMsgFailed);
721  delete xmessage;
722  return false;
723  }
724  SM_LOG(DEBUG, "OnMessage control in state: " << StateName() <<
725  " session " << session->ToString());
726  // Update message statistics
727  UpdateRxMsgStats(message_type, msg.size());
728  Enqueue(ssm::EvSandeshCtrlMessageRecv(msg, ctrl_header,
729  ctrl_message_type, ctrl_xml_offset));
730  delete xmessage;
731  } else {
732  // Update message statistics
733  UpdateRxMsgStats(message_type, msg.size());
735  }
736  return true;
737 }
738 
741 }
742 
// Display names for the state machine states. StateName() and
// LastStateName() index this table directly with the state value, so the
// entry order has to follow the numeric values of ssm::SsmState.
// NOTE(review): the enum is declared outside this file - confirm the order.
static const std::string state_names[] = {
    "Idle",         // index 0
    "Active",       // index 1
    "Established",  // index 2
    "ServerInit"    // index 3
};
749 
750 const string &SandeshStateMachine::StateName() const {
751  return state_names[state_];
752 }
753 
754 const string &SandeshStateMachine::LastStateName() const {
755  return state_names[last_state_];
756 }
757 
758 bool SandeshStateMachine::LogEvent(const sc::event_base *event) {
759  if (state_ == ssm::ESTABLISHED) {
760  const ssm::EvSandeshMessageRecv *snh_rcv =
761  dynamic_cast<const ssm::EvSandeshMessageRecv *>(event);
762  if (snh_rcv != NULL) {
763  return false;
764  }
765  }
766  return true;
767 }
768 
769 void SandeshStateMachine::UpdateEventEnqueue(const sc::event_base &event) {
770  UpdateEventStats(event, true, false);
771 }
772 
773 void SandeshStateMachine::UpdateEventDequeue(const sc::event_base &event) {
774  UpdateEventStats(event, false, false);
775 }
776 
777 void SandeshStateMachine::UpdateEventEnqueueFail(const sc::event_base &event) {
778  UpdateEventStats(event, true, true);
779 }
780 
781 void SandeshStateMachine::UpdateEventDequeueFail(const sc::event_base &event) {
782  UpdateEventStats(event, false, true);
783 }
784 
785 void SandeshStateMachine::UpdateEventStats(const sc::event_base &event,
786  bool enqueue, bool fail) {
787  if (!deleted_) {
788  std::string event_name(TYPE_NAME(event));
789  std::scoped_lock lock(smutex_);
790  event_stats_.Update(event_name, enqueue, fail);
791  }
792 }
793 
795  if (deleted_) {
796  return true;
797  }
799  if (ec.validate.empty() || ec.validate(this)) {
800  // Log only relevant events and states
801  if (LogEvent(ec.event.get())) {
802  SM_LOG(DEBUG, "Processing " << TYPE_NAME(*ec.event) << " in state "
803  << StateName() << " Key " << generator_key());
804  }
805  // Update event stats
807  process_event(*ec.event);
808  } else {
809  SM_LOG(DEBUG, "Discarding " << TYPE_NAME(*ec.event) << " in state "
810  << StateName() << " Key " << generator_key());
811  // Update event stats
813  }
814  ec.event.reset();
815  return true;
816 }
817 
818 void SandeshStateMachine::unconsumed_event(const sc::event_base &event) {
819  SM_LOG(DEBUG, "Unconsumed " << TYPE_NAME(event) << " in state "
820  << StateName());
821 }
822 
823 // This class determines whether a given class has a method called 'validate'
824 template<typename Ev>
825 struct HasValidate
826 {
827  template<typename T, bool (T::*)(SandeshStateMachine *) const> struct SFINAE {};
828  template<typename T> static char Test(SFINAE<T, &T::validate>*);
829  template<typename T> static int Test(...);
830  static const bool Has = sizeof(Test<Ev>(0)) == sizeof(char);
831 };
832 
833 template <typename Ev, bool has_validate>
834 struct ValidateFn {
835  EvValidate operator()(const Ev *event) { return NULL; }
836 };
837 
838 template <typename Ev>
839 struct ValidateFn<Ev, true> {
840  EvValidate operator()(const Ev *event) {
841  return boost::bind(&Ev::validate, event, _1);
842  }
843 };
844 
845 template <typename Ev>
846 void SandeshStateMachine::Enqueue(const Ev &event) {
847  if (deleted_) {
848  return;
849  }
850  EventContainer ec;
851  ec.event = event.intrusive_from_this();
852  ec.validate = ValidateFn<Ev, HasValidate<Ev>::Has>()(static_cast<const Ev *>(ec.event.get()));
853  if (!work_queue_.Enqueue(ec)) {
854  // Update event stats
855  // XXX Disable till we implement bounded work queues
856  //UpdateEventEnqueueFail(event);
857  //return;
858  }
859  // Update event stats
860  UpdateEventEnqueue(event);
861  return;
862 }
863 
865  SandeshLevel::type level, boost::function<void (void)> cb) {
866  if (message_drop_level_ != level) {
867  SM_LOG(INFO, "SANDESH MESSAGE DROP LEVEL: [" <<
869  Sandesh::LevelToString(level) << "], SM QUEUE COUNT: " <<
870  queue_count);
871  message_drop_level_ = level;
872  }
873  // Always invoke the callback
874  if (!cb.empty()) {
875  cb();
876  }
877 }
878 
880  if (session_ != NULL) {
881  SM_LOG(INFO, "SANDESH Session Reader Defer : " << defer_reader);
882  session_->SetDeferReader(defer_reader);
883  }
884 }
885 
888  bool high(boost::get<2>(wm));
889  size_t queue_count(boost::get<0>(wm));
890  bool defer_undefer(boost::get<3>(wm));
891  boost::function<void (void)> cb;
892  if (high) {
893  if (defer_undefer) {
895  this, true);
896  }
897  WaterMarkInfo wmi(queue_count,
899  this, _1, boost::get<1>(wm), cb));
901  } else {
902  if (defer_undefer) {
904  this, false);
905  }
906  WaterMarkInfo wmi(queue_count,
908  this, _1, boost::get<1>(wm), cb));
910  }
911 }
912 
916 }
917 
919  SandeshStateMachine::EventContainer *ec, size_t *msg_size) {
920  const ssm::EvSandeshMessageRecv *snh_rcv(
921  dynamic_cast<const ssm::EvSandeshMessageRecv *>(ec->event.get()));
922  if (snh_rcv == NULL) {
923  return false;
924  }
925  const SandeshMessage *msg(snh_rcv->msg.get());
926  *msg_size = msg->GetSize();
927  return true;
928 }
929 
930 template<>
933  size_t msg_size;
934  if (GetEvSandeshMessageRecvSize(ec, &msg_size)) {
935  return count_.fetch_add(msg_size) + msg_size;
936  } else {
937  return count_.fetch_add(1) + 1;
938  }
939 }
940 
941 template<>
944  size_t msg_size;
945  if (GetEvSandeshMessageRecvSize(ec, &msg_size)) {
946  return count_.fetch_sub(msg_size) - msg_size;
947  } else {
948  return count_.fetch_sub(1) - 1;
949  }
950 }
951 
952 void SandeshStateMachine::SetDeferDequeue(bool defer_dequeue) {
953  SM_LOG(INFO, "SANDESH Set Defer Dequeue: " << defer_dequeue);
954  work_queue_.set_disable(defer_dequeue);
955 }
virtual bool ProcessResourceUpdate(bool res)
virtual void ProcessDisconnect(SandeshSession *sess)=0
virtual bool ProcessSandeshCtrlMessage(const std::string &msg, const SandeshHeader &header, const std::string sandesh_name, const uint32_t header_offset)=0
virtual void ManagedDelete()=0
virtual bool ProcessSandeshMessage(const SandeshMessage *msg, bool resource)=0
void set_session(SandeshSession *session)
SandeshSession * session() const
void Update(std::string &event_name, bool enqueue, bool fail)
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
virtual SandeshMessage * Create(const uint8_t *data, size_t size) const =0
std::vector< SandeshMessageTypeStats > DetailStatsList
std::vector< SandeshMessageTypeBasicStats > BasicStatsList
void Get(DetailStatsList *v_detail_type_stats, SandeshMessageStats *detail_agg_stats) const
void UpdateRecv(const std::string &msg_name, uint64_t bytes)
void UpdateRecvFailed(const std::string &msg_name, uint64_t bytes, SandeshRxDropReason::type dreason)
const std::string & GetMessageType() const
const size_t GetSize() const
const SandeshHeader & GetHeader() const
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
virtual std::string ToString() const
virtual void Shutdown()
void SetConnection(SandeshConnection *connection)
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
SandeshMessageStatistics message_stats_
void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats)
bool GetDetailStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorStats *detail_msg_stats)
void UpdateEventDequeueFail(const sc::event_base &event)
void set_state(ssm::SsmState state)
void UpdateEventEnqueueFail(const sc::event_base &event)
void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats)
bool GetStatistics(SandeshStateMachineStats &sm_stats, SandeshGeneratorBasicStats &basic_msg_stats)
bool OnSandeshMessage(SandeshSession *session, const std::string &msg)
void SetDeferDequeue(bool defer)
void SetDeferSessionReader(bool defer_reader)
void GetEventStatistics(SandeshStateMachineStats *sm_stats)
void set_session(SandeshSession *session)
bool GetBasicStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorBasicStats *basic_msg_stats)
SandeshStateMachine(const char *prefix, SandeshConnection *connection)
SandeshConnection * connection_
SandeshConnection * connection()
void UpdateEventDequeue(const sc::event_base &event)
void set_last_event(const std::string &event)
bool LogEvent(const sc::event_base *event)
SandeshSession * session()
void UpdateEventEnqueue(const sc::event_base &event)
void PassiveOpen(SandeshSession *session)
std::atomic< ssm::SsmState > state_
void UpdateRxMsgStats(const std::string &msg_name, size_t msg_size)
const std::string & generator_key() const
const std::string & LastStateName() const
void DeleteTcpSession(const Ev &event)
void UpdateRxMsgFailStats(const std::string &msg_name, size_t msg_size, SandeshRxDropReason::type dreason)
static const int kIdleHoldTime
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
SandeshEventStatistics event_stats_
void OnIdle(const Ev &event)
SandeshMessageBuilder * builder_
bool DequeueEvent(EventContainer ec)
const std::string last_event() const
void SetQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm)
void TimerErrorHandler(std::string name, std::string error)
void DeleteSession(SandeshSession *session)
bool GetQueueCount(uint64_t &queue_count) const
void set_idle_hold_time(int idle_hold_time)
void unconsumed_event(const sc::event_base &event)
const std::string & StateName() const
SandeshLevel::type message_drop_level_
bool GetMessageDropLevel(std::string &drop_level) const
void Enqueue(const Ev &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
void SetSandeshMessageDropLevel(size_t queue_count, SandeshLevel::type level, boost::function< void(void)> cb)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: cpp/sandesh.h:149
static const char * LevelToString(SandeshLevel::type level)
Definition: sandesh.cc:853
void set_observer(EventObserver observer)
Definition: tcp_session.cc:219
virtual void SetDeferReader(bool defer_reader)
Definition: tcp_session.cc:183
void Close()
Definition: tcp_session.cc:355
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:221
Definition: timer.h:57
bool Cancel()
Definition: timer.cc:149
bool running() const
Definition: timer.h:89
void Fire()
Definition: timer.h:121
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:107
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
void set_disable(bool disabled)
Definition: queue_task.h:319
size_t AtomicDecrementQueueCount(EventContainer *entry)
Definition: queue_task.h:435
size_t AtomicIncrementQueueCount(EventContainer *entry)
Definition: queue_task.h:431
size_t Length() const
Definition: queue_task.h:356
void SetHighWaterMark(const WaterMarkInfos &high_water)
Definition: queue_task.h:208
void ResetLowWaterMark()
Definition: queue_task.h:238
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void SetLowWaterMark(const WaterMarkInfos &low_water)
Definition: queue_task.h:228
std::atomic< size_t > count_
Definition: queue_task.h:514
void ResetHighWaterMark()
Definition: queue_task.h:218
@ INVALID
Definition: globals.h:147
uint8_t type
Definition: load_balance.h:2
#define TYPE_NAME(_type)
Definition: logging.h:32
bool DoDropSandeshMessage(const SandeshHeader &header, const SandeshLevel::type drop_level)
Definition: sandesh.cc:928
#define SM_LOG(_Level, _Msg)
#define SESSION_LOG(session)
bool GetEvSandeshMessageRecvSize(SandeshStateMachine::EventContainer *ec, size_t *msg_size)
static const std::string state_names[]
boost::function< bool(StateMachine *)> EvValidate
Definition: state_machine.h:33
static const bool Has
static char Test(SFINAE< T, &T::validate > *)
static int Test(...)
boost::intrusive_ptr< const sc::event_base > event
EvValidate operator()(const Ev *event)
EvValidate operator()(const Ev *event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpPassiveOpen >, sc::custom_reaction< EvTcpClose >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
Active(my_context ctx)
sc::result react(const EvTcpClose &event)
sc::result react(const EvTcpPassiveOpen &event)
sc::in_state_reaction< Ev, SandeshStateMachine, &SandeshStateMachine::DeleteTcpSession< Ev > > reaction
Established(my_context ctx)
sc::result react(const EvTcpClose &event)
sc::result react(const EvSandeshMessageRecv &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, DeleteTcpSession< EvTcpDeleteSession >::reaction, sc::custom_reaction< EvResourceUpdate > > reactions
sc::result react(const EvResourceUpdate &event)
static const char * Name()
EvSandeshCtrlMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
EvSandeshMessageRecv(const SandeshMessage *msg)
boost::shared_ptr< const SandeshMessage > msg
static const char * Name()
static const char * Name()
SandeshSession * session
EvTcpClose(SandeshSession *session)
bool validate(SandeshStateMachine *state_machine) const
static const char * Name()
static const char * Name()
EvTcpDeleteSession(SandeshSession *session)
static const char * Name()
EvTcpPassiveOpen(SandeshSession *session)
sc::result react(const EvStart &event)
sc::result react(const EvIdleHoldTimerExpired &event)
sc::result react(const EvStop &event)
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvTcpPassiveOpen >, sc::custom_reaction< EvIdleHoldTimerExpired >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
Idle(my_context ctx)
sc::result react(const EvTcpPassiveOpen &event)
sc::in_state_reaction< Ev, SandeshStateMachine, &SandeshStateMachine::ProcessMessage< Ev > > reaction
sc::in_state_reaction< Ev, SandeshStateMachine, &SandeshStateMachine::ReleaseSandesh< Ev > > reaction
sc::result react(const EvTcpClose &event)
sc::result react(const EvSandeshMessageRecv &event)
sc::result react(const EvSandeshCtrlMessageRecv &event)
ServerInit(my_context ctx)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshCtrlMessageRecv >, sc::custom_reaction< EvSandeshMessageRecv >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::transition< Ev, Idle, SandeshStateMachine, &SandeshStateMachine::OnIdle< Ev > > reaction