12 #ifndef __QUEUE_TASK_H__
13 #define __QUEUE_TASK_H__
21 #include <tbb/atomic.h>
22 #include <tbb/concurrent_queue.h>
23 #include <tbb/mutex.h>
24 #include <tbb/spin_rw_mutex.h>
30 template <
typename QueueEntryT,
typename QueueT>
49 return queue_->Description();
55 if (
queue_->RunnerAbort()) {
56 return queue_->RunnerDone();
60 if (
queue_->measure_busy_time_)
63 QueueEntryT entry = QueueEntryT();
65 while (
queue_->Dequeue(&entry)) {
67 if (!
queue_->GetCallback()(entry)) {
70 if (++count ==
queue_->max_iterations_) {
73 return queue_->RunnerDone();
83 return queue_->RunnerDone();
89 template <
typename QueueEntryT>
91 template <
typename QueueT>
95 template <
typename QueueEntryT>
97 template <
typename QueueT>
100 while (q.try_pop(entry)) {
108 template <
typename QueueEntryT>
113 typedef tbb::concurrent_queue<QueueEntryT>
Queue;
114 typedef boost::function<bool (QueueEntryT)>
Callback;
153 tbb::mutex::scoped_lock lock(
mutex_);
161 tbb::mutex::scoped_lock lock(
mutex_);
187 tbb::mutex::scoped_lock lock(
mutex_);
282 tbb::mutex::scoped_lock lock(
mutex_);
311 if (
name_.empty() ==
false)
314 std::ostringstream str;
392 bool success =
queue_.try_pop(*entry);
425 deleter(
queue_, delete_entries);
432 return count_.fetch_and_increment() + 1;
436 return count_.fetch_and_decrement() - 1;
455 return ncount <
size_;
467 if (ncount <
size_) {
491 tbb::mutex::scoped_lock lock(
mutex_);
496 tbb::mutex::scoped_lock lock(
mutex_);
bool Dequeue(QueueEntryT *entry)
void SetBounded(bool bounded)
boost::function< bool(void)> StartRunnerFunc
uint32_t task_starts() const
void SetEntryCallback(TaskEntryCallback on_entry)
size_t on_entry_defer_count_
boost::function< bool()> TaskEntryCallback
bool IsQueueEmpty() const
QueueTaskRunner< QueueEntryT, WorkQueue< QueueEntryT > > * current_runner_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
friend class QueueTaskTest
void Shutdown(bool delete_entries=true)
TaskExitCallback on_exit_cb_
size_t max_queue_len() const
void operator()(QueueT &q, bool delete_entry)
size_t NumDequeues() const
void add_busy_time(uint64_t t)
bool EnqueueInternal(QueueEntryT entry)
void ProcessLowWaterMarks(size_t count)
void SetHighWaterMark(const WaterMarkInfos &high_water)
bool AreWaterMarksSet() const
bool AreWaterMarksSet() const
boost::function< bool(QueueEntryT)> Callback
void ProcessLowWaterMarks(size_t count)
void ScheduleShutdown(bool delete_entries=true)
WorkQueue(int taskId, int taskInstance, Callback callback, size_t size=kMaxSize, size_t max_iterations=kMaxIterations)
DISALLOW_COPY_AND_ASSIGN(WorkQueue)
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
WaterMarkInfos GetLowWaterMark() const
std::set< WaterMarkInfo > WaterMarkInfos
WaterMarkInfos GetHighWaterMark() const
static const int kMaxIterations
void SetExitCallback(TaskExitCallback on_exit)
static const int kMaxSize
void SetSize(size_t size)
void operator()(QueueT &, bool)
static TaskScheduler * GetInstance()
WaterMarkTuple watermarks_
bool EnqueueInternalLocked(QueueEntryT entry)
bool EnqueueBoundedLocked(QueueEntryT entry)
TaskEntryCallback on_entry_cb_
Callback GetCallback() const
void SetLowWaterMark(const WaterMarkInfos &low_water)
void set_measure_busy_time(bool val) const
StartRunnerFunc start_runner_
int GetTaskInstance() const
void SetHighWaterMark(const WaterMarkInfos &high_water)
tbb::atomic< bool > disabled_
bool EnqueueBounded(QueueEntryT entry)
std::string Description() const
void ShutdownLocked(bool delete_entries)
int GetTaskInstance() const
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
void set_disable(bool disabled)
size_t AtomicIncrementQueueCount(QueueEntryT *entry)
friend class QueueTaskWaterMarkTest
WaterMarkInfos GetHighWaterMark() const
size_t AtomicDecrementQueueCount(QueueEntryT *entry)
void SetLowWaterMark(const WaterMarkInfos &low_water)
boost::function< void(bool)> TaskExitCallback
bool measure_busy_time() const
bool DequeueInternal(QueueEntryT *entry)
void ProcessHighWaterMarks(size_t count)
void ResetHighWaterMark()
size_t on_entry_defer_count() const
friend class QueueTaskShutdownTest
static uint64_t ClockMonotonicUsec()
uint64_t busy_time() const
size_t NumEnqueues() const
void SetStartRunnerFunc(StartRunnerFunc start_runner_fn)
void SetLowWaterMark(const WaterMarkInfo &lwm_info)
bool DequeueInternalLocked(QueueEntryT *entry)
QueueTaskRunner(QueueT *queue)
void ProcessHighWaterMarks(size_t count)
void SetHighWaterMark(const WaterMarkInfo &hwm_info)
bool delete_entries_on_shutdown_
bool Enqueue(QueueEntryT entry)
WaterMarkInfos GetLowWaterMark() const
Task is a wrapper over tbb::task to support policies.
void ResetHighWaterMark()
void set_name(const std::string &name)
tbb::concurrent_queue< QueueEntryT > Queue
tbb::atomic< size_t > count_
virtual std::string Description() const