OpenSDN source code
ksync_sock.cc
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <string>
6 #include "base/os.h"
7 #if defined(__linux__)
8 #include <asm/types.h>
9 #include <linux/netlink.h>
10 #include <linux/rtnetlink.h>
11 #include <linux/genetlink.h>
12 #include <linux/sockios.h>
13 #endif
14 #include <sys/socket.h>
15 
16 #include <boost/bind.hpp>
17 
18 #include <base/logging.h>
19 #include <db/db.h>
20 #include <db/db_entry.h>
21 #include <db/db_table.h>
22 #include <db/db_table_partition.h>
23 
24 #include "ksync_index.h"
25 #include "ksync_entry.h"
26 #include "ksync_object.h"
27 #include "ksync_sock.h"
28 #include "ksync_sock_user.h"
29 #include "ksync_types.h"
30 
31 #include "nl_util.h"
32 #include "udp_util.h"
33 #include "vr_genetlink.h"
34 #include "vr_types.h"
35 
36 using namespace boost::asio;
37 
38 /* Note: SO_RCVBUFFORCE is supported only on Linux kernel 2.6.14 and above */
39 typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
40  SO_RCVBUFFORCE> ReceiveBuffForceSize;
41 
42 int KSyncSock::vnsw_netlink_family_id_;
43 AgentSandeshContext *KSyncSock::agent_sandesh_ctx_[KSyncSock::kRxWorkQueueCount];
44 std::unique_ptr<KSyncSock> KSyncSock::sock_;
45 pid_t KSyncSock::pid_;
46 tbb::atomic<bool> KSyncSock::shutdown_;
47 
48 // Name of task used in KSync Response work-queues
49 const char* IoContext::io_wq_names[IoContext::MAX_WORK_QUEUES] =
50  {
51  "Agent::Uve",
52  "Agent::KSync"
53  };
55 // Netlink utilities
57 uint32_t GetNetlinkSeqno(char *data) {
58  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
59  return nlh->nlmsg_seq;
60 }
61 
62 bool NetlinkMsgDone(char *data) {
63  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
64  return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
65 }
66 
67 // Common validation for netlink messages
68 bool ValidateNetlink(char *data) {
69  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
70  if (nlh->nlmsg_type == NLMSG_ERROR) {
71  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq << " len "
72  << nlh->nlmsg_len);
73  assert(0);
74  return false;
75  }
76 
77  if (nlh->nlmsg_len > KSyncSock::kBufLen) {
78  LOG(ERROR, "Length of " << nlh->nlmsg_len << " is more than expected "
79  "length of " << KSyncSock::kBufLen);
80  assert(0);
81  return false;
82  }
83 
84  if (nlh->nlmsg_type == NLMSG_DONE) {
85  return true;
86  }
87 
88  // Sanity checks for generic-netlink message
89  if (nlh->nlmsg_type != KSyncSock::GetNetlinkFamilyId()) {
90  LOG(ERROR, "Netlink unknown message type : " << nlh->nlmsg_type);
91  assert(0);
92  return false;
93  }
94 
95  struct genlmsghdr *genlh = (struct genlmsghdr *) (data + NLMSG_HDRLEN);
96  if (genlh->cmd != SANDESH_REQUEST) {
97  LOG(ERROR, "Unknown generic netlink cmd : " << genlh->cmd);
98  assert(0);
99  return false;
100  }
101 
102  struct nlattr * attr = (struct nlattr *)(data + NLMSG_HDRLEN
103  + GENL_HDRLEN);
104  if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
105  LOG(ERROR, "Unknown generic netlink TLV type : " << attr->nla_type);
106  assert(0);
107  return false;
108  }
109 
110  return true;
111 }
112 
113 void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len) {
114  struct nlmsghdr *nlh = (struct nlmsghdr *)data;
115  int len = 0;
116  if (nlh->nlmsg_type == NLMSG_DONE) {
117  len = NLMSG_HDRLEN;
118  } else {
119  len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
120  }
121 
122  *buf = data + len;
123  *buf_len = nlh->nlmsg_len - len;
124 }
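// Message layout assumed by ValidateNetlink() and GetNetlinkPayload(): a
// netlink header (nlmsghdr), followed for SANDESH_REQUEST messages by a
// generic-netlink header (genlmsghdr) and a single NL_ATTR_VR_MESSAGE_PROTOCOL
// attribute whose payload carries the encoded Sandesh messages. NLMSG_DONE
// messages carry the netlink header alone.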
125 
126 void InitNetlink(nl_client *client) {
127  nl_init_generic_client_req(client, KSyncSock::GetNetlinkFamilyId());
128  unsigned char *nl_buf;
129  uint32_t nl_buf_len;
130  assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
131 }
132 
133 void ResetNetlink(nl_client *client) {
134  unsigned char *nl_buf;
135  uint32_t nl_buf_len;
136  client->cl_buf_offset = 0;
137  nl_build_header(client, &nl_buf, &nl_buf_len);
138 }
139 
140 void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no) {
141  nl_update_header(client, len);
142  struct nlmsghdr *nlh = (struct nlmsghdr *)client->cl_buf;
143  nlh->nlmsg_pid = KSyncSock::GetPid();
144  nlh->nlmsg_seq = seq_no;
145 }
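// InitNetlink() builds the generic-netlink header for the family returned by
// GetNetlinkFamilyId() into the shared nl_client_ buffer, ResetNetlink()
// rebuilds it after resetting the buffer offset, and UpdateNetlink() patches
// the total length, sending pid and sequence number. KSyncSockNetlink prepends
// this header buffer to every outgoing io-vector (see AsyncSendTo/SendTo below).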
146 
147 void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context,
148  uint32_t alignment) {
149  while (buf_len > (alignment - 1)) {
150  int error;
151  int decode_len = Sandesh::ReceiveBinaryMsgOne((uint8_t *)buf, buf_len,
152  &error, sandesh_context);
153  if (decode_len < 0) {
154  LOG(DEBUG, "Incorrect decode len " << decode_len);
155  break;
156  }
157  buf += decode_len;
158  buf_len -= decode_len;
159  }
160 }
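// DecodeSandeshMessages() decodes one Sandesh message per iteration through
// Sandesh::ReceiveBinaryMsgOne(), advancing by the decoded length until fewer
// than 'alignment' bytes remain or a decode error is reported.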
161 
163 // KSyncSock routines
165 KSyncSock::KSyncSock() :
166  nl_client_(NULL), wait_tree_(), send_queue_(this),
167  max_bulk_msg_count_(kMaxBulkMsgCount), max_bulk_buf_size_(kMaxBulkMsgSize),
168  bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
169  rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
170  use_wait_tree_(true), process_data_inline_(false),
171  ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
172  tx_count_(0), ack_count_(0), err_count_(0),
173  rx_process_queue_(TaskScheduler::GetInstance()->GetTaskId("Agent::KSync"), 0,
174  boost::bind(&KSyncSock::ProcessRxData, this, _1)) {
176 
177  uint32_t uve_task_id =
179  uint32_t ksync_task_id =
181  for(uint32_t i = 0; i < kRxWorkQueueCount; i++) {
182  ksync_rx_queue[i] = AllocQueue(ksync_bulk_sandesh_context_,
183  ksync_task_id, i, "KSync Receive Queue");
184  uve_rx_queue[i] = AllocQueue(uve_bulk_sandesh_context_,
185  uve_task_id, i, "KSync UVE Receive Queue");
186  }
187 
188  nl_client_ = (nl_client *)malloc(sizeof(nl_client));
189  memset(nl_client_, 0, sizeof(nl_client));
190  rx_buff_ = NULL;
191  seqno_ = 0;
192  uve_seqno_ = 0;
193 
194  memset(bulk_mctx_arr_, 0, sizeof(bulk_mctx_arr_));
195  bmca_prod_ = bmca_cons_ = 0;
196 }
197 
198 KSyncSock::~KSyncSock() {
199  assert(wait_tree_.size() == 0);
200 
201  if (rx_buff_) {
202  delete [] rx_buff_;
203  rx_buff_ = NULL;
204  }
205 
206  for(int i = 0; i < kRxWorkQueueCount; i++) {
207  ksync_rx_queue[i]->Shutdown();
208  delete ksync_rx_queue[i];
209 
210  uve_rx_queue[i]->Shutdown();
211  delete uve_rx_queue[i];
212  }
213 
214  if (nl_client_->cl_buf) {
215  free(nl_client_->cl_buf);
216  }
217  free(nl_client_);
218 }
219 
220 void KSyncSock::Shutdown() {
221  shutdown_ = true;
222  sock_->send_queue_.Shutdown();
223  sock_.release();
224 }
225 
226 void KSyncSock::Init(bool use_work_queue, const std::string &cpu_pin_policy) {
227  sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
228  pid_ = getpid();
229  shutdown_ = false;
230 }
231 
232 KSyncReceiveQueue *KSyncSock::AllocQueue
233 (KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance,
234  const char *name) {
235  KSyncReceiveQueue *queue;
236  queue = new KSyncReceiveQueue
237  (task_id, instance, boost::bind(&KSyncSock::ProcessKernelData, this,
238  &ctxt[instance], _1));
239  char tmp[128];
240  sprintf(tmp, "%s-%d", name, instance);
241  queue->set_name(tmp);
242  return queue;
243 }
244 
245 void KSyncSock::SetMeasureQueueDelay(bool val) {
246  sock_->send_queue_.set_measure_busy_time(val);
247  for (int i = 0; i < kRxWorkQueueCount; i++) {
249  }
250 }
251 
252 void KSyncSock::Start(bool read_inline) {
253  sock_->read_inline_ = read_inline;
254  if (sock_->read_inline_) {
255  return;
256  }
257  sock_->rx_buff_ = new char[kBufLen];
258  sock_->AsyncReceive(boost::asio::buffer(sock_->rx_buff_, kBufLen),
259  boost::bind(&KSyncSock::ReadHandler, sock_.get(),
260  placeholders::error,
261  placeholders::bytes_transferred));
262 }
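// With read_inline the socket is read synchronously from the send path (see
// SendBulkMessage); otherwise a receive buffer is allocated here and an
// asynchronous read loop is driven by AsyncReceive()/ReadHandler().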
263 
264 void KSyncSock::SetSockTableEntry(KSyncSock *sock) {
265  assert(sock_.get() == NULL);
266  sock_.reset(sock);
267 }
268 
269 void KSyncSock::SetNetlinkFamilyId(int id) {
270  vnsw_netlink_family_id_ = id;
271  InitNetlink(sock_->nl_client_);
272 }
273 
274 uint32_t KSyncSock::WaitTreeSize() const {
275  return wait_tree_.size();
276 }
277 
278 void KSyncSock::SetSeqno(uint32_t seq) {
279  seqno_ = seq;
280  uve_seqno_ = seq;
281 }
282 
283 uint32_t KSyncSock::AllocSeqNo(IoContext::Type type, uint32_t instance) {
284  uint32_t seq;
285  if (type == IoContext::IOC_UVE) {
286  seq = uve_seqno_.fetch_and_add(1);
287  seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
288  } else {
289  seq = seqno_.fetch_and_add(1);
290  seq = (seq * kRxWorkQueueCount + (instance % kRxWorkQueueCount)) << 1;
291  seq |= KSYNC_DEFAULT_Q_ID_SEQ;
292  }
293  if (seq == kInvalidBulkSeqNo) {
294  return AllocSeqNo(type, instance);
295  }
296  return seq;
297 }
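// The sequence number encodes the receive-side demux information: bit 0
// (KSYNC_DEFAULT_Q_ID_SEQ) distinguishes KSync from UVE traffic, and the
// remaining bits interleave a per-type counter with the work-queue instance.
// For example, if kRxWorkQueueCount is 4, a KSync allocation with counter 5 on
// instance 2 produces ((5 * 4 + 2) << 1) | 1, and the receive path recovers
// the instance as (seq >> 1) % kRxWorkQueueCount.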
298 
299 uint32_t KSyncSock::AllocSeqNo(IoContext::Type type) {
300  return AllocSeqNo(type, 0);
301 }
302 
303 KSyncReceiveQueue *KSyncSock::GetReceiveQueue(IoContext::Type type,
304  uint32_t instance) {
305  if (type == IoContext::IOC_UVE) {
306  return uve_rx_queue[instance % kRxWorkQueueCount];
307  } else {
308  return ksync_rx_queue[instance % kRxWorkQueueCount];
309  }
310 }
311 
312 KSyncReceiveQueue *KSyncSock::GetReceiveQueue(uint32_t seqno) {
313  IoContext::Type type;
314  if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
315  type = IoContext::IOC_KSYNC;
316  else
317  type = IoContext::IOC_UVE;
318 
319  uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
320  return GetReceiveQueue(type, instance);
321 }
321 }
322 void KSyncSock::EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event) {
323  rx_process_queue_.Enqueue(KSyncRxQueueData(entry, event));
324 }
325 bool KSyncSock::ProcessRxData(KSyncRxQueueData data) {
326  assert(data.event_ != KSyncEntry::INVALID);
327  KSyncObject *object = data.entry_->GetObject();
328  object->NetlinkAck(data.entry_, data.event_);
329  return true;
330 }
331 KSyncBulkSandeshContext *KSyncSock::GetBulkSandeshContext(uint32_t seqno) {
332 
333  uint32_t instance = (seqno >> 1) % kRxWorkQueueCount;
334  if (seqno & KSYNC_DEFAULT_Q_ID_SEQ)
335  return &ksync_bulk_sandesh_context_[instance];
336  else
337  return &uve_bulk_sandesh_context_[instance];
338 }
339 
340 KSyncSock *KSyncSock::Get(DBTablePartBase *partition) {
341  return sock_.get();
342 }
343 
344 KSyncSock *KSyncSock::Get(int idx) {
345  assert(idx == 0);
346  return sock_.get();
347 }
348 
349 bool KSyncSock::ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context) {
350  Validate(data);
351 
352  KSyncReceiveQueue *queue;
353  if (context) {
354  queue = GetReceiveQueue(context->io_context_type(),
355  context->work_queue_index());
356  } else {
357  queue = GetReceiveQueue(GetSeqno(data));
358  }
359  queue->Enqueue(KSyncRxData(data, context));
360  return true;
361 }
362 
363 // Read handler registered with boost::asio. Demux done based on seqno_
364 void KSyncSock::ReadHandler(const boost::system::error_code& error,
365  size_t bytes_transferred) {
366  if (error) {
367  LOG(ERROR, "Error reading from Ksync sock. Error : " <<
368  boost::system::system_error(error).what());
369  if (shutdown_ == false) {
370  assert(0);
371  }
372  return;
373  }
374 
375  ValidateAndEnqueue(rx_buff_, NULL);
376 
377  rx_buff_ = new char[kBufLen];
378  AsyncReceive(boost::asio::buffer(rx_buff_, kBufLen),
379  boost::bind(&KSyncSock::ReadHandler, this,
380  placeholders::error,
381  placeholders::bytes_transferred));
382 }
383 
384 // Process kernel data - executes in the task specified by IoContext
385 // Currently only Agent::KSync and Agent::Uve are possibilities
386 bool KSyncSock::ProcessKernelData(KSyncBulkSandeshContext *bulk_sandesh_context,
387  const KSyncRxData &data) {
388  KSyncBulkMsgContext *bulk_message_context = data.bulk_msg_context_;
389  WaitTree::iterator it;
390  if (data.bulk_msg_context_ == NULL) {
391  uint32_t seqno = GetSeqno(data.buff_);
392  {
393  tbb::mutex::scoped_lock lock(mutex_);
394  it = wait_tree_.find(seqno);
395  }
396  if (it == wait_tree_.end()) {
397  LOG(ERROR, "KSync error in finding for sequence number : "
398  << seqno);
399  assert(0);
400  }
401  bulk_message_context = &(it->second);
402  }
403 
404  bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
405  BulkDecoder(data.buff_, bulk_sandesh_context);
406  // Remove the IoContext only on last netlink message
407  if (IsMoreData(data.buff_) == false) {
408  if (data.bulk_msg_context_ != NULL) {
409  delete data.bulk_msg_context_;
410  } else {
411  tbb::mutex::scoped_lock lock(mutex_);
412  wait_tree_.erase(it);
413  }
414  }
415  delete[] data.buff_;
416  return true;
417 }
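// For responses read asynchronously the KSyncBulkMsgContext is looked up in
// wait_tree_ by sequence number; for inline reads it arrives together with the
// buffer in KSyncRxData and the tree lookup is skipped. Either way the context
// is released only after the last message of the response has been decoded.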
418 
419 bool KSyncSock::BlockingRecv() {
420  char data[kBufLen];
421  bool ret = false;
422 
423  do {
424  Receive(boost::asio::buffer(data, kBufLen));
425  AgentSandeshContext *ctxt = GetAgentSandeshContext(0);
426  ctxt->SetErrno(0);
427  // BlockingRecv is used only during Init and doesn't support bulk messages.
428  // Use non-bulk version of decoder
429  Decoder(data, ctxt);
430  if (ctxt->GetErrno() != 0 && ctxt->GetErrno() != EEXIST) {
431  KSYNC_ERROR(VRouterError, "VRouter operation failed. Error <",
432  ctxt->GetErrno(), ":",
434  ">. Object <", "N/A", ">. State <", "N/A",
435  ">. Message number :", 0);
436  ret = true;
437  }
438  } while (IsMoreData(data));
439 
440  return ret;
441 }
442 
443 // BlockingSend does not support bulk messages.
444 size_t KSyncSock::BlockingSend(char *msg, int msg_len) {
445  KSyncBufferList iovec;
446  iovec.push_back(buffer(msg, msg_len));
447  bulk_buf_size_ = msg_len;
448  return SendTo(&iovec, 0);
449 }
450 
451 void KSyncSock::GenericSend(IoContext *ioc) {
452  send_queue_.Enqueue(ioc);
453 }
454 
455 void KSyncSock::SendAsync(KSyncEntry *entry, int msg_len, char *msg,
456  KSyncEntry::KSyncEvent event) {
457  KSyncIoContext *ioc = new KSyncIoContext(this, entry, msg_len, msg, event);
458  // Pre-allocate buffers to minimize processing in KSyncTxQueue context
459  if (read_inline_ && entry->pre_alloc_rx_buffer()) {
460  ioc->rx_buffer1_ = new char [kBufLen];
461  ioc->rx_buffer2_ = new char [kBufLen];
462  } else {
463  ioc->rx_buffer1_ = ioc->rx_buffer2_ = NULL;
464  }
465  send_queue_.Enqueue(ioc);
466 }
467 
468 // Write handler registered with boost::asio
469 void KSyncSock::WriteHandler(const boost::system::error_code& error,
470  size_t bytes_transferred) {
471  if (error) {
472  LOG(ERROR, "Ksync sock write error : " <<
473  boost::system::system_error(error).what());
474  if (shutdown_ == false) {
475  assert(0);
476  }
477  }
478 }
479 
480 // End of messages in the work-queue. Send messages pending in bulk context
481 void KSyncSock::OnEmptyQueue(bool done) {
482  if (bulk_seq_no_ == kInvalidBulkSeqNo)
483  return;
484 
485  KSyncBulkMsgContext *bulk_message_context = NULL;
486  if (use_wait_tree_) {
487  if (read_inline_ == false) {
488  tbb::mutex::scoped_lock lock(mutex_);
489  WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
490  assert(it != wait_tree_.end());
491  bulk_message_context = &it->second;
492  } else {
493  bulk_message_context = bulk_msg_context_;
494  }
495  } else {
496  bulk_message_context = bulk_mctx_arr_[bmca_prod_];
497  }
498 
499  SendBulkMessage(bulk_message_context, bulk_seq_no_);
500 }
501 
502 // Send messages accumulated in bulk context
503 int KSyncSock::SendBulkMessage(KSyncBulkMsgContext *bulk_message_context,
504  uint32_t seqno) {
505  KSyncBufferList iovec;
506  // Get all buffers to send into single io-vector
507  bulk_message_context->Data(&iovec);
508  tx_count_++;
509 
510  if (!read_inline_) {
511  if (!use_wait_tree_) {
512  bmca_prod_++;
513  if (bmca_prod_ == KSYNC_BMC_ARR_SIZE) {
514  bmca_prod_ = 0;
515  }
516  }
517 
518  AsyncSendTo(&iovec, seqno,
519  boost::bind(&KSyncSock::WriteHandler, this,
520  placeholders::error,
521  placeholders::bytes_transferred));
522  } else {
523  SendTo(&iovec, seqno);
524  bool more_data = false;
525  do {
526  char *rxbuf = bulk_message_context->GetReceiveBuffer();
527  Receive(boost::asio::buffer(rxbuf, kBufLen));
528  more_data = IsMoreData(rxbuf);
529  if (!process_data_inline_) {
530  ValidateAndEnqueue(rxbuf, bulk_message_context);
531  } else {
532  ProcessDataInline(rxbuf);
533  }
534  } while(more_data);
535  }
536 
537  bulk_msg_context_ = NULL;
538  bulk_seq_no_ = kInvalidBulkSeqNo;
539  return true;
540 }
541 
542 // Get the bulk-context for sequence-number
543 KSyncBulkMsgContext *KSyncSock::LocateBulkContext
544 (uint32_t seqno, IoContext::Type io_context_type,
545  uint32_t work_queue_index) {
546  if (read_inline_) {
547  if (bulk_seq_no_ == kInvalidBulkSeqNo) {
548  assert(bulk_msg_context_ == NULL);
549  bulk_seq_no_ = seqno;
550  bulk_buf_size_ = 0;
551  bulk_msg_count_ = 0;
552  bulk_msg_context_ = new KSyncBulkMsgContext(io_context_type,
553  work_queue_index);
554  }
555  return bulk_msg_context_;
556  }
557 
558  if (use_wait_tree_) {
559  tbb::mutex::scoped_lock lock(mutex_);
560  if (bulk_seq_no_ == kInvalidBulkSeqNo) {
561  bulk_seq_no_ = seqno;
562  bulk_buf_size_ = 0;
563  bulk_msg_count_ = 0;
564 
565  wait_tree_.insert(WaitTreePair(seqno,
566  KSyncBulkMsgContext(io_context_type,
567  work_queue_index)));
568  }
569 
570  WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
571  assert(it != wait_tree_.end());
572  return &it->second;
573  } else {
574  if (bulk_seq_no_ == kInvalidBulkSeqNo) {
575  bulk_seq_no_ = seqno;
576  bulk_buf_size_ = 0;
577  bulk_msg_count_ = 0;
578 
579  bulk_mctx_arr_[bmca_prod_] = new KSyncBulkMsgContext(io_context_type,
580  work_queue_index);
581  bulk_mctx_arr_[bmca_prod_]->set_seqno(seqno);
582  }
583 
584  return bulk_mctx_arr_[bmca_prod_];
585  }
586 
587  return NULL;
588 }
589 
590 // Try adding an io-context to bulk context. Returns
591 // - true : if message can be added to bulk context
592 // - false : if message cannot be added to bulk context
593 bool KSyncSock::TryAddToBulk(KSyncBulkMsgContext *bulk_message_context,
594  IoContext *ioc) {
595  if ((bulk_buf_size_ + ioc->GetMsgLen()) >= max_bulk_buf_size_)
596  return false;
597 
598  if ((bulk_msg_count_ + 1) >= max_bulk_msg_count_)
599  return false;
600 
601  if (bulk_message_context->io_context_type() != ioc->type())
602  return false;
603 
604  if (use_wait_tree_) {
605  if (bulk_message_context->work_queue_index() != ioc->index())
606  return false;
607  }
608 
609  bulk_buf_size_ += ioc->GetMsgLen();
610  bulk_msg_count_++;
611 
612  bulk_message_context->Insert(ioc);
613  if (ioc->rx_buffer1()) {
614  bulk_message_context->AddReceiveBuffer(ioc->rx_buffer1());
615  ioc->reset_rx_buffer1();
616 
617  }
618  if (ioc->rx_buffer2()) {
619  bulk_message_context->AddReceiveBuffer(ioc->rx_buffer2());
620  ioc->reset_rx_buffer2();
621  }
622  return true;
623 }
624 
625 bool KSyncSock::SendAsyncImpl(IoContext *ioc) {
626  KSyncBulkMsgContext *bulk_message_context =
627  LocateBulkContext(ioc->GetSeqno(), ioc->type(), ioc->index());
628  // Try adding message to bulk-message list
629  if (TryAddToBulk(bulk_message_context, ioc)) {
630  // Message added to bulk-list. Nothing more to do
631  return true;
632  }
633 
634  // Message cannot be added to bulk-list. Send the current list
635  SendBulkMessage(bulk_message_context, bulk_seq_no_);
636 
637  // Allocate a new context and add message to it
638  bulk_message_context = LocateBulkContext(ioc->GetSeqno(), ioc->type(),
639  ioc->index());
640  assert(TryAddToBulk(bulk_message_context, ioc));
641  return true;
642 }
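// Transmit batching: SendAsyncImpl() keeps appending IoContexts to the current
// KSyncBulkMsgContext until TryAddToBulk() rejects the message (buffer size,
// message count, io-context type or work-queue mismatch), or until the
// transmit work-queue drains and OnEmptyQueue() flushes whatever has been
// accumulated through SendBulkMessage().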
643 
644 
646 // KSyncSockNetlink routines
648 KSyncSockNetlink::KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
649  : sock_(ios, protocol) {
650  ReceiveBuffForceSize set_rcv_buf;
651  set_rcv_buf = KSYNC_SOCK_RECV_BUFF_SIZE;
652  boost::system::error_code ec;
653  sock_.set_option(set_rcv_buf, ec);
654  if (ec.value() != 0) {
655  LOG(ERROR, "Error Changing netlink receive sock buffer size to " <<
656  set_rcv_buf.value() << " error = " <<
657  boost::system::system_error(ec).what());
658  }
659  boost::asio::socket_base::receive_buffer_size rcv_buf_size;
660  boost::system::error_code ec1;
661  sock_.get_option(rcv_buf_size, ec);
662  LOG(INFO, "Current receive sock buffer size is " << rcv_buf_size.value());
663 }
664 
665 KSyncSockNetlink::~KSyncSockNetlink() {
666 }
667 
668 void KSyncSockNetlink::Init(io_service &ios, int protocol, bool use_work_queue,
669  const std::string &cpu_pin_policy) {
670  SetSockTableEntry(new KSyncSockNetlink(ios, protocol));
671  KSyncSock::Init(use_work_queue, cpu_pin_policy);
672 }
673 
674 uint32_t KSyncSockNetlink::GetSeqno(char *data) {
675  return GetNetlinkSeqno(data);
676 }
677 
678 bool KSyncSockNetlink::IsMoreData(char *data) {
679  return NetlinkMsgDone(data);
680 }
681 
682 bool KSyncSockNetlink::Validate(char *data) {
683  return ValidateNetlink(data);
684 }
685 
686 // Netlink socket class for interacting with the kernel
687 void KSyncSockNetlink::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
688  HandlerCb cb) {
689  ResetNetlink(nl_client_);
690  KSyncBufferList::iterator it = iovec->begin();
691  iovec->insert(it, buffer((char *)nl_client_->cl_buf,
692  nl_client_->cl_buf_offset));
693  UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
694 
695  boost::asio::netlink::raw::endpoint ep;
696  sock_.async_send_to(*iovec, ep, cb);
697 }
698 
699 size_t KSyncSockNetlink::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
700  ResetNetlink(nl_client_);
701  KSyncBufferList::iterator it = iovec->begin();
702  iovec->insert(it, buffer((char *)nl_client_->cl_buf,
703  nl_client_->cl_buf_offset));
704  UpdateNetlink(nl_client_, bulk_buf_size_, seq_no);
705 
706  boost::asio::netlink::raw::endpoint ep;
707  return sock_.send_to(*iovec, ep);
708 }
709 
710 // Static method to decode non-bulk message
711 void KSyncSockNetlink::NetlinkDecoder(char *data, SandeshContext *ctxt) {
712  assert(ValidateNetlink(data));
713  char *buf = NULL;
714  uint32_t buf_len = 0;
715  GetNetlinkPayload(data, &buf, &buf_len);
716  DecodeSandeshMessages(buf, buf_len, ctxt, NLA_ALIGNTO);
717 }
718 
719 bool KSyncSockNetlink::Decoder(char *data, AgentSandeshContext *context) {
720  NetlinkDecoder(data, context);
721  return true;
722 }
723 
724 // Static method used in ksync_sock_user only
725 void KSyncSockNetlink::NetlinkBulkDecoder(char *data, SandeshContext *ctxt,
726  bool more) {
727  assert(ValidateNetlink(data));
728  char *buf = NULL;
729  uint32_t buf_len = 0;
730  GetNetlinkPayload(data, &buf, &buf_len);
731  KSyncBulkSandeshContext *bulk_sandesh_context =
732  dynamic_cast<KSyncBulkSandeshContext *>(ctxt);
733  bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, more);
734 }
735 
736 bool KSyncSockNetlink::BulkDecoder(char *data,
737  KSyncBulkSandeshContext *bulk_sandesh_context) {
738  // Get sandesh buffer and buffer-length
739  uint32_t buf_len = 0;
740  char *buf = NULL;
741  GetNetlinkPayload(data, &buf, &buf_len);
742  return bulk_sandesh_context->Decoder(buf, buf_len, NLA_ALIGNTO, IsMoreData(data));
743 }
744 
745 void KSyncSockNetlink::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
746  sock_.async_receive(buf, cb);
747 }
748 
749 void KSyncSockNetlink::Receive(mutable_buffers_1 buf) {
750  sock_.receive(buf);
751  struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
752  if (nlh->nlmsg_type == NLMSG_ERROR) {
753  LOG(ERROR, "Netlink error for seqno " << nlh->nlmsg_seq
754  << " len " << nlh->nlmsg_len);
755  assert(0);
756  }
757 }
758 
760 // KSyncSockUdp routines
762 // UDP socket class for interacting with the kernel
763 KSyncSockUdp::KSyncSockUdp(boost::asio::io_context &ios, int port) :
764  sock_(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
765  server_ep_(boost::asio::ip::address::from_string("127.0.0.1"), port) {
766 }
767 
768 void KSyncSockUdp::Init(io_service &ios, int port,
769  const std::string &cpu_pin_policy) {
770  SetSockTableEntry(new KSyncSockUdp(ios, port));
771  KSyncSock::Init(false, cpu_pin_policy);
772 }
773 
774 uint32_t KSyncSockUdp::GetSeqno(char *data) {
775  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
776  return hdr->seq_no;
777 }
778 
779 bool KSyncSockUdp::IsMoreData(char *data) {
780  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
781  return ((hdr->flags & UVR_MORE) == UVR_MORE);
782 }
783 
784 // We don't expect any non-bulk operation on UDP
785 bool KSyncSockUdp::Decoder(char *data, AgentSandeshContext *context) {
786  assert(0);
787  return false;
788 }
789 
790 bool KSyncSockUdp::BulkDecoder(char *data,
791  KSyncBulkSandeshContext *bulk_sandesh_context) {
792  struct uvr_msg_hdr *hdr = (struct uvr_msg_hdr *)data;
793  uint32_t buf_len = hdr->msg_len;
794  char *buf = data + sizeof(struct uvr_msg_hdr);
795  return bulk_sandesh_context->Decoder(buf, buf_len, 1, IsMoreData(data));
796 }
797 
798 void KSyncSockUdp::AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no,
799  HandlerCb cb) {
800  struct uvr_msg_hdr hdr;
801  hdr.seq_no = seq_no;
802  hdr.flags = 0;
803  hdr.msg_len = bulk_buf_size_;
804 
805  KSyncBufferList::iterator it = iovec->begin();
806  iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
807 
808  sock_.async_send_to(*iovec, server_ep_, cb);
809 }
810 
811 size_t KSyncSockUdp::SendTo(KSyncBufferList *iovec, uint32_t seq_no) {
812  struct uvr_msg_hdr hdr;
813  hdr.seq_no = seq_no;
814  hdr.flags = 0;
815  hdr.msg_len = bulk_buf_size_;
816 
817  KSyncBufferList::iterator it = iovec->begin();
818  iovec->insert(it, buffer((char *)(&hdr), sizeof(hdr)));
819 
820  return sock_.send_to(*iovec, server_ep_, MSG_DONTWAIT);
821 }
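// The UDP transport frames each request with a uvr_msg_hdr carrying the
// sequence number, flags and payload length; on receive, the UVR_MORE flag in
// that header plays the role NLM_F_MULTI plays for netlink.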
822 
823 bool KSyncSockUdp::Validate(char *data) {
824  return true;
825 }
826 
827 void KSyncSockUdp::AsyncReceive(mutable_buffers_1 buf, HandlerCb cb) {
828  boost::asio::ip::udp::endpoint ep;
829  sock_.async_receive_from(buf, ep, cb);
830 }
831 
832 void KSyncSockUdp::Receive(mutable_buffers_1 buf) {
833  boost::asio::ip::udp::endpoint ep;
834  sock_.receive_from(buf, ep);
835 }
836 
837 void KSyncSock::ProcessDataInline(char *data) {
838  KSyncBulkMsgContext *bulk_message_context = NULL;
839  KSyncBulkSandeshContext *bulk_sandesh_context;
840  uint32_t seqno = GetSeqno(data);
841 
842  assert(!use_wait_tree_);
843  Validate(data);
844 
845  bulk_sandesh_context = GetBulkSandeshContext(seqno);
846  bulk_message_context = bulk_mctx_arr_[bmca_cons_];
847  assert(bulk_message_context->seqno() == seqno);
848 
849  bulk_sandesh_context->set_bulk_message_context(bulk_message_context);
850  BulkDecoder(data, bulk_sandesh_context);
851 
852  // Remove the IoContext only on last netlink message
853  if (IsMoreData(data) == false) {
854  delete bulk_message_context;
855  bmca_cons_++;
856  if (bmca_cons_ == KSYNC_BMC_ARR_SIZE) {
857  bmca_cons_ = 0;
858  }
859  }
860 
861  return;
862 }
863 
865 // KSyncIoContext routines
867 KSyncIoContext::KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry,
868  int msg_len, char *msg,
869  KSyncEntry::KSyncEvent event) :
870  IoContext(msg, msg_len, 0,
871  sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
872  IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
873  entry_(sync_entry), event_(event), sock_(sock) {
874  SetSeqno(sock->AllocSeqNo(type(), index()));
875 }
876 
877 void KSyncIoContext::Handler() {
878  sock_->EnqueueRxProcessData(entry_, event_);
879 }
880 
881 void KSyncIoContext::ErrorHandler(int err) {
882  entry_->ErrorHandler(err, GetSeqno(), event_);
883 }
884 
886 // Routines for KSyncBulkSandeshContext
888 KSyncBulkSandeshContext::KSyncBulkSandeshContext() :
889  AgentSandeshContext(), bulk_msg_context_(NULL) { }
890 
891 KSyncBulkSandeshContext::~KSyncBulkSandeshContext() {
892 }
893 
894 // Sandesh responses for old context are done. Check for any errors
895 void KSyncBulkSandeshContext::IoContextDone() {
896  IoContext *io_context = &(*bulk_msg_context_->io_context_list_it_);
897  AgentSandeshContext *sandesh_context = io_context->GetSandeshContext();
898 
899  sandesh_context->set_ksync_io_ctx(NULL);
900  if (sandesh_context->GetErrno() != 0 &&
901  sandesh_context->GetErrno() != EEXIST) {
902  io_context->ErrorHandler(sandesh_context->GetErrno());
903  }
904  io_context->Handler();
905 }
906 
908 void KSyncBulkSandeshContext::IoContextStart() {
909  IoContext &io_context = *bulk_msg_context_->io_context_list_it_;
910  AgentSandeshContext *sandesh_context = io_context.GetSandeshContext();
911  sandesh_context->set_ksync_io_ctx
912  (static_cast<KSyncIoContext *>(&io_context));
913 }
914 
915 // Process the sandesh messages
916 // There can be more than one sandesh message in the netlink buffer.
917 // Iterate and process all of them
918 bool KSyncBulkSandeshContext::Decoder(char *data, uint32_t len,
919  uint32_t alignment, bool more) {
920  DecodeSandeshMessages(data, len, this, alignment);
923  if (more == true)
924  return false;
925 
926  IoContextDone();
927 
928  // No more netlink messages. Validate that iterator points to last element
929  // in IoContextList
933  return true;
934 }
935 
936 void KSyncBulkSandeshContext::SetErrno(int err) {
937  AgentSandeshContext *context = GetSandeshContext();
938  context->SetErrno(err);
939 }
940 
941 AgentSandeshContext *KSyncBulkSandeshContext::GetSandeshContext() {
943  return bulk_msg_context_->io_context_list_it_->GetSandeshContext();
944 }
945 
946 void KSyncBulkSandeshContext::IfMsgHandler(vr_interface_req *req) {
947  AgentSandeshContext *context = GetSandeshContext();
948  context->IfMsgHandler(req);
949 }
950 
951 void KSyncBulkSandeshContext::NHMsgHandler(vr_nexthop_req *req) {
952  AgentSandeshContext *context = GetSandeshContext();
953  context->NHMsgHandler(req);
954 }
955 
956 void KSyncBulkSandeshContext::RouteMsgHandler(vr_route_req *req) {
957  AgentSandeshContext *context = GetSandeshContext();
958  context->RouteMsgHandler(req);
959 }
960 
961 void KSyncBulkSandeshContext::MplsMsgHandler(vr_mpls_req *req) {
962  AgentSandeshContext *context = GetSandeshContext();
963  context->MplsMsgHandler(req);
964 }
965 
966 void KSyncBulkSandeshContext::QosConfigMsgHandler(vr_qos_map_req *req) {
967  AgentSandeshContext *context = GetSandeshContext();
968  context->QosConfigMsgHandler(req);
969 }
970 
971 void KSyncBulkSandeshContext::ForwardingClassMsgHandler(vr_fc_map_req *req) {
972  AgentSandeshContext *context = GetSandeshContext();
973  context->ForwardingClassMsgHandler(req);
974 }
975 
976 // vr_response message is treated as delimiter in a bulk-context. So, move to
977 // next io-context within bulk-message context.
978 int KSyncBulkSandeshContext::VrResponseMsgHandler(vr_response *resp) {
979  AgentSandeshContext *sandesh_context = NULL;
980  // If this is the first vr_response received, move io-context to first entry in
981  // bulk context
985  sandesh_context =
986  bulk_msg_context_->io_context_list_it_->GetSandeshContext();
987  IoContextStart();
988  } else {
989  // Sandesh responses for old io-context are done.
990  // Check for any errors and trigger state-machine for old io-context
991  IoContextDone();
992  // Move to the next io-context
996  sandesh_context =
997  bulk_msg_context_->io_context_list_it_->GetSandeshContext();
998  IoContextStart();
999  }
1000  return sandesh_context->VrResponseMsgHandler(resp);
1001 }
1002 
1003 void KSyncBulkSandeshContext::MirrorMsgHandler(vr_mirror_req *req) {
1004  AgentSandeshContext *context = GetSandeshContext();
1005  context->MirrorMsgHandler(req);
1006 }
1007 
1008 void KSyncBulkSandeshContext::FlowMsgHandler(vr_flow_req *req) {
1009  AgentSandeshContext *context = GetSandeshContext();
1010  context->FlowMsgHandler(req);
1011 }
1012 
1013 void KSyncBulkSandeshContext::FlowResponseHandler(vr_flow_response *req) {
1014  AgentSandeshContext *context = GetSandeshContext();
1015  context->FlowResponseHandler(req);
1016 }
1017 
1018 void KSyncBulkSandeshContext::VrfAssignMsgHandler(vr_vrf_assign_req *req) {
1019  AgentSandeshContext *context = GetSandeshContext();
1020  context->VrfAssignMsgHandler(req);
1021 }
1022 
1023 void KSyncBulkSandeshContext::VrfMsgHandler(vr_vrf_req *req) {
1024  AgentSandeshContext *context = GetSandeshContext();
1025  context->VrfMsgHandler(req);
1026 }
1027 
1028 void KSyncBulkSandeshContext::VrfStatsMsgHandler(vr_vrf_stats_req *req) {
1029  AgentSandeshContext *context = GetSandeshContext();
1030  context->VrfStatsMsgHandler(req);
1031 }
1032 
1033 void KSyncBulkSandeshContext::DropStatsMsgHandler(vr_drop_stats_req *req) {
1034  AgentSandeshContext *context = GetSandeshContext();
1035  context->DropStatsMsgHandler(req);
1036 }
1037 
1038 void KSyncBulkSandeshContext::VxLanMsgHandler(vr_vxlan_req *req) {
1039  AgentSandeshContext *context = GetSandeshContext();
1040  context->VxLanMsgHandler(req);
1041 }
1042 
1043 void KSyncBulkSandeshContext::VrouterOpsMsgHandler(vrouter_ops *req) {
1044  AgentSandeshContext *context = GetSandeshContext();
1045  context->VrouterOpsMsgHandler(req);
1046 }
1047 
1049 // KSyncBulkMsgContext routines
1051 KSyncBulkMsgContext::KSyncBulkMsgContext(IoContext::Type type,
1052  uint32_t index) :
1053  io_context_list_(), io_context_type_(type), work_queue_index_(index),
1054  rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1055 }
1056 
1057 KSyncBulkMsgContext::KSyncBulkMsgContext(const KSyncBulkMsgContext &rhs) :
1058  io_context_list_(), io_context_type_(rhs.io_context_type_),
1059  work_queue_index_(rhs.work_queue_index_),
1060  rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1061  assert(rhs.vr_response_count_ == 0);
1062  assert(rhs.rx_buffer_index_ == 0);
1063  assert(rhs.io_context_list_.size() == 0);
1064 }
1065 
1066 struct IoContextDisposer {
1067  void operator() (IoContext *io_context) { delete io_context; }
1068 };
1069 
1070 KSyncBulkMsgContext::~KSyncBulkMsgContext() {
1071  assert(vr_response_count_ == io_context_list_.size());
1072  io_context_list_.clear_and_dispose(IoContextDisposer());
1073  for (uint32_t i = 0; i < rx_buffer_index_; i++) {
1074  delete[] rx_buffers_[i];
1075  }
1076 }
1077 
1078 char *KSyncBulkMsgContext::GetReceiveBuffer() {
1079  if (rx_buffer_index_ == 0)
1080  return new char[KSyncSock::kBufLen];
1081 
1082  return rx_buffers_[--rx_buffer_index_];
1083 }
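// Receive buffers pre-allocated by SendAsync() are handed to the bulk context
// through AddReceiveBuffer(); GetReceiveBuffer() reuses them for inline reads
// and falls back to a fresh allocation when none are left. Buffers still held
// at destruction are freed by ~KSyncBulkMsgContext().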
1084 
1085 void KSyncBulkMsgContext::AddReceiveBuffer(char *buff) {
1087  rx_buffers_[rx_buffer_index_++] = buff;
1088 }
1089 
1090 void KSyncBulkMsgContext::Insert(IoContext *ioc) {
1091  io_context_list_.push_back(*ioc);
1092  return;
1093 }
1094 
1095 void KSyncBulkMsgContext::Data(KSyncBufferList *iovec) {
1096  IoContextList::iterator it = io_context_list_.begin();
1097  while (it != io_context_list_.end()) {
1098  iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
1099  it++;
1100  }
1101 }