OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tcp_session.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef SRC_IO_TCP_SESSION_H_
6 #define SRC_IO_TCP_SESSION_H_
7 
8 #include <tbb/mutex.h>
9 #include <tbb/task.h>
10 
11 #include <deque>
12 #include <list>
13 #include <string>
14 
15 #include <boost/asio/buffer.hpp>
16 #include <boost/asio/io_service.hpp>
17 #include <boost/asio/ip/tcp.hpp>
18 #include <boost/asio/strand.hpp>
19 #include <boost/intrusive_ptr.hpp>
20 #include <boost/function.hpp>
21 #include <boost/scoped_ptr.hpp>
22 
23 #ifndef _LIBCPP_VERSION
24 #include <tbb/compat/condition_variable>
25 #endif
26 #include "base/util.h"
27 #include "base/task.h"
28 #include "io/tcp_server.h"
29 
30 #define SSL_SHORT_READ_ERROR 335544539
31 
32 class EventManager;
33 class TcpServer;
34 class TcpSession;
35 class TcpMessageWriter;
36 
37 // TcpSession
38 //
39 // Concurrency: the session is created by the event manager thread, which
40 // also invokes the AsyncHandlers. ReleaseBuffer and Send will typically be
41 // invoked by a different thread.
//
// Lifetime: instances are reference counted through refcount_ and the
// intrusive_ptr_add_ref()/intrusive_ptr_release() friend functions (see the
// TcpSessionPtr typedef); the destructor is protected.
42 class TcpSession {
43 public:
  // Default buffer sizes. kDefaultWriteBufferSize is the default value of the
  // constructor's buffer_send_size argument.
44  static const int kDefaultBufferSize = 16 * 1024;
45  static const int kDefaultWriteBufferSize = 32 * 1024;
46 
47  enum Event {
53  };
54 
55  enum Direction {
58  };
59 
60  typedef boost::asio::ip::tcp::socket Socket;
61  typedef boost::asio::ip::tcp::socket::native_handle_type NativeSocketType;
62  typedef boost::asio::ip::tcp::endpoint Endpoint;
  // Callback signature used for session event notification.
63  typedef boost::function<void(TcpSession *, Event)> EventObserver;
64  typedef boost::asio::const_buffer Buffer;
65 
66  // TcpSession constructor takes ownership of socket.
68  bool async_read_ready = true,
69  size_t buffer_send_size = TcpSession::kDefaultWriteBufferSize);
70  // Performs a non-blocking send operation.
71  virtual bool Send(const uint8_t *data, size_t size, size_t *sent);
72 
73  // Called by TcpServer to trigger async read.
74  virtual bool Connected(Endpoint remote);
75 
76  // Called by TcpServer to trigger async read.
77  virtual void Accepted();
78 
79  void ConnectFailed();
80 
81  void Close();
82 
  // Returns the session name (name_).
83  virtual std::string ToString() const { return name_; }
84 
85  // Getters and setters
86  virtual Socket *socket() const { return socket_.get(); }
87  NativeSocketType sock_descriptor() { return socket_->native_handle(); }
88  TcpServer *server() { return server_.get(); }
89  int32_t local_port() const;
90  int32_t remote_port() const;
91 
92  // Concurrency: changing the observer guarantees mutual exclusion with
93  // the observer invocation. e.g. if the caller sets the observer to NULL
94  // it is guaranteed that the observer will not get invoked after this
95  // method returns.
97 
98  // Buffers must be freed in arrival order.
99  virtual void ReleaseBuffer(Buffer buffer);
100 
101  // This function returns the instance to run SessionTask.
102  // Returning Task::kTaskInstanceAny would allow multiple session tasks to
103  // run in parallel.
104  // Derived class may override implementation if it expects all the
105  // Tasks of the session to run in a specific instance.
106  // Note: Two tasks with the same task ID and task instance can't run
107  // in parallel.
108  // E.g. BgpSession is created per BgpPeer and to ensure that
109  // there is one SessionTask per peer, PeerIndex is returned
110  // from this function
111  virtual int GetSessionInstance() const;
112 
  // Helpers exposing the raw byte pointer and length of a Buffer.
113  static const uint8_t *BufferData(const Buffer &buffer) {
114  return boost::asio::buffer_cast<const uint8_t *>(buffer);
115  }
116  static size_t BufferSize(const Buffer &buffer) {
117  return boost::asio::buffer_size(buffer);
118  }
119 
  // Reads established_ while holding mutex_.
120  bool IsEstablished() const {
121  tbb::mutex::scoped_lock lock(mutex_);
122  return established_;
123  }
124 
  // Reads closed_ while holding mutex_.
125  bool IsClosed() const {
126  tbb::mutex::scoped_lock lock(mutex_);
127  return closed_;
128  }
129 
131  if (direction_ == PASSIVE) return true;
132  return false;
133  }
134 
136  return remote_;
137  }
138 
139  const std::string &remote_addr_string() const {
140  return remote_addr_str_;
141  }
142 
143  Endpoint local_endpoint() const;
144 
145  const boost::system::error_code &close_reason() const {
146  return close_reason_;
147  }
148 
149  virtual boost::system::error_code SetSocketOptions();
150  static bool IsSocketErrorHard(const boost::system::error_code &ec);
151  void set_read_on_connect(bool read) { read_on_connect_ = read; }
152  void SessionEstablished(Endpoint remote, Direction direction);
153 
154  virtual void AsyncReadStart();
155  virtual void SetDeferReader(bool defer_reader);
156  // Is the reader deferred? If the reader is deferred, SetDeferReader needs
157  // to be called to undefer/restart reading.
158  virtual bool IsReaderDeferred() const {
159  return defer_reader_;
160  }
161 
162  const io::SocketStats &GetSocketStats() const { return stats_; }
163  void GetRxSocketStats(SocketIOStats *socket_stats) const;
164  void GetTxSocketStats(SocketIOStats *socket_stats) const;
165 
  // Reference overloads; they forward to the pointer versions above.
166  void GetRxSocketStats(SocketIOStats &socket_stats) const {
167  GetRxSocketStats(&socket_stats);
168  }
169 
170  void GetTxSocketStats(SocketIOStats &socket_stats) const {
171  GetTxSocketStats(&socket_stats);
172  }
173 
174  int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password);
175  int ClearMd5SocketOption(uint32_t peer_ip);
176  int SetDscpSocketOption(uint8_t value);
177  uint8_t GetDscpValue() const;
178  const std::string &ToUVEKey() const { return uve_key_str_; }
179  boost::system::error_code SetTcpNoDelay();
180  boost::system::error_code SetTcpSendBufSize(uint32_t size);
181  boost::system::error_code SetTcpRecvBufSize(uint32_t size);
182 
183 protected:
  // Reference-counted handle to a session; the static handlers below take
  // the session through this handle.
184  typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
185  static void AsyncReadHandler(TcpSessionPtr session);
186  static void AsyncWriteHandler(TcpSessionPtr session,
187  const boost::system::error_code &error,
188  std::size_t bytes_transferred);
189 
191  virtual Task* CreateReaderTask(boost::asio::mutable_buffer, size_t);
192 
193  virtual ~TcpSession();
194 
195  // Read handler. Called from a TBB task.
196  virtual void OnRead(Buffer buffer) = 0;
197  // Callback after socket is ready for write.
198  virtual void WriteReady(const boost::system::error_code &error);
199 
200  void AsyncWriteInternal(TcpSessionPtr session);
201 
202  virtual void AsyncReadSome();
203  virtual size_t GetReadBufferSize() const;
204  virtual size_t ReadSome(boost::asio::mutable_buffer buffer,
205  boost::system::error_code *error);
206  virtual void AsyncWrite(const uint8_t *data, std::size_t size);
207 
  // Returns the static reader_task_id_ shared by all sessions.
208  virtual int reader_task_id() const {
209  return reader_task_id_;
210  }
211 
  // Returns established_ without taking mutex_ (contrast with
  // IsEstablished(), which locks).
212  bool established() const { return established_; }
213 
215  boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time,
216  int keepalive_intvl, int keepalive_probes,
217  int tcp_user_timeout_val = 0);
218 
219  void CloseInternal(const boost::system::error_code &ec,
220  bool call_observer, bool notify_server = true);
221 
223 
224  // Protects session state and buffer queue.
225  mutable tbb::mutex mutex_;
227 
228 protected:
229  typedef boost::asio::strand<boost::asio::io_context::executor_type> Strand;
230  boost::scoped_ptr<Strand> io_strand_;
231 
232 private:
233  class Reader;
234  friend class TcpServer;
235  friend class TcpMessageWriter;
  // Friends so that the reference-counting hooks can reach refcount_ and
  // the protected destructor.
236  friend void intrusive_ptr_add_ref(TcpSession *session);
237  friend void intrusive_ptr_release(TcpSession *session);
238  typedef std::list<boost::asio::mutable_buffer> BufferQueue;
239 
240  static void WriteReadyInternal(TcpSessionPtr session,
241  const boost::system::error_code &error,
242  uint64_t block_start_time);
243  void ReleaseBufferLocked(Buffer buffer);
244  void SetEstablished(Endpoint remote, Direction dir);
245 
  // Returns closed_ without acquiring mutex_.
  // NOTE(review): the "Locked" suffix suggests the caller must already hold
  // mutex_ -- confirm against callers.
246  bool IsClosedLocked() const {
247  return closed_;
248  }
249 
250  bool IsEstablishedLocked() const {
252  }
253 
254  void SetName();
255 
256  boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size);
257  void DeleteBuffer(boost::asio::mutable_buffer buffer);
258 
259  static int reader_task_id_;
260 
262  boost::scoped_ptr<Socket> socket_;
264 
265  /**************** protected by mutex_ ****************/
266  bool established_; // In TCP ESTABLISHED state.
267  bool closed_; // Close has been called.
268  Endpoint remote_; // Remote end-point
269  std::string remote_addr_str_; // Remote end-point address string
270  Direction direction_; // direction (active, passive)
272  boost::system::error_code close_reason_;
273  /**************** end protected by mutex_ ****************/
274 
275  // Protects observer manipulation and invocation. When this lock is
276  // held the session mutex should not be held and vice-versa.
277  tbb::mutex obs_mutex_;
279 
280  boost::scoped_ptr<TcpMessageWriter> writer_;
281 
  // Reference count updated by intrusive_ptr_add_ref()/intrusive_ptr_release().
282  tbb::atomic<int> refcount_;
283  std::string name_;
284  tbb::atomic<bool> defer_reader_;
285  std::string uve_key_str_;
286  tbb::atomic<bool> write_blocked_;
287  tbb::atomic<bool> tcp_close_in_progress_;
288 
290 };
291 
// Reference-acquire hook for boost::intrusive_ptr<TcpSession>: atomically
// increments the session's refcount_.
292 inline void intrusive_ptr_add_ref(TcpSession *session) {
293  session->refcount_.fetch_and_increment();
294 }
295 
// Reference-release hook for boost::intrusive_ptr<TcpSession>: atomically
// decrements the session's refcount_ and deletes the session when the last
// reference is dropped.
296 inline void intrusive_ptr_release(TcpSession *session) {
  // fetch_and_decrement() yields the value held before the decrement, so a
  // result of 1 means this call released the final reference.
297  int prev = session->refcount_.fetch_and_decrement();
298  if (prev == 1) {
299  delete session;
300  }
301 }
302 
303 // TcpMessageReader
304 //
305 // Provides base implementation of OnRead() for TcpSession assuming
306 // fixed message header length
// Derived classes must implement the pure virtual hooks MsgLength(),
// GetHeaderLenSize() and GetMaxMessageSize().
307 //
309 public:
310  typedef boost::asio::const_buffer Buffer;
  // Callback taking a byte pointer and a length and returning bool.
311  typedef boost::function<bool(const uint8_t *, size_t)> ReceiveCallback;
312 
313  TcpMessageReader(TcpSession *session, ReceiveCallback callback);
314  virtual ~TcpMessageReader();
315  virtual void OnRead(Buffer buffer);
316 
317 protected:
  // Hooks supplied by the derived class.
  // NOTE(review): the top-level const on the by-value int return types below
  // has no effect on callers.
318  virtual int MsgLength(Buffer buffer, int offset) = 0;
319  virtual const int GetHeaderLenSize() = 0;
320  virtual const int GetMaxMessageSize() = 0;
321 
322 private:
323  typedef std::deque<Buffer> BufferQueue;
324 
325  // Copy the queue into one contiguous buffer.
326  uint8_t *BufferConcat(uint8_t *data, Buffer buffer, int msglength);
327 
328  int QueueByteLength() const;
329 
330  Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const;
331 
332  int AllocBufferSize(int length);
333 
337  int offset_;
338  int remain_;
339 
341 };
342 
343 #endif // SRC_IO_TCP_SESSION_H_
static const int kDefaultWriteBufferSize
Definition: tcp_session.h:45
virtual const int GetHeaderLenSize()=0
void Close()
Definition: tcp_session.cc:354
int intrusive_ptr_add_ref(const AsPath *cpath)
Definition: bgp_aspath.h:147
void set_read_on_connect(bool read)
Definition: tcp_session.h:151
virtual int MsgLength(Buffer buffer, int offset)=0
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_session.h:62
EventObserver observer_
Definition: tcp_session.h:278
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
boost::intrusive_ptr< TcpSession > TcpSessionPtr
Definition: tcp_session.h:184
Endpoint local_endpoint() const
Definition: tcp_session.cc:205
void DeleteBuffer(boost::asio::mutable_buffer buffer)
Definition: tcp_session.cc:126
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
Definition: tcp_session.cc:305
void AsyncReadStartInternal(TcpSessionPtr session)
Definition: tcp_session.cc:160
TcpSession * session_
Definition: tcp_session.h:334
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:311
static const int kDefaultBufferSize
Definition: tcp_session.h:44
virtual bool IsReaderDeferred() const
Definition: tcp_session.h:158
BufferQueue queue_
Definition: tcp_session.h:336
void AsyncWriteInternal(TcpSessionPtr session)
Definition: tcp_session.cc:417
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
Definition: tcp_session.cc:378
bool read_on_connect_
Definition: tcp_session.h:263
static int reader_task_id_
Definition: tcp_session.h:259
virtual std::string ToString() const
Definition: tcp_session.h:83
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
Definition: tcp_session.cc:468
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:116
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
Definition: tcp_session.cc:460
int32_t remote_port() const
Definition: tcp_session.cc:547
virtual void WriteReady(const boost::system::error_code &error)
Definition: tcp_session.cc:375
static void WriteReadyInternal(TcpSessionPtr session, const boost::system::error_code &error, uint64_t block_start_time)
TcpServerPtr server_
Definition: tcp_session.h:261
bool closed_
Definition: tcp_session.h:267
virtual void Accepted()
Definition: tcp_session.cc:254
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:802
std::string name_
Definition: tcp_session.h:283
bool IsServerSession()
Definition: tcp_session.h:130
const boost::system::error_code & close_reason() const
Definition: tcp_session.h:145
virtual void SetDeferReader(bool defer_reader)
Definition: tcp_session.cc:182
boost::scoped_ptr< Strand > io_strand_
Definition: tcp_session.h:230
static void AsyncReadHandler(TcpSessionPtr session)
Definition: tcp_session.cc:483
void TriggerAsyncReadHandler()
Definition: tcp_session.cc:346
int ClearMd5SocketOption(uint32_t peer_ip)
Definition: tcp_session.cc:565
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:428
BufferQueue buffer_queue_
Definition: tcp_session.h:271
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
Definition: tcp_session.cc:86
tbb::atomic< int > refcount_
Definition: tcp_session.h:282
void SetName()
Definition: tcp_session.cc:223
int32_t local_port() const
Definition: tcp_session.cc:535
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:60
io::SocketStats stats_
Definition: tcp_session.h:226
void set_observer(EventObserver observer)
Definition: tcp_session.cc:218
virtual bool Connected(Endpoint remote)
Definition: tcp_session.cc:269
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:912
boost::asio::strand< boost::asio::io_context::executor_type > Strand
Definition: tcp_session.h:229
DISALLOW_COPY_AND_ASSIGN(TcpMessageReader)
friend void intrusive_ptr_add_ref(TcpSession *session)
Definition: tcp_session.h:292
std::list< boost::asio::mutable_buffer > BufferQueue
Definition: tcp_session.h:238
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:671
boost::system::error_code close_reason_
Definition: tcp_session.h:272
virtual void OnRead(Buffer buffer)=0
tbb::mutex mutex_
Definition: tcp_session.h:225
boost::asio::const_buffer Buffer
Definition: tcp_session.h:310
const std::string & remote_addr_string() const
Definition: tcp_session.h:139
ReceiveCallback callback_
Definition: tcp_session.h:335
static bool IsSocketErrorHard(const boost::system::error_code &ec)
Definition: tcp_session.cc:742
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
Definition: tcp_session.cc:597
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
Definition: tcp_session.cc:199
virtual int reader_task_id() const
Definition: tcp_session.h:208
void SessionEstablished(Endpoint remote, Direction direction)
Definition: tcp_session.cc:245
bool IsClosedLocked() const
Definition: tcp_session.h:246
virtual void AsyncReadSome()
Definition: tcp_session.cc:192
void ConnectFailed()
Definition: tcp_session.cc:297
void GetRxSocketStats(SocketIOStats &socket_stats) const
Definition: tcp_session.h:166
virtual const int GetMaxMessageSize()=0
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:916
boost::scoped_ptr< TcpMessageWriter > writer_
Definition: tcp_session.h:280
boost::intrusive_ptr< TcpServer > TcpServerPtr
Definition: tcp_server.h:191
DISALLOW_COPY_AND_ASSIGN(TcpSession)
Direction direction_
Definition: tcp_session.h:270
tbb::atomic< bool > defer_reader_
Definition: tcp_session.h:284
int AllocBufferSize(int length)
Definition: tcp_session.cc:586
tbb::mutex obs_mutex_
Definition: tcp_session.h:277
void SetEstablished(Endpoint remote, Direction dir)
int QueueByteLength() const
Definition: tcp_session.cc:622
bool IsClosed() const
Definition: tcp_session.h:125
friend void intrusive_ptr_release(TcpSession *session)
Definition: tcp_session.h:296
bool established() const
Definition: tcp_session.h:212
bool established_
Definition: tcp_session.h:266
const io::SocketStats & GetSocketStats() const
Definition: tcp_session.h:162
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
Definition: tcp_session.cc:577
std::string uve_key_str_
Definition: tcp_session.h:285
const std::string & ToUVEKey() const
Definition: tcp_session.h:178
std::deque< Buffer > BufferQueue
Definition: tcp_session.h:323
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
Definition: tcp_session.cc:789
int SetDscpSocketOption(uint8_t value)
Definition: tcp_session.cc:569
bool IsEstablishedLocked() const
Definition: tcp_session.h:250
TcpServer * server()
Definition: tcp_session.h:88
Endpoint remote_endpoint() const
Definition: tcp_session.h:135
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:143
virtual int GetSessionInstance() const
Definition: tcp_session.cc:530
NativeSocketType sock_descriptor()
Definition: tcp_session.h:87
void intrusive_ptr_release(const AsPath *cpath)
Definition: bgp_aspath.h:155
virtual size_t GetReadBufferSize() const
Definition: tcp_session.cc:475
void GetTxSocketStats(SocketIOStats &socket_stats) const
Definition: tcp_session.h:170
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
Definition: tcp_session.cc:635
virtual ~TcpMessageReader()
Definition: tcp_session.cc:582
boost::system::error_code SetTcpSendBufSize(uint32_t size)
Definition: tcp_session.cc:776
Endpoint remote_
Definition: tcp_session.h:268
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
Definition: tcp_session.cc:119
virtual void AsyncReadStart()
Definition: tcp_session.cc:174
virtual Socket * socket() const
Definition: tcp_session.h:86
tbb::atomic< bool > write_blocked_
Definition: tcp_session.h:286
virtual ~TcpSession()
Definition: tcp_session.cc:110
void ReleaseBufferLocked(Buffer buffer)
Definition: tcp_session.cc:148
boost::asio::ip::tcp::socket::native_handle_type NativeSocketType
Definition: tcp_session.h:61
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:113
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
Definition: tcp_session.cc:559
boost::function< void(TcpSession *, Event)> EventObserver
Definition: tcp_session.h:63
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
tbb::atomic< bool > tcp_close_in_progress_
Definition: tcp_session.h:287
EventObserver observer()
Definition: tcp_session.h:214
uint8_t GetDscpValue() const
Definition: tcp_session.cc:573
boost::scoped_ptr< Socket > socket_
Definition: tcp_session.h:262
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:868
boost::system::error_code SetTcpNoDelay()
Definition: tcp_session.cc:765
std::string remote_addr_str_
Definition: tcp_session.h:269
bool IsEstablished() const
Definition: tcp_session.h:120