OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ksync_sock_tcp.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "ksync_sock.h"
6 
7 #include <boost/asio.hpp>
8 #include <boost/bind.hpp>
9 
10 using namespace boost::asio;
11 
13 // KSyncSockTcp routines
15 //TCP socket class for interacting with vrouter
17  boost::asio::ip::address ip_address, int port) : TcpServer(evm), evm_(evm),
18  session_(NULL), server_ep_(ip_address, port), connect_complete_(false) {
19 
22  if (rx_buff_ == NULL) {
23  rx_buff_ = new char[kBufLen];
24  }
25  rx_buff_rem_ = new char[kBufLen];
26  remain_ = 0;
27 
30 }
31 
32 void KSyncSockTcp::Init(EventManager *evm, boost::asio::ip::address ip_addr,
33  int port, const std::string &cpu_pin_policy) {
34  KSyncSock::SetSockTableEntry(new KSyncSockTcp(evm, ip_addr, port));
36  KSyncSock::Init(false, cpu_pin_policy);
37 }
38 
40  TcpSession *session = new KSyncSockTcpSession(this, socket, false);
41  session->set_observer(boost::bind(&KSyncSockTcp::OnSessionEvent,
42  this, _1, _2));
43  return session;
44 }
45 
46 uint32_t KSyncSockTcp::GetSeqno(char *data) {
47  return GetNetlinkSeqno(data);
48 }
49 
50 bool KSyncSockTcp::IsMoreData(char *data) {
51  return NetlinkMsgDone(data);
52 }
53 
54 size_t KSyncSockTcp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
55  size_t len = 0, ret;
56  struct msghdr msg;
57  struct iovec iov[max_bulk_msg_count_*2];
58  int i, fd;
59 
60  memset(&msg, 0, sizeof(msg));
61  msg.msg_iov = iov;
62 
64  int offset = nl_client_->cl_buf_offset;
66 
67  KSyncBufferList::iterator it = iovec->begin();
68  iovec->insert(it, buffer((char *)nl_client_->cl_buf, offset));
69 
70  int count = iovec->size();
71  for(i = 0; i < count; i++) {
72  mutable_buffers_1 buf = iovec->at(i);
73  size_t buf_size = boost::asio::buffer_size(buf);
74  void* cbuf = boost::asio::buffer_cast<void*>(buf);
75  len += buf_size;
76  iov[i].iov_base = cbuf;
77  iov[i].iov_len = buf_size;
78  }
79 
80  msg.msg_iovlen = i;
81  fd = tcp_socket_->native_handle();
82  ret = sendmsg(fd, &msg, 0);
83  if (ret != len) {
84  LOG(ERROR, "sendmsg failure " << ret << "len " << len);
85  }
86  return len;
87 }
88 
89 void KSyncSockTcp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
90  HandlerCb cb) {
91  SendTo(iovec, seq_no);
92  return;
93 }
94 
95 bool KSyncSockTcp::Validate(char *data) {
96  return ValidateNetlink(data);
97 }
98 
99 bool KSyncSockTcp::Decoder(char *data, AgentSandeshContext *context) {
100  KSyncSockNetlink::NetlinkDecoder(data, context);
101  return true;
102 }
103 
105  KSyncBulkSandeshContext *bulk_sandesh_context) {
106  // Get sandesh buffer and buffer-length
107  uint32_t buf_len = 0;
108  char *buf = NULL;
109  GetNetlinkPayload(data, &buf, &buf_len);
110  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
111 }
112 
113 void KSyncSockTcp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
114  //Data would be read from ksync tcp session
115  //hence no socket operation would be required
116 }
117 
118 void KSyncSockTcp::Receive(mutable_buffers_1 buf) {
119  uint32_t bytes_read = 0;
120  boost::system::error_code ec;
121  const struct nlmsghdr *nlh = NULL;
122 
123  //Create a buffer to read netlink header first
124  mutable_buffers_1 netlink_header(buffer_cast<void *>(buf),
125  sizeof(struct nlmsghdr));
126 
127  bool blocking_socket = session_->socket()->non_blocking();
128  session_->socket()->non_blocking(false, ec);
129  while (bytes_read < sizeof(struct nlmsghdr)) {
130  mutable_buffers_1 buffer =
131  static_cast<mutable_buffers_1>(netlink_header + bytes_read);
132  bytes_read += session_->socket()->receive(buffer, 0, ec);
133  if (ec.failed()) {
134  assert(0);
135  }
136  //Data read is lesser than netlink header
137  //continue reading
138  if (bytes_read == sizeof(struct nlmsghdr)) {
139  nlh = buffer_cast<struct nlmsghdr *>(buf);
140  }
141  }
142 
143  if (nlh->nlmsg_type == NLMSG_ERROR) {
144  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
145  << " len " << nlh->nlmsg_len);
146  assert(0);
147  }
148 
149  bytes_read = 0;
150  uint32_t payload_size = nlh->nlmsg_len - sizeof(struct nlmsghdr);
151  //Read data
152  mutable_buffers_1 data(buffer_cast<void *>(buf + sizeof(struct nlmsghdr)),
153  payload_size);
154 
155  while (bytes_read < payload_size) {
156  mutable_buffers_1 buffer =
157  static_cast<mutable_buffers_1>(data + bytes_read);
158  bytes_read += session_->socket()->receive(buffer, 0, ec);
159  if (ec.failed()) {
160  assert(0);
161  }
162  }
163  session_->socket()->non_blocking(blocking_socket, ec);
164 }
165 
166 bool KSyncSockTcp::ReceiveMsg(const u_int8_t *msg, size_t size) {
168  ctxt->SetErrno(0);
169  ProcessDataInline((char *) msg);
170  return true;
171 }
172 
175  int fd = tcp_socket_->native_handle();
176 
177  while (1) {
178  char *bufp = rx_buff_;
179  struct nlmsghdr *nlh = NULL;
180  struct nlmsghdr tnlh;
181  int offset = 0;
182  int bytes_transferred = 0;
183 
184  bytes_transferred = recv(fd, rx_buff_, kBufLen, 0);
185  if (bytes_transferred <= 0) {
186  LOG(ERROR, "Connection to dpdk-vrouter lost.");
187  sleep(10);
188  exit(EXIT_FAILURE);
189  }
190 
191  if (remain_ != 0) {
192  if (remain_ < sizeof(struct nlmsghdr)) {
193  memcpy((char *)&tnlh, rx_buff_rem_, remain_);
194  memcpy(((char *)&tnlh) + remain_, rx_buff_,
195  (sizeof(struct nlmsghdr) - remain_));
196  nlh = &tnlh;
197  } else {
198  nlh = (struct nlmsghdr *)rx_buff_rem_;
199  }
200 
201  if (remain_ > nlh->nlmsg_len)
202  assert(0);
203 
204  memcpy(rx_buff_rem_+remain_, rx_buff_, nlh->nlmsg_len - remain_);
205  bufp += (nlh->nlmsg_len - remain_);
206  ctxt->SetErrno(0);
208  offset = nlh->nlmsg_len - remain_;
209  }
210 
211  while (offset < bytes_transferred) {
212  if ((unsigned int)(bytes_transferred - offset) > (sizeof(struct nlmsghdr))) {
213  nlh = (struct nlmsghdr *)(rx_buff_ + offset);
214  if ((unsigned int)(bytes_transferred - offset) > nlh->nlmsg_len) {
215  ctxt->SetErrno(0);
216  ProcessDataInline(rx_buff_ + offset);
217  offset += nlh->nlmsg_len;
218  } else {
219  break;
220  }
221  } else {
222  break;
223  }
224  }
225 
226  remain_ = bytes_transferred - offset;
227  if (remain_) {
228  memcpy(rx_buff_rem_, rx_buff_ + offset, bytes_transferred - offset);
229  }
230  }
231 
232  return true;
233 }
234 
235 class KSyncSockTcpReadTask : public Task {
236 public:
238  Task(scheduler->GetTaskId("Ksync::KSyncSockTcpRead"), 0), sock_(sock) {
239  }
241  }
242 
243  bool Run() {
244  sock_->Run();
245  return true;
246  }
247  std::string Description() const { return "KSyncSockTcpRead"; }
248 private:
250 
251 };
252 
254  static int started = 0;
255  boost::system::error_code ec;
256 
257  if (!started) {
259  KSyncSockTcpReadTask *task = new KSyncSockTcpReadTask(scheduler, this);
260  tcp_socket_->non_blocking(false, ec);
261  scheduler->Enqueue(task);
262  started = 1;
263  }
264 }
265 
267  TcpSession::Event event) {
268  switch (event) {
270  //Retry
272  break;
273  case TcpSession::CLOSE:
274  LOG(ERROR, "Connection to dpdk-vrouter lost.");
275  sleep(10);
276  exit(EXIT_FAILURE);
277  break;
280  connect_complete_ = true;
284  default:
285  break;
286  }
287 }
288 
290 // KSyncSockTcpSession routines
293  bool async_ready) : TcpSession(server, sock, async_ready) {
294  KSyncSockTcp *tcp_ptr = static_cast<KSyncSockTcp *>(server);
296  boost::bind(&KSyncSockTcp::ReceiveMsg, tcp_ptr, _1, _2));
297 }
298 
300  if (reader_) {
301  delete reader_;
302  }
303 }
304 
306  reader_->OnRead(buffer);
307 }
308 
310  TcpSession *session, ReceiveCallback callback) :
311  TcpMessageReader(session, callback) {
312 }
313 
315  size_t size = TcpSession::BufferSize(buffer);
316  int remain = size - offset;
317  if (remain < GetHeaderLenSize()) {
318  return -1;
319  }
320 
321  //Byte ordering?
322  const struct nlmsghdr *nlh =
323  (const struct nlmsghdr *)(TcpSession::BufferData(buffer) + offset);
324  return nlh->nlmsg_len;
325 }
virtual const int GetHeaderLenSize()
Definition: ksync_sock.h:610
virtual bool IsMoreData(char *data)
virtual TcpSession * AllocSession(Socket *socket)
char * rx_buff_
Definition: ksync_sock.h:668
boost::asio::const_buffer Buffer
Definition: tcp_session.h:64
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
Definition: tcp_session.h:311
virtual void SetErrno(int err)
Definition: ksync_sock.h:78
virtual bool Validate(char *data)
boost::asio::ip::tcp::socket Socket
Definition: tcp_server.h:31
void AsyncReadStart()
virtual void Connect(TcpSession *session, Endpoint remote)
Definition: tcp_server.cc:474
KSyncSockTcpSession(TcpServer *server, Socket *sock, bool async_ready=false)
virtual bool Run(void)
std::string Description() const
static size_t BufferSize(const Buffer &buffer)
Definition: tcp_session.h:116
virtual uint32_t GetSeqno(char *data)
virtual TcpSession * CreateSession()
Definition: tcp_server.cc:188
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
Definition: ksync_sock.h:37
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
Definition: ksync_sock.h:396
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr, int port)
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
Definition: ksync_sock.cc:140
void ProcessDataInline(char *data)
Definition: ksync_sock.cc:837
boost::asio::ip::tcp::endpoint server_ep_
Definition: ksync_sock.h:665
bool ValidateNetlink(char *data)
Definition: ksync_sock.cc:68
nl_client * nl_client_
Definition: ksync_sock.h:428
boost::asio::ip::tcp::socket Socket
Definition: tcp_session.h:60
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
Definition: ksync_sock.cc:113
void set_observer(EventObserver observer)
Definition: tcp_session.cc:218
uint32_t max_bulk_buf_size_
Definition: ksync_sock.h:440
uint32_t GetNetlinkSeqno(char *data)
Definition: ksync_sock.cc:57
bool ReceiveMsg(const u_int8_t *msg, size_t size)
virtual void OnRead(Buffer buffer)
Definition: tcp_session.cc:671
uint32_t bulk_buf_size_
Definition: ksync_sock.h:446
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
size_t remain_
Definition: ksync_sock.h:670
virtual int MsgLength(Buffer buffer, int offset)
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
boost::asio::const_buffer Buffer
Definition: tcp_session.h:310
static void SetSockTableEntry(KSyncSock *sock)
Definition: ksync_sock.cc:264
KSyncSockTcp * sock_
virtual void Receive(boost::asio::mutable_buffers_1)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
Definition: ksync_sock.cc:918
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
Definition: ksync_sock.h:328
KSyncSockTcpReadTask(TaskScheduler *scheduler, KSyncSockTcp *sock)
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:226
KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback)
bool connect_complete_
Definition: ksync_sock.h:667
boost::asio::ip::tcp::socket * tcp_socket_
Definition: ksync_sock.h:666
TcpSession * session_
Definition: ksync_sock.h:664
char * rx_buff_rem_
Definition: ksync_sock.h:669
void reset_use_wait_tree()
Definition: ksync_sock.h:416
virtual void OnRead(Buffer buffer)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
bool NetlinkMsgDone(char *data)
Definition: ksync_sock.cc:62
static void Init(EventManager *evm, boost::asio::ip::address ip_addr, int port, const std::string &cpu_pin_policy)
int GetTaskId() const
Definition: task.h:118
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
Definition: tcp_session.cc:789
KSyncSockTcpSessionReader * reader_
Definition: ksync_sock.h:630
TcpServer * server()
Definition: tcp_session.h:88
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
#define LOG(_Level, _Msg)
Definition: logging.h:33
static void SetNetlinkFamilyId(int id)
Definition: ksync_sock.cc:269
void set_process_data_inline()
Definition: ksync_sock.h:417
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
boost::system::error_code SetTcpSendBufSize(uint32_t size)
Definition: tcp_session.cc:776
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual Socket * socket() const
Definition: tcp_session.h:86
static const uint8_t * BufferData(const Buffer &buffer)
Definition: tcp_session.h:113
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void ResetNetlink(nl_client *client)
Definition: ksync_sock.cc:133
static const unsigned kBufLen
Definition: ksync_sock.h:316
virtual ~KSyncSockTcpSession()
struct task_ task
static EventManager evm
boost::system::error_code SetTcpNoDelay()
Definition: tcp_session.cc:765