7 #include <boost/bind/bind.hpp>
14 using boost::asio::buffer_cast;
15 using boost::asio::mutable_buffer;
16 using boost::asio::mutable_buffers_1;
17 using boost::asio::const_buffer;
18 using boost::asio::ip::udp;
19 using namespace boost::placeholders;
26 const const_buffer &buffer)
27 :
Task(server->reader_task_id(),
28 server->reader_task_instance(remote_endpoint)),
30 remote_endpoint_(remote_endpoint),
35 std::scoped_lock lock(server_->state_guard_);
36 if (server_->state_ == OK) {
37 server_->OnRead(buffer_, remote_endpoint_);
38 server_->DeallocateBuffer(buffer_);
42 std::string
Description()
const {
return "UdpServer::Reader"; }
52 buffer_size_(buffer_size),
53 state_(Uninitialized),
64 socket_(*(
evm->io_service())),
65 buffer_size_(buffer_size),
66 state_(Uninitialized),
82 boost::system::error_code ec;
83 s <<
"Udpsocket@" << ep;
95 assert(
pbuf_.empty());
103 while (!
pbuf_.empty()) {
104 delete[]
pbuf_.back();
109 boost::system::error_code ec;
110 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
113 "ERROR shutdown UDP socket: " << ec);
118 "ERROR closing UDP socket: " << ec);
125 boost::system::error_code error;
128 udp::endpoint local_endpoint = udp::endpoint(ip, port);
132 << ipaddress <<
": " << error);
138 udp::endpoint local_endpoint = udp::endpoint(udp::v4(), port);
145 "Initialize UDP server in WRONG state: " <<
state_);
148 boost::system::error_code error;
149 socket_.open(udp::v4(), error);
156 socket_.bind(local_endpoint, error);
158 boost::system::error_code ec;
160 << error.message() <<
":" <<
socket_.local_endpoint(ec));
171 uint8_t *p =
new uint8_t[s];
176 return mutable_buffer(p, s);
184 const uint8_t *p = buffer_cast<const uint8_t *>(buffer);
187 std::vector<uint8_t *>::iterator f = std::find(
pbuf_.begin(),
189 if (f !=
pbuf_.end())
196 const_buffer buffer) {
198 socket_.async_send_to(boost::asio::buffer(buffer), ep,
201 boost::asio::placeholders::bytes_transferred,
202 boost::asio::placeholders::error));
206 "StartSend UDP server in WRONG state: " <<
state_);
212 udp::endpoint remote_endpoint, std::size_t bytes_transferred,
213 const boost::system::error_code& error) {
218 "Send UDP server in WRONG state: " <<
state_);
224 "Send to " << remote_endpoint <<
" FAILED due to error: " <<
225 error.value() <<
" : " << error.message());
233 HandleSend(send_buffer, remote_endpoint, bytes_transferred, error);
239 const_buffer buffer(buffer_cast<const uint8_t*>(b), buffer_size(b));
240 socket_.async_receive_from(mutable_buffers_1(b),
243 boost::asio::placeholders::bytes_transferred,
244 boost::asio::placeholders::error));
248 "StartReceive UDP server in WRONG state: " <<
state_);
253 std::size_t bytes_transferred,
const boost::system::error_code& error) {
258 "Receive UDP server in WRONG state: " <<
state_);
264 "Read FAILED due to error: " << error.value() <<
" : " <<
278 udp::endpoint remote_endpoint, std::size_t bytes_transferred,
279 const boost::system::error_code& error) {
280 const_buffer rdbuf(buffer_cast<const uint8_t *>(recv_buffer),
290 const udp::endpoint &remote_endpoint) {
292 "Default implementation of OnRead does NOT process received message");
296 udp::endpoint remote_endpoint, std::size_t bytes_transferred,
297 const boost::system::error_code& error) {
303 return socket_.local_endpoint(*error);
307 boost::system::error_code error;
311 return ep.address().to_string();
315 boost::system::error_code error;
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
static void AddServer(ServerType *server)
static void DeleteServer(ServerType *server)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in the waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
static const int kTaskInstanceAny
Specifies value for wildcard (any or *) task data ID.
static ServerManager< UdpServer, UdpServerPtr > impl_
static void DeleteServer(UdpServer *server)
static void AddServer(UdpServer *server)
Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint, const const_buffer &buffer)
udp::endpoint remote_endpoint_
std::string Description() const
Gives a description of the task.
virtual bool Run()
Code to execute in a task. Returns true if the task is completed; returns false to reschedule the task.
void HandleReceiveInternal(boost::asio::const_buffer recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
std::vector< uint8_t * > pbuf_
UdpServer(EventManager *evm, int buffer_size=kDefaultBufferSize)
virtual bool Initialize(unsigned short port)
virtual int reader_task_instance(const boost::asio::ip::udp::endpoint &remote_endpoint) const
boost::asio::ip::udp::endpoint remote_endpoint_
boost::asio::ip::udp::socket socket_
boost::asio::ip::udp::endpoint GetLocalEndpoint(boost::system::error_code *error) const
virtual void OnRead(const boost::asio::const_buffer &recv_buffer, const boost::asio::ip::udp::endpoint &remote_endpoint)
int GetLocalEndpointPort() const
boost::asio::mutable_buffer AllocateBuffer()
void GetTxSocketStats(SocketIOStats *socket_stats) const
virtual void HandleReceive(const boost::asio::const_buffer &recv_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
static int reader_task_id_
void StartSend(boost::asio::ip::udp::endpoint ep, std::size_t bytes_to_send, boost::asio::const_buffer buffer)
std::atomic< int > refcount_
ServerState GetServerState() const
void HandleSendInternal(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
void GetRxSocketStats(SocketIOStats *socket_stats) const
void SetName(boost::asio::ip::udp::endpoint ep)
std::string GetLocalEndpointAddress() const
virtual void HandleSend(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
void DeallocateBuffer(const boost::asio::const_buffer &buffer)
#define UDP_SERVER_LOG_ERROR(server, dir, arg)
std::atomic< uint64_t > write_errors
std::atomic< uint64_t > write_calls
std::atomic< uint64_t > read_bytes
void GetRxStats(SocketIOStats *socket_stats) const
std::atomic< uint64_t > read_calls
std::atomic< uint64_t > write_bytes
void GetTxStats(SocketIOStats *socket_stats) const
std::atomic< uint64_t > read_errors
boost::intrusive_ptr< UdpServer > UdpServerPtr