15 using boost::asio::buffer_cast;
16 using boost::asio::buffer;
17 using boost::asio::mutable_buffer;
// Destructor. Notifies observer_ with the CLOSE event for this session, then
// iterates over every entry in buffer_queue_ and finally clears the queue.
// NOTE(review): the loop body is not visible here; presumably each pending
// buffer is released (see DeleteBuffer) before the queue is cleared — confirm.
19 UnixDomainSocketSession::~UnixDomainSocketSession() {
21 observer_(
this,
CLOSE);
// Walk all buffers still queued for writing.
25 for (BufferQueue::iterator iter = buffer_queue_.begin();
26 iter != buffer_queue_.end(); ++iter) {
29 buffer_queue_.clear();
// Starts the session: notifies observer_ with the READY event, then issues an
// asynchronous read on socket_ into data_ with HandleRead as the completion
// handler.
// The handler is bound to shared_from_this() rather than a raw `this`;
// presumably this keeps the session alive while the read is outstanding —
// confirm against the class declaration.
32 void UnixDomainSocketSession::Start() {
34 observer_(
this,
READY);
37 socket_.async_read_some(boost::asio::buffer(data_),
38 boost::bind(&UnixDomainSocketSession::
39 HandleRead, shared_from_this(),
40 boost::asio::placeholders::error,
41 boost::asio::placeholders::
// Queues `data_len` bytes starting at `data` for transmission.
//
// data:     bytes to send; a null pointer takes the guard branch below.
// data_len: number of bytes; zero takes the guard branch below.
//
// The payload is copied by AppendBuffer, so the caller's memory is not
// referenced by the queue after this call.
// NOTE(review): the guard only tests for zero, so a negative data_len would
// reach AppendBuffer as-is — confirm callers never pass one.
45 void UnixDomainSocketSession::Send(
const uint8_t * data,
int data_len) {
46 if (!data || !data_len) {
// Remember whether the queue was empty BEFORE appending.
// NOTE(review): presumably a write is started only when nothing was already
// queued (write_now == true); the use of write_now is not visible here.
49 bool write_now = buffer_queue_.empty();
50 AppendBuffer(data, data_len);
// Starts an asynchronous write of the buffer at the front of buffer_queue_,
// with HandleWrite as the completion handler.
// The front buffer is only read here, not removed; HandleWrite is where
// DeleteBuffer()/pop_front() are called on it.
// NOTE(review): the empty-queue branch body is not visible; presumably it
// returns early, since front() on an empty queue would be invalid — confirm.
56 void UnixDomainSocketSession::WriteToSocket() {
57 if (buffer_queue_.empty()) {
61 boost::asio::mutable_buffer head = buffer_queue_.front();
// Re-wrap the head buffer's pointer and size as the buffer to write.
62 boost::asio::async_write(socket_,
63 buffer(buffer_cast <const uint8_t *>(head),
64 boost::asio::buffer_size(head)),
65 boost::bind(&UnixDomainSocketSession::
66 HandleWrite, shared_from_this(),
67 boost::asio::placeholders::error));
// Copies `bytes` bytes from `src` into a freshly allocated array and appends
// a mutable_buffer describing that array to the back of buffer_queue_.
//
// src:   source bytes; read by memcpy, not retained.
// bytes: number of bytes to allocate and copy; not validated in this function.
//
// The array is allocated with new[] and the queued mutable_buffer is only a
// (pointer, size) view, so the allocation must be released explicitly.
// NOTE(review): presumably DeleteBuffer() does that with delete[] — confirm.
// NOTE(review): this uses the `u_int8_t` spelling while the parameters use
// `uint8_t`; consider unifying.
70 void UnixDomainSocketSession::AppendBuffer(
const uint8_t *src,
int bytes) {
71 u_int8_t *data =
new u_int8_t[bytes];
72 memcpy(data, src, bytes);
73 boost::asio::mutable_buffer buffer =
74 boost::asio::mutable_buffer(data, bytes);
75 buffer_queue_.push_back(buffer);
// Takes one queued buffer (by value) and obtains its underlying byte pointer
// via buffer_cast.
// NOTE(review): what is done with the pointer is not visible here; presumably
// it is released with delete[] to match the new[] in AppendBuffer — confirm.
78 void UnixDomainSocketSession::DeleteBuffer(boost::asio::mutable_buffer buffer) {
79 const uint8_t *data = buffer_cast <
const uint8_t *>(buffer);
// Completion handler for the async_read_some calls issued by this session.
//
// error:             result of the read operation.
// bytes_transferred: byte count reported by the read operation.
//
// The visible code notifies observer_ with the READY event for this session.
// NOTE(review): how `error` and `bytes_transferred` are used is not visible
// here; presumably the notification is skipped on error — confirm.
84 void UnixDomainSocketSession::HandleRead(
const boost::system::error_code &error,
85 size_t bytes_transferred) {
90 observer_(
this,
READY);
// Completion handler for the async_write issued by WriteToSocket().
//
// error: result of the write operation.
//
// The visible code releases the buffer at the front of buffer_queue_ through
// DeleteBuffer() and removes it from the queue, then issues an asynchronous
// read into data_ with HandleRead as the completion handler.
// NOTE(review): handling of `error`, and whether the next queued buffer is
// written before the read is re-armed, is not visible here — confirm.
94 void UnixDomainSocketSession::HandleWrite(
95 const boost::system::error_code &error) {
// The head buffer has been written; free it and drop it from the queue.
108 DeleteBuffer(buffer_queue_.front());
109 buffer_queue_.pop_front();
119 socket_.async_read_some(boost::asio::buffer(data_),
120 boost::bind(&UnixDomainSocketSession::
121 HandleRead, shared_from_this(),
122 boost::asio::placeholders::error,
123 boost::asio::placeholders::
// Constructs the server.
//
// io:   I/O context; dereferenced to construct acceptor_.
// file: path used to build the local stream-protocol endpoint for acceptor_.
//
// Initializes session_idspace_ to 0, creates the first session object and
// starts an asynchronous accept into that session's socket, with HandleAccept
// as the completion handler.
// NOTE(review): the handler is bound to a raw `this`, so the server presumably
// must outlive any pending accept — confirm.
// NOTE(review): the initialization of io_service_ is not visible here.
127 UnixDomainSocketServer::UnixDomainSocketServer(
128 boost::asio::io_context *io,
const std::string &file)
130 acceptor_(*io, boost::asio::local::stream_protocol::endpoint(file)),
131 session_idspace_(0) {
132 SessionPtr new_session(
new UnixDomainSocketSession(io_service_));
133 acceptor_.async_accept(new_session->socket(),
134 boost::bind(&UnixDomainSocketServer::
135 HandleAccept,
this, new_session,
136 boost::asio::placeholders::error));
// Completion handler for acceptor_.async_accept.
//
// session: the session object whose socket was passed to async_accept.
// error:   result of the accept operation.
//
// Visible steps: notify observer_ with DELETE_SESSION, assign the session the
// next id from session_idspace_ (pre-incremented), notify observer_ with
// NEW_SESSION, then create a fresh session and start another asynchronous
// accept on it.
// NOTE(review): the conditions guarding these steps are not visible;
// presumably DELETE_SESSION is reported only when `error` is set and the
// remaining steps run on success — confirm.
140 UnixDomainSocketServer::HandleAccept(SessionPtr session,
141 const boost::system::error_code &error) {
// Raw pointer handed to observer_; `session` retains ownership.
142 UnixDomainSocketSession *socket_session = session.get();
146 observer_(
this, socket_session, DELETE_SESSION);
151 socket_session->set_session_id(++session_idspace_);
153 observer_(
this, socket_session, NEW_SESSION);
// Re-arm the acceptor with a new session object for the next connection.
157 SessionPtr new_session(
new UnixDomainSocketSession(io_service_));
158 acceptor_.async_accept(new_session->socket(),
159 boost::bind(&UnixDomainSocketServer::
160 HandleAccept,
this, new_session,
161 boost::asio::placeholders::error));