OpenSDN source code
sandesh_client_sm.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_client_sm.cc
7 //
8 // Sandesh Client State Machine
9 //
10 
11 #include <typeinfo>
12 #include <boost/bind/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
21 
22 #include <base/logging.h>
23 #include <base/timer.h>
24 #include <base/task_annotations.h>
25 #include <base/connection_info.h>
26 #include <io/event_manager.h>
27 
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_uve.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_statistics.h>
34 #include "sandesh_client_sm_priv.h"
35 
36 using boost::system::error_code;
37 using std::string;
38 using std::map;
39 using std::vector;
41 using process::ConnectionType;
42 using process::ConnectionStatus;
43 using namespace boost::placeholders;
44 
45 namespace mpl = boost::mpl;
46 namespace sc = boost::statechart;
47 
// SM_LOG(level, msg): emit `msg` (any expression chain that can follow
// `stream <<`) through the Sandesh log4cplus logger at level
// log4cplus::<level>_LOG_LEVEL.  Nothing is evaluated when LoggingDisabled()
// returns true, and `msg` is only formatted when the logger is enabled for
// the requested level.
#define SM_LOG(level, msg)                                                    \
    do {                                                                      \
        if (!LoggingDisabled()) {                                             \
            log4cplus::Logger _Xlogger = Sandesh::logger();                   \
            if (_Xlogger.isEnabledFor(log4cplus::level##_LOG_LEVEL)) {        \
                log4cplus::tostringstream _Xbuf;                              \
                _Xbuf << msg;                                                 \
                _Xlogger.forcedLog(log4cplus::level##_LOG_LEVEL,              \
                                   _Xbuf.str());                              \
            }                                                                 \
        }                                                                     \
    } while (false)
59 
// SESSION_LOG(session): debug-log "<session>:<event name>", where the session
// part is session->ToString(), or "*" when the pointer is null.  Name() is
// resolved in the scope where the macro is expanded.
#define SESSION_LOG(session) \
    SANDESH_LOG(DEBUG, (!(session) ? "*" : (session)->ToString()) << ":" << Name())
62 
63 
64 namespace scm {
65 
66 // events
67 struct EvStart : sc::event<EvStart> {
68  static const char * Name() {
69  return "EvStart";
70  }
71 };
72 
73 struct EvStop : sc::event<EvStop> {
74  EvStop() : enq_(true) {}
75  EvStop(bool enq) : enq_(enq) {}
76  static const char * Name() {
77  return "EvStop";
78  }
79  bool enq_;
80 };
81 
82 struct EvCollectorUpdate : sc::event<EvCollectorUpdate> {
83  EvCollectorUpdate(const std::vector<TcpServer::Endpoint> &collectors) :
84  collectors_(collectors) {
85  }
86  static const char * Name() {
87  return "EvCollectorUpdate";
88  }
89  std::vector<TcpServer::Endpoint> collectors_;
90 };
91 
92 struct EvIdleHoldTimerExpired : sc::event<EvIdleHoldTimerExpired> {
93  EvIdleHoldTimerExpired(Timer *timer) : timer_(timer){
94  }
95  static const char * Name() {
96  return "EvIdleHoldTimerExpired";
97  }
98  bool validate() const {
99  return !timer_->cancelled();
100  }
102 };
103 
104 struct EvConnectTimerExpired : sc::event<EvConnectTimerExpired> {
105  EvConnectTimerExpired(Timer *timer) : timer_(timer) {
106  }
107  static const char * Name() {
108  return "EvConnectTimerExpired";
109  }
110  bool validate(SandeshClientSMImpl *state_machine) const {
111  if (timer_->cancelled()) {
112  return false;
113  }
114  return true;
115  }
117 };
118 
119 struct EvTcpConnected : sc::event<EvTcpConnected> {
120  EvTcpConnected(SandeshSession *session) : session(session) {
121  SESSION_LOG(session);
122  }
123  static const char * Name() {
124  return "EvTcpConnected";
125  }
126 
128 };
129 
130 struct EvTcpConnectFail : sc::event<EvTcpConnectFail> {
131  EvTcpConnectFail(SandeshSession *session) : session(session) {
132  SESSION_LOG(session);
133  }
134  static const char * Name() {
135  return "EvTcpConnectFail";
136  }
137 
139 };
140 
141 
142 struct EvTcpClose : sc::event<EvTcpClose> {
143  EvTcpClose(SandeshSession *session) : session(session) {
144  SESSION_LOG(session);
145  };
146  static const char * Name() {
147  return "EvTcpClose";
148  }
149 
151 };
152 
153 // Used to defer the session delete after all events currently on the queue.
154 struct EvTcpDeleteSession : sc::event<EvTcpDeleteSession> {
155  EvTcpDeleteSession(SandeshSession *session) : session(session) {
156  }
157  static const char *Name() {
158  return "EvTcpDeleteSession";
159  }
161 };
162 
163 struct EvSandeshSend : sc::event<EvSandeshSend> {
165  snh(snh) {
166  }
167  static const char * Name() {
168  return "EvSandeshSend";
169  }
171 };
172 
173 struct EvSandeshMessageRecv : sc::event<EvSandeshMessageRecv> {
174  EvSandeshMessageRecv(const std::string &msg, const SandeshHeader& header,
175  const std::string &msg_type, const uint32_t &header_offset) :
176  msg(msg), header(header), msg_type(msg_type), header_offset(header_offset) {
177  };
178  static const char * Name() {
179  return "EvSandeshMessageRecv";
180  }
181  const std::string msg;
182  const SandeshHeader header;
183  const std::string msg_type;
184  const uint32_t header_offset;
185 };
186 
187 
188 // states
189 struct Idle;
190 struct Connect;
191 struct Disconnect;
192 struct ClientInit;
193 struct Established;
194 
195 template <class Ev>
197  typedef sc::transition<Ev, Idle, SandeshClientSMImpl,
198  &SandeshClientSMImpl::OnIdle<Ev> > reaction;
199 };
200 
201 template <class Ev>
203  typedef sc::in_state_reaction<Ev, SandeshClientSMImpl,
204  &SandeshClientSMImpl::ReleaseSandesh<Ev> > reaction;
205 };
206 
207 template <class Ev>
209  typedef sc::in_state_reaction<Ev, SandeshClientSMImpl,
210  &SandeshClientSMImpl::DeleteTcpSession<Ev> > reaction;
211 };
212 
213 
214 struct Idle : public sc::state<Idle, SandeshClientSMImpl> {
215  typedef mpl::list<
216  sc::custom_reaction<EvStart>,
217  sc::custom_reaction<EvStop>,
218  sc::custom_reaction<EvIdleHoldTimerExpired>,
219  sc::custom_reaction<EvCollectorUpdate>,
223 
224  Idle(my_context ctx) : my_base(ctx) {
225  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
226  state_machine->set_state(SandeshClientSM::IDLE);
227  SM_LOG(DEBUG, state_machine->StateName());
228  state_machine->SendUVE();
229  }
230 
231  ~Idle() {
232  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
233  state_machine->CancelIdleHoldTimer();
234  }
235 
236  sc::result react(const EvStart &event) {
237  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
238  if (state_machine->idle_hold_time()) {
239  // Update connection info
240  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
241  std::string(), ConnectionStatus::INIT,
242  state_machine->server(),
243  state_machine->StateName() + " : " + event.Name());
244  state_machine->StartIdleHoldTimer();
245  } else {
246  // Update connection info
247  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
248  std::string(), ConnectionStatus::DOWN,
249  state_machine->server(),
250  state_machine->StateName() + " : " + event.Name() +
251  " -> Disconnect");
252  return transit<Disconnect>();
253  }
254  return discard_event();
255  }
256 
257  sc::result react(const EvStop &event) {
258  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
259  state_machine->CancelIdleHoldTimer();
260  return discard_event();
261  }
262 
263  sc::result react(const EvCollectorUpdate &event) {
264  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
265  state_machine->CollectorUpdate(event.collectors_);
266  return discard_event();
267  }
268  sc::result react(const EvIdleHoldTimerExpired &event) {
269  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
270  // Update connection info
271  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
272  std::string(), ConnectionStatus::INIT,
273  state_machine->server(),
274  state_machine->StateName() + " : " + event.Name() + " -> Connect");
275  return transit<Connect>();
276  }
277 };
278 
279 struct Disconnect : public sc::state<Disconnect, SandeshClientSMImpl> {
280  typedef mpl::list<
282  sc::custom_reaction<EvCollectorUpdate>,
286 
287  Disconnect(my_context ctx) : my_base(ctx) {
288  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
289  state_machine->set_state(SandeshClientSM::DISCONNECT);
290  SM_LOG(DEBUG, state_machine->StateName());
291  state_machine->SendUVE();
292  }
293 
294  sc::result react(const EvCollectorUpdate &event) {
295  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
296  state_machine->CollectorUpdate(event.collectors_);
297  // Update connection info
298  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
299  std::string(), ConnectionStatus::INIT,
300  state_machine->server(),
301  state_machine->StateName() + " : " + event.Name() +
302  " -> Connect");
303  return transit<Connect>();
304  }
305 };
306 
307 struct Connect : public sc::state<Connect, SandeshClientSMImpl> {
308  typedef mpl::list<
310  sc::custom_reaction<EvConnectTimerExpired>,
311  sc::custom_reaction<EvTcpConnected>,
312  sc::custom_reaction<EvTcpConnectFail>,
313  sc::custom_reaction<EvTcpClose>,
314  sc::custom_reaction<EvCollectorUpdate>,
318 
319  static const int kConnectTimeout = 60; // seconds
320 
321  Connect(my_context ctx) : my_base(ctx) {
322  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
323  state_machine->set_state(SandeshClientSM::CONNECT);
324  StartSession(state_machine);
325  state_machine->connect_attempts_inc();
326  SM_LOG(DEBUG, state_machine->StateName() << " : " << "Start Connect timer " <<
327  state_machine->server());
328  state_machine->StartConnectTimer(state_machine->GetConnectTime());
329  state_machine->SendUVE();
330  }
331 
333  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
334  state_machine->CancelConnectTimer();
335  }
336 
337  sc::result react(const EvTcpConnectFail &event) {
338  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
339  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
340  state_machine->CollectorChange();
341  return ToIdle(state_machine, event.Name());
342  }
343 
344 
345  sc::result react(const EvConnectTimerExpired &event) {
346  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
347  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
348  state_machine->CollectorChange();
349  return ToIdle(state_machine, event.Name());
350  }
351 
352  sc::result react(const EvTcpConnected &event) {
353  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
354  SM_LOG(DEBUG, state_machine->StateName() << " : " <<
355  event.Name() << " : " << "Cancelling Connect timer");
356  state_machine->CancelConnectTimer();
357  SandeshSession *session = event.session;
358  // Update connection info
359  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
360  std::string(), ConnectionStatus::INIT, session->remote_endpoint(),
361  state_machine->StateName() + " : " + event.Name());
362  // Start the send queue runner XXX move this to Established or later
363  session->send_queue()->MayBeStartRunner();
364  return transit<ClientInit>();
365  }
366 
367  sc::result react(const EvTcpClose &event) {
368  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
369  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
370  state_machine->CollectorChange();
371  return ToIdle(state_machine, event.Name());
372  }
373 
374  sc::result react(const EvCollectorUpdate &event) {
375  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
376  if (state_machine->CollectorUpdate(event.collectors_)) {
377  return ToIdle(state_machine, event.Name());
378  }
379  return discard_event();
380  }
381 
382  // Create an active connection request.
383  void StartSession(SandeshClientSMImpl *state_machine) {
384  SandeshSession *session = static_cast<SandeshSession *>(
385  state_machine->GetMgr()->CreateSMSession(
387  state_machine, _1, _2),
388  boost::bind(&SandeshClientSMImpl::OnMessage,
389  state_machine, _2, _1),
390  state_machine->server()));
391  session->set_stats_client(state_machine->GetMgr()->stats_client());
392  state_machine->set_session(session);
393  }
394 
395  sc::result ToIdle(SandeshClientSMImpl *state_machine,
396  const char *event_name) {
397  // Update connection info
398  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
399  std::string(), ConnectionStatus::DOWN,
400  state_machine->session()->remote_endpoint(),
401  state_machine->StateName() + " : " + event_name);
402  state_machine->set_idle_hold_time(state_machine->GetConnectTime() * 1000);
403  state_machine->OnIdle<EvStop>(EvStop());
404  state_machine->StartIdleHoldTimer();
405  return transit<Idle>();
406  }
407 };
408 
409 struct ClientInit : public sc::state<ClientInit, SandeshClientSMImpl> {
410  typedef mpl::list<
412  sc::custom_reaction<EvConnectTimerExpired>,
413  sc::custom_reaction<EvTcpClose>,
414  sc::custom_reaction<EvSandeshMessageRecv>,
415  sc::custom_reaction<EvSandeshSend>,
416  sc::custom_reaction<EvCollectorUpdate>,
419 
420  ClientInit(my_context ctx) : my_base(ctx) {
421  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
422  state_machine->set_state(SandeshClientSM::CLIENT_INIT);
423  SM_LOG(DEBUG, state_machine->StateName());
424  state_machine->CancelConnectTimer();
425  state_machine->StartConnectTimer(state_machine->GetConnectTime());
426  state_machine->GetMgr()->InitializeSMSession(state_machine->connects_inc());
427  state_machine->SendUVE();
428  }
429 
431  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
432  state_machine->CancelConnectTimer();
433  }
434 
435  sc::result react(const EvTcpClose &event) {
436  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
437  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
438  state_machine->CollectorChange();
439  return ToIdle(state_machine, event.Name());
440  }
441 
442  sc::result react(const EvConnectTimerExpired &event) {
443  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
444  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
445  state_machine->CollectorChange();
446  return ToIdle(state_machine, event.Name());
447  }
448 
449  sc::result react(const EvSandeshMessageRecv &event) {
450  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
451  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
452 
453  if (!state_machine->GetMgr()->ReceiveMsg(event.msg, event.header,
454  event.msg_type, event.header_offset)) {
455  return ToIdle(state_machine, event.Name());
456  }
457  state_machine->set_collector_name(event.header.get_Source());
458 
459  if (event.header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
460  // Update connection info
461  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
462  std::string(), ConnectionStatus::UP,
463  state_machine->session()->remote_endpoint(),
464  state_machine->StateName() + " : Control " + event.Name());
465  return transit<Established>();
466  }
467  return discard_event();
468  }
469 
470  sc::result react(const EvSandeshSend &event) {
471  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
472  Sandesh *snh(event.snh);
473  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name() <<
474  " : " << snh->Name());
475  if (dynamic_cast<SandeshUVE *>(snh)) {
477  SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " <<
478  snh->ToString());
479  }
481  SandeshTxDropReason::WrongClientSMState);
482  SM_LOG(INFO, "Received UVE message in wrong state : " << snh->Name());
483  snh->Release();
484  return discard_event();
485  }
486  if (!state_machine->send_session(snh)) {
487  SM_LOG(INFO, "Could not EnQ Sandesh :" << snh->Name());
488  // If Enqueue encounters an error, it will release the Sandesh
489  }
490  return discard_event();
491  }
492 
493  sc::result react(const EvCollectorUpdate &event) {
494  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
495  if (state_machine->CollectorUpdate(event.collectors_)) {
496  return ToIdle(state_machine, event.Name());
497  }
498  return discard_event();
499  }
500 
501  sc::result ToIdle(SandeshClientSMImpl *state_machine,
502  const char *event_name) {
503  // Update connection info
504  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
505  std::string(), ConnectionStatus::DOWN,
506  state_machine->session()->remote_endpoint(),
507  state_machine->StateName() + " : " + event_name);
508  state_machine->OnIdle<EvStop>(EvStop());
509  state_machine->StartIdleHoldTimer();
510  SM_LOG(INFO, "Return to idle with " << state_machine->idle_hold_time());
511  return transit<Idle>();
512  }
513 };
514 
515 struct Established : public sc::state<Established, SandeshClientSMImpl> {
516  typedef mpl::list<
518  sc::custom_reaction<EvTcpClose>,
519  sc::custom_reaction<EvSandeshMessageRecv>,
520  sc::custom_reaction<EvSandeshSend>,
521  sc::custom_reaction<EvCollectorUpdate>,
524 
525  Established(my_context ctx) : my_base(ctx) {
526  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
527  state_machine->set_state(SandeshClientSM::ESTABLISHED);
528  SM_LOG(DEBUG, state_machine->StateName());
529  state_machine->connect_attempts_clear();
530  // Update connection info
531  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
532  std::string(), ConnectionStatus::UP,
533  state_machine->session()->remote_endpoint(),
534  state_machine->StateName());
535  state_machine->SendUVE();
536  }
537 
539  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
540  state_machine->set_collector_name(string());
541  }
542 
543  sc::result react(const EvTcpClose &event) {
544  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
545  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
546  if (state_machine->CollectorChange()) {
547  // Update connection info
548  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
549  std::string(), ConnectionStatus::INIT,
550  state_machine->session()->remote_endpoint(),
551  state_machine->StateName() + " : " + event.Name() +
552  " -> Connect");
553  state_machine->OnIdle<EvStop>(EvStop());
554  return transit<Connect>();
555  } else {
556  return ToIdle(state_machine, event.Name());
557  }
558  }
559 
560  sc::result react(const EvSandeshMessageRecv &event) {
561  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
562  SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
563 
564  if (!state_machine->GetMgr()->ReceiveMsg(event.msg, event.header,
565  event.msg_type, event.header_offset)) {
566  return ToIdle(state_machine, event.Name());
567  }
568  return discard_event();
569  }
570 
571  sc::result react(const EvSandeshSend &event) {
572  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
573  //SM_LOG(DEBUG, state_machine->StateName() << " : " << event.Name());
574  if (!state_machine->send_session(event.snh)) {
575  SM_LOG(ERROR, "Could not EnQ Sandesh :" << event.snh->Name());
576  // If Enqueue encounters an error, it will release the Sandesh
577  }
578  return discard_event();
579  }
580 
581  sc::result react(const EvCollectorUpdate &event) {
582  SandeshClientSMImpl *state_machine = &context<SandeshClientSMImpl>();
583  if (state_machine->CollectorUpdate(event.collectors_)) {
584  // Update connection info
585  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
586  std::string(), ConnectionStatus::INIT,
587  state_machine->session()->remote_endpoint(),
588  state_machine->StateName() + " : " + event.Name() +
589  " -> Connect");
590  state_machine->OnIdle<EvStop>(EvStop());
591  return transit<Connect>();
592  }
593  return discard_event();
594  }
595 
596  sc::result ToIdle(SandeshClientSMImpl *state_machine,
597  const char *event_name) {
598  // Update connection info
599  ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
600  std::string(), ConnectionStatus::DOWN,
601  state_machine->session()->remote_endpoint(),
602  state_machine->StateName() + " : " + event_name);
603  state_machine->OnIdle<EvStop>(EvStop());
604  state_machine->StartIdleHoldTimer();
605  return transit<Idle>();
606  }
607 
608 };
609 
610 } // namespace scm
611 
613  if (down) {
614  Enqueue(scm::EvStop());
615  } else {
616  reset_idle_hold_time();
617  // On fresh restart of state machine, all previous state should be reset
618  reset_last_info();
619  Enqueue(scm::EvStart());
620  }
621 }
622 
624  Enqueue(scm::EvTcpDeleteSession(session));
625 }
626 
627 template <class Ev>
628 void SandeshClientSMImpl::OnIdle(const Ev &event) {
629  // Release all resources
630  set_idle_hold_time(idle_hold_time() ? idle_hold_time() : kIdleHoldTime);
631 
632  CancelIdleHoldTimer();
633 
634  set_session(NULL, event.enq_);
635 }
636 
637 template <class Ev>
638 void SandeshClientSMImpl::ReleaseSandesh(const Ev &event) {
639  Sandesh *snh(event.snh);
641  SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " << snh->ToString());
642  }
644  SandeshTxDropReason::WrongClientSMState);
645  SM_LOG(DEBUG, "Wrong state: " << StateName() << " for event: " <<
646  event.Name() << " message: " << snh->Name());
647  snh->Release();
648 }
649 
650 template <class Ev>
652  GetMgr()->DeleteSMSession(event.session);
653 }
654 
656  connect_timer_->Start(seconds * 1000,
657  boost::bind(&SandeshClientSMImpl::ConnectTimerExpired, this),
658  boost::bind(&SandeshClientSMImpl::TimerErrorHanlder, this, _1, _2));
659 }
660 
662  connect_timer_->Cancel();
663 }
664 
666  return connect_timer_->running();
667 }
668 
670  if (idle_hold_time_ <= 0)
671  return;
672 
673  idle_hold_timer_->Start(idle_hold_time_,
674  boost::bind(&SandeshClientSMImpl::IdleHoldTimerExpired, this),
675  boost::bind(&SandeshClientSMImpl::TimerErrorHanlder, this, _1, _2));
676 }
677 
679  idle_hold_timer_->Cancel();
680 }
681 
683  return idle_hold_timer_->running();
684 }
685 
686 //
687 // Test Only API : Start
688 //
690  connect_timer_->Fire();
691 }
692 
694  idle_hold_timer_->Fire();
695 }
696 //
697 // Test Only API : End
698 //
699 
700 void SandeshClientSMImpl::TimerErrorHanlder(std::string name, std::string error) {
701  SM_LOG(ERROR, name + " error: " + error);
702 }
703 
704 // Client only
706  if (!deleted_) {
707  SM_LOG(DEBUG, server() << " "
708  << "EvConnectTimerExpired in state " << StateName());
709  Enqueue(scm::EvConnectTimerExpired(connect_timer_));
710  }
711  return false;
712 }
713 
715  Enqueue(scm::EvIdleHoldTimerExpired(idle_hold_timer_));
716  return false;
717 }
718 
720  TcpSession *session, TcpSession::Event event) {
721  SandeshSession *sandesh_session = dynamic_cast<SandeshSession *>(session);
722  assert((session != NULL) == (sandesh_session != NULL));
723  std::string session_s = session ? session->ToString() : "*";
724  switch (event) {
726  SM_LOG(DEBUG, session_s << " " << __func__ <<
727  " " << "TCP Connected");
728  Enqueue(scm::EvTcpConnected(sandesh_session));
729  break;
731  SM_LOG(DEBUG, session_s << " " << __func__ <<
732  " " << "TCP Connect Failed");
733  Enqueue(scm::EvTcpConnectFail(sandesh_session));
734  break;
735  case TcpSession::CLOSE:
736  SM_LOG(DEBUG, session_s << " " << __func__ <<
737  " " << "TCP Connection Closed");
738  Enqueue(scm::EvTcpClose(sandesh_session));
739  break;
740  case TcpSession::ACCEPT:
741  default:
742  SM_LOG(DEBUG, session_s << " " << "Unknown event: " <<
743  event);
744  break;
745  }
746 }
747 
749  Enqueue(scm::EvSandeshSend(snh));
750  return true;
751 }
752 
754  Enqueue(scm::EvSandeshSend(snh));
755  return true;
756 }
757 
759  const std::string &msg) {
760  // Demux based on Sandesh message type
761  SandeshHeader header;
762  std::string message_type;
763  uint32_t xml_offset = 0;
764 
765  // Extract the header and message type
766  int ret = SandeshReader::ExtractMsgHeader(msg, header, message_type,
767  xml_offset);
768  if (ret) {
769  SM_LOG(ERROR, "OnMessage in state: " << StateName() << ": Extract "
770  << " FAILED(" << ret << ")");
771  return false;
772  }
773 
774  if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
775  SM_LOG(INFO, "OnMessage control in state: " << StateName() );
776  }
777  Enqueue(scm::EvSandeshMessageRecv(msg, header, message_type, xml_offset));
778  return true;
779 }
780 
// Printable state names, indexed by SandeshClientSM::State (see StateName()).
// NOTE(review): the order is presumably that of the State enum - verify
// against the enum declaration when adding a state.
static const std::string state_names[] = {
    std::string("Idle"),
    std::string("Disconnect"),
    std::string("Connect"),
    std::string("ClientInit"),
    std::string("Established"),
};
788 
790  SandeshClientSM::State state) const {
791  return state_names[state];
792 }
793 
794 const string &SandeshClientSMImpl::StateName() const {
795  return state_names[state_];
796 }
797 
798 const string &SandeshClientSMImpl::LastStateName() const {
799  return state_names[last_state_];
800 }
801 
803 
805  int backoff = std::min(attempts_, 6);
806  return std::min(backoff ? 1 << (backoff - 1) : 0, kConnectInterval);
807 }
808 
809 void SandeshClientSMImpl::UpdateEventEnqueue(const sc::event_base &event) {
810  UpdateEventStats(event, true, false);
811 }
812 
813 void SandeshClientSMImpl::UpdateEventDequeue(const sc::event_base &event) {
814  UpdateEventStats(event, false, false);
815 }
816 
817 void SandeshClientSMImpl::UpdateEventEnqueueFail(const sc::event_base &event) {
818  UpdateEventStats(event, true, true);
819 }
820 
821 void SandeshClientSMImpl::UpdateEventDequeueFail(const sc::event_base &event) {
822  UpdateEventStats(event, false, true);
823 }
824 
825 void SandeshClientSMImpl::UpdateEventStats(const sc::event_base &event,
826  bool enqueue, bool fail) {
827  std::string event_name(TYPE_NAME(event));
828  std::scoped_lock lock(mutex_);
829  event_stats_.Update(event_name, enqueue, fail);
830 }
831 
833  if (deleted_) return true;
834  in_dequeue_ = true;
835 
836  set_last_event(TYPE_NAME(*ec.event));
837  if (ec.validate.empty() || ec.validate(this)) {
838  if ((state()!=ESTABLISHED)||(TYPE_NAME(*ec.event)!="scm::EvSandeshSend"))
839  SM_LOG(DEBUG, "Processing " << TYPE_NAME(*ec.event) << " in state "
840  << StateName());
841  UpdateEventDequeue(*ec.event);
842  process_event(*ec.event);
843  } else {
844  SM_LOG(DEBUG, "Discarding " << TYPE_NAME(*ec.event) << " in state "
845  << StateName());
846  UpdateEventDequeueFail(*ec.event);
847  }
848 
849  ec.event.reset();
850  in_dequeue_ = false;
851  return true;
852 }
853 
854 void SandeshClientSMImpl::unconsumed_event(const sc::event_base &event) {
855  SM_LOG(DEBUG, "Unconsumed " << TYPE_NAME(event) << " in state "
856  << StateName());
857 }
858 
859 // This class determines whether a given class has a method called 'validate'
860 template<typename Ev>
861 struct HasValidate
862 {
863  template<typename T, bool (T::*)(SandeshClientSMImpl *) const> struct SFINAE {};
864  template<typename T> static char Test(SFINAE<T, &T::validate>*);
865  template<typename T> static int Test(...);
866  static const bool Has = sizeof(Test<Ev>(0)) == sizeof(char);
867 };
868 
869 template <typename Ev, bool has_validate>
870 struct ValidateFn {
871  EvValidate operator()(const Ev *event) { return NULL; }
872 };
873 
874 template <typename Ev>
875 struct ValidateFn<Ev, true> {
876  EvValidate operator()(const Ev *event) {
877  return boost::bind(&Ev::validate, event, _1);
878  }
879 };
880 
881 template <typename Ev>
882 void SandeshClientSMImpl::Enqueue(const Ev &event) {
883  if (deleted_) return;
884 
885  EventContainer ec;
886  ec.event = event.intrusive_from_this();
887  ec.validate = ValidateFn<Ev, HasValidate<Ev>::Has>()(static_cast<const Ev *>(ec.event.get()));
888  if (!work_queue_.Enqueue(ec)) {
889  // XXX - Disable till we implement bounded work queues
890  //UpdateEventEnqueueFail(event);
891  //return;
892  }
893  UpdateEventEnqueue(event);
894  return;
895 }
896 
897 
899  int sm_task_instance, int sm_task_id, bool periodicuve)
900  : SandeshClientSM(mgr),
901  work_queue_(sm_task_id, sm_task_instance,
902  boost::bind(&SandeshClientSMImpl::DequeueEvent, this, _1)),
903  connect_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Connect timer", sm_task_id, sm_task_instance)),
904  idle_hold_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Idle hold timer", sm_task_id, sm_task_instance)),
905  statistics_timer_(TimerManager::CreateTimer(*evm->io_service(), "Client Tick and Statistics timer", sm_task_id, sm_task_instance)),
906  idle_hold_time_(0),
907  statistics_timer_interval_(kTickInterval),
908  periodicuve_(periodicuve),
909  attempts_(0),
910  deleted_(false),
911  in_dequeue_(false),
912  connects_(0),
913  coll_name_(string()) {
914  state_ = IDLE;
915  generator_key_ = Sandesh::source() + ":" + Sandesh::node_type() + ":" +
917  initiate();
919 }
920 
922  std::scoped_lock lock(mutex_);
923 
924  assert(!deleted_);
925  deleted_ = true;
926 
927  //
928  // XXX Temporary hack until config task is made to run exclusively wrt
929  // all other threads including main()
930  //
931 
932  while (!work_queue_.IsQueueEmpty()) usleep(100);
933 
934  while (in_dequeue_) usleep(100);
935 
936  // The State Machine should have already been shutdown by this point.
937  assert(!session());
938 
940 
941  //
942  // Explicitly call the state destructor before the state machine itself.
943  // This is needed because some of the destructors access the state machine
944  // context.
945  //
946  terminate();
947 
948  //
949  // Delete timer after state machine is terminated so that there is no
950  // possible reference to the timers being deleted any more
951  //
954 
957 }
958 
962  boost::bind(&SandeshClientSMImpl::TimerErrorHanlder, this, _1,
963  _2));
964 }
965 
967  if (deleted_ || generator_key_.empty()) {
968  return true;
969  }
970  std::vector<SandeshStateMachineEvStats> ev_stats;
971  {
972  std::scoped_lock lock(mutex_);
973  event_stats_.Get(&ev_stats);
974  }
975  // Send the message
976  ModuleClientState mcs;
977  mcs.set_name(generator_key_);
978  mcs.set_sm_queue_count(work_queue_.Length());
979  mcs.set_max_sm_queue_count(work_queue_.max_queue_len());
980  // Sandesh state machine statistics
981  SandeshStateMachineStats sm_stats;
982  sm_stats.set_ev_stats(ev_stats);
983  sm_stats.set_state(StateName());
984  sm_stats.set_last_state(LastStateName());
985  sm_stats.set_last_event(last_event());
986  sm_stats.set_state_since(state_since_);
987  sm_stats.set_last_event_at(last_event_at_);
988  mcs.set_sm_stats(sm_stats);
989  // Sandesh session statistics
990  SandeshSession *session = this->session();
991  if (session) {
992  mcs.set_session_stats(session->GetStats());
993  SocketIOStats rx_stats;
994  session->GetRxSocketStats(rx_stats);
995  mcs.set_session_rx_socket_stats(rx_stats);
996  SocketIOStats tx_stats;
997  session->GetTxSocketStats(tx_stats);
998  mcs.set_session_tx_socket_stats(tx_stats);
999  }
1000  SandeshModuleClientTrace::Send(mcs);
1001  SendUVE();
1002 
1003  map<string,uint32_t> inpMap;
1004  SandeshUVETypeMaps::SyncAllMaps(inpMap, true);
1005 
1006  return true;
1007 }
1008 
1010  const std::vector<TcpServer::Endpoint>& collectors) {
1011  Enqueue(scm::EvCollectorUpdate(collectors));
1012 }
1013 
1015  std::vector<TcpServer::Endpoint>& collectors) {
1016  collectors = collectors_;
1017 }
1018 
1020  if (collectors_.size()) {
1021  return collectors_[collector_index_];
1022  }
1023  return TcpServer::Endpoint();
1024 }
1025 
1027  if (collectors_.size()) {
1028  if (++collector_index_ == collectors_.size()) {
1029  collector_index_ = 0;
1030  }
1031  return collectors_[collector_index_];
1032  }
1033  return TcpServer::Endpoint();
1034 }
1035 
1037  const std::vector<TcpServer::Endpoint>& collectors) {
1038  collectors_ = collectors;
1039  collector_index_ = 0;
1040  TcpServer::Endpoint collector(GetCollector());
1041  if (server() != collector) {
1042  set_server(collector);
1043  SendUVE();
1044  return true;
1045  }
1046  SendUVE();
1047  return false;
1048 }
1049 
1052  if (server() != collector) {
1053  set_server(collector);
1054  SendUVE();
1055  return true;
1056  }
1057  return false;
1058 }
1059 
1061  EventManager *evm, Mgr *mgr, int sm_task_instance,
1062  int sm_task_id, bool periodicuve) {
1063  return new SandeshClientSMImpl(evm, mgr, sm_task_instance, sm_task_id, periodicuve);
1064 }
1065 
1066 
void GetCollectors(std::vector< TcpServer::Endpoint > &collectors)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
void UpdateEventDequeueFail(const sc::event_base &event)
bool send_session(Sandesh *snh)
void UpdateEventEnqueueFail(const sc::event_base &event)
bool CollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
std::vector< TcpServer::Endpoint > collectors_
void Enqueue(const Ev &event)
void ReleaseSandesh(const Ev &event)
TcpServer::Endpoint GetNextCollector()
void EnqueDelSession(SandeshSession *session)
const std::string last_event() const
void SetAdminState(bool down)
void DeleteTcpSession(const Ev &event)
bool OnMessage(SandeshSession *session, const std::string &msg)
void StartConnectTimer(int seconds)
bool SendSandeshUVE(Sandesh *snh)
void SetCollectors(const std::vector< TcpServer::Endpoint > &collectors)
WorkQueue< EventContainer > work_queue_
TcpServer::Endpoint GetCollector() const
void set_idle_hold_time(int idle_hold_time)
bool SendSandesh(Sandesh *snh)
void TimerErrorHanlder(std::string name, std::string error)
SandeshEventStatistics event_stats_
void UpdateEventDequeue(const sc::event_base &event)
bool DequeueEvent(EventContainer ec)
void set_state(State state)
const std::string & LastStateName() const
static const int kConnectInterval
void OnIdle(const Ev &event)
const std::string & StateName() const
void unconsumed_event(const sc::event_base &event)
void UpdateEventEnqueue(const sc::event_base &event)
void set_collector_name(const std::string &cname)
void set_session(SandeshSession *session, bool enq=true)
SandeshClientSMImpl(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
virtual bool ReceiveMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)=0
virtual void InitializeSMSession(int connects)=0
virtual StatsClient * stats_client() const =0
virtual SandeshSession * CreateSMSession(TcpSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, TcpServer::Endpoint ep)=0
SandeshSession * session()
void set_server(TcpServer::Endpoint e)
std::atomic< State > state_
TcpServer::Endpoint server()
static SandeshClientSM * CreateClientSM(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
void set_stats_client(StatsClient *stats_client)
Sandesh::SandeshQueue * send_queue()
const SandeshSessionStats & GetStats() const
static void SyncAllMaps(const std::map< std::string, uint32_t > &, bool periodic=false)
Definition: sandesh_uve.cc:60
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
Definition: sandesh.cc:891
virtual void Release()
Definition: cpp/sandesh.h:268
static std::string source()
Definition: cpp/sandesh.h:291
virtual const char * Name() const
Definition: cpp/sandesh.h:279
SandeshType::type type() const
Definition: cpp/sandesh.h:316
static bool IsLoggingDroppedAllowed(SandeshType::type)
Definition: sandesh.cc:845
static std::string module()
Definition: cpp/sandesh.h:293
static std::string instance_id()
Definition: cpp/sandesh.h:295
static std::string node_type()
Definition: cpp/sandesh.h:297
virtual std::string ToString() const =0
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_server.h:30
@ CONNECT_COMPLETE
Definition: tcp_session.h:46
@ CONNECT_FAILED
Definition: tcp_session.h:47
virtual std::string ToString() const
Definition: tcp_session.h:79
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:913
Endpoint remote_endpoint() const
Definition: tcp_session.h:131
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:917
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:221
Definition: timer.h:57
bool Cancel()
Definition: timer.cc:149
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:107
size_t Length() const
Definition: queue_task.h:356
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
size_t max_queue_len() const
Definition: queue_task.h:377
bool IsQueueEmpty() const
Definition: queue_task.h:352
void MayBeStartRunner()
Definition: queue_task.h:281
static EventManager evm
#define SANDESH_LOG(_Level, _Msg)
Definition: cpp/sandesh.h:476
#define TYPE_NAME(_type)
Definition: logging.h:32
#define SM_LOG(_Level, _Msg)
#define SESSION_LOG(session)
static const std::string state_names[]
boost::function< bool(StateMachine *)> EvValidate
Definition: state_machine.h:33
static char Test(SFINAE< T, &T::validate > *)
static int Test(...)
boost::intrusive_ptr< const sc::event_base > event
EvValidate operator()(const Ev *event)
EvValidate operator()(const Ev *event)
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvConnectTimerExpired &event)
sc::result react(const EvTcpClose &event)
sc::result react(const EvSandeshMessageRecv &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
ClientInit(my_context ctx)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvSandeshSend &event)
Connect(my_context ctx)
sc::result react(const EvTcpConnected &event)
sc::result react(const EvTcpClose &event)
sc::result react(const EvConnectTimerExpired &event)
void StartSession(SandeshClientSMImpl *state_machine)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvTcpConnectFail &event)
sc::result react(const EvCollectorUpdate &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpConnected >, sc::custom_reaction< EvTcpConnectFail >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::in_state_reaction< Ev, SandeshClientSMImpl, &SandeshClientSMImpl::DeleteTcpSession< Ev > > reaction
sc::result react(const EvCollectorUpdate &event)
Disconnect(my_context ctx)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvCollectorUpdate &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result react(const EvSandeshMessageRecv &event)
sc::result react(const EvTcpClose &event)
sc::result react(const EvSandeshSend &event)
Established(my_context ctx)
static const char * Name()
std::vector< TcpServer::Endpoint > collectors_
EvCollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
bool validate(SandeshClientSMImpl *state_machine) const
static const char * Name()
static const char * Name()
static const char * Name()
EvSandeshMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
const SandeshHeader header
EvSandeshSend(Sandesh *snh)
static const char * Name()
static const char * Name()
EvStop(bool enq)
static const char * Name()
static const char * Name()
SandeshSession * session
EvTcpClose(SandeshSession *session)
static const char * Name()
EvTcpConnectFail(SandeshSession *session)
SandeshSession * session
static const char * Name()
EvTcpConnected(SandeshSession *session)
SandeshSession * session
static const char * Name()
EvTcpDeleteSession(SandeshSession *session)
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvStop &event)
Idle(my_context ctx)
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvIdleHoldTimerExpired >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result react(const EvStart &event)
sc::result react(const EvIdleHoldTimerExpired &event)
sc::in_state_reaction< Ev, SandeshClientSMImpl, &SandeshClientSMImpl::ReleaseSandesh< Ev > > reaction
sc::transition< Ev, Idle, SandeshClientSMImpl, &SandeshClientSMImpl::OnIdle< Ev > > reaction