OpenSDN source code
ksync_sock.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <string>
6 #include "base/os.h"
7 #if defined(__linux__)
8 #include <asm/types.h>
9 #include <linux/netlink.h>
10 #include <linux/rtnetlink.h>
11 #include <linux/genetlink.h>
12 #include <linux/sockios.h>
13 #endif
14 #include <sys/socket.h>
15 
16 #include <boost/bind.hpp>
17 
18 #include <base/logging.h>
19 #include <db/db.h>
20 #include <db/db_entry.h>
21 #include <db/db_table.h>
22 #include <db/db_table_partition.h>
23 
24 #include "ksync_index.h"
25 #include "ksync_entry.h"
26 #include "ksync_object.h"
27 #include "ksync_sock.h"
28 #include "ksync_sock_user.h"
29 #include "ksync_types.h"
30 
31 #include "nl_util.h"
32 #include "udp_util.h"
33 #include "vr_genetlink.h"
34 #include "vr_types.h"
35 
36 using namespace boost::asio;
37 
38 /* Note SO_RCVBUFFORCE is supported only for linux version 2.6.14 and above */
39 typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
40  SO_RCVBUFFORCE> ReceiveBuffForceSize;
41 
44 std::unique_ptr<KSyncSock> KSyncSock::sock_;
45 pid_t KSyncSock::pid_;
46 tbb::atomic<bool> KSyncSock::shutdown_;
47 
48 // Name of task used in KSync Response work-queues
50  {
51  "Agent::Uve",
52  "Agent::KSync"
53  };
55 // Netlink utilities
// Return the nlmsg_seq field of the netlink header located at the start of
// 'data'. The caller must pass a buffer that begins with a struct nlmsghdr.
uint32_t GetNetlinkSeqno(char *data) {
    const struct nlmsghdr *header =
        reinterpret_cast<const struct nlmsghdr *>(data);
    const uint32_t sequence = header->nlmsg_seq;
    return sequence;
}
61 
// Report whether the NLM_F_MULTI flag is set in the netlink header at the
// start of 'data'. Note that, despite the name, the result is true when the
// multi-part flag is present and false when it is absent.
bool NetlinkMsgDone(char *data) {
    const struct nlmsghdr *header =
        reinterpret_cast<const struct nlmsghdr *>(data);
    if (header->nlmsg_flags & NLM_F_MULTI) {
        return true;
    }
    return false;
}
66 
67 // Common validation for netlink messages
68 bool ValidateNetlink(char *data) {
69  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
70  if (nlh->nlmsg_type == NLMSG_ERROR) {
71  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq << " len "
72  << nlh->nlmsg_len);
73  assert(0);
74  return false;
75  }
76 
77  if (nlh->nlmsg_len > KSyncSock::kBufLen) {
78  LOG(ERROR, "Length of " << nlh->nlmsg_len << " is more than expected "
79  "length of " << KSyncSock::kBufLen);
80  assert(0);
81  return false;
82  }
83 
84  if (nlh->nlmsg_type == NLMSG_DONE) {
85  return true;
86  }
87 
88  // Sanity checks for generic-netlink message
89  if (nlh->nlmsg_type != KSyncSock::GetNetlinkFamilyId()) {
90  LOG(ERROR, "Netlink unknown message type : " << nlh->nlmsg_type);
91  assert(0);
92  return false;
93  }
94 
95  struct genlmsghdr *genlh = (struct genlmsghdr *) (data + NLMSG_HDRLEN);
96  if (genlh->cmd != SANDESH_REQUEST) {
97  LOG(ERROR, "Unknown generic netlink cmd : " << genlh->cmd);
98  assert(0);
99  return false;
100  }
101 
102  struct nlattr * attr = (struct nlattr *)(data + NLMSG_HDRLEN
103  + GENL_HDRLEN);
104  if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
105  LOG(ERROR, "Unknown generic netlink TLV type : " << attr->nla_type);
106  assert(0);
107  return false;
108  }
109 
110  return true;
111 }
112 
// Locate the payload that follows the netlink headers in 'data'.
//
// For NLMSG_DONE only the netlink header is skipped; for every other message
// type the generic-netlink header and one attribute header are skipped too.
// On return *buf points at the payload and *buf_len holds nlmsg_len minus the
// number of header bytes skipped.
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len) {
    const struct nlmsghdr *header =
        reinterpret_cast<const struct nlmsghdr *>(data);
    const int header_bytes = (header->nlmsg_type == NLMSG_DONE)
        ? NLMSG_HDRLEN
        : NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;

    *buf = data + header_bytes;
    *buf_len = header->nlmsg_len - header_bytes;
}
125 
126 void InitNetlink(nl_client *client) {
127  nl_init_generic_client_req(client, KSyncSock::GetNetlinkFamilyId());
128  unsigned char *nl_buf;
129  uint32_t nl_buf_len;
130  assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
131 }
132 
133 void ResetNetlink(nl_client *client) {
134  unsigned char *nl_buf;
135  uint32_t nl_buf_len;
136  client->cl_buf_offset = 0;
137  nl_build_header(client, &nl_buf, &nl_buf_len);
138 }
139 
140 void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no) {
141  nl_update_header(client, len);
142  struct nlmsghdr *nlh = (struct nlmsghdr *)client->cl_buf;
143  nlh->nlmsg_pid = KSyncSock::GetPid();
144  nlh->nlmsg_seq = seq_no;
145 }
146 
147 void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context,
148  uint32_t alignment) {
149  while (buf_len > (alignment - 1)) {
150  int error;
151  int decode_len = Sandesh::ReceiveBinaryMsgOne((uint8_t *)buf, buf_len,
152  &error, sandesh_context);
153  if (decode_len < 0) {
154  LOG(DEBUG, "Incorrect decode len " << decode_len);
155  break;
156  }
157  buf += decode_len;
158  buf_len -= decode_len;
159  }
160 }
161 
163 // KSyncSock routines
166  nl_client_(NULL), wait_tree_(), send_queue_(this),
167  max_bulk_msg_count_(kMaxBulkMsgCount),
168  bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
169  rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
170  use_wait_tree_(true), process_data_inline_(false),
171  ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
172  tx_count_(0), ack_count_(0), err_count_(0),
173  rx_process_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::KSync"), 0,
174  boost::bind(&KSyncSock::ProcessRxData, this, _1)) {
176 
177  uint32_t uve_task_id =
179  uint32_t ksync_task_id =
181  for(uint32_t i = 0; i < kRxWorkQueueCount; i++) {
183  ksync_task_id, i, "KSync Receive Queue");
185  uve_task_id, i, "KSync UVE Receive Queue");
186  }
187 
188  nl_client_ = (nl_client *)malloc(sizeof(nl_client));
189  memset(nl_client_, 0, sizeof(nl_client));
190  rx_buff_ = NULL;
191  seqno_ = 0;
192  uve_seqno_ = 0;
193 
194  memset(bulk_mctx_arr_, 0, sizeof(bulk_mctx_arr_));
195  bmca_prod_ = bmca_cons_ = 0;
196 }
197 
199  assert(wait_tree_.size() == 0);
200 
201  if (rx_buff_) {
202  delete [] rx_buff_;
203  rx_buff_ = NULL;
204  }
205 
206  for(int i = 0; i < kRxWorkQueueCount; i++) {
207  ksync_rx_queue[i]->Shutdown();
208  delete ksync_rx_queue[i];
209 
210  uve_rx_queue[i]->Shutdown();
211  delete uve_rx_queue[i];
212  }
213 
214  if (nl_client_->cl_buf) {
215  free(nl_client_->cl_buf);
216  }
217  free(nl_client_);
218 }
219 
221  shutdown_ = true;
222  sock_->send_queue_.Shutdown();
223  sock_.release();
224 }
225 
226 void KSyncSock::Init(bool use_work_queue, const std::string &cpu_pin_policy) {
227  sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
228  pid_ = getpid();
229  shutdown_ = false;
230 }
231 
233 (KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance,
234  const char *name) {
235  KSyncReceiveQueue *queue;
236  queue = new KSyncReceiveQueue
237  (task_id, instance, boost::bind(&KSyncSock::ProcessKernelData, this,
238  &ctxt[instance], _1));
239  char tmp[128];
240  sprintf(tmp, "%s-%d", name, instance);
241  queue->set_name(tmp);
242  return queue;
243 }
244 
246  sock_->send_queue_.set_measure_busy_time(val);
247  for (int i = 0; i < kRxWorkQueueCount; i++) {
249  }
250 }
251 
252 void KSyncSock::Start(bool read_inline) {
253  sock_->read_inline_ = read_inline;
254  if (sock_->read_inline_) {
255  return;
256  }
257  sock_->rx_buff_ = new char[kBufLen];
258  sock_->AsyncReceive(boost::asio::buffer(sock_->rx_buff_, kBufLen),
259  boost::bind(&KSyncSock::ReadHandler, sock_.get(),
260  placeholders::error,
261  placeholders::bytes_transferred));
262 }
263 
265  assert(sock_.get() == NULL);
266  sock_.reset(sock);
267 }
268 
271  InitNetlink(sock_->nl_client_);
272 }
273 
274 uint32_t KSyncSock::WaitTreeSize() const {
275  return wait_tree_.size();
276 }
277 
278 void KSyncSock::SetSeqno(uint32_t seq) {
279  seqno_ = seq;
280  uve_seqno_ = seq;
281 }
282 
283 uint32_t KSyncSock::AllocSeqNo(IoContext::Type type, uint32_t instance) {
284  uint32_t seq;
285  if (type == IoContext::IOC_UVE) {
286  seq = uve_seqno_.fetch_and_add(1);
287  seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
288  } else {
289  seq = seqno_.fetch_and_add(1);
290  seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
291  seq |= KSYNC_DEFAULT_Q_ID_SEQ;
292  }
293  if (seq == kInvalidBulkSeqNo) {
294  return AllocSeqNo(type, instance);
295  }
296  return seq;
297 }
298 
300  return AllocSeqNo(type, 0);
301 }
302 
304  uint32_t instance) {
305  if (type == IoContext::IOC_UVE) {
306  return uve_rx_queue[instance % kRxWorkQueueCount];
307  } else {
308  return ksync_rx_queue[instance % kRxWorkQueueCount];
309  }
310 }
311 
314  if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
316  else
318 
319  uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
320  return GetReceiveQueue(type, instance);
321 }
324 }
326  assert(data.event_ != KSyncEntry::INVALID);
327  KSyncObject *object = data.entry_->GetObject();
328  object->NetlinkAck(data.entry_, data.event_);
329  return true;
330 }
332 
333  uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
334  if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
335  return &ksync_bulk_sandesh_context_[instance];
336  else
337  return &uve_bulk_sandesh_context_[instance];
338 }
339 
341  return sock_.get();
342 }
343 
345  assert(idx == 0);
346  return sock_.get();
347 }
348 
350  Validate(data);
351 
352  KSyncReceiveQueue *queue;
353  if (context) {
354  queue = GetReceiveQueue(context->io_context_type(),
355  context->work_queue_index());
356  } else {
357  queue = GetReceiveQueue(GetSeqno(data));
358  }
359  queue->Enqueue(KSyncRxData(data, context));
360  return true;
361 }
362 
363 // Read handler registered with boost::asio. Demux done based on seqno_
364 void KSyncSock::ReadHandler(const boost::system::error_code& error,
365  size_t bytes_transferred) {
366  if (error) {
367  LOG(ERROR, "Error reading from Ksync sock. Error : " <<
368  boost::system::system_error(error).what());
369  if (shutdown_ == false) {
370  assert(0);
371  }
372  return;
373  }
374 
376 
377  rx_buff_ = new char[kBufLen];
378  AsyncReceive(boost::asio::buffer(rx_buff_, kBufLen),
379  boost::bind(&KSyncSock::ReadHandler, this,
380  placeholders::error,
381  placeholders::bytes_transferred));
382 }
383 
384 // Process kernel data - executes in the task specified by IoContext
385 // Currently only Agent::KSync and Agent::Uve are possibilities
387  const KSyncRxData &data) {
388  KSyncBulkMsgContext *bulk_message_context = data.bulk_msg_context_;
389  WaitTree::iterator it;
390  if (data.bulk_msg_context_ == NULL) {
391  uint32_t seqno = GetSeqno(data.buff_);
392  {
393  tbb::mutex::scoped_lock lock(mutex_);
394  it = wait_tree_.find(seqno);
395  }
396  if (it == wait_tree_.end()) {
397  LOG(ERROR, "KSync error in finding for sequence number : "
398  << seqno);
399  assert(0);
400  }
401  bulk_message_context = &(it->second);
402  }
403 
404  bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
405  BulkDecoder(data.buff_, bulk_sandesh_context);
406  // Remove the IoContext only on last netlink message
407  if (IsMoreData(data.buff_) == false) {
408  if (data.bulk_msg_context_ != NULL) {
409  delete data.bulk_msg_context_;
410  } else {
411  tbb::mutex::scoped_lock lock(mutex_);
412  wait_tree_.erase(it);
413  }
414  }
415  delete[] data.buff_;
416  return true;
417 }
418 
420  char data[kBufLen];
421  bool ret = false;
422 
423  do {
424  Receive(boost::asio::buffer(data, kBufLen));
426  ctxt->SetErrno(0);
427  // BlockingRecv used only during Init and doesnt support bulk messages
428  // Use non-bulk version of decoder
429  Decoder(data, ctxt);
430  if (ctxt->GetErrno() != 0 && ctxt->GetErrno() != EEXIST) {
431  KSYNC_ERROR(VRouterError, "VRouter operation failed. Error <",
432  ctxt->GetErrno(), ":",
434  ">. Object <", "N/A", ">. State <", "N/A",
435  ">. Message number :", 0);
436  ret = true;
437  }
438  } while (IsMoreData(data));
439 
440  return ret;
441 }
442 
443 // BlockingSend does not support bulk messages.
444 size_t KSyncSock::BlockingSend(char *msg, int msg_len) {
445  KSyncBufferList iovec;
446  iovec.push_back(buffer(msg, msg_len));
447  bulk_buf_size_ = msg_len;
448  return SendTo(&iovec, 0);
449 }
450 
452  send_queue_.Enqueue(ioc);
453 }
454 
455 void KSyncSock::SendAsync(KSyncEntry *entry, int msg_len, char *msg,
456  KSyncEntry::KSyncEvent event) {
457  KSyncIoContext *ioc = new KSyncIoContext(this, entry, msg_len, msg, event);
458  // Pre-allocate buffers to minimize processing in KSyncTxQueue context
459  if (read_inline_ && entry->pre_alloc_rx_buffer()) {
460  ioc->rx_buffer1_ = new char [kBufLen];
461  ioc->rx_buffer2_ = new char [kBufLen];
462  } else {
463  ioc->rx_buffer1_ = ioc->rx_buffer2_ = NULL;
464  }
465  send_queue_.Enqueue(ioc);
466 }
467 
468 // Write handler registered with boost::asio
469 void KSyncSock::WriteHandler(const boost::system::error_code& error,
470  size_t bytes_transferred) {
471  if (error) {
472  LOG(ERROR, "Ksync sock write error : " <<
473  boost::system::system_error(error).what());
474  if (shutdown_ == false) {
475  assert(0);
476  }
477  }
478 }
479 
480 // End of messages in the work-queue. Send messages pending in bulk context
481 void KSyncSock::OnEmptyQueue(bool done) {
483  return;
484 
485  KSyncBulkMsgContext *bulk_message_context = NULL;
486  if (use_wait_tree_) {
487  if (read_inline_ == false) {
488  tbb::mutex::scoped_lock lock(mutex_);
489  WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
490  assert(it != wait_tree_.end());
491  bulk_message_context = &it->second;
492  } else {
493  bulk_message_context = bulk_msg_context_;
494  }
495  } else {
496  bulk_message_context = bulk_mctx_arr_[bmca_prod_];
497  }
498 
499  SendBulkMessage(bulk_message_context, bulk_seq_no_);
500 }
501 
502 // Send messages accumulated in bulk context
504  uint32_t seqno) {
505  KSyncBufferList iovec;
506  // Get all buffers to send into single io-vector
507  bulk_message_context->Data(&iovec);
508  tx_count_++;
509 
510  if (!read_inline_) {
511  if (!use_wait_tree_) {
512  bmca_prod_++;
514  bmca_prod_ = 0;
515  }
516  }
517 
518  AsyncSendTo(&iovec, seqno,
519  boost::bind(&KSyncSock::WriteHandler, this,
520  placeholders::error,
521  placeholders::bytes_transferred));
522  } else {
523  SendTo(&iovec, seqno);
524  bool more_data = false;
525  do {
526  char *rxbuf = bulk_message_context->GetReceiveBuffer();
527  Receive(boost::asio::buffer(rxbuf, kBufLen));
528  more_data = IsMoreData(rxbuf);
529  if (!process_data_inline_) {
530  ValidateAndEnqueue(rxbuf, bulk_message_context);
531  } else {
532  ProcessDataInline(rxbuf);
533  }
534  } while(more_data);
535  }
536 
537  bulk_msg_context_ = NULL;
539  return true;
540 }
541 
542 // Get the bulk-context for sequence-number
544 (uint32_t seqno, IoContext::Type io_context_type,
545  uint32_t work_queue_index) {
546  if (read_inline_) {
548  assert(bulk_msg_context_ == NULL);
549  bulk_seq_no_ = seqno;
550  bulk_buf_size_ = 0;
551  bulk_msg_count_ = 0;
552  bulk_msg_context_ = new KSyncBulkMsgContext(io_context_type,
553  work_queue_index);
554  }
555  return bulk_msg_context_;
556  }
557 
558  if (use_wait_tree_) {
559  tbb::mutex::scoped_lock lock(mutex_);
561  bulk_seq_no_ = seqno;
562  bulk_buf_size_ = 0;
563  bulk_msg_count_ = 0;
564 
565  wait_tree_.insert(WaitTreePair(seqno,
566  KSyncBulkMsgContext(io_context_type,
567  work_queue_index)));
568  }
569 
570  WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
571  assert(it != wait_tree_.end());
572  return &it->second;
573  } else {
575  bulk_seq_no_ = seqno;
576  bulk_buf_size_ = 0;
577  bulk_msg_count_ = 0;
578 
579  bulk_mctx_arr_[bmca_prod_] = new KSyncBulkMsgContext(io_context_type,
580  work_queue_index);
582  }
583 
584  return bulk_mctx_arr_[bmca_prod_];
585  }
586 
587  return NULL;
588 }
589 
590 // Try adding an io-context to bulk context. Returns
591 // - true : if message can be added to bulk context
592 // - false : if message cannot be added to bulk context
593 bool KSyncSock::TryAddToBulk(KSyncBulkMsgContext *bulk_message_context,
594  IoContext *ioc) {
596  return false;
597 
598  if (bulk_message_context->io_context_type() != ioc->type())
599  return false;
600 
601  if (use_wait_tree_) {
602  if (bulk_message_context->work_queue_index() != ioc->index())
603  return false;
604  }
605 
606  bulk_buf_size_ += ioc->GetMsgLen();
607  bulk_msg_count_++;
608 
609  bulk_message_context->Insert(ioc);
610  if (ioc->rx_buffer1()) {
611  bulk_message_context->AddReceiveBuffer(ioc->rx_buffer1());
612  ioc->reset_rx_buffer1();
613 
614  }
615  if (ioc->rx_buffer2()) {
616  bulk_message_context->AddReceiveBuffer(ioc->rx_buffer2());
617  ioc->reset_rx_buffer2();
618  }
619  return true;
620 }
621 
623  KSyncBulkMsgContext *bulk_message_context =
624  LocateBulkContext(ioc->GetSeqno(), ioc->type(), ioc->index());
625  // Try adding message to bulk-message list
626  if (TryAddToBulk(bulk_message_context, ioc)) {
627  // Message added to bulk-list. Nothing more to do
628  return true;
629  }
630 
631  // Message cannot be added to bulk-list. Send the current list
632  SendBulkMessage(bulk_message_context, bulk_seq_no_);
633 
634  // Allocate a new context and add message to it
635  bulk_message_context = LocateBulkContext(ioc->GetSeqno(), ioc->type(),
636  ioc->index());
637  assert(TryAddToBulk(bulk_message_context, ioc));
638  return true;
639 }
640 
641 
643 // KSyncSockNetlink routines
645 KSyncSockNetlink::KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
646  : sock_(ios, protocol) {
647  ReceiveBuffForceSize set_rcv_buf;
648  set_rcv_buf = KSYNC_SOCK_RECV_BUFF_SIZE;
649  boost::system::error_code ec;
650  sock_.set_option(set_rcv_buf, ec);
651  if (ec.value() != 0) {
652  LOG(ERROR, "Error Changing netlink receive sock buffer size to " <<
653  set_rcv_buf.value() << " error = " <<
654  boost::system::system_error(ec).what());
655  }
656  boost::asio::socket_base::receive_buffer_size rcv_buf_size;
657  boost::system::error_code ec1;
658  sock_.get_option(rcv_buf_size, ec);
659  LOG(INFO, "Current receive sock buffer size is " << rcv_buf_size.value());
660 }
661 
663 }
664 
665 void KSyncSockNetlink::Init(io_service &ios, int protocol, bool use_work_queue,
666  const std::string &cpu_pin_policy) {
668  KSyncSock::Init(use_work_queue, cpu_pin_policy);
669 }
670 
671 uint32_t KSyncSockNetlink::GetSeqno(char *data) {
672  return GetNetlinkSeqno(data);
673 }
674 
676  return NetlinkMsgDone(data);
677 }
678 
679 bool KSyncSockNetlink::Validate(char *data) {
680  return ValidateNetlink(data);
681 }
682 
683 //netlink socket class for interacting with kernel
684 void KSyncSockNetlink::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
685  HandlerCb cb) {
687  KSyncBufferList::iterator it = iovec->begin();
688  iovec->insert(it, buffer((char *)nl_client_->cl_buf,
689  nl_client_->cl_buf_offset));
691 
692  boost::asio::netlink::raw::endpoint ep;
693  sock_.async_send_to(*iovec, ep, cb);
694 }
695 
696 size_t KSyncSockNetlink::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
698  KSyncBufferList::iterator it = iovec->begin();
699  iovec->insert(it, buffer((char *)nl_client_->cl_buf,
700  nl_client_->cl_buf_offset));
702 
703  boost::asio::netlink::raw::endpoint ep;
704  return sock_.send_to(*iovec, ep);
705 }
706 
707 // Static method to decode non-bulk message
709  assert(ValidateNetlink(data));
710  char *buf = NULL;
711  uint32_t buf_len = 0;
712  GetNetlinkPayload(data, &buf, &buf_len);
713  DecodeSandeshMessages(buf, buf_len, ctxt, NLA_ALIGNTO);
714 }
715 
716 bool KSyncSockNetlink::Decoder(char *data, AgentSandeshContext *context) {
717  NetlinkDecoder(data, context);
718  return true;
719 }
720 
721 // Static method used in ksync_sock_user only
723  bool more) {
724  assert(ValidateNetlink(data));
725  char *buf = NULL;
726  uint32_t buf_len = 0;
727  GetNetlinkPayload(data, &buf, &buf_len);
728  KSyncBulkSandeshContext *bulk_sandesh_context =
729  dynamic_cast<KSyncBulkSandeshContext *>(ctxt);
730  bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, more);
731 }
732 
734  KSyncBulkSandeshContext *bulk_sandesh_context) {
735  // Get sandesh buffer and buffer-length
736  uint32_t buf_len = 0;
737  char *buf = NULL;
738  GetNetlinkPayload(data, &buf, &buf_len);
739  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
740 }
741 
742 void KSyncSockNetlink::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
743  sock_.async_receive(buf, cb);
744 }
745 
746 void KSyncSockNetlink::Receive(mutable_buffers_1 buf) {
747  sock_.receive(buf);
748  struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
749  if (nlh->nlmsg_type == NLMSG_ERROR) {
750  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
751  << " len " << nlh->nlmsg_len);
752  assert(0);
753  }
754 }
755 
757 // KSyncSockUdp routines
759 //Udp socket class for interacting with kernel
760 KSyncSockUdp::KSyncSockUdp(boost::asio::io_context &ios, int port) :
761  sock_(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
762  server_ep_(boost::asio::ip::address::from_string("127.0.0.1"), port) {
763 }
764 
765 void KSyncSockUdp::Init(io_service &ios, int port,
766  const std::string &cpu_pin_policy) {
768  KSyncSock::Init(false, cpu_pin_policy);
769 }
770 
771 uint32_t KSyncSockUdp::GetSeqno(char *data) {
772  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
773  return hdr->seq_no;
774 }
775 
776 bool KSyncSockUdp::IsMoreData(char *data) {
777  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
778  return ((hdr->flags & UVR_MORE) == UVR_MORE);
779 }
780 
781 // We dont expect any non-bulk operation on UDP
782 bool KSyncSockUdp::Decoder(char *data, AgentSandeshContext *context) {
783  assert(0);
784  return false;
785 }
786 
788  KSyncBulkSandeshContext *bulk_sandesh_context) {
789  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
790  uint32_t buf_len = hdr->msg_len;
791  char *buf = data + sizeof(struct uvr_msg_hdr);
792  return bulk_sandesh_context->Decoder(buf, buf_len, 1, IsMoreData(data));
793 }
794 
795 void KSyncSockUdp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
796  HandlerCb cb) {
797  struct uvr_msg_hdr hdr;
798  hdr.seq_no = seq_no;
799  hdr.flags = 0;
800  hdr.msg_len = bulk_buf_size_;
801 
802  KSyncBufferList::iterator it = iovec->begin();
803  iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
804 
805  sock_.async_send_to(*iovec, server_ep_, cb);
806 }
807 
808 size_t KSyncSockUdp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
809  struct uvr_msg_hdr hdr;
810  hdr.seq_no = seq_no;
811  hdr.flags = 0;
812  hdr.msg_len = bulk_buf_size_;
813 
814  KSyncBufferList::iterator it = iovec->begin();
815  iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
816 
817  return sock_.send_to(*iovec, server_ep_, MSG_DONTWAIT);
818 }
819 
820 bool KSyncSockUdp::Validate(char *data) {
821  return true;
822 }
823 
824 void KSyncSockUdp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
825  boost::asio::ip::udp::endpoint ep;
826  sock_.async_receive_from(buf, ep, cb);
827 }
828 
829 void KSyncSockUdp::Receive(mutable_buffers_1 buf) {
830  boost::asio::ip::udp::endpoint ep;
831  sock_.receive_from(buf, ep);
832 }
833 
835  KSyncBulkMsgContext *bulk_message_context = NULL;
836  KSyncBulkSandeshContext *bulk_sandesh_context;
837  uint32_t seqno = GetSeqno(data);
838 
839  assert(!use_wait_tree_);
840  Validate(data);
841 
842  bulk_sandesh_context = GetBulkSandeshContext(seqno);
843  bulk_message_context = bulk_mctx_arr_[bmca_cons_];
844  assert(bulk_message_context->seqno() == seqno);
845 
846  bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
847  BulkDecoder(data, bulk_sandesh_context);
848 
849  // Remove the IoContext only on last netlink message
850  if (IsMoreData(data) == false) {
851  delete bulk_message_context;
852  bmca_cons_++;
854  bmca_cons_ = 0;
855  }
856  }
857 
858  return;
859 }
860 
862 // KSyncIoContext routines
865  int msg_len, char *msg,
866  KSyncEntry::KSyncEvent event) :
867  IoContext(msg, msg_len, 0,
868  sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
869  IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
870  entry_(sync_entry), event_(event), sock_(sock) {
871  SetSeqno(sock->AllocSeqNo(type(), index()));
872 }
873 
876 }
877 
880 }
881 
883 // Routines for KSyncBulkSandeshContext
886  AgentSandeshContext(), bulk_msg_context_(NULL) { }
887 
889 }
890 
891 // Sandesh responses for old context are done. Check for any errors
894  AgentSandeshContext *sandesh_context = io_context->GetSandeshContext();
895 
896  sandesh_context->set_ksync_io_ctx(NULL);
897  if (sandesh_context->GetErrno() != 0 &&
898  sandesh_context->GetErrno() != EEXIST) {
899  io_context->ErrorHandler(sandesh_context->GetErrno());
900  }
901  io_context->Handler();
902 }
903 
907  AgentSandeshContext *sandesh_context = io_context.GetSandeshContext();
908  sandesh_context->set_ksync_io_ctx
909  (static_cast<KSyncIoContext *>(&io_context));
910 }
911 
912 // Process the sandesh messages
913 // There can be more than one sandesh message in the netlink buffer.
914 // Iterate and process all of them
915 bool KSyncBulkSandeshContext::Decoder(char *data, uint32_t len,
916  uint32_t alignment, bool more) {
917  DecodeSandeshMessages(data, len, this, alignment);
920  if (more == true)
921  return false;
922 
923  IoContextDone();
924 
925  // No more netlink messages. Validate that iterator points to last element
926  // in IoContextList
930  return true;
931 }
932 
935  context->SetErrno(err);
936 }
937 
940  return bulk_msg_context_->io_context_list_it_->GetSandeshContext();
941 }
942 
943 void KSyncBulkSandeshContext::IfMsgHandler(vr_interface_req *req) {
945  context->IfMsgHandler(req);
946 }
947 
948 void KSyncBulkSandeshContext::NHMsgHandler(vr_nexthop_req *req) {
950  context->NHMsgHandler(req);
951 }
952 
955  context->RouteMsgHandler(req);
956 }
957 
960  context->MplsMsgHandler(req);
961 }
962 
965  context->QosConfigMsgHandler(req);
966 }
967 
970  context->ForwardingClassMsgHandler(req);
971 }
972 
973 // vr_response message is treated as delimiter in a bulk-context. So, move to
974 // next io-context within bulk-message context.
976  AgentSandeshContext *sandesh_context = NULL;
977  // If this is first vr_reponse received, move io-context to first entry in
978  // bulk context
982  sandesh_context =
983  bulk_msg_context_->io_context_list_it_->GetSandeshContext();
984  IoContextStart();
985  } else {
986  // Sandesh responses for old io-context are done.
987  // Check for any errors and trigger state-machine for old io-context
988  IoContextDone();
989  // Move to the next io-context
993  sandesh_context =
994  bulk_msg_context_->io_context_list_it_->GetSandeshContext();
995  IoContextStart();
996  }
997  return sandesh_context->VrResponseMsgHandler(resp);
998 }
999 
1002  context->MirrorMsgHandler(req);
1003 }
1004 
1007  context->FlowMsgHandler(req);
1008 }
1009 
1012  context->FlowResponseHandler(req);
1013 }
1014 
1015 void KSyncBulkSandeshContext::VrfAssignMsgHandler(vr_vrf_assign_req *req) {
1017  context->VrfAssignMsgHandler(req);
1018 }
1019 
1022  context->VrfMsgHandler(req);
1023 }
1024 
1025 void KSyncBulkSandeshContext::VrfStatsMsgHandler(vr_vrf_stats_req *req) {
1027  context->VrfStatsMsgHandler(req);
1028 }
1029 
1030 void KSyncBulkSandeshContext::DropStatsMsgHandler(vr_drop_stats_req *req) {
1032  context->DropStatsMsgHandler(req);
1033 }
1034 
1037  context->VxLanMsgHandler(req);
1038 }
1039 
1042  context->VrouterOpsMsgHandler(req);
1043 }
1044 
1046 // KSyncBulkMsgContext routines
1049  uint32_t index) :
1050  io_context_list_(), io_context_type_(type), work_queue_index_(index),
1051  rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1052 }
1053 
1055  io_context_list_(), io_context_type_(rhs.io_context_type_),
1056  work_queue_index_(rhs.work_queue_index_),
1057  rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1058  assert(rhs.vr_response_count_ == 0);
1059  assert(rhs.rx_buffer_index_ == 0);
1060  assert(rhs.io_context_list_.size() == 0);
1061 }
1062 
1064  void operator() (IoContext *io_context) { delete io_context; }
1065 };
1066 
1068  assert(vr_response_count_ == io_context_list_.size());
1069  io_context_list_.clear_and_dispose(IoContextDisposer());
1070  for (uint32_t i = 0; i < rx_buffer_index_; i++) {
1071  delete[] rx_buffers_[i];
1072  }
1073 }
1074 
1076  if (rx_buffer_index_ == 0)
1077  return new char[KSyncSock::kBufLen];
1078 
1079  return rx_buffers_[--rx_buffer_index_];
1080 }
1081 
1084  rx_buffers_[rx_buffer_index_++] = buff;
1085 }
1086 
1088  io_context_list_.push_back(*ioc);
1089  return;
1090 }
1091 
1093  IoContextList::iterator it = io_context_list_.begin();
1094  while (it != io_context_list_.end()) {
1095  iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
1096  it++;
1097  }
1098 }
void set_ksync_io_ctx(const KSyncIoContext *ioc)
Definition: ksync_sock.h:81
virtual void MplsMsgHandler(vr_mpls_req *req)=0
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
int GetErrno() const
Definition: ksync_sock.h:80
virtual void VrfMsgHandler(vr_vrf_req *req)=0
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
virtual void NHMsgHandler(vr_nexthop_req *req)=0
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
virtual void SetErrno(int err)
Definition: ksync_sock.h:78
virtual void FlowResponseHandler(vr_flow_response *req)
Definition: ksync_sock.h:66
virtual void IfMsgHandler(vr_interface_req *req)=0
virtual void FlowMsgHandler(vr_flow_req *req)=0
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
virtual void RouteMsgHandler(vr_route_req *req)=0
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
virtual int VrResponseMsgHandler(vr_response *resp)=0
AgentSandeshContext * GetSandeshContext()
Definition: ksync_sock.h:134
char * rx_buffer1() const
Definition: ksync_sock.h:141
virtual void ErrorHandler(int err)
Definition: ksync_sock.h:132
uint32_t index() const
Definition: ksync_sock.h:145
char * rx_buffer1_
Definition: ksync_sock.h:161
static const char * io_wq_names[MAX_WORK_QUEUES]
Definition: ksync_sock.h:104
virtual void Handler()
Definition: ksync_sock.h:131
@ MAX_WORK_QUEUES
Definition: ksync_sock.h:101
void reset_rx_buffer2()
Definition: ksync_sock.h:144
uint32_t GetSeqno() const
Definition: ksync_sock.h:138
uint32_t GetMsgLen() const
Definition: ksync_sock.h:140
char * rx_buffer2()
Definition: ksync_sock.h:143
void reset_rx_buffer1()
Definition: ksync_sock.h:142
Type type()
Definition: ksync_sock.h:135
void SetSeqno(uint32_t seqno)
Definition: ksync_sock.h:137
char * rx_buffer2_
Definition: ksync_sock.h:162
void set_seqno(uint32_t seq)
Definition: ksync_sock.h:246
uint32_t rx_buffer_index_
Definition: ksync_sock.h:262
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
Definition: ksync_sock.cc:1048
void Insert(IoContext *ioc)
Definition: ksync_sock.cc:1087
void Data(KSyncBufferList *iovec)
Definition: ksync_sock.cc:1092
char * GetReceiveBuffer()
Definition: ksync_sock.cc:1075
char * rx_buffers_[kMaxRxBufferCount]
Definition: ksync_sock.h:260
uint32_t vr_response_count_
Definition: ksync_sock.h:268
static const unsigned kMaxRxBufferCount
Definition: ksync_sock.h:233
IoContextList::iterator io_context_list_it_
Definition: ksync_sock.h:270
IoContextList io_context_list_
Definition: ksync_sock.h:251
IoContext::Type io_context_type() const
Definition: ksync_sock.h:240
uint32_t work_queue_index() const
Definition: ksync_sock.h:245
uint32_t seqno()
Definition: ksync_sock.h:247
void AddReceiveBuffer(char *buff)
Definition: ksync_sock.cc:1082
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
Definition: ksync_sock.h:299
void VxLanMsgHandler(vr_vxlan_req *req)
Definition: ksync_sock.cc:1035
void IfMsgHandler(vr_interface_req *req)
Definition: ksync_sock.cc:943
void DropStatsMsgHandler(vr_drop_stats_req *req)
Definition: ksync_sock.cc:1030
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
Definition: ksync_sock.cc:915
void ForwardingClassMsgHandler(vr_fc_map_req *req)
Definition: ksync_sock.cc:968
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
Definition: ksync_sock.cc:1015
void NHMsgHandler(vr_nexthop_req *req)
Definition: ksync_sock.cc:948
KSyncBulkMsgContext * bulk_msg_context_
Definition: ksync_sock.h:307
void FlowResponseHandler(vr_flow_response *req)
Definition: ksync_sock.cc:1010
virtual ~KSyncBulkSandeshContext()
Definition: ksync_sock.cc:888
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
Definition: ksync_sock.cc:1025
void VrouterOpsMsgHandler(vrouter_ops *req)
Definition: ksync_sock.cc:1040
void RouteMsgHandler(vr_route_req *req)
Definition: ksync_sock.cc:953
void QosConfigMsgHandler(vr_qos_map_req *req)
Definition: ksync_sock.cc:963
void SetErrno(int err)
Definition: ksync_sock.cc:933
int VrResponseMsgHandler(vr_response *resp)
Definition: ksync_sock.cc:975
void MplsMsgHandler(vr_mpls_req *req)
Definition: ksync_sock.cc:958
AgentSandeshContext * GetSandeshContext()
Definition: ksync_sock.cc:938
void VrfMsgHandler(vr_vrf_req *req)
Definition: ksync_sock.cc:1020
void MirrorMsgHandler(vr_mirror_req *req)
Definition: ksync_sock.cc:1000
void FlowMsgHandler(vr_flow_req *req)
Definition: ksync_sock.cc:1005
virtual KSyncObject * GetObject() const =0
virtual bool pre_alloc_rx_buffer() const
Definition: ksync_entry.h:149
virtual void ErrorHandler(int err, uint32_t seqno, KSyncEvent event) const
static std::string VrouterErrorToString(uint32_t error)
KSyncEntry * entry_
Definition: ksync_sock.h:181
void ErrorHandler(int err)
Definition: ksync_sock.cc:878
KSyncSock * sock_
Definition: ksync_sock.h:184
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
Definition: ksync_sock.cc:864
virtual void Handler()
Definition: ksync_sock.cc:874
KSyncEntry::KSyncEvent event_
Definition: ksync_sock.h:182
virtual void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual void Receive(boost::asio::mutable_buffers_1)
Definition: ksync_sock.cc:829
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
Definition: ksync_sock.cc:824
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:765
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
Definition: ksync_sock.cc:787
KSyncSockUdp(boost::asio::io_context &ios, int port)
Definition: ksync_sock.cc:760
virtual bool IsMoreData(char *data)
Definition: ksync_sock.cc:776
boost::asio::ip::udp::endpoint server_ep_
Definition: ksync_sock.h:561
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
Definition: ksync_sock.cc:795
virtual bool Validate(char *data)
Definition: ksync_sock.cc:820
boost::asio::ip::udp::socket sock_
Definition: ksync_sock.h:560
virtual uint32_t GetSeqno(char *data)
Definition: ksync_sock.cc:771
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
Definition: ksync_sock.cc:808
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
Definition: ksync_sock.cc:782
WorkQueue< KSyncRxData > KSyncReceiveQueue
Definition: ksync_sock.h:343
static std::unique_ptr< KSyncSock > sock_
Definition: ksync_sock.h:502
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
Definition: ksync_sock.h:509
uint32_t bmca_prod_
Definition: ksync_sock.h:448
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
Definition: ksync_sock.h:394
static pid_t pid_
Definition: ksync_sock.h:503
static const unsigned kInvalidBulkSeqNo
Definition: ksync_sock.h:321
std::size_t BlockingSend(char *msg, int msg_len)
Definition: ksync_sock.cc:444
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
Definition: ksync_sock.h:431
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
Definition: ksync_sock.cc:386
static uint32_t GetPid()
Definition: ksync_sock.h:390
virtual bool Validate(char *data)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
Definition: ksync_sock.cc:322
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
Definition: ksync_sock.h:490
tbb::mutex mutex_
Definition: ksync_sock.h:425
virtual uint32_t GetSeqno(char *data)=0
virtual void Receive(boost::asio::mutable_buffers_1)=0
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
Definition: ksync_sock.h:324
uint32_t bulk_seq_no_
Definition: ksync_sock.h:442
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
Definition: ksync_sock.cc:544
void OnEmptyQueue(bool done)
Definition: ksync_sock.cc:481
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:226
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
void ProcessDataInline(char *data)
Definition: ksync_sock.cc:834
bool read_inline_
Definition: ksync_sock.h:486
static void Shutdown()
Definition: ksync_sock.cc:220
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
Definition: ksync_sock.cc:364
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
Definition: ksync_sock.cc:349
void GenericSend(IoContext *ctx)
Definition: ksync_sock.cc:451
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
Definition: ksync_sock.cc:503
tbb::atomic< uint32_t > seqno_
Definition: ksync_sock.h:482
bool SendAsyncImpl(IoContext *ioc)
Definition: ksync_sock.cc:622
static void Start(bool read_inline)
Definition: ksync_sock.cc:252
uint32_t WaitTreeSize() const
Definition: ksync_sock.cc:274
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
Definition: ksync_sock.cc:455
uint32_t bulk_buf_size_
Definition: ksync_sock.h:444
static const int kRxWorkQueueCount
Definition: ksync_sock.h:314
virtual ~KSyncSock()
Definition: ksync_sock.cc:198
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
Definition: ksync_sock.cc:233
bool BlockingRecv()
Definition: ksync_sock.cc:419
static void SetSockTableEntry(KSyncSock *sock)
Definition: ksync_sock.cc:264
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
Definition: ksync_sock.h:491
KSyncRxWorkQueue rx_process_queue_
Definition: ksync_sock.h:501
static void SetNetlinkFamilyId(int id)
Definition: ksync_sock.cc:269
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
Definition: ksync_sock.h:326
static const unsigned kBufLen
Definition: ksync_sock.h:316
static tbb::atomic< bool > shutdown_
Definition: ksync_sock.h:510
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
Definition: ksync_sock.cc:469
void SetMeasureQueueDelay(bool val)
Definition: ksync_sock.cc:245
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
Definition: ksync_sock.cc:593
char * rx_buff_
Definition: ksync_sock.h:481
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
Definition: ksync_sock.cc:303
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
uint32_t AllocSeqNo(IoContext::Type type)
Definition: ksync_sock.cc:299
WaitTree wait_tree_
Definition: ksync_sock.h:428
uint32_t bulk_msg_count_
Definition: ksync_sock.h:446
nl_client * nl_client_
Definition: ksync_sock.h:426
void SetSeqno(uint32_t seq)
Definition: ksync_sock.cc:278
KSyncTxQueue send_queue_
Definition: ksync_sock.h:429
static KSyncSock * Get(DBTablePartBase *partition)
Definition: ksync_sock.cc:340
bool use_wait_tree_
Definition: ksync_sock.h:488
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
Definition: ksync_sock.h:430
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
uint32_t max_bulk_msg_count_
Definition: ksync_sock.h:436
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
Definition: ksync_sock.cc:331
tbb::atomic< uint32_t > uve_seqno_
Definition: ksync_sock.h:483
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
Definition: ksync_sock.h:450
static int vnsw_netlink_family_id_
Definition: ksync_sock.h:504
int tx_count_
Definition: ksync_sock.h:494
KSyncBulkMsgContext * bulk_msg_context_
Definition: ksync_sock.h:487
bool ProcessRxData(KSyncRxQueueData data)
Definition: ksync_sock.cc:325
uint32_t bmca_cons_
Definition: ksync_sock.h:449
virtual bool IsMoreData(char *data)=0
bool process_data_inline_
Definition: ksync_sock.h:489
static int GetNetlinkFamilyId()
Definition: ksync_sock.h:391
bool Enqueue(IoContext *io_context)
static int32_t ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
Definition: sandesh.cc:642
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
int GetTaskId(const std::string &name)
Definition: task.cc:856
static TaskScheduler * GetInstance()
Definition: task.cc:547
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
void set_name(const std::string &name)
Definition: queue_task.h:307
#define KSYNC_ERROR(obj,...)
Definition: ksync_entry.h:16
bool NetlinkMsgDone(char *data)
Definition: ksync_sock.cc:62
bool ValidateNetlink(char *data)
Definition: ksync_sock.cc:68
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
Definition: ksync_sock.cc:113
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
Definition: ksync_sock.cc:140
boost::asio::detail::socket_option::integer< SOL_SOCKET, SO_RCVBUFFORCE > ReceiveBuffForceSize
Definition: ksync_sock.cc:40
void InitNetlink(nl_client *client)
Definition: ksync_sock.cc:126
uint32_t GetNetlinkSeqno(char *data)
Definition: ksync_sock.cc:57
void ResetNetlink(nl_client *client)
Definition: ksync_sock.cc:133
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
Definition: ksync_sock.cc:147
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
Definition: ksync_sock.h:37
#define KSYNC_BMC_ARR_SIZE
Definition: ksync_sock.h:31
#define KSYNC_DEFAULT_Q_ID_SEQ
Definition: ksync_sock.h:28
#define KSYNC_SOCK_RECV_BUFF_SIZE
Definition: ksync_sock.h:30
uint8_t type
Definition: load_balance.h:2
#define LOG(_Level, _Msg)
Definition: logging.h:33
void operator()(IoContext *io_context)
Definition: ksync_sock.cc:1064
KSyncBulkMsgContext * bulk_msg_context_
Definition: ksync_sock.h:333
KSyncEntry::KSyncEvent event_
Definition: ksync_sock.h:347