OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sandesh_state_machine.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_state_machine.h
7 //
8 // Sandesh state machine based on boost::statechart
9 //
10 
11 #ifndef __SANDESH_STATE_MACHINE_H__
12 #define __SANDESH_STATE_MACHINE_H__
13 
14 #include <boost/asio.hpp>
15 #include <boost/ptr_container/ptr_map.hpp>
16 #include <boost/statechart/state_machine.hpp>
17 #include <tbb/mutex.h>
18 #include <tbb/atomic.h>
19 
20 #include <base/queue_task.h>
21 #include <base/timer.h>
22 #include <io/tcp_session.h>
23 
24 #include <sandesh/sandesh_statistics.h>
25 
26 namespace sc = boost::statechart;
27 
28 namespace ssm {
29 
30 struct Idle;
31 struct Active;
32 struct Established;
33 struct ServerInit;
34 struct EvStart;
35 
// Numeric identifiers for the state machine's states; this is the type held
// in SandeshStateMachine::state_ and passed through set_state()/get_state().
// NOTE(review): the listing's embedded numbering jumps from 38 to 41, so
// enumerators are likely missing from this extraction (presumably ones
// matching the forward-declared Established / ServerInit states) — confirm
// against the original header.
36 typedef enum {
37  IDLE = 0,
38  ACTIVE = 1,
41 } SsmState;
42 
43 } // namespace ssm
44 
45 class SandeshSession;
46 class TcpSession;
47 class SandeshConnection;
49 class SandeshUVE;
50 class SandeshStateMachineStats;
51 class SandeshGeneratorStats;
54 
55 typedef boost::function<bool(SandeshStateMachine *)> EvValidate;
56 
58  public sc::state_machine<SandeshStateMachine, ssm::Idle> {
59 public:
// Idle hold time constant, expressed in milliseconds (5000 ms = 5 seconds).
60  static const int kIdleHoldTime = 5000; // 5 seconds, in milliseconds
// Queue size constant: 200 * 1024 * 1024, i.e. 200 MB.
// NOTE(review): presumably the size bound applied to the event work queue —
// confirm where it is consumed in the implementation file.
61  static const int kQueueSize = 200 * 1024 * 1024; // 200 MB
62 
65 
66  void Initialize();
67  void SetAdminState(bool down);
68  void SetDeferDequeue(bool defer);
69 
70  // State transitions
71  template <class Ev> void OnIdle(const Ev &event);
72 
73  // In state reactions
74  template <class Ev> void ReleaseSandesh(const Ev &event);
75  template <class Ev> void DeleteTcpSession(const Ev &event);
76  template <class Ev> void ProcessMessage(const Ev &event);
77 
78  void StartIdleHoldTimer();
79  void CancelIdleHoldTimer();
80  bool IdleHoldTimerRunning();
81  void IdleHoldTimerFired();
82 
83  // Feed session events into the state machine.
85 
86  // Receive Passive Open.
88 
89  // Send SandeshUVEs to collector
90  void SandeshUVESend(SandeshUVE *usnh);
91 
92  // Receive incoming sandesh message
93  bool OnSandeshMessage(SandeshSession *session, const std::string &msg);
94 
95  // In established state, the SM accepts updates to resource state
96  void ResourceUpdate(bool rsc);
97 
98  const std::string &StateName() const;
99  const std::string &LastStateName() const;
100 
101  // getters and setters
103 
106  void clear_session();
108 
// Records `state` as the machine's state by writing it to both last_state_
// and state_.
// NOTE(review): last_state_ is assigned the incoming `state`, not the
// previous value of state_, so after this call both fields hold the same
// value. If last_state_ is meant to remember the prior state (a separate
// LastStateName() accessor is declared next to StateName()), this should
// read `last_state_ = state_;` — confirm intent before changing.
// NOTE(review): the listing's embedded numbering skips 112 inside this body,
// so a statement may be missing from this extraction.
109  void set_state(ssm::SsmState state) {
110  last_state_ = state;
111  state_ = state;
113  }
// Returns the current value of state_. state_ is declared as
// tbb::atomic<ssm::SsmState>, and this accessor reads it without taking
// smutex_.
114  ssm::SsmState get_state() const { return state_; }
115 
// Accessors for the resource_ flag. resource_ is a plain bool and neither
// accessor takes smutex_.
116  bool get_resource() const { return resource_; }
117  void set_resource(bool r) { resource_ = r; }
118 
// Returns the stored idle_hold_time_ value.
// NOTE(review): presumably in milliseconds, like kIdleHoldTime — confirm.
119  int idle_hold_time() const { return idle_hold_time_; }
123  }
124 
// Stores `event` in last_event_ while holding smutex_; the scoped lock is
// released when the function returns.
// NOTE(review): the listing's embedded numbering skips 128 inside this body,
// so a statement may be missing from this extraction.
125  void set_last_event(const std::string &event) {
126  tbb::mutex::scoped_lock lock(smutex_);
127  last_event_ = event;
129  }
// Returns last_event_ by value (a copy, not a reference), read while holding
// smutex_. smutex_ is declared mutable, which is what lets this const member
// lock it.
130  const std::string last_event() const {
131  tbb::mutex::scoped_lock lock(smutex_);
132  return last_event_;
133  }
135  tbb::mutex::scoped_lock lock(smutex_);
137  last_event_ = "";
138  }
139 
140  void Shutdown(void);
141  void unconsumed_event(const sc::event_base &event);
// Returns the prefix_ pointer as stored; no copy of the string is made.
142  const char *prefix() { return prefix_; }
143 
// Copies `generator` into generator_key_. Unlike set_last_event(), this
// setter does not take smutex_.
144  void SetGeneratorKey(const std::string &generator) {
145  generator_key_ = generator;
146  }
// Returns a const reference to generator_key_ (no copy, no locking); the
// reference refers to this object's member, so it is only usable while the
// state machine object exists.
147  const std::string &generator_key() const {
148  return generator_key_;
149  }
152  bool GetQueueCount(uint64_t &queue_count) const;
153  bool GetMessageDropLevel(std::string &drop_level) const;
154  bool GetStatistics(SandeshStateMachineStats &sm_stats,
155  SandeshGeneratorBasicStats &basic_msg_stats);
156  bool GetBasicStatistics(SandeshStateMachineStats *sm_stats,
157  SandeshGeneratorBasicStats *basic_msg_stats);
158  bool GetStatistics(SandeshStateMachineStats &sm_stats,
159  SandeshGeneratorStats &detail_msg_stats);
160  bool GetDetailStatistics(SandeshStateMachineStats *sm_stats,
161  SandeshGeneratorStats *detail_msg_stats);
// Forwards to work_queue_.max_queue_len() and returns its result unchanged.
162  size_t GetMaxQueueCount() const { return work_queue_.max_queue_len(); }
163 
164 private:
167 
// Element type used with WorkQueue<EventContainer> (befriended by the
// enclosing class). Holds the statechart event through a
// boost::intrusive_ptr to const sc::event_base; default construction and
// copy construction are explicitly defaulted.
// NOTE(review): the listing's embedded numbering skips 170 inside this
// struct, so a member may be missing from this extraction.
168  struct EventContainer {
169  boost::intrusive_ptr<const sc::event_base> event;
171  EventContainer() = default;
172  EventContainer(const EventContainer&) = default;
173  };
174 
175  friend class WorkQueue<EventContainer>;
177  size_t *msg_size);
178 
179  void TimerErrorHandler(std::string name, std::string error);
180  bool IdleHoldTimerExpired();
181 
182  template <typename Ev> void Enqueue(const Ev &event);
183  bool DequeueEvent(EventContainer ec);
184  bool LogEvent(const sc::event_base *event);
185  void UpdateRxMsgStats(const std::string &msg_name,
186  size_t msg_size);
187  void UpdateRxMsgFailStats(const std::string &msg_name,
188  size_t msg_size, SandeshRxDropReason::type dreason);
189  void UpdateEventDequeue(const sc::event_base &event);
190  void UpdateEventDequeueFail(const sc::event_base &event);
191  void UpdateEventEnqueue(const sc::event_base &event);
192  void UpdateEventEnqueueFail(const sc::event_base &event);
193  void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail);
194  void GetEventStatistics(SandeshStateMachineStats *sm_stats);
195  void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats);
196  void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats);
197  void SetSandeshMessageDropLevel(size_t queue_count,
198  SandeshLevel::type level, boost::function<void (void)> cb);
199  void SetDeferSessionReader(bool defer_reader);
200  bool IsValid() const;
201 
202  const char *prefix_;
203  mutable tbb::mutex smutex_;
210  bool deleted_;
211  tbb::atomic<ssm::SsmState> state_;
212  bool resource_;
214  uint64_t state_since_;
215  std::string last_event_;
216  uint64_t last_event_at_;
217 
218  std::string generator_key_;
223 
225 };
226 
227 template<>
230 
231 template<>
234 
235 #endif // __SANDESH_STATE_MACHINE_H__
const std::string & generator_key() const
const std::string & StateName() const
void SetSandeshMessageDropLevel(size_t queue_count, SandeshLevel::type level, boost::function< void(void)> cb)
void DeleteSession(SandeshSession *session)
const std::string last_event() const
void UpdateEventEnqueueFail(const sc::event_base &event)
void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats)
void set_state(ssm::SsmState state)
void SandeshUVESend(SandeshUVE *usnh)
size_t max_queue_len() const
Definition: queue_task.h:377
SandeshMessageBuilder * builder_
bool GetBasicStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorBasicStats *basic_msg_stats)
void UpdateEventDequeueFail(const sc::event_base &event)
void unconsumed_event(const sc::event_base &event)
void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats)
void set_idle_hold_time(int idle_hold_time)
void DeleteTcpSession(const Ev &event)
friend bool GetEvSandeshMessageRecvSize(EventContainer *ec, size_t *msg_size)
bool DequeueEvent(EventContainer ec)
void TimerErrorHandler(std::string name, std::string error)
void ReleaseSandesh(const Ev &event)
SandeshMessageStatistics message_stats_
void ProcessMessage(const Ev &event)
bool GetQueueCount(uint64_t &queue_count) const
boost::function< bool(StateMachine *)> EvValidate
Definition: state_machine.h:33
DISALLOW_COPY_AND_ASSIGN(SandeshStateMachine)
SandeshSession * session()
void set_last_event(const std::string &event)
void GetEventStatistics(SandeshStateMachineStats *sm_stats)
size_t GetMaxQueueCount() const
void SetGeneratorKey(const std::string &generator)
uint8_t type
Definition: load_balance.h:109
bool GetStatistics(SandeshStateMachineStats &sm_stats, SandeshGeneratorBasicStats &basic_msg_stats)
friend class SandeshClientStateMachineTest
bool GetMessageDropLevel(std::string &drop_level) const
static const int kIdleHoldTime
const std::string & LastStateName() const
bool OnSandeshMessage(SandeshSession *session, const std::string &msg)
void set_session(SandeshSession *session)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void SetQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm)
void UpdateRxMsgStats(const std::string &msg_name, size_t msg_size)
void UpdateRxMsgFailStats(const std::string &msg_name, size_t msg_size, SandeshRxDropReason::type dreason)
SandeshConnection * connection_
void SetDeferDequeue(bool defer)
ssm::SsmState get_state() const
void Enqueue(const Ev &event)
WorkQueue< EventContainer > EventQueue
static const int kQueueSize
void PassiveOpen(SandeshSession *session)
boost::intrusive_ptr< const sc::event_base > event
size_t AtomicIncrementQueueCount(EventContainer *entry)
Definition: queue_task.h:431
void UpdateEventEnqueue(const sc::event_base &event)
size_t AtomicDecrementQueueCount(EventContainer *entry)
Definition: queue_task.h:435
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
SandeshEventStatistics event_stats_
bool GetDetailStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorStats *detail_msg_stats)
void UpdateEventDequeue(const sc::event_base &event)
SandeshStateMachine(const char *prefix, SandeshConnection *connection)
SandeshLevel::type message_drop_level_
void SetDeferSessionReader(bool defer_reader)
friend class SandeshServerStateMachineTest
Definition: timer.h:54
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: p/sandesh.h:147
void OnIdle(const Ev &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
bool LogEvent(const sc::event_base *event)
tbb::atomic< ssm::SsmState > state_
SandeshConnection * connection()