OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
queue_task.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 // queue_task.h
6 //
7 // Task based queue processor implementing thread safe enqueue and dequeue
8 // using concurrent queues. If queue is empty, enqueue creates a dequeue task
9 // that drains the queue. The dequeue task runs a maximum of kMaxIterations
10 // before yielding.
11 //
12 #ifndef __QUEUE_TASK_H__
13 #define __QUEUE_TASK_H__
14 
15 #include <iostream>
16 #include <sstream>
17 #include <algorithm>
18 #include <vector>
19 #include <set>
20 
21 #include <tbb/atomic.h>
22 #include <tbb/concurrent_queue.h>
23 #include <tbb/mutex.h>
24 #include <tbb/spin_rw_mutex.h>
25 
26 #include <base/task.h>
27 #include <base/time_util.h>
28 #include <base/watermark.h>
29 
30 template <typename QueueEntryT, typename QueueT>
    // Task that drains a QueueT on behalf of its owner. The runner takes its
    // task id and task instance from the queue, pulls entries with
    // queue->Dequeue() and passes each one to the callback returned by
    // queue->GetCallback(). QueueT must also provide OnEntry(), RunnerAbort(),
    // RunnerDone(), Description(), add_busy_time() and the members
    // measure_busy_time_ and max_iterations_ that are read below.
31 class QueueTaskRunner : public Task {
32 public:
    // Binds the runner to |queue|. The pointer is stored as-is; nothing in
    // this class deletes it.
33  QueueTaskRunner(QueueT *queue)
34  : Task(queue->GetTaskId(), queue->GetTaskInstance()), queue_(queue) {
35  }
36 
    // Task entry point. Returns false when the queue's OnEntry() check defers
    // this run; otherwise returns the result of RunQueue(). Per the Task::Run
    // contract, true means the task is complete and false asks for it to be
    // rescheduled.
37  bool Run() {
38  // Check if this run needs to be deferred
39  if (!queue_->OnEntry()) {
40  return false;
41  }
42  return RunQueue();
43  // No more client callbacks after updating
44  // queue running_ and current_runner_ in RunQueue to
45  // avoid client callbacks running concurrently
46  }
47 
    // Delegates to the queue's Description().
48  virtual std::string Description() const {
49  return queue_->Description();
50  }
51 
52 private:
    // Processes at most max_iterations_ entries in one pass and returns the
    // value of queue_->RunnerDone(), which is what Run() hands back to the
    // scheduler.
53  bool RunQueue() {
54  // Check if we need to abort
55  if (queue_->RunnerAbort()) {
56  return queue_->RunnerDone();
57  }
58 
    // start doubles as the "measuring" flag: it stays 0 unless busy-time
    // measurement is enabled on the queue.
59  uint64_t start = 0;
60  if (queue_->measure_busy_time_)
61  start = ClockMonotonicUsec();
62 
63  QueueEntryT entry = QueueEntryT();
64  size_t count = 0;
65  while (queue_->Dequeue(&entry)) {
66  // Process the entry
    // A false return from the callback ends this pass; the entry has already
    // been removed from the queue by Dequeue() and is not counted.
67  if (!queue_->GetCallback()(entry)) {
68  break;
69  }
    // Yield once max_iterations_ entries have been processed successfully.
70  if (++count == queue_->max_iterations_) {
71  if (start)
72  queue_->add_busy_time(ClockMonotonicUsec() - start);
73  return queue_->RunnerDone();
74  }
75  }
76 
77  if (start)
78  queue_->add_busy_time(ClockMonotonicUsec() - start);
79 
80  // Running is done if queue_ is empty
81  // While notification is being run, it's possible that more entries
82  // are added into queue_
83  return queue_->RunnerDone();
84  }
85 
    // Queue being drained; set once in the constructor.
86  QueueT *queue_;
87 };
88 
89 template <typename QueueEntryT>
91  template <typename QueueT>
92  void operator()(QueueT &, bool) {}
93 };
94 
95 template <typename QueueEntryT>
    // Specialization for queues whose entries are raw pointers: empties the
    // queue and, when requested, deletes every popped pointer.
96 struct WorkQueueDelete<QueueEntryT *> {
97  template <typename QueueT>
     // q            - queue exposing try_pop(); it is popped until empty.
     // delete_entry - when true each popped pointer is deleted; when false the
     //                pointers are only removed from the queue (not freed here).
98  void operator()(QueueT &q, bool delete_entry) {
99  QueueEntryT *entry;
100  while (q.try_pop(entry)) {
101  if (delete_entry) {
102  delete entry;
103  }
104  }
105  }
106 };
107 
108 template <typename QueueEntryT>
109 class WorkQueue {
110 public:
111  static const int kMaxSize = 1024;
112  static const int kMaxIterations = 32;
113  typedef tbb::concurrent_queue<QueueEntryT> Queue;
114  typedef boost::function<bool (QueueEntryT)> Callback;
115  typedef boost::function<bool (void)> StartRunnerFunc;
116  typedef boost::function<void (bool)> TaskExitCallback;
117  typedef boost::function<bool ()> TaskEntryCallback;
118 
119  WorkQueue(int taskId, int taskInstance, Callback callback,
120  size_t size = kMaxSize,
121  size_t max_iterations = kMaxIterations) :
122  running_(false),
123  taskId_(taskId),
124  taskInstance_(taskInstance),
125  name_(""),
126  callback_(callback),
127  on_entry_cb_(0),
128  on_exit_cb_(0),
129  start_runner_(0),
130  current_runner_(NULL),
132  deleted_(false),
133  enqueues_(0),
134  dequeues_(0),
135  drops_(0),
136  max_iterations_(max_iterations),
137  size_(size),
138  bounded_(false),
139  shutdown_scheduled_(false),
141  task_starts_(0),
142  max_queue_len_(0),
143  busy_time_(0),
144  measure_busy_time_(false) {
145  count_ = 0;
146  disabled_ = false;
147  }
148 
149  // Concurrency - should be called from a task whose policy
150  // assures that the dequeue task - QueueTaskRunner is not running
151  // concurrently
     // Synchronous shutdown: takes mutex_ and runs ShutdownLocked(), which
     // cancels a scheduled runner, disposes of the queued entries and marks
     // the queue deleted.
     // delete_entries is forwarded unchanged to ShutdownLocked() and from
     // there to the entry deleter.
152  void Shutdown(bool delete_entries = true) {
153  tbb::mutex::scoped_lock lock(mutex_);
154  ShutdownLocked(delete_entries);
155  }
156 
157  // Concurrency - can be called from any context
158  // Schedule shutdown of the WorkQueue, shutdown may happen asynchronously
159  // or in the caller's context also
160  void ScheduleShutdown(bool delete_entries = true) {
161  tbb::mutex::scoped_lock lock(mutex_);
162  if (shutdown_scheduled_) {
163  return;
164  }
165  shutdown_scheduled_ = true;
166  delete_entries_on_shutdown_ = delete_entries;
167 
168  // Cancel QueueTaskRunner
169  if (running_) {
170  assert(current_runner_);
172  TaskScheduler::CancelReturnCode cancel_code =
173  scheduler->Cancel(current_runner_);
174  if (cancel_code == TaskScheduler::CANCELLED) {
175  running_ = false;
176  current_runner_ = NULL;
177  ShutdownLocked(delete_entries);
178  } else {
179  assert(cancel_code == TaskScheduler::QUEUED);
180  }
181  } else {
182  ShutdownLocked(delete_entries);
183  }
184  }
185 
187  tbb::mutex::scoped_lock lock(mutex_);
188  // Shutdown() needs to be called before deleting
189  //assert(!running_ && deleted_);
190  }
191 
192  void SetStartRunnerFunc(StartRunnerFunc start_runner_fn) {
     // Installs the predicate checked by the runner-abort test: when it is
     // set and returns false, the runner is treated as aborted.
     // No lock is taken here.
193  start_runner_ = start_runner_fn;
194  }
195 
196  void SetSize(size_t size) {
     // Sets the threshold that the enqueue paths compare the entry count
     // against (count < size_). No lock is taken here.
197  size_ = size;
198  }
199 
200  void SetBounded(bool bounded) {
     // Selects which path Enqueue() uses: when true, Enqueue() goes through
     // EnqueueBounded(), which returns false without pushing once the count
     // reaches size_; when false, EnqueueInternal() always pushes.
201  bounded_ = bounded;
202  }
203 
204  bool GetBounded() const {
     // Returns the flag last stored by SetBounded() (false after construction).
205  return bounded_;
206  }
207 
208  void SetHighWaterMark(const WaterMarkInfos &high_water) {
     // Forwards a collection of high water marks to watermarks_ while
     // holding water_mutex_, the same mutex the *Locked enqueue/dequeue
     // helpers take.
209  tbb::mutex::scoped_lock lock(water_mutex_);
210  watermarks_.SetHighWaterMark(high_water);
211  }
212 
213  void SetHighWaterMark(const WaterMarkInfo& hwm_info) {
     // Single-entry overload; forwards hwm_info to watermarks_ under
     // water_mutex_.
214  tbb::mutex::scoped_lock lock(water_mutex_);
215  watermarks_.SetHighWaterMark(hwm_info);
216  }
217 
219  tbb::mutex::scoped_lock lock(water_mutex_);
221  }
222 
224  tbb::mutex::scoped_lock lock(water_mutex_);
225  return watermarks_.GetHighWaterMark();
226  }
227 
228  void SetLowWaterMark(const WaterMarkInfos &low_water) {
     // Forwards a collection of low water marks to watermarks_ while
     // holding water_mutex_, the same mutex the *Locked enqueue/dequeue
     // helpers take.
229  tbb::mutex::scoped_lock lock(water_mutex_);
230  watermarks_.SetLowWaterMark(low_water);
231  }
232 
233  void SetLowWaterMark(const WaterMarkInfo& lwm_info) {
     // Single-entry overload; forwards lwm_info to watermarks_ under
     // water_mutex_.
234  tbb::mutex::scoped_lock lock(water_mutex_);
235  watermarks_.SetLowWaterMark(lwm_info);
236  }
237 
239  tbb::mutex::scoped_lock lock(water_mutex_);
241  }
242 
244  tbb::mutex::scoped_lock lock(water_mutex_);
245  return watermarks_.GetLowWaterMark();
246  }
247 
248  bool Enqueue(QueueEntryT entry) {
     // Adds |entry| using one of four helpers, chosen by bounded_ and by
     // whether any water marks are configured. The *Locked variants do the
     // same work while holding water_mutex_.
     // Returns the helper's result: the bounded path returns true after
     // pushing the entry; the unbounded path returns (count < size_) after
     // it has pushed the entry.
249  if (bounded_) {
250  if (AreWaterMarksSet()) {
251  return EnqueueBoundedLocked(entry);
252  } else {
253  return EnqueueBounded(entry);
254  }
255  } else {
256  if (AreWaterMarksSet()) {
257  return EnqueueInternalLocked(entry);
258  } else {
259  return EnqueueInternal(entry);
260  }
261  }
262  }
263 
264  // Returns true if pop is successful.
     // On success *entry receives the popped element. When water marks are
     // configured the pop runs under water_mutex_ (DequeueInternalLocked),
     // otherwise DequeueInternal() is called directly.
265  bool Dequeue(QueueEntryT *entry) {
266  if (AreWaterMarksSet()) {
267  return DequeueInternalLocked(entry);
268  } else {
269  return DequeueInternal(entry);
270  }
271  }
272 
273  int GetTaskId() const {
     // Task id given at construction; QueueTaskRunner passes it to Task.
274  return taskId_;
275  }
276 
277  int GetTaskInstance() const {
     // Task instance given at construction; QueueTaskRunner passes it to Task.
278  return taskInstance_;
279  }
280 
282  tbb::mutex::scoped_lock lock(mutex_);
283  if (running_ || queue_.empty() || deleted_ || RunnerAbortLocked()) {
284  return;
285  }
286  task_starts_++;
287  running_ = true;
288  assert(current_runner_ == NULL);
292  scheduler->Enqueue(current_runner_);
293  }
294 
296  return callback_;
297  }
298 
300  on_entry_cb_ = on_entry;
301  }
302 
304  on_exit_cb_ = on_exit;
305  }
306 
307  void set_name(const std::string &name) {
     // Sets the label that Description() returns when it is non-empty.
308  name_ = name;
309  }
310  std::string Description() const {
     // Returns name_ when one has been set; otherwise builds a fallback of
     // the form "Function " followed by callback_ streamed into an
     // ostringstream.
     // NOTE(review): what streaming a boost::function prints is not evident
     // from this file - confirm the fallback text is useful.
311  if (name_.empty() == false)
312  return name_;
313 
314  std::ostringstream str;
315  str << "Function " << callback_;
316  return str.str();
317  }
318 
319  void set_disable(bool disabled) {
320  if (disabled_ != disabled) {
321  disabled_ = disabled;
322  if (!disabled_) {
324  }
325  }
326  }
327 
328  bool IsDisabled() const {
     // Reads the atomic disabled_ flag maintained by set_disable().
329  return disabled_;
330  }
331 
332  size_t on_entry_defer_count() const {
     // Returns on_entry_defer_count_; presumably the number of runs that
     // OnEntry() deferred - TODO confirm against OnEntry().
333  return on_entry_defer_count_;
334  }
335 
336  bool OnEntry() {
337  bool run = (on_entry_cb_.empty() || on_entry_cb_());
338 
339  // Track number of times this queue run is deferred
340  if (!run) {
342  }
343  return run;
344  }
345 
346  void OnExit(bool done) {
     // Invokes the exit callback, if one is installed, passing |done|.
     // RunnerDone() calls this while holding mutex_, with done == true when
     // the runner is finishing and false when it will run again.
347  if (!on_exit_cb_.empty()) {
348  on_exit_cb_(done);
349  }
350  }
351 
352  bool IsQueueEmpty() const {
     // Asks the underlying concurrent queue directly; does not use count_.
353  return queue_.empty();
354  }
355 
356  size_t Length() const {
     // Returns the atomic counter count_ maintained by the enqueue/dequeue
     // helpers, rather than querying queue_ for its size.
357  return count_;
358  }
359 
360  size_t NumEnqueues() const {
     // Count of entries accepted by the enqueue helpers since construction
     // or the last ClearStats().
361  return enqueues_;
362  }
363 
364  size_t NumDequeues() const {
     // Count of successful pops since construction or the last ClearStats().
365  return dequeues_;
366  }
367 
368  size_t NumDrops() const {
     // Count of entries rejected by the bounded enqueue path. This counter
     // is not reset by ClearStats().
369  return drops_;
370  }
371 
372  bool deleted() const {
     // True once ShutdownLocked() has run.
373  return deleted_;
374  }
375 
376  uint32_t task_starts() const { return task_starts_; }  // Number of times a runner task was scheduled.
377  size_t max_queue_len() const { return max_queue_len_; }  // Largest entry count observed by the enqueue helpers.
378  bool measure_busy_time() const { return measure_busy_time_; }  // Whether the runner records time spent draining.
379  void set_measure_busy_time(bool val) const { measure_busy_time_ = val; }  // const only because measure_busy_time_ is mutable.
380  uint64_t busy_time() const { return busy_time_; }  // Accumulated ClockMonotonicUsec() deltas reported by the runner.
381  void add_busy_time(uint64_t t) { busy_time_ += t; }  // Called by QueueTaskRunner::RunQueue(); plain add, no lock taken here.
382  void ClearStats() const {
     // Resets the statistics counters that are declared mutable. drops_ and
     // on_entry_defer_count_ are not touched here, and count_ (the live
     // queue length) is deliberately left alone.
383  max_queue_len_ = 0;
384  enqueues_ = 0;
385  dequeues_ = 0;
386  busy_time_ = 0;
387  task_starts_ = 0;
388  }
389 private:
390  // Returns true if pop is successful.
     // On success the popped element is written to *entry, dequeues_ and
     // count_ are updated, and the new count is passed to
     // ProcessLowWaterMarks(). On failure nothing is modified.
391  bool DequeueInternal(QueueEntryT *entry) {
392  bool success = queue_.try_pop(*entry);
393  if (success) {
394  dequeues_++;
395  size_t ncount(AtomicDecrementQueueCount(entry));
396  ProcessLowWaterMarks(ncount);
397  }
398  return success;
399  }
400 
401  bool DequeueInternalLocked(QueueEntryT *entry) {
     // Same as DequeueInternal(), but holds water_mutex_ for the duration;
     // Dequeue() uses this variant when water marks are configured.
402  tbb::mutex::scoped_lock lock(water_mutex_);
403  return DequeueInternal(entry);
404  }
405 
406  bool AreWaterMarksSet() const {
     // Forwards to watermarks_; the result decides whether Enqueue() and
     // Dequeue() use their water_mutex_-holding variants.
407  return watermarks_.AreWaterMarksSet();
408  }
409 
410  void ShutdownLocked(bool delete_entries) {
411  // Cancel QueueTaskRunner from the scheduler
412  assert(!deleted_);
413  if (running_) {
414  running_ = false;
415  assert(current_runner_);
417  TaskScheduler::CancelReturnCode cancel_code =
418  scheduler->Cancel(current_runner_);
419  assert(cancel_code == TaskScheduler::CANCELLED);
420  current_runner_ = NULL;
421  }
425  deleter(queue_, delete_entries);
426  queue_.clear();
427  count_ = 0;
428  deleted_ = true;
429  }
430 
431  size_t AtomicIncrementQueueCount(QueueEntryT *entry) {
     // Atomically bumps count_ and returns the new value; the + 1 converts
     // the value returned by fetch_and_increment() (the value before the
     // increment) into the post-increment count. |entry| is unused.
432  return count_.fetch_and_increment() + 1;
433  }
434 
435  size_t AtomicDecrementQueueCount(QueueEntryT *entry) {
     // Atomically lowers count_ and returns the new value; the - 1 converts
     // the value returned by fetch_and_decrement() (the value before the
     // decrement) into the post-decrement count. |entry| is unused.
436  return count_.fetch_and_decrement() - 1;
437  }
438 
439  void ProcessHighWaterMarks(size_t count) {
441  }
442 
443  void ProcessLowWaterMarks(size_t count) {
445  }
446 
447  bool EnqueueInternal(QueueEntryT entry) {
448  enqueues_++;
449  size_t ncount(AtomicIncrementQueueCount(&entry));
450  if (ncount > max_queue_len_)
451  max_queue_len_ = ncount;
452  ProcessHighWaterMarks(ncount);
453  queue_.push(entry);
455  return ncount < size_;
456  }
457 
458  bool EnqueueInternalLocked(QueueEntryT entry) {
     // Same as EnqueueInternal(), but holds water_mutex_ for the duration;
     // used by Enqueue() for unbounded queues with water marks configured.
459  tbb::mutex::scoped_lock lock(water_mutex_);
460  return EnqueueInternal(entry);
461  }
462 
463  bool EnqueueBounded(QueueEntryT entry) {
464  size_t ncount(AtomicIncrementQueueCount(&entry));
465  if (ncount > max_queue_len_)
466  max_queue_len_ = ncount;
467  if (ncount < size_) {
468  enqueues_++;
469  ProcessHighWaterMarks(ncount);
470  queue_.push(entry);
472  return true;
473  }
475  drops_++;
477  return false;
478  }
479 
480  bool EnqueueBoundedLocked(QueueEntryT entry) {
     // Same as EnqueueBounded(), but holds water_mutex_ for the duration;
     // used by Enqueue() for bounded queues with water marks configured.
481  tbb::mutex::scoped_lock lock(water_mutex_);
482  return EnqueueBounded(entry);
483  }
484 
486  return (disabled_ || shutdown_scheduled_ ||
487  (!start_runner_.empty() && !start_runner_()));
488  }
489 
490  bool RunnerAbort() {
     // Takes mutex_ and evaluates the abort condition: the queue is
     // disabled, a shutdown has been scheduled, or the installed
     // start-runner predicate returns false. Called by
     // QueueTaskRunner::RunQueue() before it dequeues anything.
491  tbb::mutex::scoped_lock lock(mutex_);
492  return RunnerAbortLocked();
493  }
494 
495  bool RunnerDone() {
496  tbb::mutex::scoped_lock lock(mutex_);
497  bool done = false;
498  if (queue_.empty() || RunnerAbortLocked()) {
499  done = true;
500  OnExit(done);
501  current_runner_ = NULL;
502  running_ = false;
503  if (shutdown_scheduled_) {
505  }
506  } else {
507  OnExit(done);
508  running_ = true;
509  }
510  return done;
511  }
512 
514  tbb::atomic<size_t> count_;
515  tbb::mutex mutex_;
516  bool running_;
517  int taskId_;
519  std::string name_;
526  tbb::atomic<bool> disabled_;
527  bool deleted_;
528  mutable size_t enqueues_;
529  mutable size_t dequeues_;
530  size_t drops_;
532  size_t size_;
533  bool bounded_;
537  mutable tbb::mutex water_mutex_;
538  mutable uint32_t task_starts_;
539  mutable size_t max_queue_len_;
540  mutable uint64_t busy_time_;
541  mutable bool measure_busy_time_;
542 
543  friend class QueueTaskTest;
544  friend class QueueTaskShutdownTest;
546  friend class QueueTaskRunner<QueueEntryT, WorkQueue<QueueEntryT> >;
547 
549 };
550 
551 #endif /* __QUEUE_TASK_H__ */
bool Dequeue(QueueEntryT *entry)
Definition: queue_task.h:265
bool IsDisabled() const
Definition: queue_task.h:328
void SetBounded(bool bounded)
Definition: queue_task.h:200
boost::function< bool(void)> StartRunnerFunc
Definition: queue_task.h:115
uint32_t task_starts() const
Definition: queue_task.h:376
void SetEntryCallback(TaskEntryCallback on_entry)
Definition: queue_task.h:299
size_t on_entry_defer_count_
Definition: queue_task.h:525
boost::function< bool()> TaskEntryCallback
Definition: queue_task.h:117
bool IsQueueEmpty() const
Definition: queue_task.h:352
size_t max_iterations_
Definition: queue_task.h:531
QueueTaskRunner< QueueEntryT, WorkQueue< QueueEntryT > > * current_runner_
Definition: queue_task.h:524
int GetTaskId() const
Definition: queue_task.h:273
uint64_t busy_time_
Definition: queue_task.h:540
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
friend class QueueTaskTest
Definition: queue_task.h:543
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
TaskExitCallback on_exit_cb_
Definition: queue_task.h:522
size_t max_queue_len() const
Definition: queue_task.h:377
size_t drops_
Definition: queue_task.h:530
void operator()(QueueT &q, bool delete_entry)
Definition: queue_task.h:98
CancelReturnCode
Definition: task.h:205
size_t NumDequeues() const
Definition: queue_task.h:364
QueueT * queue_
Definition: queue_task.h:86
void add_busy_time(uint64_t t)
Definition: queue_task.h:381
bool EnqueueInternal(QueueEntryT entry)
Definition: queue_task.h:447
void ProcessLowWaterMarks(size_t count)
Definition: watermark.cc:74
void SetHighWaterMark(const WaterMarkInfos &high_water)
Definition: queue_task.h:208
bool RunnerAbort()
Definition: queue_task.h:490
bool AreWaterMarksSet() const
Definition: watermark.cc:51
bool AreWaterMarksSet() const
Definition: queue_task.h:406
void OnExit(bool done)
Definition: queue_task.h:346
boost::function< bool(QueueEntryT)> Callback
Definition: queue_task.h:114
Queue queue_
Definition: queue_task.h:513
void ProcessLowWaterMarks(size_t count)
Definition: queue_task.h:443
bool OnEntry()
Definition: queue_task.h:336
bool bounded_
Definition: queue_task.h:533
void ResetLowWaterMark()
Definition: queue_task.h:238
void ScheduleShutdown(bool delete_entries=true)
Definition: queue_task.h:160
tbb::mutex water_mutex_
Definition: queue_task.h:537
bool deleted() const
Definition: queue_task.h:372
WorkQueue(int taskId, int taskInstance, Callback callback, size_t size=kMaxSize, size_t max_iterations=kMaxIterations)
Definition: queue_task.h:119
DISALLOW_COPY_AND_ASSIGN(WorkQueue)
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Definition: task.cc:699
WaterMarkInfos GetLowWaterMark() const
Definition: watermark.cc:39
std::set< WaterMarkInfo > WaterMarkInfos
Definition: watermark.h:40
WaterMarkInfos GetHighWaterMark() const
Definition: queue_task.h:223
size_t max_queue_len_
Definition: queue_task.h:539
int taskInstance_
Definition: queue_task.h:518
static const int kMaxIterations
Definition: queue_task.h:112
size_t NumDrops() const
Definition: queue_task.h:368
void SetExitCallback(TaskExitCallback on_exit)
Definition: queue_task.h:303
static const int kMaxSize
Definition: queue_task.h:111
void SetSize(size_t size)
Definition: queue_task.h:196
size_t Length() const
Definition: queue_task.h:356
void operator()(QueueT &, bool)
Definition: queue_task.h:92
static TaskScheduler * GetInstance()
Definition: task.cc:547
WaterMarkTuple watermarks_
Definition: queue_task.h:536
bool EnqueueInternalLocked(QueueEntryT entry)
Definition: queue_task.h:458
bool deleted_
Definition: queue_task.h:527
size_t dequeues_
Definition: queue_task.h:529
bool EnqueueBoundedLocked(QueueEntryT entry)
Definition: queue_task.h:480
TaskEntryCallback on_entry_cb_
Definition: queue_task.h:521
Callback GetCallback() const
Definition: queue_task.h:295
void SetLowWaterMark(const WaterMarkInfos &low_water)
Definition: watermark.cc:27
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
int taskId_
Definition: queue_task.h:517
void MayBeStartRunner()
Definition: queue_task.h:281
StartRunnerFunc start_runner_
Definition: queue_task.h:523
int GetTaskInstance() const
Definition: task.h:119
void SetHighWaterMark(const WaterMarkInfos &high_water)
Definition: watermark.cc:11
tbb::atomic< bool > disabled_
Definition: queue_task.h:526
bool EnqueueBounded(QueueEntryT entry)
Definition: queue_task.h:463
std::string Description() const
Definition: queue_task.h:310
void ShutdownLocked(bool delete_entries)
Definition: queue_task.h:410
bool RunnerAbortLocked()
Definition: queue_task.h:485
int GetTaskInstance() const
Definition: queue_task.h:277
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: queue_task.h:37
void set_disable(bool disabled)
Definition: queue_task.h:319
void ResetLowWaterMark()
Definition: watermark.cc:35
size_t AtomicIncrementQueueCount(QueueEntryT *entry)
Definition: queue_task.h:431
bool measure_busy_time_
Definition: queue_task.h:541
bool RunQueue()
Definition: queue_task.h:53
friend class QueueTaskWaterMarkTest
Definition: queue_task.h:545
void ClearStats() const
Definition: queue_task.h:382
WaterMarkInfos GetHighWaterMark() const
Definition: watermark.cc:23
bool shutdown_scheduled_
Definition: queue_task.h:534
int GetTaskId() const
Definition: task.h:118
size_t AtomicDecrementQueueCount(QueueEntryT *entry)
Definition: queue_task.h:435
uint32_t task_starts_
Definition: queue_task.h:538
bool GetBounded() const
Definition: queue_task.h:204
void SetLowWaterMark(const WaterMarkInfos &low_water)
Definition: queue_task.h:228
boost::function< void(bool)> TaskExitCallback
Definition: queue_task.h:116
size_t enqueues_
Definition: queue_task.h:528
bool measure_busy_time() const
Definition: queue_task.h:378
bool DequeueInternal(QueueEntryT *entry)
Definition: queue_task.h:391
void ProcessHighWaterMarks(size_t count)
Definition: watermark.cc:55
void ResetHighWaterMark()
Definition: queue_task.h:218
bool RunnerDone()
Definition: queue_task.h:495
size_t on_entry_defer_count() const
Definition: queue_task.h:332
friend class QueueTaskShutdownTest
Definition: queue_task.h:544
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
uint64_t busy_time() const
Definition: queue_task.h:380
bool running_
Definition: queue_task.h:516
size_t NumEnqueues() const
Definition: queue_task.h:360
void SetStartRunnerFunc(StartRunnerFunc start_runner_fn)
Definition: queue_task.h:192
void SetLowWaterMark(const WaterMarkInfo &lwm_info)
Definition: queue_task.h:233
bool DequeueInternalLocked(QueueEntryT *entry)
Definition: queue_task.h:401
QueueTaskRunner(QueueT *queue)
Definition: queue_task.h:33
tbb::mutex mutex_
Definition: queue_task.h:515
void ProcessHighWaterMarks(size_t count)
Definition: queue_task.h:439
void SetHighWaterMark(const WaterMarkInfo &hwm_info)
Definition: queue_task.h:213
size_t size_
Definition: queue_task.h:532
bool delete_entries_on_shutdown_
Definition: queue_task.h:535
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
WaterMarkInfos GetLowWaterMark() const
Definition: queue_task.h:243
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void ResetHighWaterMark()
Definition: watermark.cc:19
Callback callback_
Definition: queue_task.h:520
std::string name_
Definition: queue_task.h:519
void set_name(const std::string &name)
Definition: queue_task.h:307
tbb::concurrent_queue< QueueEntryT > Queue
Definition: queue_task.h:113
tbb::atomic< size_t > count_
Definition: queue_task.h:514
virtual std::string Description() const
Definition: queue_task.h:48