9 #include <linux/netlink.h>
10 #include <linux/rtnetlink.h>
11 #include <linux/genetlink.h>
12 #include <linux/sockios.h>
14 #include <sys/socket.h>
16 #include <boost/bind.hpp>
29 #include "ksync_types.h"
33 #include "vr_genetlink.h"
36 using namespace boost::asio;
39 typedef boost::asio::detail::socket_option::integer<SOL_SOCKET,
58 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
59 return nlh->nlmsg_seq;
63 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
64 return ((nlh->nlmsg_flags & NLM_F_MULTI) != 0);
69 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
70 if (nlh->nlmsg_type == NLMSG_ERROR) {
71 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq <<
" len "
78 LOG(ERROR,
"Length of " << nlh->nlmsg_len <<
" is more than expected "
84 if (nlh->nlmsg_type == NLMSG_DONE) {
90 LOG(ERROR,
"Netlink unknown message type : " << nlh->nlmsg_type);
95 struct genlmsghdr *genlh = (
struct genlmsghdr *) (data + NLMSG_HDRLEN);
96 if (genlh->cmd != SANDESH_REQUEST) {
97 LOG(ERROR,
"Unknown generic netlink cmd : " << genlh->cmd);
102 struct nlattr * attr = (
struct nlattr *)(data + NLMSG_HDRLEN
104 if (attr->nla_type != NL_ATTR_VR_MESSAGE_PROTOCOL) {
105 LOG(ERROR,
"Unknown generic netlink TLV type : " << attr->nla_type);
114 struct nlmsghdr *nlh = (
struct nlmsghdr *)data;
116 if (nlh->nlmsg_type == NLMSG_DONE) {
119 len = NLMSG_HDRLEN + GENL_HDRLEN + NLA_HDRLEN;
123 *buf_len = nlh->nlmsg_len - len;
128 unsigned char *nl_buf;
130 assert(nl_build_header(client, &nl_buf, &nl_buf_len) >= 0);
134 unsigned char *nl_buf;
136 client->cl_buf_offset = 0;
137 nl_build_header(client, &nl_buf, &nl_buf_len);
141 nl_update_header(client, len);
142 struct nlmsghdr *nlh = (
struct nlmsghdr *)client->cl_buf;
144 nlh->nlmsg_seq = seq_no;
148 uint32_t alignment) {
149 while (buf_len > (alignment - 1)) {
152 &error, sandesh_context);
153 if (decode_len < 0) {
154 LOG(DEBUG,
"Incorrect decode len " << decode_len);
158 buf_len -= decode_len;
166 nl_client_(NULL), wait_tree_(), send_queue_(this),
167 max_bulk_msg_count_(kMaxBulkMsgCount),
168 bulk_seq_no_(kInvalidBulkSeqNo), bulk_buf_size_(0), bulk_msg_count_(0),
169 rx_buff_(NULL), read_inline_(true), bulk_msg_context_(NULL),
170 use_wait_tree_(true), process_data_inline_(false),
171 ksync_bulk_sandesh_context_(), uve_bulk_sandesh_context_(),
172 tx_count_(0), ack_count_(0), err_count_(0),
173 rx_process_queue_(
TaskScheduler::GetInstance()->GetTaskId(
"Agent::KSync"), 0,
177 uint32_t uve_task_id =
179 uint32_t ksync_task_id =
183 ksync_task_id, i,
"KSync Receive Queue");
185 uve_task_id, i,
"KSync UVE Receive Queue");
188 nl_client_ = (nl_client *)malloc(
sizeof(nl_client));
222 sock_->send_queue_.Shutdown();
227 sock_->send_queue_.Init(use_work_queue, cpu_pin_policy);
238 &ctxt[instance], _1));
240 sprintf(tmp,
"%s-%d", name, instance);
246 sock_->send_queue_.set_measure_busy_time(val);
253 sock_->read_inline_ = read_inline;
254 if (
sock_->read_inline_) {
261 placeholders::bytes_transferred));
265 assert(
sock_.get() == NULL);
289 seq =
seqno_.fetch_and_add(1);
365 size_t bytes_transferred) {
367 LOG(ERROR,
"Error reading from Ksync sock. Error : " <<
368 boost::system::system_error(error).what());
381 placeholders::bytes_transferred));
389 WaitTree::iterator it;
393 tbb::mutex::scoped_lock lock(
mutex_);
397 LOG(ERROR,
"KSync error in finding for sequence number : "
401 bulk_message_context = &(it->second);
411 tbb::mutex::scoped_lock lock(
mutex_);
431 KSYNC_ERROR(VRouterError,
"VRouter operation failed. Error <",
434 ">. Object <",
"N/A",
">. State <",
"N/A",
435 ">. Message number :", 0);
446 iovec.push_back(buffer(msg, msg_len));
470 size_t bytes_transferred) {
472 LOG(ERROR,
"Ksync sock write error : " <<
473 boost::system::system_error(error).what());
488 tbb::mutex::scoped_lock lock(
mutex_);
491 bulk_message_context = &it->second;
507 bulk_message_context->
Data(&iovec);
521 placeholders::bytes_transferred));
524 bool more_data =
false;
545 uint32_t work_queue_index) {
559 tbb::mutex::scoped_lock lock(
mutex_);
609 bulk_message_context->
Insert(ioc);
646 : sock_(ios, protocol) {
649 boost::system::error_code ec;
650 sock_.set_option(set_rcv_buf, ec);
651 if (ec.value() != 0) {
652 LOG(ERROR,
"Error Changing netlink receive sock buffer size to " <<
653 set_rcv_buf.value() <<
" error = " <<
654 boost::system::system_error(ec).what());
656 boost::asio::socket_base::receive_buffer_size rcv_buf_size;
657 boost::system::error_code ec1;
658 sock_.get_option(rcv_buf_size, ec);
659 LOG(INFO,
"Current receive sock buffer size is " << rcv_buf_size.value());
666 const std::string &cpu_pin_policy) {
687 KSyncBufferList::iterator it = iovec->begin();
688 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf,
692 boost::asio::netlink::raw::endpoint ep;
693 sock_.async_send_to(*iovec, ep, cb);
698 KSyncBufferList::iterator it = iovec->begin();
699 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf,
703 boost::asio::netlink::raw::endpoint ep;
704 return sock_.send_to(*iovec, ep);
711 uint32_t buf_len = 0;
726 uint32_t buf_len = 0;
730 bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO, more);
736 uint32_t buf_len = 0;
739 return bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO,
IsMoreData(data));
743 sock_.async_receive(buf, cb);
748 struct nlmsghdr *nlh = buffer_cast<struct nlmsghdr *>(buf);
749 if (nlh->nlmsg_type == NLMSG_ERROR) {
750 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq
751 <<
" len " << nlh->nlmsg_len);
761 sock_(ios,
boost::asio::ip::udp::endpoint(
boost::asio::ip::udp::v4(), 0)),
762 server_ep_(
boost::asio::ip::address::from_string(
"127.0.0.1"), port) {
766 const std::string &cpu_pin_policy) {
772 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
777 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
778 return ((hdr->flags & UVR_MORE) == UVR_MORE);
789 struct uvr_msg_hdr *hdr = (
struct uvr_msg_hdr *)data;
790 uint32_t buf_len = hdr->msg_len;
791 char *buf = data +
sizeof(
struct uvr_msg_hdr);
797 struct uvr_msg_hdr hdr;
802 KSyncBufferList::iterator it = iovec->begin();
803 iovec->insert(it, buffer((
char *)(&hdr),
sizeof(hdr)));
809 struct uvr_msg_hdr hdr;
814 KSyncBufferList::iterator it = iovec->begin();
815 iovec->insert(it, buffer((
char *)(&hdr),
sizeof(hdr)));
825 boost::asio::ip::udp::endpoint ep;
826 sock_.async_receive_from(buf, ep, cb);
830 boost::asio::ip::udp::endpoint ep;
831 sock_.receive_from(buf, ep);
844 assert(bulk_message_context->
seqno() == seqno);
851 delete bulk_message_context;
865 int msg_len,
char *msg,
868 sock->GetAgentSandeshContext(sync_entry->GetTableIndex()),
869 IoContext::IOC_KSYNC, sync_entry->GetTableIndex()),
870 entry_(sync_entry), event_(event), sock_(sock) {
897 if (sandesh_context->
GetErrno() != 0 &&
898 sandesh_context->
GetErrno() != EEXIST) {
916 uint32_t alignment,
bool more) {
1050 io_context_list_(), io_context_type_(
type), work_queue_index_(index),
1051 rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1055 io_context_list_(), io_context_type_(rhs.io_context_type_),
1056 work_queue_index_(rhs.work_queue_index_),
1057 rx_buffer_index_(0), vr_response_count_(0), io_context_list_it_() {
1095 iovec->push_back(buffer(it->GetMsg(), it->GetMsgLen()));
void set_ksync_io_ctx(const KSyncIoContext *ioc)
virtual void MplsMsgHandler(vr_mpls_req *req)=0
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
virtual void VrfMsgHandler(vr_vrf_req *req)=0
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
virtual void NHMsgHandler(vr_nexthop_req *req)=0
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
virtual void SetErrno(int err)
virtual void FlowResponseHandler(vr_flow_response *req)
virtual void IfMsgHandler(vr_interface_req *req)=0
virtual void FlowMsgHandler(vr_flow_req *req)=0
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
virtual void RouteMsgHandler(vr_route_req *req)=0
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
virtual int VrResponseMsgHandler(vr_response *resp)=0
AgentSandeshContext * GetSandeshContext()
char * rx_buffer1() const
virtual void ErrorHandler(int err)
static const char * io_wq_names[MAX_WORK_QUEUES]
uint32_t GetSeqno() const
uint32_t GetMsgLen() const
void SetSeqno(uint32_t seqno)
void set_seqno(uint32_t seq)
uint32_t rx_buffer_index_
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
void Insert(IoContext *ioc)
void Data(KSyncBufferList *iovec)
char * GetReceiveBuffer()
char * rx_buffers_[kMaxRxBufferCount]
uint32_t vr_response_count_
static const unsigned kMaxRxBufferCount
IoContextList::iterator io_context_list_it_
IoContextList io_context_list_
IoContext::Type io_context_type() const
uint32_t work_queue_index() const
void AddReceiveBuffer(char *buff)
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
void VxLanMsgHandler(vr_vxlan_req *req)
void IfMsgHandler(vr_interface_req *req)
KSyncBulkSandeshContext()
void DropStatsMsgHandler(vr_drop_stats_req *req)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
void ForwardingClassMsgHandler(vr_fc_map_req *req)
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
void NHMsgHandler(vr_nexthop_req *req)
KSyncBulkMsgContext * bulk_msg_context_
void FlowResponseHandler(vr_flow_response *req)
virtual ~KSyncBulkSandeshContext()
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
void VrouterOpsMsgHandler(vrouter_ops *req)
void RouteMsgHandler(vr_route_req *req)
void QosConfigMsgHandler(vr_qos_map_req *req)
int VrResponseMsgHandler(vr_response *resp)
void MplsMsgHandler(vr_mpls_req *req)
AgentSandeshContext * GetSandeshContext()
void VrfMsgHandler(vr_vrf_req *req)
void MirrorMsgHandler(vr_mirror_req *req)
void FlowMsgHandler(vr_flow_req *req)
virtual KSyncObject * GetObject() const =0
virtual bool pre_alloc_rx_buffer() const
virtual void ErrorHandler(int err, uint32_t seqno, KSyncEvent event) const
static std::string VrouterErrorToString(uint32_t error)
void ErrorHandler(int err)
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
KSyncEntry::KSyncEvent event_
virtual void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual bool IsMoreData(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue, const std::string &cpu_pin_policy)
virtual void Receive(boost::asio::mutable_buffers_1)
boost::asio::netlink::raw::socket sock_
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual ~KSyncSockNetlink()
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
virtual uint32_t GetSeqno(char *data)
virtual bool Validate(char *data)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
KSyncSockUdp(boost::asio::io_context &ios, int port)
virtual bool IsMoreData(char *data)
boost::asio::ip::udp::endpoint server_ep_
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual bool Validate(char *data)
boost::asio::ip::udp::socket sock_
virtual uint32_t GetSeqno(char *data)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
WorkQueue< KSyncRxData > KSyncReceiveQueue
static std::unique_ptr< KSyncSock > sock_
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
static const unsigned kInvalidBulkSeqNo
std::size_t BlockingSend(char *msg, int msg_len)
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
virtual bool Validate(char *data)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
virtual uint32_t GetSeqno(char *data)=0
virtual void Receive(boost::asio::mutable_buffers_1)=0
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
void OnEmptyQueue(bool done)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
void ProcessDataInline(char *data)
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
void GenericSend(IoContext *ctx)
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
tbb::atomic< uint32_t > seqno_
bool SendAsyncImpl(IoContext *ioc)
static void Start(bool read_inline)
uint32_t WaitTreeSize() const
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
static const int kRxWorkQueueCount
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
static void SetSockTableEntry(KSyncSock *sock)
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
KSyncRxWorkQueue rx_process_queue_
static void SetNetlinkFamilyId(int id)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
static const unsigned kBufLen
static tbb::atomic< bool > shutdown_
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
void SetMeasureQueueDelay(bool val)
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
uint32_t AllocSeqNo(IoContext::Type type)
void SetSeqno(uint32_t seq)
static KSyncSock * Get(DBTablePartBase *partition)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
uint32_t max_bulk_msg_count_
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
tbb::atomic< uint32_t > uve_seqno_
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
static int vnsw_netlink_family_id_
KSyncBulkMsgContext * bulk_msg_context_
bool ProcessRxData(KSyncRxQueueData data)
virtual bool IsMoreData(char *data)=0
bool process_data_inline_
static int GetNetlinkFamilyId()
bool Enqueue(IoContext *io_context)
static int32_t ReceiveBinaryMsgOne(u_int8_t *buf, u_int32_t buf_len, int *error, SandeshContext *client_context)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
static TaskScheduler * GetInstance()
bool Enqueue(QueueEntryT entry)
void Shutdown(bool delete_entries=true)
void set_measure_busy_time(bool val) const
void set_name(const std::string &name)
#define KSYNC_ERROR(obj,...)
bool NetlinkMsgDone(char *data)
bool ValidateNetlink(char *data)
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
boost::asio::detail::socket_option::integer< SOL_SOCKET, SO_RCVBUFFORCE > ReceiveBuffForceSize
void InitNetlink(nl_client *client)
uint32_t GetNetlinkSeqno(char *data)
void ResetNetlink(nl_client *client)
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
#define KSYNC_BMC_ARR_SIZE
#define KSYNC_DEFAULT_Q_ID_SEQ
#define KSYNC_SOCK_RECV_BUFF_SIZE
#define LOG(_Level, _Msg)
void operator()(IoContext *io_context)
KSyncBulkMsgContext * bulk_msg_context_
KSyncEntry::KSyncEvent event_