OpenSDN source code
sandesh.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // sandesh.cc
7 //
8 // Sandesh Implementation
9 //
10 
11 #include <boost/filesystem.hpp>
12 #include <boost/bind/bind.hpp>
13 #include <boost/foreach.hpp>
14 #include <boost/format.hpp>
15 #include <boost/algorithm/string/predicate.hpp>
16 
17 #include <base/logging.h>
18 #include <base/parse_object.h>
19 #include <base/queue_task.h>
20 #include <base/address_util.h>
21 #include <http/http_session.h>
22 #include <io/tcp_session.h>
23 
24 #include <sandesh/transport/TBufferTransports.h>
25 #include <sandesh/transport/TSimpleFileTransport.h>
26 #include <sandesh/protocol/TBinaryProtocol.h>
27 #include <sandesh/protocol/TProtocol.h>
28 
29 #include <sandesh/sandesh_types.h>
30 #include <sandesh/sandesh.h>
31 #include <sandesh/sandesh_trace.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include "sandesh_statistics.h"
34 #include "sandesh_uve.h"
35 #include "sandesh_session.h"
36 #include "sandesh_http.h"
37 #include "sandesh_client.h"
38 #include "sandesh_connection.h"
39 #include "sandesh_state_machine.h"
40 
41 #include <log4cplus/helpers/pointer.h>
42 #include <log4cplus/configurator.h>
43 #include <log4cplus/fileappender.h>
44 #include <log4cplus/syslogappender.h>
45 
46 using boost::asio::ip::tcp;
47 using boost::asio::ip::address;
48 
49 using namespace contrail::sandesh::protocol;
50 using namespace contrail::sandesh::transport;
51 using namespace log4cplus;
52 using namespace boost::placeholders;
53 
54 // Statics
55 Sandesh::SandeshRole::type Sandesh::role_ = SandeshRole::Invalid;
56 bool Sandesh::enable_local_log_ = false;
57 bool Sandesh::enable_flow_log_ = false;
59 int Sandesh::http_port_ = 0;
60 bool Sandesh::enable_trace_print_ = false;
67 bool Sandesh::slo_to_collector_ = false;
69 bool Sandesh::slo_to_logger_ = false;
70 bool Sandesh::sampled_to_logger_ = false;
73 std::unique_ptr<Sandesh::SandeshRxQueue> Sandesh::recv_queue_;
74 std::string Sandesh::module_;
75 std::string Sandesh::source_;
76 std::string Sandesh::node_type_;
77 std::string Sandesh::instance_id_;
78 int Sandesh::recv_task_id_ = -1;
83  getenv("SANDSH_UT_DEBUG") ? SandeshLevel::SYS_DEBUG : SandeshLevel::UT_DEBUG;
84 std::string Sandesh::logging_category_;
87 std::mutex Sandesh::stats_mutex_;
88 log4cplus::Logger Sandesh::logger_ =
89  log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("SANDESH"));
90 log4cplus::Logger Sandesh::slo_logger_ =
91  log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("SLO_SESSION"));
92 log4cplus::Logger Sandesh::sampled_logger_ =
93  log4cplus::Logger::getInstance(LOG4CPLUS_TEXT("SAMPLED_SESSION"));
94 
96 std::atomic<uint32_t> Sandesh::sandesh_send_ratelimit_;
97 
98 const char *loggingPattern = "%D{%Y-%m-%d %a %H:%M:%S:%Q %Z} "
99  " %h [Thread %t, Pid %i]: %m%n";
100 
102  switch (role) {
103  case SandeshRole::Generator:
104  return "Generator";
105  case SandeshRole::Collector:
106  return "Collector";
107  case SandeshRole::Test:
108  return "Test";
109  case SandeshRole::Invalid:
110  return "Invalid";
111  default:
112  return "Unknown";
113  }
114 }
115 
116 void Sandesh::InitReceive(int recv_task_inst) {
117  assert(recv_task_id_ == -1);
119  recv_task_id_ = scheduler->GetTaskId("sandesh::RecvQueue");
120  recv_queue_.reset(new SandeshRxQueue(recv_task_id_, recv_task_inst,
122 }
123 
125  const SandeshConfig &config, bool periodicuve) {
126  connect_to_collector_ = true;
127  SANDESH_LOG(INFO, "SANDESH: CONNECT TO COLLECTOR: " <<
128  connect_to_collector_);
129  // Create and initialize the client
130  assert(client_ == NULL);
131  std::vector<Endpoint> collector_endpoints = boost::assign::list_of(server);
132  client_ = new SandeshClient(evm, collector_endpoints, config,
133  periodicuve);
134  client_->Initiate();
135 }
136 
137 static int32_t SandeshHttpCallback(SandeshRequest *rsnh) {
138  return rsnh->Enqueue(Sandesh::recv_queue());
139 }
140 
141 extern int PullSandeshGenStatsReq;
142 extern int PullSandeshUVE;
143 extern int PullSandeshTraceReq;
144 
146  const std::string &module,
147  const std::string &source,
148  const std::string &node_type,
149  const std::string &instance_id,
150  EventManager *evm,
151  unsigned short http_port,
152  SandeshContext *client_context,
153  const SandeshConfig &config) {
155  PullSandeshUVE = 1;
157 
158  if (role_ != SandeshRole::Invalid || role == SandeshRole::Invalid) {
159  return true;
160  }
161 
162  SANDESH_LOG(INFO, "SANDESH: ROLE : " << SandeshRoleToString(role));
163  SANDESH_LOG(INFO, "SANDESH: MODULE : " << module);
164  SANDESH_LOG(INFO, "SANDESH: SOURCE : " << source);
165  SANDESH_LOG(INFO, "SANDESH: NODE TYPE : " << node_type);
166  SANDESH_LOG(INFO, "SANDESH: INSTANCE ID : " << instance_id);
167  SANDESH_LOG(INFO, "SANDESH: HTTP SERVER PORT : " << http_port);
168 
169  role_ = role;
170  module_ = module;
171  source_ = source;
172  node_type_ = node_type;
173  instance_id_ = instance_id;
174  client_context_ = client_context;
175  config_ = config;
176  event_manager_ = evm;
177 
178  set_send_rate_limit(config.system_logs_rate_limit);
179  DisableSendingObjectLogs(config.disable_object_logs);
180  InitReceive(Task::kTaskInstanceAny);
181  bool success(SandeshHttp::Init(evm, module, http_port,
182  &SandeshHttpCallback, &http_port_, config_));
183  if (!success) {
184  SANDESH_LOG(ERROR, "SANDESH: HTTP INIT FAILED (PORT " <<
185  http_port << ")");
186  return false;
187  }
188  RecordPort("http", module_, http_port_);
189  return true;
190 }
191 
192 void Sandesh::RecordPort(const std::string& name, const std::string& module, unsigned short port) {
193  int fd;
194  std::ostringstream myfifoss;
195  myfifoss << "/tmp/" << module << "." << getppid() << "." << name << "_port";
196  std::string myfifo = myfifoss.str();
197  std::ostringstream hss;
198  hss << port << "\n";
199  std::string hstr = hss.str();
200 
201  fd = open(myfifo.c_str(), O_WRONLY | O_NONBLOCK);
202  if (fd != -1) {
203  SANDESH_LOG(INFO, "SANDESH: Write " << name << "_port " << port <<
204  "TO : " << myfifo);
205  write(fd, hstr.c_str(), hstr.length());
206  close(fd);
207  } else {
208  SANDESH_LOG(INFO, "SANDESH: NOT Writing " << name << "_port " << port <<
209  "TO : " << myfifo);
210  }
211 }
212 
213 bool Sandesh::ConnectToCollector(const std::string &collector_ip,
214  int collector_port, bool periodicuve) {
215  boost::system::error_code ec;
216  address collector_addr = AddressFromString(collector_ip, &ec);
217  if (ec) {
218  SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " <<
219  collector_ip << " Error: " << ec);
220  return false;
221  }
222 
223  SANDESH_LOG(INFO, "SANDESH: COLLECTOR : " << collector_ip);
224  SANDESH_LOG(INFO, "SANDESH: COLLECTOR PORT : " << collector_port);
225 
226  tcp::endpoint collector(collector_addr, collector_port);
227  InitClient(event_manager_, collector, Sandesh::config(), periodicuve);
228  return true;
229 }
230 
231 void Sandesh::ReConfigCollectors(const std::vector<std::string>& collector_list) {
232  if (client_) {
233  client_->ReConfigCollectors(collector_list);
234  }
235 }
236 
238  const std::vector<std::string> &collectors,
239  const SandeshConfig &config) {
240  connect_to_collector_ = true;
241  SANDESH_LOG(INFO, "SANDESH: CONNECT TO COLLECTOR: " <<
242  connect_to_collector_);
243  std::vector<Endpoint> collector_endpoints;
244  BOOST_FOREACH(const std::string &collector, collectors) {
245  Endpoint ep;
246  if (!MakeEndpoint(&ep, collector)) {
247  SANDESH_LOG(ERROR, __func__ << ": Invalid collector address: " <<
248  collector);
249  return false;
250  }
251  collector_endpoints.push_back(ep);
252  }
253  client_ = new SandeshClient(evm, collector_endpoints, config, true);
254  client_->Initiate();
255  return true;
256 }
257 
258 bool Sandesh::InitGenerator(const std::string &module,
259  const std::string &source,
260  const std::string &node_type,
261  const std::string &instance_id,
262  EventManager *evm,
263  unsigned short http_port,
264  SandeshContext *client_context,
265  DerivedStats ds,
266  const SandeshConfig &config) {
268  return Initialize(SandeshRole::Generator, module, source, node_type,
269  instance_id, evm, http_port, client_context, config);
270 }
271 
272 bool Sandesh::InitGenerator(const std::string &module,
273  const std::string &source,
274  const std::string &node_type,
275  const std::string &instance_id,
276  EventManager *evm,
277  unsigned short http_port,
278  const std::vector<std::string> &collectors,
279  SandeshContext *client_context,
280  DerivedStats ds,
281  const SandeshConfig &config) {
283  bool success(Initialize(SandeshRole::Generator, module, source, node_type,
284  instance_id, evm, http_port, client_context,
285  config));
286  if (!success) {
287  return false;
288  }
289  return InitClient(evm, collectors, config);
290 }
291 
292 // Collector
293 bool Sandesh::InitCollector(const std::string &module,
294  const std::string &source,
295  const std::string &node_type,
296  const std::string &instance_id,
297  EventManager *evm,
298  const std::string &collector_ip, int collector_port,
299  unsigned short http_port,
300  SandeshContext *client_context,
301  const SandeshConfig &config) {
302  bool success(Initialize(SandeshRole::Collector, module, source, node_type,
303  instance_id, evm, http_port, client_context,
304  config));
305  if (!success) {
306  return false;
307  }
308  return ConnectToCollector(collector_ip, collector_port, true);
309 }
310 
311 bool Sandesh::InitGeneratorTest(const std::string &module,
312  const std::string &source,
313  const std::string &node_type,
314  const std::string &instance_id,
315  EventManager *evm,
316  unsigned short http_port,
317  SandeshContext *client_context,
318  const SandeshConfig &config) {
319  return Initialize(SandeshRole::Test, module, source, node_type,
320  instance_id, evm, http_port, client_context, config);
321 }
322 
323 static void WaitForIdle() {
324  static const int kTimeout = 15;
326 
327  for (int i = 0; i < (kTimeout * 1000); i++) {
328  if (scheduler->IsEmpty()) {
329  break;
330  }
331  usleep(1000);
332  }
333 }
334 
335 void Sandesh::SetDscpValue(uint8_t value) {
336  SandeshClient *client = Sandesh::client();
337  if (client) {
338  client->SetDscpValue(value);
339  }
340 }
341 
343 
344  // Wait until all pending http session based tasks are cleaned up.
345  long count = 60000;
346  while (count--) {
347  if (HttpSession::GetPendingTaskCount() == 0) break;
348  usleep(1000);
349  }
351  role_ = SandeshRole::Invalid;
352  if (recv_queue_.get() != NULL) {
353  recv_queue_->Shutdown();
354  recv_queue_.reset(NULL);
355  assert(recv_task_id_ != -1);
356  recv_task_id_ = -1;
357  } else {
358  assert(recv_task_id_ == -1);
359  }
360  if (client_ != NULL) {
361  client_->Shutdown();
362  while (client()->IsSession()) usleep(100);
363  WaitForIdle();
364  client_->ClearSessions();
366  client_ = NULL;
367  }
368 }
369 
370 void Sandesh::SetLoggingParams(bool enable_local_log, std::string category,
371  std::string level, bool enable_trace_print, bool enable_flow_log,
372  bool enable_session_syslog) {
373  SetLocalLogging(enable_local_log);
374  SetLoggingCategory(category);
375  SetLoggingLevel(level);
376  SetTracePrint(enable_trace_print);
377  SetFlowLogging(enable_flow_log);
378  SetSessionSyslogging(enable_session_syslog);
379 }
380 
381 void Sandesh::SetLoggingParams(bool enable_local_log, std::string category,
382  SandeshLevel::type level, bool enable_trace_print,
383  bool enable_flow_log) {
384  SetLocalLogging(enable_local_log);
385  SetLoggingCategory(category);
386  SetLoggingLevel(level);
387  SetTracePrint(enable_trace_print);
388  SetFlowLogging(enable_flow_log);
389 }
390 
391 void Sandesh::SetLoggingLevel(std::string level) {
392  SandeshLevel::type nlevel = StringToLevel(level);
393  SetLoggingLevel(nlevel);
394 }
395 
396 log4cplus::LogLevel SandeshLevelTolog4Level(
397  SandeshLevel::type slevel) {
398  switch (slevel) {
399  case SandeshLevel::SYS_EMERG:
400  case SandeshLevel::SYS_ALERT:
401  case SandeshLevel::SYS_CRIT:
402  return log4cplus::FATAL_LOG_LEVEL;
403  case SandeshLevel::SYS_ERR:
404  return log4cplus::ERROR_LOG_LEVEL;
405  case SandeshLevel::SYS_WARN:
406  case SandeshLevel::SYS_NOTICE:
407  return log4cplus::WARN_LOG_LEVEL;
408  case SandeshLevel::SYS_INFO:
409  return log4cplus::INFO_LOG_LEVEL;
410  case SandeshLevel::SYS_DEBUG:
411  return log4cplus::DEBUG_LOG_LEVEL;
412  default:
414  return log4cplus::ALL_LOG_LEVEL;
415  }
416 }
417 
419  log4cplus::LogLevel log4_new_level(SandeshLevelTolog4Level(level));
420  log4cplus::LogLevel log4_old_level(logger_.getLogLevel());
421  if (logging_level_ != level ||
422  log4_old_level != log4_new_level) {
423  const log4cplus::LogLevelManager &log4level_manager(
424  log4cplus::getLogLevelManager());
425  SANDESH_LOG(INFO, "SANDESH: Logging: LEVEL: " << "[ " <<
426  LevelToString(logging_level_) << " ] -> [ " <<
427  LevelToString(level) << " ] log4level: [ " <<
428  log4level_manager.toString(log4_old_level) <<
429  " ] -> [ " <<
430  log4level_manager.toString(log4_new_level) <<
431  " ]");
432  logging_level_ = level;
433  logger_.setLogLevel(log4_new_level);
434  // Set the LogLevel on rootLogger
435  ::SetLoggingLevel(log4_new_level);
436  }
437 }
438 
439 void Sandesh::SetLoggingCategory(std::string category) {
440  if (logging_category_ != category) {
441  SANDESH_LOG(INFO, "SANDESH: Logging: CATEGORY: " <<
442  (logging_category_.empty() ? "*" : logging_category_) << " -> " <<
443  (category.empty() ? "*" : category));
444  logging_category_ = category;
445  }
446 }
447 
448 void Sandesh::SetLocalLogging(bool enable_local_log) {
449  if (enable_local_log_ != enable_local_log) {
450  SANDESH_LOG(INFO, "SANDESH: Logging: " <<
451  (enable_local_log_ ? "ENABLED" : "DISABLED") << " -> " <<
452  (enable_local_log ? "ENABLED" : "DISABLED"));
453  enable_local_log_ = enable_local_log;
454  }
455 }
456 
457 void Sandesh::SetTracePrint(bool enable_trace_print) {
458  if (enable_trace_print_ != enable_trace_print) {
459  SANDESH_LOG(INFO, "SANDESH: Trace: PRINT: " <<
460  (enable_trace_print_ ? "ENABLED" : "DISABLED") << " -> " <<
461  (enable_trace_print ? "ENABLED" : "DISABLED"));
462  enable_trace_print_ = enable_trace_print;
463  }
464 }
465 
466 void Sandesh::SetFlowLogging(bool enable_flow_log) {
467  if (enable_flow_log_ != enable_flow_log) {
468  SANDESH_LOG(INFO, "SANDESH: Flow Logging: " <<
469  (enable_flow_log_ ? "ENABLED" : "DISABLED") << " -> " <<
470  (enable_flow_log ? "ENABLED" : "DISABLED"));
471  enable_flow_log_ = enable_flow_log;
472  }
473 }
474 
475 void Sandesh::SetSessionSyslogging(bool enable_session_syslog) {
476  if (enable_session_syslog_ != enable_session_syslog) {
477  SANDESH_LOG(INFO, "SANDESH: Flow Logging: " <<
478  (enable_session_syslog_ ? "ENABLED" : "DISABLED") << " -> " <<
479  (enable_session_syslog ? "ENABLED" : "DISABLED"));
480  enable_session_syslog_ = enable_session_syslog;
481  }
482 
483 }
484 
485 void Sandesh::DisableFlowCollection(bool disable) {
486  if (disable_flow_collection_ != disable) {
487  SANDESH_LOG(INFO, "SANDESH: Disable Flow Collection: " <<
488  disable_flow_collection_ << " -> " << disable);
489  disable_flow_collection_ = disable;
490  }
491 }
492 
494  if (disable_sending_all_ != disable) {
495  SANDESH_LOG(INFO, "SANDESH: Disable Sending ALL Messages: " <<
496  disable_sending_all_ << " -> " << disable);
497  disable_sending_all_ = disable;
498  }
499 }
500 
502  return disable_sending_all_;
503 }
504 
506  if (disable_sending_object_logs_ != disable) {
507  SANDESH_LOG(INFO, "SANDESH: Disable Sending Object Logs: " <<
508  disable_sending_object_logs_ << " -> " << disable);
509  disable_sending_object_logs_ = disable;
510  }
511 }
512 
514  return disable_sending_object_logs_;
515 }
516 
517 void Sandesh::DisableSendingFlows(bool disable) {
518  if (disable_sending_flows_ != disable) {
519  SANDESH_LOG(INFO, "SANDESH: Disable Sending Flows: " <<
520  disable_sending_flows_ << " -> " << disable);
521  disable_sending_flows_ = disable;
522  }
523 }
524 
526  return disable_sending_flows_;
527 }
528 
530  return sandesh_send_ratelimit_ == 0;
531 }
532 
533 void Sandesh::set_send_rate_limit(int rate_limit) {
534  if (rate_limit >= 0) {
535  SANDESH_LOG(INFO, "SANDESH: System Log Send Rate Limit: " <<
536  sandesh_send_ratelimit_ << " -> " << rate_limit);
537  sandesh_send_ratelimit_ = rate_limit;
538  }
539 }
540 
542  return sandesh_send_ratelimit_;
543 }
544 
546  if (!queue) {
547  if (IsLoggingDroppedAllowed(type())) {
548  SANDESH_LOG(ERROR, __func__ << ": SandeshQueue NULL : Dropping Message: "
549  << ToString());
550  }
551  UpdateTxMsgFailStats(name_, 0, SandeshTxDropReason::NoQueue);
552  Release();
553  return false;
554  }
555  //Frame an elemet object and enqueue it
556  SandeshElement elem(this);
557  if (!queue->Enqueue(elem)) {
558  // XXX Change when WorkQueue implements bounded queues
559  return true;
560  }
561  return true;
562 }
563 
565  rsnh->HandleRequest();
566  rsnh->Release();
567  return true;
568 }
569 
570 void SandeshRequest::Release() { self_.reset(); }
571 
572 int32_t Sandesh::WriteBinary(u_int8_t *buf, u_int32_t buf_len,
573  int *error) {
574  int32_t xfer;
575  boost::shared_ptr<TMemoryBuffer> btrans =
576  boost::shared_ptr<TMemoryBuffer>(
577  new TMemoryBuffer(buf, buf_len));
578  btrans->setWriteBuffer(buf, buf_len);
579  boost::shared_ptr<TBinaryProtocol> prot =
580  boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(btrans));
581  xfer = Write(prot);
582  if (xfer < 0) {
583  SANDESH_LOG(DEBUG, __func__ << "Write sandesh to " << buf_len <<
584  " bytes FAILED" << std::endl);
585  *error = EINVAL;
586  return xfer;
587  }
588  return xfer;
589 }
590 
591 int32_t Sandesh::ReadBinary(u_int8_t *buf, u_int32_t buf_len,
592  int *error) {
593  int32_t xfer = 0;
594  boost::shared_ptr<TMemoryBuffer> btrans =
595  boost::shared_ptr<TMemoryBuffer>(
596  new TMemoryBuffer(buf, buf_len));
597  boost::shared_ptr<TBinaryProtocol> prot =
598  boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(btrans));
599  xfer = Read(prot);
600  if (xfer < 0) {
601  SANDESH_LOG(DEBUG, __func__ << "Read sandesh from " << buf_len <<
602  " bytes FAILED" << std::endl);
603  *error = EINVAL;
604  return xfer;
605  }
606  return xfer;
607 }
608 
609 int32_t Sandesh::WriteBinaryToFile(const std::string& path, int *error) {
610  int32_t xfer;
611  boost::shared_ptr<TSimpleFileTransport> btrans =
612  boost::shared_ptr<TSimpleFileTransport>(
613  new TSimpleFileTransport(path, false, true));
614  boost::shared_ptr<TBinaryProtocol> prot =
615  boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(btrans));
616  xfer = Write(prot);
617  if (xfer < 0) {
618  SANDESH_LOG(DEBUG, __func__ << "Write sandesh to file FAILED"
619  << std::endl);
620  *error = EINVAL;
621  return xfer;
622  }
623  return xfer;
624 }
625 
626 int32_t Sandesh::ReadBinaryFromFile(const std::string& path, int *error) {
627  int32_t xfer;
628  boost::shared_ptr<TSimpleFileTransport> btrans =
629  boost::shared_ptr<TSimpleFileTransport>(
630  new TSimpleFileTransport(path));
631  boost::shared_ptr<TBinaryProtocol> prot =
632  boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(btrans));
633  xfer = Read(prot);
634  if (xfer < 0) {
635  SANDESH_LOG(DEBUG, __func__ << "Read sandesh from file FAILED"
636  << std::endl);
637  *error = EINVAL;
638  return xfer;
639  }
640  return xfer;
641 }
642 
643 int32_t Sandesh::ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len,
644  int *error, SandeshContext *client_context) {
645  int32_t xfer;
646  std::string sandesh_name;
647  boost::shared_ptr<TMemoryBuffer> btrans =
648  boost::shared_ptr<TMemoryBuffer>(
649  new TMemoryBuffer(buf, buf_len));
650  boost::shared_ptr<TBinaryProtocol> prot =
651  boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(btrans));
652  // Extract sandesh name
653  xfer = prot->readSandeshBegin(sandesh_name);
654  if (xfer < 0) {
655  SANDESH_LOG(DEBUG, __func__ << "Read sandesh begin from " << buf_len <<
656  " bytes FAILED" << std::endl);
657  *error = EINVAL;
658  return xfer;
659  }
660  // Create and process the sandesh
662  if (sandesh == NULL) {
663  SANDESH_LOG(DEBUG, __func__ << " Unknown sandesh:" <<
664  sandesh_name << std::endl);
665  *error = EINVAL;
666  return -1;
667  }
668  // Reinitialize buffer and protocol
669  btrans = boost::shared_ptr<TMemoryBuffer>(
670  new TMemoryBuffer(buf, buf_len));
671  prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(btrans));
672  xfer = sandesh->Read(prot);
673  if (xfer < 0) {
674  SANDESH_LOG(DEBUG, __func__ << " Decoding " << sandesh_name << " FAILED" <<
675  std::endl);
676  *error = EINVAL;
677  return xfer;
678  }
679  SandeshBuffer * bsnh = dynamic_cast<SandeshBuffer *>(sandesh);
680  if (bsnh) bsnh->Process(client_context);
681  sandesh->Release();
682  return xfer;
683 }
684 
685 int32_t Sandesh::ReceiveBinaryMsg(u_int8_t *buf, u_int32_t buf_len,
686  int *error, SandeshContext *client_context) {
687  u_int32_t xfer = 0;
688  int ret;
689  while (xfer < buf_len) {
690  ret = ReceiveBinaryMsgOne(buf + xfer, buf_len - xfer, error,
691  client_context);
692  if (ret < 0) {
693  SANDESH_LOG(DEBUG, __func__ << "Read sandesh from " << buf_len <<
694  " bytes at offset " << xfer << " FAILED (" <<
695  error << ")");
696  return ret;
697  }
698  xfer += ret;
699  }
700  return xfer;
701 }
702 
704  if (!client_) {
705  if (IsLoggingDroppedAllowed(type())) {
706  if (IsConnectToCollectorEnabled()) {
707  SANDESH_LOG(ERROR, "SANDESH: No client: " << ToString());
708  } else {
709  Log();
710  }
711  }
712  UpdateTxMsgFailStats(name_, 0, SandeshTxDropReason::NoClient);
713  Release();
714  return false;
715  }
716  if (!client_->SendSandesh(this)) {
717  if (IsLoggingDroppedAllowed(type())) {
718  SANDESH_LOG(ERROR, "SANDESH: Send FAILED: " << ToString());
719  }
720  UpdateTxMsgFailStats(name_, 0,
721  SandeshTxDropReason::ClientSendFailed);
722  Release();
723  return false;
724  }
725  return true;
726 }
727 
729  // Sandesh client does not have a connection
730  if (sconn) {
731  return sconn->SendSandesh(this);
732  } else {
733  return SendEnqueue();
734  }
735 }
736 
738  assert(sconn == NULL);
739  if ((context().find("http%") == 0) ||
740  (context().find("https%") == 0)) {
741  SandeshHttp::Response(this, context());
742  return true;
743  }
744  if (response_callback_) {
745  response_callback_(this);
746  }
747  return Sandesh::Dispatch(sconn);
748 }
749 
751  assert(sconn == NULL);
752  if ((0 == context().find("http%")) ||
753  (0 == context().find("https%"))) {
754  SandeshHttp::Response(this, context());
755  return true;
756  }
757  return Sandesh::Dispatch();
758 }
759 
761  assert(sconn == NULL);
762  if ((0 == context().find("http%")) ||
763  (0 == context().find("https%"))) {
764  SandeshHttp::Response(this, context());
765  return true;
766  }
767  if (client_) {
768  if (IsSendingAllMessagesDisabled()) {
769  Log();
770  UpdateTxMsgFailStats(Name(), 0,
771  SandeshTxDropReason::SendingDisabled);
772  Release();
773  return false;
774  }
775  // SandeshUVE has an implicit send level of SandeshLevel::SYS_UVE
776  // which is irrespective of the level set by the user in the Send.
777  // This is needed so that the send queue does not grow unbounded.
778  // Once the send queue's sending level reaches SandeshLevel::SYS_UVE
779  // we will reset the connection to the collector to initiate resync
780  // of the UVE cache
781  if (SandeshLevel::SYS_UVE >= SendingLevel()) {
782  client_->CloseSMSession();
783  }
784  if (!client_->SendSandeshUVE(this)) {
785  SANDESH_LOG(ERROR, "SandeshUVE : Send FAILED: " << ToString());
786  UpdateTxMsgFailStats(Name(), 0,
787  SandeshTxDropReason::ClientSendFailed);
788  Release();
789  return false;
790  }
791  return true;
792  }
793  if (IsConnectToCollectorEnabled()) {
794  SANDESH_LOG(ERROR, "SANDESH: No Client: " << ToString());
795  } else {
796  Log();
797  }
798  UpdateTxMsgFailStats(Name(), 0, SandeshTxDropReason::NoClient);
799  Release();
800  return false;
801 }
802 
804  if (!queue) {
805  SANDESH_LOG(ERROR, "SandeshRequest: No RxQueue: " << ToString());
806  UpdateRxMsgFailStats(Name(), 0, SandeshRxDropReason::NoQueue);
807  Release();
808  return false;
809  }
810  if (!queue->Enqueue(this)) {
811  // XXX Change when WorkQueue implements bounded queues
812  return true;
813  }
814  return true;
815 }
816 
818  return level >= SandeshLevel::UT_START &&
819  level <= SandeshLevel::UT_END;
820 }
821 
823  SandeshLevel::type level,
824  const std::string& category) {
825  // Do not log UVEs unless explicitly configured via setting log_level
826  // to INVALID. This is to avoid flooding the log files with UVEs
827  if (type == SandeshType::UVE) {
828  level = SandeshLevel::INVALID;
829  }
830  bool level_allowed = logging_level_ >= level;
831  bool category_allowed = !logging_category_.empty() ?
832  logging_category_ == category : true;
833  return level_allowed && category_allowed;
834 }
835 
837  if (type_ == SandeshType::FLOW || type_ == SandeshType::SESSION) {
838  return enable_flow_log_;
839  } else {
840  return IsLocalLoggingEnabled() &&
841  IsLevelCategoryLoggingAllowed(type_, level_, category_);
842  }
843 }
844 
846  if (type == SandeshType::FLOW || type == SandeshType::SESSION) {
847  return enable_flow_log_;
848  } else {
849  return true;
850  }
851 }
852 
854  std::map<int, const char*>::const_iterator it = _SandeshLevel_VALUES_TO_NAMES.find(level);
855  if (it != _SandeshLevel_VALUES_TO_NAMES.end()) {
856  return it->second;
857  } else {
858  return "UNKNOWN";
859  }
860 }
861 
863  std::map<int, const char*>::const_iterator it = _SandeshLevel_VALUES_TO_NAMES.begin();
864  while (it != _SandeshLevel_VALUES_TO_NAMES.end()) {
865  if (strncmp(level.c_str(), it->second, strlen(it->second)) == 0) {
866  return static_cast<SandeshLevel::type>(it->first);
867  }
868  it++;
869  }
870  return SandeshLevel::INVALID;
871 }
872 
873 void Sandesh::UpdateRxMsgStats(const std::string &msg_name,
874  uint64_t bytes) {
875  std::scoped_lock lock(stats_mutex_);
876  msg_stats_.UpdateRecv(msg_name, bytes);
877 }
878 
879 void Sandesh::UpdateRxMsgFailStats(const std::string &msg_name,
880  uint64_t bytes, SandeshRxDropReason::type dreason) {
881  std::scoped_lock lock(stats_mutex_);
882  msg_stats_.UpdateRecvFailed(msg_name, bytes, dreason);
883 }
884 
885 void Sandesh::UpdateTxMsgStats(const std::string &msg_name,
886  uint64_t bytes) {
887  std::scoped_lock lock(stats_mutex_);
888  msg_stats_.UpdateSend(msg_name, bytes);
889 }
890 
891 void Sandesh::UpdateTxMsgFailStats(const std::string &msg_name,
892  uint64_t bytes, SandeshTxDropReason::type dreason) {
893  std::scoped_lock lock(stats_mutex_);
894  msg_stats_.UpdateSendFailed(msg_name, bytes, dreason);
895 }
896 
898  std::vector<SandeshMessageTypeStats> *mtype_stats,
899  SandeshMessageStats *magg_stats) {
900  std::scoped_lock lock(stats_mutex_);
901  msg_stats_.Get(mtype_stats, magg_stats);
902 }
903 
905  boost::ptr_map<std::string, SandeshMessageTypeStats> *mtype_stats,
906  SandeshMessageStats *magg_stats) {
907  std::scoped_lock lock(stats_mutex_);
908  msg_stats_.Get(mtype_stats, magg_stats);
909 }
910 
911 void Sandesh::SetSendQueue(bool enable) {
912  if (send_queue_enabled_ != enable) {
913  SANDESH_LOG(INFO, "SANDESH: CLIENT: SEND QUEUE: " <<
914  (send_queue_enabled_ ? "ENABLED" : "DISABLED") << " -> " <<
915  (enable ? "ENABLED" : "DISABLED"));
916  send_queue_enabled_ = enable;
917  if (enable) {
918  if (client_ && client_->IsSession()) {
919  client_->session()->send_queue()->MayBeStartRunner();
920  }
921  }
922  }
923 }
924 
925 // In sandesh state machine on collector, we can only drop systemlog,
926 // objectlog, and flow. UVEs can only be dropped after being dequeued from
927 // the state machine and published on redis/kafka
928 bool DoDropSandeshMessage(const SandeshHeader &header,
929  const SandeshLevel::type drop_level) {
930  SandeshType::type stype(header.get_Type());
931  if (stype == SandeshType::SYSTEM ||
932  stype == SandeshType::OBJECT ||
933  stype == SandeshType::FLOW ||
934  stype == SandeshType::SESSION) {
935  // Is level above drop level?
936  SandeshLevel::type slevel(
937  static_cast<SandeshLevel::type>(header.get_Level()));
938  if (slevel >= drop_level) {
939  return true;
940  }
941  }
942  // Drop flow message if flow collection is disabled
944  (stype == SandeshType::FLOW || stype == SandeshType::SESSION)) {
945  return true;
946  }
947  return false;
948 }
949 
950 SandeshContext *Sandesh::module_context(const std::string &module_name) {
951  ModuleContextMap::const_iterator loc = module_context_.find(module_name);
952  if (loc != module_context_.end()) {
953  return loc->second;
954  }
955  return NULL;
956 }
957 
958 void Sandesh::set_module_context(const std::string &module_name,
959  SandeshContext *context) {
960  std::pair<ModuleContextMap::iterator, bool> result =
961  module_context_.insert(std::make_pair(module_name, context));
962  if (!result.second) {
963  result.first->second = context;
964  }
965 }
966 
968  const std::string& category) {
969  // Handle unit test scenario
970  if (IsUnitTest() || IsLevelUT(level)) {
971  return true;
972  }
973  return false;
974 }
975 
977  if (client_) {
978  SandeshSession *sess = client_->session();
979  if (sess) {
980  return sess->SendingLevel();
981  }
982  }
983  return SandeshLevel::INVALID;
984 }
985 
986 template<>
988  SandeshElement *element)
989  {
990  size_t sandesh_size = element->GetSize();
991  return count_.fetch_add(sandesh_size) + sandesh_size;
992 }
993 
994 template<>
996  SandeshElement *element) {
997  size_t sandesh_size = element->GetSize();
998  return count_.fetch_sub(sandesh_size) - sandesh_size;
999 }
1000 
1001 /*
1002  * Add the configured appenders for the sample logger
1003  */
1004 void Sandesh::set_logger_appender(const std::string &file_name, long max_file_size,
1005  int max_backup_index,
1006  const std::string &syslog_facility,
1007  const std::vector<std::string> &destn,
1008  const std::string &ident,
1009  bool is_sampled_logger) {
1010  log4cplus::Logger logger;
1011  if (is_sampled_logger) {
1012  logger = Sandesh::sampled_logger();
1013  } else {
1014  logger = Sandesh::slo_logger();
1015  }
1016  logger.setAdditivity(false);
1017  // Session messages log level is
1018  logger.setLogLevel(SandeshLevel::SYS_NOTICE);
1019  // Local logging for SLO logger
1020  if (std::find(destn.begin(), destn.end(), "file") !=
1021  destn.end()) {
1022  // Append file appender to SLO logger
1023  SharedAppenderPtr fileappender(new RollingFileAppender(file_name,
1024  max_file_size, max_backup_index));
1025  logger.addAppender(fileappender);
1026  if (is_sampled_logger) {
1028  } else {
1029  Sandesh::slo_to_logger_ = true;
1030  }
1031  }
1032  // SYSLOG appender for SLO logger
1033  if (std::find(destn.begin(), destn.end(), "syslog") !=
1034  destn.end()) {
1035  helpers::Properties props;
1036  std::string syslogident = boost::str(
1037  boost::format("%1%[%2%]") % ident % getpid());
1038  props.setProperty(LOG4CPLUS_TEXT("facility"),
1039  boost::starts_with(syslog_facility, "LOG_")
1040  ? syslog_facility.substr(4)
1041  : syslog_facility);
1042  props.setProperty(LOG4CPLUS_TEXT("ident"), syslogident);
1043  props.setProperty(LOG4CPLUS_TEXT("additivity"), "false");
1044  SharedAppenderPtr syslogappender(new SysLogAppender(props));
1045  std::unique_ptr<Layout> syslog_layout_ptr(new PatternLayout(
1046  loggingPattern));
1047  syslogappender->setLayout(std::move(syslog_layout_ptr));
1048  logger.addAppender(syslogappender);
1049  if (is_sampled_logger) {
1051  } else {
1052  Sandesh::slo_to_logger_ = true;
1053  }
1054  }
1055 }
1056 
1058  const std::vector<std::string> &sampled_destn,
1059  const std::vector<std::string> &slo_destn) {
1060  if (std::find(slo_destn.begin(), slo_destn.end(), "collector") !=
1061  slo_destn.end()) {
1063  }
1064  if (std::find(sampled_destn.begin(), sampled_destn.end(), "collector") !=
1065  sampled_destn.end()) {
1067  }
1068 }
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
static Sandesh * CreateInstance(std::string const &s)
Definition: cpp/sandesh.h:625
virtual void Process(SandeshContext *context)=0
void SetDscpValue(uint8_t value)
bool SendSandesh(Sandesh *snh)
static void Response(Sandesh *snh, std::string context)
static bool Init(EventManager *evm, const std::string module, short port, RequestCallbackFn reqcb, int *hport, const SandeshConfig &config=SandeshConfig())
static void Uninit(void)
void Release()
Definition: sandesh.cc:570
virtual void HandleRequest() const =0
bool Enqueue(SandeshRxQueue *queue)
Definition: sandesh.cc:803
bool Dispatch(SandeshConnection *sconn=NULL)
Definition: sandesh.cc:737
SandeshLevel::type SendingLevel() const
bool Dispatch(SandeshConnection *sconn=NULL)
Definition: sandesh.cc:750
static bool InitDerivedStats(const std::map< std::string, ds_conf_elem > &)
Definition: sandesh_uve.cc:25
bool Dispatch(SandeshConnection *sconn=NULL)
Definition: sandesh.cc:760
static bool ConnectToCollector(const std::string &collector_ip, int collector_port, bool periodicuve=false)
Definition: sandesh.cc:213
static void SetLoggingLevel(std::string level)
Definition: sandesh.cc:391
static bool IsSendingSystemLogsDisabled()
Definition: sandesh.cc:529
static void UpdateTxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshTxDropReason::type dreason)
Definition: sandesh.cc:891
static log4cplus::Logger logger_
Definition: cpp/sandesh.h:430
static void set_module_context(const std::string &module_name, SandeshContext *context)
Definition: sandesh.cc:958
static bool disable_sending_all_
Definition: cpp/sandesh.h:435
boost::asio::ip::tcp::endpoint Endpoint
Definition: cpp/sandesh.h:139
static SandeshLevel::type SendingLevel()
Definition: sandesh.cc:976
static bool disable_sending_object_logs_
Definition: cpp/sandesh.h:436
static void InitClient(EventManager *evm, Endpoint server, const SandeshConfig &config, bool periodicuve)
Definition: sandesh.cc:124
static bool slo_to_logger_
Definition: cpp/sandesh.h:451
static SandeshLevel::type logging_ut_level_
Definition: cpp/sandesh.h:422
static bool enable_session_syslog_
Definition: cpp/sandesh.h:420
static std::unique_ptr< SandeshRxQueue > recv_queue_
Definition: cpp/sandesh.h:414
virtual int32_t WriteBinary(u_int8_t *buf, u_int32_t buf_len, int *error)
Definition: sandesh.cc:572
static bool enable_flow_log_
Definition: cpp/sandesh.h:419
static log4cplus::Logger & sampled_logger()
Definition: cpp/sandesh.h:327
static int32_t ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
Definition: sandesh.cc:643
static EventManager * event_manager_
Definition: cpp/sandesh.h:426
static void DisableSendingFlows(bool disable)
Definition: sandesh.cc:517
static std::mutex stats_mutex_
Definition: cpp/sandesh.h:429
static void ReConfigCollectors(const std::vector< std::string > &collector_list)
Definition: sandesh.cc:231
static SandeshConfig & config()
Definition: cpp/sandesh.h:309
static void UpdateRxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshRxDropReason::type dreason)
Definition: sandesh.cc:879
static void set_send_to_collector_flags(const std::vector< std::string > &sampled_destination, const std::vector< std::string > &slo_destination)
Definition: sandesh.cc:1057
static void GetMsgStats(std::vector< SandeshMessageTypeStats > *mtype_stats, SandeshMessageStats *magg_stats)
Definition: sandesh.cc:897
static bool IsLevelCategoryLoggingAllowed(SandeshType::type type, SandeshLevel::type level, const std::string &category)
Definition: sandesh.cc:822
static void UpdateTxMsgStats(const std::string &msg_name, uint64_t bytes)
Definition: sandesh.cc:885
static void SetLocalLogging(bool enable)
Definition: sandesh.cc:448
static void UpdateRxMsgStats(const std::string &msg_name, uint64_t bytes)
Definition: sandesh.cc:873
static bool InitGenerator(const std::string &module, const std::string &source, const std::string &node_type, const std::string &instance_id, EventManager *evm, unsigned short http_port, const std::vector< std::string > &collectors, SandeshContext *client_context=NULL, DerivedStats ds=DerivedStats(), const SandeshConfig &config=SandeshConfig())
Definition: sandesh.cc:272
static ModuleContextMap module_context_
Definition: cpp/sandesh.h:417
static bool connect_to_collector_
Definition: cpp/sandesh.h:425
bool Enqueue(SandeshQueue *queue)
Definition: sandesh.cc:545
static std::string node_type_
Definition: cpp/sandesh.h:411
static void SetSendQueue(bool enable)
Definition: sandesh.cc:911
static bool IsFlowCollectionDisabled()
Definition: cpp/sandesh.h:201
std::map< std::string, std::map< std::string, std::string > > DerivedStats
Definition: cpp/sandesh.h:150
static bool disable_sending_flows_
Definition: cpp/sandesh.h:437
static SandeshClient * client()
Definition: cpp/sandesh.h:308
static SandeshContext * client_context_
Definition: cpp/sandesh.h:416
static void SetLoggingCategory(std::string category)
Definition: sandesh.cc:439
static void set_logger_appender(const std::string &file_name, long max_file_size, int max_backup_index, const std::string &syslog_facility, const std::vector< std::string > &destn, const std::string &ident, bool is_sampled_logger)
Definition: sandesh.cc:1004
static const char * SandeshRoleToString(SandeshRole::type role)
Definition: sandesh.cc:101
static SandeshConfig config_
Definition: cpp/sandesh.h:434
static bool IsSendingObjectLogsDisabled()
Definition: sandesh.cc:513
static int recv_task_id_
Definition: cpp/sandesh.h:415
static bool IsSendingAllMessagesDisabled()
Definition: sandesh.cc:501
static int http_port_
Definition: cpp/sandesh.h:413
virtual int32_t ReadBinary(u_int8_t *buf, u_int32_t buf_len, int *error)
Definition: sandesh.cc:591
static const char * LevelToString(SandeshLevel::type level)
Definition: sandesh.cc:853
static bool send_queue_enabled_
Definition: cpp/sandesh.h:427
static void set_send_rate_limit(int rate_limit)
Definition: sandesh.cc:533
static uint32_t get_send_rate_limit()
Definition: sandesh.cc:541
static void DisableFlowCollection(bool disable)
Definition: sandesh.cc:485
static log4cplus::Logger slo_logger_
Definition: cpp/sandesh.h:431
static bool HandleTest(SandeshLevel::type level, const std::string &category)
Definition: sandesh.cc:967
static bool ProcessRecv(SandeshRequest *)
Definition: sandesh.cc:564
static bool slo_to_collector_
Definition: cpp/sandesh.h:449
static bool InitCollector(const std::string &module, const std::string &source, const std::string &node_type, const std::string &instance_id, EventManager *evm, const std::string &collector_ip, int collector_port, unsigned short http_port, SandeshContext *client_context=NULL, const SandeshConfig &config=SandeshConfig())
Definition: sandesh.cc:293
static void RecordPort(const std::string &name, const std::string &module, unsigned short port)
Definition: sandesh.cc:192
static bool sampled_to_logger_
Definition: cpp/sandesh.h:452
static void SetDscpValue(uint8_t value)
Definition: sandesh.cc:335
static SandeshMessageStatistics msg_stats_
Definition: cpp/sandesh.h:428
static bool IsLoggingDroppedAllowed(SandeshType::type)
Definition: sandesh.cc:845
static void Uninit()
Definition: sandesh.cc:342
bool IsLoggingAllowed() const
Definition: sandesh.cc:836
static void SetSessionSyslogging(bool enable_session_syslog)
Definition: sandesh.cc:475
boost::function< void(Sandesh *)> SandeshCallback
Definition: cpp/sandesh.h:140
static void DisableSendingAllMessages(bool disable)
Definition: sandesh.cc:493
static SandeshRxQueue * recv_queue()
Definition: cpp/sandesh.h:300
static SandeshRole::type role_
Definition: cpp/sandesh.h:408
static SandeshContext * module_context(const std::string &module_name)
Definition: sandesh.cc:950
static std::string instance_id_
Definition: cpp/sandesh.h:412
static SandeshCallback response_callback_
Definition: cpp/sandesh.h:380
static std::atomic< uint32_t > sandesh_send_ratelimit_
Definition: cpp/sandesh.h:448
static int32_t ReceiveBinaryMsg(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
Definition: sandesh.cc:685
static SandeshLevel::type logging_level_
Definition: cpp/sandesh.h:421
static SandeshClient * client_
Definition: cpp/sandesh.h:381
static log4cplus::Logger & slo_logger()
Definition: cpp/sandesh.h:326
static bool enable_trace_print_
Definition: cpp/sandesh.h:424
virtual int32_t WriteBinaryToFile(const std::string &path, int *error)
Definition: sandesh.cc:609
virtual int32_t ReadBinaryFromFile(const std::string &path, int *error)
Definition: sandesh.cc:626
static void SetFlowLogging(bool enable)
Definition: sandesh.cc:466
static bool IsSendingFlowsDisabled()
Definition: sandesh.cc:525
static void SetTracePrint(bool enable)
Definition: sandesh.cc:457
static void InitReceive(int recv_task_inst=-1)
Definition: sandesh.cc:116
static bool Initialize(SandeshRole::type role, const std::string &module, const std::string &source, const std::string &node_type, const std::string &instance_id, EventManager *evm, unsigned short http_port, SandeshContext *client_context=NULL, const SandeshConfig &config=SandeshConfig())
Definition: sandesh.cc:145
std::map< std::string, SandeshContext * > ModuleContextMap
Definition: cpp/sandesh.h:390
virtual bool SendEnqueue()
Definition: sandesh.cc:703
static std::string source_
Definition: cpp/sandesh.h:410
static std::string logging_category_
Definition: cpp/sandesh.h:423
static bool IsLevelUT(SandeshLevel::type level)
Definition: sandesh.cc:817
static bool InitGeneratorTest(const std::string &module, const std::string &source, const std::string &node_type, const std::string &instance_id, EventManager *evm, unsigned short http_port, SandeshContext *client_context=NULL, const SandeshConfig &config=SandeshConfig())
Definition: sandesh.cc:311
static bool enable_local_log_
Definition: cpp/sandesh.h:418
static bool disable_flow_collection_
Definition: cpp/sandesh.h:433
static void SetLoggingParams(bool enable_local_log, std::string category, std::string level, bool enable_trace_print=false, bool enable_flow_log=false, bool enable_session_syslog=false)
Definition: sandesh.cc:370
static log4cplus::Logger sampled_logger_
Definition: cpp/sandesh.h:432
static std::string module_
Definition: cpp/sandesh.h:409
static void DisableSendingObjectLogs(bool disable)
Definition: sandesh.cc:505
virtual bool Dispatch(SandeshConnection *sconn=NULL)
Definition: sandesh.cc:728
static SandeshLevel::type StringToLevel(std::string level)
Definition: sandesh.cc:862
static bool sampled_to_collector_
Definition: cpp/sandesh.h:450
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
int GetTaskId(const std::string &name)
Definition: task.cc:861
static TaskScheduler * GetInstance()
Definition: task.cc:554
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true,...
Definition: task.cc:828
static const int kTaskInstanceAny
Specifies value for wildcard (any or *) task data ID.
Definition: task.h:104
static void DeleteServer(TcpServer *server)
Definition: tcp_server.cc:658
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
size_t AtomicDecrementQueueCount(QueueEntryT *entry)
Definition: queue_task.h:435
size_t AtomicIncrementQueueCount(QueueEntryT *entry)
Definition: queue_task.h:431
static EventManager evm
#define SANDESH_LOG(_Level, _Msg)
Definition: cpp/sandesh.h:476
@ INVALID
Definition: globals.h:147
uint8_t type
Definition: load_balance.h:2
void SetLoggingLevel(LogLevel logLevel)
Definition: logging.cc:77
TBinaryProtocolT< TTransport > TBinaryProtocol
static string ToString(PhysicalDevice::ManagementProtocol proto)
bool DoDropSandeshMessage(const SandeshHeader &header, const SandeshLevel::type drop_level)
Definition: sandesh.cc:928
const char * loggingPattern
Definition: sandesh.cc:98
int PullSandeshUVE
Definition: sandesh_uve.cc:21
log4cplus::LogLevel SandeshLevelTolog4Level(SandeshLevel::type slevel)
Definition: sandesh.cc:396
static int32_t SandeshHttpCallback(SandeshRequest *rsnh)
Definition: sandesh.cc:137
int PullSandeshGenStatsReq
Definition: sandesh_req.cc:21
static void WaitForIdle()
Definition: sandesh.cc:323
int PullSandeshTraceReq
bool MakeEndpoint(TcpServer::Endpoint *ep, const std::string &epstr)
Definition: sandesh_util.cc:18
static long GetPendingTaskCount()
Definition: http_session.h:47
bool disable_object_logs
uint32_t system_logs_rate_limit
size_t GetSize() const
Definition: cpp/sandesh.h:461