7 #include <boost/asio.hpp>
8 #include <boost/bind/bind.hpp>
13 using namespace boost::asio;
14 using namespace boost::placeholders;
24 Task(scheduler->GetTaskId(
"Ksync::KSyncSockUdsRead"), 0), queue_(queue) {
33 std::string
Description()
const {
return "KSyncSockUdsRead"; }
45 server_ep_(sockpath_),
51 boost::system::error_code ec;
68 char *ret_buff =
new char[1024*
kBufLen];
69 boost::system::error_code ec;
75 struct nlmsghdr *nlh = NULL;
79 size_t bytes_transferred = 0;
85 LOG(INFO,
" dpdk vrouter is down, exiting.. errno:" << errno);
89 if (errno != EAGAIN) {
105 if (
remain_ <
sizeof(
struct nlmsghdr)) {
108 (
sizeof(
struct nlmsghdr) -
remain_));
117 bufp += (nlh->nlmsg_len -
remain_);
120 offset = nlh->nlmsg_len -
remain_;
122 while (offset < bytes_transferred) {
123 if ((bytes_transferred - offset) > (
sizeof(
struct nlmsghdr))) {
124 nlh = (
struct nlmsghdr *)(
rx_buff_ + offset);
125 if ((bytes_transferred - offset) > nlh->nlmsg_len) {
126 memcpy(ret_buff,
rx_buff_ + offset, nlh->nlmsg_len);
129 offset += nlh->nlmsg_len;
138 remain_ = bytes_transferred - offset;
144 const std::string &sockpathvr) {
167 uint32_t buf_len = 0;
170 return bulk_sandesh_context->
Decoder(buf, buf_len, NLA_ALIGNTO,
IsMoreData(data));
185 memset(&msg, 0,
sizeof(msg));
192 KSyncBufferList::iterator it = iovec->begin();
193 iovec->insert(it, buffer((
char *)
nl_client_->cl_buf, offset));
195 int count = iovec->size();
196 for(i = 0; i < count; i++) {
197 mutable_buffers_1 buf = iovec->at(i);
198 size_t buf_size = boost::asio::buffer_size(buf);
199 void* cbuf = boost::asio::buffer_cast<void*>(buf);
201 iov[i].iov_base = cbuf;
202 iov[i].iov_len = buf_size;
206 ret = sendmsg(
socket_, &msg, 0);
208 LOG(ERROR,
"sendmsg failure " << ret <<
"len " << len);
218 static int started = 0;
229 boost::system::error_code ec;
230 uint32_t bytes_read = 0;
231 const struct nlmsghdr *nlh = NULL;
233 char *netlink_header(buffer_cast<char *>(buf));
235 while (bytes_read <
sizeof(
struct nlmsghdr)) {
236 char *buffer = netlink_header + bytes_read;
237 bytes_read += recv(
socket_, buffer,
sizeof(
struct nlmsghdr) - bytes_read, 0);
240 if (bytes_read ==
sizeof(
struct nlmsghdr)) {
241 nlh = buffer_cast<struct nlmsghdr *>(buf);
245 if (nlh->nlmsg_type == NLMSG_ERROR) {
246 LOG(ERROR,
"Netlink error for seqno " << nlh->nlmsg_seq
247 <<
" len " << nlh->nlmsg_len);
252 uint32_t payload_size = nlh->nlmsg_len -
sizeof(
struct nlmsghdr);
253 char *data(buffer_cast<char *>(buf +
sizeof(
struct nlmsghdr)));
255 while (bytes_read < payload_size) {
256 char *buffer = data + bytes_read;
257 bytes_read += recv(
socket_, buffer, payload_size - bytes_read, 0);
virtual void SetErrno(int err)
bool Decoder(char *buff, uint32_t buff_len, uint32_t alignment, bool more)
static void NetlinkDecoder(char *data, SandeshContext *ctxt)
KSyncSockUdsReadTask(TaskScheduler *scheduler, KSyncSockUds *queue)
bool Run()
Code to execute in a task. Returns true if the task is completed; returns false to reschedule the task.
std::string Description() const
Gives a description of the task.
virtual bool IsMoreData(char *data)
static void Init(boost::asio::io_context &ios, const std::string &cpu_pin_policy, const std::string &sockpathvr="")
KSyncSockUds(boost::asio::io_context &ios)
virtual void AsyncSendTo(KSyncBufferList *iovec, uint32_t seq_no, HandlerCb cb)
virtual std::size_t SendTo(KSyncBufferList *iovec, uint32_t seq_no)
boost::asio::local::stream_protocol::endpoint server_ep_
boost::asio::local::stream_protocol::socket sock_
virtual bool BulkDecoder(char *data, KSyncBulkSandeshContext *ctxt)
virtual void Receive(boost::asio::mutable_buffers_1)
virtual void AsyncReceive(boost::asio::mutable_buffers_1, HandlerCb)
virtual bool Validate(char *data)
virtual bool Decoder(char *data, AgentSandeshContext *ctxt)
virtual uint32_t GetSeqno(char *data)
static AgentSandeshContext * GetAgentSandeshContext(uint32_t type)
void reset_use_wait_tree()
void set_process_data_inline()
static void Init(bool use_work_queue, const std::string &cpu_pin_policy)
void ProcessDataInline(char *data)
static void SetSockTableEntry(KSyncSock *sock)
static void SetNetlinkFamilyId(int id)
boost::function< void(const boost::system::error_code &, size_t)> HandlerCb
static const unsigned kBufLen
uint32_t max_bulk_msg_count_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
bool NetlinkMsgDone(char *data)
void GetNetlinkPayload(char *data, char **buf, uint32_t *buf_len)
void UpdateNetlink(nl_client *client, uint32_t len, uint32_t seq_no)
uint32_t GetNetlinkSeqno(char *data)
void ResetNetlink(nl_client *client)
#define KSYNC_AGENT_VROUTER_SOCK_PATH
std::vector< boost::asio::mutable_buffers_1 > KSyncBufferList
#define LOG(_Level, _Msg)