11 #ifndef __SANDESH_SESSION_H__
12 #define __SANDESH_SESSION_H__
14 #include <tbb/mutex.h>
16 #include <boost/system/error_code.hpp>
17 #include <boost/asio.hpp>
18 #include <boost/tuple/tuple.hpp>
24 #include <sandesh/transport/TBufferTransports.h>
25 #include <sandesh/sandesh.h>
26 #include <sandesh/sandesh_util.h>
27 #include <sandesh/sandesh_uve_types.h>
28 #include <sandesh/stats_client.h>
42 void SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer,
46 void WriteReady(
const boost::system::error_code &ec);
61 void SendMsgAll(boost::shared_ptr<TMemoryBuffer>);
92 #define sXML_SANDESH_OPEN_ATTR_LENGTH "<sandesh length=\""
93 #define sXML_SANDESH_OPEN "<sandesh length=\"0000000000\">"
94 #define sXML_SANDESH_CLOSE "</sandesh>"
99 typedef boost::function<bool(const std::string&, SandeshSession *)>
104 typedef boost::asio::const_buffer
Buffer;
111 std::string& msg_type, uint32_t& header_offset);
122 void SetBuf(
const std::string &str);
125 int MatchString(
const std::string& match,
size_t &m_offset);
150 virtual void WriteReady(
const boost::system::error_code &ec) {
153 virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len);
179 virtual std::string
ToString()
const;
182 const std::string& sandesh_name,
const uint32_t& header_offset);
200 sstats_.num_send_buffer_fail++;
203 sstats_.num_wait_msgq_enqueue++;
206 sstats_.num_wait_msgq_dequeue++;
209 sstats_.num_write_ready_cb_error++;
234 bool SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer);
260 #endif // __SANDESH_SESSION_H__
void increment_wait_msgq_dequeue()
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
void SetSendingLevel(size_t count, SandeshLevel::type level)
virtual void OnRead(Buffer buffer)
boost::asio::const_buffer Buffer
virtual void OnRead(Buffer buffer)
void set_stats_client(StatsClient *stats_client)
boost::scoped_ptr< SandeshWriter > writer_
static const int kDefaultRecvSize
void append_send_buf(uint8_t *buf, size_t len)
void set_msg_length(size_t length)
void SendMsgAll(boost::shared_ptr< TMemoryBuffer >)
void set_send_buf(uint8_t *buf, size_t len)
SandeshSession * session_
virtual void WriteReady(const boost::system::error_code &ec)
static const int kSessionKeepaliveProbes
void increment_recv_msg()
void SendMsgMore(boost::shared_ptr< TMemoryBuffer >)
virtual int GetSessionInstance() const
boost::scoped_ptr< Sandesh::SandeshBufferQueue > send_buffer_queue_
void SendInternal(boost::shared_ptr< TMemoryBuffer >)
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
SandeshSession(SslServer *client, SslSocket *socket, int task_instance, int writer_task_id, int reader_task_id)
int MatchString(const std::string &match, size_t &m_offset)
bool ExtractMsgLength(size_t &msg_length, int *result)
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
static const int kSessionKeepaliveIdleTime
static const int kSessionKeepaliveInterval
boost::asio::const_buffer Buffer
boost::scoped_ptr< Sandesh::SandeshQueue > send_queue_
DISALLOW_COPY_AND_ASSIGN(SandeshWriter)
DISALLOW_COPY_AND_ASSIGN(SandeshSession)
SandeshLevel::type SendingLevel() const
void WriteReady(const boost::system::error_code &ec)
static const std::string sandesh_open_
Sandesh::SandeshBufferQueue * send_buffer_queue()
void increment_send_buffer_fail()
boost::scoped_ptr< SandeshReader > reader_
void increment_send_msg()
void ResetSendQueueWaterMark()
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
DISALLOW_COPY_AND_ASSIGN(SandeshReader)
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
virtual boost::system::error_code SetSocketOptions()
SandeshLevel::type sending_level_
StatsClient * stats_client_
void ConnectTimerExpired(const boost::system::error_code &error)
void increment_send_msg_fail()
uint8_t * send_buf() const
virtual std::string ToString() const
friend class SandeshSessionTest
static const std::string sandesh_close_
void increment_write_ready_cb_error()
void SetConnection(SandeshConnection *connection)
Sandesh::SandeshQueue * send_queue()
SandeshWriter(SandeshSession *session)
bool SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer)
void SetBuf(const std::string &str)
SandeshSession * session_
static const uint32_t kEncodeBufferSize
void SendMsg(Sandesh *sandesh, bool more)
virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len)
static const int kSessionTcpUserTimeout
static const std::string sandesh_open_attr_length_
static const int kQueueSize
const SandeshSessionStats & GetStats() const
void increment_recv_msg_fail()
bool ExtractMsg(Buffer buffer, int *result, bool NewBuf)
static const unsigned int kDefaultSendSize
void SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer, bool more=false)
SandeshConnection * connection_
SandeshReader(SandeshSession *session)
SandeshConnection * connection()
void increment_recv_fail()
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
SandeshSessionStats sstats_
friend class SandeshSessionTest
virtual ~SandeshSession()
void ReplaceBuf(const std::string &str)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
friend class SandeshSendMsgUnitTest
virtual void EnqueueClose()
virtual Socket * socket() const
virtual int reader_task_id() const
bool SendMsg(SandeshElement element)
void increment_wait_msgq_enqueue()