OpenSDN source code
ksync_sock_tcp.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "ksync_sock.h"
6 
7 #include <boost/asio.hpp>
8 #include <boost/bind/bind.hpp>
9 
10 using namespace boost::asio;
11 using namespace boost::placeholders;
12 
14 // KSyncSockTcp routines
16 //TCP socket class for interacting with vrouter
18  boost::asio::ip::address ip_address, int port) : TcpServer(evm), evm_(evm),
19  session_(NULL), server_ep_(ip_address, port), connect_complete_(false) {
20 
23  if (rx_buff_ == NULL) {
24  rx_buff_ = new char[kBufLen];
25  }
26  rx_buff_rem_ = new char[kBufLen];
27  remain_ = 0;
28 
31 }
32 
33 void KSyncSockTcp::Init(EventManager *evm, boost::asio::ip::address ip_addr,
34  int port, const std::string &cpu_pin_policy) {
35  KSyncSock::SetSockTableEntry(new KSyncSockTcp(evm, ip_addr, port));
37  KSyncSock::Init(false, cpu_pin_policy);
38 }
39 
41  TcpSession *session = new KSyncSockTcpSession(this, socket, false);
42  session->set_observer(boost::bind(&KSyncSockTcp::OnSessionEvent,
43  this, _1, _2));
44  return session;
45 }
46 
47 uint32_t KSyncSockTcp::GetSeqno(char *data) {
48  return GetNetlinkSeqno(data);
49 }
50 
51 bool KSyncSockTcp::IsMoreData(char *data) {
52  return NetlinkMsgDone(data);
53 }
54 
55 size_t KSyncSockTcp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
56  size_t len = 0, ret;
57  struct msghdr msg;
58  struct iovec iov[max_bulk_msg_count_*2];
59  int i, fd;
60 
61  memset(&msg, 0, sizeof(msg));
62  msg.msg_iov = iov;
63 
65  int offset = nl_client_->cl_buf_offset;
67 
68  KSyncBufferList::iterator it = iovec->begin();
69  iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
70 
71  int count = iovec->size();
72  for(i = 0; i < count; i++) {
73  mutable_buffers_1 buf = iovec->at(i);
74  size_t buf_size = boost::asio::buffer_size(buf);
75  void* cbuf = boost::asio::buffer_cast<void*>(buf);
76  len += buf_size;
77  iov[i].iov_base = cbuf;
78  iov[i].iov_len = buf_size;
79  }
80 
81  msg.msg_iovlen = i;
82  fd = tcp_socket_->native_handle();
83  ret = sendmsg(fd, &msg, 0);
84  if (ret != len) {
85  LOG(ERROR, "sendmsg failure " << ret << "len " << len);
86  }
87  return len;
88 }
89 
90 void KSyncSockTcp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
91  HandlerCb cb) {
92  SendTo(iovec, seq_no);
93  return;
94 }
95 
96 bool KSyncSockTcp::Validate(char *data) {
97  return ValidateNetlink(data);
98 }
99 
100 bool KSyncSockTcp::Decoder(char *data, AgentSandeshContext *context) {
101  KSyncSockNetlink::NetlinkDecoder(data, context);
102  return true;
103 }
104 
106  KSyncBulkSandeshContext *bulk_sandesh_context) {
107  // Get sandesh buffer and buffer-length
108  uint32_t buf_len = 0;
109  char *buf = NULL;
110  GetNetlinkPayload(data, &buf, &buf_len);
111  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
112 }
113 
114 void KSyncSockTcp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
115  //Data would be read from ksync tcp session
116  //hence no socket operation would be required
117 }
118 
119 void KSyncSockTcp::Receive(mutable_buffers_1 buf) {
120  uint32_t bytes_read = 0;
121  boost::system::error_code ec;
122  const struct nlmsghdr *nlh = NULL;
123 
124  //Create a buffer to read netlink header first
125  mutable_buffers_1 netlink_header(buffer_cast<void *>(buf),
126  sizeof(struct nlmsghdr));
127 
128  bool blocking_socket = session_->socket()->non_blocking();
129  session_->socket()->non_blocking(false, ec);
130  while (bytes_read < sizeof(struct nlmsghdr)) {
131  mutable_buffers_1 buffer =
132  static_cast<mutable_buffers_1>(netlink_header + bytes_read);
133  bytes_read += session_->socket()->receive(buffer, 0, ec);
134  if (ec.failed()) {
135  assert(0);
136  }
137  //Data read is lesser than netlink header
138  //continue reading
139  if (bytes_read == sizeof(struct nlmsghdr)) {
140  nlh = buffer_cast<struct nlmsghdr *>(buf);
141  }
142  }
143 
144  if (nlh->nlmsg_type == NLMSG_ERROR) {
145  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
146  << " len " << nlh->nlmsg_len);
147  assert(0);
148  }
149 
150  bytes_read = 0;
151  uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
152  //Read data
153  mutable_buffers_1 data(buffer_cast<void *>(buf + sizeof(struct nlmsghdr)),
154  payload_size);
155 
156  while (bytes_read < payload_size) {
157  mutable_buffers_1 buffer =
158  static_cast<mutable_buffers_1>(data + bytes_read);
159  bytes_read += session_->socket()->receive(buffer, 0, ec);
160  if (ec.failed()) {
161  assert(0);
162  }
163  }
164  session_->socket()->non_blocking(blocking_socket, ec);
165 }
166 
167 bool KSyncSockTcp::ReceiveMsg(const u_int8_t *msg, size_t size) {
169  ctxt->SetErrno(0);
170  ProcessDataInline((char *) msg);
171  return true;
172 }
173 
176  int fd = tcp_socket_->native_handle();
177 
178  while (1) {
179  char *bufp = rx_buff_;
180  struct nlmsghdr *nlh = NULL;
181  struct nlmsghdr tnlh;
182  int offset = 0;
183  int bytes_transferred = 0;
184 
185  bytes_transferred = recv(fd, rx_buff_, kBufLen, 0);
186  if (bytes_transferred <= 0) {
187  LOG(ERROR, "Connection to dpdk-vrouter lost.");
188  sleep(10);
189  exit(EXIT_FAILURE);
190  }
191 
192  if (remain_ != 0) {
193  if (remain_ < sizeof(struct nlmsghdr)) {
194  memcpy((char *)&tnlh, rx_buff_rem_, remain_);
195  memcpy(((char *)&tnlh) + remain_, rx_buff_,
196  (sizeof(struct nlmsghdr) - remain_));
197  nlh = &tnlh;
198  } else {
199  nlh = (struct nlmsghdr *)rx_buff_rem_;
200  }
201 
202  if (remain_ > nlh->nlmsg_len)
203  assert(0);
204 
205  memcpy(rx_buff_rem_+remain_, rx_buff_, nlh->nlmsg_len - remain_);
206  bufp += (nlh->nlmsg_len - remain_);
207  ctxt->SetErrno(0);
209  offset = nlh->nlmsg_len - remain_;
210  }
211 
212  while (offset < bytes_transferred) {
213  if ((unsigned int)(bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
214  nlh = (struct nlmsghdr *)(rx_buff_ + offset);
215  if ((unsigned int)(bytes_transferred - offset) > nlh->nlmsg_len) {
216  ctxt->SetErrno(0);
217  ProcessDataInline(rx_buff_ + offset);
218  offset += nlh->nlmsg_len;
219  } else {
220  break;
221  }
222  } else {
223  break;
224  }
225  }
226 
227  remain_ = bytes_transferred - offset;
228  if (remain_) {
229  memcpy(rx_buff_rem_, rx_buff_ + offset, bytes_transferred - offset);
230  }
231  }
232 
233  return true;
234 }
235 
236 class KSyncSockTcpReadTask : public Task {
237 public:
239  Task(scheduler->GetTaskId("Ksync::KSyncSockTcpRead"), 0), sock_(sock) {
240  }
242  }
243 
244  bool Run() {
245  sock_->Run();
246  return true;
247  }
248  std::string Description() const { return "KSyncSockTcpRead"; }
249 private:
251 
252 };
253 
255  static int started = 0;
256  boost::system::error_code ec;
257 
258  if (!started) {
260  KSyncSockTcpReadTask *task = new KSyncSockTcpReadTask(scheduler, this);
261  tcp_socket_->non_blocking(false, ec);
262  scheduler->Enqueue(task);
263  started = 1;
264  }
265 }
266 
268  TcpSession::Event event) {
269  switch (event) {
271  //Retry
273  break;
274  case TcpSession::CLOSE:
275  LOG(ERROR, "Connection to dpdk-vrouter lost.");
276  sleep(10);
277  exit(EXIT_FAILURE);
278  break;
281  connect_complete_ = true;
285  default:
286  break;
287  }
288 }
289 
291 // KSyncSockTcpSession routines
294  bool async_ready) : TcpSession(server, sock, async_ready) {
295  KSyncSockTcp *tcp_ptr = static_cast<KSyncSockTcp *>(server);
297  boost::bind(&KSyncSockTcp::ReceiveMsg, tcp_ptr, _1, _2));
298 }
299 
301  if (reader_) {
302  delete reader_;
303  }
304 }
305 
307  reader_->OnRead(buffer);
308 }
309 
311  TcpSession *session, ReceiveCallback callback) :
312  TcpMessageReader(session, callback) {
313 }
314 
316  size_t size = TcpSession::BufferSize(buffer);
317  int remain = size - offset;
318  if (remain < GetHeaderLenSize()) {
319  return -1;
320  }
321 
322  //Byte ordering?
323  const struct nlmsghdr *nlh =
324  (const struct nlmsghdr *)(TcpSession::BufferData(buffer) + offset);
325  return nlh->nlmsg_len;
326 }
virtual void SetErrno(int err)
Definition: ksync_sock.h:78
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
Definition: ksync_sock.cc:920
std::string Description() const
Gives a description of the task.
KSyncSockTcp * sock_
KSyncSockTcpReadTask(TaskScheduler *scheduler, KSyncSockTcp *sock)
bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
virtual const int GetHeaderLenSize()
Definition: ksync_sock.h:608
virtual int MsgLength(Buffer buffer, int offset)
KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback)
KSyncSockTcpSessionReader * reader_
Definition: ksync_sock.h:628
virtual void OnRead(Buffer buffer)
KSyncSockTcpSession(TcpServer *server, Socket *sock, bool async_ready=false)
virtual ~KSyncSockTcpSession()
virtual bool IsMoreData(char *data)
boost::asio::ip::tcp::endpoint server_ep_
Definition: ksync_sock.h:663
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
TcpSession * session_
Definition: ksync_sock.h:662
virtual void Receive(boost::asio::mutable_buffers_1)
virtual uint32_t GetSeqno(char *data)
void AsyncReadStart()
bool ReceiveMsg(const u_int8_t *msg, size_t size)
KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr, int port)
virtual bool Run(void)
boost::asio::ip::tcp::socket * tcp_socket_
Definition: ksync_sock.h:664
size_t remain_
Definition: ksync_sock.h:668
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
char * rx_buff_rem_
Definition: ksync_sock.h:667
virtual bool Validate(char *data)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
bool connect_complete_
Definition: ksync_sock.h:665
char * rx_buff_
Definition: ksync_sock.h:666
virtual TcpSession * AllocSession(Socket *socket)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
static void Init(EventManager *evm, boost::asio::ip::address ip_addr, int port, const std::string &cpu_pin_policy)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
Definition: ksync_sock.h:394
void reset_use_wait_tree()
Definition: ksync_sock.h:414
void set_process_data_inline()
Definition: ksync_sock.h:415
uint32_t max_bulk_buf_size_
Definition: ksync_sock.h:438
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:231
void ProcessDataInline(char *data)
Definition: ksync_sock.cc:839
uint32_t bulk_buf_size_
Definition: ksync_sock.h:444
static void SetSockTableEntry(KSyncSock *sock)
Definition: ksync_sock.cc:269
static void SetNetlinkFamilyId(int id)
Definition: ksync_sock.cc:274
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
Definition: ksync_sock.h:326
static const unsigned kBufLen
Definition: ksync_sock.h:316
nl_client * nl_client_
Definition: ksync_sock.h:426
uint32_t max_bulk_msg_count_
Definition: ksync_sock.h:436
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:672
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:307
boost::asio::const_buffer Buffer
Definition: tcp_session.h:306
virtual void Connect(TcpSession *session, Endpoint remote)
Definition: tcp_server.cc:476
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:190
boost::asio::ip::tcp::socket Socket
Definition: tcp_server.h:31
@ CONNECT_COMPLETE
Definition: tcp_session.h:46
@ CONNECT_FAILED
Definition: tcp_session.h:47
TcpServer * server()
Definition: tcp_session.h:84
boost::system::error_code SetTcpSendBufSize(uint32_t size)
Definition: tcp_session.cc:777
virtual Socket * socket() const
Definition: tcp_session.h:82
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:109
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
Definition: tcp_session.cc:790
boost::system::error_code SetTcpNoDelay()
Definition: tcp_session.cc:766
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:112
void set_observer(EventObserver observer)
Definition: tcp_session.cc:219
boost::asio::const_buffer Buffer
Definition: tcp_session.h:60
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:56
static EventManager evm
bool NetlinkMsgDone(char *data)
Definition: ksync_sock.cc:67
bool ValidateNetlink(char *data)
Definition: ksync_sock.cc:73
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
Definition: ksync_sock.cc:118
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
Definition: ksync_sock.cc:145
uint32_t GetNetlinkSeqno(char *data)
Definition: ksync_sock.cc:62
void ResetNetlink(nl_client *client)
Definition: ksync_sock.cc:138
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
Definition: ksync_sock.h:37
#define LOG(_Level, _Msg)
Definition: logging.h:34
struct task_ task