OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
tcp_message_write.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "boost/asio/detail/recycling_allocator.hpp"
6 
7 #include "io/tcp_message_write.h"
8 
9 #include "base/util.h"
10 #include "base/logging.h"
11 #include "io/tcp_session.h"
12 #include "io/io_log.h"
13 
14 using boost::asio::buffer;
15 using boost::asio::buffer_cast;
16 using boost::asio::mutable_buffer;
17 using boost::system::error_code;
18 using tbb::mutex;
19 using std::min;
20 
24 
26  size_t buffer_send_size) :
27  offset_(0), last_write_(0), buffer_send_size_(buffer_send_size),
28  session_(session) {
29 }
30 
32  for (BufferQueue::iterator iter = buffer_queue_.begin();
33  iter != buffer_queue_.end(); ++iter) {
34  DeleteBuffer(*iter);
35  }
36  buffer_queue_.clear();
37 }
38 
39 int TcpMessageWriter::AsyncSend(const uint8_t *data, size_t len, error_code *ec) {
40 
41  int write = len;
42 
43  if (buffer_queue_.empty()) {
44  BufferAppend(data, len);
45  if (session_->io_strand_) {
46  boost::asio::detail::recycling_allocator<void> allocator;
48  session_, TcpSessionPtr(session_)), allocator);
49  }
50  } else {
51  BufferAppend(data, len);
52  }
53 
55  if (!session_->write_blocked_) {
56  /* throttle the sender */
58  session_->server_->stats_.write_blocked++;
60  session_->write_blocked_ = true;
61  }
62  write = 0;
63  }
64 
65  return write;
66 }
67 
69 
70  /* assert if there is an async write in progress */
71  assert(last_write_ == 0);
72  assert(!buffer_queue_.empty());
73 
74  boost::asio::mutable_buffer head = buffer_queue_.front();
75  size_t remaining = buffer_size(head) - offset_;
76  last_write_ = min(buffer_send_size_, remaining);
77 
78  // Update socket write call statistics.
80  session_->server_->stats_.write_calls++;
81 
82  const uint8_t *data = buffer_cast<const uint8_t *>(head) + offset_;
84 }
85 
86 bool TcpMessageWriter::UpdateBufferQueue(size_t wrote, bool *send_ready) {
87 
88  assert(last_write_ == wrote);
89  assert(!buffer_queue_.empty());
90 
91  bool more_write = true;
92  last_write_ = 0;
93  *send_ready = false;
94 
95  boost::asio::mutable_buffer head = buffer_queue_.front();
96  if ((offset_ + wrote) == buffer_size(head)) {
97  offset_ = 0;
98  DeleteBuffer(head);
99  buffer_queue_.pop_front();
100  } else {
101  offset_ += wrote;
102  }
103 
106  uint64_t blocked_usecs = UTCTimestampUsec() -
108  session_->stats_.write_blocked_duration_usecs += blocked_usecs;
109  session_->server_->stats_.write_blocked_duration_usecs += blocked_usecs;
110  session_->write_blocked_ = false;
111  *send_ready = true;
112  }
113 
114  if (buffer_queue_.empty()) {
115  buffer_queue_.clear();
116  more_write = false;
117  }
118 
119  return more_write;
120 }
121 
122 void TcpMessageWriter::BufferAppend(const uint8_t *src, int bytes) {
123  uint8_t *data = new uint8_t[bytes];
124  memcpy(data, src, bytes);
125  mutable_buffer buffer = mutable_buffer(data, bytes);
126  buffer_queue_.push_back(buffer);
127 }
128 
129 void TcpMessageWriter::DeleteBuffer(mutable_buffer buffer) {
130  const uint8_t *data = buffer_cast<const uint8_t *>(buffer);
131  delete[] data;
132  return;
133 }
134 
bool UpdateBufferQueue(size_t wrote, bool *send_ready)
TcpSession * session_
tbb::atomic< uint64_t > write_block_start_time
Definition: io_utils.h:26
void AsyncWriteInternal(TcpSessionPtr session)
Definition: tcp_session.cc:417
tbb::atomic< uint64_t > write_blocked
Definition: io_utils.h:27
TcpServerPtr server_
Definition: tcp_session.h:261
boost::scoped_ptr< Strand > io_strand_
Definition: tcp_session.h:230
void BufferAppend(const uint8_t *data, int len)
boost::intrusive_ptr< TcpSession > TcpSessionPtr
io::SocketStats stats_
Definition: tcp_session.h:226
tbb::atomic< uint64_t > write_calls
Definition: io_utils.h:23
tbb::atomic< uint64_t > write_blocked_duration_usecs
Definition: io_utils.h:28
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
Definition: tcp_session.cc:199
void DeleteBuffer(boost::asio::mutable_buffer buffer)
BufferQueue buffer_queue_
static const int kMaxPendingBufferSize
TcpMessageWriter(TcpSession *session, size_t buffer_send_size)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
size_t GetBufferQueueSize() const
int AsyncSend(const uint8_t *msg, size_t len, boost::system::error_code *ec)
static const int kDefaultWriteBufferSize
tbb::atomic< bool > write_blocked_
Definition: tcp_session.h:286
static const int kMinPendingBufferSize