8 #include <tbb/spin_rw_mutex.h>
10 #include <boost/bind/bind.hpp>
11 #include <boost/foreach.hpp>
12 #include <boost/dynamic_bitset.hpp>
13 #include <boost/type_traits.hpp>
24 #include "db/db_types.h"
30 using namespace boost::placeholders;
37 #if (__GNUC_PREREQ(4, 2) > 0)
40 assert(key_has_destructor && data_has_destructor);
53 template<
typename _Tp>
56 using std::atomic<_Tp>::atomic;
59 using std::atomic<_Tp>::operator=;
60 using std::atomic<_Tp>::load;
61 using std::atomic<_Tp>::store;
68 this->store(other.load());
81 if (table_name.find(
"__ifmap_") != string::npos) {
90 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
91 size_t i =
bmap_.find_first();
92 if (i ==
bmap_.npos) {
111 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
117 if ((
size_t) listener ==
callbacks_.size() - 1) {
127 if ((
size_t) listener >=
bmap_.size()) {
128 bmap_.resize(listener + 1);
136 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
137 for (CallbackList::iterator iter =
callbacks_.begin();
148 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
155 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
160 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
162 for (CallbackList::const_iterator iter =
callbacks_.begin();
165 ShowTableListener item;
169 listeners->push_back(item);
175 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
180 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
195 enqueue_count_(0), input_count_(0), notify_count_(0) {
208 const string &name) {
213 info_->Unregister(listener);
235 info_->RunNotify(tpart, entry);
239 info_->AddToDBStateCount(listener, count);
243 return info_->GetDBStateCount(listener);
261 return !
info_->empty();
265 return info_->size();
269 info_->FillListeners(listeners);
322 if (key_resume != NULL) {
323 std::unique_ptr<const DBEntryBase> start;
334 for (
DBEntry *next = NULL; entry; entry = next) {
336 if (count == max_walk_entry_count) {
355 if (num_walkers_on_tpart == 1) {
369 assert(pending_workers_ == 0);
370 for (
int i = 0; i < table_->PartitionCount(); i++) {
372 table_->GetTablePartition(i));
373 if (!partition->
size())
continue;
374 worker_tasks_.push_back(
new WalkWorker(
this, i));
377 if (pending_workers_ == 0) {
393 static bool init_ =
false;
394 static int iter_to_yield_env_ = 0;
398 char *count_str = getenv(
"DB_ITERATION_TO_YIELD");
400 iter_to_yield_env_ = strtol(count_str, NULL, 0);
499 return tbl_partition->
Find(entry);
520 return tbl_partition->
Find(key);
535 for (vector<DBTablePartition *>::const_iterator iter =
partitions_.begin();
537 total += (*iter)->size();
548 entry = tbl_partition->
Find(key);
554 tbl_partition->
Change(entry);
557 if ((entry =
Add(req)) != NULL) {
558 tbl_partition->
Add(entry);
564 tbl_partition->
Delete(entry);
569 tbl_partition->
Notify(entry);
584 next = partition->
GetNext(entry);
585 DBState *state = entry->GetState(table,
id);
587 entry->ClearState(table,
id);
642 return walk_mgr->
AllocWalker(
this, walk_fn, walk_complete);
virtual KeyPtr GetDBRequestKey() const =0
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
bool EnqueueRequest(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
StateCountList state_count_
void AddToDBStateCount(ListenerId listener, int count)
vector< string > NameList
boost::dynamic_bitset bmap_
DBTableBase::ListenerId Register(ChangeCallback callback, const string &name)
void FillListeners(vector< ShowTableListener > *listeners) const
bool db_state_accounting_
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
uint64_t GetDBStateCount(ListenerId listener)
ListenerInfo(const string &table_name)
void Unregister(ListenerId listener)
tbb::spin_rw_mutex rw_mutex_
vector< AtomicWithCopy< uint64_t > > StateCountList
vector< ChangeCallback > CallbackList
std::atomic< uint64_t > walk_again_count_
bool Enqueue(DBRequest *req)
void AddToDBStateCount(ListenerId listener, int count)
size_t GetListenerCount() const
std::atomic< uint64_t > walker_count_
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)=0
boost::function< void(DBTablePartBase *, DBEntryBase *)> ChangeCallback
virtual void RetryDelete()
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
void EnqueueRemove(DBEntryBase *db_entry)
std::atomic< uint64_t > walk_request_count_
uint64_t GetDBStateCount(ListenerId listener)
std::unique_ptr< ListenerInfo > info_
std::atomic< uint64_t > walk_complete_count_
bool HasListeners() const
virtual bool MayDelete() const
void incr_walk_complete_count()
std::atomic< uint64_t > walk_cancel_count_
static const int kInvalidId
void Unregister(ListenerId listener)
void FillListeners(std::vector< ShowTableListener > *listeners) const
DBTableBase(DB *db, const std::string &name)
const std::string & name() const
std::atomic< uint64_t > walk_count_
void Notify(DBEntryBase *entry)
void Delete(DBEntryBase *)
virtual void Change(DBEntry *entry)
virtual void Add(DBEntry *entry)
DBEntry * FindNoLock(const DBEntry *entry)
virtual DBEntry * GetNext(const DBEntryBase *entry)
virtual DBEntry * lower_bound(const DBEntryBase *entry)
DBEntry * Find(const DBEntry *entry)
virtual DBEntry * GetFirst()
void ReleaseWalker(DBTable::DBTableWalkRef &walk)
DBTable::DBTableWalkRef AllocWalker(DBTable *table, DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete)
void WalkTable(DBTable::DBTableWalkRef walk)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
void WalkAgain(DBTable::DBTableWalkRef walk)
std::list< Task * > worker_tasks_
std::atomic< uint16_t > pending_workers_
TableWalker(DBTable *table)
std::string Description() const
Gives a description of the task.
WalkWorker(TableWalker *walker, int db_partition_id)
DBTablePartition * tbl_partition_
virtual bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
std::unique_ptr< DBRequestKey > walk_ctx_
void WalkAgain(DBTableWalkRef walk)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
virtual bool Delete(DBEntry *entry, const DBRequest *req)
std::unique_ptr< TableWalker > walker_
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
virtual bool OnChange(DBEntry *entry, const DBRequest *req)
virtual void Input(DBTablePartition *tbl_partition, DBClient *client, DBRequest *req)
int max_walk_iteration_to_yield_
static const int kIterationToYield
DBTableWalkRef AllocWalker(WalkFn walk_fn, WalkCompleteFn walk_complete)
virtual void Change(DBEntryBase *entry)
virtual size_t Size() const
int GetWalkIterationToYield()
virtual int PartitionCount() const
void WalkTable(DBTableWalkRef walk)
std::vector< DBTablePartition * > partitions_
DBTable::DBTableWalkRef walk_ref_
virtual DBTablePartition * AllocPartition(int index)
bool WalkCallback(DBTablePartBase *tpart, DBEntryBase *entry)
DBEntry * Find(const DBEntry *entry)
void ReleaseWalker(DBTableWalkRef &walk)
boost::function< void(DBTableWalkRef, DBTableBase *)> WalkCompleteFn
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
static void DBStateClear(DBTable *table, ListenerId id)
virtual DBEntry * Add(const DBRequest *req)
DBTable(DB *db, const std::string &name)
DBEntry * FindNoLock(const DBEntry *entry)
boost::intrusive_ptr< DBTableWalk > DBTableWalkRef
void WalkCompleteCallback(DBTableBase *tbl_base)
static void db_walker_wait()
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
virtual size_t Hash(const DBEntry *entry) const
DBTableWalkMgr * GetWalkMgr()
DBPartition * GetPartition(int index)
static int PartitionCount()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
static size_t HashToPartition(size_t hash)
AtomicWithCopy(const AtomicWithCopy &other)
AtomicWithCopy & operator=(const AtomicWithCopy &other)
std::unique_ptr< DBRequestKey > key
void Swap(DBRequest *rhs)
std::unique_ptr< DBRequestData > data
#define CHECK_CONCURRENCY(...)
void STLDeleteValues(Container *container)