OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
sandesh_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh_session.cc
7 //
8 // Sandesh session
9 //
10 
11 #include <boost/bind.hpp>
12 #include <boost/assign.hpp>
13 #include <boost/algorithm/string.hpp>
14 
15 #include <base/parse_object.h>
16 
17 #include <sandesh/common/vns_types.h>
18 #include <sandesh/common/vns_constants.h>
19 #include <sandesh/transport/TBufferTransports.h>
20 #include <sandesh/protocol/TXMLProtocol.h>
21 #include <sandesh/protocol/TJSONProtocol.h>
22 #include "sandesh/sandesh_types.h"
23 #include "sandesh/sandesh.h"
24 
25 #include "sandesh_connection.h"
26 #include "sandesh_session.h"
27 
28 
29 using namespace std;
30 using namespace contrail::sandesh::protocol;
31 using namespace contrail::sandesh::transport;
32 
33 using boost::asio::mutable_buffer;
34 using boost::asio::buffer_cast;
35 
40 
41 //
42 // SandeshWriter
43 //
45  : session_(session),
46  ready_to_send_(true),
47  send_buf_(new uint8_t[kDefaultSendSize]),
48  send_buf_offset_(0) {
49 }
50 
52  delete [] send_buf_;
53 }
54 
55 void SandeshWriter::WriteReady(const boost::system::error_code &ec) {
56  if (ec) {
57  SANDESH_LOG(ERROR, "SandeshSession Write error value: " << ec.value()
58  << " category: " << ec.category().name()
59  << " message: " << ec.message());
61  return;
62  }
63 
64  {
65  tbb::mutex::scoped_lock lock(send_mutex_);
66  ready_to_send_ = true;
67  }
68 
69  // We may want to start the Runner for the send_queue
71 }
72 
73 void SandeshWriter::SendMsg(Sandesh *sandesh, bool more) {
74  SandeshHeader header;
75  std::stringstream ss;
76  uint8_t *buffer;
77  int32_t xfer = 0, ret;
78  uint32_t offset;
79  boost::shared_ptr<TMemoryBuffer> btrans(
81  boost::shared_ptr<TXMLProtocol> prot(
82  new TXMLProtocol(btrans));
83  // Populate the header
84  header.set_Namespace(sandesh->scope());
85  header.set_Timestamp(sandesh->timestamp());
86  header.set_Module(sandesh->module());
87  header.set_Source(sandesh->source());
88  header.set_Context(sandesh->context());
89  header.set_SequenceNum(sandesh->seqnum());
90  header.set_VersionSig(sandesh->versionsig());
91  header.set_Type(sandesh->type());
92  header.set_Hints(sandesh->hints());
93  header.set_Level(sandesh->level());
94  header.set_Category(sandesh->category());
95  header.set_NodeType(sandesh->node_type());
96  header.set_InstanceId(sandesh->instance_id());
97 
98  // Write the sandesh open envelope.
99  buffer = btrans->getWritePtr(sandesh_open_.length());
100  memcpy(buffer, sandesh_open_.c_str(), sandesh_open_.length());
101  btrans->wroteBytes(sandesh_open_.length());
102  // Write the sandesh header
103  if ((ret = header.write(prot)) < 0) {
104  SANDESH_LOG(ERROR, __func__ << ": Sandesh header write FAILED: " <<
105  sandesh->Name() << " : " << sandesh->source() << ":" <<
106  sandesh->module() << ":" << sandesh->instance_id() <<
107  " Sequence Number:" << sandesh->seqnum());
109  Sandesh::UpdateTxMsgFailStats(sandesh->Name(), 0,
110  SandeshTxDropReason::HeaderWriteFailed);
111  sandesh->Release();
112  return;
113  }
114  xfer += ret;
115  // Write the sandesh
116  if ((ret = sandesh->Write(prot)) < 0) {
117  SANDESH_LOG(ERROR, __func__ << ": Sandesh write FAILED: "<<
118  sandesh->Name() << " : " << sandesh->source() << ":" <<
119  sandesh->module() << ":" << sandesh->instance_id() <<
120  " Sequence Number:" << sandesh->seqnum());
122  Sandesh::UpdateTxMsgFailStats(sandesh->Name(), 0,
123  SandeshTxDropReason::WriteFailed);
124  sandesh->Release();
125  return;
126  }
127  xfer += ret;
128  // Write the sandesh close envelope
129  buffer = btrans->getWritePtr(sandesh_close_.length());
130  memcpy(buffer, sandesh_close_.c_str(), sandesh_close_.length());
131  btrans->wroteBytes(sandesh_close_.length());
132  // Get the buffer
133  btrans->getBuffer(&buffer, &offset);
134  // Sanity
135  assert(sandesh_open_.length() + xfer + sandesh_close_.length() ==
136  offset);
137  // Update the sandesh open envelope length;
138  char prev = ss.fill('0');
139  // Adjust for '">'
140  ss.width(sandesh_open_.length() - sandesh_open_attr_length_.length() - 2);
141  ss << offset;
142  ss.fill(prev);
143  memcpy(buffer + sandesh_open_attr_length_.length(), ss.str().c_str(),
144  ss.str().length());
145 
146  // Update sandesh stats
147  Sandesh::UpdateTxMsgStats(sandesh->Name(), offset);
149 
150  if (send_buf()) {
151  if (more) {
152  // There are more messages in the send_queue_.
153  // Try to package as many sandesh messages as possible
154  // (== kEncodeBufferSize) before transporting to the
155  // receiver.
156  SendMsgMore(btrans);
157  } else {
158  // send_queue_ is empty. Flush sandesh->send_buf_ and this message.
159  SendMsgAll(btrans);
160  }
161  } else {
162  // Send the message
163  SendInternal(btrans);
164  }
165  sandesh->Release();
166 }
167 
168 // Package as many sandesh messages as possible [not more than
169 // kEncodeBufferSize] before transporting it to the receiver.
170 //
171 // send_buf_ => unsent data (partial/complete message).
172 // buf => new message
173 // buf_len => buf's len
174 void SandeshWriter::SendMsgMore(boost::shared_ptr<TMemoryBuffer>
175  send_buffer) {
176  uint8_t *buf;
177  uint32_t buf_len;
178 
179  send_buffer->getBuffer(&buf, &buf_len);
180 
181  if (send_buf_offset()) {
182  // We have some unsent data.
183  size_t bulk_msg_len = send_buf_offset() + buf_len;
184  if (bulk_msg_len < kDefaultSendSize) {
185  // We still have space for more data. Don't send the message yet.
186  // Add the message to the existing data.
187  append_send_buf(buf, buf_len);
188  } else {
189  uint8_t *buffer;
190  // (send_buf_offset() + buf_len) >= kDefaultSendSize
191  boost::shared_ptr<TMemoryBuffer> bulk_msg(
193  buffer = bulk_msg->getWritePtr(send_buf_offset());
194  // Copy unsent data
195  memcpy(buffer, send_buf(), send_buf_offset());
196  bulk_msg->wroteBytes(send_buf_offset());
197  // Send it
198  SendInternal(bulk_msg);
199  // Cleanup send_buf
200  reset_send_buf();
201  // Next send the new message
202  SendInternal(send_buffer);
203  }
204  } else {
205  // We don't have any unsent data.
206  if (buf_len >= kDefaultSendSize) {
207  // We don't have room to accommodate anything more.
208  // Send the message now.
209  SendInternal(send_buffer);
210  } else {
211  // We have room to accommodate more data.
212  // Save this message.
213  // Note: The memcpy here can be avoided by passing send_buf_
214  // to TMemoryBuffer so that the message is encoded in
215  // send_buf_ itself.
216  set_send_buf(buf, buf_len);
217  }
218  }
219 }
220 
221 // sandesh->send_queue_ is empty.
222 // Flush unsent data (if any) and this message.
223 void SandeshWriter::SendMsgAll(boost::shared_ptr<TMemoryBuffer> send_buffer) {
224  uint8_t *buf;
225  uint32_t buf_len;
226 
227  send_buffer->getBuffer(&buf, &buf_len);
228 
229  if (send_buf_offset()) {
230  // We have some unsent data.
231  size_t bulk_msg_len = send_buf_offset() + buf_len;
232  if (bulk_msg_len <= kDefaultSendSize) {
233  uint8_t *buffer;
234  // We have enough room to send all the pending data in one message.
235  boost::shared_ptr<TMemoryBuffer> bulk_msg(new
236  TMemoryBuffer(bulk_msg_len));
237  buffer = bulk_msg->getWritePtr(bulk_msg_len);
238  memcpy(buffer, send_buf(), send_buf_offset());
239  memcpy(buffer + send_buf_offset(), buf, buf_len);
240  bulk_msg->wroteBytes(bulk_msg_len);
241  // send the message
242  SendInternal(bulk_msg);
243  // reset send_buf_
244  reset_send_buf();
245  } else {
246  uint8_t *buffer;
247  // We don't have enough space to accommodate all the
248  // pending data in one message.
249  boost::shared_ptr<TMemoryBuffer> old_buf(new
251  buffer = old_buf->getWritePtr(send_buf_offset());
252  memcpy(buffer, send_buf(),
253  send_buf_offset());
254  old_buf->wroteBytes(send_buf_offset());
255  // Take care of the unsent data in send_buf_ first.
256  // Note that we could have accomodated part of buf [last message]
257  // here. But, not doing so to avoid additional memcpy() :)
258  SendInternal(old_buf);
259  // Cleanup send_buf_
260  reset_send_buf();
261  // Well, send the last message now.
262  SendInternal(send_buffer);
263  }
264  } else {
265  // No unsent data. Send the message now.
266  SendInternal(send_buffer);
267  }
268 }
269 
270 void SandeshWriter::SendInternal(boost::shared_ptr<TMemoryBuffer> buf) {
271  uint8_t *buffer;
272  uint32_t len;
273  buf->getBuffer(&buffer, &len);
274  tbb::mutex::scoped_lock lock(send_mutex_);
275  ready_to_send_ = session_->Send((const uint8_t *)buffer, len, NULL);
276 }
277 
278 //
279 // SandeshSession
280 //
282  int task_instance, int writer_task_id, int reader_task_id) :
283  SslSession(client, socket),
284  instance_(task_instance),
285  writer_(new SandeshWriter(this)),
286  reader_(new SandeshReader(this)),
287  send_queue_(new Sandesh::SandeshQueue(writer_task_id,
288  task_instance,
289  boost::bind(&SandeshSession::SendMsg, this, _1),
290  kQueueSize)),
291  stats_client_(NULL),
292  connection_(NULL),
293  keepalive_idle_time_(kSessionKeepaliveIdleTime),
294  keepalive_interval_(kSessionKeepaliveInterval),
295  keepalive_probes_(kSessionKeepaliveProbes),
296  tcp_user_timeout_(kSessionTcpUserTimeout),
297  reader_task_id_(reader_task_id),
298  sending_level_(SandeshLevel::INVALID) {
300  send_buffer_queue_.reset(new Sandesh::SandeshBufferQueue(writer_task_id,
301  task_instance,
302  boost::bind(&SandeshSession::SendBuffer, this, _1)));
303  send_buffer_queue_->SetStartRunnerFunc(boost::bind(&SandeshSession::SessionSendReady, this));
304  }
305  send_queue_->SetStartRunnerFunc(boost::bind(&SandeshSession::SessionSendReady, this));
306 }
307 
309 }
310 
312  return (IsEstablished() && writer_->SendReady() &&
314 }
315 
318  WaterMarkInfo wm(boost::get<0>(swmi),
319  boost::bind(&SandeshSession::SetSendingLevel, this, _1,
320  boost::get<1>(swmi)));
321  if (boost::get<2>(swmi)) {
322  send_queue_->SetHighWaterMark(wm);
323  } else {
324  send_queue_->SetLowWaterMark(wm);
325  }
326 }
327 
329  send_queue_->ResetHighWaterMark();
330  send_queue_->ResetLowWaterMark();
331 }
332 
334  if (sending_level_ != level) {
335  sending_level_ = level;
336  }
337 }
338 
340  return sending_level_;
341 }
342 
345  send_buffer_queue_->Shutdown();
346  }
347  send_queue_->Shutdown();
348 }
349 
350 std::string SandeshSession::ToString() const {
351  std::stringstream out;
352  out << TcpSession::ToString() << "(" << instance_ << ")";
353  return out.str();
354 }
355 
356 boost::system::error_code SandeshSession::SetSocketOptions() {
357  boost::system::error_code ec = TcpSession::SetSocketOptions();
358  if (ec) {
359  return ec;
360  }
363 }
364 
366  reader_->OnRead(buffer);
367 }
368 
370  Sandesh *sandesh = element.snh_;
371  tbb::mutex::scoped_lock lock(send_mutex_);
372  if (!IsEstablished()) {
373  if (Sandesh::IsLoggingDroppedAllowed(sandesh->type())) {
374  SANDESH_LOG(ERROR, __func__ << " Not Connected : Dropping Message: " <<
375  sandesh->ToString());
376  }
378  Sandesh::UpdateTxMsgFailStats(sandesh->Name(), 0,
379  SandeshTxDropReason::SessionNotConnected);
380  sandesh->Release();
381  return true;
382  }
383  if (sandesh->IsLoggingAllowed()) {
384  sandesh->Log();
385  }
386  bool more = !send_queue_->IsQueueEmpty();
387  if (stats_client_ && sandesh->type() == SandeshType::UVE) {
388  stats_client_->SendMsg(sandesh);
389  }
390  writer_->SendMsg(sandesh, more);
391  return true;
392 }
393 
394 bool SandeshSession::SendBuffer(boost::shared_ptr<TMemoryBuffer> sbuffer) {
395  tbb::mutex::scoped_lock lock(send_mutex_);
396  if (!IsEstablished()) {
398  return true;
399  }
400  // No buffer packing supported currently
401  writer_->SendBuffer(sbuffer);
402  return true;
403 }
404 
405 bool SandeshSession::EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len) {
406  boost::shared_ptr<TMemoryBuffer> sbuffer(new TMemoryBuffer(buf_len));
407  u_int8_t *write_buf = sbuffer->getWritePtr(buf_len);
408  memcpy(write_buf, buf, buf_len);
409  sbuffer->wroteBytes(buf_len);
410  return send_buffer_queue()->Enqueue(sbuffer);
411 }
412 
414  const SandeshHeader& header,
415  const string& sandesh_name, const uint32_t& header_offset) {
416  namespace sandesh_prot = contrail::sandesh::protocol;
417  namespace sandesh_trans = contrail::sandesh::transport;
418 
419  assert(header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT);
420 
421  // Create and process the sandesh
422  Sandesh *sandesh = SandeshBaseFactory::CreateInstance(sandesh_name);
423  if (sandesh == NULL) {
424  SANDESH_LOG(ERROR, __func__ << ": Unknown sandesh ctrl message: " << sandesh_name);
425  return NULL;
426  }
427  boost::shared_ptr<sandesh_trans::TMemoryBuffer> btrans =
428  boost::shared_ptr<sandesh_trans::TMemoryBuffer>(
429  new sandesh_trans::TMemoryBuffer((uint8_t *)msg.c_str() + header_offset,
430  msg.size() - header_offset));
431  boost::shared_ptr<sandesh_prot::TXMLProtocol> prot =
432  boost::shared_ptr<sandesh_prot::TXMLProtocol>(new sandesh_prot::TXMLProtocol(btrans));
433  int32_t xfer = sandesh->Read(prot);
434  if (xfer < 0) {
435  SANDESH_LOG(ERROR, __func__ << ": Decoding " << sandesh_name << " for ctrl FAILED");
436  sandesh->Release();
437  return NULL;
438  } else {
439  return sandesh;
440  }
441 }
442 
444  if (IsClosed()) {
445  return;
446  }
447  tbb::mutex::scoped_lock lock(conn_mutex_);
448  if (connection_) {
451  } else {
453  if (eobs) {
454  eobs(this, TcpSession::CLOSE);
455  }
456  }
457 }
458 
459 //
460 // SandeshReader
461 //
463  buf_(""),
464  offset_(0),
465  msg_length_(-1),
466  session_(session) {
467  buf_.reserve(kDefaultRecvSize);
468 }
469 
471 }
472 
473 int SandeshReader::ExtractMsgHeader(const std::string& msg,
474  SandeshHeader& header, std::string& msg_type, uint32_t& header_offset) {
475  int32_t xfer = 0, ret;
476  boost::shared_ptr<TMemoryBuffer> btrans =
477  boost::shared_ptr<TMemoryBuffer>(
478  new TMemoryBuffer((uint8_t *)msg.c_str(), msg.size()));
479  boost::shared_ptr<TXMLProtocol> prot =
480  boost::shared_ptr<TXMLProtocol>(new TXMLProtocol(btrans));
481  // Read the sandesh header and note the offset
482  if ((ret = header.read(prot)) <= 0) {
483  SANDESH_LOG(ERROR, __func__ << ": Sandesh header read FAILED: " << msg);
484  return EINVAL;
485  }
486  xfer += ret;
487  header_offset = xfer;
488  // Extract the message name
489  if ((ret = prot->readSandeshBegin(msg_type)) <= 0) {
490  SANDESH_LOG(ERROR, __func__ << ": Sandesh begin read FAILED: " << msg);
491  return EINVAL;
492  }
493  xfer += ret;
494  return 0;
495 }
496 
497 void SandeshReader::SetBuf(const std::string &str) {
498  if (buf_.empty()) {
499  ReplaceBuf(str);
500  } else {
501  buf_ += str;
502  }
503  // TODO handle buf_ > kMaxMessageSize
504 }
505 
506 void SandeshReader::ReplaceBuf(const std::string &str) {
507  buf_ = str;
509  offset_ = 0;
510 }
511 
513  if (buf_.empty()) {
514  return false;
515  }
516  return (buf_.size() != offset_);
517 }
518 
519 // Returns false if not able to extract the message length, true otherwise
520 bool SandeshReader::ExtractMsgLength(size_t &msg_length, int *result) {
521  // Have we read enough to extract the message length?
522  if (buf_.size() - offset_ < SandeshWriter::sandesh_open_.size()) {
523  return false;
524  }
525  // Some sanity check
526  if (!boost::algorithm::starts_with(buf_.c_str() + offset_,
528  *result = -1;
529  return false;
530  }
531 
532  std::string::const_iterator end = buf_.begin() + offset_ +
533  SandeshWriter::sandesh_open_.size() - 1;
534  if (*end != '>') {
535  *result = -2;
536  return false;
537  }
538 
539  std::string::const_iterator st = buf_.begin() + offset_ +
541  // Adjust for double quote
542  --end;
543  string length = string(st, end);
544 
545  stringToInteger(length.c_str(), msg_length);
546  if (msg_length == 0) {
547  *result = -3;
548  return false;
549  }
550  return true;
551 }
552 
553 // Returns false if not able to extract the full message, true otherwise
554 bool SandeshReader::ExtractMsg(Buffer buffer, int *result, bool NewBuf) {
555  if (NewBuf) {
556  const uint8_t *cp = TcpSession::BufferData(buffer);
557  // TODO Avoid this copy
558  std::string str(cp, cp + TcpSession::BufferSize(buffer));
559  SetBuf(str);
560  }
561  // Extract the message length
562  if (!MsgLengthKnown()) {
563  size_t msg_length = 0;
564  bool done = ExtractMsgLength(msg_length, result);
565  if (done == false) {
566  return false;
567  }
568  set_msg_length(msg_length);
569  }
570  // Check if the entire message is read or not
571  if (buf_.size() < msg_length()) {
572  return false;
573  }
574  return true;
575 }
576 
578  tbb::mutex::scoped_lock lock(cb_mutex_);
579  // Check if session is being deleted, then drop the packet
580  if (cb_.empty()) {
581  SANDESH_LOG(ERROR, __func__ <<
582  " Session being deleted: Dropping Message");
584  session_->ReleaseBuffer(buffer);
585  return;
586  }
587  int result = 0;
588  bool done = ExtractMsg(buffer, &result, true);
589  do {
590  if (result < 0) {
591  // Generate error and close connection
592  SANDESH_LOG(ERROR, __func__ << " Message extract failed: " << result);
593  const uint8_t *cp = TcpSession::BufferData(buffer);
594  size_t cp_size = TcpSession::BufferSize(buffer);
595  SANDESH_LOG(ERROR, __func__ << " OnRead Buffer Size: " << cp_size);
596  SANDESH_LOG(ERROR, __func__ << " OnRead Buffer: ");
597  std::string debug((const char*)cp, cp_size);
598  SANDESH_LOG(ERROR, debug);
599  SANDESH_LOG(ERROR, __func__ << " Reader Size: " << buf_.size());
600  SANDESH_LOG(ERROR, __func__ << " Reader Offset: " << offset_);
601  SANDESH_LOG(ERROR, __func__ << " Reader Buffer: " << buf_);
602  buf_.clear();
603  offset_ = 0;
604  // Enqueue a close on the state machine
607  break;
608  }
609  if (done == true) {
610  // We got good match. Process the message after extracting out
611  // the sandesh open and close envelope
612  std::string::const_iterator st = buf_.begin() + offset_ +
614  std::string::const_iterator end = buf_.begin() + offset_ +
616  std::string xml(st, end);
617  offset_ += msg_length();
619  if (!cb_(xml, session_)) {
620  // Enqueue a close on the state machine
623  break;
624  }
625  } else {
626  // Read more data.
627  break;
628  }
629 
630  if (LeftOver()) {
631  ReplaceBuf(string(buf_, offset_, buf_.size() - offset_));
632  done = ExtractMsg(buffer, &result, false);
633  } else {
634  // No more data in the Buffer
635  buf_.clear();
636  offset_ = 0;
637  break;
638  }
639  } while (true);
640 
641  session_->ReleaseBuffer(buffer);
642  return;
643 }
644 
646  tbb::mutex::scoped_lock lock(cb_mutex_);
647  cb_ = cb;
648 }
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
void SetSendingLevel(size_t count, SandeshLevel::type level)
SandeshLevel::type level() const
Definition: p/sandesh.h:318
tbb::mutex send_mutex_
virtual void OnRead(Buffer buffer)
tbb::mutex send_mutex_
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
virtual void OnRead(Buffer buffer)
boost::scoped_ptr< SandeshWriter > writer_
static const int kDefaultRecvSize
std::string scope() const
Definition: p/sandesh.h:313
void append_send_buf(uint8_t *buf, size_t len)
void set_msg_length(size_t length)
static Sandesh * CreateInstance(std::string const &s)
Definition: p/sandesh.h:623
void SendMsgAll(boost::shared_ptr< TMemoryBuffer >)
void set_send_buf(uint8_t *buf, size_t len)
virtual std::string ToString() const
Definition: tcp_session.h:83
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:116
bool stringToInteger(const std::string &str, NumberType &num)
Definition: string_util.h:71
SandeshSession * session_
uint8_t * send_buf_
size_t msg_length()
void SendMsgMore(boost::shared_ptr< TMemoryBuffer >)
virtual void Log() const =0
Sandesh * snh_
Definition: p/sandesh.h:454
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:802
virtual void Shutdown()
SandeshReceiveMsgCb cb_
virtual const char * Name() const
Definition: p/sandesh.h:277
boost::scoped_ptr< Sandesh::SandeshBufferQueue > send_buffer_queue_
void SendInternal(boost::shared_ptr< TMemoryBuffer >)
static int ExtractMsgHeader(const std::string &msg, SandeshHeader &header, std::string &msg_type, uint32_t &header_offset)
#define SANDESH_LOG(_Level, _Msg)
Definition: p/sandesh.h:474
SandeshSession(SslServer *client, SslSocket *socket, int task_instance, int writer_task_id, int reader_task_id)
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:428
bool ExtractMsgLength(size_t &msg_length, int *result)
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
Definition: ssl_session.h:18
boost::asio::const_buffer Buffer
boost::scoped_ptr< Sandesh::SandeshQueue > send_queue_
SandeshLevel::type SendingLevel() const
void WriteReady(const boost::system::error_code &ec)
static void UpdateTxMsgStats(const std::string &msg_name, uint64_t bytes)
Definition: sandesh.cc:884
tbb::mutex conn_mutex_
virtual ~SandeshReader()
static std::string node_type()
Definition: p/sandesh.h:295
static const std::string sandesh_open_
Sandesh::SandeshBufferQueue * send_buffer_queue()
void increment_send_buffer_fail()
boost::scoped_ptr< SandeshReader > reader_
void increment_send_msg()
void ResetSendQueueWaterMark()
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
virtual boost::system::error_code SetSocketOptions()
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
Definition: sandesh.cc:890
uint8_t type
Definition: load_balance.h:109
SandeshLevel::type sending_level_
StatsClient * stats_client_
static SandeshRole::type role()
Definition: p/sandesh.h:296
static std::string module()
Definition: p/sandesh.h:291
tbb::mutex cb_mutex_
void increment_send_msg_fail()
SandeshType::type type() const
Definition: p/sandesh.h:314
uint8_t * send_buf() const
std::string category() const
Definition: p/sandesh.h:320
virtual std::string ToString() const
std::string context() const
Definition: p/sandesh.h:311
static const std::string sandesh_close_
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void increment_write_ready_cb_error()
Sandesh::SandeshQueue * send_queue()
bool IsLoggingAllowed() const
Definition: sandesh.cc:835
SandeshWriter(SandeshSession *session)
bool SendBuffer(boost::shared_ptr< TMemoryBuffer > sbuffer)
void SetBuf(const std::string &str)
virtual const int32_t versionsig() const =0
void reset_send_buf()
void MayBeStartRunner()
Definition: queue_task.h:281
virtual void Release()
Definition: p/sandesh.h:266
SandeshSession * session_
time_t timestamp() const
Definition: p/sandesh.h:309
static const uint32_t kEncodeBufferSize
void SendMsg(Sandesh *sandesh, bool more)
virtual bool SendMsg(Sandesh *sandesh)=0
bool IsClosed() const
Definition: tcp_session.h:125
virtual bool EnqueueBuffer(u_int8_t *buf, u_int32_t buf_len)
virtual const uint32_t seqnum()
Definition: p/sandesh.h:275
static const std::string sandesh_open_attr_length_
bool ExtractMsg(Buffer buffer, int *result, bool NewBuf)
static const unsigned int kDefaultSendSize
#define sXML_SANDESH_CLOSE
int32_t hints() const
Definition: p/sandesh.h:316
bool LeftOver() const
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:143
static bool IsLoggingDroppedAllowed(SandeshType::type)
Definition: sandesh.cc:844
SandeshStateMachine * state_machine() const
std::string buf_
virtual std::string ToString() const =0
#define sXML_SANDESH_OPEN
SandeshConnection * connection_
SandeshReader(SandeshSession *session)
virtual int32_t Write(boost::shared_ptr< contrail::sandesh::protocol::TProtocol > oprot) const =0
void increment_recv_fail()
static std::string source()
Definition: p/sandesh.h:289
virtual ~SandeshSession()
void ReplaceBuf(const std::string &str)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
Definition: p/sandesh.h:147
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:113
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
boost::function< void(TcpSession *, Event)> EventObserver
Definition: tcp_session.h:63
#define sXML_SANDESH_OPEN_ATTR_LENGTH
virtual void EnqueueClose()
EventObserver observer()
Definition: tcp_session.h:214
static bool IsSendQueueEnabled()
Definition: p/sandesh.h:239
void reset_msg_length()
size_t send_buf_offset()
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:868
bool SendMsg(SandeshElement element)
static std::string instance_id()
Definition: p/sandesh.h:293
bool IsEstablished() const
Definition: tcp_session.h:120