8 #include <tbb/atomic.h>
9 #include <tbb/concurrent_queue.h>
10 #include <tbb/mutex.h>
17 using tbb::concurrent_queue;
102 tbb::mutex::scoped_lock lock(
mutex_);
174 :
Task(queue->db_task_id(), queue->db_partition_id()),
189 tbb::spin_rw_mutex::scoped_lock
233 return "DBPartition QueueRunner";
250 tbb::mutex::scoped_lock lock(mutex_);
251 MaybeStartRunnerUnlocked();
255 tbb::mutex::scoped_lock lock(mutex_);
256 if (disable_ || (request_queue_.empty() && remove_queue_.empty())) {
bool is_state_empty_unlocked(DBTablePartBase *tpart)
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
std::string Description() const
QueueRunner(WorkQueue *queue)
static const int kMaxIterations
bool IsDBQueueEmpty() const
bool DequeueRequest(RequestQueueEntry **req_entry)
concurrent_queue< RequestQueueEntry * > RequestQueue
void EnqueueRemove(RemoveQueueEntry *rm_entry)
WorkQueue(DBPartition *partition, int partition_id)
DISALLOW_COPY_AND_ASSIGN(WorkQueue)
concurrent_queue< RemoveQueueEntry * > RemoveQueue
bool EnqueueRequest(RequestQueueEntry *req_entry)
uint64_t total_request_count() const
bool DequeueRemove(RemoveQueueEntry **rm_entry)
long request_queue_len() const
uint64_t max_request_queue_len_
void SetActive(DBTablePartBase *tpart)
void set_disable(bool disable)
DBPartition * db_partition_
void MaybeStartRunnerUnlocked()
uint64_t total_request_count_
std::list< DBTablePartBase * > TablePartList
static const int kThreshold
TablePartList change_list_
RequestQueue request_queue_
RemoveQueue remove_queue_
atomic< long > request_count_
DBTablePartBase * GetActiveTable()
uint64_t max_request_queue_len() const
long request_queue_len() const
uint64_t total_request_count() const
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
void SetQueueDisable(bool disable)
DBPartition(DB *db, int partition_id)
uint64_t max_request_queue_len() const
bool EnqueueRequest(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
std::unique_ptr< WorkQueue > work_queue_
bool IsDBQueueEmpty() const
void OnTableChange(DBTablePartBase *tpart)
tbb::spin_rw_mutex & dbstate_mutex()
virtual void Process(DBClient *client, DBRequest *req)=0
virtual void Remove(DBEntryBase *)=0
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a wrapper over tbb::task to support policies.
void Swap(DBRequest *rhs)
RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry)
RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req)