OpenSDN source code
db_table.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <vector>
6 #include <atomic>
7 
8 #include <tbb/spin_rw_mutex.h>
9 
10 #include <boost/bind/bind.hpp>
11 #include <boost/foreach.hpp>
12 #include <boost/dynamic_bitset.hpp>
13 #include <boost/type_traits.hpp>
14 
15 #include "base/compiler.h"
16 #include "base/logging.h"
17 #include "base/task_annotations.h"
18 #include "base/time_util.h"
19 #include "db/db.h"
20 #include "db/db_partition.h"
21 #include "db/db_table.h"
22 #include "db/db_table_partition.h"
23 #include "db/db_table_walk_mgr.h"
24 #include "db/db_types.h"
25 
26 class DBEntry;
27 class DBEntryBase;
28 
29 using namespace std;
30 using namespace boost::placeholders;
31 
32 DBRequest::DBRequest() : oper(static_cast<DBOperation>(0)) {
33 }
34 
36 #if defined(__GNUC__)
37 #if (__GNUC_PREREQ(4, 2) > 0)
40  assert(key_has_destructor && data_has_destructor);
41 #endif
42 #endif
43 }
44 
46  swap(oper, rhs->oper);
47  swap(key, rhs->key);
48  swap(data, rhs->data);
49 }
50 
51 // A copy operation is needed so that a vector of atomics can be resized;
52 // the copy and the original therefore need not stay in sync afterwards.
53 template<typename _Tp>
54 struct AtomicWithCopy : public std::atomic<_Tp> {
55  // Inherit constructors
56  using std::atomic<_Tp>::atomic;
57 
58  // Bring in base class operators and methods
59  using std::atomic<_Tp>::operator=;
60  using std::atomic<_Tp>::load;
61  using std::atomic<_Tp>::store;
62 
63  // Custom constructor to handle copy from another AtomicWithCopy (must perform a load/store)
64  AtomicWithCopy(const AtomicWithCopy& other) : std::atomic<_Tp>(other.load()) {}
65 
66  // Custom assignment operator
68  this->store(other.load());
69  return *this;
70  }
71 };
72 
74 public:
75  typedef vector<ChangeCallback> CallbackList;
76  typedef vector<string> NameList;
77  typedef vector<AtomicWithCopy<uint64_t>> StateCountList;
78 
79  explicit ListenerInfo(const string &table_name) :
80  db_state_accounting_(true) {
81  if (table_name.find("__ifmap_") != string::npos) {
82  // TODO need to have unconditional DB state accounting
83  // for now skipp DB State accounting for ifmap tables
84  db_state_accounting_ = false;
85  }
86  }
87 
89  const string &name) {
90  tbb::spin_rw_mutex::scoped_lock write_lock(rw_mutex_, true);
91  size_t i = bmap_.find_first();
92  if (i == bmap_.npos) {
93  i = callbacks_.size();
94  callbacks_.push_back(callback);
95  names_.push_back(name);
96  state_count_.resize(i + 1);
98  } else {
99  bmap_.reset(i);
100  if (bmap_.none()) {
101  bmap_.clear();
102  }
103  callbacks_[i] = callback;
104  names_[i] = name;
105  state_count_[i] = 0;
106  }
107  return i;
108  }
109 
110  void Unregister(ListenerId listener) {
111  tbb::spin_rw_mutex::scoped_lock write_lock(rw_mutex_, true);
112  callbacks_[listener] = NULL;
113  names_[listener] = "";
114  // During Unregister Listener should have cleaned up,
115  // DB states from all the entries in this table.
116  assert(state_count_[listener] == 0);
117  if ((size_t) listener == callbacks_.size() - 1) {
118  while (!callbacks_.empty() && callbacks_.back() == NULL) {
119  callbacks_.pop_back();
120  names_.pop_back();
121  state_count_.pop_back();
122  }
123  if (bmap_.size() > callbacks_.size()) {
124  bmap_.resize(callbacks_.size());
125  }
126  } else {
127  if ((size_t) listener >= bmap_.size()) {
128  bmap_.resize(listener + 1);
129  }
130  bmap_.set(listener);
131  }
132  }
133 
134  // concurrency: called from DBPartition task.
135  void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry) {
136  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
137  for (CallbackList::iterator iter = callbacks_.begin();
138  iter != callbacks_.end(); ++iter) {
139  if (*iter != NULL) {
140  ChangeCallback cb = *iter;
141  (cb)(tpart, entry);
142  }
143  }
144  }
145 
146  void AddToDBStateCount(ListenerId listener, int count) {
147  if (db_state_accounting_ && listener != DBTableBase::kInvalidId) {
148  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
149  state_count_[listener] += count;
150  }
151  }
152 
153  uint64_t GetDBStateCount(ListenerId listener) {
154  assert(db_state_accounting_ && listener != DBTableBase::kInvalidId);
155  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
156  return state_count_[listener];
157  }
158 
159  void FillListeners(vector<ShowTableListener> *listeners) const {
160  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
161  ListenerId id = 0;
162  for (CallbackList::const_iterator iter = callbacks_.begin();
163  iter != callbacks_.end(); ++iter, ++id) {
164  if (*iter != NULL) {
165  ShowTableListener item;
166  item.id = id;
167  item.name = names_[id];
168  item.state_count = state_count_[id];
169  listeners->push_back(item);
170  }
171  }
172  }
173 
174  bool empty() const {
175  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
176  return callbacks_.empty();
177  }
178 
179  size_t size() const {
180  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
181  return (callbacks_.size() - bmap_.count());
182  }
183 
184 private:
189  mutable tbb::spin_rw_mutex rw_mutex_;
190  boost::dynamic_bitset<> bmap_; // free list.
191 };
192 
193 DBTableBase::DBTableBase(DB *db, const string &name)
194  : db_(db), name_(name), info_(new ListenerInfo(name)),
195  enqueue_count_(0), input_count_(0), notify_count_(0) {
196  walker_count_ = 0;
199  walk_cancel_count_ = 0;
200  walk_again_count_ = 0;
201  walk_count_ = 0;
202 }
203 
205 }
206 
208  const string &name) {
209  return info_->Register(callback, name);
210 }
211 
213  info_->Unregister(listener);
214  // If a table is marked for deletion, then we may trigger the deletion
215  // process when the last client is removed
216  if (info_->empty())
217  RetryDelete();
218 }
219 
221  DBTablePartBase *tpart = GetTablePartition(req->key.get());
222  DBPartition *partition = db_->GetPartition(tpart->index());
223  enqueue_count_++;
224  return partition->EnqueueRequest(tpart, NULL, req);
225 }
226 
228  DBTablePartBase *tpart = GetTablePartition(db_entry);
229  DBPartition *partition = db_->GetPartition(tpart->index());
230  partition->EnqueueRemove(tpart, db_entry);
231 }
232 
234  notify_count_++;
235  info_->RunNotify(tpart, entry);
236 }
237 
238 void DBTableBase::AddToDBStateCount(ListenerId listener, int count) {
239  info_->AddToDBStateCount(listener, count);
240 }
241 
243  return info_->GetDBStateCount(listener);
244 }
245 
247  if (HasListeners()) {
248  return false;
249  }
250  if (HasWalkers()) {
251  return false;
252  }
253  if (!empty()) {
254  return false;
255  }
256 
257  return true;
258 }
259 
261  return !info_->empty();
262 }
263 
265  return info_->size();
266 }
267 
268 void DBTableBase::FillListeners(vector<ShowTableListener> *listeners) const {
269  info_->FillListeners(listeners);
270 }
271 
272 class DBTable::WalkWorker : public Task {
273 public:
274  WalkWorker(TableWalker *walker, int db_partition_id);
275 
276  virtual bool Run();
277 
278  std::string Description() const { return "DBTable::WalkWorker"; }
279 
280 private:
281  // Store the last visited node to continue walk
282  std::unique_ptr<DBRequestKey> walk_ctx_;
283 
284  // Table partition for which this worker was created
286 
288 };
289 
291 public:
293  pending_workers_ = 0;
294  }
295 
296  void StartWalk();
297 
299  return table_;
300  }
301 
302  void ClearWalkWorks() {
303  worker_tasks_.clear();
304  }
305 
307  // check whether iteration is completed on all Table Partition
308  std::atomic<uint16_t> pending_workers_;
309  // For debugging purpose. Few of the tasks in this list could has finished
310  // executing and destroyed. List of workers are useful in debugging with
311  // gdb/gcore to see the current state of the walk and walk_context
312  std::list<Task *> worker_tasks_;
313 };
314 
316  int count = 0;
317  DBRequestKey *key_resume = walk_ctx_.get();
318  DBTable *table = walker_->table();
319  int max_walk_entry_count = table->GetWalkIterationToYield();
320  DBEntry *entry;
321 
322  if (key_resume != NULL) {
323  std::unique_ptr<const DBEntryBase> start;
324  start = table->AllocEntry(key_resume);
325  // Find matching or next in sort order
326  entry = tbl_partition_->lower_bound(start.get());
327  } else {
328  entry = tbl_partition_->GetFirst();
329  }
330  if (entry == NULL) {
331  goto walk_done;
332  }
333 
334  for (DBEntry *next = NULL; entry; entry = next) {
335  next = tbl_partition_->GetNext(entry);
336  if (count == max_walk_entry_count) {
337  // store the context
338  walk_ctx_ = entry->GetDBRequestKey();
339  return false;
340  }
341 
342  // Invoke walker function
343  bool more = table->InvokeWalkCb(tbl_partition_, entry);
344  if (!more) {
345  break;
346  }
347 
348  db_walker_wait();
349  count++;
350  }
351 
352 walk_done:
353  // Check whether all other walks on the table is completed
354  long num_walkers_on_tpart = walker_->pending_workers_.fetch_sub(1);
355  if (num_walkers_on_tpart == 1) {
356  table->WalkDone();
357  }
358  return true;
359 }
360 
361 DBTable::WalkWorker::WalkWorker(TableWalker *walker, int db_partition_id)
362  : Task(walker->table()->GetWalkerTaskId(), db_partition_id), walker_(walker) {
363  tbl_partition_ = static_cast<DBTablePartition *>
364  (walker_->table()->GetTablePartition(db_partition_id));
365 }
366 
368  CHECK_CONCURRENCY("db::Walker");
369  assert(pending_workers_ == 0);
370  for (int i = 0; i < table_->PartitionCount(); i++) {
371  DBTablePartition *partition = static_cast<DBTablePartition *>(
372  table_->GetTablePartition(i));
373  if (!partition->size()) continue;
374  worker_tasks_.push_back(new WalkWorker(this, i));
375  pending_workers_++;
376  }
377  if (pending_workers_ == 0) {
378  table_->WalkDone();
379  } else {
381  for (auto *task : worker_tasks_) scheduler->Enqueue(task);
382  }
383 }
384 
386 // Implementation of DBTable methods
388 DBTable::DBTable(DB *db, const string &name)
389  : DBTableBase(db, name),
390  walker_(new TableWalker(this)),
391  walker_task_id_(db->task_id()) {
392 
393  static bool init_ = false;
394  static int iter_to_yield_env_ = 0;
395 
396  if (!init_) {
397  // XXX To be used for testing purposes only.
398  char *count_str = getenv("DB_ITERATION_TO_YIELD");
399  if (count_str) {
400  iter_to_yield_env_ = strtol(count_str, NULL, 0);
401  } else {
402  iter_to_yield_env_ = kIterationToYield;
403  }
404  init_ = true;
405  }
406  max_walk_iteration_to_yield_ = iter_to_yield_env_;
407 }
408 
411 }
412 
414  for (int i = 0; i < PartitionCount(); i++) {
415  partitions_.push_back(AllocPartition(i));
416  }
417 }
418 
420  return new DBTablePartition(this, index);
421 }
422 
424  CHECK_CONCURRENCY("db::Walker");
425  incr_walk_count();
426  walker_->StartWalk();
427 }
428 
430  return AllocEntry(req->key.get()).release();
431 }
432 
434  DBTablePartBase *tpart = GetTablePartition(entry);
435  tpart->Notify(entry);
436 }
437 
438 bool DBTable::OnChange(DBEntry *entry, const DBRequest *req) {
439  return true;
440 }
441 
442 bool DBTable::Delete(DBEntry *entry, const DBRequest *req) {
443  return true;
444 }
445 
447  return DB::PartitionCount();
448 }
449 
450 static size_t HashToPartition(size_t hash) {
451  return hash % DB::PartitionCount();
452 }
453 
455  return partitions_[index];
456 }
457 
458 const DBTablePartBase *DBTable::GetTablePartition(const int index) const {
459  return partitions_[index];
460 }
461 
463  int id = HashToPartition(Hash(key));
464  return GetTablePartition(id);
465 }
466 
468  const DBRequestKey *key) const {
469  int id = HashToPartition(Hash(key));
470  return GetTablePartition(id);
471 }
472 
474  const DBEntry *gentry = static_cast<const DBEntry *>(entry);
475  size_t id = HashToPartition(Hash(gentry));
476  return GetTablePartition(id);
477 }
478 
480  const DBEntryBase *entry) const {
481  const DBEntry *gentry = static_cast<const DBEntry *>(entry);
482  size_t id = HashToPartition(Hash(gentry));
483  return GetTablePartition(id);
484 }
485 
486 // Find DB entry without taking the lock. The calling routine must ensure it is
487 // running in exclusion with the DB task.
489  size_t id = HashToPartition(Hash(entry));
490  DBTablePartition *tbl_partition =
491  static_cast<DBTablePartition *>(GetTablePartition(id));
492  return tbl_partition->FindNoLock(entry);
493 }
494 
495 DBEntry *DBTable::Find(const DBEntry *entry) {
496  size_t id = HashToPartition(Hash(entry));
497  DBTablePartition *tbl_partition =
498  static_cast<DBTablePartition *>(GetTablePartition(id));
499  return tbl_partition->Find(entry);
500 }
501 
502 const DBEntry *DBTable::Find(const DBEntry *entry) const {
503  return const_cast<DBTable *>(this)->Find(entry);
504 }
505 
506 // Find DB entry without taking the lock. The calling routine must ensure it is
507 // running in exclusion with the DB task.
509  int id = HashToPartition(Hash(key));
510  DBTablePartition *tbl_partition =
511  static_cast<DBTablePartition *>(GetTablePartition(id));
512  return tbl_partition->FindNoLock(key);
513 }
514 
515 DBEntry *DBTable::Find(const DBRequestKey *key, int id) {
516  if (id == -1)
517  id = HashToPartition(Hash(key));
518  DBTablePartition *tbl_partition =
519  static_cast<DBTablePartition *>(GetTablePartition(id));
520  return tbl_partition->Find(key);
521 }
522 
523 const DBEntry *DBTable::Find(const DBRequestKey *key, int id) const {
524  return const_cast<DBTable *>(this)->Find(key, id);
525 }
526 
527 //
528 // Concurrency: called from task that's mutually exclusive with db::DBTable
529 // or db::IFMapTable as applicable.
530 //
531 // Calculate the size across all partitions.
532 //
533 size_t DBTable::Size() const {
534  size_t total = 0;
535  for (vector<DBTablePartition *>::const_iterator iter = partitions_.begin();
536  iter != partitions_.end(); iter++) {
537  total += (*iter)->size();
538  }
539  return total;
540 }
541 
542 void DBTable::Input(DBTablePartition *tbl_partition, DBClient *client,
543  DBRequest *req) {
544  DBRequestKey *key =
545  static_cast<DBRequestKey *>(req->key.get());
546  DBEntry *entry = NULL;
547 
548  entry = tbl_partition->Find(key);
549  if (req->oper == DBRequest::DB_ENTRY_ADD_CHANGE) {
550  if (entry) {
551  if (OnChange(entry, req) || entry->IsDeleted()) {
552  // The entry may currently be marked as deleted.
553  entry->ClearDelete();
554  tbl_partition->Change(entry);
555  }
556  } else {
557  if ((entry = Add(req)) != NULL) {
558  tbl_partition->Add(entry);
559  }
560  }
561  } else if (req->oper == DBRequest::DB_ENTRY_DELETE) {
562  if (entry) {
563  if (Delete(entry, req)) {
564  tbl_partition->Delete(entry);
565  }
566  }
567  } else if (req->oper == DBRequest::DB_ENTRY_NOTIFY) {
568  if (entry) {
569  tbl_partition->Notify(entry);
570  }
571  } else {
572  assert(0);
573  }
574 }
575 
577  DBEntryBase *next = NULL;
578 
579  for (int i = 0; i < table->PartitionCount(); ++i) {
580  DBTablePartition *partition = static_cast<DBTablePartition *>(
581  table->GetTablePartition(i));
582 
583  for (DBEntryBase *entry = partition->GetFirst(); entry; entry = next) {
584  next = partition->GetNext(entry);
585  DBState *state = entry->GetState(table, id);
586  if (state) {
587  entry->ClearState(table, id);
588  delete state;
589  }
590  }
591  }
592 }
593 
594 //
595 // Callback for table walk triggered by NotifyAllEntries.
596 //
598  tpart->Notify(entry);
599  return true;
600 }
601 
602 //
603 // Callback for completion of table walk triggered by NotifyAllEntries.
604 //
606  walk_ref_.reset();
607 }
608 
609 //
610 // Concurrency: called from task that's mutually exclusive with db::DBTable
611 // or db::IFMapTable as applicable.
612 //
613 // Trigger notification of all entries to all listeners.
614 // Should be used sparingly e.g. to handle significant configuration change.
615 //
616 // The walk callback just turns around and puts the DBentryBase on the change
617 // list.
618 //
619 // If the walk is already running, it is allowed to complete and WalkAgain API
620 // is invoked to trigger walk on current walk completion.
621 //
623  CHECK_CONCURRENCY("bgp::Config", "bgp::ConfigHelper", "bgp::RTFilter",
624  "db::DBTable");
625 
626  if (empty())
627  return;
628 
629  if (walk_ref_ == NULL) {
630  walk_ref_ =
631  AllocWalker(boost::bind(&DBTable::WalkCallback, this, _1, _2),
632  boost::bind(&DBTable::WalkCompleteCallback, this, _2));
634  } else {
636  }
637 }
638 
640  WalkCompleteFn walk_complete) {
641  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
642  return walk_mgr->AllocWalker(this, walk_fn, walk_complete);
643 }
644 
646  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
647  walk_mgr->ReleaseWalker(walk);
648  return;
649 }
650 
652  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
653  walk_mgr->WalkTable(walk);
654  return;
655 }
656 
658  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
659  walk_mgr->WalkAgain(walk);
660  return;
661 }
662 
664  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
665  return walk_mgr->InvokeWalkCb(part, entry);
666 }
667 
670  walker_->ClearWalkWorks();
671  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
672  return walk_mgr->WalkDone();
673 }
void ClearDelete()
Definition: db_entry.h:47
bool IsDeleted() const
Definition: db_entry.h:48
virtual KeyPtr GetDBRequestKey() const =0
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
bool EnqueueRequest(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
StateCountList state_count_
Definition: db_table.cc:188
void AddToDBStateCount(ListenerId listener, int count)
Definition: db_table.cc:146
vector< string > NameList
Definition: db_table.cc:76
boost::dynamic_bitset bmap_
Definition: db_table.cc:190
DBTableBase::ListenerId Register(ChangeCallback callback, const string &name)
Definition: db_table.cc:88
void FillListeners(vector< ShowTableListener > *listeners) const
Definition: db_table.cc:159
CallbackList callbacks_
Definition: db_table.cc:186
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
Definition: db_table.cc:135
uint64_t GetDBStateCount(ListenerId listener)
Definition: db_table.cc:153
ListenerInfo(const string &table_name)
Definition: db_table.cc:79
size_t size() const
Definition: db_table.cc:179
void Unregister(ListenerId listener)
Definition: db_table.cc:110
tbb::spin_rw_mutex rw_mutex_
Definition: db_table.cc:189
vector< AtomicWithCopy< uint64_t > > StateCountList
Definition: db_table.cc:77
vector< ChangeCallback > CallbackList
Definition: db_table.cc:75
std::atomic< uint64_t > walk_again_count_
Definition: db_table.h:157
DB * db_
Definition: db_table.h:145
bool Enqueue(DBRequest *req)
Definition: db_table.cc:220
void AddToDBStateCount(ListenerId listener, int count)
Definition: db_table.cc:238
size_t GetListenerCount() const
Definition: db_table.cc:264
std::atomic< uint64_t > walker_count_
Definition: db_table.h:152
bool HasWalkers() const
Definition: db_table.h:128
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
Definition: db_table.cc:233
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)=0
boost::function< void(DBTablePartBase *, DBEntryBase *)> ChangeCallback
Definition: db_table.h:61
virtual void RetryDelete()
Definition: db_table.h:104
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
Definition: db_table.cc:207
uint64_t enqueue_count_
Definition: db_table.h:149
void EnqueueRemove(DBEntryBase *db_entry)
Definition: db_table.cc:227
std::atomic< uint64_t > walk_request_count_
Definition: db_table.h:154
uint64_t GetDBStateCount(ListenerId listener)
Definition: db_table.cc:242
bool empty() const
Definition: db_table.h:101
std::unique_ptr< ListenerInfo > info_
Definition: db_table.h:148
std::atomic< uint64_t > walk_complete_count_
Definition: db_table.h:155
bool HasListeners() const
Definition: db_table.cc:260
void incr_walk_count()
Definition: db_table.h:142
virtual bool MayDelete() const
Definition: db_table.cc:246
uint64_t notify_count_
Definition: db_table.h:151
void incr_walk_complete_count()
Definition: db_table.h:139
std::atomic< uint64_t > walk_cancel_count_
Definition: db_table.h:156
static const int kInvalidId
Definition: db_table.h:64
void Unregister(ListenerId listener)
Definition: db_table.cc:212
int ListenerId
Definition: db_table.h:62
void FillListeners(std::vector< ShowTableListener > *listeners) const
Definition: db_table.cc:268
DB * database()
Definition: db_table.h:107
virtual ~DBTableBase()
Definition: db_table.cc:204
DBTableBase(DB *db, const std::string &name)
Definition: db_table.cc:193
const std::string & name() const
Definition: db_table.h:110
std::atomic< uint64_t > walk_count_
Definition: db_table.h:153
void Notify(DBEntryBase *entry)
void Delete(DBEntryBase *)
virtual void Change(DBEntry *entry)
size_t size() const
virtual void Add(DBEntry *entry)
DBEntry * FindNoLock(const DBEntry *entry)
virtual DBEntry * GetNext(const DBEntryBase *entry)
virtual DBEntry * lower_bound(const DBEntryBase *entry)
DBEntry * Find(const DBEntry *entry)
virtual DBEntry * GetFirst()
void ReleaseWalker(DBTable::DBTableWalkRef &walk)
DBTable::DBTableWalkRef AllocWalker(DBTable *table, DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete)
void WalkTable(DBTable::DBTableWalkRef walk)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
void WalkAgain(DBTable::DBTableWalkRef walk)
DBTable * table()
Definition: db_table.cc:298
std::list< Task * > worker_tasks_
Definition: db_table.cc:312
std::atomic< uint16_t > pending_workers_
Definition: db_table.cc:308
TableWalker(DBTable *table)
Definition: db_table.cc:292
std::string Description() const
Gives a description of the task.
Definition: db_table.cc:278
WalkWorker(TableWalker *walker, int db_partition_id)
Definition: db_table.cc:361
DBTablePartition * tbl_partition_
Definition: db_table.cc:285
virtual bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
Definition: db_table.cc:315
TableWalker * walker_
Definition: db_table.cc:287
std::unique_ptr< DBRequestKey > walk_ctx_
Definition: db_table.cc:282
void WalkAgain(DBTableWalkRef walk)
Definition: db_table.cc:657
int walker_task_id_
Definition: db_table.h:353
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
Definition: db_table.cc:663
virtual bool Delete(DBEntry *entry, const DBRequest *req)
Definition: db_table.cc:442
std::unique_ptr< TableWalker > walker_
Definition: db_table.h:350
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
virtual bool OnChange(DBEntry *entry, const DBRequest *req)
Definition: db_table.cc:438
void WalkDone()
Definition: db_table.cc:668
void StartWalk()
Definition: db_table.cc:423
virtual void Input(DBTablePartition *tbl_partition, DBClient *client, DBRequest *req)
Definition: db_table.cc:542
void Init()
Definition: db_table.cc:413
int max_walk_iteration_to_yield_
Definition: db_table.h:354
static const int kIterationToYield
Definition: db_table.h:181
DBTableWalkRef AllocWalker(WalkFn walk_fn, WalkCompleteFn walk_complete)
Definition: db_table.cc:639
int GetWalkerTaskId()
Definition: db_table.h:299
void NotifyAllEntries()
Definition: db_table.cc:622
virtual void Change(DBEntryBase *entry)
Definition: db_table.cc:433
virtual size_t Size() const
Definition: db_table.cc:533
int GetWalkIterationToYield()
Definition: db_table.h:291
virtual int PartitionCount() const
Definition: db_table.cc:446
void WalkTable(DBTableWalkRef walk)
Definition: db_table.cc:651
std::vector< DBTablePartition * > partitions_
Definition: db_table.h:351
DBTable::DBTableWalkRef walk_ref_
Definition: db_table.h:352
virtual ~DBTable()
Definition: db_table.cc:409
virtual DBTablePartition * AllocPartition(int index)
Definition: db_table.cc:419
bool WalkCallback(DBTablePartBase *tpart, DBEntryBase *entry)
Definition: db_table.cc:597
DBEntry * Find(const DBEntry *entry)
Definition: db_table.cc:495
void ReleaseWalker(DBTableWalkRef &walk)
Definition: db_table.cc:645
boost::function< void(DBTableWalkRef, DBTableBase *)> WalkCompleteFn
Definition: db_table.h:179
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
Definition: db_table.cc:462
static void DBStateClear(DBTable *table, ListenerId id)
Definition: db_table.cc:576
virtual DBEntry * Add(const DBRequest *req)
Definition: db_table.cc:429
DBTable(DB *db, const std::string &name)
Definition: db_table.cc:388
DBEntry * FindNoLock(const DBEntry *entry)
Definition: db_table.cc:488
boost::intrusive_ptr< DBTableWalk > DBTableWalkRef
Definition: db_table.h:169
void WalkCompleteCallback(DBTableBase *tbl_base)
Definition: db_table.cc:605
static void db_walker_wait()
Definition: db_table.h:308
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
Definition: db_table.h:176
virtual size_t Hash(const DBEntry *entry) const
Definition: db_table.h:195
Definition: db.h:24
DBTableWalkMgr * GetWalkMgr()
Definition: db.h:51
DBPartition * GetPartition(int index)
Definition: db.cc:60
static int PartitionCount()
Definition: db.cc:32
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
static size_t HashToPartition(size_t hash)
Definition: db_table.cc:450
uint8_t type
Definition: load_balance.h:2
AtomicWithCopy(const AtomicWithCopy &other)
Definition: db_table.cc:64
AtomicWithCopy & operator=(const AtomicWithCopy &other)
Definition: db_table.cc:67
DBRequest()
Definition: db_table.cc:32
~DBRequest()
Definition: db_table.cc:35
DBOperation oper
Definition: db_table.h:42
@ DB_ENTRY_NOTIFY
Definition: db_table.h:40
@ DB_ENTRY_DELETE
Definition: db_table.h:39
@ DB_ENTRY_ADD_CHANGE
Definition: db_table.h:38
std::unique_ptr< DBRequestKey > key
Definition: db_table.h:48
void Swap(DBRequest *rhs)
Definition: db_table.cc:45
std::unique_ptr< DBRequestData > data
Definition: db_table.h:49
#define CHECK_CONCURRENCY(...)
struct task_ task
void STLDeleteValues(Container *container)
Definition: util.h:101