11 #include <boost/bind.hpp>
12 #include <boost/assign.hpp>
13 #include <boost/algorithm/string.hpp>
17 #include <sandesh/common/vns_types.h>
18 #include <sandesh/common/vns_constants.h>
19 #include <sandesh/transport/TBufferTransports.h>
20 #include <sandesh/protocol/TXMLProtocol.h>
21 #include <sandesh/protocol/TJSONProtocol.h>
22 #include "sandesh/sandesh_types.h"
23 #include "sandesh/sandesh.h"
30 using namespace contrail::sandesh::protocol;
31 using namespace contrail::sandesh::transport;
33 using boost::asio::mutable_buffer;
34 using boost::asio::buffer_cast;
47 send_buf_(new uint8_t[kDefaultSendSize]),
57 SANDESH_LOG(ERROR,
"SandeshSession Write error value: " << ec.value()
58 <<
" category: " << ec.category().name()
59 <<
" message: " << ec.message());
77 int32_t xfer = 0, ret;
79 boost::shared_ptr<TMemoryBuffer> btrans(
81 boost::shared_ptr<TXMLProtocol> prot(
84 header.set_Namespace(sandesh->
scope());
85 header.set_Timestamp(sandesh->
timestamp());
86 header.set_Module(sandesh->
module());
87 header.set_Source(sandesh->
source());
88 header.set_Context(sandesh->
context());
89 header.set_SequenceNum(sandesh->
seqnum());
91 header.set_Type(sandesh->
type());
92 header.set_Hints(sandesh->
hints());
93 header.set_Level(sandesh->
level());
94 header.set_Category(sandesh->
category());
95 header.set_NodeType(sandesh->
node_type());
103 if ((ret = header.write(prot)) < 0) {
104 SANDESH_LOG(ERROR, __func__ <<
": Sandesh header write FAILED: " <<
105 sandesh->
Name() <<
" : " << sandesh->
source() <<
":" <<
107 " Sequence Number:" << sandesh->
seqnum());
110 SandeshTxDropReason::HeaderWriteFailed);
116 if ((ret = sandesh->
Write(prot)) < 0) {
117 SANDESH_LOG(ERROR, __func__ <<
": Sandesh write FAILED: "<<
118 sandesh->
Name() <<
" : " << sandesh->
source() <<
":" <<
120 " Sequence Number:" << sandesh->
seqnum());
123 SandeshTxDropReason::WriteFailed);
133 btrans->getBuffer(&buffer, &offset);
138 char prev = ss.fill(
'0');
179 send_buffer->getBuffer(&buf, &buf_len);
191 boost::shared_ptr<TMemoryBuffer> bulk_msg(
227 send_buffer->getBuffer(&buf, &buf_len);
235 boost::shared_ptr<TMemoryBuffer> bulk_msg(
new
237 buffer = bulk_msg->getWritePtr(bulk_msg_len);
240 bulk_msg->wroteBytes(bulk_msg_len);
249 boost::shared_ptr<TMemoryBuffer> old_buf(
new
273 buf->getBuffer(&buffer, &len);
282 int task_instance,
int writer_task_id,
int reader_task_id) :
284 instance_(task_instance),
287 send_queue_(new
Sandesh::SandeshQueue(writer_task_id,
293 keepalive_idle_time_(kSessionKeepaliveIdleTime),
294 keepalive_interval_(kSessionKeepaliveInterval),
295 keepalive_probes_(kSessionKeepaliveProbes),
296 tcp_user_timeout_(kSessionTcpUserTimeout),
297 reader_task_id_(reader_task_id),
298 sending_level_(SandeshLevel::
INVALID) {
320 boost::get<1>(swmi)));
321 if (boost::get<2>(swmi)) {
351 std::stringstream out;
374 SANDESH_LOG(ERROR, __func__ <<
" Not Connected : Dropping Message: " <<
379 SandeshTxDropReason::SessionNotConnected);
390 writer_->SendMsg(sandesh, more);
406 boost::shared_ptr<TMemoryBuffer> sbuffer(
new TMemoryBuffer(buf_len));
407 u_int8_t *write_buf = sbuffer->getWritePtr(buf_len);
408 memcpy(write_buf, buf, buf_len);
409 sbuffer->wroteBytes(buf_len);
414 const SandeshHeader& header,
415 const string& sandesh_name,
const uint32_t& header_offset) {
416 namespace sandesh_prot = contrail::sandesh::protocol;
417 namespace sandesh_trans = contrail::sandesh::transport;
419 assert(header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT);
423 if (sandesh == NULL) {
424 SANDESH_LOG(ERROR, __func__ <<
": Unknown sandesh ctrl message: " << sandesh_name);
427 boost::shared_ptr<sandesh_trans::TMemoryBuffer> btrans =
428 boost::shared_ptr<sandesh_trans::TMemoryBuffer>(
429 new sandesh_trans::TMemoryBuffer((uint8_t *)msg.c_str() + header_offset,
430 msg.size() - header_offset));
431 boost::shared_ptr<sandesh_prot::TXMLProtocol> prot =
432 boost::shared_ptr<sandesh_prot::TXMLProtocol>(
new sandesh_prot::TXMLProtocol(btrans));
433 int32_t xfer = sandesh->Read(prot);
435 SANDESH_LOG(ERROR, __func__ <<
": Decoding " << sandesh_name <<
" for ctrl FAILED");
474 SandeshHeader& header, std::string& msg_type, uint32_t& header_offset) {
475 int32_t xfer = 0, ret;
476 boost::shared_ptr<TMemoryBuffer> btrans =
477 boost::shared_ptr<TMemoryBuffer>(
479 boost::shared_ptr<TXMLProtocol> prot =
480 boost::shared_ptr<TXMLProtocol>(
new TXMLProtocol(btrans));
482 if ((ret = header.read(prot)) <= 0) {
483 SANDESH_LOG(ERROR, __func__ <<
": Sandesh header read FAILED: " << msg);
487 header_offset = xfer;
489 if ((ret = prot->readSandeshBegin(msg_type)) <= 0) {
490 SANDESH_LOG(ERROR, __func__ <<
": Sandesh begin read FAILED: " << msg);
526 if (!boost::algorithm::starts_with(
buf_.c_str() +
offset_,
532 std::string::const_iterator end =
buf_.begin() +
offset_ +
539 std::string::const_iterator st =
buf_.begin() +
offset_ +
543 string length = string(st, end);
546 if (msg_length == 0) {
582 " Session being deleted: Dropping Message");
588 bool done =
ExtractMsg(buffer, &result,
true);
592 SANDESH_LOG(ERROR, __func__ <<
" Message extract failed: " << result);
595 SANDESH_LOG(ERROR, __func__ <<
" OnRead Buffer Size: " << cp_size);
596 SANDESH_LOG(ERROR, __func__ <<
" OnRead Buffer: ");
597 std::string debug((
const char*)cp, cp_size);
612 std::string::const_iterator st =
buf_.begin() +
offset_ +
614 std::string::const_iterator end =
buf_.begin() +
offset_ +
616 std::string xml(st, end);
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
void SetSendingLevel(size_t count, SandeshLevel::type level)
SandeshLevel::type level() const
virtual void OnRead(Buffer buffer)
boost::asio::const_buffer Buffer
virtual void OnRead(Buffer buffer)
boost::scoped_ptr< SandeshWriter > writer_
static const int kDefaultRecvSize
std::string scope() const
void append_send_buf(uint8_t *buf, size_t len)
void set_msg_length(size_t length)
static Sandesh * CreateInstance(std::string const &s)
void SendMsgAll(boost::shared_ptr< TMemoryBuffer >)
void set_send_buf(uint8_t *buf, size_t len)
virtual std::string ToString() const
static size_t BufferSize(const Buffer &buffer)
bool stringToInteger(const std::string &str, NumberType &num)
SandeshSession * session_
void SendMsgMore(boost::shared_ptr< TMemoryBuffer >)
virtual void Log() const =0
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
virtual const char * Name() const
boost::scoped_ptr< Sandesh::SandeshBufferQueue > send_buffer_queue_
void SendInternal(boost::shared_ptr< TMemoryBuffer >)
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
#define SANDESH_LOG(_Level, _Msg)
SandeshSession(SslServer *client, SslSocket *socket, int task_instance, int writer_task_id, int reader_task_id)
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
bool ExtractMsgLength(size_t &msg_length, int *result)
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
boost::asio::const_buffer Buffer
boost::scoped_ptr< Sandesh::SandeshQueue > send_queue_
SandeshLevel::type SendingLevel() const
void WriteReady(const boost::system::error_code &ec)
static void UpdateTxMsgStats(const std::string &msg_name, uint64_t bytes)
static std::string node_type()
static const std::string sandesh_open_
Sandesh::SandeshBufferQueue * send_buffer_queue()
void increment_send_buffer_fail()
boost::scoped_ptr< SandeshReader > reader_
void increment_send_msg()
void ResetSendQueueWaterMark()
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
virtual boost::system::error_code SetSocketOptions()
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
SandeshLevel::type sending_level_
StatsClient * stats_client_
static SandeshRole::type role()
static std::string module()
void increment_send_msg_fail()
SandeshType::type type() const
uint8_t * send_buf() const
std::string category() const
virtual std::string ToString() const
std::string context() const
static const std::string sandesh_close_
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void increment_write_ready_cb_error()
Sandesh::SandeshQueue * send_queue()
bool IsLoggingAllowed() const
SandeshWriter(SandeshSession *session)
bool SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer)
void SetBuf(const std::string &str)
virtual const int32_t versionsig() const =0
SandeshSession * session_
static const uint32_t kEncodeBufferSize
void SendMsg(Sandesh *sandesh, bool more)
virtual bool SendMsg(Sandesh *sandesh)=0
virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len)
virtual const uint32_t seqnum()
static const std::string sandesh_open_attr_length_
bool ExtractMsg(Buffer buffer, int *result, bool NewBuf)
static const unsigned int kDefaultSendSize
#define sXML_SANDESH_CLOSE
virtual void ReleaseBuffer(Buffer buffer)
static bool IsLoggingDroppedAllowed(SandeshType::type)
SandeshStateMachine * state_machine() const
virtual std::string ToString() const =0
#define sXML_SANDESH_OPEN
SandeshConnection * connection_
SandeshReader(SandeshSession *session)
virtual int32_t Write(boost::shared_ptr< contrail::sandesh::protocol::TProtocol > oprot) const =0
void increment_recv_fail()
static std::string source()
virtual ~SandeshSession()
void ReplaceBuf(const std::string &str)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
static const uint8_t * BufferData(const Buffer &buffer)
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
bool Enqueue(QueueEntryT entry)
boost::function< void(TcpSession *, Event)> EventObserver
#define sXML_SANDESH_OPEN_ATTR_LENGTH
virtual void EnqueueClose()
static bool IsSendQueueEnabled()
virtual boost::system::error_code SetSocketOptions()
bool SendMsg(SandeshElement element)
static std::string instance_id()
bool IsEstablished() const