10 #if defined(__linux__)
11 #include <asm/types.h>
12 #include <linux/netlink.h>
13 #include <linux/rtnetlink.h>
14 #include <linux/genetlink.h>
15 #include <linux/sockios.h>
18 #include <sys/socket.h>
20 #include <boost/bind/bind.hpp>
33 #include "ksync_types.h"
37 #include "vr_genetlink.h"
40 using namespace boost::asio;
41 using namespace boost::placeholders;
44 typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
63 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
64 return nlh->nlmsg_seq;
68 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
69 return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
74 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
75 if (nlh->nlmsg_type == NLMSG_ERROR) {
76 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq <<
" len "
83 LOG(ERROR,
"Length of " << nlh->nlmsg_len <<
" is more than expected "
89 if (nlh->nlmsg_type == NLMSG_DONE) {
95 LOG(ERROR,
"Netlink unknown message type : " << nlh->nlmsg_type);
100 struct genlmsghdr *genlh = (
struct genlmsghdr *) (data + NLMSG_HDRLEN);
101 if (genlh->cmd != SANDESH_REQUEST) {
102 LOG(ERROR,
"Unknown generic netlink cmd : " << genlh->cmd);
107 struct nlattr * attr = (
struct nlattr *)(data + NLMSG_HDRLEN
109 if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
110 LOG(ERROR,
"Unknown generic netlink TLV type : " << attr->nla_type);
119 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
121 if (nlh->nlmsg_type == NLMSG_DONE) {
124 len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
128 *buf_len = nlh->nlmsg_len - len;
133 unsigned char *nl_buf;
135 assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
139 unsigned char *nl_buf;
141 client->cl_buf_offset = 0;
142 nl_build_header(client, &nl_buf, &nl_buf_len);
146 nl_update_header(client, len);
147 struct nlmsghdr *nlh = (
struct nlmsghdr *)client->cl_buf;
149 nlh->nlmsg_seq = seq_no;
153 uint32_t alignment) {
154 while (buf_len > (alignment - 1)) {
157 &error, sandesh_context);
158 if (decode_len < 0) {
159 LOG(DEBUG,
"Incorrect decode len " << decode_len);
163 buf_len -= decode_len;
171 nl_client_(NULL), wait_tree_(), send_queue_(this),
172 max_bulk_msg_count_(kMaxBulkMsgCount),
173 bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
174 rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
175 use_wait_tree_(true), process_data_inline_(false),
176 ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
177 tx_count_(0), ack_count_(0), err_count_(0),
178 rx_process_queue_(
TaskScheduler::GetInstance()->GetTaskId(
"Agent::KSync"), 0,
182 uint32_t uve_task_id =
184 uint32_t ksync_task_id =
188 ksync_task_id, i,
"KSync Receive Queue");
190 uve_task_id, i,
"KSync UVE Receive Queue");
193 nl_client_ = (nl_client *)malloc(
sizeof(nl_client));
227 sock_->send_queue_.Shutdown();
232 sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
243 &ctxt[instance], _1));
245 sprintf(tmp,
"%s-%d", name, instance);
251 sock_->send_queue_.set_measure_busy_time(val);
258 sock_->read_inline_ = read_inline;
259 if (
sock_->read_inline_) {
266 placeholders::bytes_transferred));
270 assert(
sock_.get() == NULL);
294 seq =
seqno_.fetch_add(1);
370 size_t bytes_transferred) {
372 LOG(ERROR,
"Error reading from Ksync sock. Error : " <<
373 boost::system::system_error(error).what());
386 placeholders::bytes_transferred));
394 WaitTree::iterator it;
398 std::scoped_lock lock(
mutex_);
402 LOG(ERROR,
"KSync error in finding for sequence number : "
406 bulk_message_context = &(it->second);
416 std::scoped_lock lock(
mutex_);
436 KSYNC_ERROR(VRouterError,
"VRouter operation failed. Error <",
439 ">. Object <",
"N/A",
">. State <",
"N/A",
440 ">. Message number :", 0);
451 iovec.push_back(buffer(msg, msg_len));
475 size_t bytes_transferred) {
477 LOG(ERROR,
"Ksync sock write error : " <<
478 boost::system::system_error(error).what());
493 std::scoped_lock lock(
mutex_);
496 bulk_message_context = &it->second;
512 bulk_message_context->
Data(&iovec);
526 placeholders::bytes_transferred));
529 bool more_data =
false;
550 uint32_t work_queue_index) {
564 std::scoped_lock lock(
mutex_);
614 bulk_message_context->
Insert(ioc);
651 : sock_(ios, protocol) {
654 boost::system::error_code ec;
655 sock_.set_option(set_rcv_buf, ec);
656 if (ec.value() != 0) {
657 LOG(ERROR,
"Error Changing netlink receive sock buffer size to " <<
658 set_rcv_buf.value() <<
" error = " <<
659 boost::system::system_error(ec).what());
661 boost::asio::socket_base::receive_buffer_size rcv_buf_size;
662 boost::system::error_code ec1;
663 sock_.get_option(rcv_buf_size, ec);
664 LOG(INFO,
"Current receive sock buffer size is " << rcv_buf_size.value());
671 const std::string &cpu_pin_policy) {
692 KSyncBufferList::iterator it = iovec->begin();
693 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf,
697 boost::asio::netlink::raw::endpoint ep;
698 sock_.async_send_to(*iovec, ep, cb);
703 KSyncBufferList::iterator it = iovec->begin();
704 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf,
708 boost::asio::netlink::raw::endpoint ep;
709 return sock_.send_to(*iovec, ep);
716 uint32_t buf_len = 0;
731 uint32_t buf_len = 0;
735 bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO, more);
741 uint32_t buf_len = 0;
744 return bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO,
IsMoreData(data));
748 sock_.async_receive(buf, cb);
753 struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
754 if (nlh->nlmsg_type == NLMSG_ERROR) {
755 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq
756 <<
" len " << nlh->nlmsg_len);
766 sock_(ios,
boost::asio::ip::udp::endpoint(
boost::asio::ip::udp::v4(), 0)),
767 server_ep_(
boost::asio::ip::address::from_string(
"127.0.0.1"), port) {
771 const std::string &cpu_pin_policy) {
777 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
782 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
783 return ((hdr->flags & UVR_MORE) == UVR_MORE);
794 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
795 uint32_t buf_len = hdr->msg_len;
796 char *buf = data +
sizeof(
struct uvr_msg_hdr);
802 struct uvr_msg_hdr hdr;
807 KSyncBufferList::iterator it = iovec->begin();
808 iovec->insert(it, buffer((
char *)(&hdr),
sizeof(hdr)));
814 struct uvr_msg_hdr hdr;
819 KSyncBufferList::iterator it = iovec->begin();
820 iovec->insert(it, buffer((
char *)(&hdr),
sizeof(hdr)));
830 boost::asio::ip::udp::endpoint ep;
831 sock_.async_receive_from(buf, ep, cb);
835 boost::asio::ip::udp::endpoint ep;
836 sock_.receive_from(buf, ep);
849 assert(bulk_message_context->
seqno() == seqno);
856 delete bulk_message_context;
870 int msg_len,
char *msg,
873 sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
874 IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
875 entry_(sync_entry), event_(event), sock_(sock) {
902 if (sandesh_context->
GetErrno() != 0 &&
903 sandesh_context->
GetErrno() != EEXIST) {
921 uint32_t alignment,
bool more) {
1055 io_context_list_(), io_context_type_(
type), work_queue_index_(index),
1056 rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1060 io_context_list_(), io_context_type_(rhs.io_context_type_),
1061 work_queue_index_(rhs.work_queue_index_),
1062 rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1100 iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
void set_ksync_io_ctx(const KSyncIoContext *ioc)
virtual void MplsMsgHandler(vr_mpls_req *req)=0
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
virtual void VrfMsgHandler(vr_vrf_req *req)=0
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
virtual void NHMsgHandler(vr_nexthop_req *req)=0
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
virtual void SetErrno(int err)
virtual void FlowResponseHandler(vr_flow_response *req)
virtual void IfMsgHandler(vr_interface_req *req)=0
virtual void FlowMsgHandler(vr_flow_req *req)=0
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
virtual void RouteMsgHandler(vr_route_req *req)=0
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
virtual int VrResponseMsgHandler(vr_response *resp)=0
AgentSandeshContext * GetSandeshContext()
char * rx_buffer1() const
virtual void ErrorHandler(int err)
static const char * io_wq_names[MAX_WORK_QUEUES]
uint32_t GetSeqno() const
uint32_t GetMsgLen() const
void SetSeqno(uint32_t seqno)
void set_seqno(uint32_t seq)
uint32_t rx_buffer_index_
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
void Insert(IoContext *ioc)
void Data(KSyncBufferList *iovec)
char * GetReceiveBuffer()
char * rx_buffers_[kMaxRxBufferCount]
uint32_t vr_response_count_
static const unsigned kMaxRxBufferCount
IoContextList::iterator io_context_list_it_
IoContextList io_context_list_
IoContext::Type io_context_type() const
uint32_t work_queue_index() const
void AddReceiveBuffer(char *buff)
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
void VxLanMsgHandler(vr_vxlan_req *req)
void IfMsgHandler(vr_interface_req *req)
KSyncBulkSandeshContext()
void DropStatsMsgHandler(vr_drop_stats_req *req)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
void ForwardingClassMsgHandler(vr_fc_map_req *req)
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
void NHMsgHandler(vr_nexthop_req *req)
KSyncBulkMsgContext * bulk_msg_context_
void FlowResponseHandler(vr_flow_response *req)
virtual ~KSyncBulkSandeshContext()
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
void VrouterOpsMsgHandler(vrouter_ops *req)
void RouteMsgHandler(vr_route_req *req)
void QosConfigMsgHandler(vr_qos_map_req *req)
int VrResponseMsgHandler(vr_response *resp)
void MplsMsgHandler(vr_mpls_req *req)
AgentSandeshContext * GetSandeshContext()
void VrfMsgHandler(vr_vrf_req *req)
void MirrorMsgHandler(vr_mirror_req *req)
void FlowMsgHandler(vr_flow_req *req)
virtual KSyncObject * GetObject() const =0
virtual bool pre_alloc_rx_buffer() const
virtual void ErrorHandler(int err, uint32_t seqno, KSyncEvent event) const
static std::string VrouterErrorToString(uint32_t error)
void ErrorHandler(int err)
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
KSyncEntry::KSyncEvent event_
virtual void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual bool IsMoreData(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue, const std::string &cpu_pin_policy)
virtual void Receive(boost::asio::mutable_buffers_1)
boost::asio::netlink::raw::socket sock_
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual ~KSyncSockNetlink()
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
virtual uint32_t GetSeqno(char *data)
virtual bool Validate(char *data)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
KSyncSockUdp(boost::asio::io_context &ios, int port)
virtual bool IsMoreData(char *data)
boost::asio::ip::udp::endpoint server_ep_
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual bool Validate(char *data)
boost::asio::ip::udp::socket sock_
virtual uint32_t GetSeqno(char *data)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
WorkQueue< KSyncRxData > KSyncReceiveQueue
static std::unique_ptr< KSyncSock > sock_
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
static const unsigned kInvalidBulkSeqNo
std::size_t BlockingSend(char *msg, int msg_len)
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
virtual bool Validate(char *data)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
std::atomic< uint32_t > seqno_
virtual uint32_t GetSeqno(char *data)=0
virtual void Receive(boost::asio::mutable_buffers_1)=0
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
std::atomic< uint32_t > uve_seqno_
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
void OnEmptyQueue(bool done)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
void ProcessDataInline(char *data)
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
void GenericSend(IoContext *ctx)
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
bool SendAsyncImpl(IoContext *ioc)
static void Start(bool read_inline)
uint32_t WaitTreeSize() const
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
static const int kRxWorkQueueCount
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
static void SetSockTableEntry(KSyncSock *sock)
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
KSyncRxWorkQueue rx_process_queue_
static void SetNetlinkFamilyId(int id)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
static const unsigned kBufLen
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
void SetMeasureQueueDelay(bool val)
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
uint32_t AllocSeqNo(IoContext::Type type)
void SetSeqno(uint32_t seq)
static KSyncSock * Get(DBTablePartBase *partition)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
uint32_t max_bulk_msg_count_
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
static int vnsw_netlink_family_id_
KSyncBulkMsgContext * bulk_msg_context_
bool ProcessRxData(KSyncRxQueueData data)
virtual bool IsMoreData(char *data)=0
bool process_data_inline_
static std::atomic< bool > shutdown_
static int GetNetlinkFamilyId()
bool Enqueue(IoContext *io_context)
static int32_t ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
static TaskScheduler * GetInstance()
bool Enqueue(QueueEntryT entry)
void Shutdown(bool delete_entries=true)
void set_measure_busy_time(bool val) const
void set_name(const std::string &name)
#define KSYNC_ERROR(obj,...)
bool NetlinkMsgDone(char *data)
bool ValidateNetlink(char *data)
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
boost::asio::detail::socket_option::integer< SOL_SOCKET, SO_RCVBUFFORCE > ReceiveBuffForceSize
void InitNetlink(nl_client *client)
uint32_t GetNetlinkSeqno(char *data)
void ResetNetlink(nl_client *client)
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
#define KSYNC_BMC_ARR_SIZE
#define KSYNC_DEFAULT_Q_ID_SEQ
#define KSYNC_SOCK_RECV_BUFF_SIZE
#define LOG(_Level, _Msg)
void operator()(IoContext *io_context)
KSyncBulkMsgContext * bulk_msg_context_
KSyncEntry::KSyncEvent event_