OpenSDN source code
tcp_message_write.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "boost/asio/detail/recycling_allocator.hpp"
6 
7 #include "io/tcp_message_write.h"
8 
9 #include "base/util.h"
10 #include "base/logging.h"
11 #include "io/tcp_session.h"
12 #include "io/io_log.h"
13 
14 using boost::asio::buffer;
15 using boost::asio::buffer_cast;
16 using boost::asio::mutable_buffer;
17 using boost::system::error_code;
18 using std::min;
19 
23 
25  size_t buffer_send_size) :
26  offset_(0), last_write_(0), buffer_send_size_(buffer_send_size),
27  session_(session) {
28 }
29 
31  for (BufferQueue::iterator iter = buffer_queue_.begin();
32  iter != buffer_queue_.end(); ++iter) {
33  DeleteBuffer(*iter);
34  }
35  buffer_queue_.clear();
36 }
37 
// Queues a private copy of `len` bytes starting at `data` for transmission.
//
// BufferAppend() copies the caller's bytes, so `data` is not referenced after
// this call returns. Returns `len` (converted to int), or 0 when the
// back-pressure branch near the end of the function is taken.
// `ec` is not touched by any statement visible in this listing.
//
// NOTE(review): this listing omits source lines 46, 53, 56 and 58 (they were
// hyperlinks in the rendered page); comments about those spots are hedged.
38 int TcpMessageWriter::AsyncSend(const uint8_t *data, size_t len, error_code *ec) {
39 
    // Optimistic result: report the whole message as accepted.
40  int write = len;
41 
    // Both branches enqueue the data; only the empty-queue branch goes on to
    // do extra work when the session has an io_strand_.
42  if (buffer_queue_.empty()) {
43  BufferAppend(data, len);
44  if (session_->io_strand_) {
45  boost::asio::detail::recycling_allocator<void> allocator;
    // NOTE(review): the start of this statement (listing line 46) is missing;
    // presumably it schedules TcpSession::AsyncWriteInternal on the strand
    // using `allocator` - confirm against the repository.
47  session_, TcpSessionPtr(session_)), allocator);
48  }
49  } else {
50  BufferAppend(data, len);
51  }
52 
    // NOTE(review): the condition opening this outer block (listing line 53)
    // is missing; presumably it compares the pending queue size against a
    // high-water mark - confirm. Inside it, a session that is not yet marked
    // write_blocked_ gets the server's write_blocked counter bumped and is
    // marked blocked, and the return value is forced to 0.
54  if (!session_->write_blocked_) {
55  /* throttle the sender */
57  session_->server_->stats_.write_blocked++;
59  session_->write_blocked_ = true;
60  }
61  write = 0;
62  }
63 
64  return write;
65 }
66 
68 
69  /* assert if there is an async write in progress */
70  assert(last_write_ == 0);
71  assert(!buffer_queue_.empty());
72 
73  boost::asio::mutable_buffer head = buffer_queue_.front();
74  size_t remaining = buffer_size(head) - offset_;
75  last_write_ = min(buffer_send_size_, remaining);
76 
77  // Update socket write call statistics.
79  session_->server_->stats_.write_calls++;
80 
81  const uint8_t *data = buffer_cast<const uint8_t *>(head) + offset_;
83 }
84 
// Accounts for `wrote` bytes of the head buffer having been written.
//
// `wrote` must equal last_write_ and the queue must be non-empty; both are
// asserted. `*send_ready` is always assigned: false by default, true only
// when the write_blocked_ flag is cleared below.
// Returns true while buffers remain queued, false once the queue is empty.
//
// NOTE(review): this listing omits source lines 103-104 and 106 (they were
// hyperlinks in the rendered page); comments about those spots are hedged.
85 bool TcpMessageWriter::UpdateBufferQueue(size_t wrote, bool *send_ready) {
86 
87  assert(last_write_ == wrote);
88  assert(!buffer_queue_.empty());
89 
90  bool more_write = true;
    // Zero is the "no write outstanding" value that the other assert on
    // last_write_ in this file checks for.
91  last_write_ = 0;
92  *send_ready = false;
93 
    // Head buffer fully consumed: free it and start the next one at offset 0.
    // Otherwise just advance the offset within the head buffer.
94  boost::asio::mutable_buffer head = buffer_queue_.front();
95  if ((offset_ + wrote) == buffer_size(head)) {
96  offset_ = 0;
97  DeleteBuffer(head);
98  buffer_queue_.pop_front();
99  } else {
100  offset_ += wrote;
101  }
102 
    // NOTE(review): the condition opening this block (listing lines 103-104)
    // and the second operand of the subtraction (listing line 106) are
    // missing; presumably the block runs when a blocked session has drained
    // below a low-water mark and subtracts the recorded block start time -
    // confirm. The visible statements add the elapsed microseconds to both
    // the session and server counters, clear write_blocked_, and report
    // send_ready to the caller.
105  uint64_t blocked_usecs = UTCTimestampUsec() -
107  session_->stats_.write_blocked_duration_usecs += blocked_usecs;
108  session_->server_->stats_.write_blocked_duration_usecs += blocked_usecs;
109  session_->write_blocked_ = false;
110  *send_ready = true;
111  }
112 
    // Nothing left to send; clear() on an already-empty queue changes nothing.
113  if (buffer_queue_.empty()) {
114  buffer_queue_.clear();
115  more_write = false;
116  }
117 
118  return more_write;
119 }
120 
121 void TcpMessageWriter::BufferAppend(const uint8_t *src, int bytes) {
122  uint8_t *data = new uint8_t[bytes];
123  memcpy(data, src, bytes);
124  mutable_buffer buffer = mutable_buffer(data, bytes);
125  buffer_queue_.push_back(buffer);
126 }
127 
128 void TcpMessageWriter::DeleteBuffer(mutable_buffer buffer) {
129  const uint8_t *data = buffer_cast<const uint8_t *>(buffer);
130  delete[] data;
131  return;
132 }
133 
TcpSession * session_
boost::intrusive_ptr< TcpSession > TcpSessionPtr
void DeleteBuffer(boost::asio::mutable_buffer buffer)
static const int kMaxPendingBufferSize
static const int kMinPendingBufferSize
void BufferAppend(const uint8_t *data, int len)
int AsyncSend(const uint8_t *msg, size_t len, boost::system::error_code *ec)
TcpMessageWriter(TcpSession *session, size_t buffer_send_size)
size_t GetBufferQueueSize() const
BufferQueue buffer_queue_
bool UpdateBufferQueue(size_t wrote, bool *send_ready)
static const int kDefaultWriteBufferSize
virtual void AsyncWrite(const uint8_t *data, std::size_t size)
Definition: tcp_session.cc:200
boost::scoped_ptr< Strand > io_strand_
Definition: tcp_session.h:226
void AsyncWriteInternal(TcpSessionPtr session)
Definition: tcp_session.cc:418
std::atomic< bool > write_blocked_
Definition: tcp_session.h:282
TcpServerPtr server_
Definition: tcp_session.h:257
io::SocketStats stats_
Definition: tcp_session.h:222
std::atomic< uint64_t > write_calls
Definition: io_utils.h:23
std::atomic< uint64_t > write_blocked_duration_usecs
Definition: io_utils.h:28
std::atomic< uint64_t > write_blocked
Definition: io_utils.h:27
std::atomic< uint64_t > write_block_start_time
Definition: io_utils.h:26
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13