OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ksync_tx_queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef _GNU_SOURCE
6 #define _GNU_SOURCE
7 #endif
8 #include <sched.h>
9 
10 #include <unistd.h>
11 #include <stdlib.h>
12 #include <sys/eventfd.h>
13 
14 #include <algorithm>
15 #include <vector>
16 #include <set>
17 #include <boost/algorithm/string/case_conv.hpp>
18 #include <boost/thread.hpp>
19 
20 #include <tbb/atomic.h>
21 #include <tbb/concurrent_queue.h>
22 
23 #include "ksync_object.h"
24 #include "ksync_sock.h"
25 
26 static bool ksync_tx_queue_task_done_ = false;
27 
28 // Set CPU affinity for KSync Tx Thread based on cpu_pin_policy.
29 // By default CPU affinity is not set. cpu_pin_policy can change it,
30 // "last" : Last CPU-ID
31 // "<num>" : Specifies CPU-ID to pin
32 static void set_thread_affinity(std::string cpu_pin_policy) {
33  int num_cores = boost::thread::hardware_concurrency();
34  if (!num_cores) {
35  LOG(ERROR, "Failure in checking number of available threads");
36  num_cores = 1;
37  }
38  char *p = NULL;
39  int cpu_id = strtoul(cpu_pin_policy.c_str(), &p, 0);
40  if (*p || cpu_pin_policy.empty()) {
41  // cpu_pin_policy is non-integer
42  // Assume pinning disabled by default
43  cpu_id = -1;
44  // If policy is "last", pick last CPU-ID
45  boost::algorithm::to_lower(cpu_pin_policy);
46  if (cpu_pin_policy == "last") {
47  cpu_id = num_cores - 1;
48  }
49  } else {
50  // cpu_pin_policy is integer
51  // Disable pinning if configured value out of range
52  if (cpu_id >= num_cores)
53  cpu_id = -1;
54  }
55 
56  if (cpu_id >= 0) {
57  cpu_set_t cpuset;
58  CPU_ZERO(&cpuset);
59  CPU_SET(cpu_id, &cpuset);
60  LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
61  << ">. KsyncTxQueue pinned to CPU " << cpu_id);
62  sched_setaffinity(0, sizeof(cpuset), &cpuset);
63  } else {
64  LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
65  << ">. KsyncTxQueuen not pinned to CPU");
66  }
67 }
68 
69 class KSyncTxQueueTask : public Task {
70 public:
72  Task(scheduler->GetTaskId("Ksync::KSyncTxQueue"), 0), queue_(queue) {
73  }
76  }
77 
78  bool Run() {
79  queue_->Run();
80  return true;
81  }
82  std::string Description() const { return "KSyncTxQueue"; }
83 
84 private:
86 };
87 
89  work_queue_(NULL),
90  event_fd_(-1),
91  cpu_pin_policy_(),
92  sock_(sock),
93  enqueues_(0),
94  dequeues_(0),
95  write_events_(0),
96  read_events_(0),
97  busy_time_(0),
98  measure_busy_time_(false) {
99  queue_len_ = 0;
100  shutdown_ = false;
101  ClearStats();
102 }
103 
105 }
106 
107 void KSyncTxQueue::Init(bool use_work_queue,
108  const std::string &cpu_pin_policy) {
109  cpu_pin_policy_ = cpu_pin_policy;
111  if (use_work_queue) {
112  assert(work_queue_ == NULL);
114  (scheduler->GetTaskId("Ksync::AsyncSend"), 0,
115  boost::bind(&KSyncSock::SendAsyncImpl, sock_, _1));
117  (boost::bind(&KSyncSock::OnEmptyQueue, sock_, _1));
118  return;
119  }
120  assert((event_fd_ = eventfd(0, (EFD_CLOEXEC | EFD_SEMAPHORE))) >= 0);
121 
122  KSyncTxQueueTask *task = new KSyncTxQueueTask(scheduler, this);
123  scheduler->Enqueue(task);
124 }
125 
127  shutdown_ = true;
128  if (work_queue_) {
129  assert(work_queue_->Length() == 0);
131  delete work_queue_;
132  work_queue_ = NULL;
133  return;
134  }
135 
136  uint64_t u = 1;
137  assert(write(event_fd_, &u, sizeof(u)) == sizeof(u));
138  while (queue_len_ != 0) {
139  usleep(1);
140  }
141 
142  while(ksync_tx_queue_task_done_ != true) {
143  usleep(1);
144  }
145  close(event_fd_);
146 }
147 
149  if (work_queue_) {
150  work_queue_->Enqueue(io_context);
151  return true;
152  }
153  queue_.push(io_context);
154  enqueues_++;
155  size_t ncount = queue_len_.fetch_and_increment() + 1;
156  if (ncount > max_queue_len_)
157  max_queue_len_ = ncount;
158  if (ncount == 1) {
159  uint64_t u = 1;
160  int res = 0;
161  while ((res = write(event_fd_, &u, sizeof(u))) < (int)sizeof(u)) {
162  int ec = errno;
163  if (ec != EINTR && ec != EIO) {
164  LOG(ERROR, "KsyncTxQueue write failure : " << ec << " : "
165  << strerror(ec));
166  assert(0);
167  }
168  }
169 
170  write_events_++;
171  }
172  return true;
173 }
174 
177  while (1) {
178  while (1) {
179  uint64_t u = 0;
180  ssize_t num = read(event_fd_, &u, sizeof(u));
181  if (num >= (int)sizeof(u)) {
182  break;
183  }
184  if (errno != EINTR && errno != EIO) {
185  LOG(ERROR, "KsyncTxQueue read failure : " << errno << " : "
186  << strerror(errno));
187  assert(0);
188  }
189  }
190  read_events_++;
191 
192  uint64_t t1 = 0;
193  if (measure_busy_time_)
194  t1 = ClockMonotonicUsec();
195  IoContext *io_context = NULL;
196  while (queue_.try_pop(io_context)) {
197  dequeues_++;
198  queue_len_ -= 1;
199  sock_->SendAsyncImpl(io_context);
200  }
201  sock_->OnEmptyQueue(false);
202  if (shutdown_) {
203  break;
204  }
205 
206  if (t1)
207  busy_time_ += (ClockMonotonicUsec() - t1);
208  }
209  return true;
210 }
bool EnqueueInternal(IoContext *io_context)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
std::string cpu_pin_policy_
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
KSyncSock * sock_
size_t max_queue_len_
KSyncTxQueueTask(TaskScheduler *scheduler, KSyncTxQueue *queue)
void Init(bool use_work_queue, const std::string &cpu_pin_policy)
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: task_int.h:10
bool SendAsyncImpl(IoContext *ioc)
Definition: ksync_sock.cc:625
bool measure_busy_time_
int GetTaskId(const std::string &name)
Definition: task.cc:856
void SetExitCallback(TaskExitCallback on_exit)
Definition: queue_task.h:303
size_t Length() const
Definition: queue_task.h:356
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
std::string Description() const
KSyncTxQueue(KSyncSock *sock)
size_t read_events_
uint64_t busy_time_
int GetTaskId() const
Definition: task.h:118
tbb::atomic< bool > shutdown_
static void set_thread_affinity(std::string cpu_pin_policy)
#define LOG(_Level, _Msg)
Definition: logging.h:33
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
static bool ksync_tx_queue_task_done_
size_t write_events_
KSyncTxQueue * queue_
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
tbb::atomic< size_t > queue_len_
void ClearStats() const
WorkQueue< IoContext * > * work_queue_
void OnEmptyQueue(bool done)
Definition: ksync_sock.cc:481