OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
db_table_walker.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "db/db_table_walker.h"
6 
7 #include <tbb/atomic.h>
8 
9 #include "base/logging.h"
10 #include "base/task.h"
11 #include "db/db.h"
12 #include "db/db_partition.h"
13 #include "db/db_table.h"
14 #include "db/db_table_partition.h"
15 
// Out-of-class definition of the static member, seeded from kIterationToYield.
// NOTE(review): presumably the value reported by GetIterationToYield(), which
// Worker::Run compares its per-invocation entry count against - confirm in the
// header.
16 int DBTableWalker::max_iteration_to_yield_ = kIterationToYield;
17 
// State of one table walk. The constructor creates one Worker task per table
// partition; status_ counts the workers that have not finished yet.
// NOTE(review): the class header and several member declarations (id_, wkmgr_,
// table_, walker_fn_, done_fn_) are not visible in this listing.
19 public:
20  Walker(WalkId id, DBTableWalker *wkmgr, DBTable *table,
21  const DBRequestKey *key, WalkFn walker,
22  WalkCompleteFn walk_done, bool postpone_walk);
23 
// Request cancellation: atomically raises should_stop_, which the workers
// poll. Asserts that no postponed (not yet enqueued) tasks are still held in
// workers_.
24  void StopWalk() {
25  assert(workers_.empty());
26  should_stop_.fetch_and_store(true);
27  }
28 
29  // Test only - resume walk that was postponed at creation.
// Enqueues every task held in workers_ and then empties the list. Asserts
// that there is at least one postponed task.
// NOTE(review): the declaration of `scheduler` is not visible in this listing.
30  void ResumeWalk() {
31  assert(!workers_.empty());
33  for (std::vector<Task *>::iterator it = workers_.begin();
34  it != workers_.end(); ++it) {
35  scheduler->Enqueue(*it);
36  }
37  workers_.clear();
38  }
39 
41 
42  // Parent walker manager
44 
45  // Table on which walk is done
47 
48  // Take the ownership of key passed
49  std::unique_ptr<DBRequestKey> key_start_;
50 
53 
54  // Will be true if Table walk is cancelled
55  tbb::atomic<bool> should_stop_;
56 
57  // check whether iteration is completed on all Table Partition
// Set to the partition count by the constructor; each worker decrements it
// once when its part of the walk ends.
58  tbb::atomic<long> status_;
59 
// Tasks created with postpone_walk set; they stay here until ResumeWalk()
// enqueues them.
60  std::vector<Task *> workers_;
// Task id used for the worker tasks; forwarded from the parent walker manager.
61  int task_id() const { return wkmgr_->task_id(); }
62 };
63 
// Task that walks a single partition of the table on behalf of a Walker.
// NOTE(review): the declarations of walker_, key_start_ and tbl_partition_
// are not visible in this listing.
64 class DBTableWalker::Worker : public Task {
65 public:
// The task is created with the walker's task id and db_partition_id as the
// two Task constructor arguments. The table partition is looked up once here
// through the walker's table. `key` is stored as a plain pointer in
// key_start_ and may be NULL (Run() then starts from the first entry).
66  Worker(Walker *walker, int db_partition_id, const DBRequestKey *key)
67  : Task(walker->task_id(), db_partition_id), walker_(walker),
68  key_start_(key) {
69  tbl_partition_ = static_cast<DBTablePartition *>(
70  walker_->table_->GetTablePartition(db_partition_id));
71  }
72 
// Walks a batch of entries; defined out of line.
73  virtual bool Run();
74  std::string Description() const { return "DBTableWalker::Worker"; }
75 
76 private:
78 
79  // Store the last visited node to continue walk
// Set by Run() when it yields; NULL until the first yield.
80  std::unique_ptr<DBRequestKey> walk_ctx_;
81 
82  // This is where the walk started
84 
85  // Table partition for which this worker was created
87 };
88 
// Optional per-entry throttle for table walks.
//
// On the first call the DB_WALKER_WAIT_USECS environment variable is read and
// parsed with strtoul (base auto-detected from the prefix); the result is
// capped at 1000000 microseconds. Every call then sleeps for that many
// microseconds. When the variable is unset, or parses to 0, the function
// returns immediately.
//
// The delay lives in a function-local static initialised by a lambda, so the
// one-time getenv/parse is performed exactly once even if several walk workers
// reach this function at the same time (C++11 guarantees thread-safe
// initialisation of local statics). The value is clamped before it is narrowed
// to unsigned int, so an over-large setting is capped instead of wrapping
// around to a small or zero delay.
static void db_walker_wait() {
    static const unsigned int walk_sleep_usecs = []() -> unsigned int {
        const unsigned long kMaxWalkSleepUsecs = 1000000;

        const char *wait = getenv("DB_WALKER_WAIT_USECS");
        if (wait == NULL) {
            return 0;
        }

        const unsigned long requested = strtoul(wait, NULL, 0);
        return static_cast<unsigned int>(
            requested > kMaxWalkSleepUsecs ? kMaxWalkSleepUsecs : requested);
    }();

    if (walk_sleep_usecs) {
        usleep(walk_sleep_usecs);
    }
}
108 
// Body of Worker::Run(): walks one batch of entries of this worker's table
// partition. Returns false after storing a resume key when the per-invocation
// entry limit is reached, and true once this worker's part of the walk is over
// (partition exhausted, walk cancelled, or the walker callback returned
// false).
// NOTE(review): the function signature and four statements in the completion
// section below are not visible in this listing.
110  int count = 0;
111  DBRequestKey *key_resume;
112 
113  // Check whether Walker was requested to be cancelled
114  if (walker_->should_stop_) {
115  goto walk_done;
116  }
117 
118  // Check where we left in last iteration
119  if ((key_resume = walk_ctx_.get()) == NULL) {
120  // First time invoke of worker thread, start from key_start_
121  key_resume = const_cast <DBRequestKey *>(key_start_);
122  }
123 
// With a resume/start key, build a temporary entry from it and position on the
// first entry not ordered before it; without one, start at the first entry of
// the partition.
124  DBEntry *entry;
125  if (key_resume != NULL) {
126  DBTable *table = walker_->table_;
127  std::unique_ptr<const DBEntryBase> start;
128  start = table->AllocEntry(key_resume);
129  // Find matching or next in sort order
130  entry = tbl_partition_->lower_bound(start.get());
131  } else {
132  entry = tbl_partition_->GetFirst();
133  }
134  if (entry == NULL) {
135  goto walk_done;
136  }
137 
// The successor is fetched before the callback runs, so iteration does not
// read `entry` again after the callback returns.
138  for (DBEntry *next = NULL; entry; entry = next) {
139  next = tbl_partition_->GetNext(entry);
140  // Check whether Walker was requested to be cancelled
141  if (walker_->should_stop_) {
142  break;
143  }
// Yield check comes before the callback: the entry whose key is saved here
// has not been visited yet and is where the next invocation resumes.
144  if (count == GetIterationToYield()) {
145  // store the context
146  walk_ctx_ = entry->GetDBRequestKey();
147  return false;
148  }
149 
150  // Invoke walker function
151  bool more = walker_->walker_fn_(tbl_partition_, entry);
152  if (!more) {
153  break;
154  }
155 
156  db_walker_wait();
157  count++;
158  }
159 
160 walk_done:
161  // Check whether all other walks on the table is completed
// fetch_and_decrement yields the value before the decrement, so 1 means this
// worker is the last one still counted in status_.
162  long num_walkers_on_tpart = walker_->status_.fetch_and_decrement();
163  if (num_walkers_on_tpart == 1) {
// NOTE(review): the statements in the cancelled / completed branches and the
// release of the walker are missing from this listing.
164  if (walker_->should_stop_) {
166  } else {
168  // Invoke Walker_Complete callback
169  if (walker_->done_fn_ != NULL) {
171  }
172  }
173 
174  // Release the memory for walker and bitmap
176  }
177  return true;
178 }
179 
181  DBTable *table, const DBRequestKey *key,
182  WalkFn walker, WalkCompleteFn walk_done,
183  bool postpone_walk)
184  : id_(id), wkmgr_(wkmgr), table_(table),
185  key_start_(const_cast<DBRequestKey *>(key)),
186  walker_fn_(walker), done_fn_(walk_done) {
// Walker constructor body. `key` is handed to the key_start_ unique_ptr above,
// and the same raw pointer is also given to every Worker created below.
// NOTE(review): the first line of the signature and the declaration of
// `scheduler` are not visible in this listing.
187  int num_worker = table->PartitionCount();
188  should_stop_ = false;
// One outstanding worker per partition; each worker decrements this when done.
189  status_ = num_worker;
190  for (int i = 0; i < num_worker; i++) {
191  Worker *task = new Worker(this, i, key);
// A postponed walk only records its tasks (ResumeWalk() enqueues them later);
// otherwise the task is enqueued right away.
192  if (postpone_walk) {
193  workers_.push_back(task);
194  } else {
196  scheduler->Enqueue(task);
197  }
198  }
199 }
200 
// DBTableWalker constructor body: a task_id of -1 selects the scheduler's id
// for the "db::DBTable" task.
// NOTE(review): the constructor header (including how task_id_ is initialised
// for other values) and the declaration of `scheduler` are not visible in this
// listing.
202  if (task_id == -1) {
204  task_id_ = scheduler->GetTaskId("db::DBTable");
205  }
206 }
207 
209  const DBRequestKey *key_start,
210  WalkFn walkerfn ,
211  WalkCompleteFn walk_complete,
212  bool postpone_walk) {
// Creates a Walker for `table` and returns the slot index it was stored at,
// which is also passed to the Walker as its id. The request counter on the
// table is bumped before the lock is taken; the slot bookkeeping and Walker
// creation happen under walkers_mutex_.
// NOTE(review): the first line of the signature is not visible in this
// listing.
213  table->incr_walk_request_count();
214  tbb::mutex::scoped_lock lock(walkers_mutex_);
// walker_map_ has a bit set for every free slot inside walkers_ (set by
// PurgeWalker); reuse the lowest one if there is any.
215  size_t i = walker_map_.find_first();
216  if (i == walker_map_.npos) {
// No free slot: append a new one at the end.
217  i = walkers_.size();
218  Walker *walker = new Walker(i, this, table, key_start,
219  walkerfn, walk_complete, postpone_walk);
220  walkers_.push_back(walker);
221  } else {
// Slot i is taken again; drop the bitmap entirely once no free slot is left.
222  walker_map_.reset(i);
223  if (walker_map_.none()) {
224  walker_map_.clear();
225  }
226  Walker *walker = new Walker(i, this, table, key_start,
227  walkerfn, walk_complete, postpone_walk);
228  walkers_[i] = walker;
229  }
230  table->incr_walker_count();
231  return i;
232 }
233 
// WalkCancel body: under walkers_mutex_, asks the walker stored at `id` to
// stop. The walker is not deleted here.
// NOTE(review): `id` is used to index walkers_ without a range or NULL check;
// the function header is not visible in this listing.
235  tbb::mutex::scoped_lock lock(walkers_mutex_);
236  walkers_[id]->StopWalk();
237  // Purge to be called after task has stopped
238 }
239 
// WalkResume body: under walkers_mutex_, enqueues the postponed tasks of the
// walker stored at `id` (Walker::ResumeWalk asserts that there are some).
// NOTE(review): `id` is used to index walkers_ without a range or NULL check;
// the function header is not visible in this listing.
241  tbb::mutex::scoped_lock lock(walkers_mutex_);
242  walkers_[id]->ResumeWalk();
243 }
244 
// PurgeWalker body: deletes the walker stored at `id`, releases its slot and
// retries deletion of the table once its walker count reaches zero. Everything
// runs under walkers_mutex_.
// NOTE(review): the function header is not visible in this listing.
246  tbb::mutex::scoped_lock lock(walkers_mutex_);
247  Walker *walker = walkers_[id];
// Remember the table first; it is needed after the walker has been deleted.
248  DBTable *table = walker->table_;
249  delete walker;
250  walkers_[id] = NULL;
251  if ((size_t) id == walkers_.size() - 1) {
// The last slot was freed: drop it together with any free slots directly
// before it, and shrink the free-slot bitmap to match.
252  while (!walkers_.empty() && walkers_.back() == NULL) {
253  walkers_.pop_back();
254  }
255  if (walker_map_.size() > walkers_.size()) {
256  walker_map_.resize(walkers_.size());
257  }
258  } else {
// A slot in the middle was freed: grow the bitmap if needed and mark the slot
// as reusable for WalkTable.
259  if ((size_t) id >= walker_map_.size()) {
260  walker_map_.resize(id + 1);
261  }
262  walker_map_.set(id);
263  }
264 
265  // Retry table deletion when the last walker is purged.
266  if (table->decr_walker_count() == 0) {
267  table->RetryDelete();
268  }
269 }
DBTablePartition * tbl_partition_
static int GetIterationToYield()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
DBTableWalker(int task_id=-1)
void PurgeWalker(WalkId id)
void incr_walk_cancel_count()
Definition: db_table.h:140
int task_id() const
virtual void RetryDelete()
Definition: db_table.h:104
virtual DBEntry * GetNext(const DBEntryBase *entry)
Definition: task_int.h:10
tbb::mutex walkers_mutex_
std::unique_ptr< DBRequestKey > key_start_
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
int GetTaskId(const std::string &name)
Definition: task.cc:856
void incr_walker_count()
Definition: db_table.h:130
WalkerList walkers_
static int max_iteration_to_yield_
static void db_walker_wait()
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
std::unique_ptr< DBRequestKey > walk_ctx_
Walker(WalkId id, DBTableWalker *wkmgr, DBTable *table, const DBRequestKey *key, WalkFn walker, WalkCompleteFn walk_done, bool postpone_walk)
tbb::atomic< bool > should_stop_
DBTableWalker * wkmgr_
DBTableWalker::Walker * walker_
WalkCompleteFn done_fn_
virtual std::unique_ptr< DBEntry > AllocEntry(const DBRequestKey *key) const =0
virtual DBEntry * lower_bound(const DBEntryBase *entry)
void incr_walk_complete_count()
Definition: db_table.h:139
const DBRequestKey * key_start_
void WalkCancel(WalkId id)
Worker(Walker *walker, int db_partition_id, const DBRequestKey *key)
virtual KeyPtr GetDBRequestKey() const =0
tbb::atomic< long > status_
uint64_t decr_walker_count()
Definition: db_table.h:131
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
Definition: db_table.cc:436
WalkerMap walker_map_
void WalkResume(WalkId id)
WalkId WalkTable(DBTable *table, const DBRequestKey *key_start, WalkFn walker, WalkCompleteFn walk_complete, bool postpone_walk=false)
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
boost::function< void(DBTableBase *)> WalkCompleteFn
std::vector< Task * > workers_
virtual DBEntry * GetFirst()
std::string Description() const
virtual int PartitionCount() const
Definition: db_table.cc:420
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void incr_walk_request_count()
Definition: db_table.h:138