12 #include <sys/eventfd.h>
17 #include <boost/algorithm/string/case_conv.hpp>
18 #include <boost/thread.hpp>
20 #include <tbb/concurrent_queue.h>
32 int num_cores = boost::thread::hardware_concurrency();
34 LOG(ERROR,
"Failure in checking number of available threads");
38 int cpu_id = strtoul(cpu_pin_policy.c_str(), &p, 0);
39 if (*p || cpu_pin_policy.empty()) {
44 boost::algorithm::to_lower(cpu_pin_policy);
45 if (cpu_pin_policy ==
"last") {
46 cpu_id = num_cores - 1;
51 if (cpu_id >= num_cores)
58 CPU_SET(cpu_id, &cpuset);
59 LOG(ERROR,
"KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
60 <<
">. KsyncTxQueue pinned to CPU " << cpu_id);
61 sched_setaffinity(0,
sizeof(cpuset), &cpuset);
63 LOG(ERROR,
"KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
64 <<
">. KsyncTxQueuen not pinned to CPU");
71 Task(scheduler->GetTaskId(
"Ksync::KSyncTxQueue"), 0),
queue_(queue) {
97 measure_busy_time_(false) {
107 const std::string &cpu_pin_policy) {
110 if (use_work_queue) {
113 (scheduler->
GetTaskId(
"Ksync::AsyncSend"), 0,
119 assert((
event_fd_ = eventfd(0, (EFD_CLOEXEC | EFD_SEMAPHORE))) >= 0);
136 assert(write(
event_fd_, &u,
sizeof(u)) ==
sizeof(u));
160 while ((res = write(
event_fd_, &u,
sizeof(u))) < (
int)
sizeof(u)) {
162 if (ec != EINTR && ec != EIO) {
163 LOG(ERROR,
"KsyncTxQueue write failure : " << ec <<
" : "
180 if (
num >= (
int)
sizeof(u)) {
183 if (errno != EINTR && errno != EIO) {
184 LOG(ERROR,
"KsyncTxQueue read failure : " << errno <<
" : "
195 while (
queue_.try_pop(io_context)) {
void OnEmptyQueue(bool done)
bool SendAsyncImpl(IoContext *ioc)
std::string Description() const
Gives a description of the task.
KSyncTxQueueTask(TaskScheduler *scheduler, KSyncTxQueue *queue)
bool Run()
Code to execute in a task. Returns true if the task is completed. Returns false to reschedule the task.
std::string cpu_pin_policy_
KSyncTxQueue(KSyncSock *sock)
std::atomic< bool > shutdown_
bool EnqueueInternal(IoContext *io_context)
WorkQueue< IoContext * > * work_queue_
void Init(bool use_work_queue, const std::string &cpu_pin_policy)
std::atomic< size_t > queue_len_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
bool Enqueue(QueueEntryT entry)
void Shutdown(bool delete_entries=true)
void SetExitCallback(TaskExitCallback on_exit)
static void set_thread_affinity(std::string cpu_pin_policy)
static bool ksync_tx_queue_task_done_
#define LOG(_Level, _Msg)
static uint64_t ClockMonotonicUsec()