11 #include <boost/bind.hpp>
12 #include <boost/assign.hpp>
13 #include <boost/algorithm/string.hpp>
17 #include <sandesh/common/vns_types.h>
18 #include <sandesh/common/vns_constants.h>
19 #include <sandesh/transport/TBufferTransports.h>
20 #include <sandesh/protocol/TXMLProtocol.h>
21 #include <sandesh/protocol/TJSONProtocol.h>
22 #include "sandesh/sandesh_types.h"
23 #include "sandesh/sandesh.h"
33 using boost::asio::mutable_buffer;
34 using boost::asio::buffer_cast;
47 send_buf_(new uint8_t[kDefaultSendSize]),
57 SANDESH_LOG(ERROR,
"SandeshSession Write error value: " << ec.value()
58 <<
" category: " << ec.category().name()
59 <<
" message: " << ec.message());
77 int32_t xfer = 0, ret;
79 boost::shared_ptr<TMemoryBuffer> btrans(
81 boost::shared_ptr<TXMLProtocol> prot(
84 header.set_Namespace(
sandesh->scope());
85 header.set_Timestamp(
sandesh->timestamp());
86 header.set_Module(
sandesh->module());
87 header.set_Source(
sandesh->source());
88 header.set_Context(
sandesh->context());
89 header.set_SequenceNum(
sandesh->seqnum());
90 header.set_VersionSig(
sandesh->versionsig());
91 header.set_Type(
sandesh->type());
92 header.set_Hints(
sandesh->hints());
93 header.set_Level(
sandesh->level());
94 header.set_Category(
sandesh->category());
95 header.set_NodeType(
sandesh->node_type());
96 header.set_InstanceId(
sandesh->instance_id());
103 if ((ret = header.write(prot)) < 0) {
104 SANDESH_LOG(ERROR, __func__ <<
": Sandesh header write FAILED: " <<
107 " Sequence Number:" <<
sandesh->seqnum());
110 SandeshTxDropReason::HeaderWriteFailed);
116 if ((ret =
sandesh->Write(prot)) < 0) {
117 SANDESH_LOG(ERROR, __func__ <<
": Sandesh write FAILED: "<<
120 " Sequence Number:" <<
sandesh->seqnum());
123 SandeshTxDropReason::WriteFailed);
133 btrans->getBuffer(&buffer, &offset);
138 char prev = ss.fill(
'0');
179 send_buffer->getBuffer(&buf, &buf_len);
191 boost::shared_ptr<TMemoryBuffer> bulk_msg(
227 send_buffer->getBuffer(&buf, &buf_len);
235 boost::shared_ptr<TMemoryBuffer> bulk_msg(
new
237 buffer = bulk_msg->getWritePtr(bulk_msg_len);
240 bulk_msg->wroteBytes(bulk_msg_len);
249 boost::shared_ptr<TMemoryBuffer> old_buf(
new
273 buf->getBuffer(&buffer, &len);
282 int task_instance,
int writer_task_id,
int reader_task_id) :
284 instance_(task_instance),
287 send_queue_(new
Sandesh::SandeshQueue(writer_task_id,
293 keepalive_idle_time_(kSessionKeepaliveIdleTime),
294 keepalive_interval_(kSessionKeepaliveInterval),
295 keepalive_probes_(kSessionKeepaliveProbes),
296 tcp_user_timeout_(kSessionTcpUserTimeout),
297 reader_task_id_(reader_task_id),
298 sending_level_(SandeshLevel::
INVALID) {
320 boost::get<1>(swmi)));
321 if (boost::get<2>(swmi)) {
351 std::stringstream out;
374 SANDESH_LOG(ERROR, __func__ <<
" Not Connected : Dropping Message: " <<
379 SandeshTxDropReason::SessionNotConnected);
383 if (
sandesh->IsLoggingAllowed()) {
406 boost::shared_ptr<TMemoryBuffer> sbuffer(
new TMemoryBuffer(buf_len));
407 u_int8_t *write_buf = sbuffer->getWritePtr(buf_len);
408 memcpy(write_buf, buf, buf_len);
409 sbuffer->wroteBytes(buf_len);
414 const SandeshHeader& header,
415 const string& sandesh_name,
const uint32_t& header_offset) {
419 assert(header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT);
424 SANDESH_LOG(ERROR, __func__ <<
": Unknown sandesh ctrl message: " << sandesh_name);
427 boost::shared_ptr<sandesh_trans::TMemoryBuffer> btrans =
428 boost::shared_ptr<sandesh_trans::TMemoryBuffer>(
429 new sandesh_trans::TMemoryBuffer((uint8_t *)msg.c_str() + header_offset,
430 msg.size() - header_offset));
431 boost::shared_ptr<sandesh_prot::TXMLProtocol> prot =
432 boost::shared_ptr<sandesh_prot::TXMLProtocol>(
new sandesh_prot::TXMLProtocol(btrans));
433 int32_t xfer =
sandesh->Read(prot);
435 SANDESH_LOG(ERROR, __func__ <<
": Decoding " << sandesh_name <<
" for ctrl FAILED");
474 SandeshHeader& header, std::string& msg_type, uint32_t& header_offset) {
475 int32_t xfer = 0, ret;
476 boost::shared_ptr<TMemoryBuffer> btrans =
477 boost::shared_ptr<TMemoryBuffer>(
479 boost::shared_ptr<TXMLProtocol> prot =
480 boost::shared_ptr<TXMLProtocol>(
new TXMLProtocol(btrans));
482 if ((ret = header.read(prot)) <= 0) {
483 SANDESH_LOG(ERROR, __func__ <<
": Sandesh header read FAILED: " << msg);
487 header_offset = xfer;
489 if ((ret = prot->readSandeshBegin(msg_type)) <= 0) {
490 SANDESH_LOG(ERROR, __func__ <<
": Sandesh begin read FAILED: " << msg);
526 if (!boost::algorithm::starts_with(
buf_.c_str() +
offset_,
532 std::string::const_iterator end =
buf_.begin() +
offset_ +
539 std::string::const_iterator st =
buf_.begin() +
offset_ +
543 string length = string(st, end);
582 " Session being deleted: Dropping Message");
588 bool done =
ExtractMsg(buffer, &result,
true);
592 SANDESH_LOG(ERROR, __func__ <<
" Message extract failed: " << result);
595 SANDESH_LOG(ERROR, __func__ <<
" OnRead Buffer Size: " << cp_size);
596 SANDESH_LOG(ERROR, __func__ <<
" OnRead Buffer: ");
597 std::string debug((
const char*)cp, cp_size);
612 std::string::const_iterator st =
buf_.begin() +
offset_ +
614 std::string::const_iterator end =
buf_.begin() +
offset_ +
616 std::string xml(st, end);
static Sandesh * CreateInstance(std::string const &s)
SandeshStateMachine * state_machine() const
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
virtual void OnRead(Buffer buffer)
SandeshReader(SandeshSession *session)
void ReplaceBuf(const std::string &str)
SandeshSession * session_
boost::asio::const_buffer Buffer
void SetBuf(const std::string &str)
static const int kDefaultRecvSize
bool ExtractMsg(Buffer buffer, int *result, bool NewBuf)
bool ExtractMsgLength(size_t &msg_length, int *result)
void set_msg_length(size_t length)
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
bool SendMsg(SandeshElement element)
virtual void EnqueueClose()
virtual std::string ToString() const
virtual void OnRead(Buffer buffer)
virtual ~SandeshSession()
virtual boost::system::error_code SetSocketOptions()
void increment_send_buffer_fail()
SandeshSession(SslServer *client, SslSocket *socket, int task_instance, int writer_task_id, int reader_task_id)
void increment_write_ready_cb_error()
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
StatsClient * stats_client_
boost::scoped_ptr< SandeshWriter > writer_
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
boost::scoped_ptr< SandeshReader > reader_
Sandesh::SandeshBufferQueue * send_buffer_queue()
bool SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer)
void increment_recv_fail()
void SetSendingLevel(size_t count, SandeshLevel::type level)
void increment_send_msg_fail()
SandeshLevel::type sending_level_
Sandesh::SandeshQueue * send_queue()
virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len)
void increment_send_msg()
boost::scoped_ptr< Sandesh::SandeshQueue > send_queue_
SandeshLevel::type SendingLevel() const
boost::scoped_ptr< Sandesh::SandeshBufferQueue > send_buffer_queue_
void ResetSendQueueWaterMark()
SandeshConnection * connection_
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
static const std::string sandesh_open_
void append_send_buf(uint8_t *buf, size_t len)
static const unsigned int kDefaultSendSize
void SendMsg(Sandesh *sandesh, bool more)
SandeshWriter(SandeshSession *session)
void WriteReady(const boost::system::error_code &ec)
void SendInternal(boost::shared_ptr< TMemoryBuffer >)
SandeshSession * session_
void set_send_buf(uint8_t *buf, size_t len)
static const uint32_t kEncodeBufferSize
static const std::string sandesh_close_
void SendMsgAll(boost::shared_ptr< TMemoryBuffer >)
void SendMsgMore(boost::shared_ptr< TMemoryBuffer >)
uint8_t * send_buf() const
static const std::string sandesh_open_attr_length_
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
static SandeshRole::type role()
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
static void UpdateTxMsgStats(const std::string &msg_name, uint64_t bytes)
static bool IsSendQueueEnabled()
static bool IsLoggingDroppedAllowed(SandeshType::type)
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
virtual bool SendMsg(Sandesh *sandesh)=0
static const uint8_t * BufferData(const Buffer &buffer)
virtual std::string ToString() const
static size_t BufferSize(const Buffer &buffer)
virtual void ReleaseBuffer(Buffer buffer)
boost::function< void(TcpSession *, Event)> EventObserver
boost::asio::const_buffer Buffer
bool IsEstablished() const
virtual boost::system::error_code SetSocketOptions()
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
bool Enqueue(QueueEntryT entry)
#define SANDESH_LOG(_Level, _Msg)
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
#define sXML_SANDESH_OPEN
#define sXML_SANDESH_CLOSE
#define sXML_SANDESH_OPEN_ATTR_LENGTH
bool stringToInteger(const std::string &str, NumberType &num)