OpenSDN source code
sandesh_state_machine.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_state_machine.h
7 //
8 // Sandesh state machine based on boost::statechart
9 //
10 
11 #ifndef __SANDESH_STATE_MACHINE_H__
12 #define __SANDESH_STATE_MACHINE_H__
13 
14 #include <atomic>
15 #include <mutex>
16 
17 #include <boost/asio.hpp>
18 #include <boost/ptr_container/ptr_map.hpp>
19 #include <boost/statechart/state_machine.hpp>
20 
21 #include <base/queue_task.h>
22 #include <base/timer.h>
23 #include <io/tcp_session.h>
24 
25 #include <sandesh/sandesh_statistics.h>
26 
27 namespace sc = boost::statechart;
28 
// Namespace holding the state/event type names and the numeric state
// identifiers used by the Sandesh state machine.
29 namespace ssm {
30 
// Forward declarations only; the definitions are not visible in this header.
// ssm::Idle is the initial state handed to sc::state_machine by
// SandeshStateMachine.
31 struct Idle;
32 struct Active;
33 struct Established;
34 struct ServerInit;
35 struct EvStart;
36 
// Numeric state identifiers stored and returned by
// SandeshStateMachine::set_state() / get_state().
// NOTE(review): only IDLE and ACTIVE are visible here; the listing jumps from
// line 39 to line 42, so further enumerators may be missing from this view.
37 typedef enum {
38  IDLE = 0,
39  ACTIVE = 1,
42 } SsmState;
43 
44 } // namespace ssm
45 
46 class SandeshSession;
47 class TcpSession;
48 class SandeshConnection;
50 class SandeshUVE;
51 class SandeshStateMachineStats;
52 class SandeshGeneratorStats;
55 
// Callback type: receives a SandeshStateMachine pointer and returns bool.
// NOTE(review): presumably used to validate a queued event before it is
// processed; confirm at the use site, which is not visible here.
56 typedef boost::function<bool(SandeshStateMachine *)> EvValidate;
57 
59  public sc::state_machine<SandeshStateMachine, ssm::Idle> {
60 public:
61  static const int kIdleHoldTime = 5000; // 5 seconds, specified in milliseconds
62  static const int kQueueSize = 200 * 1024 * 1024; // 200 MB, expressed in bytes
63 
66 
// Lifecycle / administrative entry points. Only declared here; the bodies are
// not part of this header.
67  void Initialize();
// NOTE(review): `down` presumably means "administratively down" when true;
// confirm against the implementation.
68  void SetAdminState(bool down);
// NOTE(review): `defer` presumably pauses (true) or resumes (false) dequeuing
// of queued events; confirm against the implementation.
69  void SetDeferDequeue(bool defer);
70 
// Member templates parameterised on the event type `Ev`. They are only
// declared here; the bodies are not part of this header.
71  // State transitions
72  template <class Ev> void OnIdle(const Ev &event);
73 
// NOTE(review): what each in-state reaction does with `event` is defined
// outside this header; the names suggest releasing a sandesh, deleting a TCP
// session and processing a received message, respectively (to be confirmed).
74  // In state reactions
75  template <class Ev> void ReleaseSandesh(const Ev &event);
76  template <class Ev> void DeleteTcpSession(const Ev &event);
77  template <class Ev> void ProcessMessage(const Ev &event);
78 
// Idle hold timer controls. Only declared here.
// NOTE(review): the timer duration presumably comes from idle_hold_time();
// confirm against the implementation.
79  void StartIdleHoldTimer();
80  void CancelIdleHoldTimer();
// Returns a bool; by its name, true while the idle hold timer is running.
81  bool IdleHoldTimerRunning();
82  void IdleHoldTimerFired();
83 
84  // Feed session events into the state machine.
86 
87  // Receive Passive Open.
89 
90  // Send SandeshUVEs to collector
92 
93  // Receive an incoming sandesh message.
// `session` is the session associated with the message and `msg` is the
// message text, passed by const reference.
// NOTE(review): the meaning of the bool result is not visible in this header.
94  bool OnSandeshMessage(SandeshSession *session, const std::string &msg);
95 
96  // In established state, the SM accepts updates to resource state
// `rsc` is the new resource flag value.
// NOTE(review): it presumably ends up in resource_; confirm against the
// implementation.
97  void ResourceUpdate(bool rsc);
98 
// Names of the current and of the last recorded state, returned by const
// reference (the referenced strings are owned elsewhere; not visible here).
99  const std::string &StateName() const;
100  const std::string &LastStateName() const;
101 
102  // getters and setters
104 
107  void clear_session();
109 
// Moves the state machine to `state`, remembering the state being left.
//
// Fix: the outgoing state is read from state_ before it is overwritten.
// Previously last_state_ was assigned the *new* state, so the "last state"
// always equalled the current one. state_.load() is used so the assignment is
// valid whether last_state_ is a plain enum or a std::atomic.
110  void set_state(ssm::SsmState state) {
111  last_state_ = state_.load();
112  state_ = state;
114  }
// Returns the current state; the implicit conversion performs an atomic load
// of state_ (state_ is a std::atomic<ssm::SsmState>).
115  ssm::SsmState get_state() const { return state_; }
116 
// Plain accessors for the resource flag; no locking is done here.
117  bool get_resource() const { return resource_; }
118  void set_resource(bool r) { resource_ = r; }
119 
// Returns the configured idle hold time.
// NOTE(review): presumably in milliseconds, like kIdleHoldTime (to be confirmed).
120  int idle_hold_time() const { return idle_hold_time_; }
124  }
125 
// Records `event` as the most recent event name. smutex_ is held for the
// duration of the update so readers using last_event() see a consistent
// string.
// NOTE(review): the listing skips line 129, so a statement may be missing
// from this view.
126  void set_last_event(const std::string &event) {
127  std::scoped_lock lock(smutex_);
128  last_event_ = event;
130  }
// Returns the most recent event name. The string is returned by value, so the
// copy is made while smutex_ is held and the caller never holds a reference
// into last_event_ after the lock is released.
131  const std::string last_event() const {
132  std::scoped_lock lock(smutex_);
133  return last_event_;
134  }
136  std::scoped_lock lock(smutex_);
138  last_event_ = "";
139  }
140 
// Only declared here; the body is not part of this header.
141  void Shutdown(void);
// Hook with the name and signature boost::statechart invokes when a
// dispatched event is not consumed by any state.
// NOTE(review): what this override does with such events is defined outside
// this header.
142  void unconsumed_event(const sc::event_base &event);
// Returns the prefix_ pointer as-is; no copy is made.
143  const char *prefix() { return prefix_; }
144 
// Stores a copy of `generator` in generator_key_. No lock is taken here.
145  void SetGeneratorKey(const std::string &generator) {
146  generator_key_ = generator;
147  }
// Returns a const reference to generator_key_ (no copy, no lock); the
// reference is only valid while this object is alive.
148  const std::string &generator_key() const {
149  return generator_key_;
150  }
// Queue and statistics accessors. The results are written through the
// reference/pointer arguments.
// NOTE(review): the meaning of the bool return values is not visible in this
// header (presumably success/failure).
153  bool GetQueueCount(uint64_t &queue_count) const;
154  bool GetMessageDropLevel(std::string &drop_level) const;
// Overload filling state machine stats plus the basic message stats.
155  bool GetStatistics(SandeshStateMachineStats &sm_stats,
156  SandeshGeneratorBasicStats &basic_msg_stats);
// Pointer-taking variant for the basic message stats.
157  bool GetBasicStatistics(SandeshStateMachineStats *sm_stats,
158  SandeshGeneratorBasicStats *basic_msg_stats);
// Overload filling state machine stats plus the detailed message stats.
159  bool GetStatistics(SandeshStateMachineStats &sm_stats,
160  SandeshGeneratorStats &detail_msg_stats);
// Pointer-taking variant for the detailed message stats.
161  bool GetDetailStatistics(SandeshStateMachineStats *sm_stats,
162  SandeshGeneratorStats *detail_msg_stats);
// Forwards to the work queue's max_queue_len().
163  size_t GetMaxQueueCount() const { return work_queue_.max_queue_len(); }
164 
165 private:
168 
// Element type of the state machine's work queue (see the
// WorkQueue<EventContainer> friend declaration). It holds a reference-counted
// pointer to an immutable statechart event.
// NOTE(review): the listing skips line 171, so a member may be missing from
// this view.
169  struct EventContainer {
170  boost::intrusive_ptr<const sc::event_base> event;
// Default construction and copy construction are explicitly defaulted.
172  EventContainer() = default;
173  EventContainer(const EventContainer&) = default;
174  };
175 
176  friend class WorkQueue<EventContainer>;
178  size_t *msg_size);
179 
// Timer callbacks. Only declared here.
// NOTE(review): `name` and `error` presumably identify the failing timer and
// describe the failure (to be confirmed).
180  void TimerErrorHandler(std::string name, std::string error);
// NOTE(review): the meaning of the bool result is not visible here.
181  bool IdleHoldTimerExpired();
182 
// Event queue plumbing. Only declared here.
// Enqueue is a member template over the event type `Ev`.
183  template <typename Ev> void Enqueue(const Ev &event);
// Takes the container by value.
// NOTE(review): presumably the work queue callback; the meaning of the bool
// result is not visible here.
184  bool DequeueEvent(EventContainer ec);
185  bool LogEvent(const sc::event_base *event);
// Statistics update helpers. Only declared here.
// Received-message counters, keyed by message name and size; the failure
// variant additionally takes the drop reason.
186  void UpdateRxMsgStats(const std::string &msg_name,
187  size_t msg_size);
188  void UpdateRxMsgFailStats(const std::string &msg_name,
189  size_t msg_size, SandeshRxDropReason::type dreason);
// Per-event counters for the enqueue/dequeue paths.
// NOTE(review): the four wrappers presumably forward to UpdateEventStats()
// with the matching `enqueue` / `fail` flags (to be confirmed).
190  void UpdateEventDequeue(const sc::event_base &event);
191  void UpdateEventDequeueFail(const sc::event_base &event);
192  void UpdateEventEnqueue(const sc::event_base &event);
193  void UpdateEventEnqueueFail(const sc::event_base &event);
194  void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail);
// Private helpers that write statistics through the given pointer.
195  void GetEventStatistics(SandeshStateMachineStats *sm_stats);
196  void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats);
197  void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats);
// Takes a queue count, a sandesh level and a no-argument callback.
// NOTE(review): presumably invoked on queue watermark crossings (to be confirmed).
198  void SetSandeshMessageDropLevel(size_t queue_count,
199  SandeshLevel::type level, boost::function<void (void)> cb);
200  void SetDeferSessionReader(bool defer_reader);
// NOTE(review): the validity criteria are not visible in this header.
201  bool IsValid() const;
202 
// Data members. The listing omits several numbered lines, so this is not the
// complete member list.
// Returned unchanged by prefix(); ownership of the pointed-to string is not
// visible here.
203  const char *prefix_;
// Locked by set_last_event() and last_event(); declared mutable so the const
// getter can lock it.
204  mutable std::mutex smutex_;
211  bool deleted_;
// Current state; atomic, read by get_state() and written by set_state().
212  std::atomic<ssm::SsmState> state_;
// Resource flag exposed through get_resource() / set_resource().
213  bool resource_;
// NOTE(review): presumably a timestamp of the last state change (to be confirmed).
215  uint64_t state_since_;
// Most recent event name; accessed under smutex_ by the inline accessors.
216  std::string last_event_;
// NOTE(review): presumably a timestamp of the last event (to be confirmed).
217  uint64_t last_event_at_;
218 
// Set by SetGeneratorKey(), returned by generator_key().
219  std::string generator_key_;
224 
226 };
227 
228 template<>
231 
232 template<>
235 
236 #endif // __SANDESH_STATE_MACHINE_H__
SandeshMessageStatistics message_stats_
void GetDetailMessageStatistics(SandeshGeneratorStats *detail_msg_stats)
DISALLOW_COPY_AND_ASSIGN(SandeshStateMachine)
friend class SandeshClientStateMachineTest
bool GetDetailStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorStats *detail_msg_stats)
void UpdateEventDequeueFail(const sc::event_base &event)
void set_state(ssm::SsmState state)
void UpdateEventEnqueueFail(const sc::event_base &event)
void GetBasicMessageStatistics(SandeshGeneratorBasicStats *basic_msg_stats)
bool GetStatistics(SandeshStateMachineStats &sm_stats, SandeshGeneratorBasicStats &basic_msg_stats)
size_t GetMaxQueueCount() const
bool OnSandeshMessage(SandeshSession *session, const std::string &msg)
WorkQueue< EventContainer > EventQueue
void ReleaseSandesh(const Ev &event)
void SetDeferDequeue(bool defer)
void SetDeferSessionReader(bool defer_reader)
void SetGeneratorKey(const std::string &generator)
void GetEventStatistics(SandeshStateMachineStats *sm_stats)
void set_session(SandeshSession *session)
bool GetBasicStatistics(SandeshStateMachineStats *sm_stats, SandeshGeneratorBasicStats *basic_msg_stats)
SandeshStateMachine(const char *prefix, SandeshConnection *connection)
SandeshConnection * connection_
SandeshConnection * connection()
void UpdateEventDequeue(const sc::event_base &event)
void set_last_event(const std::string &event)
bool LogEvent(const sc::event_base *event)
SandeshSession * session()
void UpdateEventEnqueue(const sc::event_base &event)
void PassiveOpen(SandeshSession *session)
std::atomic< ssm::SsmState > state_
void UpdateRxMsgStats(const std::string &msg_name, size_t msg_size)
const std::string & generator_key() const
const std::string & LastStateName() const
void DeleteTcpSession(const Ev &event)
void ProcessMessage(const Ev &event)
void UpdateRxMsgFailStats(const std::string &msg_name, size_t msg_size, SandeshRxDropReason::type dreason)
static const int kIdleHoldTime
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
friend bool GetEvSandeshMessageRecvSize(EventContainer *ec, size_t *msg_size)
SandeshEventStatistics event_stats_
void OnIdle(const Ev &event)
friend class SandeshServerStateMachineTest
SandeshMessageBuilder * builder_
bool DequeueEvent(EventContainer ec)
const std::string last_event() const
void SetQueueWaterMarkInfo(Sandesh::QueueWaterMarkInfo &wm)
void SandeshUVESend(SandeshUVE *usnh)
void TimerErrorHandler(std::string name, std::string error)
ssm::SsmState get_state() const
void DeleteSession(SandeshSession *session)
bool GetQueueCount(uint64_t &queue_count) const
void set_idle_hold_time(int idle_hold_time)
void unconsumed_event(const sc::event_base &event)
const std::string & StateName() const
SandeshLevel::type message_drop_level_
static const int kQueueSize
bool GetMessageDropLevel(std::string &drop_level) const
void Enqueue(const Ev &event)
void UpdateEventStats(const sc::event_base &event, bool enqueue, bool fail)
void SetSandeshMessageDropLevel(size_t queue_count, SandeshLevel::type level, boost::function< void(void)> cb)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: cpp/sandesh.h:149
Definition: timer.h:57
size_t AtomicDecrementQueueCount(EventContainer *entry)
Definition: queue_task.h:435
size_t AtomicIncrementQueueCount(EventContainer *entry)
Definition: queue_task.h:431
size_t max_queue_len() const
Definition: queue_task.h:377
uint8_t type
Definition: load_balance.h:2
boost::function< bool(SandeshStateMachine *)> EvValidate
boost::function< bool(StateMachine *)> EvValidate
Definition: state_machine.h:33
boost::intrusive_ptr< const sc::event_base > event
EventContainer(const EventContainer &)=default
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13