OpenSDN source code
udp_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "io/udp_server.h"
6 
7 #include <boost/bind/bind.hpp>
8 
9 #include "base/logging.h"
10 #include "base/address_util.h"
11 #include "io/io_log.h"
12 #include "io/io_utils.h"
13 
14 using boost::asio::buffer_cast;
15 using boost::asio::mutable_buffer;
16 using boost::asio::mutable_buffers_1;
17 using boost::asio::const_buffer;
18 using boost::asio::ip::udp;
19 using namespace boost::placeholders;
20 
22 
23 class UdpServer::Reader : public Task {
24 public:
25  Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint,
26  const const_buffer &buffer)
27  : Task(server->reader_task_id(),
28  server->reader_task_instance(remote_endpoint)),
29  server_(server),
30  remote_endpoint_(remote_endpoint),
31  buffer_(buffer) {
32  }
33 
34  virtual bool Run() {
35  std::scoped_lock lock(server_->state_guard_);
36  if (server_->state_ == OK) {
37  server_->OnRead(buffer_, remote_endpoint_);
38  server_->DeallocateBuffer(buffer_);
39  }
40  return true;
41  }
42  std::string Description() const { return "UdpServer::Reader"; }
43 
44 private:
46  udp::endpoint remote_endpoint_;
47  const_buffer buffer_;
48 };
49 
50 UdpServer::UdpServer(boost::asio::io_context *io_service, int buffer_size):
51  socket_(*io_service),
52  buffer_size_(buffer_size),
53  state_(Uninitialized),
54  evm_(NULL) {
55  if (reader_task_id_ == -1) {
57  reader_task_id_ = scheduler->GetTaskId("io::udp::ReaderTask");
58  }
59  refcount_ = 0;
61 }
62 
64  socket_(*(evm->io_service())),
65  buffer_size_(buffer_size),
66  state_(Uninitialized),
67  evm_(evm) {
68  if (reader_task_id_ == -1) {
70  reader_task_id_ = scheduler->GetTaskId("io::udp::ReaderTask");
71  }
72  refcount_ = 0;
74 }
75 
76 int UdpServer::reader_task_instance(const udp::endpoint &rep) const {
78 }
79 
80 void UdpServer::SetName(udp::endpoint ep) {
81  std::ostringstream s;
82  boost::system::error_code ec;
83  s << "Udpsocket@" << ep;
84  name_ = s.str();
85 }
86 
88  {
89  std::scoped_lock lock(state_guard_);
90  assert(state_ == Uninitialized || state_ == SocketOpenFailed ||
92  }
93  {
94  std::scoped_lock lock(pbuf_guard_);
95  assert(pbuf_.empty());
96  }
97 }
98 
100  std::scoped_lock lock(state_guard_);
101  {
102  std::scoped_lock lock_pbuf(pbuf_guard_);
103  while (!pbuf_.empty()) {
104  delete[] pbuf_.back();
105  pbuf_.pop_back();
106  }
107  }
108  if (socket_.is_open()) {
109  boost::system::error_code ec;
110  socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
111  if (ec) {
113  "ERROR shutdown UDP socket: " << ec);
114  }
115  socket_.close(ec);
116  if (ec) {
118  "ERROR closing UDP socket: " << ec);
119  }
120  }
122 }
123 
124 bool UdpServer::Initialize(const std::string &ipaddress, unsigned short port) {
125  boost::system::error_code error;
126  boost::asio::ip::address ip = AddressFromString(ipaddress, &error);
127  if (!error) {
128  udp::endpoint local_endpoint = udp::endpoint(ip, port);
129  return Initialize(local_endpoint);
130  } else {
131  UDP_SERVER_LOG_ERROR(this, UDP_DIR_NA, "IP address conversion: "
132  << ipaddress << ": " << error);
133  return false;
134  }
135 }
136 
137 bool UdpServer::Initialize(unsigned short port) {
138  udp::endpoint local_endpoint = udp::endpoint(udp::v4(), port);
139  return Initialize(local_endpoint);
140 }
141 
142 bool UdpServer::Initialize(udp::endpoint local_endpoint) {
143  if (GetServerState() != Uninitialized) {
145  "Initialize UDP server in WRONG state: " << state_);
146  return false;
147  }
148  boost::system::error_code error;
149  socket_.open(udp::v4(), error);
150  if (error) {
151  UDP_SERVER_LOG_ERROR(this, UDP_DIR_NA, "UDP socket open FAILED: " <<
152  error.message());
154  return false;
155  }
156  socket_.bind(local_endpoint, error);
157  if (error) {
158  boost::system::error_code ec;
159  UDP_SERVER_LOG_ERROR(this, UDP_DIR_NA, "UDP socket bind FAILED: "
160  << error.message() << ":" << socket_.local_endpoint(ec));
162  socket_.close(ec);
163  return false;
164  }
165  SetName(local_endpoint);
166  state_ = OK;
167  return true;
168 }
169 
170 mutable_buffer UdpServer::AllocateBuffer(std::size_t s) {
171  uint8_t *p = new uint8_t[s];
172  {
173  std::scoped_lock lock(pbuf_guard_);
174  pbuf_.push_back(p);
175  }
176  return mutable_buffer(p, s);
177 }
178 
179 mutable_buffer UdpServer::AllocateBuffer() {
181 }
182 
183 void UdpServer::DeallocateBuffer(const const_buffer &buffer) {
184  const uint8_t *p = buffer_cast<const uint8_t *>(buffer);
185  {
186  std::scoped_lock lock(pbuf_guard_);
187  std::vector<uint8_t *>::iterator f = std::find(pbuf_.begin(),
188  pbuf_.end(), p);
189  if (f != pbuf_.end())
190  pbuf_.erase(f);
191  }
192  delete[] p;
193 }
194 
195 void UdpServer::StartSend(udp::endpoint ep, std::size_t bytes_to_send,
196  const_buffer buffer) {
197  if (state_ == OK) {
198  socket_.async_send_to(boost::asio::buffer(buffer), ep,
199  boost::bind(&UdpServer::HandleSendInternal, UdpServerPtr(this),
200  buffer, ep,
201  boost::asio::placeholders::bytes_transferred,
202  boost::asio::placeholders::error));
203  } else {
206  "StartSend UDP server in WRONG state: " << state_);
207  DeallocateBuffer(buffer);
208  }
209 }
210 
211 void UdpServer::HandleSendInternal(const const_buffer send_buffer,
212  udp::endpoint remote_endpoint, std::size_t bytes_transferred,
213  const boost::system::error_code& error) {
214  std::scoped_lock lock(state_guard_);
215  if (state_ != OK) {
218  "Send UDP server in WRONG state: " << state_);
219  return;
220  }
221  if (error) {
224  "Send to " << remote_endpoint << " FAILED due to error: " <<
225  error.value() << " : " << error.message());
226  DeallocateBuffer(send_buffer);
227  return;
228  }
229  // Update write statistics.
231  stats_.write_bytes += bytes_transferred;
232  // Call the handler
233  HandleSend(send_buffer, remote_endpoint, bytes_transferred, error);
234 }
235 
237  if (state_ == OK) {
238  mutable_buffer b(AllocateBuffer());
239  const_buffer buffer(buffer_cast<const uint8_t*>(b), buffer_size(b));
240  socket_.async_receive_from(mutable_buffers_1(b),
242  UdpServerPtr(this), buffer,
243  boost::asio::placeholders::bytes_transferred,
244  boost::asio::placeholders::error));
245  } else {
248  "StartReceive UDP server in WRONG state: " << state_);
249  }
250 }
251 
252 void UdpServer::HandleReceiveInternal(const_buffer recv_buffer,
253  std::size_t bytes_transferred, const boost::system::error_code& error) {
254  std::scoped_lock lock(state_guard_);
255  if (state_ != OK) {
258  "Receive UDP server in WRONG state: " << state_);
259  return;
260  }
261  if (error) {
264  "Read FAILED due to error: " << error.value() << " : " <<
265  error.message());
266  DeallocateBuffer(recv_buffer);
267  } else {
268  // Update read statistics.
269  stats_.read_calls++;
270  stats_.read_bytes += bytes_transferred;
271  // Call the handler
272  HandleReceive(recv_buffer, remote_endpoint_, bytes_transferred, error);
273  }
274  StartReceive();
275 }
276 
277 void UdpServer::HandleReceive(const const_buffer &recv_buffer,
278  udp::endpoint remote_endpoint, std::size_t bytes_transferred,
279  const boost::system::error_code& error) {
280  const_buffer rdbuf(buffer_cast<const uint8_t *>(recv_buffer),
281  bytes_transferred);
282  Reader *task = new Reader(UdpServerPtr(this), remote_endpoint,
283  rdbuf);
284  // Starting a new task for the session
286  scheduler->Enqueue(task);
287 }
288 
289 void UdpServer::OnRead(const const_buffer &recv_buffer,
290  const udp::endpoint &remote_endpoint) {
291  UDP_SERVER_LOG_ERROR(this, UDP_DIR_IN, "Receive UDP: " <<
292  "Default implementation of OnRead does NOT process received message");
293 }
294 
295 void UdpServer::HandleSend(boost::asio::const_buffer send_buffer,
296  udp::endpoint remote_endpoint, std::size_t bytes_transferred,
297  const boost::system::error_code& error) {
298  DeallocateBuffer(send_buffer);
299 }
300 
301 udp::endpoint UdpServer::GetLocalEndpoint(boost::system::error_code *error)
302  const {
303  return socket_.local_endpoint(*error);
304 }
305 
307  boost::system::error_code error;
308  udp::endpoint ep = GetLocalEndpoint(&error);
309  if (error.value())
310  return "";
311  return ep.address().to_string();
312 }
313 
315  boost::system::error_code error;
316  udp::endpoint ep = GetLocalEndpoint(&error);
317  if (error.value())
318  return -1;
319  return ep.port();
320 }
321 
322 void UdpServer::GetRxSocketStats(SocketIOStats *socket_stats) const {
323  stats_.GetRxStats(socket_stats);
324 }
325 
326 void UdpServer::GetTxSocketStats(SocketIOStats *socket_stats) const {
327  stats_.GetTxStats(socket_stats);
328 }
329 
330 //
331 // UdpServerManager class routines
332 //
334 
336  impl_.AddServer(server);
337 }
338 
340  impl_.DeleteServer(server);
341 }
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
static void AddServer(ServerType *server)
static void DeleteServer(ServerType *server)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
int GetTaskId(const std::string &name)
Definition: task.cc:861
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
static const int kTaskInstanceAny
Specifies value for wildcard (any or *) task data ID.
Definition: task.h:104
static ServerManager< UdpServer, UdpServerPtr > impl_
Definition: udp_server.h:158
static void DeleteServer(UdpServer *server)
Definition: udp_server.cc:339
static void AddServer(UdpServer *server)
Definition: udp_server.cc:335
Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint, const const_buffer &buffer)
Definition: udp_server.cc:25
UdpServerPtr server_
Definition: udp_server.cc:45
udp::endpoint remote_endpoint_
Definition: udp_server.cc:46
const_buffer buffer_
Definition: udp_server.cc:47
std::string Description() const
Gives a description of the task.
Definition: udp_server.cc:42
virtual bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
Definition: udp_server.cc:34
void HandleReceiveInternal(boost::asio::const_buffer recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:252
std::string name_
Definition: udp_server.h:128
std::vector< uint8_t * > pbuf_
Definition: udp_server.h:132
virtual void Shutdown()
Definition: udp_server.cc:99
UdpServer(EventManager *evm, int buffer_size=kDefaultBufferSize)
Definition: udp_server.cc:63
virtual bool Initialize(unsigned short port)
Definition: udp_server.cc:137
std::mutex pbuf_guard_
Definition: udp_server.h:131
virtual int reader_task_instance(const boost::asio::ip::udp::endpoint &remote_endpoint) const
Definition: udp_server.cc:76
@ SocketOpenFailed
Definition: udp_server.h:29
@ SocketBindFailed
Definition: udp_server.h:30
@ Uninitialized
Definition: udp_server.h:28
boost::asio::ip::udp::endpoint remote_endpoint_
Definition: udp_server.h:129
boost::asio::ip::udp::socket socket_
Definition: udp_server.h:124
void StartReceive()
Definition: udp_server.cc:236
io::SocketStats stats_
Definition: udp_server.h:134
boost::asio::ip::udp::endpoint GetLocalEndpoint(boost::system::error_code *error) const
Definition: udp_server.cc:301
virtual void OnRead(const boost::asio::const_buffer &recv_buffer, const boost::asio::ip::udp::endpoint &remote_endpoint)
Definition: udp_server.cc:289
int GetLocalEndpointPort() const
Definition: udp_server.cc:314
boost::asio::mutable_buffer AllocateBuffer()
Definition: udp_server.cc:179
std::mutex state_guard_
Definition: udp_server.h:130
void GetTxSocketStats(SocketIOStats *socket_stats) const
Definition: udp_server.cc:326
int buffer_size_
Definition: udp_server.h:125
virtual void HandleReceive(const boost::asio::const_buffer &recv_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:277
static int reader_task_id_
Definition: udp_server.h:123
void StartSend(boost::asio::ip::udp::endpoint ep, std::size_t bytes_to_send, boost::asio::const_buffer buffer)
Definition: udp_server.cc:195
std::atomic< int > refcount_
Definition: udp_server.h:133
ServerState GetServerState() const
Definition: udp_server.h:51
void HandleSendInternal(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:211
void GetRxSocketStats(SocketIOStats *socket_stats) const
Definition: udp_server.cc:322
void SetName(boost::asio::ip::udp::endpoint ep)
Definition: udp_server.cc:80
ServerState state_
Definition: udp_server.h:126
std::string GetLocalEndpointAddress() const
Definition: udp_server.cc:306
virtual ~UdpServer()
Definition: udp_server.cc:87
virtual void HandleSend(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
Definition: udp_server.cc:295
void DeallocateBuffer(const boost::asio::const_buffer &buffer)
Definition: udp_server.cc:183
static EventManager evm
#define UDP_SERVER_LOG_ERROR(server, dir, arg)
Definition: io_log.h:160
#define UDP_DIR_NA
Definition: io_log.h:55
#define UDP_DIR_IN
Definition: io_log.h:54
#define UDP_DIR_OUT
Definition: io_log.h:53
std::atomic< uint64_t > write_errors
Definition: io_utils.h:25
std::atomic< uint64_t > write_calls
Definition: io_utils.h:23
std::atomic< uint64_t > read_bytes
Definition: io_utils.h:21
void GetRxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:28
std::atomic< uint64_t > read_calls
Definition: io_utils.h:20
std::atomic< uint64_t > write_bytes
Definition: io_utils.h:24
void GetTxStats(SocketIOStats *socket_stats) const
Definition: io_utils.cc:47
std::atomic< uint64_t > read_errors
Definition: io_utils.h:22
struct task_ task
boost::intrusive_ptr< UdpServer > UdpServerPtr
Definition: udp_server.h:139