OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
usock_server.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 
/*
 * The primary method implemented here is Send(), which transmits a
 * message over the Unix socket. It uses boost::asio::async_write to
 * send one message at a time, asynchronously. The user can call Send()
 * repeatedly; each message is appended to the tail of a queue. When the
 * write-completion callback runs, the next message at the front of the
 * queue is sent.
 */
13 #include "io/usock_server.h"
14 
15 using boost::asio::buffer_cast;
16 using boost::asio::buffer;
17 using boost::asio::mutable_buffer;
18 
19 UnixDomainSocketSession::~UnixDomainSocketSession() {
20  if (observer_) {
21  observer_(this, CLOSE);
22  }
23 
24  /* Free up any remaining buffers in the queue. */
25  for (BufferQueue::iterator iter = buffer_queue_.begin();
26  iter != buffer_queue_.end(); ++iter) {
27  DeleteBuffer(*iter);
28  }
29  buffer_queue_.clear();
30 }
31 
32 void UnixDomainSocketSession::Start() {
33  if (observer_) {
34  observer_(this, READY);
35  }
36 
37  socket_.async_read_some(boost::asio::buffer(data_),
38  boost::bind(&UnixDomainSocketSession::
39  HandleRead, shared_from_this(),
40  boost::asio::placeholders::error,
41  boost::asio::placeholders::
42  bytes_transferred));
43 }
44 
45 void UnixDomainSocketSession::Send(const uint8_t * data, int data_len) {
46  if (!data || !data_len) {
47  return;
48  }
49  bool write_now = buffer_queue_.empty();
50  AppendBuffer(data, data_len);
51  if (write_now) {
52  WriteToSocket();
53  }
54 }
55 
56 void UnixDomainSocketSession::WriteToSocket() {
57  if (buffer_queue_.empty()) {
58  return;
59  }
60 
61  boost::asio::mutable_buffer head = buffer_queue_.front();
62  boost::asio::async_write(socket_,
63  buffer(buffer_cast <const uint8_t *>(head),
64  boost::asio::buffer_size(head)),
65  boost::bind(&UnixDomainSocketSession::
66  HandleWrite, shared_from_this(),
67  boost::asio::placeholders::error));
68 }
69 
70 void UnixDomainSocketSession::AppendBuffer(const uint8_t *src, int bytes) {
71  u_int8_t *data = new u_int8_t[bytes];
72  memcpy(data, src, bytes);
73  boost::asio::mutable_buffer buffer =
74  boost::asio::mutable_buffer(data, bytes);
75  buffer_queue_.push_back(buffer);
76 }
77 
78 void UnixDomainSocketSession::DeleteBuffer(boost::asio::mutable_buffer buffer) {
79  const uint8_t *data = buffer_cast <const uint8_t *>(buffer);
80  delete []data;
81  return;
82 }
83 
84 void UnixDomainSocketSession::HandleRead(const boost::system::error_code &error,
85  size_t bytes_transferred) {
86  if (error) {
87  return;
88  }
89  if (observer_) {
90  observer_(this, READY);
91  }
92 }
93 
94 void UnixDomainSocketSession::HandleWrite(
95  const boost::system::error_code &error) {
96  /*
97  * async_write() is atomic in that it returns success once the entire message
98  * is sent. If there is an error, it's okay to return from here so that the
99  * session gets closed.
100  */
101  if (error) {
102  return;
103  }
104 
105  /*
106  * We are done with the buffer at the head of the queue. Delete it.
107  */
108  DeleteBuffer(buffer_queue_.front());
109  buffer_queue_.pop_front();
110 
111  /*
112  * Write the next message, if there.
113  */
114  WriteToSocket();
115 
116  /*
117  * Engage on the socket to keep it alive.
118  */
119  socket_.async_read_some(boost::asio::buffer(data_),
120  boost::bind(&UnixDomainSocketSession::
121  HandleRead, shared_from_this(),
122  boost::asio::placeholders::error,
123  boost::asio::placeholders::
124  bytes_transferred));
125 }
126 
127 UnixDomainSocketServer::UnixDomainSocketServer(
128  boost::asio::io_context *io, const std::string &file)
129  : io_service_(io),
130  acceptor_(*io, boost::asio::local::stream_protocol::endpoint(file)),
131  session_idspace_(0) {
132  SessionPtr new_session(new UnixDomainSocketSession(io_service_));
133  acceptor_.async_accept(new_session->socket(),
134  boost::bind(&UnixDomainSocketServer::
135  HandleAccept, this, new_session,
136  boost::asio::placeholders::error));
137 }
138 
139 void
140 UnixDomainSocketServer::HandleAccept(SessionPtr session,
141  const boost::system::error_code &error) {
142  UnixDomainSocketSession *socket_session = session.get();
143 
144  if (error) {
145  if (observer_) {
146  observer_(this, socket_session, DELETE_SESSION);
147  }
148  return;
149  }
150 
151  socket_session->set_session_id(++session_idspace_);
152  if (observer_) {
153  observer_(this, socket_session, NEW_SESSION);
154  session->Start();
155  }
156 
157  SessionPtr new_session(new UnixDomainSocketSession(io_service_));
158  acceptor_.async_accept(new_session->socket(),
159  boost::bind(&UnixDomainSocketServer::
160  HandleAccept, this, new_session,
161  boost::asio::placeholders::error));
162 }