5 #ifndef ctrlplane_ksync_sock_h
6 #define ctrlplane_ksync_sock_h
9 #include <boost/asio.hpp>
10 #include <boost/asio/buffer.hpp>
12 #include <boost/asio/netlink_protocol.hpp>
13 #include <boost/asio/netlink_endpoint.hpp>
15 #include <tbb/atomic.h>
16 #include <tbb/mutex.h>
19 #include <sandesh/common/vns_constants.h>
20 #include <sandesh/common/vns_types.h>
27 #define KSYNC_DEFAULT_MSG_SIZE 4096
28 #define KSYNC_DEFAULT_Q_ID_SEQ 0x00000001
29 #define KSYNC_ACK_WAIT_THRESHOLD 200
30 #define KSYNC_SOCK_RECV_BUFF_SIZE (256 * 1024)
31 #define KSYNC_BMC_ARR_SIZE 1024
47 void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no);
147 boost::intrusive::list_member_hook<>
node_;
226 typedef boost::intrusive::member_hook<
IoContext,
227 boost::intrusive::list_member_hook<>,
297 bool Decoder(
char *buff, uint32_t buff_len, uint32_t alignment,
bool more);
323 typedef std::map<uint32_t, KSyncBulkMsgContext>
WaitTree;
325 typedef boost::function<void(
const boost::system::error_code &,
size_t)>
376 uint32_t work_queue_index);
383 static void Start(
bool read_inline);
408 uint32_t task_id, uint32_t instance,
419 static void Init(
bool use_work_queue,
const std::string &cpu_pin_policy);
458 virtual void Receive(boost::asio::mutable_buffers_1) = 0;
464 void ReadHandler(
const boost::system::error_code& error,
465 size_t bytes_transferred);
468 void WriteHandler(
const boost::system::error_code& error,
469 size_t bytes_transferred);
476 tbb::mutex::scoped_lock lock(
mutex_);
502 static std::unique_ptr<KSyncSock>
sock_;
521 virtual uint32_t
GetSeqno(
char *data);
530 virtual void Receive(boost::asio::mutable_buffers_1);
534 static void Init(boost::asio::io_context &ios,
int protocol,
bool use_work_queue,
535 const std::string &cpu_pin_policy);
537 boost::asio::netlink::raw::socket
sock_;
546 virtual uint32_t
GetSeqno(
char *data);
555 virtual void Receive(boost::asio::mutable_buffers_1);
557 static void Init(boost::asio::io_context &ios,
int port,
558 const std::string &cpu_pin_policy);
565 #define KSYNC_AGENT_VROUTER_SOCK_PATH "/var/run/vrouter/dpdk_netlink"
574 virtual uint32_t
GetSeqno(
char *data);
583 virtual void Receive(boost::asio::mutable_buffers_1);
584 virtual bool Run(
void);
586 static void Init(boost::asio::io_context &ios,
587 const std::string &cpu_pin_policy,
588 const std::string &sockpathvr=
"");
590 boost::asio::local::stream_protocol::socket
sock_;
609 return sizeof(
struct nlmsghdr);
623 bool async_ready =
false);
637 virtual uint32_t
GetSeqno(
char *data);
646 virtual void Receive(boost::asio::mutable_buffers_1);
648 virtual bool Run(
void);
650 bool ReceiveMsg(
const u_int8_t *msg,
size_t size);
658 boost::asio::ip::address ip_addr,
int port,
659 const std::string &cpu_pin_policy);
void set_ksync_io_ctx(const KSyncIoContext *ioc)
virtual void MplsMsgHandler(vr_mpls_req *req)=0
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
virtual void VrfMsgHandler(vr_vrf_req *req)=0
virtual ~AgentSandeshContext()
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
virtual void FlowTableInfoHandler(vr_flow_table_data *r)
const KSyncIoContext * ksync_io_ctx_
virtual void NHMsgHandler(vr_nexthop_req *req)=0
const KSyncIoContext * ksync_io_ctx() const
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
virtual void BridgeTableInfoHandler(vr_bridge_table_data *r)
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
virtual void VrouterHugePageHandler(vr_hugepage_config *req)
virtual void SetErrno(int err)
virtual void FlowResponseHandler(vr_flow_response *req)
virtual void IfMsgHandler(vr_interface_req *req)=0
virtual void FlowMsgHandler(vr_flow_req *req)=0
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
virtual void RouteMsgHandler(vr_route_req *req)=0
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
virtual int VrResponseMsgHandler(vr_response *resp)=0
AgentSandeshContext * GetSandeshContext()
char * rx_buffer1() const
IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx, Type type)
AgentSandeshContext * sandesh_context_
virtual void ErrorHandler(int err)
static const char * io_wq_names[MAX_WORK_QUEUES]
uint32_t GetSeqno() const
uint32_t GetMsgLen() const
IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx, Type type, uint32_t index)
void SetSeqno(uint32_t seqno)
boost::intrusive::list_member_hook node_
bool operator<(const IoContext &rhs) const
void set_seqno(uint32_t seq)
uint32_t rx_buffer_index_
uint32_t work_queue_index_
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
void Insert(IoContext *ioc)
void Data(KSyncBufferList *iovec)
char * GetReceiveBuffer()
IoContext::Type io_context_type_
char * rx_buffers_[kMaxRxBufferCount]
uint32_t vr_response_count_
static const unsigned kMaxRxBufferCount
IoContextList::iterator io_context_list_it_
IoContextList io_context_list_
IoContext::Type io_context_type() const
uint32_t work_queue_index() const
void AddReceiveBuffer(char *buff)
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
void VxLanMsgHandler(vr_vxlan_req *req)
void IfMsgHandler(vr_interface_req *req)
KSyncBulkSandeshContext()
void DropStatsMsgHandler(vr_drop_stats_req *req)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
void ForwardingClassMsgHandler(vr_fc_map_req *req)
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
void NHMsgHandler(vr_nexthop_req *req)
KSyncBulkMsgContext * bulk_msg_context_
void FlowResponseHandler(vr_flow_response *req)
virtual ~KSyncBulkSandeshContext()
DISALLOW_COPY_AND_ASSIGN(KSyncBulkSandeshContext)
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
void VrouterOpsMsgHandler(vrouter_ops *req)
void RouteMsgHandler(vr_route_req *req)
void QosConfigMsgHandler(vr_qos_map_req *req)
int VrResponseMsgHandler(vr_response *resp)
void MplsMsgHandler(vr_mpls_req *req)
AgentSandeshContext * GetSandeshContext()
void VrfMsgHandler(vr_vrf_req *req)
void MirrorMsgHandler(vr_mirror_req *req)
void FlowMsgHandler(vr_flow_req *req)
void ErrorHandler(int err)
KSyncEntry::KSyncEvent event() const
AgentSandeshContext * agent_sandesh_ctx_
virtual ~KSyncIoContext()
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
KSyncEntry::KSyncEvent event_
KSyncEntry * GetKSyncEntry() const
virtual bool IsMoreData(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue, const std::string &cpu_pin_policy)
virtual void Receive(boost::asio::mutable_buffers_1)
boost::asio::netlink::raw::socket sock_
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual ~KSyncSockNetlink()
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
virtual uint32_t GetSeqno(char *data)
virtual bool Validate(char *data)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more)
virtual ~KSyncSockTcpSessionReader()
virtual const int GetMaxMessageSize()
static const int kMaxMessageSize
virtual const int GetHeaderLenSize()
virtual int MsgLength(Buffer buffer, int offset)
KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback)
KSyncSockTcpSessionReader * reader_
virtual void OnRead(Buffer buffer)
KSyncSockTcpSession(TcpServer *server, Socket *sock, bool async_ready=false)
virtual ~KSyncSockTcpSession()
virtual bool IsMoreData(char *data)
bool connect_complete() const
boost::asio::ip::tcp::endpoint server_ep_
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual uint32_t GetSeqno(char *data)
bool ReceiveMsg(const u_int8_t *msg, size_t size)
KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr, int port)
boost::asio::ip::tcp::socket * tcp_socket_
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
virtual bool Validate(char *data)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual TcpSession * AllocSession(Socket *socket)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
static void Init(EventManager *evm, boost::asio::ip::address ip_addr, int port, const std::string &cpu_pin_policy)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
KSyncSockUdp(boost::asio::io_context &ios, int port)
virtual bool IsMoreData(char *data)
boost::asio::ip::udp::endpoint server_ep_
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual bool Validate(char *data)
boost::asio::ip::udp::socket sock_
virtual uint32_t GetSeqno(char *data)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual bool IsMoreData(char *data)
static void Init(boost::asio::io_context &ios, const std::string &cpu_pin_policy, const std::string &sockpathvr="")
KSyncSockUds(boost::asio::io_context &ios)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
boost::asio::local::stream_protocol::endpoint server_ep_
boost::asio::local::stream_protocol::socket sock_
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual bool Validate(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual uint32_t GetSeqno(char *data)
WorkQueue< KSyncRxData > KSyncReceiveQueue
static std::unique_ptr< KSyncSock > sock_
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
static const unsigned kInvalidBulkSeqNo
std::size_t BlockingSend(char *msg, int msg_len)
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
void reset_use_wait_tree()
DISALLOW_COPY_AND_ASSIGN(KSyncSock)
virtual bool Validate(char *data)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
void set_process_data_inline()
virtual uint32_t GetSeqno(char *data)=0
virtual void Receive(boost::asio::mutable_buffers_1)=0
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
void OnEmptyQueue(bool done)
uint32_t max_bulk_buf_size_
const KSyncReceiveQueue * get_receive_work_queue(uint16_t index) const
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
void ProcessDataInline(char *data)
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
void GenericSend(IoContext *ctx)
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
tbb::atomic< uint32_t > seqno_
bool SendAsyncImpl(IoContext *ioc)
static void Start(bool read_inline)
uint32_t WaitTreeSize() const
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
WorkQueue< KSyncRxQueueData > KSyncRxWorkQueue
static const int kRxWorkQueueCount
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
static void SetSockTableEntry(KSyncSock *sock)
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
KSyncRxWorkQueue rx_process_queue_
static void SetNetlinkFamilyId(int id)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
static void SetAgentSandeshContext(AgentSandeshContext *ctx, uint32_t idx)
static const unsigned kBufLen
static tbb::atomic< bool > shutdown_
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
void SetMeasureQueueDelay(bool val)
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
static const int kMsgGrowSize
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
uint32_t AllocSeqNo(IoContext::Type type)
std::map< uint32_t, KSyncBulkMsgContext > WaitTree
void SetSeqno(uint32_t seq)
static KSyncSock * Get(DBTablePartBase *partition)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
uint32_t max_bulk_msg_count_
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
static const unsigned kMaxBulkMsgCount
const KSyncTxQueue * send_queue() const
tbb::atomic< uint32_t > uve_seqno_
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
static int vnsw_netlink_family_id_
KSyncBulkMsgContext * bulk_msg_context_
bool ProcessRxData(KSyncRxQueueData data)
virtual bool IsMoreData(char *data)=0
bool process_data_inline_
static int GetNetlinkFamilyId()
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
boost::asio::const_buffer Buffer
boost::asio::ip::tcp::socket Socket
boost::asio::const_buffer Buffer
boost::asio::ip::tcp::socket Socket
bool NetlinkMsgDone(char *data)
bool ValidateNetlink(char *data)
boost::intrusive::list< IoContext, KSyncSockNode > IoContextList
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
boost::intrusive::member_hook< IoContext, boost::intrusive::list_member_hook<>, &IoContext::node_ > KSyncSockNode
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
#define KSYNC_BMC_ARR_SIZE
void InitNetlink(nl_client *client)
#define KSYNC_ACK_WAIT_THRESHOLD
uint32_t GetNetlinkSeqno(char *data)
void ResetNetlink(nl_client *client)
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
KSyncBulkMsgContext * bulk_msg_context_
KSyncRxData(char *buff, KSyncBulkMsgContext *ctxt)
KSyncRxData(const KSyncRxData &rhs)
KSyncRxQueueData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
KSyncEntry::KSyncEvent event_