7 #include <boost/bind.hpp>
14 using boost::asio::buffer_cast;
15 using boost::asio::mutable_buffer;
16 using boost::asio::mutable_buffers_1;
17 using boost::asio::const_buffer;
18 using boost::asio::ip::udp;
25 const const_buffer &buffer)
34 tbb::mutex::scoped_lock lock(
server_->state_guard_);
41 std::string
Description()
const {
return "UdpServer::Reader"; }
51 buffer_size_(buffer_size),
52 state_(Uninitialized),
63 socket_(*(evm->io_service())),
64 buffer_size_(buffer_size),
65 state_(Uninitialized),
81 boost::system::error_code ec;
82 s <<
"Udpsocket@" << ep;
94 assert(
pbuf_.empty());
102 while (!
pbuf_.empty()) {
103 delete[]
pbuf_.back();
108 boost::system::error_code ec;
109 socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
112 "ERROR shutdown UDP socket: " << ec);
117 "ERROR closing UDP socket: " << ec);
124 boost::system::error_code error;
127 udp::endpoint local_endpoint = udp::endpoint(ip, port);
131 << ipaddress <<
": " << error);
137 udp::endpoint local_endpoint = udp::endpoint(udp::v4(), port);
144 "Initialize UDP server in WRONG state: " <<
state_);
147 boost::system::error_code error;
148 socket_.open(udp::v4(), error);
155 socket_.bind(local_endpoint, error);
157 boost::system::error_code ec;
159 << error.message() <<
":" <<
socket_.local_endpoint(ec));
170 uint8_t *p =
new uint8_t[s];
175 return mutable_buffer(p, s);
183 const uint8_t *p = buffer_cast<
const uint8_t *>(buffer);
186 std::vector<uint8_t *>::iterator f = std::find(
pbuf_.begin(),
188 if (f !=
pbuf_.end())
195 const_buffer buffer) {
197 socket_.async_send_to(boost::asio::buffer(buffer), ep,
200 boost::asio::placeholders::bytes_transferred,
201 boost::asio::placeholders::error));
205 "StartSend UDP server in WRONG state: " <<
state_);
211 udp::endpoint remote_endpoint, std::size_t bytes_transferred,
212 const boost::system::error_code& error) {
217 "Send UDP server in WRONG state: " <<
state_);
223 "Send to " << remote_endpoint <<
" FAILED due to error: " <<
224 error.value() <<
" : " << error.message());
232 HandleSend(send_buffer, remote_endpoint, bytes_transferred, error);
238 const_buffer buffer(buffer_cast<const uint8_t*>(b), buffer_size(b));
239 socket_.async_receive_from(mutable_buffers_1(b),
242 boost::asio::placeholders::bytes_transferred,
243 boost::asio::placeholders::error));
247 "StartReceive UDP server in WRONG state: " <<
state_);
252 std::size_t bytes_transferred,
const boost::system::error_code& error) {
257 "Receive UDP server in WRONG state: " <<
state_);
263 "Read FAILED due to error: " << error.value() <<
" : " <<
277 udp::endpoint remote_endpoint, std::size_t bytes_transferred,
278 const boost::system::error_code& error) {
279 const_buffer rdbuf(buffer_cast<const uint8_t *>(recv_buffer),
289 const udp::endpoint &remote_endpoint) {
291 "Default implementation of OnRead does NOT process received message");
295 udp::endpoint remote_endpoint, std::size_t bytes_transferred,
296 const boost::system::error_code& error) {
302 return socket_.local_endpoint(*error);
306 boost::system::error_code error;
310 return ep.address().to_string();
314 boost::system::error_code error;
boost::intrusive_ptr< UdpServer > UdpServerPtr
boost::asio::ip::udp::endpoint remote_endpoint_
virtual int reader_task_instance(const boost::asio::ip::udp::endpoint &remote_endpoint) const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void DeallocateBuffer(const boost::asio::const_buffer &buffer)
static void DeleteServer(ServerType *server)
static const int kTaskInstanceAny
tbb::atomic< uint64_t > write_errors
tbb::atomic< uint64_t > write_bytes
boost::asio::mutable_buffer AllocateBuffer()
void GetTxStats(SocketIOStats *socket_stats) const
static ServerManager< UdpServer, UdpServerPtr > impl_
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
static void AddServer(ServerType *server)
void HandleReceiveInternal(boost::asio::const_buffer recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
virtual void HandleSend(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
int GetTaskId(const std::string &name)
Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint, const const_buffer &buffer)
void GetRxSocketStats(SocketIOStats *socket_stats) const
tbb::atomic< int > refcount_
void GetTxSocketStats(SocketIOStats *socket_stats) const
tbb::atomic< uint64_t > write_calls
tbb::atomic< uint64_t > read_bytes
boost::asio::ip::udp::socket socket_
tbb::atomic< uint64_t > read_calls
std::string Description() const
void SetName(boost::asio::ip::udp::endpoint ep)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
virtual void OnRead(const boost::asio::const_buffer &recv_buffer, const boost::asio::ip::udp::endpoint &remote_endpoint)
void GetRxStats(SocketIOStats *socket_stats) const
std::vector< uint8_t * > pbuf_
void StartSend(boost::asio::ip::udp::endpoint ep, std::size_t bytes_to_send, boost::asio::const_buffer buffer)
virtual int reader_task_id() const
virtual void HandleReceive(const boost::asio::const_buffer &recv_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
#define UDP_SERVER_LOG_ERROR(server, dir, arg)
tbb::atomic< uint64_t > read_errors
boost::asio::ip::udp::endpoint GetLocalEndpoint(boost::system::error_code *error) const
UdpServer(EventManager *evm, int buffer_size=kDefaultBufferSize)
udp::endpoint remote_endpoint_
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
void HandleSendInternal(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
static int reader_task_id_
static void DeleteServer(UdpServer *server)
std::string GetLocalEndpointAddress() const
static void AddServer(UdpServer *server)
int GetLocalEndpointPort() const
ServerState GetServerState() const
Task is a wrapper over tbb::task to support policies.
virtual bool Initialize(unsigned short port)