OpenSDN source code
sandesh_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_session.cc
7 //
8 // Sandesh session
9 //
10 
11 #include <boost/bind/bind.hpp>
12 #include <boost/assign.hpp>
13 #include <boost/algorithm/string.hpp>
14 
15 #include <base/parse_object.h>
16 
17 #include <sandesh/common/vns_types.h>
18 #include <sandesh/common/vns_constants.h>
19 #include <sandesh/transport/TBufferTransports.h>
20 #include <sandesh/protocol/TXMLProtocol.h>
21 #include <sandesh/protocol/TJSONProtocol.h>
22 #include "sandesh/sandesh_types.h"
23 #include "sandesh/sandesh.h"
24 
25 #include "sandesh_connection.h"
26 #include "sandesh_session.h"
27 
28 
29 using namespace std;
30 using namespace contrail::sandesh::protocol;
31 using namespace contrail::sandesh::transport;
32 using namespace boost::placeholders;
33 
34 using boost::asio::mutable_buffer;
35 using boost::asio::buffer_cast;
36 
41 
42 //
43 // SandeshWriter
44 //
46  : session_(session),
47  ready_to_send_(true),
48  send_buf_(new uint8_t[kDefaultSendSize]),
49  send_buf_offset_(0) {
50 }
51 
53  delete [] send_buf_;
54 }
55 
56 void SandeshWriter::WriteReady(const boost::system::error_code &ec) {
57  if (ec) {
58  SANDESH_LOG(ERROR, "SandeshSession Write error value: " << ec.value()
59  << " category: " << ec.category().name()
60  << " message: " << ec.message());
62  return;
63  }
64 
65  {
66  std::scoped_lock lock(send_mutex_);
67  ready_to_send_ = true;
68  }
69 
70  // We may want to start the Runner for the send_queue
72 }
73 
75  SandeshHeader header;
76  std::stringstream ss;
77  uint8_t *buffer;
78  int32_t xfer = 0, ret;
79  uint32_t offset;
80  boost::shared_ptr<TMemoryBuffer> btrans(
82  boost::shared_ptr<TXMLProtocol> prot(
83  new TXMLProtocol(btrans));
84  // Populate the header
85  header.set_Namespace(sandesh->scope());
86  header.set_Timestamp(sandesh->timestamp());
87  header.set_Module(sandesh->module());
88  header.set_Source(sandesh->source());
89  header.set_Context(sandesh->context());
90  header.set_SequenceNum(sandesh->seqnum());
91  header.set_VersionSig(sandesh->versionsig());
92  header.set_Type(sandesh->type());
93  header.set_Hints(sandesh->hints());
94  header.set_Level(sandesh->level());
95  header.set_Category(sandesh->category());
96  header.set_NodeType(sandesh->node_type());
97  header.set_InstanceId(sandesh->instance_id());
98 
99  // Write the sandesh open envelope.
100  buffer = btrans->getWritePtr(sandesh_open_.length());
101  memcpy(buffer, sandesh_open_.c_str(), sandesh_open_.length());
102  btrans->wroteBytes(sandesh_open_.length());
103  // Write the sandesh header
104  if ((ret = header.write(prot)) < 0) {
105  SANDESH_LOG(ERROR, __func__ << ": Sandesh header write FAILED: " <<
106  sandesh->Name() << " : " << sandesh->source() << ":" <<
107  sandesh->module() << ":" << sandesh->instance_id() <<
108  " Sequence Number:" << sandesh->seqnum());
111  SandeshTxDropReason::HeaderWriteFailed);
112  sandesh->Release();
113  return;
114  }
115  xfer += ret;
116  // Write the sandesh
117  if ((ret = sandesh->Write(prot)) < 0) {
118  SANDESH_LOG(ERROR, __func__ << ": Sandesh write FAILED: "<<
119  sandesh->Name() << " : " << sandesh->source() << ":" <<
120  sandesh->module() << ":" << sandesh->instance_id() <<
121  " Sequence Number:" << sandesh->seqnum());
124  SandeshTxDropReason::WriteFailed);
125  sandesh->Release();
126  return;
127  }
128  xfer += ret;
129  // Write the sandesh close envelope
130  buffer = btrans->getWritePtr(sandesh_close_.length());
131  memcpy(buffer, sandesh_close_.c_str(), sandesh_close_.length());
132  btrans->wroteBytes(sandesh_close_.length());
133  // Get the buffer
134  btrans->getBuffer(&buffer, &offset);
135  // Sanity
136  assert(sandesh_open_.length() + xfer + sandesh_close_.length() ==
137  offset);
138  // Update the sandesh open envelope length;
139  char prev = ss.fill('0');
140  // Adjust for '">'
141  ss.width(sandesh_open_.length() - sandesh_open_attr_length_.length() - 2);
142  ss << offset;
143  ss.fill(prev);
144  memcpy(buffer + sandesh_open_attr_length_.length(), ss.str().c_str(),
145  ss.str().length());
146 
147  // Update sandesh stats
148  Sandesh::UpdateTxMsgStats(sandesh->Name(), offset);
150 
151  if (send_buf()) {
152  if (more) {
153  // There are more messages in the send_queue_.
154  // Try to package as many sandesh messages as possible
155  // (== kEncodeBufferSize) before transporting to the
156  // receiver.
157  SendMsgMore(btrans);
158  } else {
159  // send_queue_ is empty. Flush sandesh->send_buf_ and this message.
160  SendMsgAll(btrans);
161  }
162  } else {
163  // Send the message
164  SendInternal(btrans);
165  }
166  sandesh->Release();
167 }
168 
169 // Package as many sandesh messages as possible [not more than
170 // kEncodeBufferSize] before transporting it to the receiver.
171 //
172 // send_buf_ => unsent data (partial/complete message).
173 // buf => new message
174 // buf_len => buf's len
175 void SandeshWriter::SendMsgMore(boost::shared_ptr<TMemoryBuffer>
176  send_buffer) {
177  uint8_t *buf;
178  uint32_t buf_len;
179 
180  send_buffer->getBuffer(&buf, &buf_len);
181 
182  if (send_buf_offset()) {
183  // We have some unsent data.
184  size_t bulk_msg_len = send_buf_offset() + buf_len;
185  if (bulk_msg_len < kDefaultSendSize) {
186  // We still have space for more data. Don't send the message yet.
187  // Add the message to the existing data.
188  append_send_buf(buf, buf_len);
189  } else {
190  uint8_t *buffer;
191  // (send_buf_offset() + buf_len) >= kDefaultSendSize
192  boost::shared_ptr<TMemoryBuffer> bulk_msg(
194  buffer = bulk_msg->getWritePtr(send_buf_offset());
195  // Copy unsent data
196  memcpy(buffer, send_buf(), send_buf_offset());
197  bulk_msg->wroteBytes(send_buf_offset());
198  // Send it
199  SendInternal(bulk_msg);
200  // Cleanup send_buf
201  reset_send_buf();
202  // Next send the new message
203  SendInternal(send_buffer);
204  }
205  } else {
206  // We don't have any unsent data.
207  if (buf_len >= kDefaultSendSize) {
208  // We don't have room to accommodate anything more.
209  // Send the message now.
210  SendInternal(send_buffer);
211  } else {
212  // We have room to accommodate more data.
213  // Save this message.
214  // Note: The memcpy here can be avoided by passing send_buf_
215  // to TMemoryBuffer so that the message is encoded in
216  // send_buf_ itself.
217  set_send_buf(buf, buf_len);
218  }
219  }
220 }
221 
222 // sandesh->send_queue_ is empty.
223 // Flush unsent data (if any) and this message.
224 void SandeshWriter::SendMsgAll(boost::shared_ptr<TMemoryBuffer> send_buffer) {
225  uint8_t *buf;
226  uint32_t buf_len;
227 
228  send_buffer->getBuffer(&buf, &buf_len);
229 
230  if (send_buf_offset()) {
231  // We have some unsent data.
232  size_t bulk_msg_len = send_buf_offset() + buf_len;
233  if (bulk_msg_len <= kDefaultSendSize) {
234  uint8_t *buffer;
235  // We have enough room to send all the pending data in one message.
236  boost::shared_ptr<TMemoryBuffer> bulk_msg(new
237  TMemoryBuffer(bulk_msg_len));
238  buffer = bulk_msg->getWritePtr(bulk_msg_len);
239  memcpy(buffer, send_buf(), send_buf_offset());
240  memcpy(buffer + send_buf_offset(), buf, buf_len);
241  bulk_msg->wroteBytes(bulk_msg_len);
242  // send the message
243  SendInternal(bulk_msg);
244  // reset send_buf_
245  reset_send_buf();
246  } else {
247  uint8_t *buffer;
248  // We don't have enough space to accommodate all the
249  // pending data in one message.
250  boost::shared_ptr<TMemoryBuffer> old_buf(new
252  buffer = old_buf->getWritePtr(send_buf_offset());
253  memcpy(buffer, send_buf(),
254  send_buf_offset());
255  old_buf->wroteBytes(send_buf_offset());
256  // Take care of the unsent data in send_buf_ first.
257  // Note that we could have accommodated part of buf [last message]
258  // here. But, not doing so to avoid additional memcpy() :)
259  SendInternal(old_buf);
260  // Cleanup send_buf_
261  reset_send_buf();
262  // Well, send the last message now.
263  SendInternal(send_buffer);
264  }
265  } else {
266  // No unsent data. Send the message now.
267  SendInternal(send_buffer);
268  }
269 }
270 
271 void SandeshWriter::SendInternal(boost::shared_ptr<TMemoryBuffer> buf) {
272  uint8_t *buffer;
273  uint32_t len;
274  buf->getBuffer(&buffer, &len);
275  std::scoped_lock lock(send_mutex_);
276  ready_to_send_ = session_->Send((const uint8_t *)buffer, len, NULL);
277 }
278 
279 //
280 // SandeshSession
281 //
283  int task_instance, int writer_task_id, int reader_task_id) :
284  SslSession(client, socket),
285  instance_(task_instance),
286  writer_(new SandeshWriter(this)),
287  reader_(new SandeshReader(this)),
288  send_queue_(new Sandesh::SandeshQueue(writer_task_id,
289  task_instance,
290  boost::bind(&SandeshSession::SendMsg, this, _1),
291  kQueueSize)),
292  stats_client_(NULL),
293  connection_(NULL),
294  keepalive_idle_time_(kSessionKeepaliveIdleTime),
295  keepalive_interval_(kSessionKeepaliveInterval),
296  keepalive_probes_(kSessionKeepaliveProbes),
297  tcp_user_timeout_(kSessionTcpUserTimeout),
298  reader_task_id_(reader_task_id),
299  sending_level_(SandeshLevel::INVALID) {
301  send_buffer_queue_.reset(new Sandesh::SandeshBufferQueue(writer_task_id,
302  task_instance,
303  boost::bind(&SandeshSession::SendBuffer, this, _1)));
304  send_buffer_queue_->SetStartRunnerFunc(boost::bind(&SandeshSession::SessionSendReady, this));
305  }
306  send_queue_->SetStartRunnerFunc(boost::bind(&SandeshSession::SessionSendReady, this));
307 }
308 
310 }
311 
313  return (IsEstablished() && writer_->SendReady() &&
315 }
316 
319  WaterMarkInfo wm(boost::get<0>(swmi),
320  boost::bind(&SandeshSession::SetSendingLevel, this, _1,
321  boost::get<1>(swmi)));
322  if (boost::get<2>(swmi)) {
323  send_queue_->SetHighWaterMark(wm);
324  } else {
325  send_queue_->SetLowWaterMark(wm);
326  }
327 }
328 
330  send_queue_->ResetHighWaterMark();
331  send_queue_->ResetLowWaterMark();
332 }
333 
335  if (sending_level_ != level) {
336  sending_level_ = level;
337  }
338 }
339 
341  return sending_level_;
342 }
343 
346  send_buffer_queue_->Shutdown();
347  }
348  send_queue_->Shutdown();
349 }
350 
351 std::string SandeshSession::ToString() const {
352  std::stringstream out;
353  out << TcpSession::ToString() << "(" << instance_ << ")";
354  return out.str();
355 }
356 
357 boost::system::error_code SandeshSession::SetSocketOptions() {
358  boost::system::error_code ec = TcpSession::SetSocketOptions();
359  if (ec) {
360  return ec;
361  }
364 }
365 
367  reader_->OnRead(buffer);
368 }
369 
371  Sandesh *sandesh = element.snh_;
372  std::scoped_lock lock(send_mutex_);
373  if (!IsEstablished()) {
375  SANDESH_LOG(ERROR, __func__ << " Not Connected : Dropping Message: " <<
376  sandesh->ToString());
377  }
380  SandeshTxDropReason::SessionNotConnected);
381  sandesh->Release();
382  return true;
383  }
384  if (sandesh->IsLoggingAllowed()) {
385  sandesh->Log();
386  }
387  bool more = !send_queue_->IsQueueEmpty();
388  if (stats_client_ && sandesh->type() == SandeshType::UVE) {
390  }
391  writer_->SendMsg(sandesh, more);
392  return true;
393 }
394 
395 bool SandeshSession::SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer) {
396  std::scoped_lock lock(send_mutex_);
397  if (!IsEstablished()) {
399  return true;
400  }
401  // No buffer packing supported currently
402  writer_->SendBuffer(sbuffer);
403  return true;
404 }
405 
406 bool SandeshSession::EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len) {
407  boost::shared_ptr<TMemoryBuffer> sbuffer(new TMemoryBuffer(buf_len));
408  u_int8_t *write_buf = sbuffer->getWritePtr(buf_len);
409  memcpy(write_buf, buf, buf_len);
410  sbuffer->wroteBytes(buf_len);
411  return send_buffer_queue()->Enqueue(sbuffer);
412 }
413 
415  const SandeshHeader& header,
416  const string& sandesh_name, const uint32_t& header_offset) {
417  namespace sandesh_prot = contrail::sandesh::protocol;
418  namespace sandesh_trans = contrail::sandesh::transport;
419 
420  assert(header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT);
421 
422  // Create and process the sandesh
424  if (sandesh == NULL) {
425  SANDESH_LOG(ERROR, __func__ << ": Unknown sandesh ctrl message: " << sandesh_name);
426  return NULL;
427  }
428  boost::shared_ptr<sandesh_trans::TMemoryBuffer> btrans =
429  boost::shared_ptr<sandesh_trans::TMemoryBuffer>(
430  new sandesh_trans::TMemoryBuffer((uint8_t *)msg.c_str() + header_offset,
431  msg.size() - header_offset));
432  boost::shared_ptr<sandesh_prot::TXMLProtocol> prot =
433  boost::shared_ptr<sandesh_prot::TXMLProtocol>(new sandesh_prot::TXMLProtocol(btrans));
434  int32_t xfer = sandesh->Read(prot);
435  if (xfer < 0) {
436  SANDESH_LOG(ERROR, __func__ << ": Decoding " << sandesh_name << " for ctrl FAILED");
437  sandesh->Release();
438  return NULL;
439  } else {
440  return sandesh;
441  }
442 }
443 
445  if (IsClosed()) {
446  return;
447  }
448  std::scoped_lock lock(conn_mutex_);
449  if (connection_) {
452  } else {
454  if (eobs) {
455  eobs(this, TcpSession::CLOSE);
456  }
457  }
458 }
459 
460 //
461 // SandeshReader
462 //
464  buf_(""),
465  offset_(0),
466  msg_length_(-1),
467  session_(session) {
468  buf_.reserve(kDefaultRecvSize);
469 }
470 
472 }
473 
474 int SandeshReader::ExtractMsgHeader(const std::string& msg,
475  SandeshHeader& header, std::string& msg_type, uint32_t& header_offset) {
476  int32_t xfer = 0, ret;
477  boost::shared_ptr<TMemoryBuffer> btrans =
478  boost::shared_ptr<TMemoryBuffer>(
479  new TMemoryBuffer((uint8_t *)msg.c_str(), msg.size()));
480  boost::shared_ptr<TXMLProtocol> prot =
481  boost::shared_ptr<TXMLProtocol>(new TXMLProtocol(btrans));
482  // Read the sandesh header and note the offset
483  if ((ret = header.read(prot)) <= 0) {
484  SANDESH_LOG(ERROR, __func__ << ": Sandesh header read FAILED: " << msg);
485  return EINVAL;
486  }
487  xfer += ret;
488  header_offset = xfer;
489  // Extract the message name
490  if ((ret = prot->readSandeshBegin(msg_type)) <= 0) {
491  SANDESH_LOG(ERROR, __func__ << ": Sandesh begin read FAILED: " << msg);
492  return EINVAL;
493  }
494  xfer += ret;
495  return 0;
496 }
497 
498 void SandeshReader::SetBuf(const std::string &str) {
499  if (buf_.empty()) {
500  ReplaceBuf(str);
501  } else {
502  buf_ += str;
503  }
504  // TODO handle buf_ > kMaxMessageSize
505 }
506 
507 void SandeshReader::ReplaceBuf(const std::string &str) {
508  buf_ = str;
510  offset_ = 0;
511 }
512 
514  if (buf_.empty()) {
515  return false;
516  }
517  return (buf_.size() != offset_);
518 }
519 
520 // Returns false if not able to extract the message length, true otherwise
521 bool SandeshReader::ExtractMsgLength(size_t &msg_length, int *result) {
522  // Have we read enough to extract the message length?
523  if (buf_.size() - offset_ < SandeshWriter::sandesh_open_.size()) {
524  return false;
525  }
526  // Some sanity check
527  if (!boost::algorithm::starts_with(buf_.c_str() + offset_,
529  *result = -1;
530  return false;
531  }
532 
533  std::string::const_iterator end = buf_.begin() + offset_ +
534  SandeshWriter::sandesh_open_.size() - 1;
535  if (*end != '>') {
536  *result = -2;
537  return false;
538  }
539 
540  std::string::const_iterator st = buf_.begin() + offset_ +
542  // Adjust for double quote
543  --end;
544  string length = string(st, end);
545 
546  stringToInteger(length.c_str(), msg_length);
547  if (msg_length == 0) {
548  *result = -3;
549  return false;
550  }
551  return true;
552 }
553 
554 // Returns false if not able to extract the full message, true otherwise
555 bool SandeshReader::ExtractMsg(Buffer buffer, int *result, bool NewBuf) {
556  if (NewBuf) {
557  const uint8_t *cp = TcpSession::BufferData(buffer);
558  // TODO Avoid this copy
559  std::string str(cp, cp + TcpSession::BufferSize(buffer));
560  SetBuf(str);
561  }
562  // Extract the message length
563  if (!MsgLengthKnown()) {
564  size_t msg_length = 0;
565  bool done = ExtractMsgLength(msg_length, result);
566  if (done == false) {
567  return false;
568  }
570  }
571  // Check if the entire message is read or not
572  if (buf_.size() < msg_length()) {
573  return false;
574  }
575  return true;
576 }
577 
579  std::scoped_lock lock(cb_mutex_);
580  // Check if session is being deleted, then drop the packet
581  if (cb_.empty()) {
582  SANDESH_LOG(ERROR, __func__ <<
583  " Session being deleted: Dropping Message");
585  session_->ReleaseBuffer(buffer);
586  return;
587  }
588  int result = 0;
589  bool done = ExtractMsg(buffer, &result, true);
590  do {
591  if (result < 0) {
592  // Generate error and close connection
593  SANDESH_LOG(ERROR, __func__ << " Message extract failed: " << result);
594  const uint8_t *cp = TcpSession::BufferData(buffer);
595  size_t cp_size = TcpSession::BufferSize(buffer);
596  SANDESH_LOG(ERROR, __func__ << " OnRead Buffer Size: " << cp_size);
597  SANDESH_LOG(ERROR, __func__ << " OnRead Buffer: ");
598  std::string debug((const char*)cp, cp_size);
599  SANDESH_LOG(ERROR, debug);
600  SANDESH_LOG(ERROR, __func__ << " Reader Size: " << buf_.size());
601  SANDESH_LOG(ERROR, __func__ << " Reader Offset: " << offset_);
602  SANDESH_LOG(ERROR, __func__ << " Reader Buffer: " << buf_);
603  buf_.clear();
604  offset_ = 0;
605  // Enqueue a close on the state machine
608  break;
609  }
610  if (done == true) {
611  // We got good match. Process the message after extracting out
612  // the sandesh open and close envelope
613  std::string::const_iterator st = buf_.begin() + offset_ +
615  std::string::const_iterator end = buf_.begin() + offset_ +
617  std::string xml(st, end);
618  offset_ += msg_length();
620  if (!cb_(xml, session_)) {
621  // Enqueue a close on the state machine
624  break;
625  }
626  } else {
627  // Read more data.
628  break;
629  }
630 
631  if (LeftOver()) {
632  ReplaceBuf(string(buf_, offset_, buf_.size() - offset_));
633  done = ExtractMsg(buffer, &result, false);
634  } else {
635  // No more data in the Buffer
636  buf_.clear();
637  offset_ = 0;
638  break;
639  }
640  } while (true);
641 
642  session_->ReleaseBuffer(buffer);
643  return;
644 }
645 
647  std::scoped_lock lock(cb_mutex_);
648  cb_ = cb;
649 }
static Sandesh * CreateInstance(std::string const &s)
Definition: cpp/sandesh.h:625
SandeshStateMachine * state_machine() const
void reset_msg_length()
virtual ~SandeshReader()
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
virtual void OnRead(Buffer buffer)
size_t msg_length()
SandeshReader(SandeshSession *session)
std::mutex cb_mutex_
void ReplaceBuf(const std::string &str)
SandeshReceiveMsgCb cb_
SandeshSession * session_
boost::asio::const_buffer Buffer
std::string buf_
void SetBuf(const std::string &str)
static const int kDefaultRecvSize
bool ExtractMsg(Buffer buffer, int *result, bool NewBuf)
bool LeftOver() const
bool ExtractMsgLength(size_t &msg_length, int *result)
void set_msg_length(size_t length)
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
bool SendMsg(SandeshElement element)
virtual void EnqueueClose()
virtual std::string ToString() const
virtual void OnRead(Buffer buffer)
virtual ~SandeshSession()
virtual boost::system::error_code SetSocketOptions()
void increment_send_buffer_fail()
SandeshSession(SslServer *client, SslSocket *socket, int task_instance, int writer_task_id, int reader_task_id)
void increment_write_ready_cb_error()
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
StatsClient * stats_client_
virtual void Shutdown()
boost::scoped_ptr< SandeshWriter > writer_
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
boost::scoped_ptr< SandeshReader > reader_
Sandesh::SandeshBufferQueue * send_buffer_queue()
bool SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer)
void increment_recv_fail()
void SetSendingLevel(size_t count, SandeshLevel::type level)
void increment_send_msg_fail()
SandeshLevel::type sending_level_
Sandesh::SandeshQueue * send_queue()
std::mutex conn_mutex_
std::mutex send_mutex_
virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len)
void increment_send_msg()
boost::scoped_ptr< Sandesh::SandeshQueue > send_queue_
SandeshLevel::type SendingLevel() const
boost::scoped_ptr< Sandesh::SandeshBufferQueue > send_buffer_queue_
void ResetSendQueueWaterMark()
SandeshConnection * connection_
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
static const std::string sandesh_open_
void append_send_buf(uint8_t *buf, size_t len)
static const unsigned int kDefaultSendSize
void SendMsg(Sandesh *sandesh, bool more)
SandeshWriter(SandeshSession *session)
void WriteReady(const boost::system::error_code &ec)
void reset_send_buf()
void SendInternal(boost::shared_ptr< TMemoryBuffer >)
size_t send_buf_offset()
SandeshSession * session_
void set_send_buf(uint8_t *buf, size_t len)
static const uint32_t kEncodeBufferSize
static const std::string sandesh_close_
void SendMsgAll(boost::shared_ptr< TMemoryBuffer >)
void SendMsgMore(boost::shared_ptr< TMemoryBuffer >)
uint8_t * send_buf() const
uint8_t * send_buf_
std::mutex send_mutex_
static const std::string sandesh_open_attr_length_
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
Definition: sandesh.cc:891
static SandeshRole::type role()
Definition: cpp/sandesh.h:298
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: cpp/sandesh.h:149
static void UpdateTxMsgStats(const std::string &msg_name, uint64_t bytes)
Definition: sandesh.cc:885
static bool IsSendQueueEnabled()
Definition: cpp/sandesh.h:241
static bool IsLoggingDroppedAllowed(SandeshType::type)
Definition: sandesh.cc:845
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
Definition: ssl_session.h:20
virtual bool SendMsg(Sandesh *sandesh)=0
bool IsClosed() const
Definition: tcp_session.h:121
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:109
virtual std::string ToString() const
Definition: tcp_session.h:79
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:112
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:144
boost::function< void(TcpSession *, Event)> EventObserver
Definition: tcp_session.h:59
boost::asio::const_buffer Buffer
Definition: tcp_session.h:60
bool IsEstablished() const
Definition: tcp_session.h:116
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:869
EventObserver observer()
Definition: tcp_session.h:210
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:803
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:429
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
void MayBeStartRunner()
Definition: queue_task.h:281
#define SANDESH_LOG(_Level, _Msg)
Definition: cpp/sandesh.h:476
@ INVALID
Definition: globals.h:147
uint8_t type
Definition: load_balance.h:2
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
#define sXML_SANDESH_OPEN
#define sXML_SANDESH_CLOSE
#define sXML_SANDESH_OPEN_ATTR_LENGTH
bool stringToInteger(const std::string &str, NumberType &num)
Definition: string_util.h:71
Sandesh * snh_
Definition: cpp/sandesh.h:456