OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
udp_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "io/udp_server.h"
6 
7 #include <boost/bind.hpp>
8 
9 #include "base/logging.h"
10 #include "base/address_util.h"
11 #include "io/io_log.h"
12 #include "io/io_utils.h"
13 
14 using boost::asio::buffer_cast;
15 using boost::asio::mutable_buffer;
16 using boost::asio::mutable_buffers_1;
17 using boost::asio::const_buffer;
18 using boost::asio::ip::udp;
19 
21 
22 class UdpServer::Reader : public Task {
23 public:
24  Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint,
25  const const_buffer &buffer)
26  : Task(server->reader_task_id(),
27  server->reader_task_instance(remote_endpoint)),
28  server_(server),
29  remote_endpoint_(remote_endpoint),
30  buffer_(buffer) {
31  }
32 
33  virtual bool Run() {
34  tbb::mutex::scoped_lock lock(server_->state_guard_);
35  if (server_->state_ == OK) {
37  server_->DeallocateBuffer(buffer_);
38  }
39  return true;
40  }
41  std::string Description() const { return "UdpServer::Reader"; }
42 
43 private:
45  udp::endpoint remote_endpoint_;
46  const_buffer buffer_;
47 };
48 
49 UdpServer::UdpServer(boost::asio::io_context *io_service, int buffer_size):
50  socket_(*io_service),
51  buffer_size_(buffer_size),
52  state_(Uninitialized),
53  evm_(NULL) {
54  if (reader_task_id_ == -1) {
56  reader_task_id_ = scheduler->GetTaskId("io::udp::ReaderTask");
57  }
58  refcount_ = 0;
60 }
61 
63  socket_(*(evm->io_service())),
64  buffer_size_(buffer_size),
65  state_(Uninitialized),
66  evm_(evm) {
67  if (reader_task_id_ == -1) {
69  reader_task_id_ = scheduler->GetTaskId("io::udp::ReaderTask");
70  }
71  refcount_ = 0;
73 }
74 
75 int UdpServer::reader_task_instance(const udp::endpoint &rep) const {
77 }
78 
79 void UdpServer::SetName(udp::endpoint ep) {
80  std::ostringstream s;
81  boost::system::error_code ec;
82  s << "Udpsocket@" << ep;
83  name_ = s.str();
84 }
85 
87  {
88  tbb::mutex::scoped_lock lock(state_guard_);
89  assert(state_ == Uninitialized || state_ == SocketOpenFailed ||
91  }
92  {
93  tbb::mutex::scoped_lock lock(pbuf_guard_);
94  assert(pbuf_.empty());
95  }
96 }
97 
99  tbb::mutex::scoped_lock lock(state_guard_);
100  {
101  tbb::mutex::scoped_lock lock_pbuf(pbuf_guard_);
102  while (!pbuf_.empty()) {
103  delete[] pbuf_.back();
104  pbuf_.pop_back();
105  }
106  }
107  if (socket_.is_open()) {
108  boost::system::error_code ec;
109  socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
110  if (ec) {
112  "ERROR shutdown UDP socket: " << ec);
113  }
114  socket_.close(ec);
115  if (ec) {
117  "ERROR closing UDP socket: " << ec);
118  }
119  }
121 }
122 
123 bool UdpServer::Initialize(const std::string &ipaddress, unsigned short port) {
124  boost::system::error_code error;
125  boost::asio::ip::address ip = AddressFromString(ipaddress, &error);
126  if (!error) {
127  udp::endpoint local_endpoint = udp::endpoint(ip, port);
128  return Initialize(local_endpoint);
129  } else {
130  UDP_SERVER_LOG_ERROR(this, UDP_DIR_NA, "IP address conversion: "
131  << ipaddress << ": " << error);
132  return false;
133  }
134 }
135 
136 bool UdpServer::Initialize(unsigned short port) {
137  udp::endpoint local_endpoint = udp::endpoint(udp::v4(), port);
138  return Initialize(local_endpoint);
139 }
140 
141 bool UdpServer::Initialize(udp::endpoint local_endpoint) {
142  if (GetServerState() != Uninitialized) {
144  "Initialize UDP server in WRONG state: " << state_);
145  return false;
146  }
147  boost::system::error_code error;
148  socket_.open(udp::v4(), error);
149  if (error) {
150  UDP_SERVER_LOG_ERROR(this, UDP_DIR_NA, "UDP socket open FAILED: " <<
151  error.message());
153  return false;
154  }
155  socket_.bind(local_endpoint, error);
156  if (error) {
157  boost::system::error_code ec;
158  UDP_SERVER_LOG_ERROR(this, UDP_DIR_NA, "UDP socket bind FAILED: "
159  << error.message() << ":" << socket_.local_endpoint(ec));
161  socket_.close(ec);
162  return false;
163  }
164  SetName(local_endpoint);
165  state_ = OK;
166  return true;
167 }
168 
169 mutable_buffer UdpServer::AllocateBuffer(std::size_t s) {
170  uint8_t *p = new uint8_t[s];
171  {
172  tbb::mutex::scoped_lock lock(pbuf_guard_);
173  pbuf_.push_back(p);
174  }
175  return mutable_buffer(p, s);
176 }
177 
178 mutable_buffer UdpServer::AllocateBuffer() {
180 }
181 
182 void UdpServer::DeallocateBuffer(const const_buffer &buffer) {
183  const uint8_t *p = buffer_cast<const uint8_t *>(buffer);
184  {
185  tbb::mutex::scoped_lock lock(pbuf_guard_);
186  std::vector<uint8_t *>::iterator f = std::find(pbuf_.begin(),
187  pbuf_.end(), p);
188  if (f != pbuf_.end())
189  pbuf_.erase(f);
190  }
191  delete[] p;
192 }
193 
194 void UdpServer::StartSend(udp::endpoint ep, std::size_t bytes_to_send,
195  const_buffer buffer) {
196  if (state_ == OK) {
197  socket_.async_send_to(boost::asio::buffer(buffer), ep,
198  boost::bind(&UdpServer::HandleSendInternal, UdpServerPtr(this),
199  buffer, ep,
200  boost::asio::placeholders::bytes_transferred,
201  boost::asio::placeholders::error));
202  } else {
205  "StartSend UDP server in WRONG state: " << state_);
206  DeallocateBuffer(buffer);
207  }
208 }
209 
210 void UdpServer::HandleSendInternal(const const_buffer send_buffer,
211  udp::endpoint remote_endpoint, std::size_t bytes_transferred,
212  const boost::system::error_code& error) {
213  tbb::mutex::scoped_lock lock(state_guard_);
214  if (state_ != OK) {
217  "Send UDP server in WRONG state: " << state_);
218  return;
219  }
220  if (error) {
223  "Send to " << remote_endpoint << " FAILED due to error: " <<
224  error.value() << " : " << error.message());
225  DeallocateBuffer(send_buffer);
226  return;
227  }
228  // Update write statistics.
230  stats_.write_bytes += bytes_transferred;
231  // Call the handler
232  HandleSend(send_buffer, remote_endpoint, bytes_transferred, error);
233 }
234 
236  if (state_ == OK) {
237  mutable_buffer b(AllocateBuffer());
238  const_buffer buffer(buffer_cast<const uint8_t*>(b), buffer_size(b));
239  socket_.async_receive_from(mutable_buffers_1(b),
241  UdpServerPtr(this), buffer,
242  boost::asio::placeholders::bytes_transferred,
243  boost::asio::placeholders::error));
244  } else {
247  "StartReceive UDP server in WRONG state: " << state_);
248  }
249 }
250 
251 void UdpServer::HandleReceiveInternal(const_buffer recv_buffer,
252  std::size_t bytes_transferred, const boost::system::error_code& error) {
253  tbb::mutex::scoped_lock lock(state_guard_);
254  if (state_ != OK) {
257  "Receive UDP server in WRONG state: " << state_);
258  return;
259  }
260  if (error) {
263  "Read FAILED due to error: " << error.value() << " : " <<
264  error.message());
265  DeallocateBuffer(recv_buffer);
266  } else {
267  // Update read statistics.
268  stats_.read_calls++;
269  stats_.read_bytes += bytes_transferred;
270  // Call the handler
271  HandleReceive(recv_buffer, remote_endpoint_, bytes_transferred, error);
272  }
273  StartReceive();
274 }
275 
276 void UdpServer::HandleReceive(const const_buffer &recv_buffer,
277  udp::endpoint remote_endpoint, std::size_t bytes_transferred,
278  const boost::system::error_code& error) {
279  const_buffer rdbuf(buffer_cast<const uint8_t *>(recv_buffer),
280  bytes_transferred);
281  Reader *task = new Reader(UdpServerPtr(this), remote_endpoint,
282  rdbuf);
283  // Starting a new task for the session
285  scheduler->Enqueue(task);
286 }
287 
288 void UdpServer::OnRead(const const_buffer &recv_buffer,
289  const udp::endpoint &remote_endpoint) {
290  UDP_SERVER_LOG_ERROR(this, UDP_DIR_IN, "Receive UDP: " <<
291  "Default implementation of OnRead does NOT process received message");
292 }
293 
294 void UdpServer::HandleSend(boost::asio::const_buffer send_buffer,
295  udp::endpoint remote_endpoint, std::size_t bytes_transferred,
296  const boost::system::error_code& error) {
297  DeallocateBuffer(send_buffer);
298 }
299 
300 udp::endpoint UdpServer::GetLocalEndpoint(boost::system::error_code *error)
301  const {
302  return socket_.local_endpoint(*error);
303 }
304 
306  boost::system::error_code error;
307  udp::endpoint ep = GetLocalEndpoint(&error);
308  if (error.value())
309  return "";
310  return ep.address().to_string();
311 }
312 
314  boost::system::error_code error;
315  udp::endpoint ep = GetLocalEndpoint(&error);
316  if (error.value())
317  return -1;
318  return ep.port();
319 }
320 
321 void UdpServer::GetRxSocketStats(SocketIOStats *socket_stats) const {
322  stats_.GetRxStats(socket_stats);
323 }
324 
325 void UdpServer::GetTxSocketStats(SocketIOStats *socket_stats) const {
326  stats_.GetTxStats(socket_stats);
327 }
328 
329 //
330 // UdpServerManager class routines
331 //
333 
335  impl_.AddServer(server);
336 }
337 
339  impl_.DeleteServer(server);
340 }
#define UDP_DIR_OUT
Definition: io_log.h:53
boost::intrusive_ptr< UdpServer > UdpServerPtr
Definition: udp_server.h:136
virtual ~UdpServer()
Definition: udp_server.cc:86
ServerState state_
Definition: udp_server.h:123
boost::asio::ip::udp::endpoint remote_endpoint_
Definition: udp_server.h:126
virtual int reader_task_instance(const boost::asio::ip::udp::endpoint &remote_endpoint) const
Definition: udp_server.cc:75
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
virtual void Shutdown()
Definition: udp_server.cc:98
void DeallocateBuffer(const boost::asio::const_buffer &buffer)
Definition: udp_server.cc:182
static void DeleteServer(ServerType *server)
static const int kTaskInstanceAny
Definition: task.h:102
std::string name_
Definition: udp_server.h:125
tbb::atomic< uint64_t > write_errors
Definition: io_utils.h:25
tbb::atomic< uint64_t > write_bytes
Definition: io_utils.h:24
boost::asio::mutable_buffer AllocateBuffer()
Definition: udp_server.cc:178
void GetTxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:47
static ServerManager< UdpServer, UdpServerPtr > impl_
Definition: udp_server.h:155
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: udp_server.cc:33
Definition: task_int.h:10
static void AddServer(ServerType *server)
void HandleReceiveInternal(boost::asio::const_buffer recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:251
virtual void HandleSend(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:294
int GetTaskId(const std::string &name)
Definition: task.cc:856
Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint, const const_buffer &buffer)
Definition: udp_server.cc:24
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: udp_server.cc:321
tbb::atomic< int > refcount_
Definition: udp_server.h:130
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: udp_server.cc:325
#define UDP_DIR_IN
Definition: io_log.h:54
tbb::atomic< uint64_t > write_calls
Definition: io_utils.h:23
io::SocketStats stats_
Definition: udp_server.h:131
tbb::atomic< uint64_t > read_bytes
Definition: io_utils.h:21
boost::asio::ip::udp::socket socket_
Definition: udp_server.h:121
tbb::atomic< uint64_t > read_calls
Definition: io_utils.h:20
std::string Description() const
Definition: udp_server.cc:41
UdpServerPtr server_
Definition: udp_server.cc:44
void SetName(boost::asio::ip::udp::endpoint ep)
Definition: udp_server.cc:79
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
virtual void OnRead(const boost::asio::const_buffer &recv_buffer, const boost::asio::ip::udp::endpoint &remote_endpoint)
Definition: udp_server.cc:288
void GetRxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:28
std::vector< uint8_t * > pbuf_
Definition: udp_server.h:129
void StartSend(boost::asio::ip::udp::endpoint ep, std::size_t bytes_to_send, boost::asio::const_buffer buffer)
Definition: udp_server.cc:194
virtual int reader_task_id() const
Definition: udp_server.h:81
virtual void HandleReceive(const boost::asio::const_buffer &recv_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:276
#define UDP_SERVER_LOG_ERROR(server, dir, arg)
Definition: io_log.h:160
tbb::atomic< uint64_t > read_errors
Definition: io_utils.h:22
boost::asio::ip::udp::endpoint GetLocalEndpoint(boost::system::error_code *error) const
Definition: udp_server.cc:300
UdpServer(EventManager *evm, int buffer_size=kDefaultBufferSize)
Definition: udp_server.cc:62
udp::endpoint remote_endpoint_
Definition: udp_server.cc:45
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
void HandleSendInternal(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:210
tbb::mutex state_guard_
Definition: udp_server.h:127
static int reader_task_id_
Definition: udp_server.h:120
static void DeleteServer(UdpServer *server)
Definition: udp_server.cc:338
std::string GetLocalEndpointAddress() const
Definition: udp_server.cc:305
static void AddServer(UdpServer *server)
Definition: udp_server.cc:334
void StartReceive()
Definition: udp_server.cc:235
int GetLocalEndpointPort() const
Definition: udp_server.cc:313
const_buffer buffer_
Definition: udp_server.cc:46
ServerState GetServerState() const
Definition: udp_server.h:48
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
tbb::mutex pbuf_guard_
Definition: udp_server.h:128
virtual bool Initialize(unsigned short port)
Definition: udp_server.cc:136
int buffer_size_
Definition: udp_server.h:122
static EventManager evm
#define UDP_DIR_NA
Definition: io_log.h:55