6 #include <tbb/atomic.h>
7 #include <tbb/spin_rw_mutex.h>
9 #include <boost/bind.hpp>
10 #include <boost/foreach.hpp>
11 #include <boost/dynamic_bitset.hpp>
12 #include <boost/type_traits.hpp>
23 #include "db/db_types.h"
35 #if (__GNUC_PREREQ(4, 2) > 0)
38 assert(key_has_destructor && data_has_destructor);
57 if (table_name.find(
"__ifmap_") != string::npos) {
66 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
67 size_t i =
bmap_.find_first();
68 if (i ==
bmap_.npos) {
87 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
93 if ((
size_t) listener ==
callbacks_.size() - 1) {
103 if ((
size_t) listener >=
bmap_.size()) {
104 bmap_.resize(listener + 1);
112 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
113 for (CallbackList::iterator iter =
callbacks_.begin();
134 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
136 for (CallbackList::const_iterator iter =
callbacks_.begin();
139 ShowTableListener item;
143 listeners->push_back(item);
149 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
154 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
169 enqueue_count_(0), input_count_(0), notify_count_(0) {
182 const string &name) {
183 return info_->Register(callback, name);
187 info_->Unregister(listener);
198 return partition->EnqueueRequest(tpart, NULL, req);
209 info_->RunNotify(tpart, entry);
213 info_->AddToDBStateCount(listener, count);
217 return info_->GetDBStateCount(listener);
235 return !
info_->empty();
239 return info_->size();
243 info_->FillListeners(listeners);
296 if (key_resume != NULL) {
297 std::unique_ptr<const DBEntryBase> start;
308 for (
DBEntry *next = NULL; entry; entry = next) {
310 if (count == max_walk_entry_count) {
329 if (num_walkers_on_tpart == 1) {
343 assert(pending_workers_ == 0);
344 for (
int i = 0; i < table_->PartitionCount(); i++) {
346 table_->GetTablePartition(i));
347 if (!partition->
size())
continue;
348 worker_tasks_.push_back(
new WalkWorker(
this, i));
351 if (pending_workers_ == 0) {
367 static bool init_ =
false;
368 static int iter_to_yield_env_ = 0;
372 char *count_str = getenv(
"DB_ITERATION_TO_YIELD");
374 iter_to_yield_env_ = strtol(count_str, NULL, 0);
473 return tbl_partition->
Find(entry);
494 return tbl_partition->
Find(key);
509 for (vector<DBTablePartition *>::const_iterator iter =
partitions_.begin();
511 total += (*iter)->size();
522 entry = tbl_partition->
Find(key);
528 tbl_partition->
Change(entry);
531 if ((entry =
Add(req)) != NULL) {
532 tbl_partition->
Add(entry);
538 tbl_partition->
Delete(entry);
543 tbl_partition->
Notify(entry);
558 next = partition->
GetNext(entry);
559 DBState *state = entry->GetState(table,
id);
561 entry->ClearState(table,
id);
616 return walk_mgr->
AllocWalker(
this, walk_fn, walk_complete);
static const int kIterationToYield
tbb::atomic< uint64_t > walk_request_count_
boost::dynamic_bitset bmap_
std::string Description() const
DBEntry * FindNoLock(const DBEntry *entry)
void STLDeleteValues(Container *container)
void WalkTable(DBTableWalkRef walk)
bool WalkCallback(DBTablePartBase *tpart, DBEntryBase *entry)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
ListenerInfo(const string &table_name)
uint64_t GetDBStateCount(ListenerId listener)
virtual bool OnChange(DBEntry *entry, const DBRequest *req)
void AddToDBStateCount(ListenerId listener, int count)
DBEntry * Find(const DBEntry *entry)
std::vector< DBTablePartition * > partitions_
virtual void RetryDelete()
DBTableBase(DB *db, const std::string &name)
virtual DBEntry * GetNext(const DBEntryBase *entry)
TableWalker(DBTable *table)
std::unique_ptr< DBRequestData > data
DBTableWalkRef AllocWalker(WalkFn walk_fn, WalkCompleteFn walk_complete)
boost::function< void(DBTablePartBase *, DBEntryBase *)> ChangeCallback
virtual DBEntry * Add(const DBRequest *req)
bool Enqueue(DBRequest *req)
virtual void Change(DBEntryBase *entry)
tbb::atomic< uint64_t > walk_count_
DBTableBase::ListenerId Register(ChangeCallback callback, const string &name)
tbb::atomic< uint64_t > walk_again_count_
void Delete(DBEntryBase *)
tbb::spin_rw_mutex rw_mutex_
uint64_t GetDBStateCount(ListenerId listener)
void FillListeners(vector< ShowTableListener > *listeners) const
bool db_state_accounting_
void Swap(DBRequest *rhs)
virtual size_t Hash(const DBEntry *entry) const
void FillListeners(std::vector< ShowTableListener > *listeners) const
void Unregister(ListenerId listener)
virtual void Change(DBEntry *entry)
DBTable::DBTableWalkRef AllocWalker(DBTable *table, DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete)
static void db_walker_wait()
void WalkTable(DBTable::DBTableWalkRef walk)
void ReleaseWalker(DBTableWalkRef &walk)
void Unregister(ListenerId listener)
DBTablePartition * tbl_partition_
boost::function< void(DBTableWalkRef, DBTableBase *)> WalkCompleteFn
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
std::unique_ptr< TableWalker > walker_
tbb::atomic< uint64_t > walk_complete_count_
void WalkAgain(DBTableWalkRef walk)
void ReleaseWalker(DBTable::DBTableWalkRef &walk)
virtual size_t Size() const
tbb::atomic< uint16_t > pending_workers_
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
DBTable(DB *db, const std::string &name)
WalkWorker(TableWalker *walker, int db_partition_id)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
virtual bool Delete(DBEntry *entry, const DBRequest *req)
std::unique_ptr< DBRequestKey > key
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
#define CHECK_CONCURRENCY(...)
void WalkAgain(DBTable::DBTableWalkRef walk)
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
std::unique_ptr< ListenerInfo > info_
std::list< Task * > worker_tasks_
virtual DBEntry * lower_bound(const DBEntryBase *entry)
DBEntry * FindNoLock(const DBEntry *entry)
void incr_walk_complete_count()
const std::string & name() const
void EnqueueRemove(DBEntryBase *db_entry)
virtual DBTablePartition * AllocPartition(int index)
virtual KeyPtr GetDBRequestKey() const =0
vector< string > NameList
virtual bool MayDelete() const
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
size_t GetListenerCount() const
std::unique_ptr< DBRequestKey > walk_ctx_
int max_walk_iteration_to_yield_
static void DBStateClear(DBTable *table, ListenerId id)
bool HasListeners() const
static const int kInvalidId
DBEntry * Find(const DBEntry *entry)
void WalkCompleteCallback(DBTableBase *tbl_base)
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
static int PartitionCount()
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
virtual DBEntry * GetFirst()
static size_t HashToPartition(size_t hash)
virtual void Add(DBEntry *entry)
void AddToDBStateCount(ListenerId listener, int count)
boost::intrusive_ptr< DBTableWalk > DBTableWalkRef
void Notify(DBEntryBase *entry)
tbb::atomic< uint64_t > walk_cancel_count_
DBTableWalkMgr * GetWalkMgr()
DBTable::DBTableWalkRef walk_ref_
StateCountList state_count_
virtual int PartitionCount() const
vector< tbb::atomic< uint64_t > > StateCountList
Task is a wrapper over tbb::task to support policies.
vector< ChangeCallback > CallbackList
int GetWalkIterationToYield()
tbb::atomic< uint64_t > walker_count_
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)=0
DBPartition * GetPartition(int index)
virtual void Input(DBTablePartition *tbl_partition, DBClient *client, DBRequest *req)