OpenSDN source code
ksync_sock.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <atomic>
6 #include <string>
7 
8 #include "base/os.h"
9 
10 #if defined(__linux__)
11 #include <asm/types.h>
12 #include <linux/netlink.h>
13 #include <linux/rtnetlink.h>
14 #include <linux/genetlink.h>
15 #include <linux/sockios.h>
16 #endif
17 
18 #include <sys/socket.h>
19 
20 #include <boost/bind/bind.hpp>
21 
22 #include <base/logging.h>
23 #include <db/db.h>
24 #include <db/db_entry.h>
25 #include <db/db_table.h>
26 #include <db/db_table_partition.h>
27 
28 #include "ksync_index.h"
29 #include "ksync_entry.h"
30 #include "ksync_object.h"
31 #include "ksync_sock.h"
32 #include "ksync_sock_user.h"
33 #include "ksync_types.h"
34 
35 #include "nl_util.h"
36 #include "udp_util.h"
37 #include "vr_genetlink.h"
38 #include "vr_types.h"
39 
40 using namespace boost::asio;
41 using namespace boost::placeholders;
42 
43 /* Note SO_RCVBUFFORCE is supported only for linux version 2.6.14 and above */
44 typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
45  SO_RCVBUFFORCE> ReceiveBuffForceSize;
46 
49 std::unique_ptr<KSyncSock> KSyncSock::sock_;
50 pid_t KSyncSock::pid_;
51 std::atomic<bool> KSyncSock::shutdown_;
52 
53 // Name of task used in KSync Response work-queues
55  {
56  "Agent::Uve",
57  "Agent::KSync"
58  };
60 // Netlink utilities
62 uint32_t GetNetlinkSeqno(char *data) {
63  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
64  return nlh->nlmsg_seq;
65 }
66 
67 bool NetlinkMsgDone(char *data) {
68  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
69  return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
70 }
71 
72 // Common validation for netlink messages
73 bool ValidateNetlink(char *data) {
74  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
75  if (nlh->nlmsg_type == NLMSG_ERROR) {
76  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq << " len "
77  << nlh->nlmsg_len);
78  assert(0);
79  return false;
80  }
81 
82  if (nlh->nlmsg_len > KSyncSock::kBufLen) {
83  LOG(ERROR, "Length of " << nlh->nlmsg_len << " is more than expected "
84  "length of " << KSyncSock::kBufLen);
85  assert(0);
86  return false;
87  }
88 
89  if (nlh->nlmsg_type == NLMSG_DONE) {
90  return true;
91  }
92 
93  // Sanity checks for generic-netlink message
94  if (nlh->nlmsg_type != KSyncSock::GetNetlinkFamilyId()) {
95  LOG(ERROR, "Netlink unknown message type : " << nlh->nlmsg_type);
96  assert(0);
97  return false;
98  }
99 
100  struct genlmsghdr *genlh = (struct genlmsghdr *) (data + NLMSG_HDRLEN);
101  if (genlh->cmd != SANDESH_REQUEST) {
102  LOG(ERROR, "Unknown generic netlink cmd : " << genlh->cmd);
103  assert(0);
104  return false;
105  }
106 
107  struct nlattr * attr = (struct nlattr *)(data + NLMSG_HDRLEN
108  + GENL_HDRLEN);
109  if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
110  LOG(ERROR, "Unknown generic netlink TLV type : " << attr->nla_type);
111  assert(0);
112  return false;
113  }
114 
115  return true;
116 }
117 
118 void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len) {
119  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
120  int len = 0;
121  if (nlh->nlmsg_type == NLMSG_DONE) {
122  len = NLMSG_HDRLEN;
123  } else {
124  len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
125  }
126 
127  *buf = data + len;
128  *buf_len = nlh->nlmsg_len - len;
129 }
130 
131 void InitNetlink(nl_client *client) {
132  nl_init_generic_client_req(client, KSyncSock::GetNetlinkFamilyId());
133  unsigned char *nl_buf;
134  uint32_t nl_buf_len;
135  assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
136 }
137 
138 void ResetNetlink(nl_client *client) {
139  unsigned char *nl_buf;
140  uint32_t nl_buf_len;
141  client->cl_buf_offset = 0;
142  nl_build_header(client, &nl_buf, &nl_buf_len);
143 }
144 
145 void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no) {
146  nl_update_header(client, len);
147  struct nlmsghdr *nlh = (struct nlmsghdr *)client->cl_buf;
148  nlh->nlmsg_pid = KSyncSock::GetPid();
149  nlh->nlmsg_seq = seq_no;
150 }
151 
152 void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context,
153  uint32_t alignment) {
154  while (buf_len > (alignment - 1)) {
155  int error;
156  int decode_len = Sandesh::ReceiveBinaryMsgOne((uint8_t *)buf, buf_len,
157  &error, sandesh_context);
158  if (decode_len < 0) {
159  LOG(DEBUG, "Incorrect decode len " << decode_len);
160  break;
161  }
162  buf += decode_len;
163  buf_len -= decode_len;
164  }
165 }
166 
168 // KSyncSock routines
171  nl_client_(NULL), wait_tree_(), send_queue_(this),
172  max_bulk_msg_count_(kMaxBulkMsgCount),
173  bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
174  rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
175  use_wait_tree_(true), process_data_inline_(false),
176  ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
177  tx_count_(0), ack_count_(0), err_count_(0),
178  rx_process_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::KSync"), 0,
179  boost::bind(&KSyncSock::ProcessRxData, this, _1)) {
181 
182  uint32_t uve_task_id =
184  uint32_t ksync_task_id =
186  for(uint32_t i = 0; i < kRxWorkQueueCount; i++) {
188  ksync_task_id, i, "KSync Receive Queue");
190  uve_task_id, i, "KSync UVE Receive Queue");
191  }
192 
193  nl_client_ = (nl_client *)malloc(sizeof(nl_client));
194  memset(nl_client_, 0, sizeof(nl_client));
195  rx_buff_ = NULL;
196  seqno_ = 0;
197  uve_seqno_ = 0;
198 
199  memset(bulk_mctx_arr_, 0, sizeof(bulk_mctx_arr_));
200  bmca_prod_ = bmca_cons_ = 0;
201 }
202 
204  assert(wait_tree_.size() == 0);
205 
206  if (rx_buff_) {
207  delete [] rx_buff_;
208  rx_buff_ = NULL;
209  }
210 
211  for(int i = 0; i < kRxWorkQueueCount; i++) {
212  ksync_rx_queue[i]->Shutdown();
213  delete ksync_rx_queue[i];
214 
215  uve_rx_queue[i]->Shutdown();
216  delete uve_rx_queue[i];
217  }
218 
219  if (nl_client_->cl_buf) {
220  free(nl_client_->cl_buf);
221  }
222  free(nl_client_);
223 }
224 
226  shutdown_ = true;
227  sock_->send_queue_.Shutdown();
228  sock_.release();
229 }
230 
231 void KSyncSock::Init(bool use_work_queue, const std::string &cpu_pin_policy) {
232  sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
233  pid_ = getpid();
234  shutdown_ = false;
235 }
236 
238 (KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance,
239  const char *name) {
240  KSyncReceiveQueue *queue;
241  queue = new KSyncReceiveQueue
242  (task_id, instance, boost::bind(&KSyncSock::ProcessKernelData, this,
243  &ctxt[instance], _1));
244  char tmp[128];
245  sprintf(tmp, "%s-%d", name, instance);
246  queue->set_name(tmp);
247  return queue;
248 }
249 
251  sock_->send_queue_.set_measure_busy_time(val);
252  for (int i = 0; i < kRxWorkQueueCount; i++) {
254  }
255 }
256 
257 void KSyncSock::Start(bool read_inline) {
258  sock_->read_inline_ = read_inline;
259  if (sock_->read_inline_) {
260  return;
261  }
262  sock_->rx_buff_ = new char[kBufLen];
263  sock_->AsyncReceive(boost::asio::buffer(sock_->rx_buff_, kBufLen),
264  boost::bind(&KSyncSock::ReadHandler, sock_.get(),
265  placeholders::error,
266  placeholders::bytes_transferred));
267 }
268 
270  assert(sock_.get() == NULL);
271  sock_.reset(sock);
272 }
273 
276  InitNetlink(sock_->nl_client_);
277 }
278 
279 uint32_t KSyncSock::WaitTreeSize() const {
280  return wait_tree_.size();
281 }
282 
283 void KSyncSock::SetSeqno(uint32_t seq) {
284  seqno_ = seq;
285  uve_seqno_ = seq;
286 }
287 
288 uint32_t KSyncSock::AllocSeqNo(IoContext::Type type, uint32_t instance) {
289  uint32_t seq;
290  if (type == IoContext::IOC_UVE) {
291  seq = uve_seqno_.fetch_add(1);
292  seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
293  } else {
294  seq = seqno_.fetch_add(1);
295  seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
296  seq |= KSYNC_DEFAULT_Q_ID_SEQ;
297  }
298  if (seq == kInvalidBulkSeqNo) {
299  return AllocSeqNo(type, instance);
300  }
301  return seq;
302 }
303 
305  return AllocSeqNo(type, 0);
306 }
307 
309  uint32_t instance) {
310  if (type == IoContext::IOC_UVE) {
311  return uve_rx_queue[instance % kRxWorkQueueCount];
312  } else {
313  return ksync_rx_queue[instance % kRxWorkQueueCount];
314  }
315 }
316 
319  if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
321  else
323 
324  uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
325  return GetReceiveQueue(type, instance);
326 }
329 }
331  assert(data.event_ != KSyncEntry::INVALID);
332  KSyncObject *object = data.entry_->GetObject();
333  object->NetlinkAck(data.entry_, data.event_);
334  return true;
335 }
337 
338  uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
339  if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
340  return &ksync_bulk_sandesh_context_[instance];
341  else
342  return &uve_bulk_sandesh_context_[instance];
343 }
344 
346  return sock_.get();
347 }
348 
350  assert(idx == 0);
351  return sock_.get();
352 }
353 
355  Validate(data);
356 
357  KSyncReceiveQueue *queue;
358  if (context) {
359  queue = GetReceiveQueue(context->io_context_type(),
360  context->work_queue_index());
361  } else {
362  queue = GetReceiveQueue(GetSeqno(data));
363  }
364  queue->Enqueue(KSyncRxData(data, context));
365  return true;
366 }
367 
368 // Read handler registered with boost::asio. Demux done based on seqno_
369 void KSyncSock::ReadHandler(const boost::system::error_code& error,
370  size_t bytes_transferred) {
371  if (error) {
372  LOG(ERROR, "Error reading from Ksync sock. Error : " <<
373  boost::system::system_error(error).what());
374  if (shutdown_ == false) {
375  assert(0);
376  }
377  return;
378  }
379 
381 
382  rx_buff_ = new char[kBufLen];
383  AsyncReceive(boost::asio::buffer(rx_buff_, kBufLen),
384  boost::bind(&KSyncSock::ReadHandler, this,
385  placeholders::error,
386  placeholders::bytes_transferred));
387 }
388 
389 // Process kernel data - executes in the task specified by IoContext
390 // Currently only Agent::KSync and Agent::Uve are possibilities
392  const KSyncRxData &data) {
393  KSyncBulkMsgContext *bulk_message_context = data.bulk_msg_context_;
394  WaitTree::iterator it;
395  if (data.bulk_msg_context_ == NULL) {
396  uint32_t seqno = GetSeqno(data.buff_);
397  {
398  std::scoped_lock lock(mutex_);
399  it = wait_tree_.find(seqno);
400  }
401  if (it == wait_tree_.end()) {
402  LOG(ERROR, "KSync error in finding for sequence number : "
403  << seqno);
404  assert(0);
405  }
406  bulk_message_context = &(it->second);
407  }
408 
409  bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
410  BulkDecoder(data.buff_, bulk_sandesh_context);
411  // Remove the IoContext only on last netlink message
412  if (IsMoreData(data.buff_) == false) {
413  if (data.bulk_msg_context_ != NULL) {
414  delete data.bulk_msg_context_;
415  } else {
416  std::scoped_lock lock(mutex_);
417  wait_tree_.erase(it);
418  }
419  }
420  delete[] data.buff_;
421  return true;
422 }
423 
425  char data[kBufLen];
426  bool ret = false;
427 
428  do {
429  Receive(boost::asio::buffer(data, kBufLen));
431  ctxt->SetErrno(0);
432  // BlockingRecv used only during Init and doesnt support bulk messages
433  // Use non-bulk version of decoder
434  Decoder(data, ctxt);
435  if (ctxt->GetErrno() != 0 && ctxt->GetErrno() != EEXIST) {
436  KSYNC_ERROR(VRouterError, "VRouter operation failed. Error <",
437  ctxt->GetErrno(), ":",
439  ">. Object <", "N/A", ">. State <", "N/A",
440  ">. Message number :", 0);
441  ret = true;
442  }
443  } while (IsMoreData(data));
444 
445  return ret;
446 }
447 
448 // BlockingSend does not support bulk messages.
449 size_t KSyncSock::BlockingSend(char *msg, int msg_len) {
450  KSyncBufferList iovec;
451  iovec.push_back(buffer(msg, msg_len));
452  bulk_buf_size_ = msg_len;
453  return SendTo(&iovec, 0);
454 }
455 
457  send_queue_.Enqueue(ioc);
458 }
459 
460 void KSyncSock::SendAsync(KSyncEntry *entry, int msg_len, char *msg,
461  KSyncEntry::KSyncEvent event) {
462  KSyncIoContext *ioc = new KSyncIoContext(this, entry, msg_len, msg, event);
463  // Pre-allocate buffers to minimize processing in KSyncTxQueue context
464  if (read_inline_ && entry->pre_alloc_rx_buffer()) {
465  ioc->rx_buffer1_ = new char [kBufLen];
466  ioc->rx_buffer2_ = new char [kBufLen];
467  } else {
468  ioc->rx_buffer1_ = ioc->rx_buffer2_ = NULL;
469  }
470  send_queue_.Enqueue(ioc);
471 }
472 
473 // Write handler registered with boost::asio
474 void KSyncSock::WriteHandler(const boost::system::error_code& error,
475  size_t bytes_transferred) {
476  if (error) {
477  LOG(ERROR, "Ksync sock write error : " <<
478  boost::system::system_error(error).what());
479  if (shutdown_ == false) {
480  assert(0);
481  }
482  }
483 }
484 
485 // End of messages in the work-queue. Send messages pending in bulk context
486 void KSyncSock::OnEmptyQueue(bool done) {
488  return;
489 
490  KSyncBulkMsgContext *bulk_message_context = NULL;
491  if (use_wait_tree_) {
492  if (read_inline_ == false) {
493  std::scoped_lock lock(mutex_);
494  WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
495  assert(it != wait_tree_.end());
496  bulk_message_context = &it->second;
497  } else {
498  bulk_message_context = bulk_msg_context_;
499  }
500  } else {
501  bulk_message_context = bulk_mctx_arr_[bmca_prod_];
502  }
503 
504  SendBulkMessage(bulk_message_context, bulk_seq_no_);
505 }
506 
507 // Send messages accumilated in bulk context
509  uint32_t seqno) {
510  KSyncBufferList iovec;
511  // Get all buffers to send into single io-vector
512  bulk_message_context->Data(&iovec);
513  tx_count_++;
514 
515  if (!read_inline_) {
516  if (!use_wait_tree_) {
517  bmca_prod_++;
519  bmca_prod_ = 0;
520  }
521  }
522 
523  AsyncSendTo(&iovec, seqno,
524  boost::bind(&KSyncSock::WriteHandler, this,
525  placeholders::error,
526  placeholders::bytes_transferred));
527  } else {
528  SendTo(&iovec, seqno);
529  bool more_data = false;
530  do {
531  char *rxbuf = bulk_message_context->GetReceiveBuffer();
532  Receive(boost::asio::buffer(rxbuf, kBufLen));
533  more_data = IsMoreData(rxbuf);
534  if (!process_data_inline_) {
535  ValidateAndEnqueue(rxbuf, bulk_message_context);
536  } else {
537  ProcessDataInline(rxbuf);
538  }
539  } while(more_data);
540  }
541 
542  bulk_msg_context_ = NULL;
544  return true;
545 }
546 
547 // Get the bulk-context for sequence-number
549 (uint32_t seqno, IoContext::Type io_context_type,
550  uint32_t work_queue_index) {
551  if (read_inline_) {
553  assert(bulk_msg_context_ == NULL);
554  bulk_seq_no_ = seqno;
555  bulk_buf_size_ = 0;
556  bulk_msg_count_ = 0;
557  bulk_msg_context_ = new KSyncBulkMsgContext(io_context_type,
558  work_queue_index);
559  }
560  return bulk_msg_context_;
561  }
562 
563  if (use_wait_tree_) {
564  std::scoped_lock lock(mutex_);
566  bulk_seq_no_ = seqno;
567  bulk_buf_size_ = 0;
568  bulk_msg_count_ = 0;
569 
570  wait_tree_.insert(WaitTreePair(seqno,
571  KSyncBulkMsgContext(io_context_type,
572  work_queue_index)));
573  }
574 
575  WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
576  assert(it != wait_tree_.end());
577  return &it->second;
578  } else {
580  bulk_seq_no_ = seqno;
581  bulk_buf_size_ = 0;
582  bulk_msg_count_ = 0;
583 
584  bulk_mctx_arr_[bmca_prod_] = new KSyncBulkMsgContext(io_context_type,
585  work_queue_index);
587  }
588 
589  return bulk_mctx_arr_[bmca_prod_];
590  }
591 
592  return NULL;
593 }
594 
595 // Try adding an io-context to bulk context. Returns
596 // - true : if message can be added to bulk context
597 // - false : if message cannot be added to bulk context
598 bool KSyncSock::TryAddToBulk(KSyncBulkMsgContext *bulk_message_context,
599  IoContext *ioc) {
601  return false;
602 
603  if (bulk_message_context->io_context_type() != ioc->type())
604  return false;
605 
606  if (use_wait_tree_) {
607  if (bulk_message_context->work_queue_index() != ioc->index())
608  return false;
609  }
610 
611  bulk_buf_size_ += ioc->GetMsgLen();
612  bulk_msg_count_++;
613 
614  bulk_message_context->Insert(ioc);
615  if (ioc->rx_buffer1()) {
616  bulk_message_context->AddReceiveBuffer(ioc->rx_buffer1());
617  ioc->reset_rx_buffer1();
618 
619  }
620  if (ioc->rx_buffer2()) {
621  bulk_message_context->AddReceiveBuffer(ioc->rx_buffer2());
622  ioc->reset_rx_buffer2();
623  }
624  return true;
625 }
626 
628  KSyncBulkMsgContext *bulk_message_context =
629  LocateBulkContext(ioc->GetSeqno(), ioc->type(), ioc->index());
630  // Try adding message to bulk-message list
631  if (TryAddToBulk(bulk_message_context, ioc)) {
632  // Message added to bulk-list. Nothing more to do
633  return true;
634  }
635 
636  // Message cannot be added to bulk-list. Send the current list
637  SendBulkMessage(bulk_message_context, bulk_seq_no_);
638 
639  // Allocate a new context and add message to it
640  bulk_message_context = LocateBulkContext(ioc->GetSeqno(), ioc->type(),
641  ioc->index());
642  assert(TryAddToBulk(bulk_message_context, ioc));
643  return true;
644 }
645 
646 
648 // KSyncSockNetlink routines
650 KSyncSockNetlink::KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
651  : sock_(ios, protocol) {
652  ReceiveBuffForceSize set_rcv_buf;
653  set_rcv_buf = KSYNC_SOCK_RECV_BUFF_SIZE;
654  boost::system::error_code ec;
655  sock_.set_option(set_rcv_buf, ec);
656  if (ec.value() != 0) {
657  LOG(ERROR, "Error Changing netlink receive sock buffer size to " <<
658  set_rcv_buf.value() << " error = " <<
659  boost::system::system_error(ec).what());
660  }
661  boost::asio::socket_base::receive_buffer_size rcv_buf_size;
662  boost::system::error_code ec1;
663  sock_.get_option(rcv_buf_size, ec);
664  LOG(INFO, "Current receive sock buffer size is " << rcv_buf_size.value());
665 }
666 
668 }
669 
670 void KSyncSockNetlink::Init(io_service &ios, int protocol, bool use_work_queue,
671  const std::string &cpu_pin_policy) {
673  KSyncSock::Init(use_work_queue, cpu_pin_policy);
674 }
675 
676 uint32_t KSyncSockNetlink::GetSeqno(char *data) {
677  return GetNetlinkSeqno(data);
678 }
679 
681  return NetlinkMsgDone(data);
682 }
683 
684 bool KSyncSockNetlink::Validate(char *data) {
685  return ValidateNetlink(data);
686 }
687 
688 //netlink socket class for interacting with kernel
689 void KSyncSockNetlink::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
690  HandlerCb cb) {
692  KSyncBufferList::iterator it = iovec->begin();
693  iovec->insert(it, buffer((char *)nl_client_->cl_buf,
694  nl_client_->cl_buf_offset));
696 
697  boost::asio::netlink::raw::endpoint ep;
698  sock_.async_send_to(*iovec, ep, cb);
699 }
700 
701 size_t KSyncSockNetlink::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
703  KSyncBufferList::iterator it = iovec->begin();
704  iovec->insert(it, buffer((char *)nl_client_->cl_buf,
705  nl_client_->cl_buf_offset));
707 
708  boost::asio::netlink::raw::endpoint ep;
709  return sock_.send_to(*iovec, ep);
710 }
711 
712 // Static method to decode non-bulk message
714  assert(ValidateNetlink(data));
715  char *buf = NULL;
716  uint32_t buf_len = 0;
717  GetNetlinkPayload(data, &buf, &buf_len);
718  DecodeSandeshMessages(buf, buf_len, ctxt, NLA_ALIGNTO);
719 }
720 
721 bool KSyncSockNetlink::Decoder(char *data, AgentSandeshContext *context) {
722  NetlinkDecoder(data, context);
723  return true;
724 }
725 
726 // Static method used in ksync_sock_user only
728  bool more) {
729  assert(ValidateNetlink(data));
730  char *buf = NULL;
731  uint32_t buf_len = 0;
732  GetNetlinkPayload(data, &buf, &buf_len);
733  KSyncBulkSandeshContext *bulk_sandesh_context =
734  dynamic_cast<KSyncBulkSandeshContext *>(ctxt);
735  bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, more);
736 }
737 
739  KSyncBulkSandeshContext *bulk_sandesh_context) {
740  // Get sandesh buffer and buffer-length
741  uint32_t buf_len = 0;
742  char *buf = NULL;
743  GetNetlinkPayload(data, &buf, &buf_len);
744  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
745 }
746 
747 void KSyncSockNetlink::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
748  sock_.async_receive(buf, cb);
749 }
750 
751 void KSyncSockNetlink::Receive(mutable_buffers_1 buf) {
752  sock_.receive(buf);
753  struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
754  if (nlh->nlmsg_type == NLMSG_ERROR) {
755  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
756  << " len " << nlh->nlmsg_len);
757  assert(0);
758  }
759 }
760 
762 // KSyncSockUdp routines
764 //Udp socket class for interacting with kernel
765 KSyncSockUdp::KSyncSockUdp(boost::asio::io_context &ios, int port) :
766  sock_(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
767  server_ep_(boost::asio::ip::address::from_string("127.0.0.1"), port) {
768 }
769 
770 void KSyncSockUdp::Init(io_service &ios, int port,
771  const std::string &cpu_pin_policy) {
773  KSyncSock::Init(false, cpu_pin_policy);
774 }
775 
776 uint32_t KSyncSockUdp::GetSeqno(char *data) {
777  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
778  return hdr->seq_no;
779 }
780 
781 bool KSyncSockUdp::IsMoreData(char *data) {
782  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
783  return ((hdr->flags & UVR_MORE) == UVR_MORE);
784 }
785 
786 // We dont expect any non-bulk operation on UDP
787 bool KSyncSockUdp::Decoder(char *data, AgentSandeshContext *context) {
788  assert(0);
789  return false;
790 }
791 
793  KSyncBulkSandeshContext *bulk_sandesh_context) {
794  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
795  uint32_t buf_len = hdr->msg_len;
796  char *buf = data + sizeof(struct uvr_msg_hdr);
797  return bulk_sandesh_context->Decoder(buf, buf_len, 1, IsMoreData(data));
798 }
799 
800 void KSyncSockUdp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
801  HandlerCb cb) {
802  struct uvr_msg_hdr hdr;
803  hdr.seq_no = seq_no;
804  hdr.flags = 0;
805  hdr.msg_len = bulk_buf_size_;
806 
807  KSyncBufferList::iterator it = iovec->begin();
808  iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
809 
810  sock_.async_send_to(*iovec, server_ep_, cb);
811 }
812 
813 size_t KSyncSockUdp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
814  struct uvr_msg_hdr hdr;
815  hdr.seq_no = seq_no;
816  hdr.flags = 0;
817  hdr.msg_len = bulk_buf_size_;
818 
819  KSyncBufferList::iterator it = iovec->begin();
820  iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
821 
822  return sock_.send_to(*iovec, server_ep_, MSG_DONTWAIT);
823 }
824 
825 bool KSyncSockUdp::Validate(char *data) {
826  return true;
827 }
828 
829 void KSyncSockUdp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
830  boost::asio::ip::udp::endpoint ep;
831  sock_.async_receive_from(buf, ep, cb);
832 }
833 
834 void KSyncSockUdp::Receive(mutable_buffers_1 buf) {
835  boost::asio::ip::udp::endpoint ep;
836  sock_.receive_from(buf, ep);
837 }
838 
840  KSyncBulkMsgContext *bulk_message_context = NULL;
841  KSyncBulkSandeshContext *bulk_sandesh_context;
842  uint32_t seqno = GetSeqno(data);
843 
844  assert(!use_wait_tree_);
845  Validate(data);
846 
847  bulk_sandesh_context = GetBulkSandeshContext(seqno);
848  bulk_message_context = bulk_mctx_arr_[bmca_cons_];
849  assert(bulk_message_context->seqno() == seqno);
850 
851  bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
852  BulkDecoder(data, bulk_sandesh_context);
853 
854  // Remove the IoContext only on last netlink message
855  if (IsMoreData(data) == false) {
856  delete bulk_message_context;
857  bmca_cons_++;
859  bmca_cons_ = 0;
860  }
861  }
862 
863  return;
864 }
865 
867 // KSyncIoContext routines
870  int msg_len, char *msg,
871  KSyncEntry::KSyncEvent event) :
872  IoContext(msg, msg_len, 0,
873  sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
874  IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
875  entry_(sync_entry), event_(event), sock_(sock) {
876  SetSeqno(sock->AllocSeqNo(type(), index()));
877 }
878 
881 }
882 
885 }
886 
888 // Routines for KSyncBulkSandeshContext
891  AgentSandeshContext(), bulk_msg_context_(NULL) { }
892 
894 }
895 
896 // Sandesh responses for old context are done. Check for any errors
899  AgentSandeshContext *sandesh_context = io_context->GetSandeshContext();
900 
901  sandesh_context->set_ksync_io_ctx(NULL);
902  if (sandesh_context->GetErrno() != 0 &&
903  sandesh_context->GetErrno() != EEXIST) {
904  io_context->ErrorHandler(sandesh_context->GetErrno());
905  }
906  io_context->Handler();
907 }
908 
912  AgentSandeshContext *sandesh_context = io_context.GetSandeshContext();
913  sandesh_context->set_ksync_io_ctx
914  (static_cast<KSyncIoContext *>(&io_context));
915 }
916 
917 // Process the sandesh messages
918 // There can be more then one sandesh messages in the netlink buffer.
919 // Iterate and process all of them
920 bool KSyncBulkSandeshContext::Decoder(char *data, uint32_t len,
921  uint32_t alignment, bool more) {
922  DecodeSandeshMessages(data, len, this, alignment);
925  if (more == true)
926  return false;
927 
928  IoContextDone();
929 
930  // No more netlink messages. Validate that iterator points to last element
931  // in IoContextList
935  return true;
936 }
937 
940  context->SetErrno(err);
941 }
942 
945  return bulk_msg_context_->io_context_list_it_->GetSandeshContext();
946 }
947 
948 void KSyncBulkSandeshContext::IfMsgHandler(vr_interface_req *req) {
950  context->IfMsgHandler(req);
951 }
952 
953 void KSyncBulkSandeshContext::NHMsgHandler(vr_nexthop_req *req) {
955  context->NHMsgHandler(req);
956 }
957 
960  context->RouteMsgHandler(req);
961 }
962 
965  context->MplsMsgHandler(req);
966 }
967 
970  context->QosConfigMsgHandler(req);
971 }
972 
975  context->ForwardingClassMsgHandler(req);
976 }
977 
978 // vr_response message is treated as delimiter in a bulk-context. So, move to
979 // next io-context within bulk-message context.
981  AgentSandeshContext *sandesh_context = NULL;
982  // If this is first vr_reponse received, move io-context to first entry in
983  // bulk context
987  sandesh_context =
988  bulk_msg_context_->io_context_list_it_->GetSandeshContext();
989  IoContextStart();
990  } else {
991  // Sandesh responses for old io-context are done.
992  // Check for any errors and trigger state-machine for old io-context
993  IoContextDone();
994  // Move to the next io-context
998  sandesh_context =
999  bulk_msg_context_->io_context_list_it_->GetSandeshContext();
1000  IoContextStart();
1001  }
1002  return sandesh_context->VrResponseMsgHandler(resp);
1003 }
1004 
1007  context->MirrorMsgHandler(req);
1008 }
1009 
1012  context->FlowMsgHandler(req);
1013 }
1014 
1017  context->FlowResponseHandler(req);
1018 }
1019 
1020 void KSyncBulkSandeshContext::VrfAssignMsgHandler(vr_vrf_assign_req *req) {
1022  context->VrfAssignMsgHandler(req);
1023 }
1024 
1027  context->VrfMsgHandler(req);
1028 }
1029 
1030 void KSyncBulkSandeshContext::VrfStatsMsgHandler(vr_vrf_stats_req *req) {
1032  context->VrfStatsMsgHandler(req);
1033 }
1034 
1035 void KSyncBulkSandeshContext::DropStatsMsgHandler(vr_drop_stats_req *req) {
1037  context->DropStatsMsgHandler(req);
1038 }
1039 
1042  context->VxLanMsgHandler(req);
1043 }
1044 
1047  context->VrouterOpsMsgHandler(req);
1048 }
1049 
1051 // KSyncBulkMsgContext routines
1054  uint32_t index) :
1055  io_context_list_(), io_context_type_(type), work_queue_index_(index),
1056  rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1057 }
1058 
1060  io_context_list_(), io_context_type_(rhs.io_context_type_),
1061  work_queue_index_(rhs.work_queue_index_),
1062  rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1063  assert(rhs.vr_response_count_ == 0);
1064  assert(rhs.rx_buffer_index_ == 0);
1065  assert(rhs.io_context_list_.size() == 0);
1066 }
1067 
1069  void operator() (IoContext *io_context) { delete io_context; }
1070 };
1071 
1073  assert(vr_response_count_ == io_context_list_.size());
1074  io_context_list_.clear_and_dispose(IoContextDisposer());
1075  for (uint32_t i = 0; i < rx_buffer_index_; i++) {
1076  delete[] rx_buffers_[i];
1077  }
1078 }
1079 
1081  if (rx_buffer_index_ == 0)
1082  return new char[KSyncSock::kBufLen];
1083 
1084  return rx_buffers_[--rx_buffer_index_];
1085 }
1086 
1089  rx_buffers_[rx_buffer_index_++] = buff;
1090 }
1091 
1093  io_context_list_.push_back(*ioc);
1094  return;
1095 }
1096 
1098  IoContextList::iterator it = io_context_list_.begin();
1099  while (it != io_context_list_.end()) {
1100  iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
1101  it++;
1102  }
1103 }
void set_ksync_io_ctx(const KSyncIoContext *ioc)
Definition: ksync_sock.h:81
virtual void MplsMsgHandler(vr_mpls_req *req)=0
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
int GetErrno() const
Definition: ksync_sock.h:80
virtual void VrfMsgHandler(vr_vrf_req *req)=0
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
virtual void NHMsgHandler(vr_nexthop_req *req)=0
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
virtual void SetErrno(int err)
Definition: ksync_sock.h:78
virtual void FlowResponseHandler(vr_flow_response *req)
Definition: ksync_sock.h:66
virtual void IfMsgHandler(vr_interface_req *req)=0
virtual void FlowMsgHandler(vr_flow_req *req)=0
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
virtual void RouteMsgHandler(vr_route_req *req)=0
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
virtual int VrResponseMsgHandler(vr_response *resp)=0
AgentSandeshContext * GetSandeshContext()
Definition: ksync_sock.h:134
char * rx_buffer1() const
Definition: ksync_sock.h:141
virtual void ErrorHandler(int err)
Definition: ksync_sock.h:132
uint32_t index() const
Definition: ksync_sock.h:145
char * rx_buffer1_
Definition: ksync_sock.h:161
static const char * io_wq_names[MAX_WORK_QUEUES]
Definition: ksync_sock.h:104
virtual void Handler()
Definition: ksync_sock.h:131
@ MAX_WORK_QUEUES
Definition: ksync_sock.h:101
void reset_rx_buffer2()
Definition: ksync_sock.h:144
uint32_t GetSeqno() const
Definition: ksync_sock.h:138
uint32_t GetMsgLen() const
Definition: ksync_sock.h:140
char * rx_buffer2()
Definition: ksync_sock.h:143
void reset_rx_buffer1()
Definition: ksync_sock.h:142
Type type()
Definition: ksync_sock.h:135
void SetSeqno(uint32_t seqno)
Definition: ksync_sock.h:137
char * rx_buffer2_
Definition: ksync_sock.h:162
void set_seqno(uint32_t seq)
Definition: ksync_sock.h:246
uint32_t rx_buffer_index_
Definition: ksync_sock.h:262
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
Definition: ksync_sock.cc:1053
void Insert(IoContext *ioc)
Definition: ksync_sock.cc:1092
void Data(KSyncBufferList *iovec)
Definition: ksync_sock.cc:1097
char * GetReceiveBuffer()
Definition: ksync_sock.cc:1080
char * rx_buffers_[kMaxRxBufferCount]
Definition: ksync_sock.h:260
uint32_t vr_response_count_
Definition: ksync_sock.h:268
static const unsigned kMaxRxBufferCount
Definition: ksync_sock.h:233
IoContextList::iterator io_context_list_it_
Definition: ksync_sock.h:270
IoContextList io_context_list_
Definition: ksync_sock.h:251
IoContext::Type io_context_type() const
Definition: ksync_sock.h:240
uint32_t work_queue_index() const
Definition: ksync_sock.h:245
uint32_t seqno()
Definition: ksync_sock.h:247
void AddReceiveBuffer(char *buff)
Definition: ksync_sock.cc:1087
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
Definition: ksync_sock.h:299
void VxLanMsgHandler(vr_vxlan_req *req)
Definition: ksync_sock.cc:1040
void IfMsgHandler(vr_interface_req *req)
Definition: ksync_sock.cc:948
void DropStatsMsgHandler(vr_drop_stats_req *req)
Definition: ksync_sock.cc:1035
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
Definition: ksync_sock.cc:920
void ForwardingClassMsgHandler(vr_fc_map_req *req)
Definition: ksync_sock.cc:973
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
Definition: ksync_sock.cc:1020
void NHMsgHandler(vr_nexthop_req *req)
Definition: ksync_sock.cc:953
KSyncBulkMsgContext * bulk_msg_context_
Definition: ksync_sock.h:307
void FlowResponseHandler(vr_flow_response *req)
Definition: ksync_sock.cc:1015
virtual ~KSyncBulkSandeshContext()
Definition: ksync_sock.cc:893
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
Definition: ksync_sock.cc:1030
void VrouterOpsMsgHandler(vrouter_ops *req)
Definition: ksync_sock.cc:1045
void RouteMsgHandler(vr_route_req *req)
Definition: ksync_sock.cc:958
void QosConfigMsgHandler(vr_qos_map_req *req)
Definition: ksync_sock.cc:968
void SetErrno(int err)
Definition: ksync_sock.cc:938
int VrResponseMsgHandler(vr_response *resp)
Definition: ksync_sock.cc:980
void MplsMsgHandler(vr_mpls_req *req)
Definition: ksync_sock.cc:963
AgentSandeshContext * GetSandeshContext()
Definition: ksync_sock.cc:943
void VrfMsgHandler(vr_vrf_req *req)
Definition: ksync_sock.cc:1025
void MirrorMsgHandler(vr_mirror_req *req)
Definition: ksync_sock.cc:1005
void FlowMsgHandler(vr_flow_req *req)
Definition: ksync_sock.cc:1010
virtual KSyncObject * GetObject() const =0
virtual bool pre_alloc_rx_buffer() const
Definition: ksync_entry.h:149
virtual void ErrorHandler(int err, uint32_t seqno, KSyncEvent event) const
static std::string VrouterErrorToString(uint32_t error)
KSyncEntry * entry_
Definition: ksync_sock.h:181
void ErrorHandler(int err)
Definition: ksync_sock.cc:883
KSyncSock * sock_
Definition: ksync_sock.h:184
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
Definition: ksync_sock.cc:869
virtual void Handler()
Definition: ksync_sock.cc:879
KSyncEntry::KSyncEvent event_
Definition: ksync_sock.h:182
virtual void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual void Receive(boost::asio::mutable_buffers_1)
Definition: ksync_sock.cc:834
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
Definition: ksync_sock.cc:829
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:770
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
Definition: ksync_sock.cc:792
KSyncSockUdp(boost::asio::io_context &ios, int port)
Definition: ksync_sock.cc:765
virtual bool IsMoreData(char *data)
Definition: ksync_sock.cc:781
boost::asio::ip::udp::endpoint server_ep_
Definition: ksync_sock.h:561
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
Definition: ksync_sock.cc:800
virtual bool Validate(char *data)
Definition: ksync_sock.cc:825
boost::asio::ip::udp::socket sock_
Definition: ksync_sock.h:560
virtual uint32_t GetSeqno(char *data)
Definition: ksync_sock.cc:776
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
Definition: ksync_sock.cc:813
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
Definition: ksync_sock.cc:787
WorkQueue< KSyncRxData > KSyncReceiveQueue
Definition: ksync_sock.h:343
static std::unique_ptr< KSyncSock > sock_
Definition: ksync_sock.h:502
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
Definition: ksync_sock.h:509
uint32_t bmca_prod_
Definition: ksync_sock.h:448
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
Definition: ksync_sock.h:394
static pid_t pid_
Definition: ksync_sock.h:503
static const unsigned kInvalidBulkSeqNo
Definition: ksync_sock.h:321
std::size_t BlockingSend(char *msg, int msg_len)
Definition: ksync_sock.cc:449
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
Definition: ksync_sock.h:431
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
Definition: ksync_sock.cc:391
static uint32_t GetPid()
Definition: ksync_sock.h:390
virtual bool Validate(char *data)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
Definition: ksync_sock.cc:327
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
Definition: ksync_sock.h:490
std::atomic< uint32_t > seqno_
Definition: ksync_sock.h:482
virtual uint32_t GetSeqno(char *data)=0
std::mutex mutex_
Definition: ksync_sock.h:425
virtual void Receive(boost::asio::mutable_buffers_1)=0
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
Definition: ksync_sock.h:324
uint32_t bulk_seq_no_
Definition: ksync_sock.h:442
std::atomic< uint32_t > uve_seqno_
Definition: ksync_sock.h:483
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
Definition: ksync_sock.cc:549
void OnEmptyQueue(bool done)
Definition: ksync_sock.cc:486
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
Definition: ksync_sock.cc:231
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
void ProcessDataInline(char *data)
Definition: ksync_sock.cc:839
bool read_inline_
Definition: ksync_sock.h:486
static void Shutdown()
Definition: ksync_sock.cc:225
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
Definition: ksync_sock.cc:369
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
Definition: ksync_sock.cc:354
void GenericSend(IoContext *ctx)
Definition: ksync_sock.cc:456
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
Definition: ksync_sock.cc:508
bool SendAsyncImpl(IoContext *ioc)
Definition: ksync_sock.cc:627
static void Start(bool read_inline)
Definition: ksync_sock.cc:257
uint32_t WaitTreeSize() const
Definition: ksync_sock.cc:279
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
Definition: ksync_sock.cc:460
uint32_t bulk_buf_size_
Definition: ksync_sock.h:444
static const int kRxWorkQueueCount
Definition: ksync_sock.h:314
virtual ~KSyncSock()
Definition: ksync_sock.cc:203
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
Definition: ksync_sock.cc:238
bool BlockingRecv()
Definition: ksync_sock.cc:424
static void SetSockTableEntry(KSyncSock *sock)
Definition: ksync_sock.cc:269
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
Definition: ksync_sock.h:491
KSyncRxWorkQueue rx_process_queue_
Definition: ksync_sock.h:501
static void SetNetlinkFamilyId(int id)
Definition: ksync_sock.cc:274
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
Definition: ksync_sock.h:326
static const unsigned kBufLen
Definition: ksync_sock.h:316
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
Definition: ksync_sock.cc:474
void SetMeasureQueueDelay(bool val)
Definition: ksync_sock.cc:250
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
Definition: ksync_sock.cc:598
char * rx_buff_
Definition: ksync_sock.h:481
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
Definition: ksync_sock.cc:308
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
uint32_t AllocSeqNo(IoContext::Type type)
Definition: ksync_sock.cc:304
WaitTree wait_tree_
Definition: ksync_sock.h:428
uint32_t bulk_msg_count_
Definition: ksync_sock.h:446
nl_client * nl_client_
Definition: ksync_sock.h:426
void SetSeqno(uint32_t seq)
Definition: ksync_sock.cc:283
KSyncTxQueue send_queue_
Definition: ksync_sock.h:429
static KSyncSock * Get(DBTablePartBase *partition)
Definition: ksync_sock.cc:345
bool use_wait_tree_
Definition: ksync_sock.h:488
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
Definition: ksync_sock.h:430
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
uint32_t max_bulk_msg_count_
Definition: ksync_sock.h:436
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
Definition: ksync_sock.cc:336
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
Definition: ksync_sock.h:450
static int vnsw_netlink_family_id_
Definition: ksync_sock.h:504
int tx_count_
Definition: ksync_sock.h:494
KSyncBulkMsgContext * bulk_msg_context_
Definition: ksync_sock.h:487
bool ProcessRxData(KSyncRxQueueData data)
Definition: ksync_sock.cc:330
uint32_t bmca_cons_
Definition: ksync_sock.h:449
virtual bool IsMoreData(char *data)=0
bool process_data_inline_
Definition: ksync_sock.h:489
static std::atomic< bool > shutdown_
Definition: ksync_sock.h:510
static int GetNetlinkFamilyId()
Definition: ksync_sock.h:391
bool Enqueue(IoContext *io_context)
static int32_t ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
Definition: sandesh.cc:643
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
int GetTaskId(const std::string &name)
Definition: task.cc:861
static TaskScheduler * GetInstance()
Definition: task.cc:554
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
void set_name(const std::string &name)
Definition: queue_task.h:307
#define KSYNC_ERROR(obj,...)
Definition: ksync_entry.h:16
bool NetlinkMsgDone(char *data)
Definition: ksync_sock.cc:67
bool ValidateNetlink(char *data)
Definition: ksync_sock.cc:73
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
Definition: ksync_sock.cc:118
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
Definition: ksync_sock.cc:145
boost::asio::detail::socket_option::integer< SOL_SOCKET, SO_RCVBUFFORCE > ReceiveBuffForceSize
Definition: ksync_sock.cc:45
void InitNetlink(nl_client *client)
Definition: ksync_sock.cc:131
uint32_t GetNetlinkSeqno(char *data)
Definition: ksync_sock.cc:62
void ResetNetlink(nl_client *client)
Definition: ksync_sock.cc:138
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
Definition: ksync_sock.cc:152
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
Definition: ksync_sock.h:37
#define KSYNC_BMC_ARR_SIZE
Definition: ksync_sock.h:31
#define KSYNC_DEFAULT_Q_ID_SEQ
Definition: ksync_sock.h:28
#define KSYNC_SOCK_RECV_BUFF_SIZE
Definition: ksync_sock.h:30
uint8_t type
Definition: load_balance.h:2
#define LOG(_Level, _Msg)
Definition: logging.h:34
void operator()(IoContext *io_context)
Definition: ksync_sock.cc:1069
KSyncBulkMsgContext * bulk_msg_context_
Definition: ksync_sock.h:333
KSyncEntry::KSyncEvent event_
Definition: ksync_sock.h:347