12 #include <boost/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_uve.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_statistics.h>
36 using boost::system::error_code;
41 using process::ConnectionType;
42 using process::ConnectionStatus;
44 namespace mpl = boost::mpl;
45 namespace sc = boost::statechart;
47 #define SM_LOG(_Level, _Msg) \
49 if (LoggingDisabled()) break; \
50 log4cplus::Logger _Xlogger = Sandesh::logger(); \
51 if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) { \
52 log4cplus::tostringstream _Xbuf; \
54 _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL, \
// SESSION_LOG(session): emits one DEBUG-level SANDESH_LOG record of the form
// "<session>:<name>".
//   - <session> is (session)->ToString() when `session` is non-null, and the
//     literal "*" when it is null.
//   - <name> is whatever Name() resolves to at the point of expansion; the
//     macro takes no name argument, so it can only be used where an
//     unqualified Name() is callable.
// `session` is evaluated twice (null test, then dereference), so callers
// should pass an expression without side effects.
59 #define SESSION_LOG(session) \
60 SANDESH_LOG(DEBUG, ((session) ? (session)->ToString() : "*") << ":" << Name())
67 static const char *
Name() {
75 static const char *
Name() {
85 static const char *
Name() {
86 return "EvCollectorUpdate";
94 static const char *
Name() {
95 return "EvIdleHoldTimerExpired";
107 return "EvConnectTimerExpired";
123 return "EvTcpConnected";
134 return "EvTcpConnectFail";
157 return "EvTcpDeleteSession";
167 return "EvSandeshSend";
175 msg(msg), header(header), msg_type(msg_type), header_offset(header_offset) {
178 return "EvSandeshMessageRecv";
203 &SandeshClientSMImpl::ReleaseSandesh<Ev> >
reaction;
209 &SandeshClientSMImpl::DeleteTcpSession<Ev> >
reaction;
213 struct Idle :
public sc::state<Idle, SandeshClientSMImpl> {
215 sc::custom_reaction<EvStart>,
216 sc::custom_reaction<EvStop>,
217 sc::custom_reaction<EvIdleHoldTimerExpired>,
218 sc::custom_reaction<EvCollectorUpdate>,
223 Idle(my_context ctx) : my_base(ctx) {
239 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
240 std::string(), ConnectionStatus::INIT,
242 state_machine->
StateName() +
" : " +
event.Name());
246 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
247 std::string(), ConnectionStatus::DOWN,
249 state_machine->
StateName() +
" : " +
event.Name() +
251 return transit<Disconnect>();
253 return discard_event();
259 return discard_event();
265 return discard_event();
270 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
271 std::string(), ConnectionStatus::INIT,
273 state_machine->
StateName() +
" : " +
event.Name() +
" -> Connect");
274 return transit<Connect>();
278 struct Disconnect :
public sc::state<Disconnect, SandeshClientSMImpl> {
281 sc::custom_reaction<EvCollectorUpdate>,
297 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
298 std::string(), ConnectionStatus::INIT,
300 state_machine->
StateName() +
" : " +
event.Name() +
302 return transit<Connect>();
306 struct Connect :
public sc::state<Connect, SandeshClientSMImpl> {
309 sc::custom_reaction<EvConnectTimerExpired>,
310 sc::custom_reaction<EvTcpConnected>,
311 sc::custom_reaction<EvTcpConnectFail>,
312 sc::custom_reaction<EvTcpClose>,
313 sc::custom_reaction<EvCollectorUpdate>,
325 SM_LOG(DEBUG, state_machine->
StateName() <<
" : " <<
"Start Connect timer " <<
354 event.Name() <<
" : " <<
"Cancelling Connect timer");
358 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
360 state_machine->
StateName() +
" : " +
event.Name());
363 return transit<ClientInit>();
378 return discard_event();
386 state_machine, _1, _2),
388 state_machine, _2, _1),
389 state_machine->
server()));
395 const char *event_name) {
397 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
398 std::string(), ConnectionStatus::DOWN,
400 state_machine->
StateName() +
" : " + event_name);
404 return transit<Idle>();
408 struct ClientInit :
public sc::state<ClientInit, SandeshClientSMImpl> {
411 sc::custom_reaction<EvConnectTimerExpired>,
412 sc::custom_reaction<EvTcpClose>,
413 sc::custom_reaction<EvSandeshMessageRecv>,
414 sc::custom_reaction<EvSandeshSend>,
415 sc::custom_reaction<EvCollectorUpdate>,
458 if (event.
header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
460 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
461 std::string(), ConnectionStatus::UP,
463 state_machine->
StateName() +
" : Control " +
event.Name());
464 return transit<Established>();
466 return discard_event();
473 " : " << snh->Name());
474 if (dynamic_cast<SandeshUVE *>(snh)) {
480 SandeshTxDropReason::WrongClientSMState);
481 SM_LOG(INFO,
"Received UVE message in wrong state : " << snh->Name());
483 return discard_event();
486 SM_LOG(INFO,
"Could not EnQ Sandesh :" << snh->Name());
489 return discard_event();
497 return discard_event();
501 const char *event_name) {
503 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
504 std::string(), ConnectionStatus::DOWN,
506 state_machine->
StateName() +
" : " + event_name);
510 return transit<Idle>();
514 struct Established :
public sc::state<Established, SandeshClientSMImpl> {
517 sc::custom_reaction<EvTcpClose>,
518 sc::custom_reaction<EvSandeshMessageRecv>,
519 sc::custom_reaction<EvSandeshSend>,
520 sc::custom_reaction<EvCollectorUpdate>,
530 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
531 std::string(), ConnectionStatus::UP,
547 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
548 std::string(), ConnectionStatus::INIT,
550 state_machine->
StateName() +
" : " +
event.Name() +
553 return transit<Connect>();
567 return discard_event();
574 SM_LOG(ERROR,
"Could not EnQ Sandesh :" << event.
snh->
Name());
577 return discard_event();
584 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
585 std::string(), ConnectionStatus::INIT,
587 state_machine->
StateName() +
" : " +
event.Name() +
590 return transit<Connect>();
592 return discard_event();
596 const char *event_name) {
598 ConnectionState::GetInstance()->Update(ConnectionType::COLLECTOR,
599 std::string(), ConnectionStatus::DOWN,
601 state_machine->
StateName() +
" : " + event_name);
604 return transit<Idle>();
643 SandeshTxDropReason::WrongClientSMState);
645 event.Name() <<
" message: " << snh->
Name());
700 SM_LOG(ERROR, name +
" error: " + error);
707 <<
"EvConnectTimerExpired in state " <<
StateName());
721 assert((session != NULL) == (sandesh_session != NULL));
722 std::string session_s = session ? session->
ToString() :
"*";
725 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
726 " " <<
"TCP Connected");
730 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
731 " " <<
"TCP Connect Failed");
735 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
736 " " <<
"TCP Connection Closed");
741 SM_LOG(DEBUG, session_s <<
" " <<
"Unknown event: " <<
758 const std::string &msg) {
760 SandeshHeader header;
761 std::string message_type;
762 uint32_t xml_offset = 0;
769 <<
" FAILED(" << ret <<
")");
773 if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
825 bool enqueue,
bool fail) {
826 std::string event_name(
TYPE_NAME(event));
827 tbb::mutex::scoped_lock lock(
mutex_);
842 process_event(*ec.
event);
860 template<
typename Ev>
// Compile-time detection of a `validate` member on an event type.
// NOTE(review): the enclosing struct header and its `template<typename Ev>`
// line are not fully visible in this extract; `Ev` below is assumed to be that
// enclosing template parameter.
//
// Helper type that can only be instantiated with a pointer to a const member
// function of T having the signature `bool (SandeshClientSMImpl *) const`.
863 template<
typename T,
bool (T::*)(SandeshClientSMImpl *) const>
struct SFINAE {};
// Candidate overload that is viable only when `&T::validate` names a member
// matching the signature required by SFINAE<>; otherwise substitution fails
// and this overload is dropped from the candidate set. Declared only (never
// defined) because it is used solely inside sizeof.
864 template<
typename T>
static char Test(SFINAE<T, &T::validate>*);
// Fallback overload accepting any argument; the ellipsis gives it the lowest
// overload-resolution rank, so it is selected only when the overload above is
// not viable. Its return type differs from the other overload's so the two can
// be told apart via sizeof.
865 template<
typename T>
static int Test(...);
// True when the `char`-returning overload is selected for Ev, i.e. when Ev
// provides `bool validate(SandeshClientSMImpl *) const`.
866 static const bool Has =
sizeof(Test<Ev>(0)) ==
sizeof(char);
869 template <
typename Ev,
bool has_val
idate>
874 template <
typename Ev>
877 return boost::bind(&Ev::validate, event, _1);
881 template <
typename Ev>
886 ec.
event =
event.intrusive_from_this();
899 int sm_task_instance,
int sm_task_id,
bool periodicuve)
901 work_queue_(sm_task_id, sm_task_instance,
903 connect_timer_(
TimerManager::CreateTimer(*evm->io_service(),
"Client Connect timer", sm_task_id, sm_task_instance)),
904 idle_hold_timer_(
TimerManager::CreateTimer(*evm->io_service(),
"Client Idle hold timer", sm_task_id, sm_task_instance)),
905 statistics_timer_(
TimerManager::CreateTimer(*evm->io_service(),
"Client Tick and Statistics timer", sm_task_id, sm_task_instance)),
907 statistics_timer_interval_(kTickInterval),
908 periodicuve_(periodicuve),
913 coll_name_(string()) {
922 tbb::mutex::scoped_lock lock(
mutex_);
970 std::vector<SandeshStateMachineEvStats> ev_stats;
971 tbb::mutex::scoped_lock lock(
mutex_);
975 ModuleClientState mcs;
980 SandeshStateMachineStats sm_stats;
981 sm_stats.set_ev_stats(ev_stats);
987 mcs.set_sm_stats(sm_stats);
991 mcs.set_session_stats(session->
GetStats());
992 SocketIOStats rx_stats;
994 mcs.set_session_rx_socket_stats(rx_stats);
995 SocketIOStats tx_stats;
997 mcs.set_session_tx_socket_stats(tx_stats);
999 SandeshModuleClientTrace::Send(mcs);
1002 map<string,uint32_t> inpMap;
1009 const std::vector<TcpServer::Endpoint>& collectors) {
1014 std::vector<TcpServer::Endpoint>& collectors) {
1036 const std::vector<TcpServer::Endpoint>& collectors) {
1040 if (
server() != collector) {
1051 if (
server() != collector) {
1061 int sm_task_id,
bool periodicuve) {
TcpServer::Endpoint GetCollector() const
static const char * Name()
virtual void InitializeSMSession(int connects)=0
const SandeshHeader header
TcpServer::Endpoint GetNextCollector()
void set_server(TcpServer::Endpoint e)
bool IsQueueEmpty() const
const std::string msg_type
bool CollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
EvValidate operator()(const Ev *event)
void IdleHoldTimerFired()
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void set_stats_client(StatsClient *stats_client)
void SetCollectors(const std::vector< TcpServer::Endpoint > &collectors)
std::vector< TcpServer::Endpoint > collectors_
sc::result react(const EvSandeshMessageRecv &event)
#define SM_LOG(_Level, _Msg)
bool OnMessage(SandeshSession *session, const std::string &msg)
void Shutdown(bool delete_entries=true)
sc::in_state_reaction< Ev, SandeshClientSMImpl,&SandeshClientSMImpl::ReleaseSandesh< Ev > > reaction
size_t max_queue_len() const
sc::result react(const EvTcpClose &event)
sc::result react(const EvStop &event)
void ReleaseSandesh(const Ev &event)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpConnected >, sc::custom_reaction< EvTcpConnectFail >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
const std::string last_event() const
const std::string & LastStateName() const
static const char * Name()
EvValidate operator()(const Ev *event)
virtual std::string ToString() const
sc::result react(const EvSandeshSend &event)
void unconsumed_event(const sc::event_base &event)
sc::result react(const EvSandeshSend &event)
void UpdateEventDequeueFail(const sc::event_base &event)
const std::string & StateName() const
EvTcpDeleteSession(SandeshSession *session)
sc::result react(const EvConnectTimerExpired &event)
int idle_hold_time() const
static const char * Name()
static const char * Name()
sc::result react(const EvStart &event)
bool StatisticsTimerExpired()
WorkQueue< EventContainer > work_queue_
void set_last_event(const std::string &event)
SandeshSession * session()
EvTcpConnected(SandeshSession *session)
virtual const char * Name() const
int GetConnectTime() const
static const char * Name()
bool validate(SandeshClientSMImpl *state_machine) const
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
bool ConnectTimerRunning()
#define SANDESH_LOG(_Level, _Msg)
sc::result react(const EvTcpConnected &event)
bool IdleHoldTimerRunning()
static const char * Name()
void UpdateEventEnqueue(const sc::event_base &event)
SandeshClientSMImpl(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
boost::function< bool(StateMachine *)> EvValidate
Established(my_context ctx)
void DeleteTcpSession(const Ev &event)
void StartStatisticsTimer()
static SandeshClientSM * CreateClientSM(EventManager *evm, Mgr *mgr, int sm_task_instance, int sm_task_id, bool periodicuve)
void Enqueue(const Ev &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
static std::string node_type()
sc::result react(const EvCollectorUpdate &event)
tbb::atomic< State > state_
sc::result react(const EvCollectorUpdate &event)
void GetRxSocketStats(SocketIOStats *socket_stats) const
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
static void SyncAllMaps(const std::map< std::string, uint32_t > &, bool periodic=false)
static std::string module()
sc::result react(const EvTcpClose &event)
SandeshType::type type() const
virtual void DeleteSMSession(SandeshSession *session)=0
bool IdleHoldTimerExpired()
bool send_session(Sandesh *snh)
static const char * Name()
sc::result react(const EvSandeshMessageRecv &event)
Disconnect(my_context ctx)
bool ConnectTimerExpired()
bool SendSandeshUVE(Sandesh *snh)
void connect_attempts_clear()
TcpServer::Endpoint server()
static const char * Name()
void UpdateEventDequeue(const sc::event_base &event)
Sandesh::SandeshQueue * send_queue()
void TimerErrorHanlder(std::string name, std::string error)
virtual StatsClient * stats_client() const =0
virtual bool ReceiveMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)=0
bool DequeueEvent(EventContainer ec)
void GetTxSocketStats(SocketIOStats *socket_stats) const
void set_collector_name(const std::string &cname)
void OnIdle(const Ev &event)
sc::result react(const EvConnectTimerExpired &event)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
void connect_attempts_inc()
void SetAdminState(bool down)
int statistics_timer_interval_
static const char * Name()
sc::transition< Ev, Idle, SandeshClientSMImpl,&SandeshClientSMImpl::OnIdle< Ev > > reaction
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvConnectTimerExpired >, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void StartConnectTimer(int seconds)
static const char * Name()
static char Test(SFINAE< T,&T::validate > *)
void set_state(State state)
boost::intrusive_ptr< const sc::event_base > event
const SandeshSessionStats & GetStats() const
static const int kConnectTimeout
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvTcpConnectFail &event)
sc::result react(const EvCollectorUpdate &event)
sc::result react(const EvTcpClose &event)
static const int kIdleHoldTime
void reset_idle_hold_time()
EvTcpConnectFail(SandeshSession *session)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
void set_session(SandeshSession *session, bool enq=true)
void CancelConnectTimer()
Endpoint remote_endpoint() const
static bool IsLoggingDroppedAllowed(SandeshType::type)
static const char * Name()
virtual SandeshSession * CreateSMSession(TcpSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, TcpServer::Endpoint ep)=0
std::string generator_key_
EvCollectorUpdate(const std::vector< TcpServer::Endpoint > &collectors)
SandeshEventStatistics event_stats_
virtual std::string ToString() const =0
void UpdateEventEnqueueFail(const sc::event_base &event)
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
EvConnectTimerExpired(Timer *timer)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, sc::custom_reaction< EvSandeshSend >, sc::custom_reaction< EvCollectorUpdate >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::result react(const EvCollectorUpdate &event)
const uint32_t header_offset
bool SendSandesh(Sandesh *snh)
void GetCollectors(std::vector< TcpServer::Endpoint > &collectors)
virtual ~SandeshClientSMImpl()
sc::in_state_reaction< Ev, SandeshClientSMImpl,&SandeshClientSMImpl::DeleteTcpSession< Ev > > reaction
void CancelIdleHoldTimer()
void StartIdleHoldTimer()
ClientInit(my_context ctx)
EvSandeshSend(Sandesh *snh)
void EnqueDelSession(SandeshSession *session)
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvIdleHoldTimerExpired >, sc::custom_reaction< EvCollectorUpdate >, ReleaseSandesh< EvSandeshSend >::reaction, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void StartSession(SandeshClientSMImpl *state_machine)
static std::string source()
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
bool Enqueue(QueueEntryT entry)
boost::asio::ip::tcp::endpoint Endpoint
EvIdleHoldTimerExpired(Timer *timer)
#define SESSION_LOG(session)
static const std::string state_names[]
void set_idle_hold_time(int idle_hold_time)
EvTcpClose(SandeshSession *session)
EvSandeshMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
void Update(std::string &event_name, bool enqueue, bool fail)
Timer * statistics_timer_
static std::string instance_id()
static bool DeleteTimer(Timer *Timer)
std::vector< TcpServer::Endpoint > collectors_
static const int kConnectInterval
sc::result ToIdle(SandeshClientSMImpl *state_machine, const char *event_name)
sc::result react(const EvIdleHoldTimerExpired &event)