OpenSDN source code
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
db_table.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <vector>
6 #include <tbb/atomic.h>
7 #include <tbb/spin_rw_mutex.h>
8 
9 #include <boost/bind.hpp>
10 #include <boost/foreach.hpp>
11 #include <boost/dynamic_bitset.hpp>
12 #include <boost/type_traits.hpp>
13 
14 #include "base/compiler.h"
15 #include "base/logging.h"
16 #include "base/task_annotations.h"
17 #include "base/time_util.h"
18 #include "db/db.h"
19 #include "db/db_partition.h"
20 #include "db/db_table.h"
21 #include "db/db_table_partition.h"
22 #include "db/db_table_walk_mgr.h"
23 #include "db/db_types.h"
24 
25 class DBEntry;
26 class DBEntryBase;
27 
28 using namespace std;
29 
30 DBRequest::DBRequest() : oper(static_cast<DBOperation>(0)) {
31 }
32 
34 #if defined(__GNUC__)
35 #if (__GNUC_PREREQ(4, 2) > 0)
38  assert(key_has_destructor && data_has_destructor);
39 #endif
40 #endif
41 }
42 
44  swap(oper, rhs->oper);
45  swap(key, rhs->key);
46  swap(data, rhs->data);
47 }
48 
50 public:
51  typedef vector<ChangeCallback> CallbackList;
52  typedef vector<string> NameList;
53  typedef vector<tbb::atomic<uint64_t> > StateCountList;
54 
55  explicit ListenerInfo(const string &table_name) :
56  db_state_accounting_(true) {
57  if (table_name.find("__ifmap_") != string::npos) {
58  // TODO need to have unconditional DB state accounting
59  // for now skipp DB State accounting for ifmap tables
60  db_state_accounting_ = false;
61  }
62  }
63 
65  const string &name) {
66  tbb::spin_rw_mutex::scoped_lock write_lock(rw_mutex_, true);
67  size_t i = bmap_.find_first();
68  if (i == bmap_.npos) {
69  i = callbacks_.size();
70  callbacks_.push_back(callback);
71  names_.push_back(name);
72  state_count_.resize(i + 1);
73  state_count_[i] = 0;
74  } else {
75  bmap_.reset(i);
76  if (bmap_.none()) {
77  bmap_.clear();
78  }
79  callbacks_[i] = callback;
80  names_[i] = name;
81  state_count_[i] = 0;
82  }
83  return i;
84  }
85 
86  void Unregister(ListenerId listener) {
87  tbb::spin_rw_mutex::scoped_lock write_lock(rw_mutex_, true);
88  callbacks_[listener] = NULL;
89  names_[listener] = "";
90  // During Unregister Listener should have cleaned up,
91  // DB states from all the entries in this table.
92  assert(state_count_[listener] == 0);
93  if ((size_t) listener == callbacks_.size() - 1) {
94  while (!callbacks_.empty() && callbacks_.back() == NULL) {
95  callbacks_.pop_back();
96  names_.pop_back();
97  state_count_.pop_back();
98  }
99  if (bmap_.size() > callbacks_.size()) {
100  bmap_.resize(callbacks_.size());
101  }
102  } else {
103  if ((size_t) listener >= bmap_.size()) {
104  bmap_.resize(listener + 1);
105  }
106  bmap_.set(listener);
107  }
108  }
109 
110  // concurrency: called from DBPartition task.
111  void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry) {
112  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
113  for (CallbackList::iterator iter = callbacks_.begin();
114  iter != callbacks_.end(); ++iter) {
115  if (*iter != NULL) {
116  ChangeCallback cb = *iter;
117  (cb)(tpart, entry);
118  }
119  }
120  }
121 
122  void AddToDBStateCount(ListenerId listener, int count) {
123  if (db_state_accounting_ && listener != DBTableBase::kInvalidId) {
124  state_count_[listener] += count;
125  }
126  }
127 
128  uint64_t GetDBStateCount(ListenerId listener) {
129  assert(db_state_accounting_ && listener != DBTableBase::kInvalidId);
130  return state_count_[listener];
131  }
132 
133  void FillListeners(vector<ShowTableListener> *listeners) const {
134  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
135  ListenerId id = 0;
136  for (CallbackList::const_iterator iter = callbacks_.begin();
137  iter != callbacks_.end(); ++iter, ++id) {
138  if (*iter != NULL) {
139  ShowTableListener item;
140  item.id = id;
141  item.name = names_[id];
142  item.state_count = state_count_[id];
143  listeners->push_back(item);
144  }
145  }
146  }
147 
148  bool empty() const {
149  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
150  return callbacks_.empty();
151  }
152 
153  size_t size() const {
154  tbb::spin_rw_mutex::scoped_lock read_lock(rw_mutex_, false);
155  return (callbacks_.size() - bmap_.count());
156  }
157 
158 private:
163  mutable tbb::spin_rw_mutex rw_mutex_;
164  boost::dynamic_bitset<> bmap_; // free list.
165 };
166 
167 DBTableBase::DBTableBase(DB *db, const string &name)
168  : db_(db), name_(name), info_(new ListenerInfo(name)),
169  enqueue_count_(0), input_count_(0), notify_count_(0) {
170  walker_count_ = 0;
173  walk_cancel_count_ = 0;
174  walk_again_count_ = 0;
175  walk_count_ = 0;
176 }
177 
179 }
180 
182  const string &name) {
183  return info_->Register(callback, name);
184 }
185 
187  info_->Unregister(listener);
188  // If a table is marked for deletion, then we may trigger the deletion
189  // process when the last client is removed
190  if (info_->empty())
191  RetryDelete();
192 }
193 
195  DBTablePartBase *tpart = GetTablePartition(req->key.get());
196  DBPartition *partition = db_->GetPartition(tpart->index());
197  enqueue_count_++;
198  return partition->EnqueueRequest(tpart, NULL, req);
199 }
200 
202  DBTablePartBase *tpart = GetTablePartition(db_entry);
203  DBPartition *partition = db_->GetPartition(tpart->index());
204  partition->EnqueueRemove(tpart, db_entry);
205 }
206 
208  notify_count_++;
209  info_->RunNotify(tpart, entry);
210 }
211 
212 void DBTableBase::AddToDBStateCount(ListenerId listener, int count) {
213  info_->AddToDBStateCount(listener, count);
214 }
215 
217  return info_->GetDBStateCount(listener);
218 }
219 
221  if (HasListeners()) {
222  return false;
223  }
224  if (HasWalkers()) {
225  return false;
226  }
227  if (!empty()) {
228  return false;
229  }
230 
231  return true;
232 }
233 
235  return !info_->empty();
236 }
237 
239  return info_->size();
240 }
241 
242 void DBTableBase::FillListeners(vector<ShowTableListener> *listeners) const {
243  info_->FillListeners(listeners);
244 }
245 
246 class DBTable::WalkWorker : public Task {
247 public:
248  WalkWorker(TableWalker *walker, int db_partition_id);
249 
250  virtual bool Run();
251 
252  std::string Description() const { return "DBTable::WalkWorker"; }
253 
254 private:
255  // Store the last visited node to continue walk
256  std::unique_ptr<DBRequestKey> walk_ctx_;
257 
258  // Table partition for which this worker was created
260 
262 };
263 
265 public:
267  pending_workers_ = 0;
268  }
269 
270  void StartWalk();
271 
273  return table_;
274  }
275 
276  void ClearWalkWorks() {
277  worker_tasks_.clear();
278  }
279 
281  // check whether iteration is completed on all Table Partition
282  tbb::atomic<uint16_t> pending_workers_;
283  // For debugging purpose. Few of the tasks in this list could has finished
284  // executing and destroyed. List of workers are useful in debugging with
285  // gdb/gcore to see the current state of the walk and walk_context
286  std::list<Task *> worker_tasks_;
287 };
288 
290  int count = 0;
291  DBRequestKey *key_resume = walk_ctx_.get();
292  DBTable *table = walker_->table();
293  int max_walk_entry_count = table->GetWalkIterationToYield();
294  DBEntry *entry;
295 
296  if (key_resume != NULL) {
297  std::unique_ptr<const DBEntryBase> start;
298  start = table->AllocEntry(key_resume);
299  // Find matching or next in sort order
300  entry = tbl_partition_->lower_bound(start.get());
301  } else {
302  entry = tbl_partition_->GetFirst();
303  }
304  if (entry == NULL) {
305  goto walk_done;
306  }
307 
308  for (DBEntry *next = NULL; entry; entry = next) {
309  next = tbl_partition_->GetNext(entry);
310  if (count == max_walk_entry_count) {
311  // store the context
312  walk_ctx_ = entry->GetDBRequestKey();
313  return false;
314  }
315 
316  // Invoke walker function
317  bool more = table->InvokeWalkCb(tbl_partition_, entry);
318  if (!more) {
319  break;
320  }
321 
322  db_walker_wait();
323  count++;
324  }
325 
326 walk_done:
327  // Check whether all other walks on the table is completed
328  long num_walkers_on_tpart = walker_->pending_workers_.fetch_and_decrement();
329  if (num_walkers_on_tpart == 1) {
330  table->WalkDone();
331  }
332  return true;
333 }
334 
335 DBTable::WalkWorker::WalkWorker(TableWalker *walker, int db_partition_id)
336  : Task(walker->table()->GetWalkerTaskId(), db_partition_id), walker_(walker) {
337  tbl_partition_ = static_cast<DBTablePartition *>
338  (walker_->table()->GetTablePartition(db_partition_id));
339 }
340 
342  CHECK_CONCURRENCY("db::Walker");
343  assert(pending_workers_ == 0);
344  for (int i = 0; i < table_->PartitionCount(); i++) {
345  DBTablePartition *partition = static_cast<DBTablePartition *>(
346  table_->GetTablePartition(i));
347  if (!partition->size()) continue;
348  worker_tasks_.push_back(new WalkWorker(this, i));
349  pending_workers_++;
350  }
351  if (pending_workers_ == 0) {
352  table_->WalkDone();
353  } else {
355  for (auto *task : worker_tasks_) scheduler->Enqueue(task);
356  }
357 }
358 
360 // Implementation of DBTable methods
362 DBTable::DBTable(DB *db, const string &name)
363  : DBTableBase(db, name),
364  walker_(new TableWalker(this)),
365  walker_task_id_(db->task_id()) {
366 
367  static bool init_ = false;
368  static int iter_to_yield_env_ = 0;
369 
370  if (!init_) {
371  // XXX To be used for testing purposes only.
372  char *count_str = getenv("DB_ITERATION_TO_YIELD");
373  if (count_str) {
374  iter_to_yield_env_ = strtol(count_str, NULL, 0);
375  } else {
376  iter_to_yield_env_ = kIterationToYield;
377  }
378  init_ = true;
379  }
380  max_walk_iteration_to_yield_ = iter_to_yield_env_;
381 }
382 
385 }
386 
388  for (int i = 0; i < PartitionCount(); i++) {
389  partitions_.push_back(AllocPartition(i));
390  }
391 }
392 
394  return new DBTablePartition(this, index);
395 }
396 
398  CHECK_CONCURRENCY("db::Walker");
399  incr_walk_count();
400  walker_->StartWalk();
401 }
402 
404  return AllocEntry(req->key.get()).release();
405 }
406 
408  DBTablePartBase *tpart = GetTablePartition(entry);
409  tpart->Notify(entry);
410 }
411 
412 bool DBTable::OnChange(DBEntry *entry, const DBRequest *req) {
413  return true;
414 }
415 
416 bool DBTable::Delete(DBEntry *entry, const DBRequest *req) {
417  return true;
418 }
419 
421  return DB::PartitionCount();
422 }
423 
424 static size_t HashToPartition(size_t hash) {
425  return hash % DB::PartitionCount();
426 }
427 
429  return partitions_[index];
430 }
431 
432 const DBTablePartBase *DBTable::GetTablePartition(const int index) const {
433  return partitions_[index];
434 }
435 
437  int id = HashToPartition(Hash(key));
438  return GetTablePartition(id);
439 }
440 
442  const DBRequestKey *key) const {
443  int id = HashToPartition(Hash(key));
444  return GetTablePartition(id);
445 }
446 
448  const DBEntry *gentry = static_cast<const DBEntry *>(entry);
449  size_t id = HashToPartition(Hash(gentry));
450  return GetTablePartition(id);
451 }
452 
454  const DBEntryBase *entry) const {
455  const DBEntry *gentry = static_cast<const DBEntry *>(entry);
456  size_t id = HashToPartition(Hash(gentry));
457  return GetTablePartition(id);
458 }
459 
460 // Find DB Entry without taking lock. Calling routine must ensure its
461 // running in exclusion with DB task
463  size_t id = HashToPartition(Hash(entry));
464  DBTablePartition *tbl_partition =
465  static_cast<DBTablePartition *>(GetTablePartition(id));
466  return tbl_partition->FindNoLock(entry);
467 }
468 
469 DBEntry *DBTable::Find(const DBEntry *entry) {
470  size_t id = HashToPartition(Hash(entry));
471  DBTablePartition *tbl_partition =
472  static_cast<DBTablePartition *>(GetTablePartition(id));
473  return tbl_partition->Find(entry);
474 }
475 
476 const DBEntry *DBTable::Find(const DBEntry *entry) const {
477  return const_cast<DBTable *>(this)->Find(entry);
478 }
479 
480 // Find DB Entry without taking lock. Calling routine must ensure its
481 // running in exclusion with DB task
483  int id = HashToPartition(Hash(key));
484  DBTablePartition *tbl_partition =
485  static_cast<DBTablePartition *>(GetTablePartition(id));
486  return tbl_partition->FindNoLock(key);
487 }
488 
489 DBEntry *DBTable::Find(const DBRequestKey *key, int id) {
490  if (id == -1)
491  id = HashToPartition(Hash(key));
492  DBTablePartition *tbl_partition =
493  static_cast<DBTablePartition *>(GetTablePartition(id));
494  return tbl_partition->Find(key);
495 }
496 
497 const DBEntry *DBTable::Find(const DBRequestKey *key, int id) const {
498  return const_cast<DBTable *>(this)->Find(key, id);
499 }
500 
501 //
502 // Concurrency: called from task that's mutually exclusive with db::DBTable
503 // or db::IFMapTable as applicable.
504 //
505 // Calculate the size across all partitions.
506 //
507 size_t DBTable::Size() const {
508  size_t total = 0;
509  for (vector<DBTablePartition *>::const_iterator iter = partitions_.begin();
510  iter != partitions_.end(); iter++) {
511  total += (*iter)->size();
512  }
513  return total;
514 }
515 
516 void DBTable::Input(DBTablePartition *tbl_partition, DBClient *client,
517  DBRequest *req) {
518  DBRequestKey *key =
519  static_cast<DBRequestKey *>(req->key.get());
520  DBEntry *entry = NULL;
521 
522  entry = tbl_partition->Find(key);
523  if (req->oper == DBRequest::DB_ENTRY_ADD_CHANGE) {
524  if (entry) {
525  if (OnChange(entry, req) || entry->IsDeleted()) {
526  // The entry may currently be marked as deleted.
527  entry->ClearDelete();
528  tbl_partition->Change(entry);
529  }
530  } else {
531  if ((entry = Add(req)) != NULL) {
532  tbl_partition->Add(entry);
533  }
534  }
535  } else if (req->oper == DBRequest::DB_ENTRY_DELETE) {
536  if (entry) {
537  if (Delete(entry, req)) {
538  tbl_partition->Delete(entry);
539  }
540  }
541  } else if (req->oper == DBRequest::DB_ENTRY_NOTIFY) {
542  if (entry) {
543  tbl_partition->Notify(entry);
544  }
545  } else {
546  assert(0);
547  }
548 }
549 
551  DBEntryBase *next = NULL;
552 
553  for (int i = 0; i < table->PartitionCount(); ++i) {
554  DBTablePartition *partition = static_cast<DBTablePartition *>(
555  table->GetTablePartition(i));
556 
557  for (DBEntryBase *entry = partition->GetFirst(); entry; entry = next) {
558  next = partition->GetNext(entry);
559  DBState *state = entry->GetState(table, id);
560  if (state) {
561  entry->ClearState(table, id);
562  delete state;
563  }
564  }
565  }
566 }
567 
568 //
569 // Callback for table walk triggered by NotifyAllEntries.
570 //
572  tpart->Notify(entry);
573  return true;
574 }
575 
576 //
577 // Callback for completion of table walk triggered by NotifyAllEntries.
578 //
580  walk_ref_.reset();
581 }
582 
583 //
584 // Concurrency: called from task that's mutually exclusive with db::DBTable
585 // or db::IFMapTable as applicable.
586 //
587 // Trigger notification of all entries to all listeners.
588 // Should be used sparingly e.g. to handle significant configuration change.
589 //
590 // The walk callback just turns around and puts the DBentryBase on the change
591 // list.
592 //
593 // If the walk is already running, it is allowed to complete and WalkAgain API
594 // is invoked to trigger walk on current walk completion.
595 //
597  CHECK_CONCURRENCY("bgp::Config", "bgp::ConfigHelper", "bgp::RTFilter",
598  "db::DBTable");
599 
600  if (empty())
601  return;
602 
603  if (walk_ref_ == NULL) {
604  walk_ref_ =
605  AllocWalker(boost::bind(&DBTable::WalkCallback, this, _1, _2),
606  boost::bind(&DBTable::WalkCompleteCallback, this, _2));
608  } else {
610  }
611 }
612 
614  WalkCompleteFn walk_complete) {
615  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
616  return walk_mgr->AllocWalker(this, walk_fn, walk_complete);
617 }
618 
620  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
621  walk_mgr->ReleaseWalker(walk);
622  return;
623 }
624 
626  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
627  walk_mgr->WalkTable(walk);
628  return;
629 }
630 
632  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
633  walk_mgr->WalkAgain(walk);
634  return;
635 }
636 
638  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
639  return walk_mgr->InvokeWalkCb(part, entry);
640 }
641 
644  walker_->ClearWalkWorks();
645  DBTableWalkMgr *walk_mgr = database()->GetWalkMgr();
646  return walk_mgr->WalkDone();
647 }
bool HasWalkers() const
Definition: db_table.h:128
static const int kIterationToYield
Definition: db_table.h:181
tbb::atomic< uint64_t > walk_request_count_
Definition: db_table.h:154
DB * db_
Definition: db_table.h:145
boost::dynamic_bitset bmap_
Definition: db_table.cc:164
std::string Description() const
Definition: db_table.cc:252
size_t size() const
Definition: db_table.cc:153
DBEntry * FindNoLock(const DBEntry *entry)
void STLDeleteValues(Container *container)
Definition: util.h:101
DBTable * table()
Definition: db_table.cc:272
void WalkTable(DBTableWalkRef walk)
Definition: db_table.cc:625
bool WalkCallback(DBTablePartBase *tpart, DBEntryBase *entry)
Definition: db_table.cc:571
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
Definition: db_table.cc:207
uint64_t enqueue_count_
Definition: db_table.h:149
ListenerInfo(const string &table_name)
Definition: db_table.cc:55
uint64_t GetDBStateCount(ListenerId listener)
Definition: db_table.cc:128
virtual ~DBTableBase()
Definition: db_table.cc:178
virtual bool OnChange(DBEntry *entry, const DBRequest *req)
Definition: db_table.cc:412
void AddToDBStateCount(ListenerId listener, int count)
Definition: db_table.cc:212
DBEntry * Find(const DBEntry *entry)
int walker_task_id_
Definition: db_table.h:353
std::vector< DBTablePartition * > partitions_
Definition: db_table.h:351
bool IsDeleted() const
Definition: db_entry.h:49
virtual void RetryDelete()
Definition: db_table.h:104
DBTableBase(DB *db, const std::string &name)
Definition: db_table.cc:167
virtual DBEntry * GetNext(const DBEntryBase *entry)
TableWalker(DBTable *table)
Definition: db_table.cc:266
TableWalker * walker_
Definition: db_table.cc:261
int ListenerId
Definition: db_table.h:62
std::unique_ptr< DBRequestData > data
Definition: db_table.h:49
DBTableWalkRef AllocWalker(WalkFn walk_fn, WalkCompleteFn walk_complete)
Definition: db_table.cc:613
~DBRequest()
Definition: db_table.cc:33
boost::function< void(DBTablePartBase *, DBEntryBase *)> ChangeCallback
Definition: db_table.h:61
virtual DBEntry * Add(const DBRequest *req)
Definition: db_table.cc:403
bool Enqueue(DBRequest *req)
Definition: db_table.cc:194
virtual void Change(DBEntryBase *entry)
Definition: db_table.cc:407
void incr_walk_count()
Definition: db_table.h:142
CallbackList callbacks_
Definition: db_table.cc:160
tbb::atomic< uint64_t > walk_count_
Definition: db_table.h:153
DBTableBase::ListenerId Register(ChangeCallback callback, const string &name)
Definition: db_table.cc:64
void NotifyAllEntries()
Definition: db_table.cc:596
DB * database()
Definition: db_table.h:107
tbb::atomic< uint64_t > walk_again_count_
Definition: db_table.h:157
void Delete(DBEntryBase *)
tbb::spin_rw_mutex rw_mutex_
Definition: db_table.cc:163
uint64_t GetDBStateCount(ListenerId listener)
Definition: db_table.cc:216
void FillListeners(vector< ShowTableListener > *listeners) const
Definition: db_table.cc:133
void Swap(DBRequest *rhs)
Definition: db_table.cc:43
virtual size_t Hash(const DBEntry *entry) const
Definition: db_table.h:195
void FillListeners(std::vector< ShowTableListener > *listeners) const
Definition: db_table.cc:242
void Unregister(ListenerId listener)
Definition: db_table.cc:186
virtual void Change(DBEntry *entry)
DBTable::DBTableWalkRef AllocWalker(DBTable *table, DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete)
static void db_walker_wait()
Definition: db_table.h:308
void WalkTable(DBTable::DBTableWalkRef walk)
void ReleaseWalker(DBTableWalkRef &walk)
Definition: db_table.cc:619
void Unregister(ListenerId listener)
Definition: db_table.cc:86
DBTablePartition * tbl_partition_
Definition: db_table.cc:259
boost::function< void(DBTableWalkRef, DBTableBase *)> WalkCompleteFn
Definition: db_table.h:179
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
Definition: db_table.cc:181
Definition: db.h:24
std::unique_ptr< TableWalker > walker_
Definition: db_table.h:350
tbb::atomic< uint64_t > walk_complete_count_
Definition: db_table.h:155
void WalkAgain(DBTableWalkRef walk)
Definition: db_table.cc:631
void ReleaseWalker(DBTable::DBTableWalkRef &walk)
virtual size_t Size() const
Definition: db_table.cc:507
void Init()
Definition: db_table.cc:387
tbb::atomic< uint16_t > pending_workers_
Definition: db_table.cc:282
void RunNotify(DBTablePartBase *tpart, DBEntryBase *entry)
Definition: db_table.cc:111
DBTable(DB *db, const std::string &name)
Definition: db_table.cc:362
uint8_t type
Definition: load_balance.h:109
WalkWorker(TableWalker *walker, int db_partition_id)
Definition: db_table.cc:335
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
virtual bool Delete(DBEntry *entry, const DBRequest *req)
Definition: db_table.cc:416
std::unique_ptr< DBRequestKey > key
Definition: db_table.h:48
DBOperation oper
Definition: db_table.h:42
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: db_table.cc:289
#define CHECK_CONCURRENCY(...)
void WalkAgain(DBTable::DBTableWalkRef walk)
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
std::unique_ptr< ListenerInfo > info_
Definition: db_table.h:148
std::list< Task * > worker_tasks_
Definition: db_table.cc:286
virtual DBEntry * lower_bound(const DBEntryBase *entry)
DBEntry * FindNoLock(const DBEntry *entry)
Definition: db_table.cc:462
int GetWalkerTaskId()
Definition: db_table.h:299
void incr_walk_complete_count()
Definition: db_table.h:139
const std::string & name() const
Definition: db_table.h:110
void ClearDelete()
Definition: db_entry.h:48
void EnqueueRemove(DBEntryBase *db_entry)
Definition: db_table.cc:201
virtual DBTablePartition * AllocPartition(int index)
Definition: db_table.cc:393
DBRequest()
Definition: db_table.cc:30
void StartWalk()
Definition: db_table.cc:397
virtual KeyPtr GetDBRequestKey() const =0
vector< string > NameList
Definition: db_table.cc:52
void WalkDone()
Definition: db_table.cc:642
virtual bool MayDelete() const
Definition: db_table.cc:220
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
Definition: db_table.cc:436
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
Definition: db_table.cc:637
size_t GetListenerCount() const
Definition: db_table.cc:238
std::unique_ptr< DBRequestKey > walk_ctx_
Definition: db_table.cc:256
int max_walk_iteration_to_yield_
Definition: db_table.h:354
static void DBStateClear(DBTable *table, ListenerId id)
Definition: db_table.cc:550
bool HasListeners() const
Definition: db_table.cc:234
static const int kInvalidId
Definition: db_table.h:64
DBEntry * Find(const DBEntry *entry)
Definition: db_table.cc:469
bool empty() const
Definition: db_table.h:101
void WalkCompleteCallback(DBTableBase *tbl_base)
Definition: db_table.cc:579
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
Definition: db_table.h:176
size_t size() const
static int PartitionCount()
Definition: db.cc:32
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
virtual DBEntry * GetFirst()
static size_t HashToPartition(size_t hash)
Definition: db_table.cc:424
virtual void Add(DBEntry *entry)
void AddToDBStateCount(ListenerId listener, int count)
Definition: db_table.cc:122
boost::intrusive_ptr< DBTableWalk > DBTableWalkRef
Definition: db_table.h:169
void Notify(DBEntryBase *entry)
tbb::atomic< uint64_t > walk_cancel_count_
Definition: db_table.h:156
DBTableWalkMgr * GetWalkMgr()
Definition: db.h:51
DBTable::DBTableWalkRef walk_ref_
Definition: db_table.h:352
StateCountList state_count_
Definition: db_table.cc:162
virtual int PartitionCount() const
Definition: db_table.cc:420
vector< tbb::atomic< uint64_t > > StateCountList
Definition: db_table.cc:53
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
uint64_t notify_count_
Definition: db_table.h:151
vector< ChangeCallback > CallbackList
Definition: db_table.cc:51
int GetWalkIterationToYield()
Definition: db_table.h:291
tbb::atomic< uint64_t > walker_count_
Definition: db_table.h:152
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)=0
DBPartition * GetPartition(int index)
Definition: db.cc:60
struct task_ task
virtual void Input(DBTablePartition *tbl_partition, DBClient *client, DBRequest *req)
Definition: db_table.cc:516
int index() const
virtual ~DBTable()
Definition: db_table.cc:383