9 #include <linux/netlink.h>
10 #include <linux/rtnetlink.h>
11 #include <linux/genetlink.h>
12 #include <linux/sockios.h>
14 #include <sys/socket.h>
16 #include <boost/bind.hpp>
29 #include "ksync_types.h"
33 #include "vr_genetlink.h"
36 using namespace boost::asio;
39 typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
58 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
59 return nlh->nlmsg_seq;
63 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
64 return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
69 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
70 if (nlh->nlmsg_type == NLMSG_ERROR) {
71 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq <<
" len "
78 LOG(ERROR,
"Length of " << nlh->nlmsg_len <<
" is more than expected "
84 if (nlh->nlmsg_type == NLMSG_DONE) {
90 LOG(ERROR,
"Netlink unknown message type : " << nlh->nlmsg_type);
95 struct genlmsghdr *genlh = (
struct genlmsghdr *) (data + NLMSG_HDRLEN);
96 if (genlh->cmd != SANDESH_REQUEST) {
97 LOG(ERROR,
"Unknown generic netlink cmd : " << genlh->cmd);
102 struct nlattr * attr = (
struct nlattr *)(data + NLMSG_HDRLEN
104 if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
105 LOG(ERROR,
"Unknown generic netlink TLV type : " << attr->nla_type);
114 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
116 if (nlh->nlmsg_type == NLMSG_DONE) {
119 len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
123 *buf_len = nlh->nlmsg_len - len;
128 unsigned char *nl_buf;
130 assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
134 unsigned char *nl_buf;
136 client->cl_buf_offset = 0;
137 nl_build_header(client, &nl_buf, &nl_buf_len);
141 nl_update_header(client, len);
142 struct nlmsghdr *nlh = (
struct nlmsghdr *)client->cl_buf;
144 nlh->nlmsg_seq = seq_no;
148 uint32_t alignment) {
149 while (buf_len > (alignment - 1)) {
152 &error, sandesh_context);
153 if (decode_len < 0) {
154 LOG(DEBUG,
"Incorrect decode len " << decode_len);
158 buf_len -= decode_len;
166 nl_client_(NULL), wait_tree_(), send_queue_(this),
167 max_bulk_msg_count_(kMaxBulkMsgCount), max_bulk_buf_size_(kMaxBulkMsgSize),
168 bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
169 rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
170 use_wait_tree_(true), process_data_inline_(false),
171 ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
172 tx_count_(0), ack_count_(0), err_count_(0),
173 rx_process_queue_(
TaskScheduler::GetInstance()->GetTaskId(
"Agent::KSync"), 0,
174 boost::bind(&
KSyncSock::ProcessRxData, this, _1)) {
177 uint32_t uve_task_id =
179 uint32_t ksync_task_id =
183 ksync_task_id, i,
"KSync Receive Queue");
185 uve_task_id, i,
"KSync UVE Receive Queue");
188 nl_client_ = (nl_client *)malloc(
sizeof(nl_client));
222 sock_->send_queue_.Shutdown();
227 sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
238 &ctxt[instance], _1));
240 sprintf(tmp,
"%s-%d", name, instance);
246 sock_->send_queue_.set_measure_busy_time(val);
253 sock_->read_inline_ = read_inline;
254 if (
sock_->read_inline_) {
261 placeholders::bytes_transferred));
265 assert(
sock_.get() == NULL);
289 seq =
seqno_.fetch_and_add(1);
365 size_t bytes_transferred) {
367 LOG(ERROR,
"Error reading from Ksync sock. Error : " <<
368 boost::system::system_error(error).what());
381 placeholders::bytes_transferred));
389 WaitTree::iterator it;
393 tbb::mutex::scoped_lock lock(
mutex_);
397 LOG(ERROR,
"KSync error in finding for sequence number : "
401 bulk_message_context = &(it->second);
411 tbb::mutex::scoped_lock lock(
mutex_);
431 KSYNC_ERROR(VRouterError,
"VRouter operation failed. Error <",
434 ">. Object <",
"N/A",
">. State <",
"N/A",
435 ">. Message number :", 0);
446 iovec.push_back(buffer(msg, msg_len));
460 ioc->rx_buffer1_ =
new char [
kBufLen];
461 ioc->rx_buffer2_ =
new char [
kBufLen];
463 ioc->rx_buffer1_ = ioc->rx_buffer2_ = NULL;
470 size_t bytes_transferred) {
472 LOG(ERROR,
"Ksync sock write error : " <<
473 boost::system::system_error(error).what());
488 tbb::mutex::scoped_lock lock(
mutex_);
491 bulk_message_context = &it->second;
507 bulk_message_context->
Data(&iovec);
521 placeholders::bytes_transferred));
524 bool more_data =
false;
545 uint32_t work_queue_index) {
547 if (bulk_seq_no_ == kInvalidBulkSeqNo) {
548 assert(bulk_msg_context_ == NULL);
549 bulk_seq_no_ = seqno;
555 return bulk_msg_context_;
558 if (use_wait_tree_) {
559 tbb::mutex::scoped_lock lock(mutex_);
560 if (bulk_seq_no_ == kInvalidBulkSeqNo) {
561 bulk_seq_no_ = seqno;
570 WaitTree::iterator it = wait_tree_.find(bulk_seq_no_);
571 assert(it != wait_tree_.end());
574 if (bulk_seq_no_ == kInvalidBulkSeqNo) {
575 bulk_seq_no_ = seqno;
581 bulk_mctx_arr_[bmca_prod_]->set_seqno(seqno);
584 return bulk_mctx_arr_[bmca_prod_];
612 bulk_message_context->
Insert(ioc);
649 : sock_(ios, protocol) {
652 boost::system::error_code ec;
653 sock_.set_option(set_rcv_buf, ec);
654 if (ec.value() != 0) {
655 LOG(ERROR,
"Error Changing netlink receive sock buffer size to " <<
656 set_rcv_buf.value() <<
" error = " <<
657 boost::system::system_error(ec).what());
659 boost::asio::socket_base::receive_buffer_size rcv_buf_size;
660 boost::system::error_code ec1;
661 sock_.get_option(rcv_buf_size, ec);
662 LOG(INFO,
"Current receive sock buffer size is " << rcv_buf_size.value());
669 const std::string &cpu_pin_policy) {
690 KSyncBufferList::iterator it = iovec->begin();
691 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf,
695 boost::asio::netlink::raw::endpoint ep;
696 sock_.async_send_to(*iovec, ep, cb);
701 KSyncBufferList::iterator it = iovec->begin();
702 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf,
706 boost::asio::netlink::raw::endpoint ep;
707 return sock_.send_to(*iovec, ep);
714 uint32_t buf_len = 0;
729 uint32_t buf_len = 0;
733 bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO, more);
739 uint32_t buf_len = 0;
742 return bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO,
IsMoreData(data));
746 sock_.async_receive(buf, cb);
751 struct nlmsghdr *nlh = buffer_cast<
struct nlmsghdr *>(buf);
752 if (nlh->nlmsg_type == NLMSG_ERROR) {
753 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq
754 <<
" len " << nlh->nlmsg_len);
764 sock_(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0)),
765 server_ep_(boost::asio::ip::address::from_string(
"127.0.0.1"), port) {
769 const std::string &cpu_pin_policy) {
775 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
780 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
781 return ((hdr->flags & UVR_MORE) == UVR_MORE);
792 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
793 uint32_t buf_len = hdr->msg_len;
794 char *buf = data +
sizeof(
struct uvr_msg_hdr);
800 struct uvr_msg_hdr hdr;
805 KSyncBufferList::iterator it = iovec->begin();
806 iovec->insert(it, buffer((
char *)(&hdr),
sizeof(hdr)));
812 struct uvr_msg_hdr hdr;
817 KSyncBufferList::iterator it = iovec->begin();
818 iovec->insert(it, buffer((
char *)(&hdr),
sizeof(hdr)));
828 boost::asio::ip::udp::endpoint ep;
829 sock_.async_receive_from(buf, ep, cb);
833 boost::asio::ip::udp::endpoint ep;
834 sock_.receive_from(buf, ep);
847 assert(bulk_message_context->
seqno() == seqno);
854 delete bulk_message_context;
868 int msg_len,
char *msg,
871 sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
872 IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
873 entry_(sync_entry), event_(event), sock_(sock) {
900 if (sandesh_context->
GetErrno() != 0 &&
901 sandesh_context->
GetErrno() != EEXIST) {
912 (static_cast<KSyncIoContext *>(&io_context));
919 uint32_t alignment,
bool more) {
1053 io_context_list_(), io_context_type_(type), work_queue_index_(index),
1054 rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1058 io_context_list_(), io_context_type_(rhs.io_context_type_),
1059 work_queue_index_(rhs.work_queue_index_),
1060 rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1098 iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
bool ProcessRxData(KSyncRxQueueData data)
uint32_t GetSeqno() const
KSyncRxWorkQueue rx_process_queue_
static const unsigned kInvalidBulkSeqNo
static const int kRxWorkQueueCount
static int GetNetlinkFamilyId()
AgentSandeshContext * GetSandeshContext()
void VrfMsgHandler(vr_vrf_req *req)
void SetSeqno(uint32_t seq)
uint32_t WaitTreeSize() const
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
KSyncEntry::KSyncEvent event_
static std::unique_ptr< KSyncSock > sock_
std::size_t BlockingSend(char *msg, int msg_len)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void operator()(IoContext *io_context)
virtual void MplsMsgHandler(vr_mpls_req *req)=0
virtual void SetErrno(int err)
void Shutdown(bool delete_entries=true)
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
static const char * io_wq_names[MAX_WORK_QUEUES]
void RouteMsgHandler(vr_route_req *req)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
virtual ~KSyncSockNetlink()
virtual bool Validate(char *data)=0
KSyncBulkMsgContext * bulk_msg_context_
#define KSYNC_SOCK_RECV_BUFF_SIZE
virtual void Receive(boost::asio::mutable_buffers_1)
void AddReceiveBuffer(char *buff)
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
virtual bool pre_alloc_rx_buffer() const
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
void DropStatsMsgHandler(vr_drop_stats_req *req)
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
virtual bool IsMoreData(char *data)
KSyncBulkSandeshContext()
void SetSeqno(uint32_t seqno)
uint32_t GetMsgLen() const
virtual bool Validate(char *data)
#define KSYNC_BMC_ARR_SIZE
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void FlowResponseHandler(vr_flow_response *req)
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
#define KSYNC_ERROR(obj,...)
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
void GenericSend(IoContext *ctx)
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
void MplsMsgHandler(vr_mpls_req *req)
void ForwardingClassMsgHandler(vr_fc_map_req *req)
virtual uint32_t GetSeqno(char *data)=0
static int vnsw_netlink_family_id_
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
IoContextList io_context_list_
int VrResponseMsgHandler(vr_response *resp)
void ProcessDataInline(char *data)
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
tbb::atomic< uint32_t > uve_seqno_
virtual uint32_t GetSeqno(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
bool SendAsyncImpl(IoContext *ioc)
void VrouterOpsMsgHandler(vrouter_ops *req)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual uint32_t GetSeqno(char *data)
virtual void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
void MirrorMsgHandler(vr_mirror_req *req)
static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more)
int GetTaskId(const std::string &name)
KSyncBulkMsgContext * bulk_msg_context_
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
bool Enqueue(IoContext *io_context)
bool ValidateNetlink(char *data)
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
virtual ~KSyncBulkSandeshContext()
uint32_t max_bulk_buf_size_
uint32_t GetNetlinkSeqno(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
virtual void Receive(boost::asio::mutable_buffers_1)=0
virtual bool IsMoreData(char *data)=0
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue, const std::string &cpu_pin_policy)
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
uint32_t work_queue_index() const
boost::asio::ip::udp::socket sock_
virtual void VrfMsgHandler(vr_vrf_req *req)=0
void Data(KSyncBufferList *iovec)
uint32_t vr_response_count_
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
virtual void ErrorHandler(int err)
static KSyncSock * Get(DBTablePartBase *partition)
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
virtual bool IsMoreData(char *data)
static void Start(bool read_inline)
static TaskScheduler * GetInstance()
static tbb::atomic< bool > shutdown_
static void SetSockTableEntry(KSyncSock *sock)
AgentSandeshContext * GetSandeshContext()
virtual KSyncObject * GetObject() const =0
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
void SetMeasureQueueDelay(bool val)
uint32_t AllocSeqNo(IoContext::Type type)
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
uint32_t rx_buffer_index_
static int32_t ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
void set_measure_busy_time(bool val) const
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
virtual void FlowMsgHandler(vr_flow_req *req)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
KSyncBulkMsgContext * bulk_msg_context_
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
boost::asio::netlink::raw::socket sock_
void QosConfigMsgHandler(vr_qos_map_req *req)
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
IoContext::Type io_context_type() const
void set_ksync_io_ctx(const KSyncIoContext *ioc)
static const unsigned kMaxRxBufferCount
static std::string VrouterErrorToString(uint32_t error)
#define KSYNC_DEFAULT_Q_ID_SEQ
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
void Insert(IoContext *ioc)
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
uint32_t max_bulk_msg_count_
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
bool NetlinkMsgDone(char *data)
virtual void RouteMsgHandler(vr_route_req *req)=0
void InitNetlink(nl_client *client)
virtual bool Validate(char *data)
virtual void IfMsgHandler(vr_interface_req *req)=0
virtual void ErrorHandler(int err, uint32_t seqno, KSyncEvent event) const
virtual void NHMsgHandler(vr_nexthop_req *req)=0
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
KSyncSockUdp(boost::asio::io_context &ios, int port)
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
#define LOG(_Level, _Msg)
bool process_data_inline_
static void SetNetlinkFamilyId(int id)
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
char * GetReceiveBuffer()
char * rx_buffer1() const
void NHMsgHandler(vr_nexthop_req *req)
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
void FlowResponseHandler(vr_flow_response *req)
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
IoContextList::iterator io_context_list_it_
void ErrorHandler(int err)
boost::asio::detail::socket_option::integer< SOL_SOCKET, SO_RCVBUFFORCE > ReceiveBuffForceSize
bool Enqueue(QueueEntryT entry)
void ResetNetlink(nl_client *client)
void VxLanMsgHandler(vr_vxlan_req *req)
boost::asio::ip::udp::endpoint server_ep_
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
void set_name(const std::string &name)
tbb::atomic< uint32_t > seqno_
KSyncEntry::KSyncEvent event_
virtual int VrResponseMsgHandler(vr_response *resp)=0
char * rx_buffers_[kMaxRxBufferCount]
static const unsigned kBufLen
void FlowMsgHandler(vr_flow_req *req)
void OnEmptyQueue(bool done)
void IfMsgHandler(vr_interface_req *req)