7 #include <boost/date_time/posix_time/posix_time.hpp>
21 #include "sandesh/common/vns_types.h"
22 #include "sandesh/common/vns_constants.h"
23 #include "sandesh/xmpp_client_server_sandesh_types.h"
24 #include "sandesh/xmpp_message_sandesh_types.h"
25 #include "sandesh/xmpp_server_types.h"
26 #include "sandesh/xmpp_state_machine_sandesh_types.h"
27 #include "sandesh/xmpp_trace_sandesh_types.h"
28 #include "sandesh/xmpp_peer_info_types.h"
31 using boost::system::error_code;
38 #define XMPP_CONTROL_MESSAGE_MAX_SIZE 1024
44 endpoint_(config->endpoint),
45 local_endpoint_(config->local_endpoint),
48 *server->event_manager()->io_service(),
49 "Xmpp keepalive timer",
50 TaskScheduler::GetInstance()->GetTaskId(
"xmpp::StateMachine"),
51 GetTaskInstance(config->ClientOnly()))),
52 is_client_(config->ClientOnly()),
53 log_uve_(config->logUVE),
56 from_(config->FromAddr),
58 auth_enabled_(config->auth_enabled),
59 dscp_value_(config->dscp_value), xmlns_(config->xmlns),
61 this, config->ClientOnly(), config->auth_enabled, config->xmpp_hold_time)),
113 boost::system::error_code ec;
114 mux_->WriteReady(ec);
126 return !
mux_->ReceiverCount() && !
mux_->RefererCount();
148 if (address.is_v4()) {
149 return address.to_v4().to_ulong() % thread_count;
201 assert(!peer_info.get_name().empty());
202 XMPPPeerInfo::Send(peer_info);
206 if ((
to_.size() == 0) && (to.size() != 0)) {
209 XmppPeerInfoData peer_info;
211 peer_info.set_identifier(
to_);
227 const string *msg_str) {
237 str.append(reinterpret_cast<const char *>(data), size);
242 (
mux_->TxMessageTrace(endpoint_addr_str, endpoint.port(),
243 size, *msg_str, NULL)))) {
245 endpoint_addr_str, endpoint.port(), size, *msg_str);
263 if (!session)
return false;
264 XmppProto::XmppStanza::XmppStreamMessage openstream;
275 session->
Send(data, len, NULL);
315 "Send Stream Feature Request", len,
from_,
to_);
357 "Send Proceed Tls", len,
from_,
to_);
367 string str(
"</stream:stream>");
369 memcpy(data, str.data(), str.size());
377 const boost::system::error_code& error) {
384 if (error.category() == boost::asio::error::get_ssl_category()) {
385 string err = error.message();
387 +boost::lexical_cast<
string>(ERR_GET_LIB(error.value()))+
","
388 +boost::lexical_cast<
string>(ERR_GET_FUNC(error.value()))+
","
389 +boost::lexical_cast<
string>(ERR_GET_REASON(error.value()))+
") ";
392 ::ERR_error_string_n(error.value(), buf,
sizeof(buf));
401 XMPP_DEBUG(XmppSslHandShakeMessage, session->ToUVEKey(),
408 log4cplus::Logger logger = log4cplus::Logger::getRoot();
409 LOG4CPLUS_DEBUG(logger, msg <<
ToString() <<
" " <<
415 static bool init_ =
false;
416 static bool log_ =
false;
419 char *str = getenv(
"XMPP_ASSERT_ON_HOLD_TIMEOUT");
420 if (str && strtoul(str, NULL, 0) != 0) log_ =
true;
424 if (log_)
LogMsg(
"SEND KEEPALIVE: ");
453 string error_message) {
455 error_name, error_message);
464 if (holdtime_msecs <= 0)
564 session->
IncStats((
unsigned int)minfo->
type, msg.size());
567 (
mux_->RxMessageTrace(session->
568 remote_endpoint().address().to_string(),
570 msg.size(), msg, minfo)))) {
573 remote_endpoint().address().to_string(),
575 remote_endpoint().port(), msg.size(), msg);
580 }
else if ((minfo =
last_msg_.get()) != NULL) {
581 session->
IncStats((
unsigned int)minfo->
type, msg.size());
594 if (minfo.get() == NULL) {
606 if (iq->
action.compare(
"publish") == 0) {
611 if (iq->
action.compare(
"collection") == 0) {
637 return minfo.release();
646 mux_->ProcessXmppMessage(msg);
651 mux_->ProcessXmppMessage(msg);
721 on_work_queue_(false),
722 conn_endpoint_(NULL),
724 server_delete_ref_(this, server->
deleter()) {
773 XmppPeerInfoData peer_info;
775 peer_info.set_close_reason(close_reason);
794 XmppPeerInfoData peer_info;
796 PeerFlapInfo flap_info;
797 flap_info.set_flap_count(conn_endpoint->
flap_count());
798 flap_info.set_flap_time(conn_endpoint->
last_flap());
799 peer_info.set_flap_info(flap_info);
808 ShowXmppConnection *show_connection)
const {
809 show_connection->set_name(
ToString());
810 show_connection->set_deleted(
IsDeleted());
814 show_connection->set_last_event(
LastEvent());
817 show_connection->set_receivers(
channel_mux()->GetReceiverList());
819 show_connection->set_dscp_value(
dscp_value());
863 server_delete_ref_(this, server->
deleter()) {
910 XmppPeerInfoData peer_info;
927 XmppPeerInfoData peer_info;
929 PeerFlapInfo flap_info;
932 peer_info.set_flap_info(flap_info);
941 : client_(client), flap_count_(0), last_flap_(0), connection_(NULL) {
989 server->SwapXmppConnectionMapEntries(
this, other);
const std::string & FromString() const
void SetTo(const std::string &)
virtual void WriteReady()
boost::scoped_ptr< DeleteActor > deleter_
#define XMPP_MESSAGE_TRACE(obj,...)
LifetimeManager * lifetime_manager()
XmppSession * CreateSession()
boost::asio::ip::tcp::endpoint Endpoint
std::string StateName() const
#define XMPP_UTDEBUG(obj,...)
LifetimeManager * lifetime_manager()
#define XMPP_INFO(obj,...)
size_t get_stream_feature_fail()
virtual void DeleteSession(TcpSession *session)
void SetConfig(const XmppChannelConfig *)
tbb::atomic< uint32_t > update
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
virtual void set_close_reason(const std::string &reason)
virtual bool AcceptSession(XmppSession *session)
virtual LifetimeActor * deleter()=0
const std::string last_flap_at() const
virtual const std::string last_flap_at() const
virtual ~XmppServerConnection()
virtual void InsertDeletedConnection(XmppServerConnection *connection)
uint32_t flap_count() const
XmppStreamTlsType strmtlstype
void KeepaliveTimerErrorHanlder(std::string error_name, std::string error_message)
void inc_stream_feature_fail()
tbb::atomic< uint32_t > open
tbb::atomic< uint32_t > close
bool KeepAliveTimerExpired()
const XmppChannelConfig * config_
virtual boost::asio::ip::tcp::endpoint endpoint() const
XmppServerConnection(XmppServer *server, const XmppChannelConfig *config)
std::string endpoint_string() const
void UpdateKeepAliveTimer(uint8_t time_out)
XmppConnectionEndpoint * conn_endpoint()
boost::asio::ip::address IpAddress
size_t get_sm_keepalive_count()
bool on_work_queue() const
virtual TcpSession * CreateSession()
virtual void ManagedDelete()
void inc_handshake_failure()
void set_close_reason(const std::string &close_reason)
virtual const char * ReadNode(const std::string &name)=0
boost::intrusive_ptr< SslSession > SslSessionPtr
#define XMPP_PEER_DIR_OUT
tbb::atomic< uint32_t > keepalive
void StartKeepAliveTimer()
virtual ~XmppClientConnection()
tbb::atomic< uint32_t > open_fail
void increment_flap_count()
boost::scoped_ptr< DeleteActor > deleter_
XmppConnection(TcpServer *server, const XmppChannelConfig *config)
virtual LifetimeActor * deleter()
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
std::unique_ptr< XmppStanza::XmppMessage > last_msg_
virtual ~XmppConnection()
std::string LastStateName() const
XmppStreamMsgType strmtype
void SendClose(XmppSession *session)
virtual bool MayDelete() const
XmppConnectionEndpoint(const std::string &client)
virtual void RetryDelete()
virtual int ModifyAttribute(const std::string &key, const std::string &value)=0
void SetAdminDown(bool toggle)
xmsm::XmOpenConfirmState GetStateMcOpenConfirmState() const
static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info)
virtual void increment_flap_count()
virtual bool SendProceedTls(XmppSession *session)
XmppConnection * connection()
bool Send(const uint8_t *data, size_t size, const std::string *msg_str=NULL)
std::unique_ptr< XmlBase > dom
virtual void ManagedDelete()
virtual LifetimeActor * deleter()
static const std::string integerToString(const NumberType &num)
xmsm::XmOpenConfirmState OpenConfirmStateType() const
virtual LifetimeManager * lifetime_manager()
std::string LastEvent() const
static TaskScheduler * GetInstance()
const std::string & remote_addr_string() const
void RemoveConnection(XmppClientConnection *connection)
boost::asio::ip::tcp::endpoint local_endpoint_
std::string local_endpoint_string() const
XmppClientConnection * parent_
virtual bool SendOpenConfirm(XmppSession *session)
#define CHECK_CONCURRENCY(...)
static boost::posix_time::ptime UTCUsecToPTime(uint64_t tusec)
void IncStats(unsigned int message_type, uint64_t bytes)
int SetDscpValue(uint8_t value)
virtual void RemoveDeletedConnection(XmppServerConnection *connection)
virtual void RetryDelete()
XmppServerConnection * parent_
void StopKeepAliveTimer()
virtual const std::string last_flap_at() const
virtual bool SendStreamFeatureRequest(XmppSession *session)
virtual const char * ReadAttrib(const std::string &str)=0
xmsm::XmState GetStateMcState() const
tbb::atomic< uint32_t > session_close
xmsm::XmState StateType() const
int ProcessXmppChatMessage(const XmppStanza::XmppChatMessage *)
void ReleaseConnectionEndpoint(XmppServerConnection *connection)
virtual uint32_t flap_count() const
virtual void increment_flap_count()
XmppChannelMux * ChannelMux()
static const char * kAuthTypeNil
std::string GetXmppAuthenticationType() const
uint64_t last_flap() const
virtual void ReceiveMsg(XmppSession *session, const std::string &)
size_t get_sm_connect_attempts()
XmppClientConnection(XmppClient *server, const XmppChannelConfig *config)
void set_connection(XmppConnection *connection)
virtual void RemoveConnection(XmppServerConnection *connection)
const std::string & ToString() const
static XmppStanza::XmppMessage * Decode(const XmppConnection *connection, const std::string &ts)
int GetTaskInstance() const
boost::scoped_ptr< XmppChannelMux > mux_
const XmppChannelMux * channel_mux() const
void set_session(XmppSession *session)
std::string close_reason_
void ProcessSslHandShakeResponse(SslSessionPtr session, const boost::system::error_code &error)
#define XMPP_CONTROL_MESSAGE_MAX_SIZE
void OnEvent(SslSession *session, xmsm::SslHandShakeResponse)
size_t get_session_close()
tbb::atomic< uint32_t > stream_feature_fail
void SetConnection(XmppConnection *connection)
int ProcessXmppIqMessage(const XmppStanza::XmppMessage *)
static uint64_t UTCTimestampUsec()
virtual uint32_t flap_count() const
std::string LastStateChangeAt() const
void IncProtoStats(unsigned int type)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
tbb::atomic< uint32_t > handshake_fail
int SetDscpSocketOption(uint8_t value)
#define XMPP_DEBUG(obj,...)
static const char * kAuthTypeTls
virtual void set_close_reason(const std::string &reason)
Endpoint remote_endpoint() const
tbb::spin_mutex spin_mutex_
virtual void ManagedDelete()=0
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
static int EncodeStream(const XmppStreamMessage &str, std::string &to, std::string &from, const std::string &xmlns, uint8_t *data, size_t size)
size_t get_connect_error()
boost::scoped_ptr< XmppStateMachine > state_machine_
DeleteActor(XmppClient *client, XmppClientConnection *parent)
void FillShowInfo(ShowXmppConnection *show_connection) const
virtual bool SendOpen(XmppSession *session)
DeleteActor(XmppServer *server, XmppServerConnection *parent)
std::string close_reason_
XmppConnectionEndpoint * FindConnectionEndpoint(const std::string &endpoint_name)
XmppStateMachine * state_machine()
boost::asio::ip::tcp::endpoint endpoint_
void LogMsg(std::string msg)
void SwapContents(XmppConnection *other)
const XmppSession * session() const
virtual boost::asio::ip::tcp::endpoint local_endpoint() const
virtual bool MayDelete() const
#define XMPP_ALERT(obj,...)
tbb::atomic< uint32_t > connect_error
const std::string & ToUVEKey() const
virtual bool SendStartTls(XmppSession *session)
uint8_t dscp_value() const
XmppConnection * connection_
#define XMPP_ERROR(obj,...)
XmppConnectionEndpoint * conn_endpoint_
int HardwareThreadCount()
size_t get_handshake_failure()
XmppStanza::XmppMessage * XmppDecode(const std::string &msg)
virtual LifetimeManager * lifetime_manager()
#define XMPP_WARNING(obj,...)
static bool DeleteTimer(Timer *Timer)