OpenSDN source code
tcp_session.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "io/tcp_session.h"
6 
7 #include <algorithm>
8 #include <string>
9 
10 #include <boost/asio.hpp>
11 #include <boost/asio/detail/socket_option.hpp>
12 #include <boost/bind/bind.hpp>
13 #include <boost/scoped_array.hpp>
14 #include <boost/asio/detail/recycling_allocator.hpp>
15 
16 #include "base/logging.h"
17 #include "base/address_util.h"
18 #include "io/event_manager.h"
19 #include "io/io_log.h"
20 #include "io/io_utils.h"
21 #include "io/tcp_message_write.h"
22 #include "io/tcp_server.h"
23 #include "base/address_util.h"
24 
25 using boost::asio::async_write;
26 using boost::asio::buffer;
27 using boost::asio::buffer_cast;
28 using boost::asio::detail::socket_option::integer;
29 using boost::asio::const_buffer;
30 using boost::asio::mutable_buffer;
31 using boost::asio::mutable_buffers_1;
32 using boost::asio::null_buffers;
33 using boost::asio::socket_base;
34 using boost::bind;
35 using boost::function;
36 using boost::scoped_array;
37 using boost::system::error_code;
38 using std::min;
39 using std::ostringstream;
40 using std::string;
41 using namespace boost::placeholders;
42 
43 using boost::asio::error::try_again;
44 using boost::asio::error::would_block;
45 using boost::asio::error::in_progress;
46 using boost::asio::error::interrupted;
47 using boost::asio::error::network_down;
48 using boost::asio::error::network_reset;
49 using boost::asio::error::network_unreachable;
50 using boost::asio::error::no_buffer_space;
51 using boost::asio::placeholders::error;
52 using boost::asio::placeholders::bytes_transferred;
53 using boost::asio::ip::tcp;
54 
56 
57 class TcpSession::Reader : public Task {
58 public:
59  typedef function<void(Buffer)> ReadHandler;
60 
61  Reader(TcpSessionPtr session, ReadHandler read_fn, Buffer buffer)
62  : Task(session->reader_task_id(), session->GetSessionInstance()),
63  session_(session), read_fn_(read_fn), buffer_(buffer) {
64  }
65  virtual bool Run() {
66  if (session_->IsEstablished()) {
67  read_fn_(buffer_);
68  if (session_->IsReaderDeferred()) {
69  // Update socket read block count.
70  session_->stats_.read_block_start_time = UTCTimestampUsec();
71  session_->stats_.read_blocked++;
72  session_->server_->stats_.read_blocked++;
73  } else {
74  session_->AsyncReadStart();
75  }
76  }
77  return true;
78  }
79  string Description() const { return "TcpSession::Reader"; }
80 
81 private:
85 };
86 
88  TcpServer *server, Socket *socket, bool async_read_ready,
89  size_t buffer_send_size)
90  : server_(server),
91  socket_(socket),
92  read_on_connect_(async_read_ready),
93  established_(false),
94  closed_(false),
95  direction_(ACTIVE),
96  writer_(new TcpMessageWriter(this, buffer_send_size)),
97  name_("-") {
98  refcount_ = 0;
99  if (reader_task_id_ == -1) {
101  reader_task_id_ = scheduler->GetTaskId("io::ReaderTask");
102  }
103  if (server_) {
104  io_strand_.reset(new Strand(server->event_manager()->io_service()->get_executor()));
105  }
106  defer_reader_ = false;
107  write_blocked_ = false;
108  tcp_close_in_progress_ = false;
109 }
110 
112  assert(!established_);
113  for (BufferQueue::iterator iter = buffer_queue_.begin();
114  iter != buffer_queue_.end(); ++iter) {
115  DeleteBuffer(*iter);
116  }
117  buffer_queue_.clear();
118 }
119 
120 mutable_buffer TcpSession::AllocateBuffer(size_t buffer_size) {
121  uint8_t *data = new uint8_t[buffer_size];
122  mutable_buffer buffer = mutable_buffer(data, buffer_size);
123  buffer_queue_.push_back(buffer);
124  return buffer;
125 }
126 
127 void TcpSession::DeleteBuffer(mutable_buffer buffer) {
128  uint8_t *data = buffer_cast<uint8_t *>(buffer);
129  delete[] data;
130 }
131 
132 static int BufferCmp(const mutable_buffer &lhs, const const_buffer &rhs) {
133  const uint8_t *lp = buffer_cast<uint8_t *>(lhs);
134  const uint8_t *rp = buffer_cast<const uint8_t *>(rhs);
135  if (lp < rp) {
136  return -1;
137  }
138  if (lp > rp) {
139  return 1;
140  }
141  return 0;
142 }
143 
145  std::scoped_lock lock(mutex_);
146  ReleaseBufferLocked(buffer);
147 }
148 
150  for (BufferQueue::iterator iter = buffer_queue_.begin();
151  iter != buffer_queue_.end(); ++iter) {
152  if (BufferCmp(*iter, buffer) == 0) {
153  DeleteBuffer(*iter);
154  buffer_queue_.erase(iter);
155  return;
156  }
157  }
158  assert(false);
159 }
160 
162  // Update socket read block time.
164  uint64_t blocked_usecs = UTCTimestampUsec() -
167  stats_.read_blocked_duration_usecs += blocked_usecs;
168  server_->stats_.read_blocked_duration_usecs += blocked_usecs;
169  }
170 
171  std::scoped_lock lock(mutex_);
172  AsyncReadSome();
173 }
174 
176  if (io_strand_) {
177  boost::asio::detail::recycling_allocator<void> allocator;
179  TcpSessionPtr(this)), allocator);
180  }
181 }
182 
183 void TcpSession::SetDeferReader(bool defer_reader) {
184  if (defer_reader_ != defer_reader) {
185  defer_reader_ = defer_reader;
186  // Call AsyncReadStart if reader was previously deferred
187  if (!defer_reader_) {
188  AsyncReadStart();
189  }
190  }
191 }
192 
194  if (IsEstablishedLocked()) {
195  socket()->async_read_some(null_buffers(),
197  }
198 }
199 
200 void TcpSession::AsyncWrite(const uint8_t *data, std::size_t size) {
201  async_write(*socket(), buffer(data, size),
203  error, bytes_transferred));
204 }
205 
207  std::scoped_lock lock(mutex_);
208  if (!established_)
209  return Endpoint();
210 
211  error_code error;
212  Endpoint local = socket()->local_endpoint(error);
213  if (error) {
214  return Endpoint();
215  }
216  return local;
217 }
218 
220  std::scoped_lock lock(obs_mutex_);
222 }
223 
225  ostringstream out;
226  error_code error;
227  Endpoint local;
228 
229  local = socket()->local_endpoint(error);
230  out << local.address().to_string() << ":" << local.port() << "::";
231  out << remote_.address().to_string() << ":" << remote_.port();
232 
233  name_ = out.str();
234 
235  out.str("");
236  std::string hostname = "";
237  if (local.address().is_v4()) {
238  hostname = ResolveCanonicalName(local.address().to_string());
239  } else {
240  hostname = ResolveCanonicalNameIPv6(local.address().to_string());
241  }
242  out << hostname << ":" << remote_.address().to_string();
243  uve_key_str_ = out.str();
244 }
245 
247  Direction direction) {
248  established_ = true;
249  remote_ = remote;
250  remote_addr_str_ = remote.address().to_string();
251  direction_ = direction;
252  SetName();
253 }
254 
257  "Passive session Accept complete");
258  {
259  std::scoped_lock obs_lock(obs_mutex_);
260  if (observer_) {
261  observer_(this, ACCEPT);
262  }
263  }
264 
265  if (read_on_connect_) {
266  AsyncReadStart();
267  }
268 }
269 
271  assert(refcount_);
272 
273  {
274  std::scoped_lock lock(mutex_);
275  if (closed_) {
276  return false;
277  }
279  }
281 
283  "Active session connection complete");
284 
285  {
286  std::scoped_lock obs_lock(obs_mutex_);
287  if (observer_) {
289  }
290  }
291 
292  if (read_on_connect_) {
293  AsyncReadStart();
294  }
295  return true;
296 }
297 
299  std::scoped_lock obs_lock(obs_mutex_);
300  if (observer_) {
301  observer_(this, CONNECT_FAILED);
302  }
303 }
304 
305 // Requires: lock must not be held
306 void TcpSession::CloseInternal(const error_code &ec,
307  bool call_observer, bool notify_server) {
308  std::unique_lock<std::mutex> lock(mutex_);
309 
310  if (socket() != NULL && !closed_) {
311  error_code error;
312  socket()->shutdown(tcp::socket::shutdown_both, error);
313  if (error) {
315  "Shutdown failed due to error: " << error.message());
316  }
317  socket()->close(error);
318  }
319  closed_ = true;
320  tcp_close_in_progress_ = false;
321 
322  if (!established_) {
323  return;
324  }
325  established_ = false;
326 
327  // copy the ec to close reason
328  close_reason_ = ec;
329 
330  // Take a reference through intrusive pointer to protect session from
331  // possibly getting deleted from another thread.
332  TcpSessionPtr session = TcpSessionPtr(this);
333  lock.unlock();
334 
335  if (call_observer) {
336  std::scoped_lock obs_lock(obs_mutex_);
337  if (observer_) {
338  observer_(this, CLOSE);
339  }
340  }
341 
342  if (notify_server) {
343  server_->OnSessionClose(this);
344  }
345 }
346 
348  if (io_strand_) {
349  boost::asio::detail::recycling_allocator<void> allocator;
351  TcpSessionPtr(this)), allocator);
352  }
353 }
354 
356  std::unique_lock<std::mutex> lock(mutex_);
357 
358  // Close can be called by application during cleanup. At this time
359  // session may be already closed due to error and there may be write
360  // data in the buffer, ignore if socket is closed.
361  if (closed_) {
362  return;
363  }
364 
365  if (server_ && writer_->IsWritePending()) {
366  tcp_close_in_progress_ = true;
367  return;
368  }
369  lock.unlock();
370 
371  error_code ec;
372  CloseInternal(ec, false);
373 }
374 
375 // virtual method overriden in derrived classes.
376 void TcpSession::WriteReady(const error_code &error) {
377 }
378 
380  const error_code &error,
381  std::size_t wrote) {
382  std::unique_lock<std::mutex> lock(session->mutex_);
383  if (session->IsSocketErrorHard(error)) {
384  lock.unlock();
386  "Write failed due to error: " << error.message());
387  session->CloseInternal(error, true);
388  return;
389  }
390 
391  //
392  // Ignore if connection is already closed.
393  //
394  if (session->IsClosedLocked()) return;
395 
396  // Update socket write bytes statistics.
397  session->stats_.write_bytes += wrote;
398  session->server_->stats_.write_bytes += wrote;
399 
400  bool send_ready = false;
401  bool more_write = session->writer_->UpdateBufferQueue(wrote, &send_ready);
402 
403  // Subsequent write
404  if (more_write) {
405  session->writer_->TriggerAsyncWrite();
406  } else if (session->tcp_close_in_progress_) {
407  lock.unlock();
408  session->CloseInternal(error, true);
409  return;
410  }
411 
412  lock.unlock();
413  if (send_ready)
414  session->WriteReady(error);
415  return;
416 }
417 
419 
420  std::scoped_lock lock(session->mutex_);
421 
422  //
423  // Ignore if connection is already closed.
424  //
425  if (session->IsClosedLocked()) return;
426  session->writer_->TriggerAsyncWrite();
427 }
428 
429 bool TcpSession::Send(const uint8_t *data, size_t size, size_t *sent) {
430  bool ret = true;
431  std::unique_lock<std::mutex> lock(mutex_);
432 
433  // Reset sent, if provided.
434  if (sent) *sent = 0;
435 
436  //
437  // If the session closed in the mean while, bail out
438  // If session close is triggered, but close in progress, bail out
439  //
440  if (!IsEstablishedLocked()) return false;
441 
442  if (socket()->non_blocking()) {
443  error_code error;
444  int len = writer_->AsyncSend(data, size, &error);
445  lock.unlock();
446  if (len < 0) {
448  "Write failed due to error: "
449  << error.category().name() << " "
450  << error.message());
451  CloseInternal(error, true);
452  return false;
453  }
454  if ((size_t) len != size)
455  ret = false;
456  if (sent) *sent = (len > 0) ? len : 0;
457  }
458  return ret;
459 }
460 
461 Task* TcpSession::CreateReaderTask(mutable_buffer buffer,
462  size_t bytes_transferred) {
463  Buffer rdbuf(buffer_cast<const uint8_t *>(buffer), bytes_transferred);
464  Reader *task = new Reader(TcpSessionPtr(this),
465  bind(&TcpSession::OnRead, this, _1), rdbuf);
466  return (task);
467 }
468 
469 size_t TcpSession::ReadSome(mutable_buffer buffer, error_code *error) {
470  return socket()->read_some(mutable_buffers_1(buffer), *error);
471 }
472 
473 // Tests with large data have shown large amounts of data being read in one
474 // read_some() call, if available. Hence, allocate memory for all the bytes
475 // available in the socket, but no less t han kDefaultBufferSize.
477  error_code error;
478  size_t size = socket_->available(error);
479  if (size < kDefaultBufferSize)
480  size = kDefaultBufferSize;
481  return size;
482 }
483 
485  std::unique_lock<std::mutex> lock(session->mutex_);
486  if (session->closed_) {
487  return;
488  }
489 
490  mutable_buffer buffer =
491  session->AllocateBuffer(session->GetReadBufferSize());
492 
493  error_code error;
494  size_t bytes_transferred = session->ReadSome(buffer, &error);
495  if (session->IsSocketErrorHard(error)) {
496  session->ReleaseBufferLocked(buffer);
497  // eof is returned when the peer closed the socket, no need to log error
498  if (error != boost::asio::error::eof) {
499  if (strcmp(error.category().name(), "asio.ssl") == 0 &&
500  error.value() == SSL_SHORT_READ_ERROR) {
502  "Read failed due to error "
503  << error.category().name() << " "
504  << error.value()
505  << " : " << error.message());
506  } else {
508  "Read failed due to error "
509  << error.category().name() << " "
510  << error.value()
511  << " : " << error.message());
512  }
513  }
514  lock.unlock();
515  session->CloseInternal(error, true);
516  return;
517  }
518 
519  // Update read statistics.
520  session->stats_.read_calls++;
521  session->stats_.read_bytes += bytes_transferred;
522  session->server_->stats_.read_calls++;
523  session->server_->stats_.read_bytes += bytes_transferred;
524 
525  Task *task = session->CreateReaderTask(buffer, bytes_transferred);
526  // Starting a new task for the session
528  scheduler->Enqueue(task);
529 }
530 
532  return Task::kTaskInstanceAny;
533 }
534 
535 
536 int32_t TcpSession::local_port() const {
537  if (socket() == NULL) {
538  return -1;
539  }
540  error_code error;
541  Endpoint local = socket()->local_endpoint(error);
542  if (IsSocketErrorHard(error)) {
543  return -1;
544  }
545  return local.port();
546 }
547 
548 int32_t TcpSession::remote_port() const {
549  if (socket() == NULL) {
550  return -1;
551  }
552  error_code error;
553  Endpoint remote = socket()->remote_endpoint(error);
554  if (IsSocketErrorHard(error)) {
555  return -1;
556  }
557  return remote.port();
558 }
559 
560 int TcpSession::SetMd5SocketOption(uint32_t peer_ip,
561  const string &md5_password) {
562  return server()->SetMd5SocketOption(socket_->native_handle(), peer_ip,
563  md5_password);
564 }
565 
566 int TcpSession::ClearMd5SocketOption(uint32_t peer_ip) {
567  return server()->SetMd5SocketOption(socket_->native_handle(), peer_ip, "");
568 }
569 
571  return server()->SetDscpSocketOption(socket()->native_handle(), value);
572 }
573 
574 uint8_t TcpSession::GetDscpValue() const {
575  return server_->GetDscpValue(socket()->native_handle());
576 }
577 
579  ReceiveCallback callback)
580  : session_(session), callback_(callback), offset_(0), remain_(-1) {
581 }
582 
584 }
585 
586 // Returns a buffer allocation size that is larger than the message.
588  const int kMaxMessageSize = GetMaxMessageSize();
589  if (length == -1) {
590  return kMaxMessageSize;
591  }
592  int bufsize = 1 << 8;
593  for (; bufsize < kMaxMessageSize && bufsize < length; bufsize <<= 1) {
594  }
595  return bufsize;
596 }
597 
598 uint8_t *TcpMessageReader::BufferConcat(uint8_t *data, Buffer buffer,
599  int msglength) {
600  uint8_t *dst = data;
601 
602  while (!queue_.empty()) {
603  Buffer head = queue_.front();
604  const uint8_t *cp = TcpSession::BufferData(head) + offset_;
605  int bytes = TcpSession::BufferSize(head) - offset_;
606  assert((dst - data) + bytes < msglength);
607  memcpy(dst, cp, bytes);
608  dst += bytes;
609  queue_.pop_front();
610  session_->ReleaseBuffer(head);
611  offset_ = 0;
612  remain_ = -1;
613  }
614 
615  int count = msglength - (dst - data);
616  assert((dst - data) + count <= msglength);
617  memcpy(dst, TcpSession::BufferData(buffer), count);
618  offset_ = count;
619 
620  return data;
621 }
622 
624  int total = 0;
625  for (BufferQueue::const_iterator iter = queue_.begin();
626  iter != queue_.end(); ++iter) {
627  if (total == 0) {
628  total = TcpSession::BufferSize(*iter) - offset_;
629  } else {
630  total += TcpSession::BufferSize(*iter);
631  }
632  }
633  return total;
634 }
635 
637  uint8_t *data, Buffer buffer, size_t size) const {
638  size_t offset = 0;
639 
640  for (BufferQueue::const_iterator iter = queue_.begin();
641  iter != queue_.end(); ++iter) {
642  const uint8_t *cp;
643  int avail;
644  if (offset == 0) {
645  cp = TcpSession::BufferData(*iter) + offset_;
646  avail = TcpSession::BufferSize(*iter) - offset_;
647  } else {
648  cp = TcpSession::BufferData(*iter);
649  avail = TcpSession::BufferSize(*iter);
650  }
651  int remain = size - offset;
652  avail = min(avail, remain);
653  assert(offset + avail <= size);
654  memcpy(data + offset, cp, avail);
655  offset += avail;
656  }
657 
658  int avail = TcpSession::BufferSize(buffer);
659  int remain = size - offset;
660  avail = min(avail, remain);
661  assert(offset + avail <= size);
662  memcpy(data + offset, TcpSession::BufferData(buffer), avail);
663  offset += avail;
664 
665  if (offset < size) {
666  return Buffer();
667  }
668  return Buffer(data, size);
669 }
670 
671 // Read the socket stream and send messages to the peer object.
673  const int kHeaderLenSize = GetHeaderLenSize();
674  size_t size = TcpSession::BufferSize(buffer);
675  TCP_SESSION_LOG_UT_DEBUG(session_, TCP_DIR_IN, "Read " << size << " bytes");
676 
677  if (!queue_.empty()) {
678  int msglength = MsgLength(queue_.front(), offset_);
679  if (msglength < 0) {
680  int queuelen = QueueByteLength();
681  if (queuelen + static_cast<int>(size) < kHeaderLenSize) {
682  queue_.push_back(buffer);
683  return;
684  }
685  scoped_array<uint8_t> data(new uint8_t[kHeaderLenSize]);
686  Buffer header = PullUp(data.get(), buffer, kHeaderLenSize);
687  assert(TcpSession::BufferSize(header) == (size_t) kHeaderLenSize);
688 
689  msglength = MsgLength(header, 0);
690  remain_ = msglength - queuelen;
691  }
692 
693  assert(remain_ > 0);
694  if (size < (size_t) remain_) {
695  queue_.push_back(buffer);
696  remain_ -= size;
697  return;
698  }
699 
700  // concat the buffers into a contiguous message.
701  scoped_array<uint8_t> data(new uint8_t[AllocBufferSize(msglength)]);
702  BufferConcat(data.get(), buffer, msglength);
703  assert(remain_ == -1);
704  // Receive the message
705  bool success = callback_(data.get(), msglength);
706  if (!success)
707  return;
708  }
709 
710  int avail = size - offset_;
711  while (avail > 0) {
712  int msglength = MsgLength(buffer, offset_);
713  if (msglength < 0) {
714  break;
715  }
716  if (msglength > avail) {
717  remain_ = msglength - avail;
718  break;
719  }
720  // Receive the message
721  bool success =
722  callback_(TcpSession::BufferData(buffer) + offset_, msglength);
723  offset_ += msglength;
724  avail -= msglength;
725  if (!success)
726  return;
727  }
728 
729  if (avail > 0) {
730  queue_.push_back(buffer);
731  } else {
732  session_->ReleaseBuffer(buffer);
733  offset_ = 0;
734  assert(remain_ == -1);
735  }
736 }
737 
738 //
739 // Check if a socker error is hard and fatal. Only then should we close the
740 // socket. Soft errors like EINTR and EAGAIN should be ignored or properly
741 // handled with retries
742 //
743 bool TcpSession::IsSocketErrorHard(const error_code &ec) {
744  if (!ec)
745  return false;
746  if (ec == try_again)
747  return false;
748  if (ec == would_block)
749  return false;
750  if (ec == in_progress)
751  return false;
752  if (ec == interrupted)
753  return false;
754  if (ec == network_down)
755  return false;
756  if (ec == network_reset)
757  return false;
758  if (ec == network_unreachable)
759  return false;
760  if (ec == no_buffer_space)
761  return false;
762 
763  return true;
764 }
765 
767  error_code ec;
768  boost::asio::ip::tcp::no_delay no_delay_option(true);
769  socket()->set_option(no_delay_option, ec);
770  if (ec) {
772  "tcp_no_delay set error: " << ec);
773  }
774  return ec;
775 }
776 
777 error_code TcpSession::SetTcpSendBufSize(uint32_t size) {
778  error_code ec;
779  socket_base::send_buffer_size send_buffer_size_option(size);
780  socket()->set_option(send_buffer_size_option, ec);
781  if (ec) {
783  "send_buffer_size set error: " << ec);
784  return ec;
785  }
786 
787  return ec;
788 }
789 
790 error_code TcpSession::SetTcpRecvBufSize(uint32_t size) {
791  error_code ec;
792  socket_base::receive_buffer_size receive_buffer_size_option(size);
793  socket()->set_option(receive_buffer_size_option, ec);
794  if (ec) {
796  "receive_buffer_size set error: " << ec);
797  return ec;
798  }
799 
800  return ec;
801 }
802 
803 error_code TcpSession::SetSocketKeepaliveOptions(int keepalive_time,
804  int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val) {
805  error_code ec;
806  socket_base::keep_alive keep_alive_option(true);
807  socket()->set_option(keep_alive_option, ec);
808  if (ec) {
810  "keep_alive set error: " << ec);
811  return ec;
812  }
813 #ifdef TCP_KEEPIDLE
814  typedef integer< IPPROTO_TCP, TCP_KEEPIDLE > keepalive_idle_time;
815  keepalive_idle_time keepalive_idle_time_option(keepalive_time);
816  socket()->set_option(keepalive_idle_time_option, ec);
817  if (ec) {
819  "keepalive_idle_time: " << keepalive_time << " set error: " << ec);
820  return ec;
821  }
822 #elif TCP_KEEPALIVE
823  typedef integer< IPPROTO_TCP, TCP_KEEPALIVE > keepalive_idle_time;
824  keepalive_idle_time keepalive_idle_time_option(keepalive_time);
825  socket()->set_option(keepalive_idle_time_option, ec);
826  if (ec) {
828  "keepalive_idle_time: " << keepalive_time << " set error: " << ec);
829  return ec;
830  }
831 #else
832 #error No TCP keepalive option defined.
833 #endif
834 #ifdef TCP_KEEPINTVL
835  typedef integer< IPPROTO_TCP, TCP_KEEPINTVL > keepalive_interval;
836  keepalive_interval keepalive_interval_option(keepalive_intvl);
837  socket()->set_option(keepalive_interval_option, ec);
838  if (ec) {
840  "keepalive_interval: " << keepalive_intvl << " set error: " << ec);
841  return ec;
842  }
843 #endif
844 #ifdef TCP_KEEPCNT
845  typedef integer< IPPROTO_TCP, TCP_KEEPCNT > keepalive_count;
846  keepalive_count keepalive_count_option(keepalive_probes);
847  socket()->set_option(keepalive_count_option, ec);
848  if (ec) {
850  "keepalive_probes: " << keepalive_probes << " set error: " << ec);
851  return ec;
852  }
853 #endif
854 #ifdef TCP_USER_TIMEOUT
855  typedef integer< IPPROTO_TCP, TCP_USER_TIMEOUT > tcp_user_timeout;
856  tcp_user_timeout tcp_user_timeout_option(tcp_user_timeout_val);
857  socket()->set_option(tcp_user_timeout_option, ec);
858  if (ec) {
860  "tcp_user_timeout: " << tcp_user_timeout_val << " set error: "
861  << ec);
862  return ec;
863  }
864 #endif
865 
866  return ec;
867 }
868 
870  error_code ec;
871 
872  //
873  // Make socket write non-blocking
874  //
875  socket()->non_blocking(true, ec);
876  if (ec) {
878  "Cannot set socket non blocking: " << ec);
879  return ec;
880  }
881 
882  char *buffer_size_str = getenv("TCP_SESSION_SOCKET_BUFFER_SIZE");
883  if (!buffer_size_str) return ec;
884 
885  uint32_t sz = static_cast<uint32_t>(strtoul(buffer_size_str, NULL, 0));
886  if (sz) {
887  //
888  // Set socket send and receive buffer size
889  //
890  // Currently used only under test environments to trigger partial
891  // sends more deterministically
892  //
893  socket_base::send_buffer_size send_buffer_size_option(sz);
894  socket()->set_option(send_buffer_size_option, ec);
895  if (ec) {
897  "send_buffer_size set error: " << ec);
898  return ec;
899  }
900 
901  socket_base::receive_buffer_size receive_buffer_size_option(sz);
902  socket()->set_option(receive_buffer_size_option, ec);
903  if (ec) {
905  "receive_buffer_size set error: " << ec);
906  return ec;
907  }
908  }
909 
910  return ec;
911 }
912 
913 void TcpSession::GetRxSocketStats(SocketIOStats *socket_stats) const {
914  stats_.GetRxStats(socket_stats);
915 }
916 
917 void TcpSession::GetTxSocketStats(SocketIOStats *socket_stats) const {
918  stats_.GetTxStats(socket_stats);
919 }
std::string ResolveCanonicalNameIPv6(const std::string &ipv6)
Definition: address_util.cc:89
std::string ResolveCanonicalName()
Definition: address_util.cc:40
boost::asio::io_context * io_service()
Definition: event_manager.h:42
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
int GetTaskId(const std::string &name)
Definition: task.cc:861
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
static const int kTaskInstanceAny
Specifies value for wildcard (any or *) task data ID.
Definition: task.h:104
TcpMessageReader(TcpSession *session, ReceiveCallback callback)
Definition: tcp_session.cc:578
Buffer PullUp(uint8_t *data, Buffer buffer, size_t size) const
Definition: tcp_session.cc:636
int QueueByteLength() const
Definition: tcp_session.cc:623
int AllocBufferSize(int length)
Definition: tcp_session.cc:587
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:672
ReceiveCallback callback_
Definition: tcp_session.h:331
virtual ~TcpMessageReader()
Definition: tcp_session.cc:583
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:307
virtual const int GetMaxMessageSize()=0
BufferQueue queue_
Definition: tcp_session.h:332
TcpSession * session_
Definition: tcp_session.h:330
virtual int MsgLength(Buffer buffer, int offset)=0
uint8_t * BufferConcat(uint8_t *data, Buffer buffer, int msglength)
Definition: tcp_session.cc:598
boost::asio::const_buffer Buffer
Definition: tcp_session.h:306
virtual const int GetHeaderLenSize()=0
EventManager * event_manager()
Definition: tcp_server.h:76
int SetDscpSocketOption(NativeSocketType fd, uint8_t value)
Definition: tcp_server.cc:537
int SetMd5SocketOption(NativeSocketType fd, uint32_t peer_ip, const std::string &md5_password)
Definition: tcp_server.cc:484
TcpSessionPtr session_
Definition: tcp_session.cc:82
string Description() const
Gives a description of the task.
Definition: tcp_session.cc:79
virtual bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
Definition: tcp_session.cc:65
Reader(TcpSessionPtr session, ReadHandler read_fn, Buffer buffer)
Definition: tcp_session.cc:61
ReadHandler read_fn_
Definition: tcp_session.cc:83
function< void(Buffer)> ReadHandler
Definition: tcp_session.cc:59
@ CONNECT_COMPLETE
Definition: tcp_session.h:46
@ CONNECT_FAILED
Definition: tcp_session.h:47
virtual size_t GetReadBufferSize() const
Definition: tcp_session.cc:476
void SetName()
Definition: tcp_session.cc:224
TcpServer * server()
Definition: tcp_session.h:84
static void AsyncReadHandler(TcpSessionPtr session)
Definition: tcp_session.cc:484
virtual int GetSessionInstance() const
Definition: tcp_session.cc:531
int ClearMd5SocketOption(uint32_t peer_ip)
Definition: tcp_session.cc:566
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
Definition: tcp_session.cc:200
static bool IsSocketErrorHard(const boost::system::error_code &ec)
Definition: tcp_session.cc:743
void TriggerAsyncReadHandler()
Definition: tcp_session.cc:347
boost::scoped_ptr< Strand > io_strand_
Definition: tcp_session.h:226
virtual void WriteReady(const boost::system::error_code &error)
Definition: tcp_session.cc:376
std::atomic< bool > defer_reader_
Definition: tcp_session.h:280
int32_t remote_port() const
Definition: tcp_session.cc:548
void AsyncReadStartInternal(TcpSessionPtr session)
Definition: tcp_session.cc:161
boost::scoped_ptr< Socket > socket_
Definition: tcp_session.h:258
virtual Task * CreateReaderTask(boost::asio::mutable_buffer, size_t)
Definition: tcp_session.cc:461
std::atomic< bool > tcp_close_in_progress_
Definition: tcp_session.h:283
boost::system::error_code SetTcpSendBufSize(uint32_t size)
Definition: tcp_session.cc:777
std::mutex obs_mutex_
Definition: tcp_session.h:273
virtual void AsyncReadSome()
Definition: tcp_session.cc:193
uint8_t GetDscpValue() const
Definition: tcp_session.cc:574
boost::asio::strand< boost::asio::io_context::executor_type > Strand
Definition: tcp_session.h:225
void AsyncWriteInternal(TcpSessionPtr session)
Definition: tcp_session.cc:418
void ReleaseBufferLocked(Buffer buffer)
Definition: tcp_session.cc:149
virtual void OnRead(Buffer buffer)=0
static int reader_task_id_
Definition: tcp_session.h:255
virtual Socket * socket() const
Definition: tcp_session.h:82
std::atomic< bool > write_blocked_
Definition: tcp_session.h:282
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:109
void ConnectFailed()
Definition: tcp_session.cc:298
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
Definition: tcp_session.cc:790
int SetMd5SocketOption(uint32_t peer_ip, const std::string &md5_password)
Definition: tcp_session.cc:560
BufferQueue buffer_queue_
Definition: tcp_session.h:267
boost::system::error_code SetTcpNoDelay()
Definition: tcp_session.cc:766
static void AsyncWriteHandler(TcpSessionPtr session, const boost::system::error_code &error, std::size_t bytes_transferred)
Definition: tcp_session.cc:379
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:112
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:913
void DeleteBuffer(boost::asio::mutable_buffer buffer)
Definition: tcp_session.cc:127
void SessionEstablished(Endpoint remote, Direction direction)
Definition: tcp_session.cc:246
virtual void ReleaseBuffer(Buffer buffer)
Definition: tcp_session.cc:144
boost::asio::mutable_buffer AllocateBuffer(size_t buffer_size)
Definition: tcp_session.cc:120
void set_observer(EventObserver observer)
Definition: tcp_session.cc:219
virtual ~TcpSession()
Definition: tcp_session.cc:111
boost::function< void(TcpSession *, Event)> EventObserver
Definition: tcp_session.h:59
std::atomic< int > refcount_
Definition: tcp_session.h:278
virtual void SetDeferReader(bool defer_reader)
Definition: tcp_session.cc:183
boost::asio::ip::tcp::endpoint Endpoint
Definition: tcp_session.h:58
std::mutex mutex_
Definition: tcp_session.h:221
boost::asio::const_buffer Buffer
Definition: tcp_session.h:60
boost::system::error_code close_reason_
Definition: tcp_session.h:268
boost::scoped_ptr< TcpMessageWriter > writer_
Definition: tcp_session.h:276
TcpServerPtr server_
Definition: tcp_session.h:257
EventObserver observer_
Definition: tcp_session.h:274
void CloseInternal(const boost::system::error_code &ec, bool call_observer, bool notify_server=true)
Definition: tcp_session.cc:306
TcpSession(TcpServer *server, Socket *socket, bool async_read_ready=true, size_t buffer_send_size=TcpSession::kDefaultWriteBufferSize)
Definition: tcp_session.cc:87
Direction direction_
Definition: tcp_session.h:266
bool closed_
Definition: tcp_session.h:263
boost::intrusive_ptr< TcpSession > TcpSessionPtr
Definition: tcp_session.h:180
bool IsEstablishedLocked() const
Definition: tcp_session.h:246
virtual boost::system::error_code SetSocketOptions()
Definition: tcp_session.cc:869
Endpoint local_endpoint() const
Definition: tcp_session.cc:206
EventObserver observer()
Definition: tcp_session.h:210
boost::system::error_code SetSocketKeepaliveOptions(int keepalive_time, int keepalive_intvl, int keepalive_probes, int tcp_user_timeout_val=0)
Definition: tcp_session.cc:803
bool established_
Definition: tcp_session.h:262
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: tcp_session.cc:917
std::string name_
Definition: tcp_session.h:279
virtual void Accepted()
Definition: tcp_session.cc:255
Endpoint remote_
Definition: tcp_session.h:264
static const int kDefaultBufferSize
Definition: tcp_session.h:40
std::string remote_addr_str_
Definition: tcp_session.h:265
virtual bool Connected(Endpoint remote)
Definition: tcp_session.cc:270
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:56
virtual void AsyncReadStart()
Definition: tcp_session.cc:175
io::SocketStats stats_
Definition: tcp_session.h:222
std::string uve_key_str_
Definition: tcp_session.h:281
int32_t local_port() const
Definition: tcp_session.cc:536
virtual bool Send(const uint8_t *data, size_t size, size_t *sent)
Definition: tcp_session.cc:429
virtual size_t ReadSome(boost::asio::mutable_buffer buffer, boost::system::error_code *error)
Definition: tcp_session.cc:469
int SetDscpSocketOption(uint8_t value)
Definition: tcp_session.cc:570
void Close()
Definition: tcp_session.cc:355
bool read_on_connect_
Definition: tcp_session.h:259
#define TCP_SESSION_LOG_DEBUG(session, dir, arg)
Definition: io_log.h:123
#define TCP_DIR_NA
Definition: io_log.h:48
#define TCP_DIR_OUT
Definition: io_log.h:46
#define TCP_SESSION_LOG_ERROR(session, dir, arg)
Definition: io_log.h:117
#define TCP_DIR_IN
Definition: io_log.h:47
#define TCP_SESSION_LOG_UT_DEBUG(session, dir, arg)
Definition: io_log.h:126
std::atomic< uint64_t > read_block_start_time
Definition: io_utils.h:29
void GetRxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:28
std::atomic< uint64_t > read_blocked_duration_usecs
Definition: io_utils.h:31
void GetTxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:47
struct task_ task
static int BufferCmp(const mutable_buffer &lhs, const const_buffer &rhs)
Definition: tcp_session.cc:132
#define SSL_SHORT_READ_ERROR
Definition: tcp_session.h:26
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13