OpenSDN source code
ksync_tx_queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef _GNU_SOURCE
6 #define _GNU_SOURCE
7 #endif
8 #include <sched.h>
9 
10 #include <unistd.h>
11 #include <stdlib.h>
12 #include <sys/eventfd.h>
13 
14 #include <algorithm>
15 #include <vector>
16 #include <set>
17 #include <boost/algorithm/string/case_conv.hpp>
18 #include <boost/thread.hpp>
19 
20 #include <tbb/concurrent_queue.h>
21 
22 #include "ksync_object.h"
23 #include "ksync_sock.h"
24 
25 static bool ksync_tx_queue_task_done_ = false;
26 
27 // Set CPU affinity for KSync Tx Thread based on cpu_pin_policy.
28 // By default CPU affinity is not set. cpu_pin_policy can change it,
29 // "last" : Last CPU-ID
30 // "<num>" : Specifies CPU-ID to pin
31 static void set_thread_affinity(std::string cpu_pin_policy) {
32  int num_cores = boost::thread::hardware_concurrency();
33  if (!num_cores) {
34  LOG(ERROR, "Failure in checking number of available threads");
35  num_cores = 1;
36  }
37  char *p = NULL;
38  int cpu_id = strtoul(cpu_pin_policy.c_str(), &p, 0);
39  if (*p || cpu_pin_policy.empty()) {
40  // cpu_pin_policy is non-integer
41  // Assume pinning disabled by default
42  cpu_id = -1;
43  // If policy is "last", pick last CPU-ID
44  boost::algorithm::to_lower(cpu_pin_policy);
45  if (cpu_pin_policy == "last") {
46  cpu_id = num_cores - 1;
47  }
48  } else {
49  // cpu_pin_policy is integer
50  // Disable pinning if configured value out of range
51  if (cpu_id >= num_cores)
52  cpu_id = -1;
53  }
54 
55  if (cpu_id >= 0) {
56  cpu_set_t cpuset;
57  CPU_ZERO(&cpuset);
58  CPU_SET(cpu_id, &cpuset);
59  LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
60  << ">. KsyncTxQueue pinned to CPU " << cpu_id);
61  sched_setaffinity(0, sizeof(cpuset), &cpuset);
62  } else {
63  LOG(ERROR, "KsyncTxQueue CPU pinning policy <" << cpu_pin_policy
64  << ">. KsyncTxQueuen not pinned to CPU");
65  }
66 }
67 
68 class KSyncTxQueueTask : public Task {
69 public:
71  Task(scheduler->GetTaskId("Ksync::KSyncTxQueue"), 0), queue_(queue) {
72  }
75  }
76 
77  bool Run() {
78  queue_->Run();
79  return true;
80  }
81  std::string Description() const { return "KSyncTxQueue"; }
82 
83 private:
85 };
86 
88  work_queue_(NULL),
89  event_fd_(-1),
90  cpu_pin_policy_(),
91  sock_(sock),
92  enqueues_(0),
93  dequeues_(0),
94  write_events_(0),
95  read_events_(0),
96  busy_time_(0),
97  measure_busy_time_(false) {
98  queue_len_ = 0;
99  shutdown_ = false;
100  ClearStats();
101 }
102 
104 }
105 
106 void KSyncTxQueue::Init(bool use_work_queue,
107  const std::string &cpu_pin_policy) {
108  cpu_pin_policy_ = cpu_pin_policy;
110  if (use_work_queue) {
111  assert(work_queue_ == NULL);
113  (scheduler->GetTaskId("Ksync::AsyncSend"), 0,
114  boost::bind(&KSyncSock::SendAsyncImpl, sock_, _1));
116  (boost::bind(&KSyncSock::OnEmptyQueue, sock_, _1));
117  return;
118  }
119  assert((event_fd_ = eventfd(0, (EFD_CLOEXEC | EFD_SEMAPHORE))) >= 0);
120 
121  KSyncTxQueueTask *task = new KSyncTxQueueTask(scheduler, this);
122  scheduler->Enqueue(task);
123 }
124 
126  shutdown_ = true;
127  if (work_queue_) {
128  assert(work_queue_->Length() == 0);
130  delete work_queue_;
131  work_queue_ = NULL;
132  return;
133  }
134 
135  uint64_t u = 1;
136  assert(write(event_fd_, &u, sizeof(u)) == sizeof(u));
137  while (queue_len_ != 0) {
138  usleep(1);
139  }
140 
141  while(ksync_tx_queue_task_done_ != true) {
142  usleep(1);
143  }
144  close(event_fd_);
145 }
146 
148  if (work_queue_) {
149  work_queue_->Enqueue(io_context);
150  return true;
151  }
152  queue_.push(io_context);
153  enqueues_++;
154  size_t ncount = queue_len_.fetch_add(1) + 1;
155  if (ncount > max_queue_len_)
156  max_queue_len_ = ncount;
157  if (ncount == 1) {
158  uint64_t u = 1;
159  int res = 0;
160  while ((res = write(event_fd_, &u, sizeof(u))) < (int)sizeof(u)) {
161  int ec = errno;
162  if (ec != EINTR && ec != EIO) {
163  LOG(ERROR, "KsyncTxQueue write failure : " << ec << " : "
164  << strerror(ec));
165  assert(0);
166  }
167  }
168 
169  write_events_++;
170  }
171  return true;
172 }
173 
176  while (1) {
177  while (1) {
178  uint64_t u = 0;
179  ssize_t num = read(event_fd_, &u, sizeof(u));
180  if (num >= (int)sizeof(u)) {
181  break;
182  }
183  if (errno != EINTR && errno != EIO) {
184  LOG(ERROR, "KsyncTxQueue read failure : " << errno << " : "
185  << strerror(errno));
186  assert(0);
187  }
188  }
189  read_events_++;
190 
191  uint64_t t1 = 0;
192  if (measure_busy_time_)
193  t1 = ClockMonotonicUsec();
194  IoContext *io_context = NULL;
195  while (queue_.try_pop(io_context)) {
196  dequeues_++;
197  queue_len_ -= 1;
198  sock_->SendAsyncImpl(io_context);
199  }
200  sock_->OnEmptyQueue(false);
201  if (shutdown_) {
202  break;
203  }
204 
205  if (t1)
206  busy_time_ += (ClockMonotonicUsec() - t1);
207  }
208  return true;
209 }
void OnEmptyQueue(bool done)
Definition: ksync_sock.cc:486
bool SendAsyncImpl(IoContext *ioc)
Definition: ksync_sock.cc:627
KSyncTxQueue * queue_
std::string Description() const
Gives a description of the task.
KSyncTxQueueTask(TaskScheduler *scheduler, KSyncTxQueue *queue)
bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
std::string cpu_pin_policy_
KSyncSock * sock_
uint64_t busy_time_
KSyncTxQueue(KSyncSock *sock)
std::atomic< bool > shutdown_
size_t read_events_
bool EnqueueInternal(IoContext *io_context)
void ClearStats() const
bool measure_busy_time_
size_t max_queue_len_
WorkQueue< IoContext * > * work_queue_
void Init(bool use_work_queue, const std::string &cpu_pin_policy)
size_t write_events_
std::atomic< size_t > queue_len_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
int GetTaskId(const std::string &name)
Definition: task.cc:861
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
size_t Length() const
Definition: queue_task.h:356
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void SetExitCallback(TaskExitCallback on_exit)
Definition: queue_task.h:303
static void set_thread_affinity(std::string cpu_pin_policy)
static bool ksync_tx_queue_task_done_
#define LOG(_Level, _Msg)
Definition: logging.h:34
struct task_ task
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29