12 #include <sys/eventfd.h>
17 #include <boost/algorithm/string/case_conv.hpp>
18 #include <boost/thread.hpp>
20 #include <tbb/atomic.h>
21 #include <tbb/concurrent_queue.h>
33 int num_cores = boost::thread::hardware_concurrency();
35 LOG(ERROR,
"Failure in checking number of available threads");
39 int cpu_id = strtoul(cpu_pin_policy.c_str(), &p, 0);
40 if (*p || cpu_pin_policy.empty()) {
45 boost::algorithm::to_lower(cpu_pin_policy);
46 if (cpu_pin_policy ==
"last") {
47 cpu_id = num_cores - 1;
52 if (cpu_id >= num_cores)
59 CPU_SET(cpu_id, &cpuset);
60 LOG(ERROR,
"KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
61 <<
">. KsyncTxQueue pinned to CPU " << cpu_id);
62 sched_setaffinity(0,
sizeof(cpuset), &cpuset);
64 LOG(ERROR,
"KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
65 <<
">. KsyncTxQueuen not pinned to CPU");
98 measure_busy_time_(false) {
108 const std::string &cpu_pin_policy) {
111 if (use_work_queue) {
114 (scheduler->
GetTaskId(
"Ksync::AsyncSend"), 0,
120 assert((
event_fd_ = eventfd(0, (EFD_CLOEXEC | EFD_SEMAPHORE))) >= 0);
137 assert(write(
event_fd_, &u,
sizeof(u)) ==
sizeof(u));
155 size_t ncount =
queue_len_.fetch_and_increment() + 1;
161 while ((res = write(
event_fd_, &u,
sizeof(u))) < (
int)
sizeof(u)) {
163 if (ec != EINTR && ec != EIO) {
164 LOG(ERROR,
"KsyncTxQueue write failure : " << ec <<
" : "
181 if (num >= (
int)
sizeof(u)) {
184 if (errno != EINTR && errno != EIO) {
185 LOG(ERROR,
"KsyncTxQueue read failure : " << errno <<
" : "
196 while (
queue_.try_pop(io_context)) {
bool EnqueueInternal(IoContext *io_context)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
std::string cpu_pin_policy_
void Shutdown(bool delete_entries=true)
KSyncTxQueueTask(TaskScheduler *scheduler, KSyncTxQueue *queue)
void Init(bool use_work_queue, const std::string &cpu_pin_policy)
bool Run()
Code to execute. Returns true if the task is completed; returns false to reschedule the task...
bool SendAsyncImpl(IoContext *ioc)
int GetTaskId(const std::string &name)
void SetExitCallback(TaskExitCallback on_exit)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in the waitq...
std::string Description() const
KSyncTxQueue(KSyncSock *sock)
tbb::atomic< bool > shutdown_
static void set_thread_affinity(std::string cpu_pin_policy)
#define LOG(_Level, _Msg)
static uint64_t ClockMonotonicUsec()
static bool ksync_tx_queue_task_done_
bool Enqueue(QueueEntryT entry)
Task is a wrapper over tbb::task to support policies.
tbb::atomic< size_t > queue_len_
WorkQueue< IoContext * > * work_queue_
void OnEmptyQueue(bool done)