7 #include <boost/foreach.hpp>
8 #include <boost/tuple/tuple.hpp>
18 #include "sandesh/request_pipeline.h"
19 #include "sandesh/common/vns_types.h"
20 #include "sandesh/common/vns_constants.h"
21 #include "sandesh/xmpp_server_types.h"
22 #include "sandesh/xmpp_trace_sandesh_types.h"
23 #include "sandesh/xmpp_client_server_sandesh_types.h"
26 using namespace boost::asio;
29 #define DEFAULT_XMPP_HOLD_TIME 90
33 :
LifetimeActor(server->lifetime_manager()), server_(server) {
37 return server_->MayDelete();
41 server_->SessionShutdown();
56 evm, ssl::context::sslv23_server, config->auth_enabled, true),
61 server_addr_(server_addr),
63 auth_enabled_(config->auth_enabled),
64 tcp_hold_time_(config->tcp_hold_time),
65 gr_helper_disable_(config->gr_helper_disable),
67 connection_queue_(
TaskScheduler::GetInstance()->GetTaskId(
"bgp::Config"),
68 0, boost::bind(&
XmppServer::DequeueConnection, this, _1)) {
73 boost::asio::ssl::context *ctx =
context();
74 boost::system::error_code ec;
77 ctx->set_options(ssl::context::default_workarounds |
78 ssl::context::no_sslv3 | ssl::context::no_sslv2, ec);
79 if (ec.value() != 0) {
80 LOG(ERROR,
"Error : " << ec.message() <<
", setting ssl options");
87 if (!ca_cert_filename.empty()) {
90 ctx->set_verify_mode(boost::asio::ssl::verify_peer, ec);
91 if (ec.value() != 0) {
92 LOG(ERROR,
"Error : " << ec.message()
93 <<
", while setting ssl verification mode");
98 if (ec.value() != 0) {
99 LOG(ERROR,
"Error : " << ec.message()
100 <<
", while using cacert file : "
108 boost::asio::ssl::context::pem, ec);
109 if (ec.value() != 0) {
110 LOG(ERROR,
"Error : " << ec.message()
111 <<
", while using server cert file : "
118 boost::asio::ssl::context::pem, ec);
119 if (ec.value() != 0) {
120 LOG(ERROR,
"Error : " << ec.message()
121 <<
", while using privkey file : "
146 if (server_->subcluster_name() != protocol_config->
subcluster_name()) {
148 server_->ClearAllConnections();
155 bool clear_peers = config_.gr_enable() || system->
gr_enable();
156 bool update_peers =
false;
157 config_.set_gr_enable(system->
gr_enable());
158 config_.set_gr_time(system->
gr_time());
159 config_.set_llgr_time(system->
llgr_time());
164 if (config_.fc_enabled() != system->
fc_enabled() ||
181 server_->ClearAllConnections();
182 else if (update_peers)
183 server_->UpdateAllConnections(config_.xmpp_hold_time());
188 server_->set_subcluster_name(name);
193 config_.set_xmpp_hold_time(hold_time);
226 auth_enabled_(false),
228 gr_helper_disable_(false),
230 connection_queue_(
TaskScheduler::GetInstance()->GetTaskId(
"bgp::Config"),
231 0, boost::bind(&
XmppServer::DequeueConnection, this, _1)) {
366 SOL_SOCKET, SO_REUSEADDR> reuse_addr_t;
370 boost::system::error_code err;
371 socket->open(ip::tcp::v4(), err);
376 socket->set_option(reuse_addr_t(
true), err);
421 if (value.second->ToString() == address)
429 for (
auto& value : connection_map_) {
430 if (value.second->GetComputeHostName() == hostname) {
431 value.second->Clear();
440 for (
auto& value : connection_map_) {
441 value.second->UpdateKeepAliveTimer(time_out);
447 for (
auto& value : connection_map_) {
448 value.second->Clear();
474 boost::system::error_code err;
524 ConnectionMap::iterator loc = connection_map_.find(endpoint);
525 assert(loc != connection_map_.end() && loc->second == connection);
526 connection_map_.erase(loc);
532 ConnectionMap::iterator loc1 =
533 connection_map_.find(connection1->
endpoint());
534 assert(loc1 != connection_map_.end());
535 ConnectionMap::iterator loc2 =
536 connection_map_.find(connection2->
endpoint());
537 assert(loc2 != connection_map_.end());
538 swap(loc1->second, loc2->second);
539 swap(loc1->second->endpoint(), loc2->second->endpoint());
551 ConnectionMap::iterator loc;
554 tie(loc, result) = connection_map_.insert(make_pair(endpoint, connection));
579 connection = XmppStaticObjectFactory::Create<XmppServerConnection>
588 for (
auto& value : connection_map_) {
620 if (old_connection) {
622 "Close duplicate connection " + session->
ToString());
656 ConnectionSet::iterator it;
676 const string &endpoint_name) {
678 ConnectionEndpointMap::const_iterator loc =
691 ConnectionEndpointMap::const_iterator loc =
696 conn_endpoint = loc->second;
702 return conn_endpoint;
709 make_pair(connection->
ToString(), conn_endpoint));
713 return conn_endpoint;
732 vector<ShowXmppConnection> *show_connection_list)
const {
734 for (
const auto& value : connection_map_) {
736 ShowXmppConnection show_connection;
738 show_connection_list->push_back(show_connection);
741 ShowXmppConnection show_connection;
742 connection->FillShowInfo(&show_connection);
743 show_connection_list->push_back(show_connection);
748 SocketIOStats peer_socket_stats;
750 resp->set_rx_socket_stats(peer_socket_stats);
752 resp->set_tx_socket_stats(peer_socket_stats);
762 const ShowXmppConnectionReq *req =
763 static_cast<const ShowXmppConnectionReq *
>(ps.
snhRequest_.get());
767 ShowXmppConnectionResp *resp =
new ShowXmppConnectionResp;
768 vector<ShowXmppConnection> connections;
771 resp->set_connections(connections);
772 resp->set_context(req->context());
778 void ShowXmppConnectionReq::HandleRequest()
const {
788 ps.stages_.push_back(s1);
798 const ClearXmppConnectionReq *req =
799 static_cast<const ClearXmppConnectionReq *
>(ps.
snhRequest_.get());
803 ClearXmppConnectionResp *resp =
new ClearXmppConnectionResp;
805 resp->set_success(
false);
806 }
else if (req->get_hostname_or_all() !=
"all") {
808 resp->set_success(
true);
810 resp->set_success(
false);
815 resp->set_success(
true);
817 resp->set_success(
false);
821 resp->set_context(req->context());
827 void ClearXmppConnectionReq::HandleRequest()
const {
837 ps.stages_.push_back(s1);
846 const ShowXmppServerReq *req =
847 static_cast<const ShowXmppServerReq *
>(ps.
snhRequest_.get());
851 ShowXmppServerResp *resp =
new ShowXmppServerResp;
854 resp->set_context(req->context());
860 void ShowXmppServerReq::HandleRequest()
const {
867 ps.stages_.push_back(s1);
XmppConnectionEndpoint * LocateConnectionEndpoint(XmppServerConnection *connection, bool &created)
boost::asio::ip::tcp::endpoint endpoint
BgpGlobalSystemConfigObserver system
std::vector< int > instances_
void set_read_on_connect(bool read)
void UnRegisterConnectionEvent(xmps::PeerId)
LifetimeManager * lifetime_manager()
uint32_t GetEndOfRibSendTime() const
Endpoint local_endpoint() const
size_t GetSessionQueueSize() const
virtual void DeleteSession(TcpSession *session)
void FillShowConnections(std::vector< ShowXmppConnection > *show_connection_list) const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void FillShowServer(ShowXmppServerResp *resp) const
virtual bool AcceptSession(XmppSession *session)
void ProcessGlobalSystemConfig(const BgpGlobalSystemConfig *system, BgpConfigManager::EventType event)
void Shutdown(bool delete_entries=true)
virtual void InsertDeletedConnection(XmppServerConnection *connection)
boost::asio::ip::tcp::socket Socket
tbb::reader_writer_lock connection_map_mutex_
ConnectionEventCbMap connection_event_map_
void set_hold_time(int hold_time)
boost::asio::ip::tcp::endpoint local_endpoint
virtual bool Initialize(short port)
virtual boost::asio::ip::tcp::endpoint endpoint() const
virtual bool IsPeerCloseGraceful() const
boost::function< void(XmppChannelMux *, xmps::PeerState)> ConnectionEventCb
XmppConnectionEndpoint * conn_endpoint()
DeleteActor(XmppServer *server)
virtual std::string ToString() const
const BgpGlobalSystemConfig & config() const
boost::asio::ip::address IpAddress
void RegisterObservers(const Observers &obs)
uint16_t rd_cluster_seed() const
virtual TcpSession * CreateSession()
boost::asio::ip::tcp::endpoint Endpoint
void GetTxSocketStats(SocketIOStats *socket_stats) const
virtual void ManagedDelete()
void set_conn_endpoint(XmppConnectionEndpoint *conn_endpoint)
boost::asio::ssl::context * context()
#define XMPP_PEER_DIR_OUT
static bool CallbackS1(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
boost::system::error_code EnableTcpKeepalive(int tcp_hold_time)
virtual LifetimeActor * deleter()
void SetDscpValue(uint8_t value)
BgpProtocolObserver protocol
void GetRxSocketStats(SocketIOStats *socket_stats) const
static const int kEndOfRibTime
uint32_t GetEndOfRibReceiveTime() const
bool ClearConnection(const std::string &hostname)
int GetTaskId(const std::string &name)
virtual void RetryDelete()
Endpoint LocalEndpoint() const
void set_subcluster_name(const string &name)
bool DequeueConnection(XmppServerConnection *connection)
std::string path_to_ca_cert
void set_observer(EventObserver observer)
ConnectionEndpointMap connection_endpoint_map_
bool gr_xmpp_helper() const
virtual SslSession * AllocSession(SslSocket *socket)
XmppConnection * connection()
XmppConfigUpdater(XmppServer *server, BgpConfigManager *config_manager)
void UpdateAllConnections(uint8_t time_out)
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
virtual XmppServerConnection * CreateConnection(XmppSession *session)
boost::scoped_ptr< DeleteActor > deleter_
static TaskScheduler * GetInstance()
void ProcessProtocolConfig(const BgpProtocolConfig *protocol_config, BgpConfigManager::EventType event)
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
virtual bool AcceptSession(TcpSession *session)
static bool CallbackS1(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
tbb::mutex endpoint_map_mutex_
void STLDeleteElements(Container *container)
#define CHECK_CONCURRENCY(...)
virtual void InsertConnection(XmppServerConnection *connection)
ConnectionSet deleted_connection_set_
int SetDscpValue(uint8_t value)
virtual void RemoveDeletedConnection(XmppServerConnection *connection)
void ClearAllConnections()
virtual bool MayDelete() const
std::string path_to_server_cert
uint16_t end_of_rib_timeout() const
void ReleaseConnectionEndpoint(XmppServerConnection *connection)
void set_xmpp_hold_time(int hold_time)
size_t ConnectionMapSize() const
boost::scoped_ptr< LifetimeManager > lifetime_manager_
void set_connection(XmppConnection *connection)
BgpGlobalSystemConfig config_
virtual void RemoveConnection(XmppServerConnection *connection)
const std::string & ToString() const
const string subcluster_name() const
void set_disable(bool disabled)
ConnectionMap connection_map_
void set_session(XmppSession *session)
WorkQueue< XmppServerConnection * > connection_queue_
uint8_t xmpp_hold_time() const
const std::string & ToUVEKey() const
void set_session(TcpSession *session)
uint16_t GetGracefulRestartTime() const
uint32_t GetLongLivedGracefulRestartTime() const
#define XMPP_DEBUG(obj,...)
Endpoint remote_endpoint() const
void SwapXmppConnectionMapEntries(XmppConnection *connection1, XmppConnection *connection2)
#define LOG(_Level, _Msg)
boost::shared_ptr< const SandeshRequest > snhRequest_
void clear_on_work_queue()
virtual bool Initialize(unsigned short port)
static bool CallbackS1(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
size_t GetConnectionQueueSize() const
XmppServer(EventManager *evm, const std::string &server_addr, const XmppChannelConfig *config)
size_t ConnectionCount() const
void FillShowInfo(ShowXmppConnection *show_connection) const
XmppConnectionEndpoint * FindConnectionEndpoint(const std::string &endpoint_name)
tbb::mutex deletion_mutex_
virtual void OnSessionEvent(TcpSession *session, TcpSession::Event event)
XmppStateMachine * state_machine()
void SetConnectionQueueDisable(bool disabled)
virtual Socket * socket() const
const XmppSession * session() const
size_t ConnectionEventCount() const
bool Enqueue(QueueEntryT entry)
uint32_t llgr_time() const
boost::scoped_ptr< XmppConfigUpdater > xmpp_config_updater_
bool IsGRHelperModeEnabled() const
void RegisterConnectionEvent(xmps::PeerId, ConnectionEventCb)
#define DEFAULT_XMPP_HOLD_TIME
const std::string & subcluster_name() const
std::string path_to_server_priv_key
#define XMPP_WARNING(obj,...)
void CreateConfigUpdater(BgpConfigManager *config_manager)
uint8_t xmpp_hold_time() const
virtual XmppServerConnection * FindConnection(Endpoint remote_endpoint)
virtual TcpSession * CreateSession()