5 #ifndef SRC_IO_TCP_SESSION_H_
6 #define SRC_IO_TCP_SESSION_H_
15 #include <boost/asio/buffer.hpp>
16 #include <boost/asio/io_service.hpp>
17 #include <boost/asio/ip/tcp.hpp>
18 #include <boost/asio/strand.hpp>
19 #include <boost/intrusive_ptr.hpp>
20 #include <boost/function.hpp>
21 #include <boost/scoped_ptr.hpp>
23 #ifndef _LIBCPP_VERSION
24 #include <tbb/compat/condition_variable>
30 #define SSL_SHORT_READ_ERROR 335544539
60 typedef boost::asio::ip::tcp::socket
Socket;
62 typedef boost::asio::ip::tcp::endpoint
Endpoint;
64 typedef boost::asio::const_buffer
Buffer;
68 bool async_read_ready =
true,
71 virtual bool Send(
const uint8_t *data,
size_t size,
size_t *sent);
114 return boost::asio::buffer_cast<
const uint8_t *>(buffer);
117 return boost::asio::buffer_size(buffer);
121 tbb::mutex::scoped_lock lock(
mutex_);
126 tbb::mutex::scoped_lock lock(
mutex_);
187 const boost::system::error_code &error,
188 std::size_t bytes_transferred);
198 virtual void WriteReady(
const boost::system::error_code &error);
204 virtual size_t ReadSome(boost::asio::mutable_buffer buffer,
205 boost::system::error_code *error);
206 virtual void AsyncWrite(
const uint8_t *data, std::size_t size);
216 int keepalive_intvl,
int keepalive_probes,
217 int tcp_user_timeout_val = 0);
220 bool call_observer,
bool notify_server =
true);
229 typedef boost::asio::strand<boost::asio::io_context::executor_type>
Strand;
241 const boost::system::error_code &error,
242 uint64_t block_start_time);
293 session->
refcount_.fetch_and_increment();
297 int prev = session->
refcount_.fetch_and_decrement();
310 typedef boost::asio::const_buffer
Buffer;
343 #endif // SRC_IO_TCP_SESSION_H_
static const int kDefaultWriteBufferSize
virtual const int GetHeaderLenSize()=0
int intrusive_ptr_add_ref(const AsPath *cpath)
void set_read_on_connect(bool read)
virtual int MsgLength(Buffer buffer, int offset)=0
boost::asio::ip::tcp::endpoint Endpoint
boost::asio::const_buffer Buffer
boost::intrusive_ptr< TcpSession > TcpSessionPtr
Endpoint local_endpoint() const
void DeleteBuffer(boost::asio::mutable_buffer buffer)
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
void AsyncReadStartInternal(TcpSessionPtr session)
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
static const int kDefaultBufferSize
virtual bool IsReaderDeferred() const
void AsyncWriteInternal(TcpSessionPtr session)
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
static int reader_task_id_
virtual std::string ToString() const
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
static size_t BufferSize(const Buffer &buffer)
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
int32_t remote_port() const
virtual void WriteReady(const boost::system::error_code &error)
static void WriteReadyInternal(TcpSessionPtr session, const boost::system::error_code &error, uint64_t block_start_time)
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
const boost::system::error_code & close_reason() const
virtual void SetDeferReader(bool defer_reader)
boost::scoped_ptr< Strand > io_strand_
static void AsyncReadHandler(TcpSessionPtr session)
void TriggerAsyncReadHandler()
int ClearMd5SocketOption(uint32_t peer_ip)
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
BufferQueue buffer_queue_
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
tbb::atomic< int > refcount_
int32_t local_port() const
boost::asio::ip::tcp::socket Socket
void set_observer(EventObserver observer)
virtual bool Connected(Endpoint remote)
void GetRxSocketStats(SocketIOStats *socket_stats) const
boost::asio::strand< boost::asio::io_context::executor_type > Strand
DISALLOW_COPY_AND_ASSIGN(TcpMessageReader)
friend void intrusive_ptr_add_ref(TcpSession *session)
std::list< boost::asio::mutable_buffer > BufferQueue
virtual void OnRead(Buffer buffer)
boost::system::error_code close_reason_
virtual void OnRead(Buffer buffer)=0
boost::asio::const_buffer Buffer
const std::string & remote_addr_string() const
ReceiveCallback callback_
static bool IsSocketErrorHard(const boost::system::error_code &ec)
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
virtual int reader_task_id() const
void SessionEstablished(Endpoint remote, Direction direction)
bool IsClosedLocked() const
virtual void AsyncReadSome()
void GetRxSocketStats(SocketIOStats &socket_stats) const
virtual const int GetMaxMessageSize()=0
void GetTxSocketStats(SocketIOStats *socket_stats) const
boost::scoped_ptr< TcpMessageWriter > writer_
boost::intrusive_ptr< TcpServer > TcpServerPtr
DISALLOW_COPY_AND_ASSIGN(TcpSession)
tbb::atomic< bool > defer_reader_
int AllocBufferSize(int length)
void SetEstablished(Endpoint remote, Direction dir)
int QueueByteLength() const
friend void intrusive_ptr_release(TcpSession *session)
const io::SocketStats & GetSocketStats() const
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
const std::string & ToUVEKey() const
std::deque< Buffer > BufferQueue
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
int SetDscpSocketOption(uint8_t value)
bool IsEstablishedLocked() const
Endpoint remote_endpoint() const
virtual void ReleaseBuffer(Buffer buffer)
virtual int GetSessionInstance() const
NativeSocketType sock_descriptor()
void intrusive_ptr_release(const AsPath *cpath)
virtual size_t GetReadBufferSize() const
void GetTxSocketStats(SocketIOStats &socket_stats) const
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
virtual ~TcpMessageReader()
boost::system::error_code SetTcpSendBufSize(uint32_t size)
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
virtual void AsyncReadStart()
virtual Socket * socket() const
tbb::atomic< bool > write_blocked_
void ReleaseBufferLocked(Buffer buffer)
boost::asio::ip::tcp::socket::native_handle_type NativeSocketType
static const uint8_t * BufferData(const Buffer &buffer)
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
boost::function< void(TcpSession *, Event)> EventObserver
Task is a wrapper over tbb::task to support policies.
tbb::atomic< bool > tcp_close_in_progress_
uint8_t GetDscpValue() const
boost::scoped_ptr< Socket > socket_
virtual boost::system::error_code SetSocketOptions()
boost::system::error_code SetTcpNoDelay()
std::string remote_addr_str_
bool IsEstablished() const