8 #include <tbb/atomic.h>
9 #include <tbb/concurrent_queue.h>
10 #include <tbb/mutex.h>
17 using tbb::concurrent_queue;
23 : tpart(tpart), client(client) {
33 : tpart(tpart), db_entry(db_entry) {
102 tbb::mutex::scoped_lock lock(
mutex_);
174 :
Task(queue->db_task_id(), queue->db_partition_id()),
189 tbb::spin_rw_mutex::scoped_lock
233 return "DBPartition QueueRunner";
250 tbb::mutex::scoped_lock lock(mutex_);
251 MaybeStartRunnerUnlocked();
255 tbb::mutex::scoped_lock lock(mutex_);
256 if (disable_ || (request_queue_.empty() && remove_queue_.empty())) {
tbb::spin_rw_mutex & dbstate_mutex()
concurrent_queue< RemoveQueueEntry * > RemoveQueue
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
uint64_t total_request_count() const
bool DequeueRequest(RequestQueueEntry **req_entry)
bool EnqueueRequest(RequestQueueEntry *req_entry)
uint64_t max_request_queue_len_
WorkQueue(DBPartition *partition, int partition_id)
uint64_t total_request_count() const
DISALLOW_COPY_AND_ASSIGN(WorkQueue)
DBTablePartBase * GetActiveTable()
static const int kMaxIterations
RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry)
static const int kThreshold
void Swap(DBRequest *rhs)
bool IsDBQueueEmpty() const
QueueRunner(WorkQueue *queue)
bool IsDBQueueEmpty() const
bool DequeueRemove(RemoveQueueEntry **rm_entry)
uint64_t max_request_queue_len() const
bool EnqueueRequest(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
virtual bool Run()
Code to execute. Returns true if the task is completed; returns false to reschedule the task...
void MaybeStartRunnerUnlocked()
TablePartList change_list_
std::unique_ptr< WorkQueue > work_queue_
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in waitq...
void SetQueueDisable(bool disable)
uint64_t max_request_queue_len() const
DBPartition * db_partition_
RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
void SetActive(DBTablePartBase *tpart)
concurrent_queue< RequestQueueEntry * > RequestQueue
atomic< long > request_count_
long request_queue_len() const
DBPartition(DB *db, int partition_id)
bool is_state_empty_unlocked(DBTablePartBase *tpart)
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
std::string Description() const
virtual void Process(DBClient *client, DBRequest *req)=0
uint64_t total_request_count_
void EnqueueRemove(RemoveQueueEntry *rm_entry)
long request_queue_len() const
RemoveQueue remove_queue_
void set_disable(bool disable)
Task is a wrapper over tbb::task to support policies.
RequestQueue request_queue_
std::list< DBTablePartBase * > TablePartList
void OnTableChange(DBTablePartBase *tpart)
virtual void Remove(DBEntryBase *)=0