10 #include <boost/asio.hpp>
11 #include <boost/asio/detail/socket_option.hpp>
12 #include <boost/bind/bind.hpp>
13 #include <boost/scoped_array.hpp>
14 #include <boost/asio/detail/recycling_allocator.hpp>
25 using boost::asio::async_write;
26 using boost::asio::buffer;
27 using boost::asio::buffer_cast;
28 using boost::asio::detail::socket_option::integer;
29 using boost::asio::const_buffer;
30 using boost::asio::mutable_buffer;
31 using boost::asio::mutable_buffers_1;
32 using boost::asio::null_buffers;
33 using boost::asio::socket_base;
35 using boost::function;
36 using boost::scoped_array;
37 using boost::system::error_code;
39 using std::ostringstream;
41 using namespace boost::placeholders;
43 using boost::asio::error::try_again;
44 using boost::asio::error::would_block;
45 using boost::asio::error::in_progress;
46 using boost::asio::error::interrupted;
47 using boost::asio::error::network_down;
48 using boost::asio::error::network_reset;
49 using boost::asio::error::network_unreachable;
50 using boost::asio::error::no_buffer_space;
51 using boost::asio::placeholders::error;
52 using boost::asio::placeholders::bytes_transferred;
53 using boost::asio::ip::tcp;
62 :
Task(session->reader_task_id(), session->GetSessionInstance()),
63 session_(session), read_fn_(read_fn), buffer_(buffer) {
66 if (session_->IsEstablished()) {
68 if (session_->IsReaderDeferred()) {
71 session_->stats_.read_blocked++;
72 session_->server_->stats_.read_blocked++;
74 session_->AsyncReadStart();
89 size_t buffer_send_size)
92 read_on_connect_(async_read_ready),
121 uint8_t *data =
new uint8_t[buffer_size];
122 mutable_buffer buffer = mutable_buffer(data, buffer_size);
128 uint8_t *data = buffer_cast<uint8_t *>(buffer);
132 static int BufferCmp(
const mutable_buffer &lhs,
const const_buffer &rhs) {
133 const uint8_t *lp = buffer_cast<uint8_t *>(lhs);
134 const uint8_t *rp = buffer_cast<const uint8_t *>(rhs);
145 std::scoped_lock lock(
mutex_);
168 server_->stats_.read_blocked_duration_usecs += blocked_usecs;
171 std::scoped_lock lock(
mutex_);
177 boost::asio::detail::recycling_allocator<void> allocator;
195 socket()->async_read_some(null_buffers(),
201 async_write(*
socket(), buffer(data, size),
203 error, bytes_transferred));
207 std::scoped_lock lock(
mutex_);
229 local =
socket()->local_endpoint(error);
230 out << local.address().to_string() <<
":" << local.port() <<
"::";
236 std::string hostname =
"";
237 if (local.address().is_v4()) {
242 out << hostname <<
":" <<
remote_.address().to_string();
257 "Passive session Accept complete");
274 std::scoped_lock lock(
mutex_);
283 "Active session connection complete");
307 bool call_observer,
bool notify_server) {
308 std::unique_lock<std::mutex> lock(
mutex_);
312 socket()->shutdown(tcp::socket::shutdown_both, error);
315 "Shutdown failed due to error: " << error.message());
349 boost::asio::detail::recycling_allocator<void> allocator;
356 std::unique_lock<std::mutex> lock(
mutex_);
380 const error_code &error,
382 std::unique_lock<std::mutex> lock(session->mutex_);
383 if (session->IsSocketErrorHard(error)) {
386 "Write failed due to error: " << error.message());
387 session->CloseInternal(error,
true);
394 if (session->IsClosedLocked())
return;
397 session->stats_.write_bytes += wrote;
398 session->server_->stats_.write_bytes += wrote;
400 bool send_ready =
false;
401 bool more_write = session->writer_->UpdateBufferQueue(wrote, &send_ready);
405 session->writer_->TriggerAsyncWrite();
406 }
else if (session->tcp_close_in_progress_) {
408 session->CloseInternal(error,
true);
414 session->WriteReady(error);
420 std::scoped_lock lock(session->mutex_);
425 if (session->IsClosedLocked())
return;
426 session->writer_->TriggerAsyncWrite();
431 std::unique_lock<std::mutex> lock(
mutex_);
442 if (
socket()->non_blocking()) {
444 int len =
writer_->AsyncSend(data, size, &error);
448 "Write failed due to error: "
449 << error.category().name() <<
" "
454 if ((
size_t) len != size)
456 if (sent) *sent = (len > 0) ? len : 0;
462 size_t bytes_transferred) {
463 Buffer rdbuf(buffer_cast<const uint8_t *>(buffer), bytes_transferred);
470 return socket()->read_some(mutable_buffers_1(buffer), *error);
478 size_t size =
socket_->available(error);
485 std::unique_lock<std::mutex> lock(session->mutex_);
486 if (session->closed_) {
490 mutable_buffer buffer =
491 session->AllocateBuffer(session->GetReadBufferSize());
494 size_t bytes_transferred = session->ReadSome(buffer, &error);
495 if (session->IsSocketErrorHard(error)) {
496 session->ReleaseBufferLocked(buffer);
498 if (error != boost::asio::error::eof) {
499 if (strcmp(error.category().name(),
"asio.ssl") == 0 &&
502 "Read failed due to error "
503 << error.category().name() <<
" "
505 <<
" : " << error.message());
508 "Read failed due to error "
509 << error.category().name() <<
" "
511 <<
" : " << error.message());
515 session->CloseInternal(error,
true);
520 session->stats_.read_calls++;
521 session->stats_.read_bytes += bytes_transferred;
522 session->server_->stats_.read_calls++;
523 session->server_->stats_.read_bytes += bytes_transferred;
525 Task *
task = session->CreateReaderTask(buffer, bytes_transferred);
557 return remote.port();
561 const string &md5_password) {
580 : session_(session), callback_(callback), offset_(0), remain_(-1) {
590 return kMaxMessageSize;
592 int bufsize = 1 << 8;
593 for (; bufsize < kMaxMessageSize && bufsize < length; bufsize <<= 1) {
606 assert((dst - data) + bytes < msglength);
607 memcpy(dst, cp, bytes);
615 int count = msglength - (dst - data);
616 assert((dst - data) + count <= msglength);
625 for (BufferQueue::const_iterator iter =
queue_.begin();
626 iter !=
queue_.end(); ++iter) {
637 uint8_t *data,
Buffer buffer,
size_t size)
const {
640 for (BufferQueue::const_iterator iter =
queue_.begin();
641 iter !=
queue_.end(); ++iter) {
651 int remain = size - offset;
652 avail = min(avail, remain);
653 assert(offset + avail <= size);
654 memcpy(data + offset, cp, avail);
659 int remain = size - offset;
660 avail = min(avail, remain);
661 assert(offset + avail <= size);
668 return Buffer(data, size);
681 if (queuelen +
static_cast<int>(size) < kHeaderLenSize) {
685 scoped_array<uint8_t> data(
new uint8_t[kHeaderLenSize]);
686 Buffer header =
PullUp(data.get(), buffer, kHeaderLenSize);
690 remain_ = msglength - queuelen;
705 bool success =
callback_(data.get(), msglength);
716 if (msglength > avail) {
748 if (ec == would_block)
750 if (ec == in_progress)
752 if (ec == interrupted)
754 if (ec == network_down)
756 if (ec == network_reset)
758 if (ec == network_unreachable)
760 if (ec == no_buffer_space)
768 boost::asio::ip::tcp::no_delay no_delay_option(
true);
769 socket()->set_option(no_delay_option, ec);
772 "tcp_no_delay set error: " << ec);
779 socket_base::send_buffer_size send_buffer_size_option(size);
780 socket()->set_option(send_buffer_size_option, ec);
783 "send_buffer_size set error: " << ec);
792 socket_base::receive_buffer_size receive_buffer_size_option(size);
793 socket()->set_option(receive_buffer_size_option, ec);
796 "receive_buffer_size set error: " << ec);
804 int keepalive_intvl,
int keepalive_probes,
int tcp_user_timeout_val) {
806 socket_base::keep_alive keep_alive_option(
true);
807 socket()->set_option(keep_alive_option, ec);
810 "keep_alive set error: " << ec);
814 typedef integer< IPPROTO_TCP, TCP_KEEPIDLE > keepalive_idle_time;
815 keepalive_idle_time keepalive_idle_time_option(keepalive_time);
816 socket()->set_option(keepalive_idle_time_option, ec);
819 "keepalive_idle_time: " << keepalive_time <<
" set error: " << ec);
823 typedef integer< IPPROTO_TCP, TCP_KEEPALIVE > keepalive_idle_time;
824 keepalive_idle_time keepalive_idle_time_option(keepalive_time);
825 socket()->set_option(keepalive_idle_time_option, ec);
828 "keepalive_idle_time: " << keepalive_time <<
" set error: " << ec);
832 #error No TCP keepalive option defined.
835 typedef integer< IPPROTO_TCP, TCP_KEEPINTVL > keepalive_interval;
836 keepalive_interval keepalive_interval_option(keepalive_intvl);
837 socket()->set_option(keepalive_interval_option, ec);
840 "keepalive_interval: " << keepalive_intvl <<
" set error: " << ec);
845 typedef integer< IPPROTO_TCP, TCP_KEEPCNT > keepalive_count;
846 keepalive_count keepalive_count_option(keepalive_probes);
847 socket()->set_option(keepalive_count_option, ec);
850 "keepalive_probes: " << keepalive_probes <<
" set error: " << ec);
854 #ifdef TCP_USER_TIMEOUT
855 typedef integer< IPPROTO_TCP, TCP_USER_TIMEOUT > tcp_user_timeout;
856 tcp_user_timeout tcp_user_timeout_option(tcp_user_timeout_val);
857 socket()->set_option(tcp_user_timeout_option, ec);
860 "tcp_user_timeout: " << tcp_user_timeout_val <<
" set error: "
875 socket()->non_blocking(
true, ec);
878 "Cannot set socket non blocking: " << ec);
882 char *buffer_size_str = getenv(
"TCP_SESSION_SOCKET_BUFFER_SIZE");
883 if (!buffer_size_str)
return ec;
885 uint32_t sz =
static_cast<uint32_t
>(strtoul(buffer_size_str, NULL, 0));
893 socket_base::send_buffer_size send_buffer_size_option(sz);
894 socket()->set_option(send_buffer_size_option, ec);
897 "send_buffer_size set error: " << ec);
901 socket_base::receive_buffer_size receive_buffer_size_option(sz);
902 socket()->set_option(receive_buffer_size_option, ec);
905 "receive_buffer_size set error: " << ec);
std::string ResolveCanonicalNameIPv6(const std::string &ipv6)
std::string ResolveCanonicalName()
boost::asio::io_context * io_service()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
static const int kTaskInstanceAny
Specifies value for wildcard (any or *) task data ID.
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
int QueueByteLength() const
int AllocBufferSize(int length)
virtual void OnRead(Buffer buffer)
ReceiveCallback callback_
virtual ~TcpMessageReader()
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
virtual const int GetMaxMessageSize()=0
virtual int MsgLength(Buffer buffer, int offset)=0
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
boost::asio::const_buffer Buffer
virtual const int GetHeaderLenSize()=0
EventManager * event_manager()
int SetDscpSocketOption(NativeSocketType fd, uint8_t value)
int SetMd5SocketOption(NativeSocketType fd, uint32_t peer_ip, const std::string &md5_password)
string Description() const
Gives a description of the task.
virtual bool Run()
Code to execute in a task. Returns true if the task is completed. Returns false to reschedule the task.
Reader(TcpSessionPtr session, ReadHandler read_fn, Buffer buffer)
function< void(Buffer)> ReadHandler
virtual size_t GetReadBufferSize() const
static void AsyncReadHandler(TcpSessionPtr session)
virtual int GetSessionInstance() const
int ClearMd5SocketOption(uint32_t peer_ip)
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
static bool IsSocketErrorHard(const boost::system::error_code &ec)
void TriggerAsyncReadHandler()
boost::scoped_ptr< Strand > io_strand_
virtual void WriteReady(const boost::system::error_code &error)
std::atomic< bool > defer_reader_
int32_t remote_port() const
void AsyncReadStartInternal(TcpSessionPtr session)
boost::scoped_ptr< Socket > socket_
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
std::atomic< bool > tcp_close_in_progress_
boost::system::error_code SetTcpSendBufSize(uint32_t size)
virtual void AsyncReadSome()
uint8_t GetDscpValue() const
boost::asio::strand< boost::asio::io_context::executor_type > Strand
void AsyncWriteInternal(TcpSessionPtr session)
void ReleaseBufferLocked(Buffer buffer)
virtual void OnRead(Buffer buffer)=0
static int reader_task_id_
virtual Socket * socket() const
std::atomic< bool > write_blocked_
static const uint8_t * BufferData(const Buffer &buffer)
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
BufferQueue buffer_queue_
boost::system::error_code SetTcpNoDelay()
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
static size_t BufferSize(const Buffer &buffer)
void GetRxSocketStats(SocketIOStats *socket_stats) const
void DeleteBuffer(boost::asio::mutable_buffer buffer)
void SessionEstablished(Endpoint remote, Direction direction)
virtual void ReleaseBuffer(Buffer buffer)
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
void set_observer(EventObserver observer)
boost::function< void(TcpSession *, Event)> EventObserver
std::atomic< int > refcount_
virtual void SetDeferReader(bool defer_reader)
boost::asio::ip::tcp::endpoint Endpoint
boost::asio::const_buffer Buffer
boost::system::error_code close_reason_
boost::scoped_ptr< TcpMessageWriter > writer_
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
boost::intrusive_ptr< TcpSession > TcpSessionPtr
bool IsEstablishedLocked() const
virtual boost::system::error_code SetSocketOptions()
Endpoint local_endpoint() const
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
void GetTxSocketStats(SocketIOStats *socket_stats) const
static const int kDefaultBufferSize
std::string remote_addr_str_
virtual bool Connected(Endpoint remote)
boost::asio::ip::tcp::socket Socket
virtual void AsyncReadStart()
int32_t local_port() const
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
int SetDscpSocketOption(uint8_t value)
#define TCP_SESSION_LOG_DEBUG(session, dir, arg)
#define TCP_SESSION_LOG_ERROR(session, dir, arg)
#define TCP_SESSION_LOG_UT_DEBUG(session, dir, arg)
std::atomic< uint64_t > read_block_start_time
void GetRxStats(SocketIOStats *socket_stats) const
std::atomic< uint64_t > read_blocked_duration_usecs
void GetTxStats(SocketIOStats *socket_stats) const
static int BufferCmp(const mutable_buffer &lhs, const const_buffer &rhs)
#define SSL_SHORT_READ_ERROR
static uint64_t UTCTimestampUsec()