10 #include <boost/asio.hpp>
11 #include <boost/asio/detail/socket_option.hpp>
12 #include <boost/bind.hpp>
13 #include <boost/scoped_array.hpp>
14 #include <boost/asio/detail/recycling_allocator.hpp>
25 using boost::asio::async_write;
26 using boost::asio::buffer;
27 using boost::asio::buffer_cast;
28 using boost::asio::detail::socket_option::integer;
29 using boost::asio::const_buffer;
30 using boost::asio::mutable_buffer;
31 using boost::asio::mutable_buffers_1;
32 using boost::asio::null_buffers;
33 using boost::asio::socket_base;
35 using boost::function;
36 using boost::scoped_array;
37 using boost::system::error_code;
39 using std::ostringstream;
42 using boost::asio::error::try_again;
43 using boost::asio::error::would_block;
44 using boost::asio::error::in_progress;
45 using boost::asio::error::interrupted;
46 using boost::asio::error::network_down;
47 using boost::asio::error::network_reset;
48 using boost::asio::error::network_unreachable;
49 using boost::asio::error::no_buffer_space;
50 using boost::asio::placeholders::error;
51 using boost::asio::placeholders::bytes_transferred;
52 using boost::asio::ip::tcp;
71 session_->server_->stats_.read_blocked++;
88 size_t buffer_send_size)
91 read_on_connect_(async_read_ready),
120 uint8_t *data =
new uint8_t[buffer_size];
121 mutable_buffer buffer = mutable_buffer(data, buffer_size);
127 uint8_t *data = buffer_cast<uint8_t *>(buffer);
131 static int BufferCmp(
const mutable_buffer &lhs,
const const_buffer &rhs) {
132 const uint8_t *lp = buffer_cast<uint8_t *>(lhs);
133 const uint8_t *rp = buffer_cast<
const uint8_t *>(rhs);
144 tbb::mutex::scoped_lock lock(
mutex_);
167 server_->stats_.read_blocked_duration_usecs += blocked_usecs;
170 tbb::mutex::scoped_lock lock(
mutex_);
176 boost::asio::detail::recycling_allocator<void> allocator;
194 socket()->async_read_some(null_buffers(),
200 async_write(*
socket(), buffer(data, size),
202 error, bytes_transferred));
206 tbb::mutex::scoped_lock lock(
mutex_);
228 local =
socket()->local_endpoint(error);
229 out << local.address().to_string() <<
":" << local.port() <<
"::";
235 std::string hostname =
"";
236 if (local.address().is_v4()) {
241 out << hostname <<
":" <<
remote_.address().to_string();
256 "Passive session Accept complete");
273 tbb::mutex::scoped_lock lock(
mutex_);
282 "Active session connection complete");
306 bool call_observer,
bool notify_server) {
307 tbb::mutex::scoped_lock lock(
mutex_);
311 socket()->shutdown(tcp::socket::shutdown_both, error);
314 "Shutdown failed due to error: " << error.message());
348 boost::asio::detail::recycling_allocator<void> allocator;
355 tbb::mutex::scoped_lock lock(
mutex_);
379 const error_code &error,
381 tbb::mutex::scoped_lock lock(session->mutex_);
382 if (session->IsSocketErrorHard(error)) {
385 "Write failed due to error: " << error.message());
386 session->CloseInternal(error,
true);
393 if (session->IsClosedLocked())
return;
396 session->stats_.write_bytes += wrote;
397 session->server_->stats_.write_bytes += wrote;
399 bool send_ready =
false;
400 bool more_write = session->writer_->UpdateBufferQueue(wrote, &send_ready);
404 session->writer_->TriggerAsyncWrite();
405 }
else if (session->tcp_close_in_progress_) {
407 session->CloseInternal(error,
true);
413 session->WriteReady(error);
419 tbb::mutex::scoped_lock lock(session->mutex_);
424 if (session->IsClosedLocked())
return;
425 session->writer_->TriggerAsyncWrite();
430 tbb::mutex::scoped_lock lock(
mutex_);
441 if (
socket()->non_blocking()) {
443 int len =
writer_->AsyncSend(data, size, &error);
447 "Write failed due to error: "
448 << error.category().name() <<
" "
453 if ((
size_t) len != size)
455 if (sent) *sent = (len > 0) ? len : 0;
461 size_t bytes_transferred) {
462 Buffer rdbuf(buffer_cast<const uint8_t *>(buffer), bytes_transferred);
469 return socket()->read_some(mutable_buffers_1(buffer), *error);
477 size_t size =
socket_->available(error);
484 tbb::mutex::scoped_lock lock(session->mutex_);
485 if (session->closed_) {
489 mutable_buffer buffer =
490 session->AllocateBuffer(session->GetReadBufferSize());
493 size_t bytes_transferred = session->ReadSome(buffer, &error);
494 if (session->IsSocketErrorHard(error)) {
495 session->ReleaseBufferLocked(buffer);
497 if (error != boost::asio::error::eof) {
498 if (strcmp(error.category().name(),
"asio.ssl") == 0 &&
501 "Read failed due to error "
502 << error.category().name() <<
" "
504 <<
" : " << error.message());
507 "Read failed due to error "
508 << error.category().name() <<
" "
510 <<
" : " << error.message());
514 session->CloseInternal(error,
true);
519 session->stats_.read_calls++;
520 session->stats_.read_bytes += bytes_transferred;
521 session->server_->stats_.read_calls++;
522 session->server_->stats_.read_bytes += bytes_transferred;
524 Task *
task = session->CreateReaderTask(buffer, bytes_transferred);
556 return remote.port();
560 const string &md5_password) {
579 : session_(session), callback_(callback), offset_(0), remain_(-1) {
589 return kMaxMessageSize;
591 int bufsize = 1 << 8;
592 for (; bufsize < kMaxMessageSize && bufsize < length; bufsize <<= 1) {
605 assert((dst - data) + bytes < msglength);
606 memcpy(dst, cp, bytes);
614 int count = msglength - (dst - data);
615 assert((dst - data) + count <= msglength);
624 for (BufferQueue::const_iterator iter =
queue_.begin();
625 iter !=
queue_.end(); ++iter) {
636 uint8_t *data,
Buffer buffer,
size_t size)
const {
639 for (BufferQueue::const_iterator iter =
queue_.begin();
640 iter !=
queue_.end(); ++iter) {
650 int remain = size - offset;
651 avail = min(avail, remain);
652 assert(offset + avail <= size);
653 memcpy(data + offset, cp, avail);
658 int remain = size - offset;
659 avail = min(avail, remain);
660 assert(offset + avail <= size);
667 return Buffer(data, size);
680 if (queuelen + static_cast<int>(size) < kHeaderLenSize) {
684 scoped_array<uint8_t> data(
new uint8_t[kHeaderLenSize]);
685 Buffer header =
PullUp(data.get(), buffer, kHeaderLenSize);
689 remain_ = msglength - queuelen;
702 assert(remain_ == -1);
704 bool success =
callback_(data.get(), msglength);
711 int msglength =
MsgLength(buffer, offset_);
715 if (msglength > avail) {
722 offset_ += msglength;
747 if (ec == would_block)
749 if (ec == in_progress)
751 if (ec == interrupted)
753 if (ec == network_down)
755 if (ec == network_reset)
757 if (ec == network_unreachable)
759 if (ec == no_buffer_space)
767 boost::asio::ip::tcp::no_delay no_delay_option(
true);
768 socket()->set_option(no_delay_option, ec);
771 "tcp_no_delay set error: " << ec);
778 socket_base::send_buffer_size send_buffer_size_option(size);
779 socket()->set_option(send_buffer_size_option, ec);
782 "send_buffer_size set error: " << ec);
791 socket_base::receive_buffer_size receive_buffer_size_option(size);
792 socket()->set_option(receive_buffer_size_option, ec);
795 "receive_buffer_size set error: " << ec);
803 int keepalive_intvl,
int keepalive_probes,
int tcp_user_timeout_val) {
805 socket_base::keep_alive keep_alive_option(
true);
806 socket()->set_option(keep_alive_option, ec);
809 "keep_alive set error: " << ec);
813 typedef integer< IPPROTO_TCP, TCP_KEEPIDLE > keepalive_idle_time;
814 keepalive_idle_time keepalive_idle_time_option(keepalive_time);
815 socket()->set_option(keepalive_idle_time_option, ec);
818 "keepalive_idle_time: " << keepalive_time <<
" set error: " << ec);
822 typedef integer< IPPROTO_TCP, TCP_KEEPALIVE > keepalive_idle_time;
823 keepalive_idle_time keepalive_idle_time_option(keepalive_time);
824 socket()->set_option(keepalive_idle_time_option, ec);
827 "keepalive_idle_time: " << keepalive_time <<
" set error: " << ec);
831 #error No TCP keepalive option defined.
834 typedef integer< IPPROTO_TCP, TCP_KEEPINTVL > keepalive_interval;
835 keepalive_interval keepalive_interval_option(keepalive_intvl);
836 socket()->set_option(keepalive_interval_option, ec);
839 "keepalive_interval: " << keepalive_intvl <<
" set error: " << ec);
844 typedef integer< IPPROTO_TCP, TCP_KEEPCNT > keepalive_count;
845 keepalive_count keepalive_count_option(keepalive_probes);
846 socket()->set_option(keepalive_count_option, ec);
849 "keepalive_probes: " << keepalive_probes <<
" set error: " << ec);
853 #ifdef TCP_USER_TIMEOUT
854 typedef integer< IPPROTO_TCP, TCP_USER_TIMEOUT > tcp_user_timeout;
855 tcp_user_timeout tcp_user_timeout_option(tcp_user_timeout_val);
856 socket()->set_option(tcp_user_timeout_option, ec);
859 "tcp_user_timeout: " << tcp_user_timeout_val <<
" set error: "
874 socket()->non_blocking(
true, ec);
877 "Cannot set socket non blocking: " << ec);
881 char *buffer_size_str = getenv(
"TCP_SESSION_SOCKET_BUFFER_SIZE");
882 if (!buffer_size_str)
return ec;
884 uint32_t sz =
static_cast<uint32_t
>(strtoul(buffer_size_str, NULL, 0));
892 socket_base::send_buffer_size send_buffer_size_option(sz);
893 socket()->set_option(send_buffer_size_option, ec);
896 "send_buffer_size set error: " << ec);
900 socket_base::receive_buffer_size receive_buffer_size_option(sz);
901 socket()->set_option(receive_buffer_size_option, ec);
904 "receive_buffer_size set error: " << ec);
virtual const int GetHeaderLenSize()=0
virtual int MsgLength(Buffer buffer, int offset)=0
boost::asio::ip::tcp::endpoint Endpoint
boost::asio::const_buffer Buffer
boost::intrusive_ptr< TcpSession > TcpSessionPtr
Endpoint local_endpoint() const
void DeleteBuffer(boost::asio::mutable_buffer buffer)
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
void AsyncReadStartInternal(TcpSessionPtr session)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
static const int kDefaultBufferSize
static const int kTaskInstanceAny
function< void(Buffer)> ReadHandler
void AsyncWriteInternal(TcpSessionPtr session)
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
static int reader_task_id_
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
static size_t BufferSize(const Buffer &buffer)
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
int32_t remote_port() const
virtual void WriteReady(const boost::system::error_code &error)
void GetTxStats(SocketIOStats *socket_stats) const
boost::asio::io_context * io_service()
string Description() const
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
virtual void SetDeferReader(bool defer_reader)
boost::scoped_ptr< Strand > io_strand_
EventManager * event_manager()
static void AsyncReadHandler(TcpSessionPtr session)
void TriggerAsyncReadHandler()
int ClearMd5SocketOption(uint32_t peer_ip)
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
#define SSL_SHORT_READ_ERROR
BufferQueue buffer_queue_
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
tbb::atomic< int > refcount_
int32_t local_port() const
int GetTaskId(const std::string &name)
std::string ResolveCanonicalNameIPv6(const std::string &ipv6)
boost::asio::ip::tcp::socket Socket
void set_observer(EventObserver observer)
int SetDscpSocketOption(NativeSocketType fd, uint8_t value)
virtual bool Connected(Endpoint remote)
void GetRxSocketStats(SocketIOStats *socket_stats) const
boost::asio::strand< boost::asio::io_context::executor_type > Strand
#define TCP_SESSION_LOG_DEBUG(session, dir, arg)
virtual void OnRead(Buffer buffer)
boost::system::error_code close_reason_
tbb::atomic< uint64_t > read_blocked_duration_usecs
virtual void OnRead(Buffer buffer)=0
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
boost::asio::const_buffer Buffer
void GetRxStats(SocketIOStats *socket_stats) const
ReceiveCallback callback_
static bool IsSocketErrorHard(const boost::system::error_code &ec)
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
virtual int reader_task_id() const
void SessionEstablished(Endpoint remote, Direction direction)
virtual void AsyncReadSome()
#define TCP_SESSION_LOG_ERROR(session, dir, arg)
virtual const int GetMaxMessageSize()=0
void GetTxSocketStats(SocketIOStats *socket_stats) const
boost::scoped_ptr< TcpMessageWriter > writer_
Reader(TcpSessionPtr session, ReadHandler read_fn, Buffer buffer)
tbb::atomic< bool > defer_reader_
int AllocBufferSize(int length)
std::string ResolveCanonicalName()
int QueueByteLength() const
tbb::atomic< uint64_t > read_block_start_time
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
static uint64_t UTCTimestampUsec()
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
int SetDscpSocketOption(uint8_t value)
bool IsEstablishedLocked() const
static int BufferCmp(const mutable_buffer &lhs, const const_buffer &rhs)
virtual void ReleaseBuffer(Buffer buffer)
virtual int GetSessionInstance() const
virtual size_t GetReadBufferSize() const
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
virtual ~TcpMessageReader()
#define TCP_SESSION_LOG_UT_DEBUG(session, dir, arg)
boost::system::error_code SetTcpSendBufSize(uint32_t size)
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
virtual void AsyncReadStart()
virtual Socket * socket() const
tbb::atomic< bool > write_blocked_
void ReleaseBufferLocked(Buffer buffer)
static const uint8_t * BufferData(const Buffer &buffer)
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
boost::function< void(TcpSession *, Event)> EventObserver
Task is a wrapper over tbb::task to support policies.
int SetMd5SocketOption(NativeSocketType fd, uint32_t peer_ip, const std::string &md5_password)
tbb::atomic< bool > tcp_close_in_progress_
uint8_t GetDscpValue() const
boost::scoped_ptr< Socket > socket_
virtual boost::system::error_code SetSocketOptions()
boost::system::error_code SetTcpNoDelay()
std::string remote_addr_str_