OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
db_table_walk_mgr.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "db/db_table_walk_mgr.h"
6 
7 #include <tbb/atomic.h>
8 
9 #include <boost/bind.hpp>
10 #include <boost/foreach.hpp>
11 
12 #include "base/logging.h"
13 #include "base/task.h"
14 #include "base/task_annotations.h"
15 #include "db/db.h"
16 #include "db/db_partition.h"
17 #include "db/db_table.h"
18 #include "db/db_table_partition.h"
19 
// Constructor initializer list (the signature line is not present in this
// listing). Allocates the two TaskTriggers owned by the manager:
//   - walk_request_trigger_ is bound to ProcessWalkRequestList()
//   - walk_done_trigger_    is bound to ProcessWalkDone()
// Both are created with the task id looked up for "db::Walker" and a third
// argument of 0 (presumably the task instance -- TODO confirm against
// TaskTrigger's constructor).
21  : walk_request_trigger_(new TaskTrigger(
22  boost::bind(&DBTableWalkMgr::ProcessWalkRequestList, this),
23  TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)),
24  walk_done_trigger_(new TaskTrigger(
25  boost::bind(&DBTableWalkMgr::ProcessWalkDone, this),
26  TaskScheduler::GetInstance()->GetTaskId("db::Walker"), 0)) {
27 }
28 
// Body of the callback bound to walk_request_trigger_ (presumably
// DBTableWalkMgr::ProcessWalkRequestList; the signature line is missing from
// this listing). Dequeues pending per-table walk requests until one of them
// has at least one non-stopped walker, then starts the walk on that table.
// Always returns true.
30  CHECK_CONCURRENCY("db::Walker");
// Held for the whole function while the request list/set are modified.
31  tbb::mutex::scoped_lock lock(mutex_);
// A batch of walkers is still being processed; nothing is dequeued now.
32  if (!current_table_walk_.empty()) return true;
33  while (true) {
34  if (walk_request_list_.empty()) break;
// NOTE(review): the statement that defines `info` (listing line 35) is absent
// from this listing; presumably it takes walk_request_list_.front() -- verify
// against the real source file.
36  walk_request_set_.erase(info.get());
37  walk_request_list_.pop_front();
// The walkers queued for this table become the current batch.
38  current_table_walk_.swap(info->pending_requests);
39  DBTable *table = info->table;
40  bool walk_table = false;
41  for (auto walker : current_table_walk_) {
// Stopped walkers do not take part and do not, by themselves, cause a walk.
42  if (walker->stopped()) continue;
43  walker->set_in_progress();
44  walker->reset_walk_again();
45  walk_table = true;
46  }
47  if (walk_table) {
48  // start the walk
49  table->StartWalk();
// Only one table is walked at a time; leave the remaining requests queued.
50  break;
51  } else {
// Every walker in this batch was stopped: drop it and try the next request.
52  current_table_walk_.clear();
53  }
54  }
55  return true;
56 }
57 
// Body presumably belonging to DBTableWalkMgr::ProcessWalkDone, the callback
// bound to walk_done_trigger_ (the signature line is missing from this
// listing -- verify). Finalizes every walker of the current batch, clears the
// batch and re-arms the request trigger. Always returns true.
59  CHECK_CONCURRENCY("db::Walker");
// Must only run while a batch is outstanding.
60  assert(!current_table_walk_.empty());
61  for (auto walker : current_table_walk_) {
// A walker flagged walk_again is put back into the "requested" state; any
// other walker that is not stopped is marked done.
62  if (walker->walk_again())
63  walker->set_walk_requested();
64  else if (!walker->stopped())
65  walker->set_walk_done();
// The completion callback is invoked only for walkers that are neither
// stopped nor flagged walk_again.
66  if (walker->stopped() || walker->walk_again()) continue;
67  walker->walk_complete()(walker, walker->table());
68  }
69  current_table_walk_.clear();
// Set the request trigger so that queued requests get processed.
70  walk_request_trigger_->Set();
71  return true;
72 }
73 
// Tail of the AllocWalker definition (the first signature line, which carries
// the `table` parameter, is missing from this listing). Increments the
// table's walker count, allocates a DBTableWalk for (table, walk_fn,
// walk_complete) and returns it wrapped in a DBTable::DBTableWalkRef.
75  DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete) {
76  table->incr_walker_count();
77  DBTableWalk *walker = new DBTableWalk(table, walk_fn, walk_complete);
78  return DBTable::DBTableWalkRef(walker);
79 }
80 
// Body presumably belonging to DBTableWalkMgr::ReleaseWalker (the signature
// line is missing from this listing -- verify). Marks the walk as stopped and
// then resets `ref`.
// NOTE(review): `ref` looks like a reference to the caller's DBTableWalkRef,
// in which case reset() clears the caller's handle -- confirm.
82  ref->set_walk_stopped();
83  ref.reset();
84 }
85 
// Body presumably belonging to DBTableWalkMgr::WalkAgain (the signature line
// is missing from this listing -- verify). Simply forwards `ref` to
// WalkTable().
87  WalkTable(ref);
88 }
89 
// Body presumably belonging to DBTableWalkMgr::WalkTable (the signature line
// is missing from this listing -- verify). Records a walk request for `walk`:
// updates the walker's state and the table's counters, then queues the walker
// on the pending request for its table, creating that request if needed.
// mutex_ is held for the whole function.
91  tbb::mutex::scoped_lock lock(mutex_);
92  DBTable *table = walk->table();
93 
// A walker that is currently in progress is flagged walk_again; otherwise it
// is marked as requested. In both cases it is queued below.
94  if (walk->in_progress()) {
95  table->incr_walk_again_count();
96  walk->set_walk_again();
97  } else {
98  table->incr_walk_request_count();
99  walk->set_walk_requested();
100  }
101 
// Stack temporary used only as a lookup key into walk_request_set_
// (presumably the set orders entries by table -- TODO confirm comparator).
102  WalkRequestInfo tmp_info = WalkRequestInfo(table);
103  WalkRequestInfoSet::iterator it = walk_request_set_.find(&tmp_info);
104  if (it != walk_request_set_.end()) {
// A request for this table is already pending: append to it and return
// without setting the trigger.
105  (*it)->AppendWalkReq(walk);
106  return;
107  }
108 
// No pending request for this table: create one, store it in the list (as a
// WalkRequestInfoPtr) and in the set (as a raw pointer), then set the request
// trigger.
109  WalkRequestInfo *new_info = new WalkRequestInfo(table);
110  new_info->AppendWalkReq(walk);
111  walk_request_list_.push_back(WalkRequestInfoPtr(new_info));
112  walk_request_set_.insert(new_info);
113  walk_request_trigger_->Set();
114 }
115 
// Body presumably belonging to DBTableWalkMgr::WalkDone (the signature line
// is missing from this listing -- verify). Sets walk_done_trigger_, which the
// constructor binds to ProcessWalkDone().
117  walk_done_trigger_->Set();
118 }
119 
// Body presumably belonging to DBTableWalkMgr::InvokeWalkCb(part, entry) (the
// signature line is missing from this listing -- verify). Calls the walk
// function of every active walker in the current batch for one entry.
// Returns true while at least one walker in the batch was not skipped; returns
// false when all were skipped (including when the batch is empty).
121  uint32_t skip_walk_count = 0;
122  for (auto walker : current_table_walk_) {
// Walkers that are done, stopped or flagged walk_again are not called.
123  if (walker->done() || walker->stopped() || walker->walk_again()) {
124  skip_walk_count++;
125  continue;
126  }
127  bool more = walker->walk_fn()(part, entry);
// A false return from the walk function counts the walker as skipped and
// marks it done, unless it has been stopped.
128  if (!more) {
129  skip_walk_count++;
130  if (!walker->stopped()) walker->set_walk_done();
131  }
132  }
133  return (skip_walk_count < current_table_walk_.size());
134 }
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
bool ProcessWalkRequestList()
WalkRequestInfoSet walk_request_set_
DBTable::DBTableWalkRef AllocWalker(DBTable *table, DBTable::WalkFn walk_fn, DBTable::WalkCompleteFn walk_complete)
void WalkTable(DBTable::DBTableWalkRef walk)
boost::function< void(DBTableWalkRef, DBTableBase *)> WalkCompleteFn
Definition: db_table.h:179
void incr_walker_count()
Definition: db_table.h:130
void AppendWalkReq(DBTable::DBTableWalkRef ref)
void ReleaseWalker(DBTable::DBTableWalkRef &walk)
boost::scoped_ptr< TaskTrigger > walk_done_trigger_
#define CHECK_CONCURRENCY(...)
void WalkAgain(DBTable::DBTableWalkRef walk)
WalkRequestInfoList walk_request_list_
void StartWalk()
Definition: db_table.cc:397
boost::shared_ptr< WalkRequestInfo > WalkRequestInfoPtr
boost::function< bool(DBTablePartBase *, DBEntryBase *)> WalkFn
Definition: db_table.h:176
bool InvokeWalkCb(DBTablePartBase *part, DBEntryBase *entry)
boost::scoped_ptr< TaskTrigger > walk_request_trigger_
boost::intrusive_ptr< DBTableWalk > DBTableWalkRef
Definition: db_table.h:169
void incr_walk_again_count()
Definition: db_table.h:141
void incr_walk_request_count()
Definition: db_table.h:138
WalkReqList current_table_walk_