OpenSDN source code
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
xmpp_channel_mux.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 
7 #include <boost/foreach.hpp>
8 
10 #include "xmpp/xmpp_init.h"
11 #include "xmpp/xmpp_connection.h"
12 
13 using namespace std;
14 using namespace xmsm;
15 
17  : connection_(connection), rx_message_trace_cb_(NULL),
18  tx_message_trace_cb_(NULL) {
19  last_received_ = 0;
20  last_sent_ = 0;
21 }
22 
24  assert(map_.empty());
25 }
26 
28  connection_->Clear();
29 }
30 
31 bool XmppChannelMux::LastReceived(time_t duration) const {
32  return (UTCTimestamp() - last_received_) <= duration;
33 }
34 
35 bool XmppChannelMux::LastSent(time_t duration) const {
36  return (UTCTimestamp() - last_sent_) <= duration;
37 }
38 
41  return (st == xmsm::ESTABLISHED) ? xmps::READY :
43 }
44 
45 void XmppChannelMux::WriteReady(const boost::system::error_code &ec) {
46  tbb::mutex::scoped_lock lock(mutex_);
47 
48  WriteReadyCbMap::iterator iter = map_.begin();
49  WriteReadyCbMap::iterator next = iter;
50  for (; iter != map_.end(); iter = next) {
51  ++next;
52  SendReadyCb cb = iter->second;
53  cb(ec);
54  map_.erase(iter);
55  }
56 }
57 
58 bool XmppChannelMux::Send(const uint8_t *msg, size_t msgsize,
59  const string *msg_str, xmps::PeerId id,
60  SendReadyCb cb) {
61  if (!connection_) return false;
62 
63  tbb::mutex::scoped_lock lock(mutex_);
65  bool res = connection_->Send(msg, msgsize, msg_str);
66  if (res == false) {
67  RegisterWriteReady(id, cb);
68  }
69  return res;
70 }
71 
73  return connection_->GetTaskInstance();
74 }
75 
77  referers_.insert(id);
78 }
79 
81  referers_.erase(id);
82 }
83 
85  rxmap_.insert(make_pair(id, cb));
86 }
87 
89  ReceiveCbMap::iterator it = rxmap_.find(id);
90  if (it != rxmap_.end()) {
91  rxmap_.erase(it);
92  }
93 
94  if (ReceiverCount())
95  return;
96 
97  XmppServerConnection *server_connection =
98  dynamic_cast<XmppServerConnection *>(connection_);
99 
100  // If GracefulRestart helper mode close process is complete, restart the
101  // state machine to form new session with the client.
102  if (!connection_->IsDeleted() && server_connection &&
103  server_connection->server()->IsGRHelperModeEnabled()) {
104  server_connection->state_machine()->Initialize();
105  return;
106  }
107 
109 }
110 
112  return referers_.size();
113 }
114 
116  return rxmap_.size();
117 }
118 
119 vector<string> XmppChannelMux::GetReceiverList() const {
120  vector<string> receivers;
121  for (const auto& value : rxmap_) {
122  receivers.push_back(xmps::PeerIdToName(value.first));
123  }
124  return receivers;
125 }
126 
127 //
128 // To be called after acquiring mutex
129 //
131  map_.insert(make_pair(id, cb));
132 }
133 
134 //
135 // To be called after acquiring mutex
136 //
138  map_.erase(id);
139 }
140 
141 const std::string &XmppChannelMux::ToString() const {
142  return connection_->ToString();
143 }
144 
145 const std::string &XmppChannelMux::FromString() const {
146  return connection_->FromString();
147 }
148 
149 std::string XmppChannelMux::StateName() const {
150  return connection_->StateName();
151 }
152 
153 std::string XmppChannelMux::AuthType() const {
155 }
156 
157 std::string XmppChannelMux::PeerAddress() const {
158  return connection_->endpoint_string();
159 }
160 
161 inline bool MatchCallback(string to, xmps::PeerId peer) {
162  if ((to.find(XmppInit::kBgpPeer) != string::npos) &&
163  (peer == xmps::BGP)) {
164  return true;
165  }
166  if ((to.find(XmppInit::kConfigPeer) != string::npos) &&
167  (peer == xmps::CONFIG)) {
168  return true;
169  }
170  if ((to.find(XmppInit::kDnsPeer) != string::npos) &&
171  (peer == xmps::DNS)) {
172  return true;
173  }
174  if ((to.find(XmppInit::kOtherPeer) != string::npos) &&
175  (peer == xmps::OTHER)) {
176  return true;
177  }
178  return false;
179 }
180 
183  ReceiveCbMap::iterator iter = rxmap_.begin();
184  for (; iter != rxmap_.end(); ++iter) {
185  if (MatchCallback(msg->to, iter->first)) {
186  ReceiveCb cb = iter->second;
187  cb(msg, GetPeerState());
188  }
189  }
190 }
191 
193  CHECK_CONCURRENCY("xmpp::StateMachine");
195  if (state == xmsm::ESTABLISHED) {
196  st = xmps::READY;
197  } else if (state == xmsm::ACTIVE) {
198  st = xmps::TIMEDOUT;
199  }
200 
201  if (connection_->IsClient()) {
202  XmppClient *client = static_cast<XmppClient *>(connection_->server());
203  client->NotifyConnectionEvent(this, st);
204  } else {
205  // Event to create the peer on server
206  XmppServer *server = static_cast<XmppServer *>(connection_->server());
207  server->NotifyConnectionEvent(this, st);
208  }
209 }
210 
211 std::string XmppChannelMux::LastStateName() const {
212  return connection_->LastStateName();
213 }
215  return connection_->LastStateChangeAt();
216 }
217 std::string XmppChannelMux::LastEvent() const {
218  return connection_->LastEvent();
219 }
220 uint32_t XmppChannelMux::rx_open() const {
221  return connection_->rx_open();
222 }
223 uint32_t XmppChannelMux::rx_close() const {
224  return connection_->rx_close();
225 }
226 uint32_t XmppChannelMux::rx_update() const {
227  return connection_->rx_update();
228 }
230  return connection_->rx_keepalive();
231 }
232 uint32_t XmppChannelMux::tx_open() const {
233  return connection_->tx_open();
234 }
235 uint32_t XmppChannelMux::tx_close() const {
236  return connection_->tx_close();
237 }
238 uint32_t XmppChannelMux::tx_update() const {
239  return connection_->tx_update();
240 }
242  return connection_->tx_keepalive();
243 }
244 uint32_t XmppChannelMux::FlapCount() const {
245  return connection_->flap_count();
246 }
247 std::string XmppChannelMux::LastFlap() const {
248  return connection_->last_flap_at();
249 }
250 
253 }
256 }
257 
258 bool XmppChannelMux::RxMessageTrace(const std::string &to_address,
259  int port,
260  int msg_size,
261  const std::string &msg,
262  const XmppStanza::XmppMessage *xmpp_msg) {
263  if (rx_message_trace_cb_) {
264  return rx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg);
265  }
266  return false;
267 }
268 
269 bool XmppChannelMux::TxMessageTrace(const std::string &to_address,
270  int port,
271  int msg_size,
272  const std::string &msg,
273  const XmppStanza::XmppMessage *xmpp_msg) {
274  if (tx_message_trace_cb_) {
275  return tx_message_trace_cb_(to_address, port, msg_size, msg, xmpp_msg);
276  }
277  return false;
278 }
const std::string & FromString() const
virtual uint32_t rx_keepalive() const
virtual std::string LastStateName() const
virtual void UnRegisterWriteReady(xmps::PeerId id)
virtual void Close()
virtual const std::string last_flap_at() const =0
std::string StateName() const
virtual uint32_t FlapCount() const
virtual xmps::PeerState GetPeerState() const
virtual void RegisterReceive(xmps::PeerId, ReceiveCb)
virtual uint32_t tx_open() const
virtual void ProcessXmppMessage(const XmppStanza::XmppMessage *msg)
virtual std::string LastEvent() const
virtual std::string StateName() const
virtual void RegisterRxMessageTraceCallback(RxMessageTraceCb cb)
boost::function< void(const boost::system::error_code &)> SendReadyCb
Definition: xmpp_channel.h:35
virtual bool Send(const uint8_t *msg, size_t msg_size, xmps::PeerId id, SendReadyCb cb)
std::vector< std::string > GetReceiverList() const
uint32_t tx_update() const
std::string endpoint_string() const
virtual void UnRegisterReferer(xmps::PeerId)
virtual int GetTaskInstance() const
RefererSet referers_
bool MatchCallback(string to, xmps::PeerId peer)
tbb::atomic< time_t > last_received_
ReceiveCbMap rxmap_
RxMessageTraceCb rx_message_trace_cb_
virtual std::string PeerAddress() const
virtual uint32_t rx_update() const
std::string LastStateName() const
virtual std::string AuthType() const
bool IsClient() const
boost::function< bool(const std::string &, int, int, const std::string &, const XmppStanza::XmppMessage *msg) > RxMessageTraceCb
Definition: xmpp_channel.h:44
virtual uint32_t tx_close() const
virtual uint32_t rx_open() const
virtual void UnRegisterReceive(xmps::PeerId)
XmppConnection * connection_
bool Send(const uint8_t *data, size_t size, const std::string *msg_str=NULL)
bool IsDeleted() const
void HandleStateEvent(xmsm::XmState state)
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
Definition: xmpp_server.cc:463
virtual ~XmppChannelMux()
virtual uint32_t tx_keepalive() const
std::string LastEvent() const
virtual uint32_t rx_close() const
virtual void RetryDelete()=0
#define CHECK_CONCURRENCY(...)
uint32_t rx_open() const
static const char * kDnsPeer
Definition: xmpp_init.h:25
tbb::mutex mutex_
virtual void RegisterReferer(xmps::PeerId)
xmsm::XmState GetStateMcState() const
uint32_t tx_open() const
std::string GetXmppAuthenticationType() const
string PeerIdToName(PeerId id)
Definition: xmpp_channel.cc:11
const std::string & ToString() const
virtual void RegisterTxMessageTraceCallback(TxMessageTraceCb cb)
TcpServer * server()
void WriteReady(const boost::system::error_code &ec)
virtual uint32_t flap_count() const =0
int GetTaskInstance() const
void RegisterWriteReady(xmps::PeerId, SendReadyCb)
virtual bool LastSent(time_t duration) const
static time_t UTCTimestamp()
Definition: time_util.h:23
virtual std::string LastStateChangeAt() const
boost::function< bool(const std::string &, int, int, const std::string &, const XmppStanza::XmppMessage *msg) > TxMessageTraceCb
Definition: xmpp_channel.h:50
virtual bool LastReceived(time_t duration) const
std::string LastStateChangeAt() const
XmppChannelMux(XmppConnection *)
boost::function< void(const XmppStanza::XmppMessage *, xmps::PeerState state) > ReceiveCb
Definition: xmpp_channel.h:38
size_t RefererCount() const
virtual std::string LastFlap() const
static const char * kConfigPeer
Definition: xmpp_init.h:23
void NotifyConnectionEvent(XmppChannelMux *, xmps::PeerState)
Definition: xmpp_client.cc:208
size_t ReceiverCount() const
virtual uint32_t tx_update() const
uint32_t rx_close() const
XmppStateMachine * state_machine()
virtual const std::string & ToString() const
bool IsGRHelperModeEnabled() const
Definition: xmpp_server.cc:264
bool TxMessageTrace(const std::string &to_address, int port, int msg_size, const std::string &msg, const XmppStanza::XmppMessage *xmpp_msg)
static const char * kBgpPeer
Definition: xmpp_init.h:24
TxMessageTraceCb tx_message_trace_cb_
bool RxMessageTrace(const std::string &to_address, int port, int msg_size, const std::string &msg, const XmppStanza::XmppMessage *xmpp_msg)
uint32_t rx_update() const
static const char * kOtherPeer
Definition: xmpp_init.h:27
tbb::atomic< time_t > last_sent_
uint32_t tx_keepalive() const
uint32_t tx_close() const
uint32_t rx_keepalive() const
virtual const std::string & FromString() const
WriteReadyCbMap map_