OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sandesh_client_sm.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_client_sm.cc
7 //
8 // Sandesh Client State Machine
9 //
10 
11 #include <typeinfo>
12 #include <boost/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
21 
22 #include <base/logging.h>
23 #include <base/timer.h>
24 #include <base/task_annotations.h>
25 #include <base/connection_info.h>
26 #include <io/event_manager.h>
27 
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_uve.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_statistics.h>
34 #include "sandesh_client_sm_priv.h"
35 
36 using boost::system::error_code;
37 using std::string;
38 using std::map;
39 using std::vector;
41 using process::ConnectionType;
42 using process::ConnectionStatus;
43 
44 namespace mpl = boost::mpl;
45 namespace sc = boost::statechart;
46 
// Stream _Msg to the Sandesh logger at log4cplus level _Level.
// Nothing is formatted when LoggingDisabled() is true or when the logger
// is not enabled for the requested level.
#define SM_LOG(_Level, _Msg)                                                \
    do {                                                                    \
        if (!LoggingDisabled()) {                                           \
            log4cplus::Logger _Xlogger = Sandesh::logger();                 \
            if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) {     \
                log4cplus::tostringstream _Xbuf;                            \
                _Xbuf << _Msg;                                              \
                _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL,           \
                                   _Xbuf.str());                            \
            }                                                               \
        }                                                                   \
    } while (false)
58 
// Log "<session>:<Name()>" at DEBUG level; a null session is shown as "*".
// Name() is looked up in whatever scope the macro is expanded in.
#define SESSION_LOG(session)                                              \
    SANDESH_LOG(DEBUG,                                                    \
                ((session) ? (session)->ToString() : "*") << ":" << Name())
61 
62 
63 namespace scm {
64 
65 // events
66 struct EvStart : sc::event<EvStart> {
67  static const char * Name() {
68  return "EvStart";
69  }
70 };
71 
72 struct EvStop : sc::event<EvStop> {
73  EvStop() : enq_(true) {}
74  EvStop(bool enq) : enq_(enq) {}
75  static const char * Name() {
76  return "EvStop";
77  }
78  bool enq_;
79 };
80 
81 struct EvCollectorUpdate : sc::event<EvCollectorUpdate> {
82  EvCollectorUpdate(const std::vector<TcpServer::Endpoint> &collectors) :
83  collectors_(collectors) {
84  }
85  static const char * Name() {
86  return "EvCollectorUpdate";
87  }
88  std::vector<TcpServer::Endpoint> collectors_;
89 };
90 
91 struct EvIdleHoldTimerExpired : sc::event<EvIdleHoldTimerExpired> {
93  }
94  static const char * Name() {
95  return "EvIdleHoldTimerExpired";
96  }
97  bool validate() const {
98  return !timer_->cancelled();
99  }
101 };
102 
103 struct EvConnectTimerExpired : sc::event<EvConnectTimerExpired> {
104  EvConnectTimerExpired(Timer *timer) : timer_(timer) {
105  }
106  static const char * Name() {
107  return "EvConnectTimerExpired";
108  }
109  bool validate(SandeshClientSMImpl *state_machine) const {
110  if (timer_->cancelled()) {
111  return false;
112  }
113  return true;
114  }
116 };
117 
118 struct EvTcpConnected : sc::event<EvTcpConnected> {
119  EvTcpConnected(SandeshSession *session) : session(session) {
120  SESSION_LOG(session);
121  }
122  static const char * Name() {
123  return "EvTcpConnected";
124  }
125 
127 };
128 
129 struct EvTcpConnectFail : sc::event<EvTcpConnectFail> {
130  EvTcpConnectFail(SandeshSession *session) : session(session) {
131  SESSION_LOG(session);
132  }
133  static const char * Name() {
134  return "EvTcpConnectFail";
135  }
136 
138 };
139 
140 
141 struct EvTcpClose : sc::event<EvTcpClose> {
142  EvTcpClose(SandeshSession *session) : session(session) {
143  SESSION_LOG(session);
144  };
145  static const char * Name() {
146  return "EvTcpClose";
147  }
148 
150 };
151 
152 // Used to defer the session delete after all events currently on the queue.
153 struct EvTcpDeleteSession : sc::event<EvTcpDeleteSession> {
155  }
156  static const char *Name() {
157  return "EvTcpDeleteSession";
158  }
160 };
161 
162 struct EvSandeshSend : sc::event<EvSandeshSend> {
164  snh(snh) {
165  }
166  static const char * Name() {
167  return "EvSandeshSend";
168  }
170 };
171 
172 struct EvSandeshMessageRecv : sc::event<EvSandeshMessageRecv> {
173  EvSandeshMessageRecv(const std::string &msg, const SandeshHeader& header,
174  const std::string &msg_type, const uint32_t &header_offset) :
175  msg(msg), header(header), msg_type(msg_type), header_offset(header_offset) {
176  };
177  static const char * Name() {
178  return "EvSandeshMessageRecv";
179  }
180  const std::string msg;
181  const SandeshHeader header;
182  const std::string msg_type;
183  const uint32_t header_offset;
184 };
185 
186 
187 // states
188 struct Idle;
189 struct Connect;
190 struct Disconnect;
191 struct ClientInit;
192 struct Established;
193 
194 template <class Ev>
196  typedef sc::transition<Ev, Idle, SandeshClientSMImpl,
197  &SandeshClientSMImpl::OnIdle<Ev> > reaction;
198 };
199 
200 template <class Ev>
202  typedef sc::in_state_reaction<Ev, SandeshClientSMImpl,
203  &SandeshClientSMImpl::ReleaseSandesh<Ev> > reaction;
204 };
205 
206 template <class Ev>
208  typedef sc::in_state_reaction<Ev, SandeshClientSMImpl,
209  &SandeshClientSMImpl::DeleteTcpSession<Ev> > reaction;
210 };
211 
212 
// Idle state. Reacts to EvStart, EvStop, EvIdleHoldTimerExpired and
// EvCollectorUpdate through the custom reactions listed below.
213 struct Idle : public sc::state<Idle, SandeshClientSMImpl> {
214  typedef mpl::list<
215  sc::custom_reaction<EvStart>,
216  sc::custom_reaction<EvStop>,
217  sc::custom_reaction<EvIdleHoldTimerExpired>,
218  sc::custom_reaction<EvCollectorUpdate>,
222 
    // State entry: record IDLE as the current state, log the state name at
    // DEBUG and send the UVE.
223  Idle(my_context ctx) : my_base(ctx) {
224  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
225  state_machine->set_state(SandeshClientSM::IDLE);
226  SM_LOG(DEBUG, state_machine->StateName());
227  state_machine->SendUVE();
228  }
229 
    // State exit: cancel the idle hold timer.
230  ~Idle() {
231  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
232  state_machine->CancelIdleHoldTimer();
233  }
234 
    // EvStart: with a non-zero idle hold time, report INIT, start the idle
    // hold timer and stay in Idle (event discarded). With a zero idle hold
    // time, report DOWN and transit to Disconnect.
235  sc::result react(const EvStart &event) {
236  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
237  if (state_machine->idle_hold_time()) {
238  // Update connection info
239  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
240  std::string(), ConnectionStatus::INIT,
241  state_machine->server(),
242  state_machine->StateName() + " : " + event.Name());
243  state_machine->StartIdleHoldTimer();
244  } else {
245  // Update connection info
246  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
247  std::string(), ConnectionStatus::DOWN,
248  state_machine->server(),
249  state_machine->StateName() + " : " + event.Name() +
250  " -> Disconnect");
251  return transit<Disconnect>();
252  }
253  return discard_event();
254  }
255 
    // EvStop: cancel the idle hold timer and stay in Idle.
256  sc::result react(const EvStop &event) {
257  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
258  state_machine->CancelIdleHoldTimer();
259  return discard_event();
260  }
261 
    // EvCollectorUpdate: hand the new collector list to the state machine.
    // The return value of CollectorUpdate() is ignored here; no transition.
262  sc::result react(const EvCollectorUpdate &event) {
263  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
264  state_machine->CollectorUpdate(event.collectors_);
265  return discard_event();
266  }
    // EvIdleHoldTimerExpired: report INIT and transit to Connect.
267  sc::result react(const EvIdleHoldTimerExpired &event) {
268  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
269  // Update connection info
270  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
271  std::string(), ConnectionStatus::INIT,
272  state_machine->server(),
273  state_machine->StateName() + " : " + event.Name() + " -> Connect");
274  return transit<Connect>();
275  }
276 };
277 
// Disconnect state. The only custom reaction visible here is
// EvCollectorUpdate, which moves the machine to Connect.
278 struct Disconnect : public sc::state<Disconnect, SandeshClientSMImpl> {
279  typedef mpl::list<
281  sc::custom_reaction<EvCollectorUpdate>,
285 
    // State entry: record DISCONNECT as the current state, log the state
    // name at DEBUG and send the UVE.
286  Disconnect(my_context ctx) : my_base(ctx) {
287  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
288  state_machine->set_state(SandeshClientSM::DISCONNECT);
289  SM_LOG(DEBUG, state_machine->StateName());
290  state_machine->SendUVE();
291  }
292 
    // EvCollectorUpdate: apply the new collector list (return value ignored),
    // report INIT and always transit to Connect.
293  sc::result react(const EvCollectorUpdate &event) {
294  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
295  state_machine->CollectorUpdate(event.collectors_);
296  // Update connection info
297  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
298  std::string(), ConnectionStatus::INIT,
299  state_machine->server(),
300  state_machine->StateName() + " : " + event.Name() +
301  " -> Connect");
302  return transit<Connect>();
303  }
304 };
305 
// Connect state: a session towards the current server is being set up and
// the connect timer is running.
306 struct Connect : public sc::state<Connect, SandeshClientSMImpl> {
307  typedef mpl::list<
309  sc::custom_reaction<EvConnectTimerExpired>,
310  sc::custom_reaction<EvTcpConnected>,
311  sc::custom_reaction<EvTcpConnectFail>,
312  sc::custom_reaction<EvTcpClose>,
313  sc::custom_reaction<EvCollectorUpdate>,
317 
    // NOTE(review): not referenced by the visible code of this struct, which
    // uses GetConnectTime() for the timer instead - confirm whether it is
    // still needed.
318  static const int kConnectTimeout = 60; // seconds
319 
    // State entry: record CONNECT, create the session, bump the connect
    // attempt counter, start the connect timer with GetConnectTime() and
    // send the UVE.
320  Connect(my_context ctx) : my_base(ctx) {
321  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
322  state_machine->set_state(SandeshClientSM::CONNECT);
323  StartSession(state_machine);
324  state_machine->connect_attempts_inc();
325  SM_LOG(DEBUG, state_machine->StateName() << " : " << "Start Connect timer " <<
326  state_machine->server());
327  state_machine->StartConnectTimer(state_machine->GetConnectTime());
328  state_machine->SendUVE();
329  }
330 
332  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
333  state_machine->CancelConnectTimer();
334  }
335 
    // EvTcpConnectFail: switch collector via CollectorChange() (result
    // ignored) and fall back to Idle.
336  sc::result react(const EvTcpConnectFail &event) {
337  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
338  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
339  state_machine->CollectorChange();
340  return ToIdle(state_machine, event.Name());
341  }
342 
343 
    // EvConnectTimerExpired: same handling as a connect failure.
344  sc::result react(const EvConnectTimerExpired &event) {
345  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
346  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
347  state_machine->CollectorChange();
348  return ToIdle(state_machine, event.Name());
349  }
350 
    // EvTcpConnected: cancel the connect timer, report INIT against the
    // session's remote endpoint, start the session's send queue runner and
    // transit to ClientInit.
351  sc::result react(const EvTcpConnected &event) {
352  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
353  SM_LOG(DEBUG, state_machine->StateName() << " : " <<
354  event.Name() << " : " << "Cancelling Connect timer");
355  state_machine->CancelConnectTimer();
356  SandeshSession *session = event.session;
357  // Update connection info
358  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
359  std::string(), ConnectionStatus::INIT, session->remote_endpoint(),
360  state_machine->StateName() + " : " + event.Name());
361  // Start the send queue runner XXX move this to Established or later
362  session->send_queue()->MayBeStartRunner();
363  return transit<ClientInit>();
364  }
365 
    // EvTcpClose: same handling as a connect failure.
366  sc::result react(const EvTcpClose &event) {
367  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
368  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
369  state_machine->CollectorChange();
370  return ToIdle(state_machine, event.Name());
371  }
372 
    // EvCollectorUpdate: go back to Idle only when CollectorUpdate() returns
    // true; otherwise stay in Connect.
373  sc::result react(const EvCollectorUpdate &event) {
374  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
375  if (state_machine->CollectorUpdate(event.collectors_)) {
376  return ToIdle(state_machine, event.Name());
377  }
378  return discard_event();
379  }
380 
    // Asks the manager for a new session towards state_machine->server(),
    // attaches the manager's stats client to it and stores it on the state
    // machine.
381  // Create an active connection request.
382  void StartSession(SandeshClientSMImpl *state_machine) {
383  SandeshSession *session = static_cast<SandeshSession *>(
384  state_machine->GetMgr()->CreateSMSession(
386  state_machine, _1, _2),
387  boost::bind(&SandeshClientSMImpl::OnMessage,
388  state_machine, _2, _1),
389  state_machine->server()));
390  session->set_stats_client(state_machine->GetMgr()->stats_client());
391  state_machine->set_session(session);
392  }
393 
    // Common path back to Idle: report DOWN, set the idle hold time to
    // GetConnectTime() * 1000 (presumably seconds to milliseconds - confirm),
    // release resources via OnIdle, start the idle hold timer and transit.
394  sc::result ToIdle(SandeshClientSMImpl *state_machine,
395  const char *event_name) {
396  // Update connection info
397  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
398  std::string(), ConnectionStatus::DOWN,
399  state_machine->session()->remote_endpoint(),
400  state_machine->StateName() + " : " + event_name);
401  state_machine->set_idle_hold_time(state_machine->GetConnectTime() * 1000);
402  state_machine->OnIdle<EvStop>(EvStop());
403  state_machine->StartIdleHoldTimer();
404  return transit<Idle>();
405  }
406 };
407 
// ClientInit state: entered from Connect on EvTcpConnected; left for
// Established when a received message carries the control hint.
408 struct ClientInit : public sc::state<ClientInit, SandeshClientSMImpl> {
409  typedef mpl::list<
411  sc::custom_reaction<EvConnectTimerExpired>,
412  sc::custom_reaction<EvTcpClose>,
413  sc::custom_reaction<EvSandeshMessageRecv>,
414  sc::custom_reaction<EvSandeshSend>,
415  sc::custom_reaction<EvCollectorUpdate>,
418 
    // State entry: record CLIENT_INIT, restart the connect timer with
    // GetConnectTime(), ask the manager to initialize the session with the
    // value returned by connects_inc(), and send the UVE.
419  ClientInit(my_context ctx) : my_base(ctx) {
420  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
421  state_machine->set_state(SandeshClientSM::CLIENT_INIT);
422  SM_LOG(DEBUG, state_machine->StateName());
423  state_machine->CancelConnectTimer();
424  state_machine->StartConnectTimer(state_machine->GetConnectTime());
425  state_machine->GetMgr()->InitializeSMSession(state_machine->connects_inc());
426  state_machine->SendUVE();
427  }
428 
430  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
431  state_machine->CancelConnectTimer();
432  }
433 
    // EvTcpClose: switch collector via CollectorChange() (result ignored)
    // and fall back to Idle.
434  sc::result react(const EvTcpClose &event) {
435  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
436  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
437  state_machine->CollectorChange();
438  return ToIdle(state_machine, event.Name());
439  }
440 
    // EvConnectTimerExpired: same handling as EvTcpClose.
441  sc::result react(const EvConnectTimerExpired &event) {
442  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
443  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
444  state_machine->CollectorChange();
445  return ToIdle(state_machine, event.Name());
446  }
447 
    // EvSandeshMessageRecv: pass the message to the manager; if ReceiveMsg()
    // returns false go back to Idle. Otherwise remember the header's source
    // as the collector name and, when the header carries
    // SANDESH_CONTROL_HINT, report UP and transit to Established.
448  sc::result react(const EvSandeshMessageRecv &event) {
449  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
450  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
451 
452  if (!state_machine->GetMgr()->ReceiveMsg(event.msg, event.header,
453  event.msg_type, event.header_offset)) {
454  return ToIdle(state_machine, event.Name());
455  }
456  state_machine->set_collector_name(event.header.get_Source());
457 
458  if (event.header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
459  // Update connection info
460  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
461  std::string(), ConnectionStatus::UP,
462  state_machine->session()->remote_endpoint(),
463  state_machine->StateName() + " : Control " + event.Name());
464  return transit<Established>();
465  }
466  return discard_event();
467  }
468 
    // EvSandeshSend: a SandeshUVE is rejected in this state - it is counted
    // as a tx drop with reason WrongClientSMState and released. Any other
    // Sandesh is passed to send_session(); a failure there is only logged.
469  sc::result react(const EvSandeshSend &event) {
470  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
471  Sandesh *snh(event.snh);
472  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name() <<
473  " : " << snh->Name());
474  if (dynamic_cast<SandeshUVE *>(snh)) {
475  if (Sandesh::IsLoggingDroppedAllowed(snh->type())) {
476  SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " <<
477  snh->ToString());
478  }
479  Sandesh::UpdateTxMsgFailStats(snh->Name(), 0,
480  SandeshTxDropReason::WrongClientSMState);
481  SM_LOG(INFO, "Received UVE message in wrong state : " << snh->Name());
482  snh->Release();
483  return discard_event();
484  }
485  if (!state_machine->send_session(snh)) {
486  SM_LOG(INFO, "Could not EnQ Sandesh :" << snh->Name());
487  // If Enqueue encounters an error, it will release the Sandesh
488  }
489  return discard_event();
490  }
491 
    // EvCollectorUpdate: go back to Idle only when CollectorUpdate() returns
    // true; otherwise stay in ClientInit.
492  sc::result react(const EvCollectorUpdate &event) {
493  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
494  if (state_machine->CollectorUpdate(event.collectors_)) {
495  return ToIdle(state_machine, event.Name());
496  }
497  return discard_event();
498  }
499 
    // Common path back to Idle: report DOWN, release resources via OnIdle,
    // start the idle hold timer, log the idle hold time and transit.
    // The idle hold time itself is not modified here.
500  sc::result ToIdle(SandeshClientSMImpl *state_machine,
501  const char *event_name) {
502  // Update connection info
503  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
504  std::string(), ConnectionStatus::DOWN,
505  state_machine->session()->remote_endpoint(),
506  state_machine->StateName() + " : " + event_name);
507  state_machine->OnIdle<EvStop>(EvStop());
508  state_machine->StartIdleHoldTimer();
509  SM_LOG(INFO, "Return to idle with " << state_machine->idle_hold_time());
510  return transit<Idle>();
511  }
512 };
513 
// Established state: entered from ClientInit once a control message has
// been received; Sandesh messages are sent on the session in this state.
514 struct Established : public sc::state<Established, SandeshClientSMImpl> {
515  typedef mpl::list<
517  sc::custom_reaction<EvTcpClose>,
518  sc::custom_reaction<EvSandeshMessageRecv>,
519  sc::custom_reaction<EvSandeshSend>,
520  sc::custom_reaction<EvCollectorUpdate>,
523 
    // State entry: record ESTABLISHED, reset the connect attempt counter,
    // report UP against the session's remote endpoint and send the UVE.
524  Established(my_context ctx) : my_base(ctx) {
525  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
526  state_machine->set_state(SandeshClientSM::ESTABLISHED);
527  SM_LOG(DEBUG, state_machine->StateName());
528  state_machine->connect_attempts_clear();
529  // Update connection info
530  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
531  std::string(), ConnectionStatus::UP,
532  state_machine->session()->remote_endpoint(),
533  state_machine->StateName());
534  state_machine->SendUVE();
535  }
536 
538  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
539  state_machine->set_collector_name(string());
540  }
541 
    // EvTcpClose: when CollectorChange() returns true, report INIT, release
    // resources via OnIdle and go straight to Connect; otherwise fall back
    // to Idle.
542  sc::result react(const EvTcpClose &event) {
543  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
544  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
545  if (state_machine->CollectorChange()) {
546  // Update connection info
547  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
548  std::string(), ConnectionStatus::INIT,
549  state_machine->session()->remote_endpoint(),
550  state_machine->StateName() + " : " + event.Name() +
551  " -> Connect");
552  state_machine->OnIdle<EvStop>(EvStop());
553  return transit<Connect>();
554  } else {
555  return ToIdle(state_machine, event.Name());
556  }
557  }
558 
    // EvSandeshMessageRecv: pass the message to the manager; go back to Idle
    // if ReceiveMsg() returns false, otherwise stay in Established.
559  sc::result react(const EvSandeshMessageRecv &event) {
560  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
561  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
562 
563  if (!state_machine->GetMgr()->ReceiveMsg(event.msg, event.header,
564  event.msg_type, event.header_offset)) {
565  return ToIdle(state_machine, event.Name());
566  }
567  return discard_event();
568  }
569 
    // EvSandeshSend: pass the Sandesh to send_session(); a failure is only
    // logged and never causes a transition.
570  sc::result react(const EvSandeshSend &event) {
571  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
572  //SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
573  if (!state_machine->send_session(event.snh)) {
574  SM_LOG(ERROR, "Could not EnQ Sandesh :" << event.snh->Name());
575  // If Enqueue encounters an error, it will release the Sandesh
576  }
577  return discard_event();
578  }
579 
    // EvCollectorUpdate: when CollectorUpdate() returns true, report INIT,
    // release resources via OnIdle and transit to Connect; otherwise stay in
    // Established.
580  sc::result react(const EvCollectorUpdate &event) {
581  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
582  if (state_machine->CollectorUpdate(event.collectors_)) {
583  // Update connection info
584  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
585  std::string(), ConnectionStatus::INIT,
586  state_machine->session()->remote_endpoint(),
587  state_machine->StateName() + " : " + event.Name() +
588  " -> Connect");
589  state_machine->OnIdle<EvStop>(EvStop());
590  return transit<Connect>();
591  }
592  return discard_event();
593  }
594 
    // Common path back to Idle: report DOWN, release resources via OnIdle,
    // start the idle hold timer and transit.
595  sc::result ToIdle(SandeshClientSMImpl *state_machine,
596  const char *event_name) {
597  // Update connection info
598  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
599  std::string(), ConnectionStatus::DOWN,
600  state_machine->session()->remote_endpoint(),
601  state_machine->StateName() + " : " + event_name);
602  state_machine->OnIdle<EvStop>(EvStop());
603  state_machine->StartIdleHoldTimer();
604  return transit<Idle>();
605  }
606 
607 };
608 
609 } // namespace scm
610 
612  if (down) {
613  Enqueue(scm::EvStop());
614  } else {
616  // On fresh restart of state machine, all previous state should be reset
617  reset_last_info();
619  }
620 }
621 
624 }
625 
626 template <class Ev>
627 void SandeshClientSMImpl::OnIdle(const Ev &event) {
628  // Release all resources
630 
632 
633  set_session(NULL, event.enq_);
634 }
635 
636 template <class Ev>
637 void SandeshClientSMImpl::ReleaseSandesh(const Ev &event) {
638  Sandesh *snh(event.snh);
640  SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " << snh->ToString());
641  }
643  SandeshTxDropReason::WrongClientSMState);
644  SM_LOG(DEBUG, "Wrong state: " << StateName() << " for event: " <<
645  event.Name() << " message: " << snh->Name());
646  snh->Release();
647 }
648 
649 template <class Ev>
651  GetMgr()->DeleteSMSession(event.session);
652 }
653 
655  connect_timer_->Start(seconds * 1000,
656  boost::bind(&SandeshClientSMImpl::ConnectTimerExpired, this),
657  boost::bind(&SandeshClientSMImpl::TimerErrorHanlder, this, _1, _2));
658 }
659 
662 }
663 
665  return connect_timer_->running();
666 }
667 
669  if (idle_hold_time_ <= 0)
670  return;
671 
673  boost::bind(&SandeshClientSMImpl::IdleHoldTimerExpired, this),
674  boost::bind(&SandeshClientSMImpl::TimerErrorHanlder, this, _1, _2));
675 }
676 
679 }
680 
682  return idle_hold_timer_->running();
683 }
684 
685 //
686 // Test Only API : Start
687 //
689  connect_timer_->Fire();
690 }
691 
694 }
695 //
696 // Test Only API : End
697 //
698 
699 void SandeshClientSMImpl::TimerErrorHanlder(std::string name, std::string error) {
700  SM_LOG(ERROR, name + " error: " + error);
701 }
702 
703 // Client only
705  if (!deleted_) {
706  SM_LOG(DEBUG, server() << " "
707  << "EvConnectTimerExpired in state " << StateName());
709  }
710  return false;
711 }
712 
715  return false;
716 }
717 
719  TcpSession *session, TcpSession::Event event) {
720  SandeshSession *sandesh_session = dynamic_cast<SandeshSession *>(session);
721  assert((session != NULL) == (sandesh_session != NULL));
722  std::string session_s = session ? session->ToString() : "*";
723  switch (event) {
725  SM_LOG(DEBUG, session_s << " " << __func__ <<
726  " " << "TCP Connected");
727  Enqueue(scm::EvTcpConnected(sandesh_session));
728  break;
730  SM_LOG(DEBUG, session_s << " " << __func__ <<
731  " " << "TCP Connect Failed");
732  Enqueue(scm::EvTcpConnectFail(sandesh_session));
733  break;
734  case TcpSession::CLOSE:
735  SM_LOG(DEBUG, session_s << " " << __func__ <<
736  " " << "TCP Connection Closed");
737  Enqueue(scm::EvTcpClose(sandesh_session));
738  break;
739  case TcpSession::ACCEPT:
740  default:
741  SM_LOG(DEBUG, session_s << " " << "Unknown event: " <<
742  event);
743  break;
744  }
745 }
746 
749  return true;
750 }
751 
754  return true;
755 }
756 
758  const std::string &msg) {
759  // Demux based on Sandesh message type
760  SandeshHeader header;
761  std::string message_type;
762  uint32_t xml_offset = 0;
763 
764  // Extract the header and message type
765  int ret = SandeshReader::ExtractMsgHeader(msg, header, message_type,
766  xml_offset);
767  if (ret) {
768  SM_LOG(ERROR, "OnMessage in state: " << StateName() << ": Extract "
769  << " FAILED(" << ret << ")");
770  return false;
771  }
772 
773  if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
774  SM_LOG(INFO, "OnMessage control in state: " << StateName() );
775  }
776  Enqueue(scm::EvSandeshMessageRecv(msg, header, message_type, xml_offset));
777  return true;
778 }
779 
// Display names of the state machine states, indexed by the numeric state
// value (StateName() and LastStateName() index this table directly).
// NOTE(review): the order presumably mirrors SandeshClientSM::State - confirm
// against the enum before adding or reordering entries.
static const std::string state_names[] = {
    "Idle", "Disconnect", "Connect", "ClientInit", "Established",
};
787 
789  SandeshClientSM::State state) const {
790  return state_names[state];
791 }
792 
793 const string &SandeshClientSMImpl::StateName() const {
794  return state_names[state_];
795 }
796 
797 const string &SandeshClientSMImpl::LastStateName() const {
798  return state_names[last_state_];
799 }
800 
802 
804  int backoff = std::min(attempts_, 6);
805  return std::min(backoff ? 1 << (backoff - 1) : 0, kConnectInterval);
806 }
807 
808 void SandeshClientSMImpl::UpdateEventEnqueue(const sc::event_base &event) {
809  UpdateEventStats(event, true, false);
810 }
811 
812 void SandeshClientSMImpl::UpdateEventDequeue(const sc::event_base &event) {
813  UpdateEventStats(event, false, false);
814 }
815 
816 void SandeshClientSMImpl::UpdateEventEnqueueFail(const sc::event_base &event) {
817  UpdateEventStats(event, true, true);
818 }
819 
820 void SandeshClientSMImpl::UpdateEventDequeueFail(const sc::event_base &event) {
821  UpdateEventStats(event, false, true);
822 }
823 
824 void SandeshClientSMImpl::UpdateEventStats(const sc::event_base &event,
825  bool enqueue, bool fail) {
826  std::string event_name(TYPE_NAME(event));
827  tbb::mutex::scoped_lock lock(mutex_);
828  event_stats_.Update(event_name, enqueue, fail);
829  lock.release();
830 }
831 
833  if (deleted_) return true;
834  in_dequeue_ = true;
835 
837  if (ec.validate.empty() || ec.validate(this)) {
838  if ((state()!=ESTABLISHED)||(TYPE_NAME(*ec.event)!="scm::EvSandeshSend"))
839  SM_LOG(DEBUG, "Processing " << TYPE_NAME(*ec.event) << " in state "
840  << StateName());
842  process_event(*ec.event);
843  } else {
844  SM_LOG(DEBUG, "Discarding " << TYPE_NAME(*ec.event) << " in state "
845  << StateName());
847  }
848 
849  ec.event.reset();
850  in_dequeue_ = false;
851  return true;
852 }
853 
854 void SandeshClientSMImpl::unconsumed_event(const sc::event_base &event) {
855  SM_LOG(DEBUG, "Unconsumed " << TYPE_NAME(event) << " in state "
856  << StateName());
857 }
858 
859 // This class determines whether a given class has a method called 'validate'
860 template<typename Ev>
861 struct HasValidate
862 {
863  template<typename T, bool (T::*)(SandeshClientSMImpl *) const> struct SFINAE {};
864  template<typename T> static char Test(SFINAE<T, &T::validate>*);
865  template<typename T> static int Test(...);
866  static const bool Has = sizeof(Test<Ev>(0)) == sizeof(char);
867 };
868 
869 template <typename Ev, bool has_validate>
870 struct ValidateFn {
871  EvValidate operator()(const Ev *event) { return NULL; }
872 };
873 
874 template <typename Ev>
875 struct ValidateFn<Ev, true> {
876  EvValidate operator()(const Ev *event) {
877  return boost::bind(&Ev::validate, event, _1);
878  }
879 };
880 
881 template <typename Ev>
882 void SandeshClientSMImpl::Enqueue(const Ev &event) {
883  if (deleted_) return;
884 
885  EventContainer ec;
886  ec.event = event.intrusive_from_this();
887  ec.validate = ValidateFn<Ev, HasValidate<Ev>::Has>()(static_cast<const Ev *>(ec.event.get()));
888  if (!work_queue_.Enqueue(ec)) {
889  // XXX - Disable till we implement bounded work queues
890  //UpdateEventEnqueueFail(event);
891  //return;
892  }
893  UpdateEventEnqueue(event);
894  return;
895 }
896 
897 
899  int sm_task_instance, int sm_task_id, bool periodicuve)
900  : SandeshClientSM(mgr),
901  work_queue_(sm_task_id, sm_task_instance,
902  boost::bind(&SandeshClientSMImpl::DequeueEvent, this, _1)),
903  connect_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Connect timer", sm_task_id, sm_task_instance)),
904  idle_hold_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Idle hold timer", sm_task_id, sm_task_instance)),
905  statistics_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Tick and Statistics timer", sm_task_id, sm_task_instance)),
906  idle_hold_time_(0),
907  statistics_timer_interval_(kTickInterval),
908  periodicuve_(periodicuve),
909  attempts_(0),
910  deleted_(false),
911  in_dequeue_(false),
912  connects_(0),
913  coll_name_(string()) {
914  state_ = IDLE;
915  generator_key_ = Sandesh::source() + ":" + Sandesh::node_type() + ":" +
917  initiate();
919 }
920 
922  tbb::mutex::scoped_lock lock(mutex_);
923 
924  assert(!deleted_);
925  deleted_ = true;
926 
927  //
928  // XXX Temporary hack until config task is made to run exclusively wrt
929  // all other threads including main()
930  //
931 
932  while (!work_queue_.IsQueueEmpty()) usleep(100);
933 
934  while (in_dequeue_) usleep(100);
935 
936  // The State Machine should have already been shutdown by this point.
937  assert(!session());
938 
940 
941  //
942  // Explicitly call the state destructor before the state machine itself.
943  // This is needed because some of the destructors access the state machine
944  // context.
945  //
946  terminate();
947 
948  //
949  // Delete timer after state machine is terminated so that there is no
950  // possible reference to the timers being deleted any more
951  //
954 
957 }
958 
962  boost::bind(&SandeshClientSMImpl::TimerErrorHanlder, this, _1,
963  _2));
964 }
965 
967  if (deleted_ || generator_key_.empty()) {
968  return true;
969  }
970  std::vector<SandeshStateMachineEvStats> ev_stats;
971  tbb::mutex::scoped_lock lock(mutex_);
972  event_stats_.Get(&ev_stats);
973  lock.release();
974  // Send the message
975  ModuleClientState mcs;
976  mcs.set_name(generator_key_);
977  mcs.set_sm_queue_count(work_queue_.Length());
978  mcs.set_max_sm_queue_count(work_queue_.max_queue_len());
979  // Sandesh state machine statistics
980  SandeshStateMachineStats sm_stats;
981  sm_stats.set_ev_stats(ev_stats);
982  sm_stats.set_state(StateName());
983  sm_stats.set_last_state(LastStateName());
984  sm_stats.set_last_event(last_event());
985  sm_stats.set_state_since(state_since_);
986  sm_stats.set_last_event_at(last_event_at_);
987  mcs.set_sm_stats(sm_stats);
988  // Sandesh session statistics
989  SandeshSession *session = this->session();
990  if (session) {
991  mcs.set_session_stats(session->GetStats());
992  SocketIOStats rx_stats;
993  session->GetRxSocketStats(rx_stats);
994  mcs.set_session_rx_socket_stats(rx_stats);
995  SocketIOStats tx_stats;
996  session->GetTxSocketStats(tx_stats);
997  mcs.set_session_tx_socket_stats(tx_stats);
998  }
999  SandeshModuleClientTrace::Send(mcs);
1000  SendUVE();
1001 
1002  map<string,uint32_t> inpMap;
1003  SandeshUVETypeMaps::SyncAllMaps(inpMap, true);
1004 
1005  return true;
1006 }
1007 
1009  const std::vector<TcpServer::Endpoint>& collectors) {
1010  Enqueue(scm::EvCollectorUpdate(collectors));
1011 }
1012 
1014  std::vector<TcpServer::Endpoint>& collectors) {
1015  collectors = collectors_;
1016 }
1017 
1019  if (collectors_.size()) {
1020  return collectors_[collector_index_];
1021  }
1022  return TcpServer::Endpoint();
1023 }
1024 
1026  if (collectors_.size()) {
1027  if (++collector_index_ == collectors_.size()) {
1028  collector_index_ = 0;
1029  }
1030  return collectors_[collector_index_];
1031  }
1032  return TcpServer::Endpoint();
1033 }
1034 
1036  const std::vector<TcpServer::Endpoint>& collectors) {
1037  collectors_ = collectors;
1038  collector_index_ = 0;
1039  TcpServer::Endpoint collector(GetCollector());
1040  if (server() != collector) {
1041  set_server(collector);
1042  SendUVE();
1043  return true;
1044  }
1045  SendUVE();
1046  return false;
1047 }
1048 
1051  if (server() != collector) {
1052  set_server(collector);
1053  SendUVE();
1054  return true;
1055  }
1056  return false;
1057 }
1058 
1060  EventManager *evm, Mgr *mgr, int sm_task_instance,
1061  int sm_task_id, bool periodicuve) {
1062  return new SandeshClientSMImpl(evm, mgr, sm_task_instance, sm_task_id, periodicuve);
1063 }
1064 
1065 
TcpServer::Endpoint GetCollector() const
static const char * Name()
virtual void InitializeSMSession(int connects)=0
const SandeshHeader header
TcpServer::Endpoint GetNextCollector()
SandeshSession * session
void set_server(TcpServer::Endpoint e)
bool IsQueueEmpty() const
Definition: queue_task.h:352
bool CollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
EvValidate operator()(const Ev *event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void set_stats_client(StatsClient *stats_client)
void SetCollectors(const std::vector< TcpServer::Endpoint > &collectors)
std::vector< TcpServer::Endpoint > collectors_
sc::result react(const EvSandeshMessageRecv &event)
#define SM_LOG(_Level, _Msg)
bool OnMessage(SandeshSession *session, const std::string &msg)
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
sc::in_state_reaction< Ev, SandeshClientSMImpl,&SandeshClientSMImpl::ReleaseSandesh< Ev > > reaction
size_t max_queue_len() const
Definition: queue_task.h:377
sc::result react(const EvTcpClose &event)
sc::result react(const EvStop &event)
void ReleaseSandesh(const Ev &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpConnected >, sc::custom_reaction< EvTcpConnectFail >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
const std::string last_event() const
const std::string & LastStateName() const
void Fire()
Definition: timer.h:118
static const char * Name()
EvValidate operator()(const Ev *event)
virtual std::string ToString() const
Definition: tcp_session.h:83
sc::result react(const EvSandeshSend &event)
void unconsumed_event(const sc::event_base &event)
sc::result react(const EvSandeshSend &event)
void UpdateEventDequeueFail(const sc::event_base &event)
const std::string & StateName() const
EvTcpDeleteSession(SandeshSession *session)
sc::result react(const EvConnectTimerExpired &event)
static const char * Name()
static const char * Name()
sc::result react(const EvStart &event)
WorkQueue< EventContainer > work_queue_
void set_last_event(const std::string &event)
SandeshSession * session()
EvTcpConnected(SandeshSession *session)
virtual const char * Name() const
Definition: p/sandesh.h:277
static const char * Name()
bool validate(SandeshClientSMImpl *state_machine) const
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
#define SANDESH_LOG(_Level, _Msg)
Definition: p/sandesh.h:474
sc::result react(const EvTcpConnected &event)
static const char * Name()
void UpdateEventEnqueue(const sc::event_base &event)
SandeshClientSMImpl(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
boost::function< bool(StateMachine *)> EvValidate
Definition: state_machine.h:33
Established(my_context ctx)
void DeleteTcpSession(const Ev &event)
static SandeshClientSM * CreateClientSM(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
void Enqueue(const Ev &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
static std::string node_type()
Definition: p/sandesh.h:295
static const bool Has
sc::result react(const EvCollectorUpdate &event)
tbb::atomic< State > state_
sc::result react(const EvCollectorUpdate &event)
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:912
Connect(my_context ctx)
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
Definition: sandesh.cc:890
size_t Length() const
Definition: queue_task.h:356
static void SyncAllMaps(const std::map< std::string, uint32_t > &, bool periodic=false)
Definition: sandesh_uve.cc:60
static std::string module()
Definition: p/sandesh.h:291
sc::result react(const EvTcpClose &event)
SandeshType::type type() const
Definition: p/sandesh.h:314
virtual void DeleteSMSession(SandeshSession *session)=0
bool send_session(Sandesh *snh)
static const char * Name()
sc::result react(const EvSandeshMessageRecv &event)
Disconnect(my_context ctx)
bool SendSandeshUVE(Sandesh *snh)
EvStop(bool enq)
TcpServer::Endpoint server()
static const char * Name()
void UpdateEventDequeue(const sc::event_base &event)
Sandesh::SandeshQueue * send_queue()
void TimerErrorHanlder(std::string name, std::string error)
virtual StatsClient * stats_client() const =0
virtual bool ReceiveMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)=0
bool DequeueEvent(EventContainer ec)
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:916
void MayBeStartRunner()
Definition: queue_task.h:281
virtual void Release()
Definition: p/sandesh.h:266
void set_collector_name(const std::string &cname)
void OnIdle(const Ev &event)
sc::result react(const EvConnectTimerExpired &event)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
void SetAdminState(bool down)
static const char * Name()
bool Cancel()
Definition: timer.cc:150
SandeshSession * session
#define TYPE_NAME(_type)
Definition: logging.h:31
sc::transition< Ev, Idle, SandeshClientSMImpl,&SandeshClientSMImpl::OnIdle< Ev > > reaction
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
bool cancelled() const
Definition: timer.h:100
void StartConnectTimer(int seconds)
SandeshSession * session
static const char * Name()
static char Test(SFINAE< T,&T::validate > *)
void set_state(State state)
boost::intrusive_ptr< const sc::event_base > event
const SandeshSessionStats & GetStats() const
static const int kConnectTimeout
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvTcpConnectFail &event)
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvTcpClose &event)
static const int kIdleHoldTime
EvTcpConnectFail(SandeshSession *session)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
void set_session(SandeshSession *session, bool enq=true)
Endpoint remote_endpoint() const
Definition: tcp_session.h:135
static bool IsLoggingDroppedAllowed(SandeshType::type)
Definition: sandesh.cc:844
static const char * Name()
virtual SandeshSession * CreateSMSession(TcpSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, TcpServer::Endpoint ep)=0
EvCollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
SandeshEventStatistics event_stats_
virtual std::string ToString() const =0
void UpdateEventEnqueueFail(const sc::event_base &event)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
bool running() const
Definition: timer.h:86
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result react(const EvCollectorUpdate &event)
bool SendSandesh(Sandesh *snh)
void GetCollectors(std::vector< TcpServer::Endpoint > &collectors)
sc::in_state_reaction< Ev, SandeshClientSMImpl,&SandeshClientSMImpl::DeleteTcpSession< Ev > > reaction
Idle(my_context ctx)
ClientInit(my_context ctx)
EvSandeshSend(Sandesh *snh)
void EnqueDelSession(SandeshSession *session)
State state() const
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvIdleHoldTimerExpired >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void StartSession(SandeshClientSMImpl *state_machine)
static std::string source()
Definition: p/sandesh.h:289
Definition: timer.h:54
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_server.h:30
#define SESSION_LOG(session)
static const std::string state_names[]
void set_idle_hold_time(int idle_hold_time)
EvTcpClose(SandeshSession *session)
EvSandeshMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
void Update(std::string &event_name, bool enqueue, bool fail)
static EventManager evm
static std::string instance_id()
Definition: p/sandesh.h:293
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
std::vector< TcpServer::Endpoint > collectors_
static const int kConnectInterval
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvIdleHoldTimerExpired &event)