11 #include <boost/bind/bind.hpp>
12 #include <boost/assign.hpp>
13 #include <boost/foreach.hpp>
21 #include <sandesh/sandesh_constants.h>
22 #include <sandesh/sandesh_types.h>
23 #include <sandesh/sandesh.h>
24 #include <sandesh/sandesh_trace.h>
25 #include <sandesh/sandesh_session.h>
26 #include <sandesh/sandesh_http.h>
30 #include <sandesh/protocol/TXMLProtocol.h>
31 #include <sandesh/sandesh_ctrl_types.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/derived_stats_results_types.h>
34 #include <sandesh/common/vns_constants.h>
39 using boost::asio::ip::address;
40 using namespace boost::asio;
41 using boost::system::error_code;
46 using namespace boost::placeholders;
// Table of seven queue watermark entries. The constructor initializes
// session_wm_info_ from kSessionWaterMarkInfo, which is presumably the object
// being defined here (its declarator line is not visible -- TODO confirm).
// Each entry is a Sandesh::QueueWaterMarkInfo, i.e. a
// boost::tuple<size_t, SandeshLevel::type, bool, bool>.
// NOTE(review): the first field looks like a queue size in bytes
// (N * 1024 * 1024) -- confirm the unit.
// NOTE(review): the third field is true for the first four entries and false
// for the last three; presumably it separates high from low watermarks.
// The fourth field is false in every entry. Confirm the meaning of both
// flags against the QueueWaterMarkInfo documentation.
52 const std::vector<Sandesh::QueueWaterMarkInfo>
// Entries whose third field is true.
54 (50*1024*1024, SandeshLevel::SYS_UVE,
true,
false)
55 (30*1024*1024, SandeshLevel::SYS_EMERG,
true,
false)
56 (20*1024*1024, SandeshLevel::SYS_ERR,
true,
false)
57 (1*1024*1024, SandeshLevel::SYS_DEBUG,
true,
false)
// Entries whose third field is false.
58 (35*1024*1024, SandeshLevel::SYS_EMERG,
false,
false)
59 (25*1024*1024, SandeshLevel::SYS_ERR,
false,
false)
60 (15*1024*1024, SandeshLevel::SYS_DEBUG,
false,
false)
// Constructor fragment: member-initializer list followed by SSL context
// configuration. Several lines of the definition are not visible, so only
// the statements shown below are described.
// NOTE(review): the initializer list matches the SandeshClient(evm,
// collectors, config, periodicuve) signature -- confirm.
64 const std::vector<Endpoint> &collectors,
// config.sandesh_ssl_enable is the last argument of an initializer whose
// beginning is not visible.
68 config.sandesh_ssl_enable),
// Task ids are looked up by task name from the TaskScheduler singleton.
69 sm_task_instance_(kSMTaskInstance),
70 sm_task_id_(
TaskScheduler::GetInstance()->GetTaskId(kSMTask)),
71 session_task_instance_(kSessionTaskInstance),
72 session_writer_task_id_(
TaskScheduler::GetInstance()->GetTaskId(kSessionWriterTask)),
73 session_reader_task_id_(
TaskScheduler::GetInstance()->GetTaskId(kSessionReaderTask)),
// The collector endpoint list is copied into the member.
75 collectors_(collectors),
76 stats_collector_(config.stats_collector),
// Watermarks start from the static table; both session-close bookkeeping
// values start at 0.
79 session_wm_info_(kSessionWaterMarkInfo),
80 session_close_interval_msec_(0),
81 session_close_time_usec_(0) {
// The contents of the task exclusion policy built here are not visible.
85 TaskPolicy sm_task_policy = boost::assign::list_of
// SSL context setup. Every call below reports failure through `ec`; each
// failure is logged at ERROR level. Whatever follows the log statement
// (e.g. an early exit) is not visible -- TODO confirm.
92 boost::asio::ssl::context *ctx =
context();
93 boost::system::error_code ec;
// Enable the default workarounds and disable SSLv2, SSLv3, TLSv1 and
// TLSv1.1.
94 ctx->set_options(boost::asio::ssl::context::default_workarounds |
95 boost::asio::ssl::context::no_tlsv1 |
96 boost::asio::ssl::context::no_sslv3 |
97 boost::asio::ssl::context::no_sslv2 |
98 boost::asio::ssl::context::no_tlsv1_1, ec);
99 if (ec.value() != 0) {
100 SANDESH_LOG(ERROR,
"Error setting ssl options: " << ec.message());
// Require a peer certificate and verify it.
106 ctx->set_verify_mode(boost::asio::ssl::verify_peer |
107 boost::asio::ssl::verify_fail_if_no_peer_cert,
109 if (ec.value() != 0) {
110 SANDESH_LOG(ERROR,
"Error setting verification mode: " <<
// CA certificate used for verification, taken from config.ca_cert.
114 ctx->load_verify_file(config.
ca_cert, ec);
115 if (ec.value() != 0) {
116 SANDESH_LOG(ERROR,
"Error loading CA certificate: " <<
// Local certificate chain, taken from config.certfile.
122 ctx->use_certificate_chain_file(config.
certfile, ec);
123 if (ec.value() != 0) {
124 SANDESH_LOG(ERROR,
"Error using server certificate: " <<
// Local private key in PEM format, taken from config.keyfile.
129 ctx->use_private_key_file(config.
keyfile,
130 boost::asio::ssl::context::pem, ec);
131 if (ec.value() != 0) {
132 SANDESH_LOG(ERROR,
"Error using server private key file: " <<
// `found` is the result of a string search that is not visible here.
// NOTE(review): presumably it detects a Unix-domain-socket style
// stats_collector address -- confirm.
140 if (found != std::string::npos) {
// Local (Unix domain) sockets are only used when Boost.Asio supports them;
// the error below is presumably the #else branch -- TODO confirm.
143 #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
146 SANDESH_LOG(ERROR,
"Unix Domain Sockets are not supported on this platform");
// Replaces the collector list of the client state machine.
// @param collector_list collector addresses as strings.
// Each string is turned into an endpoint by a call that is not visible
// (presumably MakeEndpoint -- TODO confirm); the resulting endpoints are
// handed to sm_->SetCollectors().
155 const std::vector<std::string>& collector_list) {
156 std::vector<Endpoint> collector_endpoints;
158 BOOST_FOREACH(
const std::string& collector, collector_list) {
// An address that cannot be converted is logged. Whether the function then
// returns or continues with the next entry is not visible -- TODO confirm.
161 SANDESH_LOG(ERROR, __func__ <<
": Invalid collector address: " <<
165 collector_endpoints.push_back(ep);
167 sm_->SetCollectors(collector_endpoints);
171 sm_->SetAdminState(
false);
180 sm_->SetAdminState(
true);
184 return sm_->SendSandesh(snh);
188 return sm_->SendSandeshUVE(snh);
// Handles a control message received from the server.
// @param header         sandesh header of the message.
// @param sandesh_name   name of the sandesh carried by the message.
// @param header_offset  offset of the payload inside the raw message.
// The object `sandesh` used below is created by code that is not visible.
192 const SandeshHeader &header,
const std::string &sandesh_name,
193 const uint32_t header_offset) {
// NOTE(review): the dynamic_cast result is dereferenced further down; any
// null check between the cast and that use is not visible -- confirm.
197 const SandeshCtrlServerToClient * snh =
dynamic_cast<const SandeshCtrlServerToClient *
>(
sandesh);
// The server reports failure through the message's success flag.
203 if (!snh->get_success()) {
204 SANDESH_LOG(ERROR,
"Received Ctrl Message : Connection with server has failed");
208 SANDESH_LOG(DEBUG,
"Received Ctrl Message with size " << snh->get_type_info().size());
// Build a map from UVE type name to sequence number out of the type info
// carried by the control message. How the map is consumed is not visible
// (presumably SandeshUVETypeMaps::SyncAllMaps -- TODO confirm).
210 map<string,uint32_t> sMap;
211 const vector<UVETypeInfo> & vu = snh->get_type_info();
212 for(uint32_t i = 0; i < vu.size(); i++) {
213 sMap.insert(std::make_pair(vu[i].get_type_name(), vu[i].get_seq_num()));
// Handles one received message.
// @param header         sandesh header of the message.
// @param sandesh_name   name of the sandesh carried by the message.
// @param header_offset  offset of the XML payload inside `msg`.
// Control messages are delegated to ReceiveCtrlMsg; for other messages the
// visible code decodes the payload into `sandesh`, an object created by
// code that is not visible here.
223 const SandeshHeader &header,
const std::string &sandesh_name,
224 const uint32_t header_offset) {
// Control messages are identified by SANDESH_CONTROL_HINT in the header
// hints.
229 if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
230 bool success =
ReceiveCtrlMsg(msg, header, sandesh_name, header_offset);
// Drop reason used on the control-message path; the enclosing call and its
// condition are not visible (presumably a failed ReceiveCtrlMsg reported
// via UpdateRxMsgFailStats -- TODO confirm).
235 SandeshRxDropReason::ControlMsgFailed);
// Logged for a sandesh name that is not known; CreateFailed is the
// associated drop reason.
243 SANDESH_LOG(ERROR, __func__ <<
": Unknown sandesh: " << sandesh_name);
245 SandeshRxDropReason::CreateFailed);
// Wrap the bytes after the header (msg.size() - header_offset of them) in a
// memory transport and read them through the XML protocol.
// NOTE(review): the C-style cast drops the constness of msg's buffer;
// presumably TMemoryBuffer only reads from it -- confirm.
248 boost::shared_ptr<sandesh_trans::TMemoryBuffer> btrans =
249 boost::shared_ptr<sandesh_trans::TMemoryBuffer>(
250 new sandesh_trans::TMemoryBuffer((uint8_t *)msg.c_str() + header_offset,
251 msg.size() - header_offset));
252 boost::shared_ptr<sandesh_prot::TXMLProtocol> prot =
253 boost::shared_ptr<sandesh_prot::TXMLProtocol>(
new sandesh_prot::TXMLProtocol(btrans));
254 int32_t xfer =
sandesh->Read(prot);
// Decode failure path; the condition tested on `xfer` is not visible.
256 SANDESH_LOG(ERROR, __func__ <<
": Decoding " << sandesh_name <<
" FAILED");
258 SandeshRxDropReason::DecodingFailed);
// Request handlers for the two control message types. Both bodies are
// intentionally empty here; the return-type lines are not visible.
// NOTE(review): presumably control messages are processed in
// ReceiveCtrlMsg instead of through HandleRequest -- confirm.
270 SandeshCtrlServerToClient::HandleRequest()
const { }
273 SandeshCtrlClientToServer::HandleRequest()
const { }
284 socket->open(ip::tcp::v4(), ec);
286 SANDESH_LOG(ERROR, __func__ <<
" Open FAILED: " << ec.message());
292 SANDESH_LOG(ERROR, __func__ <<
" Unable to set socket options: " << ec.message());
310 return sandesh_session;
314 std::vector<string> stv;
316 SandeshUVETypeMaps::uve_global_map::const_iterator it =
319 stv.push_back(it->first);
// Computes the next session-close interval from the time elapsed since the
// previous close (the full signature also takes now_usec and
// last_close_usec, which are used below).
// @param last_close_interval_usec  interval used for the previous close,
//                                  in microseconds; 0 means none.
// @param close_interval_msec       out: next interval in milliseconds.
// The return statements are not visible, so the meaning of the bool result
// is not documented here -- TODO confirm.
341 uint64_t last_close_interval_usec,
int *close_interval_msec) {
// No previous close recorded (either value is 0). The value assigned is not
// visible (presumably kInitialSMSessionCloseIntervalMSec -- TODO confirm).
344 if (last_close_interval_usec == 0 || last_close_usec == 0) {
345 *close_interval_msec =
// The subtraction below is unsigned, so now_usec must not be earlier than
// last_close_usec; the assert enforces that.
349 assert(now_usec >= last_close_usec);
350 uint64_t time_since_close_usec(now_usec - last_close_usec);
// Still within the previous interval: the output interval is 0.
353 if (time_since_close_usec <= last_close_interval_usec) {
354 *close_interval_msec = 0;
// Between 1x (exclusive) and 2x (inclusive) the previous interval: double
// the interval, convert usec to msec and cap it with std::min against a
// value that is not visible (presumably kMaxSMSessionCloseIntervalMSec --
// TODO confirm).
364 if (time_since_close_usec > last_close_interval_usec &&
365 time_since_close_usec <= 2 * last_close_interval_usec) {
366 uint64_t nclose_interval_msec((2 * last_close_interval_usec)/1000);
367 *close_interval_msec = std::min(nclose_interval_msec,
368 static_cast<uint64_t
>(
// Between 2x and 4x the previous interval: keep the same interval,
// converted to msec. Exactly 2x is already taken by the branch above.
// NOTE(review): the uint64_t quotient is implicitly narrowed to int.
371 }
else if ((2 * last_close_interval_usec <= time_since_close_usec) &&
372 (time_since_close_usec <= 4 * last_close_interval_usec)) {
373 *close_interval_msec = last_close_interval_usec/1000;
// Presumably the remaining case (more than 4x the previous interval); the
// assigned value is not visible -- TODO confirm.
376 *close_interval_msec =
384 int close_interval_msec(0);
// Builds a ModuleClientState describing this client and sends it through
// SandeshModuleClientTrace::Send().
// @param stateName      status string stored in the client info.
// @param server         collector name stored in the client info.
// @param collector_eps  configured collector endpoints.
// `count` and `server_ip`, used below, are parameters declared on a
// signature line that is not visible.
399 const string & stateName,
const string & server,
401 const std::vector<TcpServer::Endpoint> & collector_eps) {
402 ModuleClientState mcs;
// Connection details: successful connection count, process id, status,
// collector name and collector address (streamed to a string).
405 SandeshClientInfo sci;
411 sci.set_successful_connections(count);
412 sci.set_pid(getpid());
414 sci.set_status(stateName);
415 sci.set_collector_name(server);
416 std::ostringstream collector_ip;
417 collector_ip << server_ip;
418 sci.set_collector_ip(collector_ip.str());
// List of collectors as strings. The second collector_ip declaration is
// presumably local to a loop over collector_eps whose header is not
// visible -- TODO confirm.
419 std::vector<std::string> collectors;
421 std::ostringstream collector_ip;
423 collectors.push_back(collector_ip.str());
425 sci.set_collector_list(collectors);
// Socket statistics; the calls that fill rx_stats / tx_stats are not
// visible (presumably GetRxSocketStats / GetTxSocketStats).
427 SocketIOStats rx_stats;
429 sci.set_rx_socket_stats(rx_stats);
430 SocketIOStats tx_stats;
432 sci.set_tx_socket_stats(tx_stats);
434 mcs.set_client_info(sci);
// Message statistics; the call that fills these is not visible (presumably
// Sandesh::GetMsgStats -- TODO confirm).
436 std::vector<SandeshMessageTypeStats> mtype_stats;
437 SandeshMessageStats magg_stats;
// Aggregate transmit counters, keyed by a short counter name.
440 map<string,uint64_t> csev;
441 csev.insert(make_pair(
"sent", magg_stats.get_messages_sent()));
442 csev.insert(make_pair(
"dropped_no_queue",
443 magg_stats.get_messages_sent_dropped_no_queue()));
444 csev.insert(make_pair(
"dropped_no_client",
445 magg_stats.get_messages_sent_dropped_no_client()));
446 csev.insert(make_pair(
"dropped_no_session",
447 magg_stats.get_messages_sent_dropped_no_session()));
448 csev.insert(make_pair(
"dropped_queue_level",
449 magg_stats.get_messages_sent_dropped_queue_level()));
450 csev.insert(make_pair(
"dropped_client_send_failed",
451 magg_stats.get_messages_sent_dropped_client_send_failed()));
452 csev.insert(make_pair(
"dropped_session_not_connected",
453 magg_stats.get_messages_sent_dropped_session_not_connected()));
454 csev.insert(make_pair(
"dropped_header_write_failed",
455 magg_stats.get_messages_sent_dropped_header_write_failed()));
456 csev.insert(make_pair(
"dropped_write_failed",
457 magg_stats.get_messages_sent_dropped_write_failed()));
458 csev.insert(make_pair(
"dropped_wrong_client_sm_state",
459 magg_stats.get_messages_sent_dropped_wrong_client_sm_state()));
460 csev.insert(make_pair(
"dropped_validation_failed",
461 magg_stats.get_messages_sent_dropped_validation_failed()));
462 csev.insert(make_pair(
"dropped_rate_limited",
463 magg_stats.get_messages_sent_dropped_rate_limited()));
464 csev.insert(make_pair(
"dropped_sending_disabled",
465 magg_stats.get_messages_sent_dropped_sending_disabled()));
466 csev.insert(make_pair(
"dropped_sending_to_syslog",
467 magg_stats.get_messages_sent_dropped_sending_to_syslog()));
468 mcs.set_tx_msg_agg(csev);
// Per-message-type transmit counters: for every entry of mtype_stats the
// sent/dropped fields are copied into a fresh SandeshMessageStats, which is
// stored under the message type name.
470 map <string,SandeshMessageStats> csevm;
471 for (vector<SandeshMessageTypeStats>::const_iterator smit = mtype_stats.begin();
472 smit != mtype_stats.end(); smit++) {
473 SandeshMessageStats res_sms;
474 const SandeshMessageStats& src_sms = smit->get_stats();
475 res_sms.set_messages_sent(src_sms.get_messages_sent());
476 res_sms.set_messages_sent_dropped_no_queue(
477 src_sms.get_messages_sent_dropped_no_queue());
478 res_sms.set_messages_sent_dropped_no_client(
479 src_sms.get_messages_sent_dropped_no_client());
480 res_sms.set_messages_sent_dropped_no_session(
481 src_sms.get_messages_sent_dropped_no_session());
482 res_sms.set_messages_sent_dropped_queue_level(
483 src_sms.get_messages_sent_dropped_queue_level());
484 res_sms.set_messages_sent_dropped_client_send_failed(
485 src_sms.get_messages_sent_dropped_client_send_failed());
486 res_sms.set_messages_sent_dropped_session_not_connected(
487 src_sms.get_messages_sent_dropped_session_not_connected());
488 res_sms.set_messages_sent_dropped_header_write_failed(
489 src_sms.get_messages_sent_dropped_header_write_failed());
490 res_sms.set_messages_sent_dropped_write_failed(
491 src_sms.get_messages_sent_dropped_write_failed());
492 res_sms.set_messages_sent_dropped_wrong_client_sm_state(
493 src_sms.get_messages_sent_dropped_wrong_client_sm_state());
494 res_sms.set_messages_sent_dropped_validation_failed(
495 src_sms.get_messages_sent_dropped_validation_failed());
496 res_sms.set_messages_sent_dropped_rate_limited(
497 src_sms.get_messages_sent_dropped_rate_limited());
498 res_sms.set_messages_sent_dropped_sending_disabled(
499 src_sms.get_messages_sent_dropped_sending_disabled());
500 res_sms.set_messages_sent_dropped_sending_to_syslog(
501 src_sms.get_messages_sent_dropped_sending_to_syslog());
502 csevm.insert(make_pair(smit->get_message_type(), res_sms));
504 mcs.set_msg_type_agg(csevm);
// Emit the assembled state.
506 SandeshModuleClientTrace::Send(mcs);
527 std::vector<Sandesh::QueueWaterMarkInfo> &scwm_info)
const {
boost::asio::io_context * io_service()
static Sandesh * CreateInstance(std::string const &s)
void InitializeSMSession(int connects)
virtual SandeshSession * CreateSMSession(SslSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, TcpServer::Endpoint ep)
SandeshSession * session() const
std::vector< Sandesh::QueueWaterMarkInfo > session_wm_info_
void ReConfigCollectors(const std::vector< std::string > &)
std::vector< Endpoint > collectors_
static const std::string kSessionReaderTask
uint64_t session_close_time_usec_
void GetSessionWaterMarkInfo(std::vector< Sandesh::QueueWaterMarkInfo > &scwm_info) const
bool ReceiveMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)
boost::scoped_ptr< StatsClient > stats_client_
int session_task_instance_
static bool task_policy_set_
void ResetSessionWaterMarkInfo()
static const int kInitialSMSessionCloseIntervalMSec
void SetDscpValue(uint8_t value)
bool CloseSMSessionInternal()
bool SendSandeshUVE(Sandesh *snh_uve)
void SendUVE(int count, const std::string &stateName, const std::string &server, const Endpoint &server_ip, const std::vector< Endpoint > &collector_eps)
std::string stats_collector_
static const int kMaxSMSessionCloseIntervalMSec
int session_writer_task_id_
bool SendSandesh(Sandesh *snh)
int session_close_interval_msec_
bool ReceiveCtrlMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)
static const std::vector< Sandesh::QueueWaterMarkInfo > kSessionWaterMarkInfo
static const std::string kSessionWriterTask
int session_reader_task_id_
static const std::string kSMTask
boost::scoped_ptr< SandeshClientSM > sm_
SandeshClient(EventManager *evm, const std::vector< Endpoint > &collectors, const SandeshConfig &config, bool periodicuve=false)
void SetSessionWaterMarkInfo(Sandesh::QueueWaterMarkInfo &scwm)
virtual SslSession * AllocSession(SslSocket *socket)
static void UpdateDscp(uint8_t dscp)
bool Enqueue(SandeshRxQueue *queue)
virtual void EnqueueClose()
virtual boost::system::error_code SetSocketOptions()
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
void SetConnection(SandeshConnection *connection)
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
void ResetSendQueueWaterMark()
static uve_global_map::const_iterator Begin()
static uve_global_map::const_iterator End()
static void SyncAllMaps(const std::map< std::string, uint32_t > &, bool periodic=false)
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
static void UpdateRxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshRxDropReason::type dreason)
static void GetMsgStats(std::vector< SandeshMessageTypeStats > *mtype_stats, SandeshMessageStats *magg_stats)
static void UpdateRxMsgStats(const std::string &msg_name, uint64_t bytes)
static std::string source()
static std::string module()
static SandeshRxQueue * recv_queue()
static std::string instance_id()
static std::string node_type()
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
boost::asio::ssl::context * context()
virtual Socket * socket() const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
static TaskScheduler * GetInstance()
boost::asio::ip::tcp::endpoint Endpoint
virtual void Connect(TcpSession *session, Endpoint remote)
void GetRxSocketStats(SocketIOStats *socket_stats) const
virtual TcpSession * CreateSession()
virtual void DeleteSession(TcpSession *session)
boost::asio::ip::tcp::socket Socket
void GetTxSocketStats(SocketIOStats *socket_stats) const
void set_observer(EventObserver observer)
boost::function< void(TcpSession *, Event)> EventObserver
int SetDscpSocketOption(uint8_t value)
boost::asio::ip::udp::endpoint Endpoint
#define SANDESH_LOG(_Level, _Msg)
static uint64_t client_start_time
bool DoCloseSMSession(uint64_t now_usec, uint64_t last_close_usec, uint64_t last_close_interval_usec, int *close_interval_msec)
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
bool MakeEndpoint(TcpServer::Endpoint *ep, const std::string &epstr)
The class is used to specify a Task label for formulating a task exclusion list (an execution policy)...
std::vector< TaskExclusion > TaskPolicy
Defines a type to store an execution policy (a list of task exclusions).
static uint64_t UTCTimestampUsec()