12 #include <boost/bind.hpp>
13 #include <boost/date_time/posix_time/posix_time.hpp>
14 #include <boost/statechart/custom_reaction.hpp>
15 #include <boost/statechart/event.hpp>
16 #include <boost/statechart/simple_state.hpp>
17 #include <boost/statechart/state.hpp>
18 #include <boost/statechart/state_machine.hpp>
19 #include <boost/statechart/transition.hpp>
20 #include <boost/statechart/in_state_reaction.hpp>
28 #include <sandesh/sandesh_constants.h>
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_session.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/sandesh_message_builder.h>
39 using boost::system::error_code;
41 namespace mpl = boost::mpl;
42 namespace sc = boost::statechart;
44 #define SM_LOG(_Level, _Msg) \
46 if (LoggingDisabled()) break; \
47 log4cplus::Logger _Xlogger = Sandesh::logger(); \
48 if (_Xlogger.isEnabledFor(log4cplus::_Level##_LOG_LEVEL)) { \
49 log4cplus::tostringstream _Xbuf; \
50 SandeshStateMachine *_sm = &context<SandeshStateMachine>(); \
51 _Xbuf << _sm->prefix() << _Msg; \
52 _Xlogger.forcedLog(log4cplus::_Level##_LOG_LEVEL, \
// SESSION_LOG(session): forwards to SANDESH_LOG with level DEBUG and a
// streamed message of the form "<session>:<name>".
//   <session> is (session)->ToString() when `session` tests true, otherwise
//             the literal "*".
//   <name>    is whatever Name() resolves to at the point of expansion, so
//             the macro can only be used where a Name() is in scope.
// Note: `session` is expanded twice (once in the test, once in the call), so
// pass an expression without side effects.
57 #define SESSION_LOG(session) \
58 SANDESH_LOG(DEBUG, ((session) ? (session)->ToString() : "*") << ":" << Name())
65 static const char *
Name() {
71 static const char *
Name() {
79 static const char *
Name() {
80 return "EvIdleHoldTimerExpired";
83 return !timer_->cancelled();
92 static const char *
Name() {
93 return "EvTcpPassiveOpen";
107 (state_machine->
session() == session));
118 return "EvTcpDeleteSession";
128 return "EvSandeshMessageRecv";
130 boost::shared_ptr<const SandeshMessage>
msg;
135 const SandeshHeader& header,
136 const std::string &msg_type,
const uint32_t &header_offset) :
137 msg(msg), header(header), msg_type(msg_type),
138 header_offset(header_offset) {
141 return "EvSandeshCtrlMessageRecv";
154 return "EvResourceUpdate";
174 &SandeshStateMachine::ReleaseSandesh<Ev> >
reaction;
180 &SandeshStateMachine::DeleteTcpSession<Ev> >
reaction;
186 &SandeshStateMachine::ProcessMessage<Ev> >
reaction;
189 struct Idle :
public sc::state<Idle, SandeshStateMachine> {
191 sc::custom_reaction<EvStart>,
192 sc::custom_reaction<EvStop>,
193 sc::custom_reaction<EvTcpPassiveOpen>,
194 sc::custom_reaction<EvIdleHoldTimerExpired>,
198 Idle(my_context ctx) : my_base(ctx) {
214 return transit<Active>();
216 return discard_event();
222 return discard_event();
226 return transit<Active>();
234 return discard_event();
238 struct Active :
public sc::state<Active, SandeshStateMachine> {
241 sc::custom_reaction<EvTcpPassiveOpen>,
242 sc::custom_reaction<EvTcpClose>,
259 event.session->set_observer(
261 state_machine, _1, _2));
262 return transit<ServerInit>();
269 return transit<Idle>();
273 struct ServerInit :
public sc::state<ServerInit, SandeshStateMachine> {
276 sc::custom_reaction<EvTcpClose>,
277 sc::custom_reaction<EvSandeshCtrlMessageRecv>,
278 sc::custom_reaction<EvSandeshMessageRecv>,
295 return transit<Idle>();
302 if (!connection->ProcessSandeshCtrlMessage(event.
msg, event.
header,
305 return transit<Idle>();
307 return transit<Established>();
314 if (!connection->ProcessSandeshMessage(event.
msg.get(),
true)) {
316 return transit<Idle>();
318 return discard_event();
322 struct Established :
public sc::state<Established, SandeshStateMachine> {
325 sc::custom_reaction<EvTcpClose>,
326 sc::custom_reaction<EvSandeshMessageRecv>,
328 sc::custom_reaction<EvResourceUpdate>
351 return transit<Idle>();
360 if (!connection->ProcessResourceUpdate(event.
rsc)) {
362 return transit<Idle>();
364 return discard_event();
370 if (!connection->ProcessSandeshMessage(event.
msg.get(),
373 return transit<Idle>();
375 return discard_event();
383 work_queue_(connection->GetTaskId(),
384 connection->GetTaskInstance(),
387 connection_(connection),
390 *connection->server()->event_manager()->io_service(),
392 connection->GetTaskId(),
393 connection->GetTaskInstance())),
398 message_drop_level_(SandeshLevel::
INVALID) {
492 event.session->server()->DeleteSession(event.session);
528 SM_LOG(ERROR, name +
" error: " + error);
540 std::string &drop_level)
const {
549 SandeshStateMachineStats *sm_stats) {
550 std::vector<SandeshStateMachineEvStats> ev_stats;
551 tbb::mutex::scoped_lock elock(
smutex_);
555 sm_stats->set_ev_stats(ev_stats);
564 SandeshGeneratorStats *detail_msg_stats) {
567 SandeshMessageStats detail_agg_stats;
568 tbb::mutex::scoped_lock mlock(
smutex_);
571 detail_msg_stats->set_type_stats(v_detail_type_stats);
572 detail_msg_stats->set_aggregate_stats(detail_agg_stats);
576 SandeshGeneratorBasicStats *basic_msg_stats) {
579 SandeshMessageBasicStats basic_agg_stats;
580 tbb::mutex::scoped_lock mlock(
smutex_);
583 basic_msg_stats->set_type_stats(v_basic_type_stats);
584 basic_msg_stats->set_aggregate_stats(basic_agg_stats);
592 SandeshStateMachineStats *sm_stats,
593 SandeshGeneratorStats *detail_msg_stats) {
605 SandeshGeneratorStats &detail_msg_stats) {
610 SandeshStateMachineStats *sm_stats,
611 SandeshGeneratorBasicStats *basic_msg_stats) {
623 SandeshGeneratorBasicStats &basic_msg_stats) {
635 assert((session != NULL) == (sandesh_session != NULL));
636 std::string session_s = session ? session->
ToString() :
"*";
639 SM_LOG(DEBUG, session_s <<
" " << __func__ <<
640 " " <<
"TCP Connection Closed");
646 SM_LOG(DEBUG, session_s <<
" " <<
"Unknown event: " <<
661 tbb::mutex::scoped_lock lock(
smutex_);
667 tbb::mutex::scoped_lock lock(
smutex_);
672 const std::string &msg) {
675 reinterpret_cast<const uint8_t *>(msg.c_str()), msg.size());
676 if (xmessage == NULL) {
679 SandeshRxDropReason::DecodingFailed);
682 const SandeshHeader &header(xmessage->
GetHeader());
688 SandeshRxDropReason::QueueLevel);
692 if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
693 SandeshHeader ctrl_header;
694 std::string ctrl_message_type;
695 uint32_t ctrl_xml_offset = 0;
698 ctrl_message_type, ctrl_xml_offset);
701 " session " << session->
ToString() <<
": Extract FAILED ("
705 SandeshRxDropReason::ControlMsgFailed);
709 if (header != ctrl_header ||
710 message_type != ctrl_message_type) {
712 " session " << session->
ToString() <<
": Header or message "
713 <<
"type (" << ctrl_message_type <<
") MISMATCH");
716 SandeshRxDropReason::ControlMsgFailed);
721 " session " << session->
ToString());
725 ctrl_message_type, ctrl_xml_offset));
758 if (snh_rcv != NULL) {
782 bool enqueue,
bool fail) {
784 std::string event_name(
TYPE_NAME(event));
785 tbb::mutex::scoped_lock lock(
smutex_);
803 process_event(*ec.
event);
820 template<
typename Ev>
823 template<
typename T,
bool (T::*)(SandeshStateMachine *) const>
struct SFINAE {};
824 template<
typename T>
static char Test(SFINAE<T, &T::validate>*);
825 template<
typename T>
static int Test(...);
826 static const bool Has =
sizeof(Test<Ev>(0)) ==
sizeof(char);
829 template <
typename Ev,
bool has_val
idate>
834 template <
typename Ev>
837 return boost::bind(&Ev::validate, event, _1);
841 template <
typename Ev>
847 ec.
event =
event.intrusive_from_this();
863 SM_LOG(INFO,
"SANDESH MESSAGE DROP LEVEL: [" <<
877 SM_LOG(INFO,
"SANDESH Session Reader Defer : " << defer_reader);
884 bool high(boost::get<2>(wm));
885 size_t queue_count(boost::get<0>(wm));
886 bool defer_undefer(boost::get<3>(wm));
887 boost::function<void (void)> cb;
895 this, _1, boost::get<1>(wm), cb));
904 this, _1, boost::get<1>(wm), cb));
917 dynamic_cast<const ssm::EvSandeshMessageRecv *>(ec->
event.get()));
918 if (snh_rcv == NULL) {
931 return count_.fetch_and_add(msg_size) + msg_size;
933 return count_.fetch_and_increment() + 1;
942 return count_.fetch_and_add((
size_t)(0-msg_size)) - msg_size;
944 return count_.fetch_and_decrement() - 1;
949 SM_LOG(INFO,
"SANDESH Set Defer Dequeue: " << defer_dequeue);
virtual SandeshMessage * Create(const uint8_t *data, size_t size) const =0
const std::string & generator_key() const
const std::string & StateName() const
sc::result react(const EvTcpClose &event)
EvSandeshCtrlMessageRecv(const std::string &msg, const SandeshHeader &header, const std::string &msg_type, const uint32_t &header_offset)
void SetSandeshMessageDropLevel(size_t queue_count, SandeshLevel::type level, boost::function< void(void)> cb)
void DeleteSession(SandeshSession *session)
void ResetQueueWaterMarkInfo()
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshCtrlMessageRecv >, sc::custom_reaction< EvSandeshMessageRecv >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
static const char * Name()
const std::string last_event() const
bool GetEvSandeshMessageRecvSize(SandeshStateMachine::EventContainer *ec, size_t *msg_size)
bool IdleHoldTimerExpired()
EvTcpClose(SandeshSession *session)
EvValidate operator()(const Ev *event)
void UpdateEventEnqueueFail(const sc::event_base &event)
void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats)
EvTcpPassiveOpen(SandeshSession *session)
sc::result react(const EvSandeshMessageRecv &event)
mpl::list< sc::custom_reaction< EvStart >, sc::custom_reaction< EvStop >, sc::custom_reaction< EvTcpPassiveOpen >, sc::custom_reaction< EvIdleHoldTimerExpired >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
void set_state(ssm::SsmState state)
sc::result react(const EvStop &event)
sc::result react(const EvSandeshCtrlMessageRecv &event)
void Shutdown(bool delete_entries=true)
void reset_idle_hold_time()
SandeshMessageBuilder * builder_
bool GetBasicStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorBasicStats *basic_msg_stats)
void UpdateEventDequeueFail(const sc::event_base &event)
void unconsumed_event(const sc::event_base &event)
void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats)
const size_t GetSize() const
sc::transition< Ev, Idle, SandeshStateMachine,&SandeshStateMachine::OnIdle< Ev > > reaction
bool get_resource() const
void SetHighWaterMark(const WaterMarkInfos &high_water)
SandeshSession * session() const
EvValidate operator()(const Ev *event)
virtual std::string ToString() const
void set_idle_hold_time(int idle_hold_time)
std::vector< SandeshMessageTypeBasicStats > BasicStatsList
void DeleteTcpSession(const Ev &event)
static const char * Name()
bool IdleHoldTimerRunning()
bool DequeueEvent(EventContainer ec)
void TimerErrorHandler(std::string name, std::string error)
sc::result react(const EvSandeshMessageRecv &event)
boost::shared_ptr< const SandeshMessage > msg
virtual void SetDeferReader(bool defer_reader)
std::vector< SandeshMessageTypeStats > DetailStatsList
const SandeshHeader header
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
SandeshMessageStatistics message_stats_
static const char * Name()
void StartIdleHoldTimer()
void ResourceUpdate(bool rsc)
bool GetQueueCount(uint64_t &queue_count) const
boost::function< bool(StateMachine *)> EvValidate
void UpdateRecv(const std::string &msg_name, uint64_t bytes)
SandeshSession * session()
static const char * Name()
void set_last_event(const std::string &event)
virtual void ProcessDisconnect(SandeshSession *sess)=0
void set_observer(EventObserver observer)
const uint32_t header_offset
void GetEventStatistics(SandeshStateMachineStats *sm_stats)
sc::in_state_reaction< Ev, SandeshStateMachine,&SandeshStateMachine::DeleteTcpSession< Ev > > reaction
virtual void ManagedDelete()=0
void set_session(SandeshSession *session)
void CancelIdleHoldTimer()
bool GetStatistics(SandeshStateMachineStats &sm_stats, SandeshGeneratorBasicStats &basic_msg_stats)
static const char * LevelToString(SandeshLevel::type level)
int idle_hold_time() const
bool validate(SandeshStateMachine *state_machine) const
static const char * Name()
bool GetMessageDropLevel(std::string &drop_level) const
static const int kIdleHoldTime
sc::result react(const EvResourceUpdate &event)
const std::string & LastStateName() const
std::string generator_key_
bool OnSandeshMessage(SandeshSession *session, const std::string &msg)
static const char * Name()
virtual std::string ToString() const
void set_session(SandeshSession *session)
ssm::SsmState last_state_
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void SetQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm)
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpClose >, sc::custom_reaction< EvSandeshMessageRecv >, DeleteTcpSession< EvTcpDeleteSession >::reaction, sc::custom_reaction< EvResourceUpdate > > reactions
EvSandeshMessageRecv(const SandeshMessage *msg)
void SetConnection(SandeshConnection *connection)
EvIdleHoldTimerExpired(Timer *timer)
const std::string msg_type
bool DoDropSandeshMessage(const SandeshHeader &header, const SandeshLevel::type drop_level)
sc::result react(const EvTcpClose &event)
sc::result react(const EvStart &event)
static const char * Name()
void UpdateRxMsgStats(const std::string &msg_name, size_t msg_size)
void SetAdminState(bool down)
void UpdateRxMsgFailStats(const std::string &msg_name, size_t msg_size, SandeshRxDropReason::type dreason)
SandeshConnection * connection_
void UpdateRecvFailed(const std::string &msg_name, uint64_t bytes, SandeshRxDropReason::type dreason)
void SetDeferDequeue(bool defer)
static const char * Name()
void set_disable(bool disabled)
void Enqueue(const Ev &event)
void IdleHoldTimerFired()
EvTcpDeleteSession(SandeshSession *session)
void PassiveOpen(SandeshSession *session)
boost::intrusive_ptr< const sc::event_base > event
size_t AtomicIncrementQueueCount(EventContainer *entry)
static char Test(SFINAE< T,&T::validate > *)
ServerInit(my_context ctx)
void UpdateEventEnqueue(const sc::event_base &event)
Established(my_context ctx)
size_t AtomicDecrementQueueCount(EventContainer *entry)
sc::result react(const EvIdleHoldTimerExpired &event)
void set_resource(bool r)
void SetLowWaterMark(const WaterMarkInfos &low_water)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
SandeshEventStatistics event_stats_
EvResourceUpdate(bool rsc)
void Get(DetailStatsList *v_detail_type_stats, SandeshMessageStats *detail_agg_stats) const
bool GetDetailStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorStats *detail_msg_stats)
void UpdateEventDequeue(const sc::event_base &event)
void ResetHighWaterMark()
sc::result react(const EvTcpPassiveOpen &event)
static const char * Name()
SandeshStateMachine(const char *prefix, SandeshConnection *connection)
SandeshLevel::type message_drop_level_
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
#define SM_LOG(_Level, _Msg)
void SetDeferSessionReader(bool defer_reader)
#define SESSION_LOG(session)
sc::result react(const EvTcpClose &event)
SandeshSession * session_
mpl::list< TransitToIdle< EvStop >::reaction, sc::custom_reaction< EvTcpPassiveOpen >, sc::custom_reaction< EvTcpClose >, DeleteTcpSession< EvTcpDeleteSession >::reaction > reactions
sc::in_state_reaction< Ev, SandeshStateMachine,&SandeshStateMachine::ProcessMessage< Ev > > reaction
void Get(std::vector< SandeshStateMachineEvStats > *ev_stats) const
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
bool Enqueue(QueueEntryT entry)
static const std::string state_names[]
void OnIdle(const Ev &event)
const std::string & GetMessageType() const
const SandeshHeader & GetHeader() const
void Update(std::string &event_name, bool enqueue, bool fail)
sc::in_state_reaction< Ev, SandeshStateMachine,&SandeshStateMachine::ReleaseSandesh< Ev > > reaction
sc::result react(const EvTcpPassiveOpen &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
bool LogEvent(const sc::event_base *event)
static bool DeleteTimer(Timer *Timer)
tbb::atomic< size_t > count_
tbb::atomic< ssm::SsmState > state_
SandeshConnection * connection()