OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ksync_sock_uds.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "ksync_sock.h"
6 
7 #include <boost/asio.hpp>
8 #include <boost/bind.hpp>
9 #include <errno.h>
10 
11 class AgentParam;
12 
13 using namespace boost::asio;
14 
16 // KSyncSockUds routines
18 //Unix domain socket class for interacting with dpdk vrouter
19 
20 class KSyncSockUdsReadTask : public Task {
21 public:
23  Task(scheduler->GetTaskId("Ksync::KSyncSockUdsRead"), 0), queue_(queue) {
24  }
26  }
27 
28  bool Run() {
29  queue_->Run();
30  return true;
31  }
32  std::string Description() const { return "KSyncSockUdsRead"; }
33 private:
35 
36 };
37 
38 
39 
41 
42 KSyncSockUds::KSyncSockUds(boost::asio::io_context &ios) :
43  sock_(ios),
44  server_ep_(sockpath_),
45  rx_buff_(NULL),
46  rx_buff_q_(NULL),
47  remain_(0),
48  socket_(0),
49  connected_(false) {
50  boost::system::error_code ec;
53 retry:;
54  sock_.connect(server_ep_, ec);
55  if (ec) {
56  sleep(1);
57  goto retry;
58  }
59  socket_ = sock_.native_handle();
60  connected_ = true;
61  rx_buff_ = new char[10*kBufLen];
62  rx_buff_q_ = new char[10*kBufLen];
63 }
64 
67  char *ret_buff = new char[1024*kBufLen];
68  boost::system::error_code ec;
69 
70  // Read data from the socket and append it to the existing
71  // unprocessed data in the local buffer.
72  while (1) {
73  char *bufp = rx_buff_;
74  struct nlmsghdr *nlh = NULL;
75  struct nlmsghdr tnlh;
76  size_t offset = 0;
77  int ret_val;
78  size_t bytes_transferred = 0;
79  bytes_transferred = ret_val = recv(socket_, rx_buff_, 10*kBufLen, 0);
80  if (ret_val == 0) {
81  // connection reset by peer
82  // close socket and exit
83  sock_.close(ec);
84  LOG(INFO, " dpdk vrouter is down, exiting.. errno:" << errno);
85  exit(0);
86  }
87  if (ret_val < 0) {
88  if (errno != EAGAIN) {
89  sock_.close(ec);
90  connected_ = false;
91 retry:;
92  sock_.connect(server_ep_, ec);
93  if (ec) {
94  sleep(1);
95  goto retry;
96  }
97  socket_ = sock_.native_handle();
98  connected_ = true;
99  }
100  continue;
101  }
102 
103  if (remain_ != 0) {
104  if (remain_ < sizeof(struct nlmsghdr)) {
105  memcpy((char *)&tnlh, rx_buff_q_, remain_);
106  memcpy(((char *)&tnlh) + remain_, rx_buff_,
107  (sizeof(struct nlmsghdr) - remain_));
108  nlh = &tnlh;
109  } else {
110  nlh = (struct nlmsghdr *)rx_buff_q_;
111  }
112  if (remain_ > nlh->nlmsg_len)
113  assert(0);
114  memcpy(ret_buff, rx_buff_q_, remain_);
115  memcpy(ret_buff+remain_, rx_buff_, nlh->nlmsg_len - remain_);
116  bufp += (nlh->nlmsg_len - remain_);
117  ctxt->SetErrno(0);
118  ProcessDataInline(ret_buff);
119  offset = nlh->nlmsg_len - remain_;
120  }
121  while (offset < bytes_transferred) {
122  if ((bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
123  nlh = (struct nlmsghdr *)(rx_buff_ + offset);
124  if ((bytes_transferred - offset) > nlh->nlmsg_len) {
125  memcpy(ret_buff, rx_buff_ + offset, nlh->nlmsg_len);
126  ctxt->SetErrno(0);
127  ProcessDataInline(ret_buff);
128  offset += nlh->nlmsg_len;
129  } else {
130  break;
131  }
132  } else {
133  break;
134  }
135  }
136  memcpy(rx_buff_q_, rx_buff_ + offset, bytes_transferred - offset);
137  remain_ = bytes_transferred - offset;
138  }
139  return true;
140 }
141 
142 void KSyncSockUds::Init(io_service &ios, const std::string &cpu_pin_policy,
143  const std::string &sockpathvr) {
145  SetNetlinkFamilyId(10);
146  KSyncSock::Init(false, cpu_pin_policy);
147  sockpath_ = sockpathvr;
148 }
149 
150 uint32_t KSyncSockUds::GetSeqno(char *data) {
151  return GetNetlinkSeqno(data);
152 }
153 
154 bool KSyncSockUds::IsMoreData(char *data) {
155  return NetlinkMsgDone(data);
156 }
157 
158 bool KSyncSockUds::Decoder(char *data, AgentSandeshContext *context) {
159  KSyncSockNetlink::NetlinkDecoder(data, context);
160  return true;
161 }
162 
164  KSyncBulkSandeshContext *bulk_sandesh_context) {
165  // Get sandesh buffer and buffer-length
166  uint32_t buf_len = 0;
167  char *buf = NULL;
168  GetNetlinkPayload(data, &buf, &buf_len);
169  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
170 }
171 
172 void KSyncSockUds::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
173  HandlerCb cb) {
174  if (connected_ == true)
175  SendTo(iovec, seq_no);
176 }
177 
178 size_t KSyncSockUds::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
179  size_t len = 0, ret;
180  struct msghdr msg;
181  struct iovec iov[max_bulk_msg_count_*2];
182  int i;
183 
184  memset(&msg, 0, sizeof(msg));
185  msg.msg_iov = iov;
186 
188  int offset = nl_client_->cl_buf_offset;
190 
191  KSyncBufferList::iterator it = iovec->begin();
192  iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
193 
194  int count = iovec->size();
195  for(i = 0; i < count; i++) {
196  mutable_buffers_1 buf = iovec->at(i);
197  size_t buf_size = boost::asio::buffer_size(buf);
198  void* cbuf = boost::asio::buffer_cast<void*>(buf);
199  len += buf_size;
200  iov[i].iov_base = cbuf;
201  iov[i].iov_len = buf_size;
202  }
203 
204  msg.msg_iovlen = i;
205  ret = sendmsg(socket_, &msg, 0);
206  if (ret != len) {
207  LOG(ERROR, "sendmsg failure " << ret << "len " << len);
208  }
209  return len;
210 }
211 
212 bool KSyncSockUds::Validate(char *data) {
213  return true;
214 }
215 
216 void KSyncSockUds::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
217  static int started = 0;
218  if (!started) {
220  //Receive is handled in a separate Run() thread
221  KSyncSockUdsReadTask *task = new KSyncSockUdsReadTask(scheduler, this);
222  scheduler->Enqueue(task);
223  started = 1;
224  }
225 }
226 
227 void KSyncSockUds::Receive(mutable_buffers_1 buf) {
228  boost::system::error_code ec;
229  uint32_t bytes_read = 0;
230  const struct nlmsghdr *nlh = NULL;
231 
232  char *netlink_header(buffer_cast<char *>(buf));
233 
234  while (bytes_read < sizeof(struct nlmsghdr)) {
235  char *buffer = netlink_header + bytes_read;
236  bytes_read += recv(socket_, buffer, sizeof(struct nlmsghdr) - bytes_read, 0);
237  //Data read is lesser than netlink header
238  //continue reading
239  if (bytes_read == sizeof(struct nlmsghdr)) {
240  nlh = buffer_cast<struct nlmsghdr *>(buf);
241  }
242  }
243 
244  if (nlh->nlmsg_type == NLMSG_ERROR) {
245  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
246  << " len " << nlh->nlmsg_len);
247  assert(0);
248  }
249 
250  bytes_read = 0;
251  uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
252  char *data(buffer_cast<char *>(buf + sizeof(struct nlmsghdr)));
253 
254  while (bytes_read < payload_size) {
255  char *buffer = data + bytes_read;
256  bytes_read += recv(socket_, buffer, payload_size - bytes_read, 0);
257  }
258 }
char * rx_buff_
Definition: ksync_sock.h:594
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
virtual void SetErrno(int err)
Definition: ksync_sock.h:78
KSyncSockUds * queue_
virtual bool Validate(char *data)
virtual uint32_t GetSeqno(char *data)
virtual bool IsMoreData(char *data)
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
Definition: ksync_sock.h:37
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
Definition: ksync_sock.h:396
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
Definition: ksync_sock.cc:140
void ProcessDataInline(char *data)
Definition: ksync_sock.cc:837
KSyncSockUds(boost::asio::io_context &ios)
KSyncSockUdsReadTask(TaskScheduler *scheduler, KSyncSockUds *queue)
std::string Description() const
nl_client * nl_client_
Definition: ksync_sock.h:428
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
Definition: ksync_sock.cc:113
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
uint32_t GetNetlinkSeqno(char *data)
Definition: ksync_sock.cc:57
uint32_t bulk_buf_size_
Definition: ksync_sock.h:446
char * rx_buff_q_
Definition: ksync_sock.h:595
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
static void SetSockTableEntry(KSyncSock *sock)
Definition: ksync_sock.cc:264
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
Definition: ksync_sock.cc:918
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
Definition: ksync_sock.h:328
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:226
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
size_t remain_
Definition: ksync_sock.h:596
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
#define KSYNC_AGENT_VROUTER_SOCK_PATH
Definition: ksync_sock.h:567
static void Init(boost::asio::io_context &ios, const std::string &cpu_pin_policy, const std::string &sockpathvr="")
void reset_use_wait_tree()
Definition: ksync_sock.h:416
boost::asio::local::stream_protocol::socket sock_
Definition: ksync_sock.h:592
boost::asio::local::stream_protocol::endpoint server_ep_
Definition: ksync_sock.h:593
bool NetlinkMsgDone(char *data)
Definition: ksync_sock.cc:62
#define LOG(_Level, _Msg)
Definition: logging.h:33
virtual bool Run(void)
static void SetNetlinkFamilyId(int id)
Definition: ksync_sock.cc:269
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
void set_process_data_inline()
Definition: ksync_sock.h:417
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual void Receive(boost::asio::mutable_buffers_1)
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void ResetNetlink(nl_client *client)
Definition: ksync_sock.cc:133
static string sockpath_
Definition: ksync_sock.h:599
static const unsigned kBufLen
Definition: ksync_sock.h:316
struct task_ task