7 #include <boost/asio.hpp>
8 #include <boost/bind.hpp>
10 using namespace boost::asio;
17 boost::asio::ip::address ip_address,
int port) :
TcpServer(evm), evm_(evm),
18 session_(NULL), server_ep_(ip_address, port), connect_complete_(false) {
33 int port,
const std::string &cpu_pin_policy) {
57 struct iovec iov[max_bulk_msg_count_*2];
60 memset(&msg, 0,
sizeof(msg));
67 KSyncBufferList::iterator it = iovec->begin();
68 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf, offset));
70 int count = iovec->size();
71 for(i = 0; i < count; i++) {
72 mutable_buffers_1 buf = iovec->at(i);
73 size_t buf_size = boost::asio::buffer_size(buf);
74 void* cbuf = boost::asio::buffer_cast<
void*>(buf);
76 iov[i].iov_base = cbuf;
77 iov[i].iov_len = buf_size;
82 ret = sendmsg(fd, &msg, 0);
84 LOG(ERROR,
"sendmsg failure " << ret <<
"len " << len);
107 uint32_t buf_len = 0;
110 return bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO,
IsMoreData(data));
119 uint32_t bytes_read = 0;
120 boost::system::error_code ec;
121 const struct nlmsghdr *nlh = NULL;
124 mutable_buffers_1 netlink_header(buffer_cast<void *>(buf),
125 sizeof(
struct nlmsghdr));
129 while (bytes_read <
sizeof(
struct nlmsghdr)) {
130 mutable_buffers_1 buffer =
131 static_cast<mutable_buffers_1
>(netlink_header + bytes_read);
138 if (bytes_read ==
sizeof(
struct nlmsghdr)) {
139 nlh = buffer_cast<
struct nlmsghdr *>(buf);
143 if (nlh->nlmsg_type == NLMSG_ERROR) {
144 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq
145 <<
" len " << nlh->nlmsg_len);
150 uint32_t payload_size = nlh->nlmsg_len -
sizeof(
struct nlmsghdr);
152 mutable_buffers_1 data(buffer_cast<void *>(buf +
sizeof(
struct nlmsghdr)),
155 while (bytes_read < payload_size) {
156 mutable_buffers_1 buffer =
157 static_cast<mutable_buffers_1
>(data + bytes_read);
179 struct nlmsghdr *nlh = NULL;
180 struct nlmsghdr tnlh;
182 int bytes_transferred = 0;
185 if (bytes_transferred <= 0) {
186 LOG(ERROR,
"Connection to dpdk-vrouter lost.");
192 if (
remain_ <
sizeof(
struct nlmsghdr)) {
195 (
sizeof(
struct nlmsghdr) -
remain_));
205 bufp += (nlh->nlmsg_len -
remain_);
208 offset = nlh->nlmsg_len -
remain_;
211 while (offset < bytes_transferred) {
212 if ((
unsigned int)(bytes_transferred - offset) > (
sizeof(
struct nlmsghdr))) {
213 nlh = (
struct nlmsghdr *)(
rx_buff_ + offset);
214 if ((
unsigned int)(bytes_transferred - offset) > nlh->nlmsg_len) {
217 offset += nlh->nlmsg_len;
226 remain_ = bytes_transferred - offset;
254 static int started = 0;
255 boost::system::error_code ec;
274 LOG(ERROR,
"Connection to dpdk-vrouter lost.");
293 bool async_ready) :
TcpSession(server, sock, async_ready) {
316 int remain = size - offset;
322 const struct nlmsghdr *nlh =
324 return nlh->nlmsg_len;
virtual const int GetHeaderLenSize()
virtual bool IsMoreData(char *data)
virtual TcpSession * AllocSession(Socket *socket)
boost::asio::const_buffer Buffer
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
boost::function< bool(const uint8_t *, size_t)> ReceiveCallback
virtual void SetErrno(int err)
virtual bool Validate(char *data)
boost::asio::ip::tcp::socket Socket
virtual void Connect(TcpSession *session, Endpoint remote)
KSyncSockTcpSession(TcpServer *server, Socket *sock, bool async_ready=false)
std::string Description() const
static size_t BufferSize(const Buffer &buffer)
virtual uint32_t GetSeqno(char *data)
virtual TcpSession * CreateSession()
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
void OnSessionEvent(TcpSession *session, TcpSession::Event event)
KSyncSockTcp(EventManager *evm, boost::asio::ip::address ip_addr, int port)
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
void ProcessDataInline(char *data)
boost::asio::ip::tcp::endpoint server_ep_
bool ValidateNetlink(char *data)
boost::asio::ip::tcp::socket Socket
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
void set_observer(EventObserver observer)
uint32_t max_bulk_buf_size_
uint32_t GetNetlinkSeqno(char *data)
bool ReceiveMsg(const u_int8_t *msg, size_t size)
virtual void OnRead(Buffer buffer)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual int MsgLength(Buffer buffer, int offset)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
boost::asio::const_buffer Buffer
static void SetSockTableEntry(KSyncSock *sock)
virtual void Receive(boost::asio::mutable_buffers_1)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
KSyncSockTcpReadTask(TaskScheduler *scheduler, KSyncSockTcp *sock)
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
KSyncSockTcpSessionReader(TcpSession *session, ReceiveCallback callback)
boost::asio::ip::tcp::socket * tcp_socket_
void reset_use_wait_tree()
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
virtual void OnRead(Buffer buffer)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
bool NetlinkMsgDone(char *data)
static void Init(EventManager *evm, boost::asio::ip::address ip_addr, int port, const std::string &cpu_pin_policy)
boost::system::error_code SetTcpRecvBufSize(uint32_t size)
KSyncSockTcpSessionReader * reader_
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
#define LOG(_Level, _Msg)
static void SetNetlinkFamilyId(int id)
void set_process_data_inline()
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
boost::system::error_code SetTcpSendBufSize(uint32_t size)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual Socket * socket() const
static const uint8_t * BufferData(const Buffer &buffer)
Task is a wrapper over tbb::task to support policies.
void ResetNetlink(nl_client *client)
static const unsigned kBufLen
virtual ~KSyncSockTcpSession()
boost::system::error_code SetTcpNoDelay()