12 #include <boost/bind/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_uve.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_statistics.h>
36 using boost::system::error_code;
41 using process::ConnectionType;
42 using process::ConnectionStatus;
43 using namespace boost::placeholders;
45 namespace mpl = boost::mpl;
46 namespace sc = boost::statechart;
48 #define SM_LOG(_Level, _Msg) \
50 if (LoggingDisabled()) break; \
51 log4cplus::Logger _Xlogger = Sandesh::logger(); \
52 if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) { \
53 log4cplus::tostringstream _Xbuf; \
55 _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL, \
// SESSION_LOG(session): emits one DEBUG-level record through SANDESH_LOG.
// The message is the session's ToString() output, or the literal "*" when
// `session` evaluates to false (e.g. a null pointer), followed by ":" and
// the result of Name().
// Name() is looked up unqualified at the expansion site, so this macro can
// only be used in a scope that provides a Name() function.
// `session` is evaluated twice (once in the test, once in the call), so
// avoid passing an expression with side effects.
60 #define SESSION_LOG(session) \
61 SANDESH_LOG(DEBUG, ((session) ? (session)->ToString() : "*") << ":" << Name())
68 static const char *
Name() {
76 static const char *
Name() {
84 collectors_(collectors) {
86 static const char *
Name() {
87 return "EvCollectorUpdate";
95 static const char *
Name() {
96 return "EvIdleHoldTimerExpired";
99 return !timer_->cancelled();
108 return "EvConnectTimerExpired";
111 if (timer_->cancelled()) {
124 return "EvTcpConnected";
135 return "EvTcpConnectFail";
158 return "EvTcpDeleteSession";
168 return "EvSandeshSend";
175 const std::string &msg_type,
const uint32_t &header_offset) :
176 msg(msg), header(header), msg_type(msg_type), header_offset(header_offset) {
179 return "EvSandeshMessageRecv";
204 &SandeshClientSMImpl::ReleaseSandesh<Ev> >
reaction;
210 &SandeshClientSMImpl::DeleteTcpSession<Ev> >
reaction;
214 struct Idle :
public sc::state<Idle, SandeshClientSMImpl> {
216 sc::custom_reaction<EvStart>,
217 sc::custom_reaction<EvStop>,
218 sc::custom_reaction<EvIdleHoldTimerExpired>,
219 sc::custom_reaction<EvCollectorUpdate>,
224 Idle(my_context ctx) : my_base(ctx) {
240 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
241 std::string(), ConnectionStatus::INIT,
247 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
248 std::string(), ConnectionStatus::DOWN,
252 return transit<Disconnect>();
254 return discard_event();
260 return discard_event();
266 return discard_event();
271 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
272 std::string(), ConnectionStatus::INIT,
274 state_machine->
StateName() +
" : " + event.
Name() +
" -> Connect");
275 return transit<Connect>();
279 struct Disconnect :
public sc::state<Disconnect, SandeshClientSMImpl> {
282 sc::custom_reaction<EvCollectorUpdate>,
298 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
299 std::string(), ConnectionStatus::INIT,
303 return transit<Connect>();
307 struct Connect :
public sc::state<Connect, SandeshClientSMImpl> {
310 sc::custom_reaction<EvConnectTimerExpired>,
311 sc::custom_reaction<EvTcpConnected>,
312 sc::custom_reaction<EvTcpConnectFail>,
313 sc::custom_reaction<EvTcpClose>,
314 sc::custom_reaction<EvCollectorUpdate>,
// Timeout value associated with the Connect state.
// NOTE(review): units are presumably seconds, since StartConnectTimer(int seconds)
// multiplies its argument by 1000 before starting the timer. The place where
// this constant is passed in is not visible here, so confirm against the use site.
319 static const int kConnectTimeout = 60;
324 StartSession(state_machine);
326 SM_LOG(DEBUG, state_machine->
StateName() <<
" : " <<
"Start Connect timer " <<
341 return ToIdle(state_machine, event.
Name());
349 return ToIdle(state_machine, event.
Name());
355 event.
Name() <<
" : " <<
"Cancelling Connect timer");
359 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
364 return transit<ClientInit>();
371 return ToIdle(state_machine, event.
Name());
377 return ToIdle(state_machine, event.
Name());
379 return discard_event();
387 state_machine, _1, _2),
389 state_machine, _2, _1),
390 state_machine->
server()));
396 const char *event_name) {
398 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
399 std::string(), ConnectionStatus::DOWN,
401 state_machine->
StateName() +
" : " + event_name);
405 return transit<Idle>();
409 struct ClientInit :
public sc::state<ClientInit, SandeshClientSMImpl> {
412 sc::custom_reaction<EvConnectTimerExpired>,
413 sc::custom_reaction<EvTcpClose>,
414 sc::custom_reaction<EvSandeshMessageRecv>,
415 sc::custom_reaction<EvSandeshSend>,
416 sc::custom_reaction<EvCollectorUpdate>,
439 return ToIdle(state_machine, event.
Name());
446 return ToIdle(state_machine, event.
Name());
455 return ToIdle(state_machine, event.
Name());
459 if (event.
header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
461 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
462 std::string(), ConnectionStatus::UP,
465 return transit<Established>();
467 return discard_event();
474 " : " << snh->
Name());
481 SandeshTxDropReason::WrongClientSMState);
482 SM_LOG(INFO,
"Received UVE message in wrong state : " << snh->
Name());
484 return discard_event();
487 SM_LOG(INFO,
"Could not EnQ Sandesh :" << snh->
Name());
490 return discard_event();
496 return ToIdle(state_machine, event.
Name());
498 return discard_event();
502 const char *event_name) {
504 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
505 std::string(), ConnectionStatus::DOWN,
507 state_machine->
StateName() +
" : " + event_name);
511 return transit<Idle>();
515 struct Established :
public sc::state<Established, SandeshClientSMImpl> {
518 sc::custom_reaction<EvTcpClose>,
519 sc::custom_reaction<EvSandeshMessageRecv>,
520 sc::custom_reaction<EvSandeshSend>,
521 sc::custom_reaction<EvCollectorUpdate>,
531 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
532 std::string(), ConnectionStatus::UP,
548 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
549 std::string(), ConnectionStatus::INIT,
554 return transit<Connect>();
556 return ToIdle(state_machine, event.
Name());
566 return ToIdle(state_machine, event.
Name());
568 return discard_event();
575 SM_LOG(ERROR,
"Could not EnQ Sandesh :" << event.
snh->
Name());
578 return discard_event();
585 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
586 std::string(), ConnectionStatus::INIT,
591 return transit<Connect>();
593 return discard_event();
597 const char *event_name) {
599 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
600 std::string(), ConnectionStatus::DOWN,
602 state_machine->
StateName() +
" : " + event_name);
605 return transit<Idle>();
616 reset_idle_hold_time();
630 set_idle_hold_time(idle_hold_time() ? idle_hold_time() : kIdleHoldTime);
632 CancelIdleHoldTimer();
634 set_session(NULL, event.enq_);
644 SandeshTxDropReason::WrongClientSMState);
645 SM_LOG(DEBUG,
"Wrong state: " << StateName() <<
" for event: " <<
646 event.Name() <<
" message: " << snh->
Name());
652 GetMgr()->DeleteSMSession(event.session);
656 connect_timer_->Start(seconds * 1000,
662 connect_timer_->Cancel();
666 return connect_timer_->running();
670 if (idle_hold_time_ <= 0)
673 idle_hold_timer_->Start(idle_hold_time_,
679 idle_hold_timer_->Cancel();
683 return idle_hold_timer_->running();
690 connect_timer_->Fire();
694 idle_hold_timer_->Fire();
701 SM_LOG(ERROR, name +
" error: " + error);
707 SM_LOG(DEBUG, server() <<
" "
708 <<
"EvConnectTimerExpired in state " << StateName());
722 assert((session != NULL) == (sandesh_session != NULL));
723 std::string session_s = session ? session->
ToString() :
"*";
726 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
727 " " <<
"TCP Connected");
731 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
732 " " <<
"TCP Connect Failed");
736 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
737 " " <<
"TCP Connection Closed");
742 SM_LOG(DEBUG, session_s <<
" " <<
"Unknown event: " <<
759 const std::string &msg) {
761 SandeshHeader header;
762 std::string message_type;
763 uint32_t xml_offset = 0;
769 SM_LOG(ERROR,
"OnMessage in state: " << StateName() <<
": Extract "
770 <<
" FAILED(" << ret <<
")");
774 if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
775 SM_LOG(INFO,
"OnMessage control in state: " << StateName() );
805 int backoff = std::min(attempts_, 6);
806 return std::min(backoff ? 1 << (backoff - 1) : 0, kConnectInterval);
810 UpdateEventStats(event,
true,
false);
814 UpdateEventStats(event,
false,
false);
818 UpdateEventStats(event,
true,
true);
822 UpdateEventStats(event,
false,
true);
826 bool enqueue,
bool fail) {
827 std::string event_name(
TYPE_NAME(event));
828 std::scoped_lock lock(mutex_);
829 event_stats_.Update(event_name, enqueue, fail);
833 if (deleted_)
return true;
841 UpdateEventDequeue(*ec.
event);
842 process_event(*ec.
event);
846 UpdateEventDequeueFail(*ec.
event);
860 template<
typename Ev>
863 template<
typename T,
bool (T::*)(SandeshClientSMImpl *) const>
struct SFINAE {};
865 template<
typename T>
static int Test(...);
866 static const bool Has =
sizeof(Test<Ev>(0)) ==
sizeof(
char);
869 template <
typename Ev,
bool has_val
idate>
874 template <
typename Ev>
877 return boost::bind(&Ev::validate, event, _1);
881 template <
typename Ev>
883 if (deleted_)
return;
886 ec.
event =
event.intrusive_from_this();
888 if (!work_queue_.Enqueue(ec)) {
893 UpdateEventEnqueue(event);
899 int sm_task_instance,
int sm_task_id,
bool periodicuve)
901 work_queue_(sm_task_id, sm_task_instance,
903 connect_timer_(
TimerManager::CreateTimer(*
evm->io_service(),
"Client Connect timer", sm_task_id, sm_task_instance)),
904 idle_hold_timer_(
TimerManager::CreateTimer(*
evm->io_service(),
"Client Idle hold timer", sm_task_id, sm_task_instance)),
905 statistics_timer_(
TimerManager::CreateTimer(*
evm->io_service(),
"Client Tick and Statistics timer", sm_task_id, sm_task_instance)),
907 statistics_timer_interval_(kTickInterval),
908 periodicuve_(periodicuve),
913 coll_name_(string()) {
922 std::scoped_lock lock(
mutex_);
970 std::vector<SandeshStateMachineEvStats> ev_stats;
972 std::scoped_lock lock(
mutex_);
976 ModuleClientState mcs;
981 SandeshStateMachineStats sm_stats;
982 sm_stats.set_ev_stats(ev_stats);
988 mcs.set_sm_stats(sm_stats);
993 SocketIOStats rx_stats;
995 mcs.set_session_rx_socket_stats(rx_stats);
996 SocketIOStats tx_stats;
998 mcs.set_session_tx_socket_stats(tx_stats);
1000 SandeshModuleClientTrace::Send(mcs);
1003 map<string,uint32_t> inpMap;
1010 const std::vector<TcpServer::Endpoint>& collectors) {
1015 std::vector<TcpServer::Endpoint>& collectors) {
1037 const std::vector<TcpServer::Endpoint>& collectors) {
1041 if (
server() != collector) {
1052 if (
server() != collector) {
1062 int sm_task_id,
bool periodicuve) {
void GetCollectors(std::vector< TcpServer::Endpoint > &collectors)
int statistics_timer_interval_
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
void UpdateEventDequeueFail(const sc::event_base &event)
bool send_session(Sandesh *snh)
void UpdateEventEnqueueFail(const sc::event_base &event)
int idle_hold_time() const
bool CollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
std::vector< TcpServer::Endpoint > collectors_
std::string generator_key_
void Enqueue(const Ev &event)
bool StatisticsTimerExpired()
virtual ~SandeshClientSMImpl()
void ReleaseSandesh(const Ev &event)
TcpServer::Endpoint GetNextCollector()
void CancelIdleHoldTimer()
void EnqueDelSession(SandeshSession *session)
const std::string last_event() const
void SetAdminState(bool down)
void DeleteTcpSession(const Ev &event)
bool ConnectTimerRunning()
bool OnMessage(SandeshSession *session, const std::string &msg)
void StartConnectTimer(int seconds)
Timer * statistics_timer_
void IdleHoldTimerFired()
bool ConnectTimerExpired()
bool SendSandeshUVE(Sandesh *snh)
void SetCollectors(const std::vector< TcpServer::Endpoint > &collectors)
WorkQueue< EventContainer > work_queue_
TcpServer::Endpoint GetCollector() const
bool IdleHoldTimerRunning()
void set_idle_hold_time(int idle_hold_time)
bool SendSandesh(Sandesh *snh)
void TimerErrorHanlder(std::string name, std::string error)
SandeshEventStatistics event_stats_
void UpdateEventDequeue(const sc::event_base &event)
void connect_attempts_clear()
bool IdleHoldTimerExpired()
bool DequeueEvent(EventContainer ec)
void set_state(State state)
void CancelConnectTimer()
const std::string & LastStateName() const
static const int kConnectInterval
void OnIdle(const Ev &event)
void StartIdleHoldTimer()
void StartStatisticsTimer()
const std::string & StateName() const
void unconsumed_event(const sc::event_base &event)
void UpdateEventEnqueue(const sc::event_base &event)
int GetConnectTime() const
void set_collector_name(const std::string &cname)
void connect_attempts_inc()
void set_session(SandeshSession *session, bool enq=true)
SandeshClientSMImpl(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
virtual bool ReceiveMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)=0
virtual void InitializeSMSession(int connects)=0
virtual StatsClient * stats_client() const =0
virtual SandeshSession * CreateSMSession(TcpSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, TcpServer::Endpoint ep)=0
SandeshSession * session()
void set_server(TcpServer::Endpoint e)
std::atomic< State > state_
TcpServer::Endpoint server()
static SandeshClientSM * CreateClientSM(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
void set_stats_client(StatsClient *stats_client)
Sandesh::SandeshQueue * send_queue()
const SandeshSessionStats & GetStats() const
static void SyncAllMaps(const std::map< std::string, uint32_t > &, bool periodic=false)
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
static std::string source()
virtual const char * Name() const
SandeshType::type type() const
static bool IsLoggingDroppedAllowed(SandeshType::type)
static std::string module()
static std::string instance_id()
static std::string node_type()
virtual std::string ToString() const =0
boost::asio::ip::tcp::endpoint Endpoint
virtual std::string ToString() const
void GetRxSocketStats(SocketIOStats *socket_stats) const
Endpoint remote_endpoint() const
void GetTxSocketStats(SocketIOStats *socket_stats) const
static bool DeleteTimer(Timer *Timer)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
void Shutdown(bool delete_entries=true)
size_t max_queue_len() const
bool IsQueueEmpty() const
#define SANDESH_LOG(_Level, _Msg)
#define SM_LOG(_Level, _Msg)
#define SESSION_LOG(session)
static const std::string state_names[]
boost::function< bool(StateMachine *)> EvValidate
static char Test(SFINAE< T, &T::validate > *)
boost::intrusive_ptr< const sc::event_base > event
EvValidate operator()(const Ev *event)
EvValidate operator()(const Ev *event)
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvConnectTimerExpired &event)
sc::result react(const EvTcpClose &event)
sc::result react(const EvSandeshMessageRecv &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
ClientInit(my_context ctx)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvSandeshSend &event)
sc::result react(const EvTcpConnected &event)
sc::result react(const EvTcpClose &event)
sc::result react(const EvConnectTimerExpired &event)
void StartSession(SandeshClientSMImpl *state_machine)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvTcpConnectFail &event)
sc::result react(const EvCollectorUpdate &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpConnected >, sc::custom_reaction< EvTcpConnectFail >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::in_state_reaction< Ev, SandeshClientSMImpl, &SandeshClientSMImpl::DeleteTcpSession< Ev > > reaction
sc::result react(const EvCollectorUpdate &event)
Disconnect(my_context ctx)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvCollectorUpdate &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result react(const EvSandeshMessageRecv &event)
sc::result react(const EvTcpClose &event)
sc::result react(const EvSandeshSend &event)
Established(my_context ctx)
static const char * Name()
std::vector< TcpServer::Endpoint > collectors_
EvCollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
bool validate(SandeshClientSMImpl *state_machine) const
static const char * Name()
EvConnectTimerExpired(Timer *timer)
EvIdleHoldTimerExpired(Timer *timer)
static const char * Name()
const std::string msg_type
static const char * Name()
EvSandeshMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
const uint32_t header_offset
const SandeshHeader header
EvSandeshSend(Sandesh *snh)
static const char * Name()
static const char * Name()
static const char * Name()
static const char * Name()
EvTcpClose(SandeshSession *session)
static const char * Name()
EvTcpConnectFail(SandeshSession *session)
static const char * Name()
EvTcpConnected(SandeshSession *session)
static const char * Name()
EvTcpDeleteSession(SandeshSession *session)
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvStop &event)
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvIdleHoldTimerExpired >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result react(const EvStart &event)
sc::result react(const EvIdleHoldTimerExpired &event)
sc::in_state_reaction< Ev, SandeshClientSMImpl, &SandeshClientSMImpl::ReleaseSandesh< Ev > > reaction
sc::transition< Ev, Idle, SandeshClientSMImpl, &SandeshClientSMImpl::OnIdle< Ev > > reaction