5 #ifndef ctrlplane_ksync_sock_h
6 #define ctrlplane_ksync_sock_h
9 #include <boost/asio.hpp>
10 #include <boost/asio/buffer.hpp>
12 #include <boost/asio/netlink_protocol.hpp>
13 #include <boost/asio/netlink_endpoint.hpp>
15 #include <tbb/atomic.h>
16 #include <tbb/mutex.h>
19 #include <sandesh/common/vns_constants.h>
20 #include <sandesh/common/vns_types.h>
27 #define KSYNC_DEFAULT_MSG_SIZE 4096
28 #define KSYNC_DEFAULT_Q_ID_SEQ 0x00000001
29 #define KSYNC_ACK_WAIT_THRESHOLD 200
30 #define KSYNC_SOCK_RECV_BUFF_SIZE (256 * 1024)
31 #define KSYNC_BMC_ARR_SIZE 1024
47 void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no);
147 boost::intrusive::list_member_hook<>
node_;
226 typedef boost::intrusive::member_hook<
IoContext,
227 boost::intrusive::list_member_hook<>,
297 bool Decoder(
char *buff, uint32_t buff_len, uint32_t alignment,
bool more);
325 typedef std::map<uint32_t, KSyncBulkMsgContext>
WaitTree;
327 typedef boost::function<void(const boost::system::error_code &, size_t)>
378 uint32_t work_queue_index);
385 static void Start(
bool read_inline);
410 uint32_t task_id, uint32_t instance,
421 static void Init(
bool use_work_queue,
const std::string &cpu_pin_policy);
460 virtual void Receive(boost::asio::mutable_buffers_1) = 0;
461 virtual uint32_t
GetSeqno(
char *data) = 0;
463 virtual bool Validate(
char *data) = 0;
466 void ReadHandler(
const boost::system::error_code& error,
467 size_t bytes_transferred);
470 void WriteHandler(
const boost::system::error_code& error,
471 size_t bytes_transferred);
478 tbb::mutex::scoped_lock lock(
mutex_);
504 static std::unique_ptr<KSyncSock>
sock_;
523 virtual uint32_t
GetSeqno(
char *data);
532 virtual void Receive(boost::asio::mutable_buffers_1);
536 static void Init(boost::asio::io_context &ios,
int protocol,
bool use_work_queue,
537 const std::string &cpu_pin_policy);
539 boost::asio::netlink::raw::socket
sock_;
548 virtual uint32_t
GetSeqno(
char *data);
557 virtual void Receive(boost::asio::mutable_buffers_1);
559 static void Init(boost::asio::io_context &ios,
int port,
560 const std::string &cpu_pin_policy);
567 #define KSYNC_AGENT_VROUTER_SOCK_PATH "/var/run/vrouter/dpdk_netlink"
576 virtual uint32_t
GetSeqno(
char *data);
585 virtual void Receive(boost::asio::mutable_buffers_1);
586 virtual bool Run(
void);
588 static void Init(boost::asio::io_context &ios,
589 const std::string &cpu_pin_policy,
590 const std::string &sockpathvr=
"");
592 boost::asio::local::stream_protocol::socket
sock_;
611 return sizeof(
struct nlmsghdr);
625 bool async_ready =
false);
639 virtual uint32_t
GetSeqno(
char *data);
648 virtual void Receive(boost::asio::mutable_buffers_1);
650 virtual bool Run(
void);
652 bool ReceiveMsg(
const u_int8_t *msg,
size_t size);
660 boost::asio::ip::address ip_addr,
int port,
661 const std::string &cpu_pin_policy);
672 #endif // ctrlplane_ksync_sock_h
bool ProcessRxData(KSyncRxQueueData data)
uint32_t GetSeqno() const
KSyncRxWorkQueue rx_process_queue_
static const unsigned kInvalidBulkSeqNo
static const int kRxWorkQueueCount
virtual const int GetHeaderLenSize()
virtual TcpSession * AllocSession(Socket *socket)
virtual bool IsMoreData(char *data)
KSyncRxData(const KSyncRxData &rhs)
static int GetNetlinkFamilyId()
AgentSandeshContext * GetSandeshContext()
void VrfMsgHandler(vr_vrf_req *req)
void SetSeqno(uint32_t seq)
uint32_t WaitTreeSize() const
void VrfAssignMsgHandler(vr_vrf_assign_req *req)
boost::asio::const_buffer Buffer
KSyncEntry::KSyncEvent event_
static std::unique_ptr< KSyncSock > sock_
virtual void BridgeTableInfoHandler(vr_bridge_table_data *r)
std::size_t BlockingSend(char *msg, int msg_len)
virtual void MplsMsgHandler(vr_mpls_req *req)=0
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
virtual void SetErrno(int err)
virtual bool Validate(char *data)
void SendAsync(KSyncEntry *entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
boost::asio::ip::tcp::socket Socket
static const char * io_wq_names[MAX_WORK_QUEUES]
void RouteMsgHandler(vr_route_req *req)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)=0
static AgentSandeshContext * agent_sandesh_ctx_[kRxWorkQueueCount]
virtual ~KSyncSockNetlink()
virtual bool Validate(char *data)=0
KSyncBulkMsgContext * bulk_msg_context_
KSyncSockTcpSession(TcpServer *server, Socket *sock, bool async_ready=false)
boost::intrusive::list_member_hook node_
virtual void Receive(boost::asio::mutable_buffers_1)
void AddReceiveBuffer(char *buff)
virtual void DropStatsMsgHandler(vr_drop_stats_req *req)=0
KSyncIoContext(KSyncSock *sock, KSyncEntry *sync_entry, int msg_len, char *msg, KSyncEntry::KSyncEvent event)
boost::intrusive::list< IoContext, KSyncSockNode > IoContextList
KSyncBulkSandeshContext * GetBulkSandeshContext(uint32_t seqno)
void DropStatsMsgHandler(vr_drop_stats_req *req)
bool ProcessKernelData(KSyncBulkSandeshContext *ksync_context, const KSyncRxData &data)
KSyncEntry * GetKSyncEntry() const
virtual bool IsMoreData(char *data)
virtual uint32_t GetSeqno(char *data)
virtual bool Validate(char *data)
KSyncBulkSandeshContext()
void SetSeqno(uint32_t seqno)
uint32_t GetMsgLen() const
virtual bool Validate(char *data)
#define KSYNC_BMC_ARR_SIZE
virtual uint32_t GetSeqno(char *data)
virtual bool IsMoreData(char *data)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void FlowResponseHandler(vr_flow_response *req)
void set_bulk_message_context(KSyncBulkMsgContext *bulk_context)
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
KSyncBulkMsgContext * LocateBulkContext(uint32_t seqno, IoContext::Type io_context_type, uint32_t work_queue_index)
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
virtual void MirrorMsgHandler(vr_mirror_req *req)=0
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
void VrfStatsMsgHandler(vr_vrf_stats_req *req)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
KSyncSockNetlink(boost::asio::io_context &ios, int protocol)
void GenericSend(IoContext *ctx)
std::pair< uint32_t, KSyncBulkMsgContext > WaitTreePair
IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx, Type type, uint32_t index)
KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr, int port)
void MplsMsgHandler(vr_mpls_req *req)
void ForwardingClassMsgHandler(vr_fc_map_req *req)
virtual uint32_t GetSeqno(char *data)=0
static int vnsw_netlink_family_id_
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
IoContextList io_context_list_
int VrResponseMsgHandler(vr_response *resp)
void ProcessDataInline(char *data)
boost::asio::ip::tcp::endpoint server_ep_
tbb::atomic< uint32_t > uve_seqno_
KSyncBulkMsgContext(IoContext::Type type, uint32_t index)
virtual uint32_t GetSeqno(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual void VxLanMsgHandler(vr_vxlan_req *req)=0
KSyncSockUds(boost::asio::io_context &ios)
bool SendAsyncImpl(IoContext *ioc)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
void VrouterOpsMsgHandler(vrouter_ops *req)
virtual uint32_t GetSeqno(char *data)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
void MirrorMsgHandler(vr_mirror_req *req)
static void NetlinkBulkDecoder(char *data, SandeshContext *ctxt, bool more)
virtual void FlowTableInfoHandler(vr_flow_table_data *r)
DISALLOW_COPY_AND_ASSIGN(KSyncSock)
KSyncBulkMsgContext * bulk_msg_context_
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
bool ValidateNetlink(char *data)
boost::asio::ip::tcp::socket Socket
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual ~KSyncBulkSandeshContext()
uint32_t max_bulk_buf_size_
uint32_t GetNetlinkSeqno(char *data)
virtual const int GetMaxMessageSize()
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)=0
virtual void Receive(boost::asio::mutable_buffers_1)=0
virtual bool IsMoreData(char *data)=0
virtual void VrouterOpsMsgHandler(vrouter_ops *req)=0
bool operator<(const IoContext &rhs) const
static void Init(boost::asio::io_context &ios, int protocol, bool use_work_queue, const std::string &cpu_pin_policy)
KSyncReceiveQueue * uve_rx_queue[kRxWorkQueueCount]
bool ValidateAndEnqueue(char *data, KSyncBulkMsgContext *context)
bool ReceiveMsg(const u_int8_t *msg, size_t size)
uint32_t work_queue_index() const
boost::asio::ip::udp::socket sock_
virtual void VrfMsgHandler(vr_vrf_req *req)=0
void Data(KSyncBufferList *iovec)
uint32_t vr_response_count_
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)=0
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual void ErrorHandler(int err)
static KSyncSock * Get(DBTablePartBase *partition)
static const int kMsgGrowSize
void ReadHandler(const boost::system::error_code &error, size_t bytes_transferred)
virtual bool IsMoreData(char *data)
virtual int MsgLength(Buffer buffer, int offset)
static void Start(bool read_inline)
static tbb::atomic< bool > shutdown_
boost::asio::const_buffer Buffer
static const int kMaxMessageSize
static void SetSockTableEntry(KSyncSock *sock)
AgentSandeshContext * GetSandeshContext()
bool connect_complete() const
virtual void Receive(boost::asio::mutable_buffers_1)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
void SetMeasureQueueDelay(bool val)
virtual ~KSyncIoContext()
uint32_t AllocSeqNo(IoContext::Type type)
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback)
uint32_t rx_buffer_index_
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
bool TryAddToBulk(KSyncBulkMsgContext *bulk_context, IoContext *ioc)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
KSyncBulkSandeshContext uve_bulk_sandesh_context_[kRxWorkQueueCount]
AgentSandeshContext * sandesh_context_
virtual void FlowMsgHandler(vr_flow_req *req)=0
void EnqueueRxProcessData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
IoContext(char *msg, uint32_t len, uint32_t seq, AgentSandeshContext *ctx, Type type)
int SendBulkMessage(KSyncBulkMsgContext *bulk_context, uint32_t seqno)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
KSyncReceiveQueue * AllocQueue(KSyncBulkSandeshContext ctxt[], uint32_t task_id, uint32_t instance, const char *name)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
KSyncBulkMsgContext * bulk_msg_context_
static void Init(boost::asio::io_context &ios, const std::string &cpu_pin_policy, const std::string &sockpathvr="")
boost::asio::ip::tcp::socket * tcp_socket_
const KSyncReceiveQueue * get_receive_work_queue(uint16_t index) const
const KSyncIoContext * ksync_io_ctx() const
void WriteHandler(const boost::system::error_code &error, size_t bytes_transferred)
boost::asio::netlink::raw::socket sock_
const KSyncIoContext * ksync_io_ctx_
void QosConfigMsgHandler(vr_qos_map_req *req)
virtual void QosConfigMsgHandler(vr_qos_map_req *req)=0
IoContext::Type io_context_type() const
KSyncRxData(char *buff, KSyncBulkMsgContext *ctxt)
void set_ksync_io_ctx(const KSyncIoContext *ioc)
static const unsigned kMaxRxBufferCount
KSyncBulkSandeshContext ksync_bulk_sandesh_context_[kRxWorkQueueCount]
void Insert(IoContext *ioc)
std::map< uint32_t, KSyncBulkMsgContext > WaitTree
static const unsigned kMaxBulkMsgCount
KSyncReceiveQueue * ksync_rx_queue[kRxWorkQueueCount]
void reset_use_wait_tree()
boost::asio::local::stream_protocol::socket sock_
static void SetAgentSandeshContext(AgentSandeshContext *ctx, uint32_t idx)
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
uint32_t max_bulk_msg_count_
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
boost::asio::local::stream_protocol::endpoint server_ep_
WorkQueue< KSyncRxQueueData > KSyncRxWorkQueue
virtual void OnRead(Buffer buffer)
virtual void VrfAssignMsgHandler(vr_vrf_assign_req *req)=0
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
bool NetlinkMsgDone(char *data)
static void Init(EventManager *evm, boost::asio::ip::address ip_addr, int port, const std::string &cpu_pin_policy)
virtual void RouteMsgHandler(vr_route_req *req)=0
AgentSandeshContext * agent_sandesh_ctx_
#define KSYNC_ACK_WAIT_THRESHOLD
void InitNetlink(nl_client *client)
virtual bool Validate(char *data)
virtual void IfMsgHandler(vr_interface_req *req)=0
KSyncRxQueueData(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual void NHMsgHandler(vr_nexthop_req *req)=0
KSyncSockTcpSessionReader * reader_
WorkQueue< KSyncRxData > KSyncReceiveQueue
KSyncBulkMsgContext * bulk_mctx_arr_[KSYNC_BMC_ARR_SIZE]
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)=0
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual ~KSyncSockTcpSessionReader()
KSyncSockUdp(boost::asio::io_context &ios, int port)
void set_seqno(uint32_t seq)
virtual void ForwardingClassMsgHandler(vr_fc_map_req *req)=0
boost::intrusive::member_hook< IoContext, boost::intrusive::list_member_hook<>,&IoContext::node_ > KSyncSockNode
bool process_data_inline_
static void SetNetlinkFamilyId(int id)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
void set_process_data_inline()
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
KSyncReceiveQueue * GetReceiveQueue(IoContext::Type type, uint32_t instance)
char * GetReceiveBuffer()
char * rx_buffer1() const
uint32_t work_queue_index_
virtual void Receive(boost::asio::mutable_buffers_1)
DISALLOW_COPY_AND_ASSIGN(KSyncBulkSandeshContext)
void NHMsgHandler(vr_nexthop_req *req)
static void Init(boost::asio::io_context &ios, int port, const std::string &cpu_pin_policy)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)=0
KSyncEntry::KSyncEvent event() const
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual ~AgentSandeshContext()
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
void FlowResponseHandler(vr_flow_response *req)
virtual void VrfStatsMsgHandler(vr_vrf_stats_req *req)=0
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
IoContextList::iterator io_context_list_it_
void ErrorHandler(int err)
static const unsigned kMaxBulkMsgSize
void ResetNetlink(nl_client *client)
void VxLanMsgHandler(vr_vxlan_req *req)
boost::asio::ip::udp::endpoint server_ep_
void DecodeSandeshMessages(char *buf, uint32_t buf_len, SandeshContext *sandesh_context, uint32_t alignment)
tbb::atomic< uint32_t > seqno_
KSyncEntry::KSyncEvent event_
virtual int VrResponseMsgHandler(vr_response *resp)=0
char * rx_buffers_[kMaxRxBufferCount]
static const unsigned kBufLen
const KSyncTxQueue * send_queue() const
virtual ~KSyncSockTcpSession()
void FlowMsgHandler(vr_flow_req *req)
void OnEmptyQueue(bool done)
IoContext::Type io_context_type_
void IfMsgHandler(vr_interface_req *req)
virtual void VrouterHugePageHandler(vr_hugepage_config *req)