OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
xmpp_connection.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "xmpp/xmpp_connection.h"
6 
7 #include <boost/date_time/posix_time/posix_time.hpp>
8 #include <sstream>
9 
10 #include "base/lifetime.h"
11 #include "base/task_annotations.h"
12 #include "io/event_manager.h"
13 #include "xml/xml_base.h"
14 #include "xmpp/xmpp_client.h"
15 #include "xmpp/xmpp_config.h"
16 #include "xmpp/xmpp_factory.h"
17 #include "xmpp/xmpp_log.h"
18 #include "xmpp/xmpp_server.h"
19 #include "xmpp/xmpp_session.h"
20 
21 #include "sandesh/common/vns_types.h"
22 #include "sandesh/common/vns_constants.h"
23 #include "sandesh/xmpp_client_server_sandesh_types.h"
24 #include "sandesh/xmpp_message_sandesh_types.h"
25 #include "sandesh/xmpp_server_types.h"
26 #include "sandesh/xmpp_state_machine_sandesh_types.h"
27 #include "sandesh/xmpp_trace_sandesh_types.h"
28 #include "sandesh/xmpp_peer_info_types.h"
29 
30 using namespace std;
31 using boost::system::error_code;
32 
33 const char *XmppConnection::kAuthTypeNil = "NIL";
34 const char *XmppConnection::kAuthTypeTls = "TLS";
35 
36 // Maximum XMPP control message size. Typically this is less than 256 bytes. But
37 // in scenarios where host names are quite long, we need a larger buffer size.
38 #define XMPP_CONTROL_MESSAGE_MAX_SIZE 1024
39 
41  const XmppChannelConfig *config)
42  : server_(server),
43  session_(NULL),
44  endpoint_(config->endpoint),
45  local_endpoint_(config->local_endpoint),
46  config_(NULL),
47  keepalive_timer_(TimerManager::CreateTimer(
48  *server->event_manager()->io_service(),
49  "Xmpp keepalive timer",
50  TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
51  GetTaskInstance(config->ClientOnly()))),
52  is_client_(config->ClientOnly()),
53  log_uve_(config->logUVE),
54  admin_down_(false),
55  disable_read_(false),
56  from_(config->FromAddr),
57  to_(config->ToAddr),
58  auth_enabled_(config->auth_enabled),
59  dscp_value_(config->dscp_value), xmlns_(config->xmlns),
60  state_machine_(XmppStaticObjectFactory::Create<XmppStateMachine>(
61  this, config->ClientOnly(), config->auth_enabled, config->xmpp_hold_time)),
62  mux_(XmppStaticObjectFactory::Create<XmppChannelMux>(this)) {
63  ostringstream oss;
64  oss << FromString() << ":" << endpoint().address().to_string();
65  uve_key_str_ = oss.str();
66 }
67 
71  XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
72  "XmppConnection destructor", FromString(), ToString());
73 }
74 
76  if (auth_enabled_) {
78  } else {
80  }
81 }
82 
84  config_ = config;
85 }
86 
88  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
89  assert(session);
90  session_ = session;
91  if (session_ && dscp_value_) {
93  }
94 }
95 
97  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
98  if (!session_)
99  return;
101  session_ = NULL;
102 }
103 
105  return session_;
106 }
107 
109  return session_;
110 }
111 
113  boost::system::error_code ec;
114  mux_->WriteReady(ec);
115 }
116 
118  ManagedDelete();
119 }
120 
122  return deleter()->IsDeleted();
123 }
124 
126  return !mux_->ReceiverCount() && !mux_->RefererCount();
127 }
128 
131  XmppSession *xmpp_session = static_cast<XmppSession *>(session);
132  xmpp_session->SetConnection(this);
133  return xmpp_session;
134 }
135 
136 //
137 // Return the task instance for this XmppConnection.
138 // Calculate from the remote IpAddress so that a restarting session uses the
139 // same value as before.
140 // Do not make this method virtual since it gets called from the constructor.
141 //
142 
143 int XmppConnection::GetTaskInstance(bool is_client) const {
144  if (is_client)
145  return 0;
146  IpAddress address = endpoint().address();
147  int thread_count = TaskScheduler::GetInstance()->HardwareThreadCount();
148  if (address.is_v4()) {
149  return address.to_v4().to_ulong() % thread_count;
150  } else {
151  return 0;
152  }
153 }
154 
156  const XmppStateMachine *sm = state_machine();
157  assert(sm);
158  return sm->StateType();
159 }
160 
162  const XmppStateMachine *sm = state_machine();
163  assert(sm);
164  return sm->OpenConfirmStateType();
165 }
166 
167 
168 boost::asio::ip::tcp::endpoint XmppConnection::endpoint() const {
169  return endpoint_;
170 }
171 
172 boost::asio::ip::tcp::endpoint XmppConnection::local_endpoint() const {
173  return local_endpoint_;
174 }
175 
177  ostringstream oss;
178  oss << endpoint_;
179  return oss.str();
180 }
181 
183  ostringstream oss;
184  oss << local_endpoint_;
185  return oss.str();
186 }
187 
188 const string &XmppConnection::FromString() const {
189  return from_;
190 }
191 
192 const string &XmppConnection::ToString() const {
193  return to_;
194 }
195 
196 const std::string &XmppConnection::ToUVEKey() const {
197  return uve_key_str_;
198 }
199 
200 static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info) {
201  assert(!peer_info.get_name().empty());
202  XMPPPeerInfo::Send(peer_info);
203 }
204 
205 void XmppConnection::SetTo(const string &to) {
206  if ((to_.size() == 0) && (to.size() != 0)) {
207  to_ = to;
208  if (!logUVE()) return;
209  XmppPeerInfoData peer_info;
210  peer_info.set_name(ToUVEKey());
211  peer_info.set_identifier(to_);
212  XMPPPeerInfoSend(peer_info);
213  }
214 }
215 
216 void XmppConnection::SetAdminDown(bool toggle) {
217  // TODO: generate state machine event.
218  admin_down_ = toggle;
219 }
220 
222  session->SetConnection(this);
223  return state_machine_->PassiveOpen(session);
224 }
225 
226 bool XmppConnection::Send(const uint8_t *data, size_t size,
227  const string *msg_str) {
228  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
229  if (session_ == NULL) {
230  return false;
231  }
232 
234  const string &endpoint_addr_str = session_->remote_addr_string();
235  string str;
236  if (!msg_str) {
237  str.append(reinterpret_cast<const char *>(data), size);
238  msg_str = &str;
239  }
240 
241  if (!(mux_ &&
242  (mux_->TxMessageTrace(endpoint_addr_str, endpoint.port(),
243  size, *msg_str, NULL)))) {
244  XMPP_MESSAGE_TRACE(XmppTxStream,
245  endpoint_addr_str, endpoint.port(), size, *msg_str);
246  }
247 
248  stats_[1].update++;
249  size_t sent;
250  return session_->Send(data, size, &sent);
251 }
252 
253 int XmppConnection::SetDscpValue(uint8_t value) {
254  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
255  dscp_value_ = value;
256  if (!session_) {
257  return 0;
258  }
259  return session_->SetDscpSocketOption(value);
260 }
261 
263  if (!session) return false;
264  XmppProto::XmppStanza::XmppStreamMessage openstream;
266  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
267  int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
268  sizeof(data));
269  if (len <= 0) {
270  inc_open_fail();
271  return false;
272  } else {
273  XMPP_UTDEBUG(XmppOpen, ToUVEKey(), XMPP_PEER_DIR_OUT, len, from_, to_,
274  xmlns_);
275  session->Send(data, len, NULL);
276  stats_[1].open++;
277  return true;
278  }
279 }
280 
282  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
283  if (!session_) return false;
286  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
287  int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
288  sizeof(data));
289  if (len <= 0) {
290  inc_open_fail();
291  return false;
292  } else {
293  XMPP_UTDEBUG(XmppOpenConfirm, ToUVEKey(), XMPP_PEER_DIR_OUT, len,
294  from_, to_);
295  session_->Send(data, len, NULL);
296  stats_[1].open++;
297  return true;
298  }
299 }
300 
302  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
303  if (!session_) return false;
304  XmppStanza::XmppStreamMessage featurestream;
307  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
308  int len = XmppProto::EncodeStream(featurestream, to_, from_, xmlns_, data,
309  sizeof(data));
310  if (len <= 0) {
312  return false;
313  } else {
314  XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
315  "Send Stream Feature Request", len, from_, to_);
316  session_->Send(data, len, NULL);
317  //stats_[1].open++;
318  return true;
319  }
320 }
321 
323  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
324  if (!session_) return false;
328  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
329  int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
330  sizeof(data));
331  if (len <= 0) {
333  return false;
334  } else {
335  XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
336  "Send Start Tls", len, from_, to_);
337  session_->Send(data, len, NULL);
338  //stats_[1].open++;
339  return true;
340  }
341 }
342 
344  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
345  if (!session_) return false;
349  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
350  int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
351  sizeof(data));
352  if (len <= 0) {
354  return false;
355  } else {
356  XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
357  "Send Proceed Tls", len, from_, to_);
358  session_->Send(data, len, NULL);
359  //stats_[1].open++;
360  return true;
361  }
362 }
363 
365  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
366  if (!session_) return;
367  string str("</stream:stream>");
368  uint8_t data[64];
369  memcpy(data, str.data(), str.size());
370  XMPP_UTDEBUG(XmppClose, ToUVEKey(), XMPP_PEER_DIR_OUT, str.size(), from_,
371  to_);
372  session_->Send(data, str.size(), NULL);
373  stats_[1].close++;
374 }
375 
377  const boost::system::error_code& error) {
378  if (!state_machine())
379  return;
380 
381  if (error) {
383 
384  if (error.category() == boost::asio::error::get_ssl_category()) {
385  string err = error.message();
386  err = string(" (")
387  +boost::lexical_cast<string>(ERR_GET_LIB(error.value()))+","
388  +boost::lexical_cast<string>(ERR_GET_REASON(error.value()))+") ";
389 
390  char buf[128];
391  ::ERR_error_string_n(error.value(), buf, sizeof(buf));
392  err += buf;
393  XMPP_ALERT(XmppSslHandShakeFailure, ToUVEKey(), XMPP_PEER_DIR_IN,
394  "failure", err);
395  }
396 
398 
399  } else {
400  XMPP_DEBUG(XmppSslHandShakeMessage, session->ToUVEKey(),
401  XMPP_PEER_DIR_IN, "success", "");
403  }
404 }
405 
406 void XmppConnection::LogMsg(std::string msg) {
407  log4cplus::Logger logger = log4cplus::Logger::getRoot();
408  LOG4CPLUS_DEBUG(logger, msg << ToString() << " " <<
409  local_endpoint_.address() << ":" << local_endpoint_.port() << "::" <<
410  endpoint_.address() << ":" << endpoint_.port());
411 }
412 
414  static bool init_ = false;
415  static bool log_ = false;
416 
417  if (!init_) {
418  char *str = getenv("XMPP_ASSERT_ON_HOLD_TIMEOUT");
419  if (str && strtoul(str, NULL, 0) != 0) log_ = true;
420  init_ = true;
421  }
422 
423  if (log_) LogMsg("SEND KEEPALIVE: ");
424 }
425 
427  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
428  if (!session_) return;
430  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
431  int len = XmppProto::EncodeStream(msg, data, sizeof(data));
432  assert(len > 0);
433  session_->Send(data, len, NULL);
434  stats_[1].keepalive++;
436 }
437 
439  if (state_machine_->get_state() != xmsm::ESTABLISHED)
440  return false;
441 
442  // TODO: check timestamp of last received packet.
443  SendKeepAlive();
444 
445  //
446  // Start the timer again, by returning true
447  //
448  return true;
449 }
450 
452  string error_message) {
453  XMPP_WARNING(XmppKeepaliveTimeError, ToUVEKey(), XMPP_PEER_DIR_NA,
454  error_name, error_message);
455 }
456 
458  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
459  if (!session_)
460  return;
461 
462  int holdtime_msecs = state_machine_->hold_time_msecs();
463  if (holdtime_msecs <= 0)
464  return;
465 
466  keepalive_timer_->Start(holdtime_msecs / 3,
467  boost::bind(&XmppConnection::KeepAliveTimerExpired, this),
468  boost::bind(&XmppConnection::KeepaliveTimerErrorHanlder, this, _1, _2));
469 }
470 
472  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
474 }
475 
476 void XmppConnection::UpdateKeepAliveTimer(uint8_t time_out) {
477  state_machine_->set_hold_time(time_out);
480 }
481 
483  return state_machine_.get();
484 }
485 
487  return state_machine_.get();
488 }
489 
491  return mux_.get();
492 }
493 
495  switch (type) {
497  stats_[0].open++;
498  break;
500  stats_[0].keepalive++;
501  break;
503  stats_[0].update++;
504  break;
506  stats_[0].update++;
507  break;
508  }
509 }
510 
513 }
514 
517 }
518 
521 }
522 
525 }
526 
529 }
530 
533 }
534 
537 }
538 
540  return error_stats_.open_fail;
541 }
542 
545 }
546 
549 }
550 
552  return state_machine_->get_connect_attempts();
553 }
554 
556  return state_machine_->get_keepalive_count();
557 }
558 
559 void XmppConnection::ReceiveMsg(XmppSession *session, const string &msg) {
560  XmppStanza::XmppMessage *minfo = XmppDecode(msg);
561 
562  if (minfo) {
563  session->IncStats((unsigned int)minfo->type, msg.size());
565  if (!(mux_ &&
566  (mux_->RxMessageTrace(session->
567  remote_endpoint().address().to_string(),
568  session->remote_endpoint().port(),
569  msg.size(), msg, minfo)))) {
570  XMPP_MESSAGE_TRACE(XmppRxStream,
571  session->
572  remote_endpoint().address().to_string(),
573  session->
574  remote_endpoint().port(), msg.size(), msg);
575  }
576  }
577  IncProtoStats((unsigned int)minfo->type);
578  state_machine_->OnMessage(session, minfo);
579  } else if ((minfo = last_msg_.get()) != NULL) {
580  session->IncStats((unsigned int)minfo->type, msg.size());
581  IncProtoStats((unsigned int)minfo->type);
582  } else {
583  session->IncStats(XmppStanza::INVALID, msg.size());
584  XMPP_MESSAGE_TRACE(XmppRxStreamInvalid,
585  session->remote_endpoint().address().to_string(),
586  session->remote_endpoint().port(), msg.size(), msg);
587  }
588  return;
589 }
590 
592  unique_ptr<XmppStanza::XmppMessage> minfo(XmppProto::Decode(this, msg));
593  if (minfo.get() == NULL) {
594  XMPP_INFO(XmppSessionDelete, ToUVEKey(), XMPP_PEER_DIR_IN, "Server",
595  FromString(), ToString());
596  Clear();
597  return NULL;
598  }
599 
600  if (minfo->type == XmppStanza::IQ_STANZA) {
601  const XmppStanza::XmppMessageIq *iq =
602  static_cast<const XmppStanza::XmppMessageIq *>(minfo.get());
603 
604 
605  if (iq->action.compare("publish") == 0) {
606  last_msg_.reset(minfo.release());
607  return NULL;
608  }
609 
610  if (iq->action.compare("collection") == 0) {
611  if (last_msg_.get() != NULL) {
612  XmppStanza::XmppMessageIq *last_iq =
613  static_cast<XmppStanza::XmppMessageIq *>(last_msg_.get());
614 
615  if (last_iq->node.compare(iq->as_node) == 0) {
616  XmlBase *impl = last_iq->dom.get();
617  impl->ReadNode("publish");
618  impl->ModifyAttribute("node", iq->node);
619  last_iq->node = impl->ReadAttrib("node");
620  last_iq->is_as_node = iq->is_as_node;
621  //Save the complete ass/dissociate node
622  last_iq->as_node = iq->as_node;
623  } else {
624  XMPP_WARNING(XmppIqMessageInvalid, ToUVEKey(),
626  goto error;
627  }
628  } else {
629  XMPP_ERROR(XmppIqCollectionError, ToUVEKey(), XMPP_PEER_DIR_IN);
630  goto error;
631  }
632  // iq message merged with collection info
633  return last_msg_.release();
634  }
635  }
636  return minfo.release();
637 
638 error:
639  last_msg_.reset();
640  return NULL;
641 }
642 
644  const XmppStanza::XmppChatMessage *msg) {
645  mux_->ProcessXmppMessage(msg);
646  return 0;
647 }
648 
650  mux_->ProcessXmppMessage(msg);
651  return 0;
652 }
653 
655 public:
657  : LifetimeActor(server->lifetime_manager()),
658  server_(server), parent_(parent) {
659  }
660 
661  virtual bool MayDelete() const {
662  return (!parent_->on_work_queue() && parent_->MayDelete());
663  }
664 
665  virtual void Shutdown() {
666  CHECK_CONCURRENCY("bgp::Config");
667 
668  // If the connection is still on the WorkQueue, simply add it to the
669  // ConnectionSet. It won't be on the ConnectionMap.
670  if (parent_->on_work_queue()) {
672  }
673 
674  // If the connection was rejected as duplicate, it will already be in
675  // the ConnectionSet. Non-duplicate connections need to be moved from
676  // the ConnectionMap into the ConnectionSet. We add it to the
677  // ConnectionSet and then remove it from ConnectionMap to ensure that
678  // the XmppServer connection count doesn't temporarily become 0. This
679  // is friendly to tests that wait for the XmppServer connection count
680  // to become 0.
681  //
682  // Breaking association with the XmppConnectionEndpoint here allows
683  // a new XmppServerConnection with the same Endpoint to come up. We
684  // may end up leaking memory if current XmppServerConnection doesn't
685  // get cleaned up completely, but we at least prevent the other end
686  // from getting stuck forever.
687  else if (!parent_->duplicate()) {
691  }
692 
693  if (parent_->state_machine()) {
695  }
696 
697  XmppSession *session = NULL;
698  if (parent_->state_machine()) {
699  session = parent_->state_machine()->session();
701  }
702  if (session) {
703  server_->DeleteSession(session);
704  }
705  }
706 
707  virtual void Destroy() {
708  delete parent_;
709  }
710 
711 private:
714 };
715 
717  const XmppChannelConfig *config)
718  : XmppConnection(server, config),
719  duplicate_(false),
720  on_work_queue_(false),
721  conn_endpoint_(NULL),
722  deleter_(new DeleteActor(server, this)),
723  server_delete_ref_(this, server->deleter()) {
724  assert(!config->ClientOnly());
725  XMPP_INFO(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_IN,
726  "Server", FromString(), ToString());
727 }
728 
730  CHECK_CONCURRENCY("bgp::Config");
731 
732  XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA, "Server",
733  FromString(), ToString());
735 }
736 
738  XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
739  "Managed server connection delete", FromString(), ToString());
740  deleter_->Delete();
741 }
742 
744  if (!deleter()->IsDeleted())
745  return;
746  deleter()->RetryDelete();
747 }
748 
750  return server()->lifetime_manager();
751 }
752 
754  return static_cast<XmppServer *>(server_);
755 }
756 
758  return deleter_.get();
759 }
760 
762  return deleter_.get();
763 }
764 
765 void XmppServerConnection::set_close_reason(const string &close_reason) {
766  if (conn_endpoint_)
767  conn_endpoint_->set_close_reason(close_reason);
768 
769  if (!logUVE())
770  return;
771 
772  XmppPeerInfoData peer_info;
773  peer_info.set_name(ToUVEKey());
774  peer_info.set_close_reason(close_reason);
775  XMPPPeerInfoSend(peer_info);
776 }
777 
779  return conn_endpoint_ ? conn_endpoint_->flap_count() : 0;
780 }
781 
784  if (!conn_endpoint)
785  conn_endpoint = server()->FindConnectionEndpoint(ToString());
786  if (!conn_endpoint)
787  return;
788  conn_endpoint->increment_flap_count();
789 
790  if (!logUVE())
791  return;
792 
793  XmppPeerInfoData peer_info;
794  peer_info.set_name(ToUVEKey());
795  PeerFlapInfo flap_info;
796  flap_info.set_flap_count(conn_endpoint->flap_count());
797  flap_info.set_flap_time(conn_endpoint->last_flap());
798  peer_info.set_flap_info(flap_info);
799  XMPPPeerInfoSend(peer_info);
800 }
801 
802 const std::string XmppServerConnection::last_flap_at() const {
803  return conn_endpoint_ ? conn_endpoint_->last_flap_at() : "";
804 }
805 
807  ShowXmppConnection *show_connection) const {
808  show_connection->set_name(ToString());
809  show_connection->set_deleted(IsDeleted());
810  show_connection->set_remote_endpoint(endpoint_string());
811  show_connection->set_local_endpoint(local_endpoint_string());
812  show_connection->set_state(StateName());
813  show_connection->set_last_event(LastEvent());
814  show_connection->set_last_state(LastStateName());
815  show_connection->set_last_state_at(LastStateChangeAt());
816  show_connection->set_receivers(channel_mux()->GetReceiverList());
817  show_connection->set_server_auth_type(GetXmppAuthenticationType());
818  show_connection->set_dscp_value(dscp_value());
819 }
820 
822 public:
824  : LifetimeActor(client->lifetime_manager()),
825  client_(client), parent_(parent) {
826  }
827 
828  virtual bool MayDelete() const {
829  return parent_->MayDelete();
830  }
831 
832  virtual void Shutdown() {
833  if (parent_->session()) {
836  }
837 
838  XmppSession *session = NULL;
839  if (parent_->state_machine()) {
840  session = parent_->state_machine()->session();
842  }
843  if (session) {
844  client_->DeleteSession(session);
845  }
846  }
847 
848  virtual void Destroy() {
849  delete parent_;
850  }
851 
852 private:
855 };
856 
858  const XmppChannelConfig *config)
859  : XmppConnection(server, config),
860  flap_count_(0),
861  deleter_(new DeleteActor(server, this)),
862  server_delete_ref_(this, server->deleter()) {
863  assert(config->ClientOnly());
864  XMPP_UTDEBUG(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_NA, "Client",
865  FromString(), ToString());
866 }
867 
869  CHECK_CONCURRENCY("bgp::Config");
870 
871  XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
872  "Client", FromString(), ToString());
873  server()->RemoveConnection(this);
874 }
875 
877  XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
878  "Managed Client Delete", FromString(), ToString());
879  deleter_->Delete();
880 }
881 
883  if (!deleter()->IsDeleted())
884  return;
885  deleter()->RetryDelete();
886 }
887 
889  return server()->lifetime_manager();
890 }
891 
893  return static_cast<XmppClient *>(server_);
894 }
895 
897  return deleter_.get();
898 }
899 
901  return deleter_.get();
902 }
903 
904 void XmppClientConnection::set_close_reason(const string &close_reason) {
905  close_reason_ = close_reason;
906  if (!logUVE())
907  return;
908 
909  XmppPeerInfoData peer_info;
910  peer_info.set_name(ToUVEKey());
911  peer_info.set_close_reason(close_reason_);
912  XMPPPeerInfoSend(peer_info);
913 }
914 
916  return flap_count_;
917 }
918 
920  flap_count_++;
922 
923  if (!logUVE())
924  return;
925 
926  XmppPeerInfoData peer_info;
927  peer_info.set_name(ToUVEKey());
928  PeerFlapInfo flap_info;
929  flap_info.set_flap_count(flap_count_);
930  flap_info.set_flap_time(last_flap_);
931  peer_info.set_flap_info(flap_info);
932  XMPPPeerInfoSend(peer_info);
933 }
934 
935 const std::string XmppClientConnection::last_flap_at() const {
937 }
938 
940  : client_(client), flap_count_(0), last_flap_(0), connection_(NULL) {
941 }
942 
943 void XmppConnectionEndpoint::set_close_reason(const string &close_reason) {
944  close_reason_ = close_reason;
945 }
946 
948  return flap_count_;
949 }
950 
952  flap_count_++;
954 }
955 
957  return last_flap_;
958 }
959 
960 const std::string XmppConnectionEndpoint::last_flap_at() const {
962 }
963 
965  return connection_;
966 }
967 
969  return connection_;
970 }
971 
973  assert(!connection_);
975 }
976 
978  assert(connection_);
979  connection_ = NULL;
980 }
981 
982 // Swap relevant contents between two XmppConnection objects.
984  assert(!IsClient());
985  assert(!other->IsClient());
986  // Update the ConnectionMap in the server as the endpoints are the keys.
987  XmppServer *server = dynamic_cast<XmppServerConnection *>(this)->server();
988  server->SwapXmppConnectionMapEntries(this, other);
989  // Swap all other connection related information.
990  swap(local_endpoint_, other->local_endpoint_);
991  swap(stats_, other->stats_);
992  swap(error_stats_, other->error_stats_);
993  swap(last_msg_, other->last_msg_);
994  swap(to_, other->to_);
995  swap(from_, other->from_);
996  swap(xmlns_, other->xmlns_);
997  swap(dscp_value_, other->dscp_value_);
998  swap(disable_read_, other->disable_read_);
999 }
const std::string & FromString() const
void SetTo(const std::string &)
virtual void WriteReady()
boost::scoped_ptr< DeleteActor > deleter_
#define XMPP_MESSAGE_TRACE(obj,...)
Definition: xmpp_log.h:78
LifetimeManager * lifetime_manager()
Definition: xmpp_client.cc:131
XmppSession * CreateSession()
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_session.h:62
bool MayDelete() const
void ClearConnection()
Definition: xmpp_session.cc:63
std::string StateName() const
#define XMPP_UTDEBUG(obj,...)
Definition: xmpp_log.h:67
LifetimeManager * lifetime_manager()
Definition: xmpp_server.cc:360
XmppMessageType type
Definition: xmpp_proto.h:57
#define XMPP_INFO(obj,...)
Definition: xmpp_log.h:51
std::string to_
size_t get_stream_feature_fail()
virtual void DeleteSession(TcpSession *session)
Definition: tcp_server.cc:197
void SetConfig(const XmppChannelConfig *)
tbb::atomic< uint32_t > update
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
virtual void set_close_reason(const std::string &reason)
virtual bool AcceptSession(XmppSession *session)
virtual LifetimeActor * deleter()=0
const std::string last_flap_at() const
virtual const std::string last_flap_at() const
virtual void InsertDeletedConnection(XmppServerConnection *connection)
Definition: xmpp_server.cc:652
uint32_t flap_count() const
XmppStreamTlsType strmtlstype
Definition: xmpp_proto.h:99
void KeepaliveTimerErrorHanlder(std::string error_name, std::string error_message)
void inc_stream_feature_fail()
tbb::atomic< uint32_t > open
tbb::atomic< uint32_t > close
#define XMPP_PEER_DIR_NA
Definition: xmpp_log.h:16
bool KeepAliveTimerExpired()
const XmppChannelConfig * config_
virtual boost::asio::ip::tcp::endpoint endpoint() const
XmppServerConnection(XmppServer *server, const XmppChannelConfig *config)
std::string endpoint_string() const
void UpdateKeepAliveTimer(uint8_t time_out)
XmppConnectionEndpoint * conn_endpoint()
#define XMPP_PEER_DIR_IN
Definition: xmpp_log.h:15
boost::asio::ip::address IpAddress
Definition: address.h:13
size_t get_sm_keepalive_count()
bool on_work_queue() const
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:188
Timer * keepalive_timer_
virtual void ManagedDelete()
void inc_handshake_failure()
void set_close_reason(const std::string &close_reason)
virtual const char * ReadNode(const std::string &name)=0
boost::intrusive_ptr< SslSession > SslSessionPtr
Definition: ssl_session.h:11
std::string from_
#define XMPP_PEER_DIR_OUT
Definition: xmpp_log.h:14
tbb::atomic< uint32_t > keepalive
void StartKeepAliveTimer()
tbb::atomic< uint32_t > open_fail
boost::scoped_ptr< DeleteActor > deleter_
XmppConnection(TcpServer *server, const XmppChannelConfig *config)
virtual LifetimeActor * deleter()
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:428
std::unique_ptr< XmppStanza::XmppMessage > last_msg_
virtual ~XmppConnection()
std::string LastStateName() const
XmppStreamMsgType strmtype
Definition: xmpp_proto.h:98
void SendClose(XmppSession *session)
bool IsClient() const
XmppConnectionEndpoint(const std::string &client)
void RetryDelete()
Definition: lifetime.cc:71
virtual void RetryDelete()
virtual int ModifyAttribute(const std::string &key, const std::string &value)=0
void SetAdminDown(bool toggle)
xmsm::XmOpenConfirmState GetStateMcOpenConfirmState() const
static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info)
virtual void increment_flap_count()
virtual bool SendProceedTls(XmppSession *session)
XmppConnection * connection()
bool Send(const uint8_t *data, size_t size, const std::string *msg_str=NULL)
std::unique_ptr< XmlBase > dom
Definition: xmpp_proto.h:62
virtual void ManagedDelete()
virtual LifetimeActor * deleter()
bool IsDeleted() const
uint8_t type
Definition: load_balance.h:109
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
xmsm::XmOpenConfirmState OpenConfirmStateType() const
bool duplicate() const
virtual LifetimeManager * lifetime_manager()
bool IsDeleted() const
Definition: lifetime.h:131
std::string LastEvent() const
static TaskScheduler * GetInstance()
Definition: task.cc:547
const std::string & remote_addr_string() const
Definition: tcp_session.h:139
void RemoveConnection(XmppClientConnection *connection)
Definition: xmpp_client.cc:261
boost::asio::ip::tcp::endpoint local_endpoint_
std::string local_endpoint_string() const
virtual bool SendOpenConfirm(XmppSession *session)
#define CHECK_CONCURRENCY(...)
static boost::posix_time::ptime UTCUsecToPTime(uint64_t tusec)
Definition: time_util.h:38
void IncStats(unsigned int message_type, uint64_t bytes)
int SetDscpValue(uint8_t value)
virtual void RemoveDeletedConnection(XmppServerConnection *connection)
Definition: xmpp_server.cc:665
virtual void RetryDelete()
virtual const std::string last_flap_at() const
virtual bool SendStreamFeatureRequest(XmppSession *session)
virtual const char * ReadAttrib(const std::string &str)=0
xmsm::XmState GetStateMcState() const
tbb::atomic< uint32_t > session_close
xmsm::XmState StateType() const
int ProcessXmppChatMessage(const XmppStanza::XmppChatMessage *)
void ReleaseConnectionEndpoint(XmppServerConnection *connection)
Definition: xmpp_server.cc:721
virtual uint32_t flap_count() const
virtual void increment_flap_count()
XmppChannelMux * ChannelMux()
static const char * kAuthTypeNil
std::string GetXmppAuthenticationType() const
uint64_t last_flap() const
bool ClientOnly() const
Definition: xmpp_config.h:42
virtual void ReceiveMsg(XmppSession *session, const std::string &)
size_t get_sm_connect_attempts()
XmppClientConnection(XmppClient *server, const XmppChannelConfig *config)
void set_connection(XmppConnection *connection)
bool Cancel()
Definition: timer.cc:150
virtual void RemoveConnection(XmppServerConnection *connection)
Definition: xmpp_server.cc:518
const std::string & ToString() const
static XmppStanza::XmppMessage * Decode(const XmppConnection *connection, const std::string &ts)
Definition: xmpp_proto.cc:213
TcpServer * server()
int GetTaskInstance() const
boost::scoped_ptr< XmppChannelMux > mux_
XmppSession * session_
const XmppChannelMux * channel_mux() const
void set_session(XmppSession *session)
void ProcessSslHandShakeResponse(SslSessionPtr session, const boost::system::error_code &error)
#define XMPP_CONTROL_MESSAGE_MAX_SIZE
TcpServer * server_
void OnEvent(SslSession *session, xmsm::SslHandShakeResponse)
size_t get_session_close()
tbb::atomic< uint32_t > stream_feature_fail
void SetConnection(XmppConnection *connection)
Definition: xmpp_session.cc:52
int ProcessXmppIqMessage(const XmppStanza::XmppMessage *)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
ErrorStats error_stats_
virtual uint32_t flap_count() const
std::string LastStateChangeAt() const
void IncProtoStats(unsigned int type)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
tbb::atomic< uint32_t > handshake_fail
int SetDscpSocketOption(uint8_t value)
Definition: tcp_session.cc:569
#define XMPP_DEBUG(obj,...)
Definition: xmpp_log.h:59
static const char * kAuthTypeTls
virtual void set_close_reason(const std::string &reason)
ProtoStats stats_[2]
Endpoint remote_endpoint() const
Definition: tcp_session.h:135
tbb::spin_mutex spin_mutex_
virtual void ManagedDelete()=0
bool logUVE() const
size_t get_open_fail()
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
Definition: xmpp_client.cc:208
static int EncodeStream(const XmppStreamMessage &str, std::string &to, std::string &from, const std::string &xmlns, uint8_t *data, size_t size)
size_t get_connect_error()
boost::scoped_ptr< XmppStateMachine > state_machine_
XmppSession * session()
DeleteActor(XmppClient *client, XmppClientConnection *parent)
void FillShowInfo(ShowXmppConnection *show_connection) const
virtual bool SendOpen(XmppSession *session)
DeleteActor(XmppServer *server, XmppServerConnection *parent)
XmppConnectionEndpoint * FindConnectionEndpoint(const std::string &endpoint_name)
Definition: xmpp_server.cc:675
XmppStateMachine * state_machine()
boost::asio::ip::tcp::endpoint endpoint_
void LogMsg(std::string msg)
void SwapContents(XmppConnection *other)
const XmppSession * session() const
virtual boost::asio::ip::tcp::endpoint local_endpoint() const
#define XMPP_ALERT(obj,...)
Definition: xmpp_log.h:27
tbb::atomic< uint32_t > connect_error
std::string uve_key_str_
const std::string & ToUVEKey() const
virtual bool SendStartTls(XmppSession *session)
uint8_t dscp_value() const
XmppConnection * connection_
#define XMPP_ERROR(obj,...)
Definition: xmpp_log.h:18
XmppConnectionEndpoint * conn_endpoint_
int HardwareThreadCount()
Definition: task.h:276
size_t get_handshake_failure()
XmppStanza::XmppMessage * XmppDecode(const std::string &msg)
virtual LifetimeManager * lifetime_manager()
#define XMPP_WARNING(obj,...)
Definition: xmpp_log.h:35
std::string xmlns_
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222