11 #include <boost/bind.hpp>
12 #include <boost/assign.hpp>
13 #include <boost/foreach.hpp>
21 #include <sandesh/sandesh_constants.h>
22 #include <sandesh/sandesh_types.h>
23 #include <sandesh/sandesh.h>
24 #include <sandesh/sandesh_trace.h>
25 #include <sandesh/sandesh_session.h>
26 #include <sandesh/sandesh_http.h>
30 #include <sandesh/protocol/TXMLProtocol.h>
31 #include <sandesh/sandesh_ctrl_types.h>
32 #include <sandesh/sandesh_uve_types.h>
33 #include <sandesh/derived_stats_results_types.h>
34 #include <sandesh/common/vns_constants.h>
39 using boost::asio::ip::address;
40 using namespace boost::asio;
41 using boost::system::error_code;
51 const std::vector<Sandesh::QueueWaterMarkInfo>
53 (50*1024*1024, SandeshLevel::SYS_UVE,
true,
false)
54 (30*1024*1024, SandeshLevel::SYS_EMERG,
true,
false)
55 (20*1024*1024, SandeshLevel::SYS_ERR,
true,
false)
56 (1*1024*1024, SandeshLevel::SYS_DEBUG,
true,
false)
57 (35*1024*1024, SandeshLevel::SYS_EMERG,
false,
false)
58 (25*1024*1024, SandeshLevel::SYS_ERR,
false,
false)
59 (15*1024*1024, SandeshLevel::SYS_DEBUG,
false,
false)
63 const std::vector<Endpoint> &collectors,
66 :
SslServer(evm, boost::asio::ssl::context::tlsv12_client,
67 config.sandesh_ssl_enable),
68 sm_task_instance_(kSMTaskInstance),
69 sm_task_id_(
TaskScheduler::GetInstance()->GetTaskId(kSMTask)),
70 session_task_instance_(kSessionTaskInstance),
71 session_writer_task_id_(
TaskScheduler::GetInstance()->GetTaskId(kSessionWriterTask)),
72 session_reader_task_id_(
TaskScheduler::GetInstance()->GetTaskId(kSessionReaderTask)),
74 collectors_(collectors),
75 stats_collector_(config.stats_collector),
76 sm_(
SandeshClientSM::CreateClientSM(evm, this, sm_task_instance_, sm_task_id_,
78 session_wm_info_(kSessionWaterMarkInfo),
79 session_close_interval_msec_(0),
80 session_close_time_usec_(0) {
84 TaskPolicy sm_task_policy = boost::assign::list_of
91 boost::asio::ssl::context *ctx =
context();
92 boost::system::error_code ec;
93 ctx->set_options(boost::asio::ssl::context::default_workarounds |
94 boost::asio::ssl::context::no_tlsv1 |
95 boost::asio::ssl::context::no_sslv3 |
96 boost::asio::ssl::context::no_sslv2 |
97 boost::asio::ssl::context::no_tlsv1_1, ec);
98 if (ec.value() != 0) {
99 SANDESH_LOG(ERROR,
"Error setting ssl options: " << ec.message());
105 ctx->set_verify_mode(boost::asio::ssl::verify_peer |
106 boost::asio::ssl::verify_fail_if_no_peer_cert,
108 if (ec.value() != 0) {
109 SANDESH_LOG(ERROR,
"Error setting verification mode: " <<
113 ctx->load_verify_file(config.
ca_cert, ec);
114 if (ec.value() != 0) {
115 SANDESH_LOG(ERROR,
"Error loading CA certificate: " <<
121 ctx->use_certificate_chain_file(config.
certfile, ec);
122 if (ec.value() != 0) {
123 SANDESH_LOG(ERROR,
"Error using server certificate: " <<
128 ctx->use_private_key_file(config.
keyfile,
129 boost::asio::ssl::context::pem, ec);
130 if (ec.value() != 0) {
131 SANDESH_LOG(ERROR,
"Error using server private key file: " <<
139 if (found != std::string::npos) {
142 #if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
145 SANDESH_LOG(ERROR,
"Unix Domain Sockets are not supported on this platform");
154 const std::vector<std::string>& collector_list) {
155 std::vector<Endpoint> collector_endpoints;
157 BOOST_FOREACH(
const std::string& collector, collector_list) {
160 SANDESH_LOG(ERROR, __func__ <<
": Invalid collector address: " <<
164 collector_endpoints.push_back(ep);
166 sm_->SetCollectors(collector_endpoints);
170 sm_->SetAdminState(
false);
179 sm_->SetAdminState(
true);
183 return sm_->SendSandesh(snh);
187 return sm_->SendSandeshUVE(snh);
191 const SandeshHeader &header,
const std::string &sandesh_name,
192 const uint32_t header_offset) {
196 const SandeshCtrlServerToClient * snh =
dynamic_cast<const SandeshCtrlServerToClient *
>(sandesh);
198 SANDESH_LOG(ERROR,
"Received Ctrl Message with wrong type " << sandesh->
Name());
202 if (!snh->get_success()) {
203 SANDESH_LOG(ERROR,
"Received Ctrl Message : Connection with server has failed");
207 SANDESH_LOG(DEBUG,
"Received Ctrl Message with size " << snh->get_type_info().size());
209 map<string,uint32_t> sMap;
210 const vector<UVETypeInfo> & vu = snh->get_type_info();
211 for(uint32_t i = 0; i < vu.size(); i++) {
212 sMap.insert(std::make_pair(vu[i].get_type_name(), vu[i].get_seq_num()));
222 const SandeshHeader &header,
const std::string &sandesh_name,
223 const uint32_t header_offset) {
225 namespace sandesh_prot = contrail::sandesh::protocol;
226 namespace sandesh_trans = contrail::sandesh::transport;
228 if (header.get_Hints() & g_sandesh_constants.SANDESH_CONTROL_HINT) {
229 bool success =
ReceiveCtrlMsg(msg, header, sandesh_name, header_offset);
234 SandeshRxDropReason::ControlMsgFailed);
241 if (sandesh == NULL) {
242 SANDESH_LOG(ERROR, __func__ <<
": Unknown sandesh: " << sandesh_name);
244 SandeshRxDropReason::CreateFailed);
247 boost::shared_ptr<sandesh_trans::TMemoryBuffer> btrans =
248 boost::shared_ptr<sandesh_trans::TMemoryBuffer>(
249 new sandesh_trans::TMemoryBuffer((uint8_t *)msg.c_str() + header_offset,
250 msg.size() - header_offset));
251 boost::shared_ptr<sandesh_prot::TXMLProtocol> prot =
252 boost::shared_ptr<sandesh_prot::TXMLProtocol>(
new sandesh_prot::TXMLProtocol(btrans));
253 int32_t xfer = sandesh->
Read(prot);
255 SANDESH_LOG(ERROR, __func__ <<
": Decoding " << sandesh_name <<
" FAILED");
257 SandeshRxDropReason::DecodingFailed);
269 SandeshCtrlServerToClient::HandleRequest()
const { }
272 SandeshCtrlClientToServer::HandleRequest()
const { }
283 socket->open(ip::tcp::v4(), ec);
285 SANDESH_LOG(ERROR, __func__ <<
" Open FAILED: " << ec.message());
291 SANDESH_LOG(ERROR, __func__ <<
" Unable to set socket options: " << ec.message());
309 return sandesh_session;
313 std::vector<string> stv;
315 SandeshUVETypeMaps::uve_global_map::const_iterator it =
318 stv.push_back(it->first);
340 uint64_t last_close_interval_usec,
int *close_interval_msec) {
343 if (last_close_interval_usec == 0 || last_close_usec == 0) {
344 *close_interval_msec =
348 assert(now_usec >= last_close_usec);
349 uint64_t time_since_close_usec(now_usec - last_close_usec);
352 if (time_since_close_usec <= last_close_interval_usec) {
353 *close_interval_msec = 0;
363 if (time_since_close_usec > last_close_interval_usec &&
364 time_since_close_usec <= 2 * last_close_interval_usec) {
365 uint64_t nclose_interval_msec((2 * last_close_interval_usec)/1000);
366 *close_interval_msec = std::min(nclose_interval_msec,
367 static_cast<uint64_t>(
370 }
else if ((2 * last_close_interval_usec <= time_since_close_usec) &&
371 (time_since_close_usec <= 4 * last_close_interval_usec)) {
372 *close_interval_msec = last_close_interval_usec/1000;
375 *close_interval_msec =
383 int close_interval_msec(0);
398 const string & stateName,
const string & server,
400 const std::vector<TcpServer::Endpoint> & collector_eps) {
401 ModuleClientState mcs;
404 SandeshClientInfo sci;
410 sci.set_successful_connections(count);
411 sci.set_pid(getpid());
413 sci.set_status(stateName);
414 sci.set_collector_name(server);
415 std::ostringstream collector_ip;
416 collector_ip << server_ip;
417 sci.set_collector_ip(collector_ip.str());
418 std::vector<std::string> collectors;
420 std::ostringstream collector_ip;
422 collectors.push_back(collector_ip.str());
424 sci.set_collector_list(collectors);
426 SocketIOStats rx_stats;
428 sci.set_rx_socket_stats(rx_stats);
429 SocketIOStats tx_stats;
431 sci.set_tx_socket_stats(tx_stats);
433 mcs.set_client_info(sci);
435 std::vector<SandeshMessageTypeStats> mtype_stats;
436 SandeshMessageStats magg_stats;
439 map<string,uint64_t> csev;
440 csev.insert(make_pair(
"sent", magg_stats.get_messages_sent()));
441 csev.insert(make_pair(
"dropped_no_queue",
442 magg_stats.get_messages_sent_dropped_no_queue()));
443 csev.insert(make_pair(
"dropped_no_client",
444 magg_stats.get_messages_sent_dropped_no_client()));
445 csev.insert(make_pair(
"dropped_no_session",
446 magg_stats.get_messages_sent_dropped_no_session()));
447 csev.insert(make_pair(
"dropped_queue_level",
448 magg_stats.get_messages_sent_dropped_queue_level()));
449 csev.insert(make_pair(
"dropped_client_send_failed",
450 magg_stats.get_messages_sent_dropped_client_send_failed()));
451 csev.insert(make_pair(
"dropped_session_not_connected",
452 magg_stats.get_messages_sent_dropped_session_not_connected()));
453 csev.insert(make_pair(
"dropped_header_write_failed",
454 magg_stats.get_messages_sent_dropped_header_write_failed()));
455 csev.insert(make_pair(
"dropped_write_failed",
456 magg_stats.get_messages_sent_dropped_write_failed()));
457 csev.insert(make_pair(
"dropped_wrong_client_sm_state",
458 magg_stats.get_messages_sent_dropped_wrong_client_sm_state()));
459 csev.insert(make_pair(
"dropped_validation_failed",
460 magg_stats.get_messages_sent_dropped_validation_failed()));
461 csev.insert(make_pair(
"dropped_rate_limited",
462 magg_stats.get_messages_sent_dropped_rate_limited()));
463 csev.insert(make_pair(
"dropped_sending_disabled",
464 magg_stats.get_messages_sent_dropped_sending_disabled()));
465 csev.insert(make_pair(
"dropped_sending_to_syslog",
466 magg_stats.get_messages_sent_dropped_sending_to_syslog()));
467 mcs.set_tx_msg_agg(csev);
469 map <string,SandeshMessageStats> csevm;
470 for (vector<SandeshMessageTypeStats>::const_iterator smit = mtype_stats.begin();
471 smit != mtype_stats.end(); smit++) {
472 SandeshMessageStats res_sms;
473 const SandeshMessageStats& src_sms = smit->get_stats();
474 res_sms.set_messages_sent(src_sms.get_messages_sent());
475 res_sms.set_messages_sent_dropped_no_queue(
476 src_sms.get_messages_sent_dropped_no_queue());
477 res_sms.set_messages_sent_dropped_no_client(
478 src_sms.get_messages_sent_dropped_no_client());
479 res_sms.set_messages_sent_dropped_no_session(
480 src_sms.get_messages_sent_dropped_no_session());
481 res_sms.set_messages_sent_dropped_queue_level(
482 src_sms.get_messages_sent_dropped_queue_level());
483 res_sms.set_messages_sent_dropped_client_send_failed(
484 src_sms.get_messages_sent_dropped_client_send_failed());
485 res_sms.set_messages_sent_dropped_session_not_connected(
486 src_sms.get_messages_sent_dropped_session_not_connected());
487 res_sms.set_messages_sent_dropped_header_write_failed(
488 src_sms.get_messages_sent_dropped_header_write_failed());
489 res_sms.set_messages_sent_dropped_write_failed(
490 src_sms.get_messages_sent_dropped_write_failed());
491 res_sms.set_messages_sent_dropped_wrong_client_sm_state(
492 src_sms.get_messages_sent_dropped_wrong_client_sm_state());
493 res_sms.set_messages_sent_dropped_validation_failed(
494 src_sms.get_messages_sent_dropped_validation_failed());
495 res_sms.set_messages_sent_dropped_rate_limited(
496 src_sms.get_messages_sent_dropped_rate_limited());
497 res_sms.set_messages_sent_dropped_sending_disabled(
498 src_sms.get_messages_sent_dropped_sending_disabled());
499 res_sms.set_messages_sent_dropped_sending_to_syslog(
500 src_sms.get_messages_sent_dropped_sending_to_syslog());
501 csevm.insert(make_pair(smit->get_message_type(), res_sms));
503 mcs.set_msg_type_agg(csevm);
505 SandeshModuleClientTrace::Send(mcs);
526 std::vector<Sandesh::QueueWaterMarkInfo> &scwm_info)
const {
int session_writer_task_id_
bool MakeEndpoint(TcpServer::Endpoint *ep, const std::string &epstr)
virtual SandeshSession * CreateSMSession(SslSession::EventObserver eocb, SandeshReceiveMsgCb rmcb, TcpServer::Endpoint ep)
static const std::string kSMTask
virtual void DeleteSession(TcpSession *session)
static const int kMaxSMSessionCloseIntervalMSec
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
boost::asio::ip::udp::endpoint Endpoint
boost::asio::ip::tcp::socket Socket
virtual void Connect(TcpSession *session, Endpoint remote)
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
static Sandesh * CreateInstance(std::string const &s)
SandeshSession * session() const
virtual SslSession * AllocSession(SslSocket *socket)
virtual TcpSession * CreateSession()
void GetTxSocketStats(SocketIOStats *socket_stats) const
boost::scoped_ptr< SandeshClientSM > sm_
boost::asio::io_context * io_service()
bool CloseSMSessionInternal()
boost::asio::ssl::context * context()
SandeshClient(EventManager *evm, const std::vector< Endpoint > &collectors, const SandeshConfig &config, bool periodicuve=false)
static const int kInitialSMSessionCloseIntervalMSec
static uve_global_map::const_iterator Begin()
virtual const char * Name() const
void GetSessionWaterMarkInfo(std::vector< Sandesh::QueueWaterMarkInfo > &scwm_info) const
#define SANDESH_LOG(_Level, _Msg)
void GetRxSocketStats(SocketIOStats *socket_stats) const
static void UpdateRxMsgStats(const std::string &msg_name, uint64_t bytes)
void SetDscpValue(uint8_t value)
int session_close_interval_msec_
static uint64_t client_start_time
static void UpdateRxMsgFailStats(const std::string &msg_name, uint64_t bytes, SandeshRxDropReason::type dreason)
static std::string node_type()
void set_observer(EventObserver observer)
bool ReceiveCtrlMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)
void ResetSendQueueWaterMark()
void SetSendQueueWaterMark(Sandesh::QueueWaterMarkInfo &wm_info)
static Sandesh * DecodeCtrlSandesh(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t &header_offset)
void SetSessionWaterMarkInfo(Sandesh::QueueWaterMarkInfo &scwm)
bool SendSandeshUVE(Sandesh *snh_uve)
static void SyncAllMaps(const std::map< std::string, uint32_t > &, bool periodic=false)
static std::string module()
virtual int32_t Read(boost::shared_ptr< contrail::sandesh::protocol::TProtocol > iprot)=0
static TaskScheduler * GetInstance()
uint64_t session_close_time_usec_
static const std::vector< Sandesh::QueueWaterMarkInfo > kSessionWaterMarkInfo
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
void SetConnection(SandeshConnection *connection)
std::vector< Sandesh::QueueWaterMarkInfo > session_wm_info_
int session_task_instance_
std::vector< TaskExclusion > TaskPolicy
int session_reader_task_id_
boost::scoped_ptr< StatsClient > stats_client_
static uint64_t UTCTimestampUsec()
void InitializeSMSession(int connects)
static SandeshRxQueue * recv_queue()
int SetDscpSocketOption(uint8_t value)
void ResetSessionWaterMarkInfo()
void SetReceiveMsgCb(SandeshReceiveMsgCb cb)
bool SendSandesh(Sandesh *snh)
static uve_global_map::const_iterator End()
void ReConfigCollectors(const std::vector< std::string > &)
static bool task_policy_set_
static std::string source()
virtual Socket * socket() const
boost::tuple< size_t, SandeshLevel::type, bool, bool > QueueWaterMarkInfo
boost::function< bool(const std::string &, SandeshSession *)> SandeshReceiveMsgCb
static void GetMsgStats(std::vector< SandeshMessageTypeStats > *mtype_stats, SandeshMessageStats *magg_stats)
boost::function< void(TcpSession *, Event)> EventObserver
boost::asio::ip::tcp::endpoint Endpoint
void SendUVE(int count, const std::string &stateName, const std::string &server, const Endpoint &server_ip, const std::vector< Endpoint > &collector_eps)
bool ReceiveMsg(const std::string &msg, const SandeshHeader &header, const std::string &sandesh_name, const uint32_t header_offset)
static void UpdateDscp(uint8_t dscp)
std::vector< Endpoint > collectors_
std::string stats_collector_
virtual void EnqueueClose()
virtual boost::system::error_code SetSocketOptions()
static const std::string kSessionReaderTask
static std::string instance_id()
bool DoCloseSMSession(uint64_t now_usec, uint64_t last_close_usec, uint64_t last_close_interval_usec, int *close_interval_msec)
static const std::string kSessionWriterTask