7 #include <boost/asio.hpp>
8 #include <boost/bind.hpp>
13 using namespace boost::asio;
23 Task(scheduler->GetTaskId(
"Ksync::KSyncSockUdsRead"), 0), queue_(queue) {
32 std::string
Description()
const {
return "KSyncSockUdsRead"; }
44 server_ep_(sockpath_),
50 boost::system::error_code ec;
67 char *ret_buff =
new char[1024*
kBufLen];
68 boost::system::error_code ec;
74 struct nlmsghdr *nlh = NULL;
78 size_t bytes_transferred = 0;
84 LOG(INFO,
" dpdk vrouter is down, exiting.. errno:" << errno);
88 if (errno != EAGAIN) {
104 if (
remain_ <
sizeof(
struct nlmsghdr)) {
107 (
sizeof(
struct nlmsghdr) -
remain_));
116 bufp += (nlh->nlmsg_len -
remain_);
119 offset = nlh->nlmsg_len -
remain_;
121 while (offset < bytes_transferred) {
122 if ((bytes_transferred - offset) > (
sizeof(
struct nlmsghdr))) {
123 nlh = (
struct nlmsghdr *)(
rx_buff_ + offset);
124 if ((bytes_transferred - offset) > nlh->nlmsg_len) {
125 memcpy(ret_buff,
rx_buff_ + offset, nlh->nlmsg_len);
128 offset += nlh->nlmsg_len;
137 remain_ = bytes_transferred - offset;
143 const std::string &sockpathvr) {
166 uint32_t buf_len = 0;
169 return bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO,
IsMoreData(data));
181 struct iovec iov[max_bulk_msg_count_*2];
184 memset(&msg, 0,
sizeof(msg));
191 KSyncBufferList::iterator it = iovec->begin();
192 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf, offset));
194 int count = iovec->size();
195 for(i = 0; i < count; i++) {
196 mutable_buffers_1 buf = iovec->at(i);
197 size_t buf_size = boost::asio::buffer_size(buf);
198 void* cbuf = boost::asio::buffer_cast<
void*>(buf);
200 iov[i].iov_base = cbuf;
201 iov[i].iov_len = buf_size;
205 ret = sendmsg(
socket_, &msg, 0);
207 LOG(ERROR,
"sendmsg failure " << ret <<
"len " << len);
217 static int started = 0;
228 boost::system::error_code ec;
229 uint32_t bytes_read = 0;
230 const struct nlmsghdr *nlh = NULL;
232 char *netlink_header(buffer_cast<char *>(buf));
234 while (bytes_read <
sizeof(
struct nlmsghdr)) {
235 char *buffer = netlink_header + bytes_read;
236 bytes_read += recv(
socket_, buffer,
sizeof(
struct nlmsghdr) - bytes_read, 0);
239 if (bytes_read ==
sizeof(
struct nlmsghdr)) {
240 nlh = buffer_cast<
struct nlmsghdr *>(buf);
244 if (nlh->nlmsg_type == NLMSG_ERROR) {
245 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq
246 <<
" len " << nlh->nlmsg_len);
251 uint32_t payload_size = nlh->nlmsg_len -
sizeof(
struct nlmsghdr);
252 char *data(buffer_cast<char *>(buf +
sizeof(
struct nlmsghdr)));
254 while (bytes_read < payload_size) {
255 char *buffer = data + bytes_read;
256 bytes_read += recv(
socket_, buffer, payload_size - bytes_read, 0);
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
virtual void SetErrno(int err)
virtual bool Validate(char *data)
virtual uint32_t GetSeqno(char *data)
virtual bool IsMoreData(char *data)
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
void ProcessDataInline(char *data)
KSyncSockUds(boost::asio::io_context &ios)
KSyncSockUdsReadTask(TaskScheduler *scheduler, KSyncSockUds *queue)
std::string Description() const
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
uint32_t GetNetlinkSeqno(char *data)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in waitq...
static void SetSockTableEntry(KSyncSock *sock)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
#define KSYNC_AGENT_VROUTER_SOCK_PATH
static void Init(boost::asio::io_context &ios, const std::string &cpu_pin_policy, const std::string &sockpathvr="")
void reset_use_wait_tree()
boost::asio::local::stream_protocol::socket sock_
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
boost::asio::local::stream_protocol::endpoint server_ep_
bool NetlinkMsgDone(char *data)
#define LOG(_Level, _Msg)
static void SetNetlinkFamilyId(int id)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
void set_process_data_inline()
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual void Receive(boost::asio::mutable_buffers_1)
bool Run()
Code to execute. Returns true if the task is completed; returns false to reschedule the task...
Task is a wrapper over tbb::task to support policies.
void ResetNetlink(nl_client *client)
static const unsigned kBufLen