7 #include <tbb/atomic.h>
33 for (std::vector<Task *>::iterator it =
workers_.begin();
35 scheduler->Enqueue(*it);
74 std::string
Description()
const {
return "DBTableWalker::Worker"; }
90 static unsigned int walk_sleep_usecs_;
96 char *wait = getenv(
"DB_WALKER_WAIT_USECS");
98 walk_sleep_usecs_ = (
unsigned int) strtoul(wait, NULL, 0);
99 if (walk_sleep_usecs_ > 1000000)
100 walk_sleep_usecs_ = 1000000;
104 if (walk_sleep_usecs_) {
105 usleep(walk_sleep_usecs_);
119 if ((key_resume =
walk_ctx_.get()) == NULL) {
125 if (key_resume != NULL) {
127 std::unique_ptr<const DBEntryBase> start;
138 for (
DBEntry *next = NULL; entry; entry = next) {
162 long num_walkers_on_tpart =
walker_->
status_.fetch_and_decrement();
163 if (num_walkers_on_tpart == 1) {
184 : id_(id), wkmgr_(wkmgr), table_(table),
186 walker_fn_(walker), done_fn_(walk_done) {
190 for (
int i = 0; i < num_worker; i++) {
212 bool postpone_walk) {
219 walkerfn, walk_complete, postpone_walk);
227 walkerfn, walk_complete, postpone_walk);
251 if ((
size_t)
id ==
walkers_.size() - 1) {
DBTablePartition * tbl_partition_
static int GetIterationToYield()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
DBTableWalker(int task_id=-1)
void PurgeWalker(WalkId id)
void incr_walk_cancel_count()
virtual void RetryDelete()
virtual DBEntry * GetNext(const DBEntryBase *entry)
tbb::mutex walkers_mutex_
std::unique_ptr< DBRequestKey > key_start_
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
int GetTaskId(const std::string &name)
static int max_iteration_to_yield_
static void db_walker_wait()
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in waitq...
std::unique_ptr< DBRequestKey > walk_ctx_
Walker(WalkId id, DBTableWalker *wkmgr, DBTable *table, const DBRequestKey *key, WalkFn walker, WalkCompleteFn walk_done, bool postpone_walk)
tbb::atomic< bool > should_stop_
DBTableWalker::Walker * walker_
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
virtual DBEntry * lower_bound(const DBEntryBase *entry)
void incr_walk_complete_count()
const DBRequestKey * key_start_
void WalkCancel(WalkId id)
Worker(Walker *walker, int db_partition_id, const DBRequestKey *key)
virtual KeyPtr GetDBRequestKey() const =0
tbb::atomic< long > status_
uint64_t decr_walker_count()
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
void WalkResume(WalkId id)
WalkId WalkTable(DBTable *table, const DBRequestKey *key_start, WalkFn walker, WalkCompleteFn walk_complete, bool postpone_walk=false)
virtual bool Run()
Code to execute. Returns true if the task is completed. Returns false to reschedule the task...
boost::function< void(DBTableBase *)> WalkCompleteFn
std::vector< Task * > workers_
virtual DBEntry * GetFirst()
std::string Description() const
virtual int PartitionCount() const
Task is a wrapper over tbb::task to support policies.
void incr_walk_request_count()