7 #include <tbb/atomic.h>
33 for (std::vector<Task *>::iterator it =
workers_.begin();
74 std::string
Description()
const {
return "DBTableWalker::Worker"; }
90 static unsigned int walk_sleep_usecs_;
96 char *wait = getenv(
"DB_WALKER_WAIT_USECS");
98 walk_sleep_usecs_ = (
unsigned int) strtoul(wait, NULL, 0);
99 if (walk_sleep_usecs_ > 1000000)
100 walk_sleep_usecs_ = 1000000;
104 if (walk_sleep_usecs_) {
105 usleep(walk_sleep_usecs_);
119 if ((key_resume =
walk_ctx_.get()) == NULL) {
125 if (key_resume != NULL) {
127 std::unique_ptr<const DBEntryBase> start;
138 for (
DBEntry *next = NULL; entry; entry = next) {
162 long num_walkers_on_tpart =
walker_->
status_.fetch_and_decrement();
163 if (num_walkers_on_tpart == 1) {
184 : id_(id), wkmgr_(wkmgr), table_(table),
186 walker_fn_(walker), done_fn_(walk_done) {
190 for (
int i = 0; i < num_worker; i++) {
212 bool postpone_walk) {
219 walkerfn, walk_complete, postpone_walk);
227 walkerfn, walk_complete, postpone_walk);
251 if ((
size_t)
id ==
walkers_.size() - 1) {
virtual KeyPtr GetDBRequestKey() const =0
void incr_walk_request_count()
void incr_walk_cancel_count()
virtual void RetryDelete()
uint64_t decr_walker_count()
void incr_walk_complete_count()
virtual DBEntry * GetNext(const DBEntryBase *entry)
virtual DBEntry * lower_bound(const DBEntryBase *entry)
virtual DBEntry * GetFirst()
tbb::atomic< bool > should_stop_
Walker(WalkId id, DBTableWalker *wkmgr, DBTable *table, const DBRequestKey *key, WalkFn walker, WalkCompleteFn walk_done, bool postpone_walk)
tbb::atomic< long > status_
std::vector< Task * > workers_
std::unique_ptr< DBRequestKey > key_start_
DBTableWalker::Walker * walker_
Worker(Walker *walker, int db_partition_id, const DBRequestKey *key)
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
std::unique_ptr< DBRequestKey > walk_ctx_
std::string Description() const
DBTablePartition * tbl_partition_
const DBRequestKey * key_start_
static int GetIterationToYield()
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
static int max_iteration_to_yield_
DBTableWalker(int task_id=-1)
WalkId WalkTable(DBTable *table, const DBRequestKey *key_start, WalkFn walker, WalkCompleteFn walk_complete, bool postpone_walk=false)
tbb::mutex walkers_mutex_
void PurgeWalker(WalkId id)
void WalkCancel(WalkId id)
void WalkResume(WalkId id)
boost::function< void(DBTableBase *)> WalkCompleteFn
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
virtual int PartitionCount() const
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
int GetTaskId(const std::string &name)
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a wrapper over tbb::task to support policies.
static void db_walker_wait()