OpenSDN source code
ksync_sock_uds.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "ksync_sock.h"
6 
7 #include <boost/asio.hpp>
8 #include <boost/bind/bind.hpp>
9 #include <errno.h>
10 
11 class AgentParam;
12 
13 using namespace boost::asio;
14 using namespace boost::placeholders;
15 
17 // KSyncSockUds routines
19 //Unix domain socket class for interacting with dpdk vrouter
20 
21 class KSyncSockUdsReadTask : public Task {
22 public:
24  Task(scheduler->GetTaskId("Ksync::KSyncSockUdsRead"), 0), queue_(queue) {
25  }
27  }
28 
29  bool Run() {
30  queue_->Run();
31  return true;
32  }
33  std::string Description() const { return "KSyncSockUdsRead"; }
34 private:
36 
37 };
38 
39 
40 
42 
43 KSyncSockUds::KSyncSockUds(boost::asio::io_context &ios) :
44  sock_(ios),
45  server_ep_(sockpath_),
46  rx_buff_(NULL),
47  rx_buff_q_(NULL),
48  remain_(0),
49  socket_(0),
50  connected_(false) {
51  boost::system::error_code ec;
54 retry:;
55  sock_.connect(server_ep_, ec);
56  if (ec) {
57  sleep(1);
58  goto retry;
59  }
60  socket_ = sock_.native_handle();
61  connected_ = true;
62  rx_buff_ = new char[10*kBufLen];
63  rx_buff_q_ = new char[10*kBufLen];
64 }
65 
68  char *ret_buff = new char[1024*kBufLen];
69  boost::system::error_code ec;
70 
71  // Read data from the socket and append it to the existing
72  // unprocessed data in the local buffer.
73  while (1) {
74  char *bufp = rx_buff_;
75  struct nlmsghdr *nlh = NULL;
76  struct nlmsghdr tnlh;
77  size_t offset = 0;
78  int ret_val;
79  size_t bytes_transferred = 0;
80  bytes_transferred = ret_val = recv(socket_, rx_buff_, 10*kBufLen, 0);
81  if (ret_val == 0) {
82  // connection reset by peer
83  // close socket and exit
84  sock_.close(ec);
85  LOG(INFO, " dpdk vrouter is down, exiting.. errno:" << errno);
86  exit(0);
87  }
88  if (ret_val < 0) {
89  if (errno != EAGAIN) {
90  sock_.close(ec);
91  connected_ = false;
92 retry:;
93  sock_.connect(server_ep_, ec);
94  if (ec) {
95  sleep(1);
96  goto retry;
97  }
98  socket_ = sock_.native_handle();
99  connected_ = true;
100  }
101  continue;
102  }
103 
104  if (remain_ != 0) {
105  if (remain_ < sizeof(struct nlmsghdr)) {
106  memcpy((char *)&tnlh, rx_buff_q_, remain_);
107  memcpy(((char *)&tnlh) + remain_, rx_buff_,
108  (sizeof(struct nlmsghdr) - remain_));
109  nlh = &tnlh;
110  } else {
111  nlh = (struct nlmsghdr *)rx_buff_q_;
112  }
113  if (remain_ > nlh->nlmsg_len)
114  assert(0);
115  memcpy(ret_buff, rx_buff_q_, remain_);
116  memcpy(ret_buff+remain_, rx_buff_, nlh->nlmsg_len - remain_);
117  bufp += (nlh->nlmsg_len - remain_);
118  ctxt->SetErrno(0);
119  ProcessDataInline(ret_buff);
120  offset = nlh->nlmsg_len - remain_;
121  }
122  while (offset < bytes_transferred) {
123  if ((bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
124  nlh = (struct nlmsghdr *)(rx_buff_ + offset);
125  if ((bytes_transferred - offset) > nlh->nlmsg_len) {
126  memcpy(ret_buff, rx_buff_ + offset, nlh->nlmsg_len);
127  ctxt->SetErrno(0);
128  ProcessDataInline(ret_buff);
129  offset += nlh->nlmsg_len;
130  } else {
131  break;
132  }
133  } else {
134  break;
135  }
136  }
137  memcpy(rx_buff_q_, rx_buff_ + offset, bytes_transferred - offset);
138  remain_ = bytes_transferred - offset;
139  }
140  return true;
141 }
142 
143 void KSyncSockUds::Init(io_service &ios, const std::string &cpu_pin_policy,
144  const std::string &sockpathvr) {
146  SetNetlinkFamilyId(10);
147  KSyncSock::Init(false, cpu_pin_policy);
148  sockpath_ = sockpathvr;
149 }
150 
151 uint32_t KSyncSockUds::GetSeqno(char *data) {
152  return GetNetlinkSeqno(data);
153 }
154 
155 bool KSyncSockUds::IsMoreData(char *data) {
156  return NetlinkMsgDone(data);
157 }
158 
159 bool KSyncSockUds::Decoder(char *data, AgentSandeshContext *context) {
160  KSyncSockNetlink::NetlinkDecoder(data, context);
161  return true;
162 }
163 
165  KSyncBulkSandeshContext *bulk_sandesh_context) {
166  // Get sandesh buffer and buffer-length
167  uint32_t buf_len = 0;
168  char *buf = NULL;
169  GetNetlinkPayload(data, &buf, &buf_len);
170  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
171 }
172 
173 void KSyncSockUds::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
174  HandlerCb cb) {
175  if (connected_ == true)
176  SendTo(iovec, seq_no);
177 }
178 
179 size_t KSyncSockUds::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
180  size_t len = 0, ret;
181  struct msghdr msg;
182  struct iovec iov[max_bulk_msg_count_*2];
183  int i;
184 
185  memset(&msg, 0, sizeof(msg));
186  msg.msg_iov = iov;
187 
189  int offset = nl_client_->cl_buf_offset;
191 
192  KSyncBufferList::iterator it = iovec->begin();
193  iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
194 
195  int count = iovec->size();
196  for(i = 0; i < count; i++) {
197  mutable_buffers_1 buf = iovec->at(i);
198  size_t buf_size = boost::asio::buffer_size(buf);
199  void* cbuf = boost::asio::buffer_cast<void*>(buf);
200  len += buf_size;
201  iov[i].iov_base = cbuf;
202  iov[i].iov_len = buf_size;
203  }
204 
205  msg.msg_iovlen = i;
206  ret = sendmsg(socket_, &msg, 0);
207  if (ret != len) {
208  LOG(ERROR, "sendmsg failure " << ret << "len " << len);
209  }
210  return len;
211 }
212 
213 bool KSyncSockUds::Validate(char *data) {
214  return true;
215 }
216 
217 void KSyncSockUds::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
218  static int started = 0;
219  if (!started) {
221  //Receive is handled in a separate Run() thread
222  KSyncSockUdsReadTask *task = new KSyncSockUdsReadTask(scheduler, this);
223  scheduler->Enqueue(task);
224  started = 1;
225  }
226 }
227 
228 void KSyncSockUds::Receive(mutable_buffers_1 buf) {
229  boost::system::error_code ec;
230  uint32_t bytes_read = 0;
231  const struct nlmsghdr *nlh = NULL;
232 
233  char *netlink_header(buffer_cast<char *>(buf));
234 
235  while (bytes_read < sizeof(struct nlmsghdr)) {
236  char *buffer = netlink_header + bytes_read;
237  bytes_read += recv(socket_, buffer, sizeof(struct nlmsghdr) - bytes_read, 0);
238  //Data read is lesser than netlink header
239  //continue reading
240  if (bytes_read == sizeof(struct nlmsghdr)) {
241  nlh = buffer_cast<struct nlmsghdr *>(buf);
242  }
243  }
244 
245  if (nlh->nlmsg_type == NLMSG_ERROR) {
246  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
247  << " len " << nlh->nlmsg_len);
248  assert(0);
249  }
250 
251  bytes_read = 0;
252  uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
253  char *data(buffer_cast<char *>(buf + sizeof(struct nlmsghdr)));
254 
255  while (bytes_read < payload_size) {
256  char *buffer = data + bytes_read;
257  bytes_read += recv(socket_, buffer, payload_size - bytes_read, 0);
258  }
259 }
virtual void SetErrno(int err)
Definition: ksync_sock.h:78
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
Definition: ksync_sock.cc:920
KSyncSockUdsReadTask(TaskScheduler *scheduler, KSyncSockUds *queue)
bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
std::string Description() const
Gives a description of the task.
KSyncSockUds * queue_
virtual bool IsMoreData(char *data)
static string sockpath_
Definition: ksync_sock.h:597
char * rx_buff_
Definition: ksync_sock.h:592
static void Init(boost::asio::io_context &ios, const std::string &cpu_pin_policy, const std::string &sockpathvr="")
size_t remain_
Definition: ksync_sock.h:594
KSyncSockUds(boost::asio::io_context &ios)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
boost::asio::local::stream_protocol::endpoint server_ep_
Definition: ksync_sock.h:591
boost::asio::local::stream_protocol::socket sock_
Definition: ksync_sock.h:590
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual bool Validate(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual uint32_t GetSeqno(char *data)
char * rx_buff_q_
Definition: ksync_sock.h:593
virtual bool Run(void)
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
Definition: ksync_sock.h:394
void reset_use_wait_tree()
Definition: ksync_sock.h:414
void set_process_data_inline()
Definition: ksync_sock.h:415
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:231
void ProcessDataInline(char *data)
Definition: ksync_sock.cc:839
uint32_t bulk_buf_size_
Definition: ksync_sock.h:444
static void SetSockTableEntry(KSyncSock *sock)
Definition: ksync_sock.cc:269
static void SetNetlinkFamilyId(int id)
Definition: ksync_sock.cc:274
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
Definition: ksync_sock.h:326
static const unsigned kBufLen
Definition: ksync_sock.h:316
nl_client * nl_client_
Definition: ksync_sock.h:426
uint32_t max_bulk_msg_count_
Definition: ksync_sock.h:436
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
bool NetlinkMsgDone(char *data)
Definition: ksync_sock.cc:67
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
Definition: ksync_sock.cc:118
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
Definition: ksync_sock.cc:145
uint32_t GetNetlinkSeqno(char *data)
Definition: ksync_sock.cc:62
void ResetNetlink(nl_client *client)
Definition: ksync_sock.cc:138
#define KSYNC_AGENT_VROUTER_SOCK_PATH
Definition: ksync_sock.h:565
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
Definition: ksync_sock.h:37
#define LOG(_Level, _Msg)
Definition: logging.h:34
struct task_ task