OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tcp_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "io/tcp_session.h"
6 
7 #include <algorithm>
8 #include <string>
9 
10 #include <boost/asio.hpp>
11 #include <boost/asio/detail/socket_option.hpp>
12 #include <boost/bind.hpp>
13 #include <boost/scoped_array.hpp>
14 #include <boost/asio/detail/recycling_allocator.hpp>
15 
16 #include "base/logging.h"
17 #include "base/address_util.h"
18 #include "io/event_manager.h"
19 #include "io/io_log.h"
20 #include "io/io_utils.h"
21 #include "io/tcp_message_write.h"
22 #include "io/tcp_server.h"
23 #include "base/address_util.h"
24 
25 using boost::asio::async_write;
26 using boost::asio::buffer;
27 using boost::asio::buffer_cast;
28 using boost::asio::detail::socket_option::integer;
29 using boost::asio::const_buffer;
30 using boost::asio::mutable_buffer;
31 using boost::asio::mutable_buffers_1;
32 using boost::asio::null_buffers;
33 using boost::asio::socket_base;
34 using boost::bind;
35 using boost::function;
36 using boost::scoped_array;
37 using boost::system::error_code;
38 using std::min;
39 using std::ostringstream;
40 using std::string;
41 
42 using boost::asio::error::try_again;
43 using boost::asio::error::would_block;
44 using boost::asio::error::in_progress;
45 using boost::asio::error::interrupted;
46 using boost::asio::error::network_down;
47 using boost::asio::error::network_reset;
48 using boost::asio::error::network_unreachable;
49 using boost::asio::error::no_buffer_space;
50 using boost::asio::placeholders::error;
51 using boost::asio::placeholders::bytes_transferred;
52 using boost::asio::ip::tcp;
53 
55 
56 class TcpSession::Reader : public Task {
57 public:
58  typedef function<void(Buffer)> ReadHandler;
59 
60  Reader(TcpSessionPtr session, ReadHandler read_fn, Buffer buffer)
61  : Task(session->reader_task_id(), session->GetSessionInstance()),
62  session_(session), read_fn_(read_fn), buffer_(buffer) {
63  }
64  virtual bool Run() {
65  if (session_->IsEstablished()) {
67  if (session_->IsReaderDeferred()) {
68  // Update socket read block count.
69  session_->stats_.read_block_start_time = UTCTimestampUsec();
70  session_->stats_.read_blocked++;
71  session_->server_->stats_.read_blocked++;
72  } else {
73  session_->AsyncReadStart();
74  }
75  }
76  return true;
77  }
78  string Description() const { return "TcpSession::Reader"; }
79 
80 private:
84 };
85 
87  TcpServer *server, Socket *socket, bool async_read_ready,
88  size_t buffer_send_size)
89  : server_(server),
90  socket_(socket),
91  read_on_connect_(async_read_ready),
92  established_(false),
93  closed_(false),
94  direction_(ACTIVE),
95  writer_(new TcpMessageWriter(this, buffer_send_size)),
96  name_("-") {
97  refcount_ = 0;
98  if (reader_task_id_ == -1) {
100  reader_task_id_ = scheduler->GetTaskId("io::ReaderTask");
101  }
102  if (server_) {
103  io_strand_.reset(new Strand(server->event_manager()->io_service()->get_executor()));
104  }
105  defer_reader_ = false;
106  write_blocked_ = false;
107  tcp_close_in_progress_ = false;
108 }
109 
111  assert(!established_);
112  for (BufferQueue::iterator iter = buffer_queue_.begin();
113  iter != buffer_queue_.end(); ++iter) {
114  DeleteBuffer(*iter);
115  }
116  buffer_queue_.clear();
117 }
118 
119 mutable_buffer TcpSession::AllocateBuffer(size_t buffer_size) {
120  uint8_t *data = new uint8_t[buffer_size];
121  mutable_buffer buffer = mutable_buffer(data, buffer_size);
122  buffer_queue_.push_back(buffer);
123  return buffer;
124 }
125 
126 void TcpSession::DeleteBuffer(mutable_buffer buffer) {
127  uint8_t *data = buffer_cast<uint8_t *>(buffer);
128  delete[] data;
129 }
130 
131 static int BufferCmp(const mutable_buffer &lhs, const const_buffer &rhs) {
132  const uint8_t *lp = buffer_cast<uint8_t *>(lhs);
133  const uint8_t *rp = buffer_cast<const uint8_t *>(rhs);
134  if (lp < rp) {
135  return -1;
136  }
137  if (lp > rp) {
138  return 1;
139  }
140  return 0;
141 }
142 
144  tbb::mutex::scoped_lock lock(mutex_);
145  ReleaseBufferLocked(buffer);
146 }
147 
149  for (BufferQueue::iterator iter = buffer_queue_.begin();
150  iter != buffer_queue_.end(); ++iter) {
151  if (BufferCmp(*iter, buffer) == 0) {
152  DeleteBuffer(*iter);
153  buffer_queue_.erase(iter);
154  return;
155  }
156  }
157  assert(false);
158 }
159 
161  // Update socket read block time.
163  uint64_t blocked_usecs = UTCTimestampUsec() -
166  stats_.read_blocked_duration_usecs += blocked_usecs;
167  server_->stats_.read_blocked_duration_usecs += blocked_usecs;
168  }
169 
170  tbb::mutex::scoped_lock lock(mutex_);
171  AsyncReadSome();
172 }
173 
175  if (io_strand_) {
176  boost::asio::detail::recycling_allocator<void> allocator;
178  TcpSessionPtr(this)), allocator);
179  }
180 }
181 
182 void TcpSession::SetDeferReader(bool defer_reader) {
183  if (defer_reader_ != defer_reader) {
184  defer_reader_ = defer_reader;
185  // Call AsyncReadStart if reader was previously deferred
186  if (!defer_reader_) {
187  AsyncReadStart();
188  }
189  }
190 }
191 
193  if (IsEstablishedLocked()) {
194  socket()->async_read_some(null_buffers(),
196  }
197 }
198 
199 void TcpSession::AsyncWrite(const uint8_t *data, std::size_t size) {
200  async_write(*socket(), buffer(data, size),
202  error, bytes_transferred));
203 }
204 
206  tbb::mutex::scoped_lock lock(mutex_);
207  if (!established_)
208  return Endpoint();
209 
210  error_code error;
211  Endpoint local = socket()->local_endpoint(error);
212  if (error) {
213  return Endpoint();
214  }
215  return local;
216 }
217 
219  tbb::mutex::scoped_lock lock(obs_mutex_);
221 }
222 
224  ostringstream out;
225  error_code error;
226  Endpoint local;
227 
228  local = socket()->local_endpoint(error);
229  out << local.address().to_string() << ":" << local.port() << "::";
230  out << remote_.address().to_string() << ":" << remote_.port();
231 
232  name_ = out.str();
233 
234  out.str("");
235  std::string hostname = "";
236  if (local.address().is_v4()) {
237  hostname = ResolveCanonicalName(local.address().to_string());
238  } else {
239  hostname = ResolveCanonicalNameIPv6(local.address().to_string());
240  }
241  out << hostname << ":" << remote_.address().to_string();
242  uve_key_str_ = out.str();
243 }
244 
246  Direction direction) {
247  established_ = true;
248  remote_ = remote;
249  remote_addr_str_ = remote.address().to_string();
250  direction_ = direction;
251  SetName();
252 }
253 
256  "Passive session Accept complete");
257  {
258  tbb::mutex::scoped_lock obs_lock(obs_mutex_);
259  if (observer_) {
260  observer_(this, ACCEPT);
261  }
262  }
263 
264  if (read_on_connect_) {
265  AsyncReadStart();
266  }
267 }
268 
270  assert(refcount_);
271 
272  {
273  tbb::mutex::scoped_lock lock(mutex_);
274  if (closed_) {
275  return false;
276  }
278  }
280 
282  "Active session connection complete");
283 
284  {
285  tbb::mutex::scoped_lock obs_lock(obs_mutex_);
286  if (observer_) {
288  }
289  }
290 
291  if (read_on_connect_) {
292  AsyncReadStart();
293  }
294  return true;
295 }
296 
298  tbb::mutex::scoped_lock obs_lock(obs_mutex_);
299  if (observer_) {
300  observer_(this, CONNECT_FAILED);
301  }
302 }
303 
304 // Requires: lock must not be held
305 void TcpSession::CloseInternal(const error_code &ec,
306  bool call_observer, bool notify_server) {
307  tbb::mutex::scoped_lock lock(mutex_);
308 
309  if (socket() != NULL && !closed_) {
310  error_code error;
311  socket()->shutdown(tcp::socket::shutdown_both, error);
312  if (error) {
314  "Shutdown failed due to error: " << error.message());
315  }
316  socket()->close(error);
317  }
318  closed_ = true;
319  tcp_close_in_progress_ = false;
320 
321  if (!established_) {
322  return;
323  }
324  established_ = false;
325 
326  // copy the ec to close reason
327  close_reason_ = ec;
328 
329  // Take a reference through intrusive pointer to protect session from
330  // possibly getting deleted from another thread.
331  TcpSessionPtr session = TcpSessionPtr(this);
332  lock.release();
333 
334  if (call_observer) {
335  tbb::mutex::scoped_lock obs_lock(obs_mutex_);
336  if (observer_) {
337  observer_(this, CLOSE);
338  }
339  }
340 
341  if (notify_server) {
342  server_->OnSessionClose(this);
343  }
344 }
345 
347  if (io_strand_) {
348  boost::asio::detail::recycling_allocator<void> allocator;
350  TcpSessionPtr(this)), allocator);
351  }
352 }
353 
355  tbb::mutex::scoped_lock lock(mutex_);
356 
357  // Close can be called by application during cleanup. At this time
358  // session may be already closed due to error and there may be write
359  // data in the buffer, ignore if socket is closed.
360  if (closed_) {
361  return;
362  }
363 
364  if (server_ && writer_->IsWritePending()) {
365  tcp_close_in_progress_ = true;
366  return;
367  }
368  lock.release();
369 
370  error_code ec;
371  CloseInternal(ec, false);
372 }
373 
374 // virtual method overridden in derived classes.
375 void TcpSession::WriteReady(const error_code &error) {
376 }
377 
379  const error_code &error,
380  std::size_t wrote) {
381  tbb::mutex::scoped_lock lock(session->mutex_);
382  if (session->IsSocketErrorHard(error)) {
383  lock.release();
385  "Write failed due to error: " << error.message());
386  session->CloseInternal(error, true);
387  return;
388  }
389 
390  //
391  // Ignore if connection is already closed.
392  //
393  if (session->IsClosedLocked()) return;
394 
395  // Update socket write bytes statistics.
396  session->stats_.write_bytes += wrote;
397  session->server_->stats_.write_bytes += wrote;
398 
399  bool send_ready = false;
400  bool more_write = session->writer_->UpdateBufferQueue(wrote, &send_ready);
401 
402  // Subsequent write
403  if (more_write) {
404  session->writer_->TriggerAsyncWrite();
405  } else if (session->tcp_close_in_progress_) {
406  lock.release();
407  session->CloseInternal(error, true);
408  return;
409  }
410 
411  lock.release();
412  if (send_ready)
413  session->WriteReady(error);
414  return;
415 }
416 
418 
419  tbb::mutex::scoped_lock lock(session->mutex_);
420 
421  //
422  // Ignore if connection is already closed.
423  //
424  if (session->IsClosedLocked()) return;
425  session->writer_->TriggerAsyncWrite();
426 }
427 
428 bool TcpSession::Send(const uint8_t *data, size_t size, size_t *sent) {
429  bool ret = true;
430  tbb::mutex::scoped_lock lock(mutex_);
431 
432  // Reset sent, if provided.
433  if (sent) *sent = 0;
434 
435  //
436  // If the session closed in the mean while, bail out
437  // If session close is triggered, but close in progress, bail out
438  //
439  if (!IsEstablishedLocked()) return false;
440 
441  if (socket()->non_blocking()) {
442  error_code error;
443  int len = writer_->AsyncSend(data, size, &error);
444  lock.release();
445  if (len < 0) {
447  "Write failed due to error: "
448  << error.category().name() << " "
449  << error.message());
450  CloseInternal(error, true);
451  return false;
452  }
453  if ((size_t) len != size)
454  ret = false;
455  if (sent) *sent = (len > 0) ? len : 0;
456  }
457  return ret;
458 }
459 
460 Task* TcpSession::CreateReaderTask(mutable_buffer buffer,
461  size_t bytes_transferred) {
462  Buffer rdbuf(buffer_cast<const uint8_t *>(buffer), bytes_transferred);
463  Reader *task = new Reader(TcpSessionPtr(this),
464  bind(&TcpSession::OnRead, this, _1), rdbuf);
465  return (task);
466 }
467 
468 size_t TcpSession::ReadSome(mutable_buffer buffer, error_code *error) {
469  return socket()->read_some(mutable_buffers_1(buffer), *error);
470 }
471 
472 // Tests with large data have shown large amounts of data being read in one
473 // read_some() call, if available. Hence, allocate memory for all the bytes
474 // available in the socket, but no less than kDefaultBufferSize.
476  error_code error;
477  size_t size = socket_->available(error);
478  if (size < kDefaultBufferSize)
479  size = kDefaultBufferSize;
480  return size;
481 }
482 
484  tbb::mutex::scoped_lock lock(session->mutex_);
485  if (session->closed_) {
486  return;
487  }
488 
489  mutable_buffer buffer =
490  session->AllocateBuffer(session->GetReadBufferSize());
491 
492  error_code error;
493  size_t bytes_transferred = session->ReadSome(buffer, &error);
494  if (session->IsSocketErrorHard(error)) {
495  session->ReleaseBufferLocked(buffer);
496  // eof is returned when the peer closed the socket, no need to log error
497  if (error != boost::asio::error::eof) {
498  if (strcmp(error.category().name(), "asio.ssl") == 0 &&
499  error.value() == SSL_SHORT_READ_ERROR) {
501  "Read failed due to error "
502  << error.category().name() << " "
503  << error.value()
504  << " : " << error.message());
505  } else {
507  "Read failed due to error "
508  << error.category().name() << " "
509  << error.value()
510  << " : " << error.message());
511  }
512  }
513  lock.release();
514  session->CloseInternal(error, true);
515  return;
516  }
517 
518  // Update read statistics.
519  session->stats_.read_calls++;
520  session->stats_.read_bytes += bytes_transferred;
521  session->server_->stats_.read_calls++;
522  session->server_->stats_.read_bytes += bytes_transferred;
523 
524  Task *task = session->CreateReaderTask(buffer, bytes_transferred);
525  // Starting a new task for the session
527  scheduler->Enqueue(task);
528 }
529 
531  return Task::kTaskInstanceAny;
532 }
533 
534 
535 int32_t TcpSession::local_port() const {
536  if (socket() == NULL) {
537  return -1;
538  }
539  error_code error;
540  Endpoint local = socket()->local_endpoint(error);
541  if (IsSocketErrorHard(error)) {
542  return -1;
543  }
544  return local.port();
545 }
546 
547 int32_t TcpSession::remote_port() const {
548  if (socket() == NULL) {
549  return -1;
550  }
551  error_code error;
552  Endpoint remote = socket()->remote_endpoint(error);
553  if (IsSocketErrorHard(error)) {
554  return -1;
555  }
556  return remote.port();
557 }
558 
559 int TcpSession::SetMd5SocketOption(uint32_t peer_ip,
560  const string &md5_password) {
561  return server()->SetMd5SocketOption(socket_->native_handle(), peer_ip,
562  md5_password);
563 }
564 
565 int TcpSession::ClearMd5SocketOption(uint32_t peer_ip) {
566  return server()->SetMd5SocketOption(socket_->native_handle(), peer_ip, "");
567 }
568 
570  return server()->SetDscpSocketOption(socket()->native_handle(), value);
571 }
572 
573 uint8_t TcpSession::GetDscpValue() const {
574  return server_->GetDscpValue(socket()->native_handle());
575 }
576 
578  ReceiveCallback callback)
579  : session_(session), callback_(callback), offset_(0), remain_(-1) {
580 }
581 
583 }
584 
585 // Returns a buffer allocation size that is larger than the message.
587  const int kMaxMessageSize = GetMaxMessageSize();
588  if (length == -1) {
589  return kMaxMessageSize;
590  }
591  int bufsize = 1 << 8;
592  for (; bufsize < kMaxMessageSize && bufsize < length; bufsize <<= 1) {
593  }
594  return bufsize;
595 }
596 
597 uint8_t *TcpMessageReader::BufferConcat(uint8_t *data, Buffer buffer,
598  int msglength) {
599  uint8_t *dst = data;
600 
601  while (!queue_.empty()) {
602  Buffer head = queue_.front();
603  const uint8_t *cp = TcpSession::BufferData(head) + offset_;
604  int bytes = TcpSession::BufferSize(head) - offset_;
605  assert((dst - data) + bytes < msglength);
606  memcpy(dst, cp, bytes);
607  dst += bytes;
608  queue_.pop_front();
609  session_->ReleaseBuffer(head);
610  offset_ = 0;
611  remain_ = -1;
612  }
613 
614  int count = msglength - (dst - data);
615  assert((dst - data) + count <= msglength);
616  memcpy(dst, TcpSession::BufferData(buffer), count);
617  offset_ = count;
618 
619  return data;
620 }
621 
623  int total = 0;
624  for (BufferQueue::const_iterator iter = queue_.begin();
625  iter != queue_.end(); ++iter) {
626  if (total == 0) {
627  total = TcpSession::BufferSize(*iter) - offset_;
628  } else {
629  total += TcpSession::BufferSize(*iter);
630  }
631  }
632  return total;
633 }
634 
636  uint8_t *data, Buffer buffer, size_t size) const {
637  size_t offset = 0;
638 
639  for (BufferQueue::const_iterator iter = queue_.begin();
640  iter != queue_.end(); ++iter) {
641  const uint8_t *cp;
642  int avail;
643  if (offset == 0) {
644  cp = TcpSession::BufferData(*iter) + offset_;
645  avail = TcpSession::BufferSize(*iter) - offset_;
646  } else {
647  cp = TcpSession::BufferData(*iter);
648  avail = TcpSession::BufferSize(*iter);
649  }
650  int remain = size - offset;
651  avail = min(avail, remain);
652  assert(offset + avail <= size);
653  memcpy(data + offset, cp, avail);
654  offset += avail;
655  }
656 
657  int avail = TcpSession::BufferSize(buffer);
658  int remain = size - offset;
659  avail = min(avail, remain);
660  assert(offset + avail <= size);
661  memcpy(data + offset, TcpSession::BufferData(buffer), avail);
662  offset += avail;
663 
664  if (offset < size) {
665  return Buffer();
666  }
667  return Buffer(data, size);
668 }
669 
670 // Read the socket stream and send messages to the peer object.
672  const int kHeaderLenSize = GetHeaderLenSize();
673  size_t size = TcpSession::BufferSize(buffer);
674  TCP_SESSION_LOG_UT_DEBUG(session_, TCP_DIR_IN, "Read " << size << " bytes");
675 
676  if (!queue_.empty()) {
677  int msglength = MsgLength(queue_.front(), offset_);
678  if (msglength < 0) {
679  int queuelen = QueueByteLength();
680  if (queuelen + static_cast<int>(size) < kHeaderLenSize) {
681  queue_.push_back(buffer);
682  return;
683  }
684  scoped_array<uint8_t> data(new uint8_t[kHeaderLenSize]);
685  Buffer header = PullUp(data.get(), buffer, kHeaderLenSize);
686  assert(TcpSession::BufferSize(header) == (size_t) kHeaderLenSize);
687 
688  msglength = MsgLength(header, 0);
689  remain_ = msglength - queuelen;
690  }
691 
692  assert(remain_ > 0);
693  if (size < (size_t) remain_) {
694  queue_.push_back(buffer);
695  remain_ -= size;
696  return;
697  }
698 
699  // concat the buffers into a contiguous message.
700  scoped_array<uint8_t> data(new uint8_t[AllocBufferSize(msglength)]);
701  BufferConcat(data.get(), buffer, msglength);
702  assert(remain_ == -1);
703  // Receive the message
704  bool success = callback_(data.get(), msglength);
705  if (!success)
706  return;
707  }
708 
709  int avail = size - offset_;
710  while (avail > 0) {
711  int msglength = MsgLength(buffer, offset_);
712  if (msglength < 0) {
713  break;
714  }
715  if (msglength > avail) {
716  remain_ = msglength - avail;
717  break;
718  }
719  // Receive the message
720  bool success =
721  callback_(TcpSession::BufferData(buffer) + offset_, msglength);
722  offset_ += msglength;
723  avail -= msglength;
724  if (!success)
725  return;
726  }
727 
728  if (avail > 0) {
729  queue_.push_back(buffer);
730  } else {
731  session_->ReleaseBuffer(buffer);
732  offset_ = 0;
733  assert(remain_ == -1);
734  }
735 }
736 
737 //
738 // Check if a socket error is hard and fatal. Only then should we close the
739 // socket. Soft errors like EINTR and EAGAIN should be ignored or properly
740 // handled with retries
741 //
742 bool TcpSession::IsSocketErrorHard(const error_code &ec) {
743  if (!ec)
744  return false;
745  if (ec == try_again)
746  return false;
747  if (ec == would_block)
748  return false;
749  if (ec == in_progress)
750  return false;
751  if (ec == interrupted)
752  return false;
753  if (ec == network_down)
754  return false;
755  if (ec == network_reset)
756  return false;
757  if (ec == network_unreachable)
758  return false;
759  if (ec == no_buffer_space)
760  return false;
761 
762  return true;
763 }
764 
766  error_code ec;
767  boost::asio::ip::tcp::no_delay no_delay_option(true);
768  socket()->set_option(no_delay_option, ec);
769  if (ec) {
771  "tcp_no_delay set error: " << ec);
772  }
773  return ec;
774 }
775 
776 error_code TcpSession::SetTcpSendBufSize(uint32_t size) {
777  error_code ec;
778  socket_base::send_buffer_size send_buffer_size_option(size);
779  socket()->set_option(send_buffer_size_option, ec);
780  if (ec) {
782  "send_buffer_size set error: " << ec);
783  return ec;
784  }
785 
786  return ec;
787 }
788 
789 error_code TcpSession::SetTcpRecvBufSize(uint32_t size) {
790  error_code ec;
791  socket_base::receive_buffer_size receive_buffer_size_option(size);
792  socket()->set_option(receive_buffer_size_option, ec);
793  if (ec) {
795  "receive_buffer_size set error: " << ec);
796  return ec;
797  }
798 
799  return ec;
800 }
801 
802 error_code TcpSession::SetSocketKeepaliveOptions(int keepalive_time,
803  int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val) {
804  error_code ec;
805  socket_base::keep_alive keep_alive_option(true);
806  socket()->set_option(keep_alive_option, ec);
807  if (ec) {
809  "keep_alive set error: " << ec);
810  return ec;
811  }
812 #ifdef TCP_KEEPIDLE
813  typedef integer< IPPROTO_TCP, TCP_KEEPIDLE > keepalive_idle_time;
814  keepalive_idle_time keepalive_idle_time_option(keepalive_time);
815  socket()->set_option(keepalive_idle_time_option, ec);
816  if (ec) {
818  "keepalive_idle_time: " << keepalive_time << " set error: " << ec);
819  return ec;
820  }
821 #elif TCP_KEEPALIVE
822  typedef integer< IPPROTO_TCP, TCP_KEEPALIVE > keepalive_idle_time;
823  keepalive_idle_time keepalive_idle_time_option(keepalive_time);
824  socket()->set_option(keepalive_idle_time_option, ec);
825  if (ec) {
827  "keepalive_idle_time: " << keepalive_time << " set error: " << ec);
828  return ec;
829  }
830 #else
831 #error No TCP keepalive option defined.
832 #endif
833 #ifdef TCP_KEEPINTVL
834  typedef integer< IPPROTO_TCP, TCP_KEEPINTVL > keepalive_interval;
835  keepalive_interval keepalive_interval_option(keepalive_intvl);
836  socket()->set_option(keepalive_interval_option, ec);
837  if (ec) {
839  "keepalive_interval: " << keepalive_intvl << " set error: " << ec);
840  return ec;
841  }
842 #endif
843 #ifdef TCP_KEEPCNT
844  typedef integer< IPPROTO_TCP, TCP_KEEPCNT > keepalive_count;
845  keepalive_count keepalive_count_option(keepalive_probes);
846  socket()->set_option(keepalive_count_option, ec);
847  if (ec) {
849  "keepalive_probes: " << keepalive_probes << " set error: " << ec);
850  return ec;
851  }
852 #endif
853 #ifdef TCP_USER_TIMEOUT
854  typedef integer< IPPROTO_TCP, TCP_USER_TIMEOUT > tcp_user_timeout;
855  tcp_user_timeout tcp_user_timeout_option(tcp_user_timeout_val);
856  socket()->set_option(tcp_user_timeout_option, ec);
857  if (ec) {
859  "tcp_user_timeout: " << tcp_user_timeout_val << " set error: "
860  << ec);
861  return ec;
862  }
863 #endif
864 
865  return ec;
866 }
867 
869  error_code ec;
870 
871  //
872  // Make socket write non-blocking
873  //
874  socket()->non_blocking(true, ec);
875  if (ec) {
877  "Cannot set socket non blocking: " << ec);
878  return ec;
879  }
880 
881  char *buffer_size_str = getenv("TCP_SESSION_SOCKET_BUFFER_SIZE");
882  if (!buffer_size_str) return ec;
883 
884  uint32_t sz = static_cast<uint32_t>(strtoul(buffer_size_str, NULL, 0));
885  if (sz) {
886  //
887  // Set socket send and receive buffer size
888  //
889  // Currently used only under test environments to trigger partial
890  // sends more deterministically
891  //
892  socket_base::send_buffer_size send_buffer_size_option(sz);
893  socket()->set_option(send_buffer_size_option, ec);
894  if (ec) {
896  "send_buffer_size set error: " << ec);
897  return ec;
898  }
899 
900  socket_base::receive_buffer_size receive_buffer_size_option(sz);
901  socket()->set_option(receive_buffer_size_option, ec);
902  if (ec) {
904  "receive_buffer_size set error: " << ec);
905  return ec;
906  }
907  }
908 
909  return ec;
910 }
911 
912 void TcpSession::GetRxSocketStats(SocketIOStats *socket_stats) const {
913  stats_.GetRxStats(socket_stats);
914 }
915 
916 void TcpSession::GetTxSocketStats(SocketIOStats *socket_stats) const {
917  stats_.GetTxStats(socket_stats);
918 }
virtual const int GetHeaderLenSize()=0
void Close()
Definition: tcp_session.cc:354
virtual int MsgLength(Buffer buffer, int offset)=0
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_session.h:62
EventObserver observer_
Definition: tcp_session.h:278
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
boost::intrusive_ptr< TcpSession > TcpSessionPtr
Definition: tcp_session.h:184
Endpoint local_endpoint() const
Definition: tcp_session.cc:205
void DeleteBuffer(boost::asio::mutable_buffer buffer)
Definition: tcp_session.cc:126
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
Definition: tcp_session.cc:305
void AsyncReadStartInternal(TcpSessionPtr session)
Definition: tcp_session.cc:160
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
TcpSession * session_
Definition: tcp_session.h:334
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:311
static const int kDefaultBufferSize
Definition: tcp_session.h:44
static const int kTaskInstanceAny
Definition: task.h:102
BufferQueue queue_
Definition: tcp_session.h:336
function< void(Buffer)> ReadHandler
Definition: tcp_session.cc:58
void AsyncWriteInternal(TcpSessionPtr session)
Definition: tcp_session.cc:417
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
Definition: tcp_session.cc:378
bool read_on_connect_
Definition: tcp_session.h:263
static int reader_task_id_
Definition: tcp_session.h:259
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
Definition: tcp_session.cc:468
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:116
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
Definition: tcp_session.cc:460
int32_t remote_port() const
Definition: tcp_session.cc:547
virtual void WriteReady(const boost::system::error_code &error)
Definition: tcp_session.cc:375
TcpServerPtr server_
Definition: tcp_session.h:261
void GetTxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:47
boost::asio::io_context * io_service()
Definition: event_manager.h:42
string Description() const
Definition: tcp_session.cc:78
bool closed_
Definition: tcp_session.h:267
virtual void Accepted()
Definition: tcp_session.cc:254
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:802
std::string name_
Definition: tcp_session.h:283
Definition: task_int.h:10
virtual void SetDeferReader(bool defer_reader)
Definition: tcp_session.cc:182
boost::scoped_ptr< Strand > io_strand_
Definition: tcp_session.h:230
EventManager * event_manager()
Definition: tcp_server.h:76
static void AsyncReadHandler(TcpSessionPtr session)
Definition: tcp_session.cc:483
void TriggerAsyncReadHandler()
Definition: tcp_session.cc:346
int ClearMd5SocketOption(uint32_t peer_ip)
Definition: tcp_session.cc:565
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:428
#define SSL_SHORT_READ_ERROR
Definition: tcp_session.h:30
BufferQueue buffer_queue_
Definition: tcp_session.h:271
#define TCP_DIR_NA
Definition: io_log.h:48
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
Definition: tcp_session.cc:86
tbb::atomic< int > refcount_
Definition: tcp_session.h:282
void SetName()
Definition: tcp_session.cc:223
int32_t local_port() const
Definition: tcp_session.cc:535
int GetTaskId(const std::string &name)
Definition: task.cc:856
std::string ResolveCanonicalNameIPv6(const std::string &ipv6)
Definition: address_util.cc:89
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:60
io::SocketStats stats_
Definition: tcp_session.h:226
void set_observer(EventObserver observer)
Definition: tcp_session.cc:218
int SetDscpSocketOption(NativeSocketType fd, uint8_t value)
Definition: tcp_server.cc:535
virtual bool Connected(Endpoint remote)
Definition: tcp_session.cc:269
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:912
boost::asio::strand< boost::asio::io_context::executor_type > Strand
Definition: tcp_session.h:229
#define TCP_SESSION_LOG_DEBUG(session, dir, arg)
Definition: io_log.h:123
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:671
boost::system::error_code close_reason_
Definition: tcp_session.h:272
tbb::atomic< uint64_t > read_blocked_duration_usecs
Definition: io_utils.h:31
virtual void OnRead(Buffer buffer)=0
tbb::mutex mutex_
Definition: tcp_session.h:225
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
boost::asio::const_buffer Buffer
Definition: tcp_session.h:310
void GetRxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:28
ReceiveCallback callback_
Definition: tcp_session.h:335
static bool IsSocketErrorHard(const boost::system::error_code &ec)
Definition: tcp_session.cc:742
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
Definition: tcp_session.cc:597
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
Definition: tcp_session.cc:199
virtual int reader_task_id() const
Definition: tcp_session.h:208
void SessionEstablished(Endpoint remote, Direction direction)
Definition: tcp_session.cc:245
virtual void AsyncReadSome()
Definition: tcp_session.cc:192
void ConnectFailed()
Definition: tcp_session.cc:297
#define TCP_SESSION_LOG_ERROR(session, dir, arg)
Definition: io_log.h:117
virtual const int GetMaxMessageSize()=0
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:916
boost::scoped_ptr< TcpMessageWriter > writer_
Definition: tcp_session.h:280
Reader(TcpSessionPtr session, ReadHandler read_fn, Buffer buffer)
Definition: tcp_session.cc:60
Direction direction_
Definition: tcp_session.h:270
#define TCP_DIR_OUT
Definition: io_log.h:46
tbb::atomic< bool > defer_reader_
Definition: tcp_session.h:284
int AllocBufferSize(int length)
Definition: tcp_session.cc:586
TcpSessionPtr session_
Definition: tcp_session.cc:81
tbb::mutex obs_mutex_
Definition: tcp_session.h:277
std::string ResolveCanonicalName()
Definition: address_util.cc:40
int QueueByteLength() const
Definition: tcp_session.cc:622
bool established_
Definition: tcp_session.h:266
tbb::atomic< uint64_t > read_block_start_time
Definition: io_utils.h:29
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
Definition: tcp_session.cc:577
std::string uve_key_str_
Definition: tcp_session.h:285
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: tcp_session.cc:64
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
Definition: tcp_session.cc:789
int SetDscpSocketOption(uint8_t value)
Definition: tcp_session.cc:569
bool IsEstablishedLocked() const
Definition: tcp_session.h:250
TcpServer * server()
Definition: tcp_session.h:88
static int BufferCmp(const mutable_buffer &lhs, const const_buffer &rhs)
Definition: tcp_session.cc:131
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:143
virtual int GetSessionInstance() const
Definition: tcp_session.cc:530
virtual size_t GetReadBufferSize() const
Definition: tcp_session.cc:475
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
Definition: tcp_session.cc:635
virtual ~TcpMessageReader()
Definition: tcp_session.cc:582
#define TCP_DIR_IN
Definition: io_log.h:47
ReadHandler read_fn_
Definition: tcp_session.cc:82
#define TCP_SESSION_LOG_UT_DEBUG(session, dir, arg)
Definition: io_log.h:126
boost::system::error_code SetTcpSendBufSize(uint32_t size)
Definition: tcp_session.cc:776
Endpoint remote_
Definition: tcp_session.h:268
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
Definition: tcp_session.cc:119
virtual void AsyncReadStart()
Definition: tcp_session.cc:174
virtual Socket * socket() const
Definition: tcp_session.h:86
tbb::atomic< bool > write_blocked_
Definition: tcp_session.h:286
virtual ~TcpSession()
Definition: tcp_session.cc:110
void ReleaseBufferLocked(Buffer buffer)
Definition: tcp_session.cc:148
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:113
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
Definition: tcp_session.cc:559
boost::function< void(TcpSession *, Event)> EventObserver
Definition: tcp_session.h:63
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
int SetMd5SocketOption(NativeSocketType fd, uint32_t peer_ip, const std::string &md5_password)
Definition: tcp_server.cc:482
tbb::atomic< bool > tcp_close_in_progress_
Definition: tcp_session.h:287
EventObserver observer()
Definition: tcp_session.h:214
uint8_t GetDscpValue() const
Definition: tcp_session.cc:573
boost::scoped_ptr< Socket > socket_
Definition: tcp_session.h:262
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:868
boost::system::error_code SetTcpNoDelay()
Definition: tcp_session.cc:765
std::string remote_addr_str_
Definition: tcp_session.h:269