7 #include <boost/date_time/posix_time/posix_time.hpp>
21 #include "sandesh/common/vns_types.h"
22 #include "sandesh/common/vns_constants.h"
23 #include "sandesh/xmpp_client_server_sandesh_types.h"
24 #include "sandesh/xmpp_message_sandesh_types.h"
25 #include "sandesh/xmpp_server_types.h"
26 #include "sandesh/xmpp_state_machine_sandesh_types.h"
27 #include "sandesh/xmpp_trace_sandesh_types.h"
28 #include "sandesh/xmpp_peer_info_types.h"
31 using boost::system::error_code;
38 #define XMPP_CONTROL_MESSAGE_MAX_SIZE 1024
44 endpoint_(config->endpoint),
45 local_endpoint_(config->local_endpoint),
48 *server->event_manager()->io_service(),
49 "Xmpp keepalive timer",
50 TaskScheduler::GetInstance()->GetTaskId(
"xmpp::StateMachine"),
51 GetTaskInstance(config->ClientOnly()))),
52 is_client_(config->ClientOnly()),
53 log_uve_(config->logUVE),
56 from_(config->FromAddr),
58 auth_enabled_(config->auth_enabled),
59 dscp_value_(config->dscp_value), xmlns_(config->xmlns),
61 this, config->ClientOnly(), config->auth_enabled, config->xmpp_hold_time)),
113 boost::system::error_code ec;
114 mux_->WriteReady(ec);
126 return !
mux_->ReceiverCount() && !
mux_->RefererCount();
148 if (address.is_v4()) {
149 return address.to_v4().to_ulong() % thread_count;
201 assert(!peer_info.get_name().empty());
202 XMPPPeerInfo::Send(peer_info);
206 if ((
to_.size() == 0) && (to.size() != 0)) {
209 XmppPeerInfoData peer_info;
211 peer_info.set_identifier(
to_);
227 const string *msg_str) {
237 str.append(
reinterpret_cast<const char *
>(data), size);
242 (
mux_->TxMessageTrace(endpoint_addr_str,
endpoint.port(),
243 size, *msg_str, NULL)))) {
245 endpoint_addr_str,
endpoint.port(), size, *msg_str);
264 XmppProto::XmppStanza::XmppStreamMessage openstream;
315 "Send Stream Feature Request", len,
from_,
to_);
357 "Send Proceed Tls", len,
from_,
to_);
367 string str(
"</stream:stream>");
369 memcpy(data, str.data(), str.size());
377 const boost::system::error_code& error) {
384 if (error.category() == boost::asio::error::get_ssl_category()) {
385 string err = error.message();
387 +boost::lexical_cast<string>(ERR_GET_LIB(error.value()))+
","
388 +boost::lexical_cast<string>(ERR_GET_REASON(error.value()))+
") ";
391 ::ERR_error_string_n(error.value(), buf,
sizeof(buf));
407 log4cplus::Logger logger = log4cplus::Logger::getRoot();
408 LOG4CPLUS_DEBUG(logger, msg <<
ToString() <<
" " <<
414 static bool init_ =
false;
415 static bool log_ =
false;
418 char *str = getenv(
"XMPP_ASSERT_ON_HOLD_TIMEOUT");
419 if (str && strtoul(str, NULL, 0) != 0) log_ =
true;
423 if (log_)
LogMsg(
"SEND KEEPALIVE: ");
452 string error_message) {
454 error_name, error_message);
463 if (holdtime_msecs <= 0)
567 remote_endpoint().address().to_string(),
569 msg.size(), msg, minfo)))) {
572 remote_endpoint().address().to_string(),
574 remote_endpoint().port(), msg.size(), msg);
579 }
else if ((minfo =
last_msg_.get()) != NULL) {
593 if (minfo.get() == NULL) {
605 if (iq->
action.compare(
"publish") == 0) {
610 if (iq->
action.compare(
"collection") == 0) {
636 return minfo.release();
645 mux_->ProcessXmppMessage(msg);
650 mux_->ProcessXmppMessage(msg);
720 on_work_queue_(false),
721 conn_endpoint_(NULL),
723 server_delete_ref_(this, server->
deleter()) {
772 XmppPeerInfoData peer_info;
774 peer_info.set_close_reason(close_reason);
793 XmppPeerInfoData peer_info;
795 PeerFlapInfo flap_info;
798 peer_info.set_flap_info(flap_info);
807 ShowXmppConnection *show_connection)
const {
808 show_connection->set_name(
ToString());
809 show_connection->set_deleted(
IsDeleted());
813 show_connection->set_last_event(
LastEvent());
816 show_connection->set_receivers(
channel_mux()->GetReceiverList());
818 show_connection->set_dscp_value(
dscp_value());
862 server_delete_ref_(this, server->
deleter()) {
909 XmppPeerInfoData peer_info;
926 XmppPeerInfoData peer_info;
928 PeerFlapInfo flap_info;
931 peer_info.set_flap_info(flap_info);
940 : client_(client), flap_count_(0), last_flap_(0), connection_(NULL) {
988 server->SwapXmppConnectionMapEntries(
this, other);
boost::asio::ip::address IpAddress
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
static TaskScheduler * GetInstance()
int HardwareThreadCount()
virtual TcpSession * CreateSession()
virtual void DeleteSession(TcpSession *session)
const std::string & ToUVEKey() const
const std::string & remote_addr_string() const
boost::asio::ip::tcp::endpoint Endpoint
Endpoint remote_endpoint() const
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
int SetDscpSocketOption(uint8_t value)
static bool DeleteTimer(Timer *Timer)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
virtual int ModifyAttribute(const std::string &key, const std::string &value)=0
virtual const char * ReadAttrib(const std::string &str)=0
virtual const char * ReadNode(const std::string &name)=0
DeleteActor(XmppClient *client, XmppClientConnection *parent)
virtual bool MayDelete() const
XmppClientConnection * parent_
virtual ~XmppClientConnection()
virtual void set_close_reason(const std::string &reason)
virtual void ManagedDelete()
virtual void increment_flap_count()
virtual const std::string last_flap_at() const
boost::scoped_ptr< DeleteActor > deleter_
virtual LifetimeManager * lifetime_manager()
virtual LifetimeActor * deleter()
XmppClientConnection(XmppClient *server, const XmppChannelConfig *config)
std::string close_reason_
virtual uint32_t flap_count() const
virtual void RetryDelete()
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
void RemoveConnection(XmppClientConnection *connection)
LifetimeManager * lifetime_manager()
void set_connection(XmppConnection *connection)
XmppConnectionEndpoint(const std::string &client)
void increment_flap_count()
void set_close_reason(const std::string &close_reason)
uint32_t flap_count() const
XmppConnection * connection()
XmppConnection * connection_
uint64_t last_flap() const
std::string close_reason_
const std::string last_flap_at() const
boost::asio::ip::tcp::endpoint local_endpoint_
virtual LifetimeActor * deleter()=0
bool KeepAliveTimerExpired()
bool Send(const uint8_t *data, size_t size, const std::string *msg_str=NULL)
const XmppSession * session() const
std::string LastEvent() const
tbb::spin_mutex spin_mutex_
void StopKeepAliveTimer()
std::string local_endpoint_string() const
int GetTaskInstance() const
std::string GetXmppAuthenticationType() const
XmppChannelMux * ChannelMux()
const XmppChannelMux * channel_mux() const
virtual bool SendStreamFeatureRequest(XmppSession *session)
int ProcessXmppIqMessage(const XmppStanza::XmppMessage *)
static const char * kAuthTypeNil
size_t get_connect_error()
XmppSession * CreateSession()
void inc_handshake_failure()
void SwapContents(XmppConnection *other)
size_t get_sm_connect_attempts()
virtual boost::asio::ip::tcp::endpoint local_endpoint() const
virtual void ReceiveMsg(XmppSession *session, const std::string &)
virtual ~XmppConnection()
virtual bool SendOpen(XmppSession *session)
virtual void ManagedDelete()=0
xmsm::XmOpenConfirmState GetStateMcOpenConfirmState() const
virtual bool SendStartTls(XmppSession *session)
virtual boost::asio::ip::tcp::endpoint endpoint() const
void UpdateKeepAliveTimer(uint8_t time_out)
void KeepaliveTimerErrorHanlder(std::string error_name, std::string error_message)
virtual bool SendProceedTls(XmppSession *session)
std::string StateName() const
void SetAdminDown(bool toggle)
XmppConnection(TcpServer *server, const XmppChannelConfig *config)
int ProcessXmppChatMessage(const XmppStanza::XmppChatMessage *)
std::string LastStateName() const
void SetConfig(const XmppChannelConfig *)
const std::string & FromString() const
XmppStanza::XmppMessage * XmppDecode(const std::string &msg)
boost::scoped_ptr< XmppStateMachine > state_machine_
virtual bool AcceptSession(XmppSession *session)
void SetTo(const std::string &)
void StartKeepAliveTimer()
int SetDscpValue(uint8_t value)
const XmppChannelConfig * config_
size_t get_stream_feature_fail()
std::unique_ptr< XmppStanza::XmppMessage > last_msg_
boost::scoped_ptr< XmppChannelMux > mux_
void inc_stream_feature_fail()
size_t get_session_close()
size_t get_handshake_failure()
void IncProtoStats(unsigned int type)
void SendClose(XmppSession *session)
virtual void WriteReady()
XmppStateMachine * state_machine()
static const char * kAuthTypeTls
virtual bool SendOpenConfirm(XmppSession *session)
xmsm::XmState GetStateMcState() const
std::string endpoint_string() const
void LogMsg(std::string msg)
std::string LastStateChangeAt() const
void ProcessSslHandShakeResponse(SslSessionPtr session, const boost::system::error_code &error)
const std::string & ToString() const
void set_session(XmppSession *session)
uint8_t dscp_value() const
boost::asio::ip::tcp::endpoint endpoint_
const std::string & ToUVEKey() const
size_t get_sm_keepalive_count()
static XmppStanza::XmppMessage * Decode(const XmppConnection *connection, const std::string &ts)
static int EncodeStream(const XmppStreamMessage &str, std::string &to, std::string &from, const std::string &xmlns, uint8_t *data, size_t size)
DeleteActor(XmppServer *server, XmppServerConnection *parent)
virtual bool MayDelete() const
XmppServerConnection * parent_
virtual void ManagedDelete()
virtual uint32_t flap_count() const
bool on_work_queue() const
XmppConnectionEndpoint * conn_endpoint()
XmppServerConnection(XmppServer *server, const XmppChannelConfig *config)
virtual void increment_flap_count()
virtual LifetimeManager * lifetime_manager()
virtual const std::string last_flap_at() const
boost::scoped_ptr< DeleteActor > deleter_
virtual LifetimeActor * deleter()
XmppConnectionEndpoint * conn_endpoint_
virtual void RetryDelete()
virtual void set_close_reason(const std::string &reason)
void FillShowInfo(ShowXmppConnection *show_connection) const
virtual ~XmppServerConnection()
LifetimeManager * lifetime_manager()
XmppConnectionEndpoint * FindConnectionEndpoint(const std::string &endpoint_name)
virtual void RemoveDeletedConnection(XmppServerConnection *connection)
virtual void RemoveConnection(XmppServerConnection *connection)
void ReleaseConnectionEndpoint(XmppServerConnection *connection)
virtual void InsertDeletedConnection(XmppServerConnection *connection)
void IncStats(unsigned int message_type, uint64_t bytes)
void SetConnection(XmppConnection *connection)
std::unique_ptr< XmlBase > dom
@ WHITESPACE_MESSAGE_STANZA
xmsm::XmOpenConfirmState OpenConfirmStateType() const
xmsm::XmState StateType() const
void OnEvent(SslSession *session, xmsm::SslHandShakeResponse)
boost::intrusive_ptr< SslSession > SslSessionPtr
static const std::string integerToString(const NumberType &num)
tbb::atomic< uint32_t > handshake_fail
tbb::atomic< uint32_t > connect_error
tbb::atomic< uint32_t > session_close
tbb::atomic< uint32_t > stream_feature_fail
tbb::atomic< uint32_t > open_fail
tbb::atomic< uint32_t > keepalive
tbb::atomic< uint32_t > open
tbb::atomic< uint32_t > close
tbb::atomic< uint32_t > update
@ INIT_STREAM_HEADER_RESP
XmppStreamTlsType strmtlstype
XmppStreamMsgType strmtype
#define CHECK_CONCURRENCY(...)
static boost::posix_time::ptime UTCUsecToPTime(uint64_t tusec)
static uint64_t UTCTimestampUsec()
#define XMPP_CONTROL_MESSAGE_MAX_SIZE
static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info)
#define XMPP_ALERT(obj,...)
#define XMPP_PEER_DIR_OUT
#define XMPP_UTDEBUG(obj,...)
#define XMPP_WARNING(obj,...)
#define XMPP_DEBUG(obj,...)
#define XMPP_INFO(obj,...)
#define XMPP_MESSAGE_TRACE(obj,...)
#define XMPP_ERROR(obj,...)