OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
xmpp_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "base/regex.h"
6 #include "xmpp/xmpp_session.h"
7 
8 #include "xmpp/xmpp_connection.h"
9 #include "xmpp/xmpp_log.h"
10 #include "xmpp/xmpp_proto.h"
11 #include "xmpp/xmpp_server.h"
13 
14 #include "sandesh/sandesh_trace.h"
15 #include "sandesh/xmpp_trace_sandesh_types.h"
16 
17 using namespace std;
18 using contrail::regex;
21 
22 using boost::asio::mutable_buffer;
23 
32 
34  bool async_ready)
35  : SslSession(manager, socket, async_ready),
36  manager_(manager),
37  connection_(NULL),
38  tag_known_(0),
39  task_instance_(-1),
40  stats_(XmppStanza::RESERVED_STANZA, XmppSession::StatsPair(0, 0)),
41  keepalive_probes_(kSessionKeepaliveProbes) {
42  buf_.reserve(kMaxMessageSize);
43  offset_ = buf_.begin();
44  stream_open_matched_ = false;
45 }
46 
48  set_observer(NULL);
49  connection_ = NULL;
50 }
51 
53  assert(connection);
54  connection_ = connection;
56 }
57 
58 //
59 // Dissociate the connection from this XmppSession.
60 // Do not invalidate the task_instance since it can be used to spawn an
61 // io::ReaderTask while this method is being executed.
62 //
64  connection_ = NULL;
65 }
66 
67 //
68 // Concurrency: called in the context of bgp::Config task.
69 //
70 // Process write ready callback.
71 //
73  if (!connection_)
74  return;
76 }
77 
78 //
79 // Concurrency: called in the context of io thread.
80 //
81 // Handle write ready callback.
82 //
83 // Enqueue session to the XmppConnectionManager. The session is added to a
84 // WorkQueue gets processed in the context of bgp::Config task. Doing this
85 // ensures that we don't access the XmppConnection while the XmppConnection
86 // is trying to clear our back pointer to it.
87 //
88 // We can ignore any errors since the StateMachine will get informed of the
89 // TcpSession close independently and react to it.
90 //
91 void XmppSession::WriteReady(const boost::system::error_code &error) {
92  if (error)
93  return;
94  manager_->EnqueueSession(this);
95 }
96 
98  assert (type < (unsigned int)XmppStanza::RESERVED_STANZA);
99  return stats_[type];
100 }
101 
102 void XmppSession::IncStats(unsigned int type, uint64_t bytes) {
103  assert (type < (unsigned int)XmppStanza::RESERVED_STANZA);
104  stats_[type].first++;
105  stats_[type].second += bytes;
106 }
107 
//
// Derive the TCP keepalive parameters of this session from hold_time (secs).
//
// If the environment variable TCP_KEEPALIVE_SECONDS is set, hold_time is
// replaced by three times its parsed value; when that yields 0, a default
// constructed error_code is returned right away and nothing else is changed.
//
// Otherwise hold_time is raised to at least 9 and halved when it is above 18.
// keepalive_idle_time_ is set to a third of the resulting value and
// tcp_user_timeout_ to the resulting value in milliseconds.
//
// NOTE(review): the statement that consumes the expression on listing line
// 122 (presumably the keepalive_interval_ assignment) and the final return
// are not visible in this listing - confirm against the full source.
//
108 boost::system::error_code XmppSession::EnableTcpKeepalive(int hold_time) {
109  char *keepalive_time_str = getenv("TCP_KEEPALIVE_SECONDS");
110  if (keepalive_time_str) {
111  hold_time = strtoul(keepalive_time_str, NULL, 0) * 3;
112  if (!hold_time)
113  return boost::system::error_code();
114  }
115 
116  if (hold_time <= 9) {
117  hold_time = 9; // min hold-time in secs.
118  }
119  hold_time = ((hold_time > 18)? hold_time/2 : hold_time);
120  keepalive_idle_time_ = hold_time/3;
122  ((hold_time - keepalive_idle_time_)/keepalive_probes_);
123  tcp_user_timeout_ = (hold_time * 1000); // msec
124 
129 }
130 
132  std::string token("</");
133  token += ++tag;
134  token += "[\\s\\t\\r\\n]*>";
135 
136  regex exp(token.c_str());
137  return exp;
138 }
139 
140 void XmppSession::SetBuf(const std::string &str) {
141  if (buf_.empty()) {
142  ReplaceBuf(str);
143  } else {
144  int pos = offset_ - buf_.begin();
145  buf_ += str;
146  offset_ = buf_.begin() + pos;
147  }
148 }
149 
150 void XmppSession::ReplaceBuf(const std::string &str) {
151  buf_ = str;
152  buf_.reserve(kMaxMessageSize+8);
153  offset_ = buf_.begin();
154 }
155 
156 bool XmppSession::LeftOver() const {
157  if (buf_.empty())
158  return false;
159  return (buf_.end() != offset_);
160 }
161 
162 // Match a pattern in the buffer. Partially matched string is
163 // kept in buf_ for use in conjucntion with next buffer read.
164 int XmppSession::MatchRegex(const regex &patt) {
165 
166  std::string::const_iterator end = buf_.end();
167 
168  if (regex_search(offset_, end, res_, patt,
169  boost::match_default | boost::match_partial) == 0) {
170  return -1;
171  }
172  if(res_[0].matched == false) {
173  // partial match
174  offset_ = res_[0].first;
175  return 1;
176  } else {
177  begin_tag_ = string(res_[0].first, res_[0].second);
178  offset_ = res_[0].second;
179  return 0;
180  }
181 }
182 
//
// Try to locate the next complete XML unit in buf_, branching on the state
// of the connection's state machine.
//
// buffer - data handed in by the caller; only read when NewBuf is true.
// result - out parameter; the visible code only ever stores 0 in it.
// NewBuf - when true, the contents of buffer are copied into a string and
//          added to buf_ through SetBuf() before matching starts.
//
// Returns false when
//   - no tag is pending (tag_known_ is 0) and buf_ does not start with a
//     non-whitespace character; offset_ is then moved past the leading
//     sXMPP_VALIDWS characters, or
//   - a full match toggles tag_known_ back to 0 ("Found well formed xml").
// Returns true when there is no connection, when nothing matched, or when
// only a partial match was found and more data has to be read.
//
// NOTE(review): the statements that assign m (presumably MatchRegex() calls
// with a pattern per state) are not visible in this listing - confirm
// against the full source.
//
183 bool XmppSession::Match(Buffer buffer, int *result, bool NewBuf) {
184  const XmppConnection *connection = this->Connection();
185 
186  if (connection == NULL) {
187  return true;
188  }
189 
190  xmsm::XmState state = connection->GetStateMcState();
191  xmsm::XmOpenConfirmState oc_state =
192  connection->GetStateMcOpenConfirmState();
193 
194  if (NewBuf) {
195  const uint8_t *cp = BufferData(buffer);
196  // TODO Avoid this copy
197  std::string str(cp, cp + BufferSize(buffer));
198  XmppSession::SetBuf(str);
199  }
200 
201  int m = -1;
202  *result = 0;
203  do {
204  if (!tag_known_) {
205  // check for whitespaces
206  size_t pos = buf_.find_first_not_of(sXMPP_VALIDWS);
207  if (pos != 0) {
208  if (pos == string::npos) pos = buf_.size();
209  offset_ = buf_.begin() + pos;
210  return false;
211  }
212  }
213 
214  if (state == xmsm::ACTIVE || state == xmsm::IDLE) {
216  } else if (state == xmsm::CONNECT || state == xmsm::OPENSENT) {
217  // Note, these are client only states
218  if (!stream_open_matched_) {
220  if ((m == 0) && (tag_known_)) {
221  stream_open_matched_ = true;
222  }
223  } else {
226  }
227  } else if ((state == xmsm::OPENCONFIRM) && !(IsSslDisabled())) {
228  if (connection->IsClient()) {
229  if (oc_state == xmsm::OPENCONFIRM_FEATURE_NEGOTIATION) {
231  if ((m == 0) && (tag_known_)) {
232  // set the flag, as we do not want OnRead function to
233  // read any more data from basic socket.
235  }
236  } else if (oc_state == xmsm::OPENCONFIRM_FEATURE_SUCCESS) {
238  } else {
241  }
242  } else {
243  if (oc_state == xmsm::OPENCONFIRM_FEATURE_SUCCESS) {
245  } else {
247  if ((m == 0) && (tag_known_)) {
249  }
250  }
251  }
252  } else if (state == xmsm::OPENCONFIRM || state == xmsm::ESTABLISHED) {
254  }
255 
256  if (m == 0) { // full match
257  *result = 0;
258  tag_known_ ^= 1;
259  if (!tag_known_) {
260  // Found well formed xml
261  return false;
262  }
263  } else if (m == -1) { // no match
264  return true;
265  } else {
266  return true; // partial. read more
267  }
268  } while (true);
269 
270  return true;
271 }
272 
273 // Read the socket stream and send messages to the connection object.
274 // The buffer is copied to local string for regex match.
275 // TODO Code needs to change so that Match() is done on the buffer itself.
277  if (this->Connection() == NULL || !connection_) {
278  // Connection is deleted. Session is being deleted as well
279  // Drop the packet.
280  ReleaseBuffer(buffer);
281  return;
282  }
283 
284  if (connection_->disable_read()) {
285  ReleaseBuffer(buffer);
286 
287  // Reset the hold timer as we did receive some thing from the peer
289  return;
290  }
291 
292  int result = 0;
293  bool more = Match(buffer, &result, true);
294  do {
295  if (more == false) {
296  if (result < 0) {
297  // TODO generate error, close connection.
298  break;
299  }
300 
301  // We got good match. Process the message
302  std::string::const_iterator st = buf_.begin();
303  std::string xml = string(st, offset_);
304  // Ensure we have not reached the end
305  if (buf_.begin() == offset_) { // xml.size() == 0
306  buf_.clear();
307  break;
308  }
309 
310  connection_->ReceiveMsg(this, xml);
311 
312  } else {
313  // Read more data. Either we have partial match
314  // or no match but in this state we need to keep
315  // reading data.
316  break;
317  }
318 
319  if (LeftOver()) {
320  std::string::const_iterator st = buf_.end();
321  ReplaceBuf(string(offset_, st));
322  more = Match(buffer, &result, false);
323  } else {
324  // No more data in the Buffer
325  buf_.clear();
326  break;
327  }
328  } while (true);
329 
330  ReleaseBuffer(buffer);
331  return;
332 }
XmppConnectionManager * manager_
Definition: xmpp_session.h:62
virtual void WriteReady()
int keepalive_interval_
Definition: xmpp_session.h:73
static const contrail::regex starttls_patt_
Definition: xmpp_session.h:83
void ClearConnection()
Definition: xmpp_session.cc:63
virtual void WriteReady(const boost::system::error_code &error)
Definition: xmpp_session.cc:91
virtual ~XmppSession()
Definition: xmpp_session.cc:47
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
bool IsSslDisabled()
Definition: ssl_session.h:30
XmppSession(XmppConnectionManager *manager, SslSocket *sock, bool async_ready=true)
Definition: xmpp_session.cc:33
bool disable_read() const
virtual void StartHoldTimer()
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:116
#define rXMPP_MESSAGE
Definition: xmpp_str.h:57
std::string::const_iterator offset_
Definition: xmpp_session.h:67
std::string begin_tag_
Definition: xmpp_session.h:65
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:802
boost::system::error_code EnableTcpKeepalive(int tcp_hold_time)
int keepalive_idle_time_
Definition: xmpp_session.h:72
boost::asio::ssl::stream< boost::asio::ip::tcp::socket > SslSocket
Definition: ssl_session.h:18
#define sXMPP_VALIDWS
Definition: xmpp_str.h:48
static const contrail::regex stream_features_patt_
Definition: xmpp_session.h:82
#define rXMPP_STREAM_START
Definition: xmpp_str.h:58
bool IsClient() const
void EnqueueSession(XmppSession *session)
xmsm::XmOpenConfirmState GetStateMcOpenConfirmState() const
void set_observer(EventObserver observer)
Definition: tcp_session.cc:218
static bool regex_match(const std::string &input, const regex &regex)
Definition: regex.h:34
static const contrail::regex proceed_patt_
Definition: xmpp_session.h:84
static const contrail::regex end_patt_
Definition: xmpp_session.h:85
StatsPair Stats(unsigned int message_type) const
Definition: xmpp_session.cc:97
uint8_t type
Definition: load_balance.h:109
XmppConnection * Connection()
Definition: xmpp_session.h:26
bool LeftOver() const
void ReplaceBuf(const std::string &)
std::vector< StatsPair > stats_
Definition: xmpp_session.h:71
static const contrail::regex whitespace_
Definition: xmpp_session.h:81
static const contrail::regex stream_res_end_
Definition: xmpp_session.h:80
void IncStats(unsigned int message_type, uint64_t bytes)
#define rXMPP_STREAM_STARTTLS
Definition: xmpp_str.h:61
int task_instance_
Definition: xmpp_session.h:69
contrail::regex tag_to_pattern(const char *)
xmsm::XmState GetStateMcState() const
static bool regex_search(const std::string &input, const regex &regex)
Definition: regex.h:25
static const int kMaxMessageSize
Definition: xmpp_session.h:35
int tcp_user_timeout_
Definition: xmpp_session.h:75
std::pair< uint64_t, uint64_t > StatsPair
Definition: xmpp_session.h:31
#define rXMPP_STREAM_END
Definition: xmpp_str.h:59
virtual void ReceiveMsg(XmppSession *session, const std::string &)
XmppConnection * connection_
Definition: xmpp_session.h:63
int GetTaskInstance() const
static const contrail::regex stream_patt_
Definition: xmpp_session.h:79
int keepalive_probes_
Definition: xmpp_session.h:74
static const contrail::regex patt_
Definition: xmpp_session.h:78
void SetConnection(XmppConnection *connection)
Definition: xmpp_session.cc:52
void SetBuf(const std::string &)
boost::match_results< std::string::const_iterator > res_
Definition: xmpp_session.h:70
int MatchRegex(const contrail::regex &patt)
std::string buf_
Definition: xmpp_session.h:66
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:143
virtual void OnRead(Buffer buffer)
#define rXMPP_STREAM_FEATURES
Definition: xmpp_str.h:60
void SetSslHandShakeInProgress(bool state)
Definition: ssl_session.h:52
void ProcessWriteReady()
Definition: xmpp_session.cc:72
bool stream_open_matched_
Definition: xmpp_session.h:76
XmppStateMachine * state_machine()
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:113
bool Match(Buffer buffer, int *result, bool NewBuf)
#define sXMPP_WHITESPACE
Definition: xmpp_str.h:46
#define rXMPP_STREAM_STANZA_END
Definition: xmpp_str.h:63
#define rXMPP_STREAM_PROCEED
Definition: xmpp_str.h:62