OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sandesh_session.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_session.h
7 //
8 // Sandesh Session
9 //
10 
11 #ifndef __SANDESH_SESSION_H__
12 #define __SANDESH_SESSION_H__
13 
14 #include <tbb/mutex.h>
15 
16 #include <boost/system/error_code.hpp>
17 #include <boost/asio.hpp>
18 #include <boost/tuple/tuple.hpp>
19 
20 #include <base/util.h>
21 #include <io/ssl_session.h>
22 #include <io/udp_server.h>
23 
24 #include <sandesh/transport/TBufferTransports.h>
25 #include <sandesh/sandesh.h>
26 #include <sandesh/sandesh_util.h>
27 #include <sandesh/sandesh_uve_types.h>
28 #include <sandesh/stats_client.h>
29 
31 class SandeshSession;
32 class Sandesh;
33 
35 public:
36  static const uint32_t kEncodeBufferSize = 2048;
37  static const unsigned int kDefaultSendSize = 16384;
38 
39  SandeshWriter(SandeshSession *session);
41  void SendMsg(Sandesh *sandesh, bool more);
// Inline overload: hands the buffer straight to SendInternal().
// The `more` flag is accepted (default false) but is not consulted in this
// body.
42  void SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer,
43  bool more = false) {
44  SendInternal(sbuffer);
45  }
46  void WriteReady(const boost::system::error_code &ec);
// Returns the current value of ready_to_send_, read while holding
// send_mutex_ (the scoped lock is released when the function returns).
47  bool SendReady() {
48  tbb::mutex::scoped_lock lock(send_mutex_);
49  return ready_to_send_;
50  }
51 
52  static const std::string sandesh_open_;
53  static const std::string sandesh_open_attr_length_;
54  static const std::string sandesh_close_;
55 
56 protected:
57  friend class SandeshSessionTest;
58 
59  // Inline routines invoked by SendMsg()
60  void SendMsgMore(boost::shared_ptr<TMemoryBuffer>);
61  void SendMsgAll(boost::shared_ptr<TMemoryBuffer>);
62 
63 private:
64  friend class SandeshSendMsgUnitTest;
65 
67 
68  void SendInternal(boost::shared_ptr<TMemoryBuffer>);
69  void ConnectTimerExpired(const boost::system::error_code &error);
// Number of bytes currently held in send_buf_ (see set_send_buf() /
// append_send_buf(), which are the visible writers of send_buf_offset_).
70  size_t send_buf_offset() { return send_buf_offset_; }
// Returns the raw send_buf_ pointer. Note the method is const but the
// returned pointer is mutable, so callers can write through it.
71  uint8_t* send_buf() const { return send_buf_; }
// Overwrites the unsent-data buffer: copies `len` bytes from `buf` to the
// start of send_buf() and sets send_buf_offset_ to `len`.
// Precondition (assert only): len is non-zero and strictly less than
// kDefaultSendSize.
// NOTE(review): the bound is checked only via assert(); if the build defines
// NDEBUG the check is compiled out. Presumably send_buf_ points at storage of
// at least kDefaultSendSize bytes -- the allocation is not visible here,
// confirm in the .cc file.
72  void set_send_buf(uint8_t *buf, size_t len) {
73  assert(len && (len < kDefaultSendSize));
74  memcpy(send_buf(), buf, len);
75  send_buf_offset_ = len;
76  }
// Appends `len` bytes from `buf` after the data already buffered (at
// send_buf() + send_buf_offset_) and advances send_buf_offset_ by `len`.
// Precondition (assert only): len is non-zero and the resulting total stays
// strictly below kDefaultSendSize.
// NOTE(review): as with set_send_buf(), the bound is enforced only by
// assert(); confirm callers cannot exceed it in NDEBUG builds.
77  void append_send_buf(uint8_t *buf, size_t len) {
78  assert(len && ((len + send_buf_offset_) < kDefaultSendSize));
79  memcpy(send_buf() + send_buf_offset_, buf, len);
80  send_buf_offset_ += len;
81  }
// Marks the unsent-data buffer as empty by zeroing send_buf_offset_.
// The bytes in send_buf_ are left untouched.
82  void reset_send_buf() {
83  send_buf_offset_ = 0;
84  }
85 
86  tbb::mutex send_mutex_;
88  // send_buf_ is used to store unsent data
89  uint8_t *send_buf_;
91 
// XML framing literals for a sandesh message.
// sXML_SANDESH_OPEN_ATTR_LENGTH is the leading part of sXML_SANDESH_OPEN, up
// to and including the opening quote of the length attribute.
// NOTE(review): the ten zeros in sXML_SANDESH_OPEN look like a fixed-width
// placeholder for the message length -- confirm against the writer/reader
// implementation.
92 #define sXML_SANDESH_OPEN_ATTR_LENGTH "<sandesh length=\""
93 #define sXML_SANDESH_OPEN "<sandesh length=\"0000000000\">"
94 #define sXML_SANDESH_CLOSE "</sandesh>"
95 
97 };
98 
99 typedef boost::function<bool(const std::string&, SandeshSession *)>
101 
103 public:
104  typedef boost::asio::const_buffer Buffer;
105 
106  SandeshReader(SandeshSession *session);
107  virtual ~SandeshReader();
108  virtual void OnRead(Buffer buffer);
110  static int ExtractMsgHeader(const std::string& msg, SandeshHeader& header,
111  std::string& msg_type, uint32_t& header_offset);
112 
113 private:
// True once msg_length_ holds a real value; (size_t)-1 is the sentinel
// meaning "length not known".
114  bool MsgLengthKnown() { return msg_length_ != (size_t)-1; }
115 
// Returns msg_length_ as stored; this is (size_t)-1 when MsgLengthKnown()
// is false.
116  size_t msg_length() { return msg_length_; }
117 
// Stores `length` in msg_length_. No validation is performed here.
118  void set_msg_length(size_t length) { msg_length_ = length; }
119 
121 
122  void SetBuf(const std::string &str);
123  void ReplaceBuf(const std::string &str);
124  bool LeftOver() const;
125  int MatchString(const std::string& match, size_t &m_offset);
126  bool ExtractMsgLength(size_t &msg_length, int *result);
127  bool ExtractMsg(Buffer buffer, int *result, bool NewBuf);
128 
129  std::string buf_;
130  size_t offset_;
131  size_t msg_length_;
133  tbb::mutex cb_mutex_;
135 
137 
139 };
140 
141 class SandeshConnection;
142 
143 class SandeshSession : public SslSession {
144 public:
145  SandeshSession(SslServer *client, SslSocket *socket, int task_instance,
146  int writer_task_id, int reader_task_id);
147  virtual ~SandeshSession();
148  virtual void Shutdown();
149  virtual void OnRead(Buffer buffer);
// Forwards the write-ready notification and its error code unchanged to the
// session's SandeshWriter.
150  virtual void WriteReady(const boost::system::error_code &ec) {
151  writer_->WriteReady(ec);
152  }
153  virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len);
155  return send_queue_.get();
156  }
158  return send_buffer_queue_.get();
159  }
161  return writer_.get();
162  }
164  tbb::mutex::scoped_lock lock(conn_mutex_);
166  }
168  tbb::mutex::scoped_lock lock(conn_mutex_);
169  return connection_;
170  }
172  reader_->SetReceiveMsgCb(cb);
173  }
// Returns the stored instance_ value.
// NOTE(review): presumably this is the task_instance passed to the
// constructor -- the assignment is not visible here, confirm in the .cc file.
174  virtual int GetSessionInstance() const {
175  return instance_;
176  }
177  virtual void EnqueueClose();
178  virtual boost::system::error_code SetSocketOptions();
179  virtual std::string ToString() const;
// Stores the given StatsClient pointer in stats_client_. Only the raw pointer
// is kept; nothing in this header shows the session taking ownership.
180  void set_stats_client(StatsClient *stats_client) { stats_client_ = stats_client;}
181  static Sandesh * DecodeCtrlSandesh(const std::string& msg, const SandeshHeader& header,
182  const std::string& sandesh_name, const uint32_t& header_offset);
183  // Session statistics
// Adds one to sstats_.num_recv_msg.
184  inline void increment_recv_msg() {
185  sstats_.num_recv_msg++;
186  }
// Adds one to sstats_.num_recv_msg_fail.
187  inline void increment_recv_msg_fail() {
188  sstats_.num_recv_msg_fail++;
189  }
// Adds one to sstats_.num_recv_fail.
190  inline void increment_recv_fail() {
191  sstats_.num_recv_fail++;
192  }
// Adds one to sstats_.num_send_msg.
193  inline void increment_send_msg() {
194  sstats_.num_send_msg++;
195  }
// Adds one to sstats_.num_send_msg_fail.
196  inline void increment_send_msg_fail() {
197  sstats_.num_send_msg_fail++;
198  }
200  sstats_.num_send_buffer_fail++;
201  }
203  sstats_.num_wait_msgq_enqueue++;
204  }
206  sstats_.num_wait_msgq_dequeue++;
207  }
209  sstats_.num_write_ready_cb_error++;
210  }
// Read-only access to the session counters. Returns a reference to the live
// sstats_ member, not a snapshot copy.
211  const SandeshSessionStats& GetStats() const {
212  return sstats_;
213  }
217 
218 protected:
// Returns the stored reader_task_id_ value.
// NOTE(review): presumably set from the constructor's reader_task_id
// argument -- the assignment is not visible here, confirm in the .cc file.
219  virtual int reader_task_id() const {
220  return reader_task_id_;
221  }
222 
223 private:
224  friend class SandeshSessionTest;
225 
226  // 30 seconds - 15s idle + (3s interval * 5 probes)
227  static const int kSessionKeepaliveIdleTime = 15; // in seconds
228  static const int kSessionKeepaliveInterval = 3; // in seconds
229  static const int kSessionKeepaliveProbes = 5; // count
230  static const int kSessionTcpUserTimeout = 30000; // ms
231  static const int kQueueSize = 200 * 1024 * 1024; // 200 MB
232 
233  bool SendMsg(SandeshElement element);
234  bool SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer);
235  bool SessionSendReady();
236  void SetSendingLevel(size_t count, SandeshLevel::type level);
237 
239  boost::scoped_ptr<SandeshWriter> writer_;
240  boost::scoped_ptr<SandeshReader> reader_;
241  boost::scoped_ptr<Sandesh::SandeshQueue> send_queue_;
242  boost::scoped_ptr<Sandesh::SandeshBufferQueue> send_buffer_queue_;
245  tbb::mutex conn_mutex_;
246  tbb::mutex send_mutex_;
253 
254  // Session statistics
255  SandeshSessionStats sstats_;
256 
258 };
259 
260 #endif // __SANDESH_SESSION_H__
void increment_wait_msgq_dequeue()
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
void SetSendingLevel(size_t count, SandeshLevel::type level)
tbb::mutex send_mutex_
virtual void OnRead(Buffer buffer)
tbb::mutex send_mutex_
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
virtual void OnRead(Buffer buffer)
void set_stats_client(StatsClient *stats_client)
boost::scoped_ptr< SandeshWriter > writer_
static const int kDefaultRecvSize
void append_send_buf(uint8_t *buf, size_t len)
void set_msg_length(size_t length)
void SendMsgAll(boost::shared_ptr< TMemoryBuffer >)
void set_send_buf(uint8_t *buf, size_t len)
SandeshSession * session_
virtual void WriteReady(const boost::system::error_code &ec)
static const int kSessionKeepaliveProbes
void increment_recv_msg()
uint8_t * send_buf_
size_t msg_length()
void SendMsgMore(boost::shared_ptr< TMemoryBuffer >)
size_t send_buf_offset_
virtual void Shutdown()
virtual int GetSessionInstance() const
SandeshReceiveMsgCb cb_
boost::scoped_ptr< Sandesh::SandeshBufferQueue > send_buffer_queue_
void SendInternal(boost::shared_ptr< TMemoryBuffer >)
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
SandeshSession(SslServer *client, SslSocket *socket, int task_instance, int writer_task_id, int reader_task_id)
int MatchString(const std::string &match, size_t &m_offset)
bool ExtractMsgLength(size_t &msg_length, int *result)
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
Definition: ssl_session.h:18
static const int kSessionKeepaliveIdleTime
static const int kSessionKeepaliveInterval
boost::asio::const_buffer Buffer
boost::scoped_ptr< Sandesh::SandeshQueue > send_queue_
DISALLOW_COPY_AND_ASSIGN(SandeshWriter)
DISALLOW_COPY_AND_ASSIGN(SandeshSession)
SandeshLevel::type SendingLevel() const
void WriteReady(const boost::system::error_code &ec)
tbb::mutex conn_mutex_
virtual ~SandeshReader()
static const std::string sandesh_open_
Sandesh::SandeshBufferQueue * send_buffer_queue()
void increment_send_buffer_fail()
boost::scoped_ptr< SandeshReader > reader_
void increment_send_msg()
void ResetSendQueueWaterMark()
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
DISALLOW_COPY_AND_ASSIGN(SandeshReader)
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
virtual boost::system::error_code SetSocketOptions()
uint8_t type
Definition: load_balance.h:109
SandeshLevel::type sending_level_
StatsClient * stats_client_
void ConnectTimerExpired(const boost::system::error_code &error)
tbb::mutex cb_mutex_
void increment_send_msg_fail()
uint8_t * send_buf() const
virtual std::string ToString() const
friend class SandeshSessionTest
static const std::string sandesh_close_
void increment_write_ready_cb_error()
void SetConnection(SandeshConnection *connection)
Sandesh::SandeshQueue * send_queue()
SandeshWriter(SandeshSession *session)
bool SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer)
void SetBuf(const std::string &str)
void reset_send_buf()
SandeshSession * session_
static const uint32_t kEncodeBufferSize
void SendMsg(Sandesh *sandesh, bool more)
virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len)
static const int kSessionTcpUserTimeout
static const std::string sandesh_open_attr_length_
static const int kQueueSize
const SandeshSessionStats & GetStats() const
void increment_recv_msg_fail()
bool ExtractMsg(Buffer buffer, int *result, bool NewBuf)
static const unsigned int kDefaultSendSize
bool LeftOver() const
SandeshWriter * writer()
void SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer, bool more=false)
std::string buf_
SandeshConnection * connection_
SandeshReader(SandeshSession *session)
SandeshConnection * connection()
void increment_recv_fail()
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
SandeshSessionStats sstats_
friend class SandeshSessionTest
virtual ~SandeshSession()
void ReplaceBuf(const std::string &str)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: p/sandesh.h:147
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
friend class SandeshSendMsgUnitTest
virtual void EnqueueClose()
virtual Socket * socket() const
Definition: ssl_session.cc:97
void reset_msg_length()
virtual int reader_task_id() const
size_t send_buf_offset()
bool SendMsg(SandeshElement element)
void increment_wait_msgq_enqueue()