5 #ifndef SRC_IO_TCP_SESSION_H_
6 #define SRC_IO_TCP_SESSION_H_
15 #include <boost/asio/buffer.hpp>
16 #include <boost/asio/io_service.hpp>
17 #include <boost/asio/ip/tcp.hpp>
18 #include <boost/asio/strand.hpp>
19 #include <boost/intrusive_ptr.hpp>
20 #include <boost/function.hpp>
21 #include <boost/scoped_ptr.hpp>
23 #ifndef _LIBCPP_VERSION
24 #include <tbb/compat/condition_variable>
30 #define SSL_SHORT_READ_ERROR 335544539
60 typedef boost::asio::ip::tcp::socket
Socket;
62 typedef boost::asio::ip::tcp::endpoint
Endpoint;
64 typedef boost::asio::const_buffer
Buffer;
68 bool async_read_ready =
true,
71 virtual bool Send(
const uint8_t *data,
size_t size,
size_t *sent);
114 return boost::asio::buffer_cast<const uint8_t *>(buffer);
117 return boost::asio::buffer_size(buffer);
121 tbb::mutex::scoped_lock lock(
mutex_);
126 tbb::mutex::scoped_lock lock(
mutex_);
187 const boost::system::error_code &error,
188 std::size_t bytes_transferred);
198 virtual void WriteReady(
const boost::system::error_code &error);
204 virtual size_t ReadSome(boost::asio::mutable_buffer buffer,
205 boost::system::error_code *error);
206 virtual void AsyncWrite(
const uint8_t *data, std::size_t size);
216 int keepalive_intvl,
int keepalive_probes,
217 int tcp_user_timeout_val = 0);
220 bool call_observer,
bool notify_server =
true);
229 typedef boost::asio::strand<boost::asio::io_context::executor_type>
Strand;
241 const boost::system::error_code &error,
242 uint64_t block_start_time);
293 session->
refcount_.fetch_and_increment();
297 int prev = session->
refcount_.fetch_and_decrement();
310 typedef boost::asio::const_buffer
Buffer;
Task is a wrapper over tbb::task to support policies.
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
int QueueByteLength() const
int AllocBufferSize(int length)
virtual void OnRead(Buffer buffer)
ReceiveCallback callback_
virtual ~TcpMessageReader()
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
virtual const int GetMaxMessageSize()=0
std::deque< Buffer > BufferQueue
virtual int MsgLength(Buffer buffer, int offset)=0
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
boost::asio::const_buffer Buffer
DISALLOW_COPY_AND_ASSIGN(TcpMessageReader)
virtual const int GetHeaderLenSize()=0
virtual size_t GetReadBufferSize() const
virtual int reader_task_id() const
static void AsyncReadHandler(TcpSessionPtr session)
const io::SocketStats & GetSocketStats() const
virtual int GetSessionInstance() const
DISALLOW_COPY_AND_ASSIGN(TcpSession)
int ClearMd5SocketOption(uint32_t peer_ip)
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
static bool IsSocketErrorHard(const boost::system::error_code &ec)
boost::asio::ip::tcp::socket::native_handle_type NativeSocketType
void TriggerAsyncReadHandler()
boost::scoped_ptr< Strand > io_strand_
void GetTxSocketStats(SocketIOStats &socket_stats) const
virtual void WriteReady(const boost::system::error_code &error)
const std::string & ToUVEKey() const
int32_t remote_port() const
const boost::system::error_code & close_reason() const
void AsyncReadStartInternal(TcpSessionPtr session)
boost::scoped_ptr< Socket > socket_
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
boost::system::error_code SetTcpSendBufSize(uint32_t size)
virtual void AsyncReadSome()
uint8_t GetDscpValue() const
const std::string & remote_addr_string() const
tbb::atomic< int > refcount_
boost::asio::strand< boost::asio::io_context::executor_type > Strand
void AsyncWriteInternal(TcpSessionPtr session)
static const int kDefaultWriteBufferSize
void ReleaseBufferLocked(Buffer buffer)
virtual void OnRead(Buffer buffer)=0
static int reader_task_id_
virtual Socket * socket() const
void GetRxSocketStats(SocketIOStats &socket_stats) const
static const uint8_t * BufferData(const Buffer &buffer)
NativeSocketType sock_descriptor()
virtual std::string ToString() const
static void WriteReadyInternal(TcpSessionPtr session, const boost::system::error_code &error, uint64_t block_start_time)
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
void set_read_on_connect(bool read)
BufferQueue buffer_queue_
boost::system::error_code SetTcpNoDelay()
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
static size_t BufferSize(const Buffer &buffer)
void GetRxSocketStats(SocketIOStats *socket_stats) const
void DeleteBuffer(boost::asio::mutable_buffer buffer)
void SessionEstablished(Endpoint remote, Direction direction)
virtual void ReleaseBuffer(Buffer buffer)
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
void set_observer(EventObserver observer)
std::list< boost::asio::mutable_buffer > BufferQueue
boost::function< void(TcpSession *, Event)> EventObserver
virtual void SetDeferReader(bool defer_reader)
boost::asio::ip::tcp::endpoint Endpoint
Endpoint remote_endpoint() const
boost::asio::const_buffer Buffer
void SetEstablished(Endpoint remote, Direction dir)
boost::system::error_code close_reason_
boost::scoped_ptr< TcpMessageWriter > writer_
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
bool IsEstablished() const
virtual bool IsReaderDeferred() const
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
friend void intrusive_ptr_add_ref(TcpSession *session)
boost::intrusive_ptr< TcpSession > TcpSessionPtr
bool IsClosedLocked() const
bool IsEstablishedLocked() const
virtual boost::system::error_code SetSocketOptions()
Endpoint local_endpoint() const
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
void GetTxSocketStats(SocketIOStats *socket_stats) const
static const int kDefaultBufferSize
std::string remote_addr_str_
virtual bool Connected(Endpoint remote)
boost::asio::ip::tcp::socket Socket
tbb::atomic< bool > defer_reader_
virtual void AsyncReadStart()
int32_t local_port() const
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
int SetDscpSocketOption(uint8_t value)
tbb::atomic< bool > tcp_close_in_progress_
friend void intrusive_ptr_release(TcpSession *session)
tbb::atomic< bool > write_blocked_
boost::intrusive_ptr< TcpServer > TcpServerPtr
void intrusive_ptr_add_ref(TcpSession *session)
void intrusive_ptr_release(TcpSession *session)