OpenSDN source code
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
xmpp_connection.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "xmpp/xmpp_connection.h"
6 
7 #include <boost/date_time/posix_time/posix_time.hpp>
8 #include <sstream>
9 
10 #include "base/lifetime.h"
11 #include "base/task_annotations.h"
12 #include "io/event_manager.h"
13 #include "xml/xml_base.h"
14 #include "xmpp/xmpp_client.h"
15 #include "xmpp/xmpp_config.h"
16 #include "xmpp/xmpp_factory.h"
17 #include "xmpp/xmpp_log.h"
18 #include "xmpp/xmpp_server.h"
19 #include "xmpp/xmpp_session.h"
20 
21 #include "sandesh/common/vns_types.h"
22 #include "sandesh/common/vns_constants.h"
23 #include "sandesh/xmpp_client_server_sandesh_types.h"
24 #include "sandesh/xmpp_message_sandesh_types.h"
25 #include "sandesh/xmpp_server_types.h"
26 #include "sandesh/xmpp_state_machine_sandesh_types.h"
27 #include "sandesh/xmpp_trace_sandesh_types.h"
28 #include "sandesh/xmpp_peer_info_types.h"
29 
30 using namespace std;
31 using boost::system::error_code;
32 
33 const char *XmppConnection::kAuthTypeNil = "NIL";
34 const char *XmppConnection::kAuthTypeTls = "TLS";
35 
36 // Maximum XMPP control message size. Typically this is less than 256 bytes. But
37 // in scenarios where host names are quite long, we need a larger buffer size.
38 #define XMPP_CONTROL_MESSAGE_MAX_SIZE 1024
39 
41  const XmppChannelConfig *config)
42  : server_(server),
43  session_(NULL),
44  endpoint_(config->endpoint),
45  local_endpoint_(config->local_endpoint),
46  config_(NULL),
47  keepalive_timer_(TimerManager::CreateTimer(
48  *server->event_manager()->io_service(),
49  "Xmpp keepalive timer",
50  TaskScheduler::GetInstance()->GetTaskId("xmpp::StateMachine"),
51  GetTaskInstance(config->ClientOnly()))),
52  is_client_(config->ClientOnly()),
53  log_uve_(config->logUVE),
54  admin_down_(false),
55  disable_read_(false),
56  from_(config->FromAddr),
57  to_(config->ToAddr),
58  auth_enabled_(config->auth_enabled),
59  dscp_value_(config->dscp_value), xmlns_(config->xmlns),
60  state_machine_(XmppStaticObjectFactory::Create<XmppStateMachine>(
61  this, config->ClientOnly(), config->auth_enabled, config->xmpp_hold_time)),
62  mux_(XmppStaticObjectFactory::Create<XmppChannelMux>(this)) {
63  ostringstream oss;
64  oss << FromString() << ":" << endpoint().address().to_string();
65  uve_key_str_ = oss.str();
66 }
67 
71  XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
72  "XmppConnection destructor", FromString(), ToString());
73 }
74 
76  if (auth_enabled_) {
78  } else {
80  }
81 }
82 
84  config_ = config;
85 }
86 
88  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
89  assert(session);
90  session_ = session;
91  if (session_ && dscp_value_) {
93  }
94 }
95 
97  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
98  if (!session_)
99  return;
101  session_ = NULL;
102 }
103 
105  return session_;
106 }
107 
109  return session_;
110 }
111 
113  boost::system::error_code ec;
114  mux_->WriteReady(ec);
115 }
116 
118  ManagedDelete();
119 }
120 
122  return deleter()->IsDeleted();
123 }
124 
126  return !mux_->ReceiverCount() && !mux_->RefererCount();
127 }
128 
131  XmppSession *xmpp_session = static_cast<XmppSession *>(session);
132  xmpp_session->SetConnection(this);
133  return xmpp_session;
134 }
135 
136 //
137 // Return the task instance for this XmppConnection.
138 // Calculate from the remote IpAddress so that a restarting session uses the
139 // same value as before.
140 // Do not make this method virtual since it gets called from the constructor.
141 //
142 
143 int XmppConnection::GetTaskInstance(bool is_client) const {
144  if (is_client)
145  return 0;
146  IpAddress address = endpoint().address();
147  int thread_count = TaskScheduler::GetInstance()->HardwareThreadCount();
148  if (address.is_v4()) {
149  return address.to_v4().to_ulong() % thread_count;
150  } else {
151  return 0;
152  }
153 }
154 
156  const XmppStateMachine *sm = state_machine();
157  assert(sm);
158  return sm->StateType();
159 }
160 
162  const XmppStateMachine *sm = state_machine();
163  assert(sm);
164  return sm->OpenConfirmStateType();
165 }
166 
167 
168 boost::asio::ip::tcp::endpoint XmppConnection::endpoint() const {
169  return endpoint_;
170 }
171 
172 boost::asio::ip::tcp::endpoint XmppConnection::local_endpoint() const {
173  return local_endpoint_;
174 }
175 
177  ostringstream oss;
178  oss << endpoint_;
179  return oss.str();
180 }
181 
183  ostringstream oss;
184  oss << local_endpoint_;
185  return oss.str();
186 }
187 
188 const string &XmppConnection::FromString() const {
189  return from_;
190 }
191 
192 const string &XmppConnection::ToString() const {
193  return to_;
194 }
195 
196 const std::string &XmppConnection::ToUVEKey() const {
197  return uve_key_str_;
198 }
199 
200 static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info) {
201  assert(!peer_info.get_name().empty());
202  XMPPPeerInfo::Send(peer_info);
203 }
204 
205 void XmppConnection::SetTo(const string &to) {
206  if ((to_.size() == 0) && (to.size() != 0)) {
207  to_ = to;
208  if (!logUVE()) return;
209  XmppPeerInfoData peer_info;
210  peer_info.set_name(ToUVEKey());
211  peer_info.set_identifier(to_);
212  XMPPPeerInfoSend(peer_info);
213  }
214 }
215 
216 void XmppConnection::SetAdminDown(bool toggle) {
217  // TODO: generate state machine event.
218  admin_down_ = toggle;
219 }
220 
222  session->SetConnection(this);
223  return state_machine_->PassiveOpen(session);
224 }
225 
226 bool XmppConnection::Send(const uint8_t *data, size_t size,
227  const string *msg_str) {
228  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
229  if (session_ == NULL) {
230  return false;
231  }
232 
234  const string &endpoint_addr_str = session_->remote_addr_string();
235  string str;
236  if (!msg_str) {
237  str.append(reinterpret_cast<const char *>(data), size);
238  msg_str = &str;
239  }
240 
241  if (!(mux_ &&
242  (mux_->TxMessageTrace(endpoint_addr_str, endpoint.port(),
243  size, *msg_str, NULL)))) {
244  XMPP_MESSAGE_TRACE(XmppTxStream,
245  endpoint_addr_str, endpoint.port(), size, *msg_str);
246  }
247 
248  stats_[1].update++;
249  size_t sent;
250  return session_->Send(data, size, &sent);
251 }
252 
253 int XmppConnection::SetDscpValue(uint8_t value) {
254  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
255  dscp_value_ = value;
256  if (!session_) {
257  return 0;
258  }
259  return session_->SetDscpSocketOption(value);
260 }
261 
263  if (!session) return false;
264  XmppProto::XmppStanza::XmppStreamMessage openstream;
266  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
267  int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
268  sizeof(data));
269  if (len <= 0) {
270  inc_open_fail();
271  return false;
272  } else {
273  XMPP_UTDEBUG(XmppOpen, ToUVEKey(), XMPP_PEER_DIR_OUT, len, from_, to_,
274  xmlns_);
275  session->Send(data, len, NULL);
276  stats_[1].open++;
277  return true;
278  }
279 }
280 
282  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
283  if (!session_) return false;
286  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
287  int len = XmppProto::EncodeStream(openstream, to_, from_, xmlns_, data,
288  sizeof(data));
289  if (len <= 0) {
290  inc_open_fail();
291  return false;
292  } else {
293  XMPP_UTDEBUG(XmppOpenConfirm, ToUVEKey(), XMPP_PEER_DIR_OUT, len,
294  from_, to_);
295  session_->Send(data, len, NULL);
296  stats_[1].open++;
297  return true;
298  }
299 }
300 
302  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
303  if (!session_) return false;
304  XmppStanza::XmppStreamMessage featurestream;
307  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
308  int len = XmppProto::EncodeStream(featurestream, to_, from_, xmlns_, data,
309  sizeof(data));
310  if (len <= 0) {
312  return false;
313  } else {
314  XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
315  "Send Stream Feature Request", len, from_, to_);
316  session_->Send(data, len, NULL);
317  //stats_[1].open++;
318  return true;
319  }
320 }
321 
323  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
324  if (!session_) return false;
328  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
329  int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
330  sizeof(data));
331  if (len <= 0) {
333  return false;
334  } else {
335  XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
336  "Send Start Tls", len, from_, to_);
337  session_->Send(data, len, NULL);
338  //stats_[1].open++;
339  return true;
340  }
341 }
342 
344  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
345  if (!session_) return false;
349  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
350  int len = XmppProto::EncodeStream(stream, to_, from_, xmlns_, data,
351  sizeof(data));
352  if (len <= 0) {
354  return false;
355  } else {
356  XMPP_UTDEBUG(XmppControlMessage, ToUVEKey(), XMPP_PEER_DIR_OUT,
357  "Send Proceed Tls", len, from_, to_);
358  session_->Send(data, len, NULL);
359  //stats_[1].open++;
360  return true;
361  }
362 }
363 
365  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
366  if (!session_) return;
367  string str("</stream:stream>");
368  uint8_t data[64];
369  memcpy(data, str.data(), str.size());
370  XMPP_UTDEBUG(XmppClose, ToUVEKey(), XMPP_PEER_DIR_OUT, str.size(), from_,
371  to_);
372  session_->Send(data, str.size(), NULL);
373  stats_[1].close++;
374 }
375 
377  const boost::system::error_code& error) {
378  if (!state_machine())
379  return;
380 
381  if (error) {
383 
384  if (error.category() == boost::asio::error::get_ssl_category()) {
385  string err = error.message();
386  err = string(" (")
387  +boost::lexical_cast<string>(ERR_GET_LIB(error.value()))+","
388  +boost::lexical_cast<string>(ERR_GET_FUNC(error.value()))+","
389  +boost::lexical_cast<string>(ERR_GET_REASON(error.value()))+") ";
390 
391  char buf[128];
392  ::ERR_error_string_n(error.value(), buf, sizeof(buf));
393  err += buf;
394  XMPP_ALERT(XmppSslHandShakeFailure, ToUVEKey(), XMPP_PEER_DIR_IN,
395  "failure", err);
396  }
397 
399 
400  } else {
401  XMPP_DEBUG(XmppSslHandShakeMessage, session->ToUVEKey(),
402  XMPP_PEER_DIR_IN, "success", "");
404  }
405 }
406 
407 void XmppConnection::LogMsg(std::string msg) {
408  log4cplus::Logger logger = log4cplus::Logger::getRoot();
409  LOG4CPLUS_DEBUG(logger, msg << ToString() << " " <<
410  local_endpoint_.address() << ":" << local_endpoint_.port() << "::" <<
411  endpoint_.address() << ":" << endpoint_.port());
412 }
413 
415  static bool init_ = false;
416  static bool log_ = false;
417 
418  if (!init_) {
419  char *str = getenv("XMPP_ASSERT_ON_HOLD_TIMEOUT");
420  if (str && strtoul(str, NULL, 0) != 0) log_ = true;
421  init_ = true;
422  }
423 
424  if (log_) LogMsg("SEND KEEPALIVE: ");
425 }
426 
428  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
429  if (!session_) return;
431  uint8_t data[XMPP_CONTROL_MESSAGE_MAX_SIZE];
432  int len = XmppProto::EncodeStream(msg, data, sizeof(data));
433  assert(len > 0);
434  session_->Send(data, len, NULL);
435  stats_[1].keepalive++;
437 }
438 
440  if (state_machine_->get_state() != xmsm::ESTABLISHED)
441  return false;
442 
443  // TODO: check timestamp of last received packet.
444  SendKeepAlive();
445 
446  //
447  // Start the timer again, by returning true
448  //
449  return true;
450 }
451 
453  string error_message) {
454  XMPP_WARNING(XmppKeepaliveTimeError, ToUVEKey(), XMPP_PEER_DIR_NA,
455  error_name, error_message);
456 }
457 
459  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
460  if (!session_)
461  return;
462 
463  int holdtime_msecs = state_machine_->hold_time_msecs();
464  if (holdtime_msecs <= 0)
465  return;
466 
467  keepalive_timer_->Start(holdtime_msecs / 3,
468  boost::bind(&XmppConnection::KeepAliveTimerExpired, this),
469  boost::bind(&XmppConnection::KeepaliveTimerErrorHanlder, this, _1, _2));
470 }
471 
473  tbb::spin_mutex::scoped_lock lock(spin_mutex_);
475 }
476 
477 void XmppConnection::UpdateKeepAliveTimer(uint8_t time_out) {
478  state_machine_->set_hold_time(time_out);
481 }
482 
484  return state_machine_.get();
485 }
486 
488  return state_machine_.get();
489 }
490 
492  return mux_.get();
493 }
494 
496  switch (type) {
498  stats_[0].open++;
499  break;
501  stats_[0].keepalive++;
502  break;
504  stats_[0].update++;
505  break;
507  stats_[0].update++;
508  break;
509  }
510 }
511 
514 }
515 
518 }
519 
522 }
523 
526 }
527 
530 }
531 
534 }
535 
538 }
539 
541  return error_stats_.open_fail;
542 }
543 
546 }
547 
550 }
551 
553  return state_machine_->get_connect_attempts();
554 }
555 
557  return state_machine_->get_keepalive_count();
558 }
559 
560 void XmppConnection::ReceiveMsg(XmppSession *session, const string &msg) {
561  XmppStanza::XmppMessage *minfo = XmppDecode(msg);
562 
563  if (minfo) {
564  session->IncStats((unsigned int)minfo->type, msg.size());
566  if (!(mux_ &&
567  (mux_->RxMessageTrace(session->
568  remote_endpoint().address().to_string(),
569  session->remote_endpoint().port(),
570  msg.size(), msg, minfo)))) {
571  XMPP_MESSAGE_TRACE(XmppRxStream,
572  session->
573  remote_endpoint().address().to_string(),
574  session->
575  remote_endpoint().port(), msg.size(), msg);
576  }
577  }
578  IncProtoStats((unsigned int)minfo->type);
579  state_machine_->OnMessage(session, minfo);
580  } else if ((minfo = last_msg_.get()) != NULL) {
581  session->IncStats((unsigned int)minfo->type, msg.size());
582  IncProtoStats((unsigned int)minfo->type);
583  } else {
584  session->IncStats(XmppStanza::INVALID, msg.size());
585  XMPP_MESSAGE_TRACE(XmppRxStreamInvalid,
586  session->remote_endpoint().address().to_string(),
587  session->remote_endpoint().port(), msg.size(), msg);
588  }
589  return;
590 }
591 
593  unique_ptr<XmppStanza::XmppMessage> minfo(XmppProto::Decode(this, msg));
594  if (minfo.get() == NULL) {
595  XMPP_INFO(XmppSessionDelete, ToUVEKey(), XMPP_PEER_DIR_IN, "Server",
596  FromString(), ToString());
597  Clear();
598  return NULL;
599  }
600 
601  if (minfo->type == XmppStanza::IQ_STANZA) {
602  const XmppStanza::XmppMessageIq *iq =
603  static_cast<const XmppStanza::XmppMessageIq *>(minfo.get());
604 
605 
606  if (iq->action.compare("publish") == 0) {
607  last_msg_.reset(minfo.release());
608  return NULL;
609  }
610 
611  if (iq->action.compare("collection") == 0) {
612  if (last_msg_.get() != NULL) {
613  XmppStanza::XmppMessageIq *last_iq =
614  static_cast<XmppStanza::XmppMessageIq *>(last_msg_.get());
615 
616  if (last_iq->node.compare(iq->as_node) == 0) {
617  XmlBase *impl = last_iq->dom.get();
618  impl->ReadNode("publish");
619  impl->ModifyAttribute("node", iq->node);
620  last_iq->node = impl->ReadAttrib("node");
621  last_iq->is_as_node = iq->is_as_node;
622  //Save the complete ass/dissociate node
623  last_iq->as_node = iq->as_node;
624  } else {
625  XMPP_WARNING(XmppIqMessageInvalid, ToUVEKey(),
627  goto error;
628  }
629  } else {
630  XMPP_ERROR(XmppIqCollectionError, ToUVEKey(), XMPP_PEER_DIR_IN);
631  goto error;
632  }
633  // iq message merged with collection info
634  return last_msg_.release();
635  }
636  }
637  return minfo.release();
638 
639 error:
640  last_msg_.reset();
641  return NULL;
642 }
643 
645  const XmppStanza::XmppChatMessage *msg) {
646  mux_->ProcessXmppMessage(msg);
647  return 0;
648 }
649 
651  mux_->ProcessXmppMessage(msg);
652  return 0;
653 }
654 
656 public:
658  : LifetimeActor(server->lifetime_manager()),
659  server_(server), parent_(parent) {
660  }
661 
662  virtual bool MayDelete() const {
663  return (!parent_->on_work_queue() && parent_->MayDelete());
664  }
665 
666  virtual void Shutdown() {
667  CHECK_CONCURRENCY("bgp::Config");
668 
669  // If the connection is still on the WorkQueue, simply add it to the
670  // ConnectionSet. It won't be on the ConnectionMap.
671  if (parent_->on_work_queue()) {
673  }
674 
675  // If the connection was rejected as duplicate, it will already be in
676  // the ConnectionSet. Non-duplicate connections need to be moved from
677 // the ConnectionMap into the ConnectionSet. We add it to the
678  // ConnectionSet and then remove it from ConnectionMap to ensure that
679  // the XmppServer connection count doesn't temporarily become 0. This
680  // is friendly to tests that wait for the XmppServer connection count
681  // to become 0.
682  //
683  // Breaking association with the XmppConnectionEndpoint here allows
684  // a new XmppServerConnection with the same Endpoint to come up. We
685  // may end up leaking memory if current XmppServerConnection doesn't
686  // get cleaned up completely, but we at least prevent the other end
687  // from getting stuck forever.
688  else if (!parent_->duplicate()) {
692  }
693 
694  if (parent_->state_machine()) {
696  }
697 
698  XmppSession *session = NULL;
699  if (parent_->state_machine()) {
700  session = parent_->state_machine()->session();
702  }
703  if (session) {
704  server_->DeleteSession(session);
705  }
706  }
707 
708  virtual void Destroy() {
709  delete parent_;
710  }
711 
712 private:
715 };
716 
718  const XmppChannelConfig *config)
719  : XmppConnection(server, config),
720  duplicate_(false),
721  on_work_queue_(false),
722  conn_endpoint_(NULL),
723  deleter_(new DeleteActor(server, this)),
724  server_delete_ref_(this, server->deleter()) {
725  assert(!config->ClientOnly());
726  XMPP_INFO(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_IN,
727  "Server", FromString(), ToString());
728 }
729 
731  CHECK_CONCURRENCY("bgp::Config");
732 
733  XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA, "Server",
734  FromString(), ToString());
736 }
737 
739  XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
740  "Managed server connection delete", FromString(), ToString());
741  deleter_->Delete();
742 }
743 
745  if (!deleter()->IsDeleted())
746  return;
747  deleter()->RetryDelete();
748 }
749 
751  return server()->lifetime_manager();
752 }
753 
755  return static_cast<XmppServer *>(server_);
756 }
757 
759  return deleter_.get();
760 }
761 
763  return deleter_.get();
764 }
765 
766 void XmppServerConnection::set_close_reason(const string &close_reason) {
767  if (conn_endpoint_)
768  conn_endpoint_->set_close_reason(close_reason);
769 
770  if (!logUVE())
771  return;
772 
773  XmppPeerInfoData peer_info;
774  peer_info.set_name(ToUVEKey());
775  peer_info.set_close_reason(close_reason);
776  XMPPPeerInfoSend(peer_info);
777 }
778 
780  return conn_endpoint_ ? conn_endpoint_->flap_count() : 0;
781 }
782 
785  if (!conn_endpoint)
786  conn_endpoint = server()->FindConnectionEndpoint(ToString());
787  if (!conn_endpoint)
788  return;
789  conn_endpoint->increment_flap_count();
790 
791  if (!logUVE())
792  return;
793 
794  XmppPeerInfoData peer_info;
795  peer_info.set_name(ToUVEKey());
796  PeerFlapInfo flap_info;
797  flap_info.set_flap_count(conn_endpoint->flap_count());
798  flap_info.set_flap_time(conn_endpoint->last_flap());
799  peer_info.set_flap_info(flap_info);
800  XMPPPeerInfoSend(peer_info);
801 }
802 
803 const std::string XmppServerConnection::last_flap_at() const {
804  return conn_endpoint_ ? conn_endpoint_->last_flap_at() : "";
805 }
806 
808  ShowXmppConnection *show_connection) const {
809  show_connection->set_name(ToString());
810  show_connection->set_deleted(IsDeleted());
811  show_connection->set_remote_endpoint(endpoint_string());
812  show_connection->set_local_endpoint(local_endpoint_string());
813  show_connection->set_state(StateName());
814  show_connection->set_last_event(LastEvent());
815  show_connection->set_last_state(LastStateName());
816  show_connection->set_last_state_at(LastStateChangeAt());
817  show_connection->set_receivers(channel_mux()->GetReceiverList());
818  show_connection->set_server_auth_type(GetXmppAuthenticationType());
819  show_connection->set_dscp_value(dscp_value());
820 }
821 
823 public:
825  : LifetimeActor(client->lifetime_manager()),
826  client_(client), parent_(parent) {
827  }
828 
829  virtual bool MayDelete() const {
830  return parent_->MayDelete();
831  }
832 
833  virtual void Shutdown() {
834  if (parent_->session()) {
837  }
838 
839  XmppSession *session = NULL;
840  if (parent_->state_machine()) {
841  session = parent_->state_machine()->session();
843  }
844  if (session) {
845  client_->DeleteSession(session);
846  }
847  }
848 
849  virtual void Destroy() {
850  delete parent_;
851  }
852 
853 private:
856 };
857 
859  const XmppChannelConfig *config)
860  : XmppConnection(server, config),
861  flap_count_(0),
862  deleter_(new DeleteActor(server, this)),
863  server_delete_ref_(this, server->deleter()) {
864  assert(config->ClientOnly());
865  XMPP_UTDEBUG(XmppConnectionCreate, ToUVEKey(), XMPP_PEER_DIR_NA, "Client",
866  FromString(), ToString());
867 }
868 
870  CHECK_CONCURRENCY("bgp::Config");
871 
872  XMPP_INFO(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
873  "Client", FromString(), ToString());
874  server()->RemoveConnection(this);
875 }
876 
878  XMPP_UTDEBUG(XmppConnectionDelete, ToUVEKey(), XMPP_PEER_DIR_NA,
879  "Managed Client Delete", FromString(), ToString());
880  deleter_->Delete();
881 }
882 
884  if (!deleter()->IsDeleted())
885  return;
886  deleter()->RetryDelete();
887 }
888 
890  return server()->lifetime_manager();
891 }
892 
894  return static_cast<XmppClient *>(server_);
895 }
896 
898  return deleter_.get();
899 }
900 
902  return deleter_.get();
903 }
904 
905 void XmppClientConnection::set_close_reason(const string &close_reason) {
906  close_reason_ = close_reason;
907  if (!logUVE())
908  return;
909 
910  XmppPeerInfoData peer_info;
911  peer_info.set_name(ToUVEKey());
912  peer_info.set_close_reason(close_reason_);
913  XMPPPeerInfoSend(peer_info);
914 }
915 
917  return flap_count_;
918 }
919 
921  flap_count_++;
923 
924  if (!logUVE())
925  return;
926 
927  XmppPeerInfoData peer_info;
928  peer_info.set_name(ToUVEKey());
929  PeerFlapInfo flap_info;
930  flap_info.set_flap_count(flap_count_);
931  flap_info.set_flap_time(last_flap_);
932  peer_info.set_flap_info(flap_info);
933  XMPPPeerInfoSend(peer_info);
934 }
935 
936 const std::string XmppClientConnection::last_flap_at() const {
938 }
939 
941  : client_(client), flap_count_(0), last_flap_(0), connection_(NULL) {
942 }
943 
944 void XmppConnectionEndpoint::set_close_reason(const string &close_reason) {
945  close_reason_ = close_reason;
946 }
947 
949  return flap_count_;
950 }
951 
953  flap_count_++;
955 }
956 
958  return last_flap_;
959 }
960 
961 const std::string XmppConnectionEndpoint::last_flap_at() const {
963 }
964 
966  return connection_;
967 }
968 
970  return connection_;
971 }
972 
974  assert(!connection_);
976 }
977 
979  assert(connection_);
980  connection_ = NULL;
981 }
982 
983 // Swap relevant contents between two XmppConnection objects.
985  assert(!IsClient());
986  assert(!other->IsClient());
987  // Update the ConnectionMap in the server as the endpoints are the keys.
988  XmppServer *server = dynamic_cast<XmppServerConnection *>(this)->server();
989  server->SwapXmppConnectionMapEntries(this, other);
990  // Swap all other connection related information.
991  swap(local_endpoint_, other->local_endpoint_);
992  swap(stats_, other->stats_);
993  swap(error_stats_, other->error_stats_);
994  swap(last_msg_, other->last_msg_);
995  swap(to_, other->to_);
996  swap(from_, other->from_);
997  swap(xmlns_, other->xmlns_);
998  swap(dscp_value_, other->dscp_value_);
999  swap(disable_read_, other->disable_read_);
1000 }
const std::string & FromString() const
void SetTo(const std::string &)
virtual void WriteReady()
boost::scoped_ptr< DeleteActor > deleter_
#define XMPP_MESSAGE_TRACE(obj,...)
Definition: xmpp_log.h:78
LifetimeManager * lifetime_manager()
Definition: xmpp_client.cc:131
XmppSession * CreateSession()
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_session.h:62
bool MayDelete() const
void ClearConnection()
Definition: xmpp_session.cc:63
std::string StateName() const
#define XMPP_UTDEBUG(obj,...)
Definition: xmpp_log.h:67
LifetimeManager * lifetime_manager()
Definition: xmpp_server.cc:360
XmppMessageType type
Definition: xmpp_proto.h:57
#define XMPP_INFO(obj,...)
Definition: xmpp_log.h:51
std::string to_
size_t get_stream_feature_fail()
virtual void DeleteSession(TcpSession *session)
Definition: tcp_server.cc:197
void SetConfig(const XmppChannelConfig *)
tbb::atomic< uint32_t > update
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
virtual void set_close_reason(const std::string &reason)
virtual bool AcceptSession(XmppSession *session)
virtual LifetimeActor * deleter()=0
const std::string last_flap_at() const
virtual const std::string last_flap_at() const
virtual void InsertDeletedConnection(XmppServerConnection *connection)
Definition: xmpp_server.cc:652
uint32_t flap_count() const
XmppStreamTlsType strmtlstype
Definition: xmpp_proto.h:99
void KeepaliveTimerErrorHanlder(std::string error_name, std::string error_message)
void inc_stream_feature_fail()
tbb::atomic< uint32_t > open
tbb::atomic< uint32_t > close
#define XMPP_PEER_DIR_NA
Definition: xmpp_log.h:16
bool KeepAliveTimerExpired()
const XmppChannelConfig * config_
virtual boost::asio::ip::tcp::endpoint endpoint() const
XmppServerConnection(XmppServer *server, const XmppChannelConfig *config)
std::string endpoint_string() const
void UpdateKeepAliveTimer(uint8_t time_out)
XmppConnectionEndpoint * conn_endpoint()
#define XMPP_PEER_DIR_IN
Definition: xmpp_log.h:15
boost::asio::ip::address IpAddress
Definition: address.h:13
size_t get_sm_keepalive_count()
bool on_work_queue() const
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:188
Timer * keepalive_timer_
virtual void ManagedDelete()
void inc_handshake_failure()
void set_close_reason(const std::string &close_reason)
virtual const char * ReadNode(const std::string &name)=0
boost::intrusive_ptr< SslSession > SslSessionPtr
Definition: ssl_session.h:11
std::string from_
#define XMPP_PEER_DIR_OUT
Definition: xmpp_log.h:14
tbb::atomic< uint32_t > keepalive
void StartKeepAliveTimer()
tbb::atomic< uint32_t > open_fail
boost::scoped_ptr< DeleteActor > deleter_
XmppConnection(TcpServer *server, const XmppChannelConfig *config)
virtual LifetimeActor * deleter()
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:428
std::unique_ptr< XmppStanza::XmppMessage > last_msg_
virtual ~XmppConnection()
std::string LastStateName() const
XmppStreamMsgType strmtype
Definition: xmpp_proto.h:98
void SendClose(XmppSession *session)
bool IsClient() const
XmppConnectionEndpoint(const std::string &client)
void RetryDelete()
Definition: lifetime.cc:71
virtual void RetryDelete()
virtual int ModifyAttribute(const std::string &key, const std::string &value)=0
void SetAdminDown(bool toggle)
xmsm::XmOpenConfirmState GetStateMcOpenConfirmState() const
static void XMPPPeerInfoSend(XmppPeerInfoData &peer_info)
virtual void increment_flap_count()
virtual bool SendProceedTls(XmppSession *session)
XmppConnection * connection()
bool Send(const uint8_t *data, size_t size, const std::string *msg_str=NULL)
std::unique_ptr< XmlBase > dom
Definition: xmpp_proto.h:62
virtual void ManagedDelete()
virtual LifetimeActor * deleter()
bool IsDeleted() const
uint8_t type
Definition: load_balance.h:109
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
xmsm::XmOpenConfirmState OpenConfirmStateType() const
bool duplicate() const
virtual LifetimeManager * lifetime_manager()
bool IsDeleted() const
Definition: lifetime.h:131
std::string LastEvent() const
static TaskScheduler * GetInstance()
Definition: task.cc:547
const std::string & remote_addr_string() const
Definition: tcp_session.h:139
void RemoveConnection(XmppClientConnection *connection)
Definition: xmpp_client.cc:261
boost::asio::ip::tcp::endpoint local_endpoint_
std::string local_endpoint_string() const
virtual bool SendOpenConfirm(XmppSession *session)
#define CHECK_CONCURRENCY(...)
static boost::posix_time::ptime UTCUsecToPTime(uint64_t tusec)
Definition: time_util.h:38
void IncStats(unsigned int message_type, uint64_t bytes)
int SetDscpValue(uint8_t value)
virtual void RemoveDeletedConnection(XmppServerConnection *connection)
Definition: xmpp_server.cc:665
virtual void RetryDelete()
virtual const std::string last_flap_at() const
virtual bool SendStreamFeatureRequest(XmppSession *session)
virtual const char * ReadAttrib(const std::string &str)=0
xmsm::XmState GetStateMcState() const
tbb::atomic< uint32_t > session_close
xmsm::XmState StateType() const
int ProcessXmppChatMessage(const XmppStanza::XmppChatMessage *)
void ReleaseConnectionEndpoint(XmppServerConnection *connection)
Definition: xmpp_server.cc:721
virtual uint32_t flap_count() const
virtual void increment_flap_count()
XmppChannelMux * ChannelMux()
static const char * kAuthTypeNil
std::string GetXmppAuthenticationType() const
uint64_t last_flap() const
bool ClientOnly() const
Definition: xmpp_config.h:42
virtual void ReceiveMsg(XmppSession *session, const std::string &)
size_t get_sm_connect_attempts()
XmppClientConnection(XmppClient *server, const XmppChannelConfig *config)
void set_connection(XmppConnection *connection)
bool Cancel()
Definition: timer.cc:150
virtual void RemoveConnection(XmppServerConnection *connection)
Definition: xmpp_server.cc:518
const std::string & ToString() const
static XmppStanza::XmppMessage * Decode(const XmppConnection *connection, const std::string &ts)
Definition: xmpp_proto.cc:213
TcpServer * server()
int GetTaskInstance() const
boost::scoped_ptr< XmppChannelMux > mux_
XmppSession * session_
const XmppChannelMux * channel_mux() const
void set_session(XmppSession *session)
void ProcessSslHandShakeResponse(SslSessionPtr session, const boost::system::error_code &error)
#define XMPP_CONTROL_MESSAGE_MAX_SIZE
TcpServer * server_
void OnEvent(SslSession *session, xmsm::SslHandShakeResponse)
size_t get_session_close()
tbb::atomic< uint32_t > stream_feature_fail
void SetConnection(XmppConnection *connection)
Definition: xmpp_session.cc:52
int ProcessXmppIqMessage(const XmppStanza::XmppMessage *)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
ErrorStats error_stats_
virtual uint32_t flap_count() const
std::string LastStateChangeAt() const
void IncProtoStats(unsigned int type)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
tbb::atomic< uint32_t > handshake_fail
int SetDscpSocketOption(uint8_t value)
Definition: tcp_session.cc:569
#define XMPP_DEBUG(obj,...)
Definition: xmpp_log.h:59
static const char * kAuthTypeTls
virtual void set_close_reason(const std::string &reason)
ProtoStats stats_[2]
Endpoint remote_endpoint() const
Definition: tcp_session.h:135
tbb::spin_mutex spin_mutex_
virtual void ManagedDelete()=0
bool logUVE() const
size_t get_open_fail()
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
Definition: xmpp_client.cc:208
static int EncodeStream(const XmppStreamMessage &str, std::string &to, std::string &from, const std::string &xmlns, uint8_t *data, size_t size)
size_t get_connect_error()
boost::scoped_ptr< XmppStateMachine > state_machine_
XmppSession * session()
DeleteActor(XmppClient *client, XmppClientConnection *parent)
void FillShowInfo(ShowXmppConnection *show_connection) const
virtual bool SendOpen(XmppSession *session)
DeleteActor(XmppServer *server, XmppServerConnection *parent)
XmppConnectionEndpoint * FindConnectionEndpoint(const std::string &endpoint_name)
Definition: xmpp_server.cc:675
XmppStateMachine * state_machine()
boost::asio::ip::tcp::endpoint endpoint_
void LogMsg(std::string msg)
void SwapContents(XmppConnection *other)
const XmppSession * session() const
virtual boost::asio::ip::tcp::endpoint local_endpoint() const
#define XMPP_ALERT(obj,...)
Definition: xmpp_log.h:27
tbb::atomic< uint32_t > connect_error
std::string uve_key_str_
const std::string & ToUVEKey() const
virtual bool SendStartTls(XmppSession *session)
uint8_t dscp_value() const
XmppConnection * connection_
#define XMPP_ERROR(obj,...)
Definition: xmpp_log.h:18
XmppConnectionEndpoint * conn_endpoint_
int HardwareThreadCount()
Definition: task.h:276
size_t get_handshake_failure()
XmppStanza::XmppMessage * XmppDecode(const std::string &msg)
virtual LifetimeManager * lifetime_manager()
#define XMPP_WARNING(obj,...)
Definition: xmpp_log.h:35
std::string xmlns_
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222