6 #include <tbb/atomic.h>
7 #include <tbb/spin_rw_mutex.h>
9 #include <boost/bind.hpp>
10 #include <boost/foreach.hpp>
11 #include <boost/dynamic_bitset.hpp>
12 #include <boost/type_traits.hpp>
23 #include "db/db_types.h"
35 #if (__GNUC_PREREQ(4, 2) > 0)
38 assert(key_has_destructor && data_has_destructor);
57 if (table_name.find(
"__ifmap_") != string::npos) {
66 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
67 size_t i =
bmap_.find_first();
68 if (i ==
bmap_.npos) {
87 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
93 if ((
size_t) listener ==
callbacks_.size() - 1) {
103 if ((
size_t) listener >=
bmap_.size()) {
104 bmap_.resize(listener + 1);
112 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
113 for (CallbackList::iterator iter =
callbacks_.begin();
134 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
136 for (CallbackList::const_iterator iter =
callbacks_.begin();
139 ShowTableListener item;
143 listeners->push_back(item);
149 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
154 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
169 enqueue_count_(0), input_count_(0), notify_count_(0) {
182 const string &name) {
187 info_->Unregister(listener);
209 info_->RunNotify(tpart, entry);
213 info_->AddToDBStateCount(listener, count);
217 return info_->GetDBStateCount(listener);
235 return !
info_->empty();
239 return info_->size();
243 info_->FillListeners(listeners);
296 if (key_resume != NULL) {
297 std::unique_ptr<const DBEntryBase> start;
308 for (
DBEntry *next = NULL; entry; entry = next) {
310 if (count == max_walk_entry_count) {
329 if (num_walkers_on_tpart == 1) {
343 assert(pending_workers_ == 0);
344 for (
int i = 0; i < table_->PartitionCount(); i++) {
346 table_->GetTablePartition(i));
347 if (!partition->
size())
continue;
348 worker_tasks_.push_back(
new WalkWorker(
this, i));
351 if (pending_workers_ == 0) {
367 static bool init_ =
false;
368 static int iter_to_yield_env_ = 0;
372 char *count_str = getenv(
"DB_ITERATION_TO_YIELD");
374 iter_to_yield_env_ = strtol(count_str, NULL, 0);
473 return tbl_partition->
Find(entry);
494 return tbl_partition->
Find(key);
509 for (vector<DBTablePartition *>::const_iterator iter =
partitions_.begin();
511 total += (*iter)->size();
522 entry = tbl_partition->
Find(key);
528 tbl_partition->
Change(entry);
531 if ((entry =
Add(req)) != NULL) {
532 tbl_partition->
Add(entry);
538 tbl_partition->
Delete(entry);
543 tbl_partition->
Notify(entry);
558 next = partition->
GetNext(entry);
559 DBState *state = entry->GetState(table,
id);
561 entry->ClearState(table,
id);
616 return walk_mgr->
AllocWalker(
this, walk_fn, walk_complete);
virtual KeyPtr GetDBRequestKey() const =0
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
bool EnqueueRequest(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
StateCountList state_count_
void AddToDBStateCount(ListenerId listener, int count)
vector< string > NameList
boost::dynamic_bitset bmap_
DBTableBase::ListenerId Register(ChangeCallback callback, const string &name)
void FillListeners(vector< ShowTableListener > *listeners) const
bool db_state_accounting_
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
uint64_t GetDBStateCount(ListenerId listener)
ListenerInfo(const string &table_name)
void Unregister(ListenerId listener)
tbb::spin_rw_mutex rw_mutex_
vector< ChangeCallback > CallbackList
vector< tbb::atomic< uint64_t > > StateCountList
bool Enqueue(DBRequest *req)
void AddToDBStateCount(ListenerId listener, int count)
tbb::atomic< uint64_t > walk_again_count_
size_t GetListenerCount() const
tbb::atomic< uint64_t > walk_complete_count_
tbb::atomic< uint64_t > walk_cancel_count_
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)=0
boost::function< void(DBTablePartBase *, DBEntryBase *)> ChangeCallback
virtual void RetryDelete()
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
void EnqueueRemove(DBEntryBase *db_entry)
uint64_t GetDBStateCount(ListenerId listener)
std::unique_ptr< ListenerInfo > info_
bool HasListeners() const
virtual bool MayDelete() const
void incr_walk_complete_count()
static const int kInvalidId
tbb::atomic< uint64_t > walk_request_count_
void Unregister(ListenerId listener)
void FillListeners(std::vector< ShowTableListener > *listeners) const
tbb::atomic< uint64_t > walk_count_
tbb::atomic< uint64_t > walker_count_
DBTableBase(DB *db, const std::string &name)
const std::string & name() const
void Notify(DBEntryBase *entry)
void Delete(DBEntryBase *)
virtual void Change(DBEntry *entry)
virtual void Add(DBEntry *entry)
DBEntry * FindNoLock(const DBEntry *entry)
virtual DBEntry * GetNext(const DBEntryBase *entry)
virtual DBEntry * lower_bound(const DBEntryBase *entry)
DBEntry * Find(const DBEntry *entry)
virtual DBEntry * GetFirst()
void ReleaseWalker(DBTable::DBTableWalkRef &walk)
DBTable::DBTableWalkRef AllocWalker(DBTable *table, DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete)
void WalkTable(DBTable::DBTableWalkRef walk)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
void WalkAgain(DBTable::DBTableWalkRef walk)
tbb::atomic< uint16_t > pending_workers_
std::list< Task * > worker_tasks_
TableWalker(DBTable *table)
std::string Description() const
WalkWorker(TableWalker *walker, int db_partition_id)
DBTablePartition * tbl_partition_
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
std::unique_ptr< DBRequestKey > walk_ctx_
void WalkAgain(DBTableWalkRef walk)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
virtual bool Delete(DBEntry *entry, const DBRequest *req)
std::unique_ptr< TableWalker > walker_
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
virtual bool OnChange(DBEntry *entry, const DBRequest *req)
virtual void Input(DBTablePartition *tbl_partition, DBClient *client, DBRequest *req)
int max_walk_iteration_to_yield_
static const int kIterationToYield
DBTableWalkRef AllocWalker(WalkFn walk_fn, WalkCompleteFn walk_complete)
virtual void Change(DBEntryBase *entry)
virtual size_t Size() const
int GetWalkIterationToYield()
virtual int PartitionCount() const
void WalkTable(DBTableWalkRef walk)
std::vector< DBTablePartition * > partitions_
DBTable::DBTableWalkRef walk_ref_
virtual DBTablePartition * AllocPartition(int index)
bool WalkCallback(DBTablePartBase *tpart, DBEntryBase *entry)
DBEntry * Find(const DBEntry *entry)
void ReleaseWalker(DBTableWalkRef &walk)
boost::function< void(DBTableWalkRef, DBTableBase *)> WalkCompleteFn
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
static void DBStateClear(DBTable *table, ListenerId id)
virtual DBEntry * Add(const DBRequest *req)
DBTable(DB *db, const std::string &name)
DBEntry * FindNoLock(const DBEntry *entry)
boost::intrusive_ptr< DBTableWalk > DBTableWalkRef
void WalkCompleteCallback(DBTableBase *tbl_base)
static void db_walker_wait()
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
virtual size_t Hash(const DBEntry *entry) const
DBTableWalkMgr * GetWalkMgr()
DBPartition * GetPartition(int index)
static int PartitionCount()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a wrapper over tbb::task to support policies.
static size_t HashToPartition(size_t hash)
std::unique_ptr< DBRequestKey > key
void Swap(DBRequest *rhs)
std::unique_ptr< DBRequestData > data
#define CHECK_CONCURRENCY(...)
void STLDeleteValues(Container *container)