7 #include <boost/bind.hpp> 
   14 using boost::asio::buffer_cast;
 
   15 using boost::asio::mutable_buffer;
 
   16 using boost::asio::mutable_buffers_1;
 
   17 using boost::asio::const_buffer;
 
   18 using boost::asio::ip::udp;
 
   25         const const_buffer &buffer)
 
   34         tbb::mutex::scoped_lock lock(
server_->state_guard_);
 
   41     std::string 
Description()
 const { 
return "UdpServer::Reader"; }
 
   51     buffer_size_(buffer_size),
 
   52     state_(Uninitialized),
 
   63     socket_(*(
evm->io_service())),
 
   64     buffer_size_(buffer_size),
 
   65     state_(Uninitialized),
 
   81     boost::system::error_code ec;
 
   82     s << 
"Udpsocket@" << ep;
 
   94         assert(
pbuf_.empty());
 
  102         while (!
pbuf_.empty()) {
 
  103             delete[] 
pbuf_.back();
 
  108         boost::system::error_code ec;
 
  109         socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
 
  112                 "ERROR shutdown UDP socket: " << ec);
 
  117                 "ERROR closing UDP socket: " << ec);
 
  124     boost::system::error_code error;
 
  127         udp::endpoint local_endpoint = udp::endpoint(ip, port);
 
  131             << ipaddress << 
": " << error);
 
  137     udp::endpoint local_endpoint = udp::endpoint(udp::v4(), port);
 
  144             "Initialize UDP server in WRONG state: " << 
state_);
 
  147     boost::system::error_code error;
 
  148     socket_.open(udp::v4(), error);
 
  155     socket_.bind(local_endpoint, error);
 
  157         boost::system::error_code ec;
 
  159             << error.message() << 
":" << 
socket_.local_endpoint(ec));
 
  170     uint8_t *p = 
new uint8_t[s];
 
  175     return mutable_buffer(p, s);
 
  183     const uint8_t *p = buffer_cast<const uint8_t *>(buffer);
 
  186         std::vector<uint8_t *>::iterator f = std::find(
pbuf_.begin(),
 
  188         if (f != 
pbuf_.end())
 
  195         const_buffer buffer) {
 
  197         socket_.async_send_to(boost::asio::buffer(buffer), ep,
 
  200             boost::asio::placeholders::bytes_transferred,
 
  201             boost::asio::placeholders::error));
 
  205             "StartSend UDP server in WRONG state: " << 
state_);
 
  211         udp::endpoint remote_endpoint, std::size_t bytes_transferred,
 
  212         const boost::system::error_code& error) {
 
  217             "Send UDP server in WRONG state: " << 
state_);
 
  223             "Send to " << remote_endpoint << 
" FAILED due to error: " <<
 
  224             error.value() << 
" : " << error.message());
 
  232     HandleSend(send_buffer, remote_endpoint, bytes_transferred, error);
 
  238         const_buffer buffer(buffer_cast<const uint8_t*>(b), buffer_size(b));
 
  239         socket_.async_receive_from(mutable_buffers_1(b),
 
  242             boost::asio::placeholders::bytes_transferred,
 
  243             boost::asio::placeholders::error));
 
  247             "StartReceive UDP server in WRONG state: " << 
state_);
 
  252     std::size_t bytes_transferred, 
const boost::system::error_code& error) {
 
  257             "Receive UDP server in WRONG state: " << 
state_);
 
  263             "Read FAILED due to error: " << error.value() << 
" : " <<
 
  277     udp::endpoint remote_endpoint, std::size_t bytes_transferred,
 
  278     const boost::system::error_code& error) {
 
  279     const_buffer rdbuf(buffer_cast<const uint8_t *>(recv_buffer),
 
  289     const udp::endpoint &remote_endpoint) {
 
  291         "Default implementation of OnRead does NOT process received message");
 
  295     udp::endpoint remote_endpoint, std::size_t bytes_transferred,
 
  296     const boost::system::error_code& error) {
 
  302     return socket_.local_endpoint(*error);
 
  306     boost::system::error_code error;
 
  310     return ep.address().to_string();
 
  314     boost::system::error_code error;
 
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
 
static void AddServer(ServerType *server)
 
static void DeleteServer(ServerType *server)
 
The TaskScheduler keeps track of which tasks are currently schedulable. When a task is enqueued, it is ...
 
int GetTaskId(const std::string &name)
 
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in waitq....
 
static TaskScheduler * GetInstance()
 
Task is a wrapper over tbb::task to support policies.
 
static const int kTaskInstanceAny
 
static ServerManager< UdpServer, UdpServerPtr > impl_
 
static void DeleteServer(UdpServer *server)
 
static void AddServer(UdpServer *server)
 
Reader(UdpServerPtr server, const udp::endpoint &remote_endpoint, const const_buffer &buffer)
 
udp::endpoint remote_endpoint_
 
std::string Description() const
 
virtual bool Run()
Code to execute. Returns true if the task is completed; returns false to reschedule the task.
 
void HandleReceiveInternal(boost::asio::const_buffer recv_buffer, std::size_t bytes_transferred, const boost::system::error_code &error)
 
std::vector< uint8_t * > pbuf_
 
UdpServer(EventManager *evm, int buffer_size=kDefaultBufferSize)
 
virtual bool Initialize(unsigned short port)
 
virtual int reader_task_instance(const boost::asio::ip::udp::endpoint &remote_endpoint) const
 
boost::asio::ip::udp::endpoint remote_endpoint_
 
boost::asio::ip::udp::socket socket_
 
tbb::atomic< int > refcount_
 
boost::asio::ip::udp::endpoint GetLocalEndpoint(boost::system::error_code *error) const
 
virtual void OnRead(const boost::asio::const_buffer &recv_buffer, const boost::asio::ip::udp::endpoint &remote_endpoint)
 
int GetLocalEndpointPort() const
 
boost::asio::mutable_buffer AllocateBuffer()
 
void GetTxSocketStats(SocketIOStats *socket_stats) const
 
virtual int reader_task_id() const
 
virtual void HandleReceive(const boost::asio::const_buffer &recv_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
 
static int reader_task_id_
 
void StartSend(boost::asio::ip::udp::endpoint ep, std::size_t bytes_to_send, boost::asio::const_buffer buffer)
 
ServerState GetServerState() const
 
void HandleSendInternal(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
 
void GetRxSocketStats(SocketIOStats *socket_stats) const
 
void SetName(boost::asio::ip::udp::endpoint ep)
 
std::string GetLocalEndpointAddress() const
 
virtual void HandleSend(boost::asio::const_buffer send_buffer, boost::asio::ip::udp::endpoint remote_endpoint, std::size_t bytes_transferred, const boost::system::error_code &error)
 
void DeallocateBuffer(const boost::asio::const_buffer &buffer)
 
#define UDP_SERVER_LOG_ERROR(server, dir, arg)
 
tbb::atomic< uint64_t > read_errors
 
tbb::atomic< uint64_t > read_calls
 
void GetRxStats(SocketIOStats *socket_stats) const
 
tbb::atomic< uint64_t > write_calls
 
tbb::atomic< uint64_t > write_errors
 
tbb::atomic< uint64_t > write_bytes
 
tbb::atomic< uint64_t > read_bytes
 
void GetTxStats(SocketIOStats *socket_stats) const
 
boost::intrusive_ptr< UdpServer > UdpServerPtr