OpenSDN source code
tcp_session.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef SRC_IO_TCP_SESSION_H_
6 #define SRC_IO_TCP_SESSION_H_
7 
8 #include <deque>
9 #include <list>
10 #include <string>
11 #include <mutex>
12 #include <atomic>
13 
14 #include <boost/asio/buffer.hpp>
15 #include <boost/asio/io_service.hpp>
16 #include <boost/asio/ip/tcp.hpp>
17 #include <boost/asio/strand.hpp>
18 #include <boost/intrusive_ptr.hpp>
19 #include <boost/function.hpp>
20 #include <boost/scoped_ptr.hpp>
21 
22 #include "base/util.h"
23 #include "base/task.h"
24 #include "io/tcp_server.h"
25 
// Numeric error value used to recognise an SSL "short read" condition.
// 335544539 == 0x140000DB.
// NOTE(review): this looks like OpenSSL 1.x ERR_PACK(ERR_LIB_SSL, 0,
// SSL_R_SHORT_READ) (library 20, reason 219) - confirm against the OpenSSL
// version this code is built with.
26 #define SSL_SHORT_READ_ERROR 335544539
27 
// Forward declarations for types that this header only uses through
// pointers, references, typedefs or friend declarations.
28 class EventManager;
29 class TcpServer;
30 class TcpSession;
31 class TcpMessageWriter;
32 
33 // TcpSession
34 //
35 // Concurrency: the session is created by the event manager thread, which
36 // also invokes the AsyncHandlers. ReleaseBuffer and Send will typically be
37 // invoked by a different thread.
//
// Lifetime: the session is reference counted through refcount_ and handled
// via boost::intrusive_ptr (TcpSessionPtr). The destructor is protected and
// the object is deleted by intrusive_ptr_release() when the last reference
// is dropped. OnRead() is pure virtual, so concrete sessions must derive
// from this class.
//
// NOTE(review): this listing skips some source lines (the embedded line
// numbers jump, e.g. 43 -> 48), so a few declarations below are only visible
// as fragments.
38 class TcpSession {
39 public:
    // NOTE(review): presumably the default size of a read buffer - confirm
    // against GetReadBufferSize().
40  static const int kDefaultBufferSize = 16 * 1024;
    // Default value of the constructor's buffer_send_size argument.
41  static const int kDefaultWriteBufferSize = 32 * 1024;
42 
    // Event codes handed to an EventObserver. Only the last enumerator is
    // visible in this listing.
43  enum Event {
48  CLOSE
49  };
50 
    // Direction of the session; the member comment on direction_ names the
    // two values as active and passive. Only PASSIVE is visible here.
51  enum Direction {
53  PASSIVE
54  };
55 
56  typedef boost::asio::ip::tcp::socket Socket;
57  typedef boost::asio::ip::tcp::socket::native_handle_type NativeSocketType;
58  typedef boost::asio::ip::tcp::endpoint Endpoint;
    // Callback signature for session events: receives the session and the
    // Event that occurred.
59  typedef boost::function<void(TcpSession *, Event)> EventObserver;
60  typedef boost::asio::const_buffer Buffer;
61 
62  // TcpSession constructor takes ownership of socket.
    // (The first line of the constructor declaration is not shown in this
    // listing; the two lines below are its trailing, defaulted parameters.)
64  bool async_read_ready = true,
65  size_t buffer_send_size = TcpSession::kDefaultWriteBufferSize);
66  // Performs a non-blocking send operation.
    // NOTE(review): sent is an out-parameter, presumably the number of bytes
    // accepted - confirm in tcp_session.cc.
67  virtual bool Send(const uint8_t *data, size_t size, size_t *sent);
68 
69  // Called by TcpServer to trigger async read.
70  virtual bool Connected(Endpoint remote);
71 
72  // Called by TcpServer to trigger async read.
73  virtual void Accepted();
74 
75  void ConnectFailed();
76 
77  void Close();
78 
    // Returns the session name (name_) by value.
79  virtual std::string ToString() const { return name_; }
80 
81  // Getters and setters
    // Raw, non-owning pointer to the socket held by socket_.
82  virtual Socket *socket() const { return socket_.get(); }
    // Native handle of the underlying socket; dereferences socket_.
83  NativeSocketType sock_descriptor() { return socket_->native_handle(); }
    // Raw, non-owning pointer to the server held by server_.
84  TcpServer *server() { return server_.get(); }
85  int32_t local_port() const;
86  int32_t remote_port() const;
87 
88  // Concurrency: changing the observer guarantees mutual exclusion with
89  // the observer invocation. e.g. if the caller sets the observer to NULL
90  // it is guaranteed that the observer will not get invoked after this
91  // method returns.
    // (This comment describes set_observer(); its declaration is not shown
    // in this listing.)
93 
94  // Buffers must be freed in arrival order.
95  virtual void ReleaseBuffer(Buffer buffer);
96 
97  // This function returns the instance to run SessionTask.
98  // Returning Task::kTaskInstanceAny would allow multiple session tasks to
99  // run in parallel.
100  // Derived class may override implementation if it expects all the
101  // Tasks of the session to run in specific instance
102  // Note: Two tasks of same task ID and task instance can't run
103  // in parallel
104  // E.g. BgpSession is created per BgpPeer and to ensure that
105  // there is one SessionTask per peer, PeerIndex is returned
106  // from this function
107  virtual int GetSessionInstance() const;
108 
    // Pointer to the first byte of a received buffer.
109  static const uint8_t *BufferData(const Buffer &buffer) {
110  return boost::asio::buffer_cast<const uint8_t *>(buffer);
111  }
    // Size of a received buffer in bytes.
112  static size_t BufferSize(const Buffer &buffer) {
113  return boost::asio::buffer_size(buffer);
114  }
115 
    // Returns established_ while holding mutex_.
116  bool IsEstablished() const {
117  std::scoped_lock lock(mutex_);
118  return established_;
119  }
120 
    // Returns closed_ while holding mutex_.
121  bool IsClosed() const {
122  std::scoped_lock lock(mutex_);
123  return closed_;
124  }
125 
    // Body of an accessor whose signature line is not shown in this listing:
    // yields true when direction_ is PASSIVE, false otherwise.
    // NOTE(review): the visible body reads direction_ without taking mutex_,
    // although direction_ is listed under "protected by mutex_" - confirm
    // this is intentional.
127  if (direction_ == PASSIVE) return true;
128  return false;
129  }
130 
    // Body of an accessor whose signature line is not shown in this listing:
    // returns remote_.
    // NOTE(review): the visible body does not take mutex_ - confirm.
132  return remote_;
133  }
134 
    // Returns a reference to remote_addr_str_; no lock is taken here.
135  const std::string &remote_addr_string() const {
136  return remote_addr_str_;
137  }
138 
139  Endpoint local_endpoint() const;
140 
    // Returns a reference to close_reason_; no lock is taken here.
141  const boost::system::error_code &close_reason() const {
142  return close_reason_;
143  }
144 
145  virtual boost::system::error_code SetSocketOptions();
146  static bool IsSocketErrorHard(const boost::system::error_code &ec);
    // Stores the flag in read_on_connect_.
147  void set_read_on_connect(bool read) { read_on_connect_ = read; }
148  void SessionEstablished(Endpoint remote, Direction direction);
149 
150  virtual void AsyncReadStart();
151  virtual void SetDeferReader(bool defer_reader);
152  // Is the reader deferred ? If reader is deferred, SetDeferReader needs
153  // to be called to undefer/restart reading.
    // Reads the atomic defer_reader_ flag.
154  virtual bool IsReaderDeferred() const {
155  return defer_reader_;
156  }
157 
    // Returns a reference to the session's stats_ member.
158  const io::SocketStats &GetSocketStats() const { return stats_; }
159  void GetRxSocketStats(SocketIOStats *socket_stats) const;
160  void GetTxSocketStats(SocketIOStats *socket_stats) const;
161 
    // Reference overload; forwards to the pointer overload.
162  void GetRxSocketStats(SocketIOStats &socket_stats) const {
163  GetRxSocketStats(&socket_stats);
164  }
165 
    // Reference overload; forwards to the pointer overload.
166  void GetTxSocketStats(SocketIOStats &socket_stats) const {
167  GetTxSocketStats(&socket_stats);
168  }
169 
170  int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password);
171  int ClearMd5SocketOption(uint32_t peer_ip);
172  int SetDscpSocketOption(uint8_t value);
173  uint8_t GetDscpValue() const;
    // Returns a reference to uve_key_str_.
174  const std::string &ToUVEKey() const { return uve_key_str_; }
175  boost::system::error_code SetTcpNoDelay();
176  boost::system::error_code SetTcpSendBufSize(uint32_t size);
177  boost::system::error_code SetTcpRecvBufSize(uint32_t size);
178 
179 protected:
    // Reference-counting smart pointer type for sessions; relies on the
    // intrusive_ptr_add_ref / intrusive_ptr_release friends declared below.
180  typedef boost::intrusive_ptr<TcpSession> TcpSessionPtr;
181  static void AsyncReadHandler(TcpSessionPtr session);
182  static void AsyncWriteHandler(TcpSessionPtr session,
183  const boost::system::error_code &error,
184  std::size_t bytes_transferred);
185 
187  virtual Task* CreateReaderTask(boost::asio::mutable_buffer, size_t);
188 
    // Protected: sessions are destroyed through intrusive_ptr_release(), not
    // by direct delete from outside the class hierarchy.
189  virtual ~TcpSession();
190 
191  // Read handler. Called from a TBB task.
192  virtual void OnRead(Buffer buffer) = 0;
193  // Callback after socket is ready for write.
194  virtual void WriteReady(const boost::system::error_code &error);
195 
196  void AsyncWriteInternal(TcpSessionPtr session);
197 
198  virtual void AsyncReadSome();
199  virtual size_t GetReadBufferSize() const;
200  virtual size_t ReadSome(boost::asio::mutable_buffer buffer,
201  boost::system::error_code *error);
202  virtual void AsyncWrite(const uint8_t *data, std::size_t size);
203 
    // Returns the static reader_task_id_, which is shared by all sessions.
204  virtual int reader_task_id() const {
205  return reader_task_id_;
206  }
207 
    // Returns established_ without taking mutex_ (contrast IsEstablished()).
    // NOTE(review): presumably for callers that already hold mutex_ or can
    // tolerate a stale value - confirm.
208  bool established() const { return established_; }
209 
211  boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time,
212  int keepalive_intvl, int keepalive_probes,
213  int tcp_user_timeout_val = 0);
214 
215  void CloseInternal(const boost::system::error_code &ec,
216  bool call_observer, bool notify_server = true);
217 
219 
220  // Protects session state and buffer queue.
    // Declared mutable so that const accessors such as IsEstablished() can
    // lock it.
221  mutable std::mutex mutex_;
223 
224 protected:
225  typedef boost::asio::strand<boost::asio::io_context::executor_type> Strand;
226  boost::scoped_ptr<Strand> io_strand_;
227 
228 private:
229  class Reader;
230  friend class TcpServer;
231  friend class TcpMessageWriter;
    // Friends so the free functions can touch the private refcount_.
232  friend void intrusive_ptr_add_ref(TcpSession *session);
233  friend void intrusive_ptr_release(TcpSession *session);
234  typedef std::list<boost::asio::mutable_buffer> BufferQueue;
235 
236  static void WriteReadyInternal(TcpSessionPtr session,
237  const boost::system::error_code &error,
238  uint64_t block_start_time);
239  void ReleaseBufferLocked(Buffer buffer);
240  void SetEstablished(Endpoint remote, Direction dir);
241 
    // Returns closed_ without locking.
    // NOTE(review): the "Locked" suffix suggests the caller must already
    // hold mutex_ - confirm.
242  bool IsClosedLocked() const {
243  return closed_;
244  }
245 
    // (The body line of this accessor is not shown in this listing.)
246  bool IsEstablishedLocked() const {
248  }
249 
250  void SetName();
251 
252  boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size);
253  void DeleteBuffer(boost::asio::mutable_buffer buffer);
254 
    // Task id returned by reader_task_id(); static, so one value for every
    // TcpSession instance.
255  static int reader_task_id_;
256 
    // Socket object; the constructor comment above states that the session
    // takes ownership of it.
258  boost::scoped_ptr<Socket> socket_;
260 
261  /**************** protected by mutex_ ****************/
262  bool established_; // In TCP ESTABLISHED state.
263  bool closed_; // Close has been called.
264  Endpoint remote_; // Remote end-point
265  std::string remote_addr_str_; // Remote end-point address string
266  Direction direction_; // direction (active, passive)
268  boost::system::error_code close_reason_;
269  /**************** end protected by mutex_ ****************/
270 
271  // Protects observer manipulation and invocation. When this lock is
272  // held the session mutex should not be held and vice-versa.
273  std::mutex obs_mutex_;
275 
276  boost::scoped_ptr<TcpMessageWriter> writer_;
277 
    // Intrusive reference count, modified by intrusive_ptr_add_ref() and
    // intrusive_ptr_release().
278  std::atomic<int> refcount_;
279  std::string name_;
    // Flag read by IsReaderDeferred().
280  std::atomic<bool> defer_reader_;
281  std::string uve_key_str_;
282  std::atomic<bool> write_blocked_;
283  std::atomic<bool> tcp_close_in_progress_;
284 
286 };
287 
// boost::intrusive_ptr hook: records one additional owner of the session by
// atomically incrementing its reference count.
288 inline void intrusive_ptr_add_ref(TcpSession *session) {
289  ++session->refcount_;
290 }
291 
// boost::intrusive_ptr hook: atomically drops one reference and deletes the
// session when the reference being dropped was the last one. fetch_sub()
// yields the count held before the decrement, so a value of 1 means this
// caller was the final owner.
292 inline void intrusive_ptr_release(TcpSession *session) {
293  const bool was_last_reference = (session->refcount_.fetch_sub(1) == 1);
294  if (was_last_reference) {
295  delete session;
296  }
297 }
298 
299 // TcpMessageReader
300 //
301 // Provides base implementation of OnRead() for TcpSession assuming
302 // fixed message header length
303 //
// Subclasses supply the framing details through the pure virtual
// MsgLength(), GetHeaderLenSize() and GetMaxMessageSize() hooks.
// NOTE(review): the class declaration line and some data members are not
// shown in this listing (the embedded line numbers skip 304 and 330-332).
305 public:
306  typedef boost::asio::const_buffer Buffer;
    // Callback type taking a byte pointer and a length and returning bool.
    // NOTE(review): presumably invoked once per complete message, with the
    // return value signalling whether to keep processing - confirm in
    // tcp_session.cc.
307  typedef boost::function<bool(const uint8_t *, size_t)> ReceiveCallback;
308 
309  TcpMessageReader(TcpSession *session, ReceiveCallback callback);
310  virtual ~TcpMessageReader();
    // Entry point for data read from the session; virtual so subclasses may
    // replace the base implementation.
311  virtual void OnRead(Buffer buffer);
312 
313 protected:
    // Pure virtual framing hooks that every concrete reader must implement.
    // NOTE(review): MsgLength presumably decodes the total message length
    // from the header found at offset within buffer - confirm.
314  virtual int MsgLength(Buffer buffer, int offset) = 0;
    // NOTE(review): the top-level const on these by-value int return types
    // has no effect for callers; it is left as-is because changing it would
    // alter the declared signatures that overriding classes match.
315  virtual const int GetHeaderLenSize() = 0;
316  virtual const int GetMaxMessageSize() = 0;
317 
318 private:
319  typedef std::deque<Buffer> BufferQueue;
320 
321  // Copy the queue into one contiguous buffer.
322  uint8_t *BufferConcat(uint8_t *data, Buffer buffer, int msglength);
323 
    // NOTE(review): presumably the total number of bytes held in the buffer
    // queue - confirm.
324  int QueueByteLength() const;
325 
326  Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const;
327 
328  int AllocBufferSize(int length);
329 
    // Reader bookkeeping.
    // NOTE(review): presumably the read position within the current buffer
    // and the bytes still outstanding - confirm.
333  int offset_;
334  int remain_;
335 
337 };
338 
339 #endif // SRC_IO_TCP_SESSION_H_
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
Definition: tcp_session.cc:578
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
Definition: tcp_session.cc:636
int QueueByteLength() const
Definition: tcp_session.cc:623
int AllocBufferSize(int length)
Definition: tcp_session.cc:587
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:672
ReceiveCallback callback_
Definition: tcp_session.h:331
virtual ~TcpMessageReader()
Definition: tcp_session.cc:583
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:307
virtual const int GetMaxMessageSize()=0
BufferQueue queue_
Definition: tcp_session.h:332
TcpSession * session_
Definition: tcp_session.h:330
std::deque< Buffer > BufferQueue
Definition: tcp_session.h:319
virtual int MsgLength(Buffer buffer, int offset)=0
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
Definition: tcp_session.cc:598
boost::asio::const_buffer Buffer
Definition: tcp_session.h:306
DISALLOW_COPY_AND_ASSIGN(TcpMessageReader)
virtual const int GetHeaderLenSize()=0
@ CONNECT_COMPLETE
Definition: tcp_session.h:46
@ CONNECT_FAILED
Definition: tcp_session.h:47
virtual size_t GetReadBufferSize() const
Definition: tcp_session.cc:476
void SetName()
Definition: tcp_session.cc:224
virtual int reader_task_id() const
Definition: tcp_session.h:204
TcpServer * server()
Definition: tcp_session.h:84
static void AsyncReadHandler(TcpSessionPtr session)
Definition: tcp_session.cc:484
bool IsServerSession()
Definition: tcp_session.h:126
const io::SocketStats & GetSocketStats() const
Definition: tcp_session.h:158
virtual int GetSessionInstance() const
Definition: tcp_session.cc:531
DISALLOW_COPY_AND_ASSIGN(TcpSession)
int ClearMd5SocketOption(uint32_t peer_ip)
Definition: tcp_session.cc:566
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
Definition: tcp_session.cc:200
static bool IsSocketErrorHard(const boost::system::error_code &ec)
Definition: tcp_session.cc:743
boost::asio::ip::tcp::socket::native_handle_type NativeSocketType
Definition: tcp_session.h:57
void TriggerAsyncReadHandler()
Definition: tcp_session.cc:347
boost::scoped_ptr< Strand > io_strand_
Definition: tcp_session.h:226
void GetTxSocketStats(SocketIOStats &socket_stats) const
Definition: tcp_session.h:166
virtual void WriteReady(const boost::system::error_code &error)
Definition: tcp_session.cc:376
std::atomic< bool > defer_reader_
Definition: tcp_session.h:280
const std::string & ToUVEKey() const
Definition: tcp_session.h:174
int32_t remote_port() const
Definition: tcp_session.cc:548
bool established() const
Definition: tcp_session.h:208
const boost::system::error_code & close_reason() const
Definition: tcp_session.h:141
void AsyncReadStartInternal(TcpSessionPtr session)
Definition: tcp_session.cc:161
boost::scoped_ptr< Socket > socket_
Definition: tcp_session.h:258
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
Definition: tcp_session.cc:461
std::atomic< bool > tcp_close_in_progress_
Definition: tcp_session.h:283
boost::system::error_code SetTcpSendBufSize(uint32_t size)
Definition: tcp_session.cc:777
std::mutex obs_mutex_
Definition: tcp_session.h:273
virtual void AsyncReadSome()
Definition: tcp_session.cc:193
uint8_t GetDscpValue() const
Definition: tcp_session.cc:574
const std::string & remote_addr_string() const
Definition: tcp_session.h:135
boost::asio::strand< boost::asio::io_context::executor_type > Strand
Definition: tcp_session.h:225
void AsyncWriteInternal(TcpSessionPtr session)
Definition: tcp_session.cc:418
static const int kDefaultWriteBufferSize
Definition: tcp_session.h:41
void ReleaseBufferLocked(Buffer buffer)
Definition: tcp_session.cc:149
bool IsClosed() const
Definition: tcp_session.h:121
virtual void OnRead(Buffer buffer)=0
static int reader_task_id_
Definition: tcp_session.h:255
virtual Socket * socket() const
Definition: tcp_session.h:82
std::atomic< bool > write_blocked_
Definition: tcp_session.h:282
void GetRxSocketStats(SocketIOStats &socket_stats) const
Definition: tcp_session.h:162
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:109
NativeSocketType sock_descriptor()
Definition: tcp_session.h:83
void ConnectFailed()
Definition: tcp_session.cc:298
virtual std::string ToString() const
Definition: tcp_session.h:79
static void WriteReadyInternal(TcpSessionPtr session, const boost::system::error_code &error, uint64_t block_start_time)
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
Definition: tcp_session.cc:790
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
Definition: tcp_session.cc:560
void set_read_on_connect(bool read)
Definition: tcp_session.h:147
BufferQueue buffer_queue_
Definition: tcp_session.h:267
boost::system::error_code SetTcpNoDelay()
Definition: tcp_session.cc:766
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
Definition: tcp_session.cc:379
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:112
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:913
void DeleteBuffer(boost::asio::mutable_buffer buffer)
Definition: tcp_session.cc:127
void SessionEstablished(Endpoint remote, Direction direction)
Definition: tcp_session.cc:246
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:144
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
Definition: tcp_session.cc:120
void set_observer(EventObserver observer)
Definition: tcp_session.cc:219
std::list< boost::asio::mutable_buffer > BufferQueue
Definition: tcp_session.h:234
virtual ~TcpSession()
Definition: tcp_session.cc:111
boost::function< void(TcpSession *, Event)> EventObserver
Definition: tcp_session.h:59
std::atomic< int > refcount_
Definition: tcp_session.h:278
virtual void SetDeferReader(bool defer_reader)
Definition: tcp_session.cc:183
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_session.h:58
std::mutex mutex_
Definition: tcp_session.h:221
Endpoint remote_endpoint() const
Definition: tcp_session.h:131
boost::asio::const_buffer Buffer
Definition: tcp_session.h:60
void SetEstablished(Endpoint remote, Direction dir)
boost::system::error_code close_reason_
Definition: tcp_session.h:268
boost::scoped_ptr< TcpMessageWriter > writer_
Definition: tcp_session.h:276
TcpServerPtr server_
Definition: tcp_session.h:257
EventObserver observer_
Definition: tcp_session.h:274
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
Definition: tcp_session.cc:306
bool IsEstablished() const
Definition: tcp_session.h:116
virtual bool IsReaderDeferred() const
Definition: tcp_session.h:154
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
Definition: tcp_session.cc:87
Direction direction_
Definition: tcp_session.h:266
bool closed_
Definition: tcp_session.h:263
friend void intrusive_ptr_add_ref(TcpSession *session)
Definition: tcp_session.h:288
boost::intrusive_ptr< TcpSession > TcpSessionPtr
Definition: tcp_session.h:180
bool IsClosedLocked() const
Definition: tcp_session.h:242
bool IsEstablishedLocked() const
Definition: tcp_session.h:246
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:869
Endpoint local_endpoint() const
Definition: tcp_session.cc:206
EventObserver observer()
Definition: tcp_session.h:210
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:803
bool established_
Definition: tcp_session.h:262
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:917
std::string name_
Definition: tcp_session.h:279
virtual void Accepted()
Definition: tcp_session.cc:255
Endpoint remote_
Definition: tcp_session.h:264
static const int kDefaultBufferSize
Definition: tcp_session.h:40
std::string remote_addr_str_
Definition: tcp_session.h:265
virtual bool Connected(Endpoint remote)
Definition: tcp_session.cc:270
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:56
virtual void AsyncReadStart()
Definition: tcp_session.cc:175
io::SocketStats stats_
Definition: tcp_session.h:222
std::string uve_key_str_
Definition: tcp_session.h:281
int32_t local_port() const
Definition: tcp_session.cc:536
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:429
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
Definition: tcp_session.cc:469
int SetDscpSocketOption(uint8_t value)
Definition: tcp_session.cc:570
friend void intrusive_ptr_release(TcpSession *session)
Definition: tcp_session.h:292
void Close()
Definition: tcp_session.cc:355
bool read_on_connect_
Definition: tcp_session.h:259
boost::intrusive_ptr< TcpServer > TcpServerPtr
Definition: tcp_server.h:191
void intrusive_ptr_add_ref(TcpSession *session)
Definition: tcp_session.h:288
void intrusive_ptr_release(TcpSession *session)
Definition: tcp_session.h:292