OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
db_partition.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "db/db_partition.h"
6 
7 #include <list>
8 #include <tbb/atomic.h>
9 #include <tbb/concurrent_queue.h>
10 #include <tbb/mutex.h>
11 
12 #include "base/task.h"
13 #include "db/db.h"
14 #include "db/db_client.h"
15 #include "db/db_entry.h"
16 
17 using tbb::concurrent_queue;
18 using tbb::atomic;
19 
21  // Constructor takes ownership of DBRequest key, data.
23  : tpart(tpart), client(client) {
24  request.Swap(req);
25  }
29 };
30 
33  : tpart(tpart), db_entry(db_entry) {
34  }
37 };
38 
40 public:
41  static const int kThreshold = 1024;
42  typedef concurrent_queue<RequestQueueEntry *> RequestQueue;
43  typedef concurrent_queue<RemoveQueueEntry *> RemoveQueue;
44  typedef std::list<DBTablePartBase *> TablePartList;
45 
46  explicit WorkQueue(DBPartition *partition, int partition_id)
47  : db_partition_(partition),
48  db_partition_id_(partition_id),
49  disable_(false),
50  running_(false) {
51  request_count_ = 0;
54  }
56  for (RequestQueue::iterator iter = request_queue_.unsafe_begin();
57  iter != request_queue_.unsafe_end();) {
58  RequestQueueEntry *req_entry = *iter;
59  ++iter;
60  delete req_entry;
61  }
62  request_queue_.clear();
63  }
64 
65  bool EnqueueRequest(RequestQueueEntry *req_entry) {
66  request_queue_.push(req_entry);
68  uint32_t max = request_count_.fetch_and_increment();
69  if (max > max_request_queue_len_)
72  return max < (kThreshold - 1);
73 
74  }
75 
76  bool DequeueRequest(RequestQueueEntry **req_entry) {
77  bool success = request_queue_.try_pop(*req_entry);
78  if (success) {
79  request_count_.fetch_and_decrement();
80  }
81  return success;
82  }
83 
84  void EnqueueRemove(RemoveQueueEntry *rm_entry) {
85  remove_queue_.push(rm_entry);
87  }
88 
89  bool DequeueRemove(RemoveQueueEntry **rm_entry) {
90  bool success = remove_queue_.try_pop(*rm_entry);
91  return success;
92  }
93 
95  void MaybeStartRunner();
96  bool RunnerDone();
97 
98  // Normally called from single task that either runs in DB context or is
99  // exclusive with DB task, but can be called concurrently from multiple
100  // bgp::ConfigHelper tasks.
101  void SetActive(DBTablePartBase *tpart) {
102  tbb::mutex::scoped_lock lock(mutex_);
103  change_list_.push_back(tpart);
105  }
106 
108  DBTablePartBase *tpart = NULL;
109  if (!change_list_.empty()) {
110  tpart = change_list_.front();
111  change_list_.pop_front();
112  }
113  return tpart;
114  }
115 
117  return db_partition_id_;
118  }
119 
120  int db_task_id() const { return db_partition_->task_id(); }
121 
122  bool IsDBQueueEmpty() const {
123  return (request_queue_.empty() && change_list_.empty());
124  }
125 
126  bool disable() { return disable_; }
128 
129  long request_queue_len() const {
130  return request_count_;
131  }
132 
133  uint64_t total_request_count() const {
134  return total_request_count_;
135  }
136 
137  uint64_t max_request_queue_len() const {
138  return max_request_queue_len_;
139  }
140 
141 private:
145  atomic<long> request_count_;
149  tbb::mutex mutex_;
151  bool disable_;
152  bool running_;
153 
155 };
156 
158  return work_queue_->IsDBQueueEmpty();
159 }
160 
161 void DBPartition::SetQueueDisable(bool disable) {
162  if (disable) {
163  work_queue_->set_disable(true);
164  } else {
165  work_queue_->set_disable(false);
166  work_queue_->MaybeStartRunner();
167  }
168 }
169 
171 public:
172  static const int kMaxIterations = 32;
174  : Task(queue->db_task_id(), queue->db_partition_id()),
175  queue_(queue) {
176  }
177 
178  virtual bool Run() {
179  int count = 0;
180 
181  // Skip if the queue is disabled.
182  if (queue_->disable())
183  return queue_->RunnerDone();
184 
185  RemoveQueueEntry *rm_entry = NULL;
186  while (queue_->DequeueRemove(&rm_entry)) {
187  DBEntryBase *db_entry = rm_entry->db_entry;
188  {
189  tbb::spin_rw_mutex::scoped_lock
190  lock(rm_entry->tpart->dbstate_mutex(), false);
191  if (!db_entry->IsDeleted() || db_entry->is_onlist() ||
192  !db_entry->is_state_empty_unlocked(rm_entry->tpart)) {
193  db_entry->ClearOnRemoveQ();
194  db_entry = NULL;
195  }
196  }
197  if (db_entry) {
198  rm_entry->tpart->Remove(db_entry);
199  }
200  delete rm_entry;
201  if (++count == kMaxIterations) {
202  return false;
203  }
204  }
205 
206  RequestQueueEntry *req_entry = NULL;
207  while (queue_->DequeueRequest(&req_entry)) {
208  req_entry->tpart->Process(req_entry->client, &req_entry->request);
209  delete req_entry;
210  if (++count == kMaxIterations) {
211  return false;
212  }
213  }
214 
215  while (true) {
217  if (tpart == NULL) {
218  break;
219  }
220  bool done = tpart->RunNotify();
221  if (!done) {
222  return false;
223  }
224  }
225 
226  // Running is done only if queue_ is empty. It's possible that more
227  // entries are added into in the input or remove queues during the
228  // time we were processing those queues.
229  return queue_->RunnerDone();
230  }
231 
232  std::string Description() const {
233  return "DBPartition QueueRunner";
234  }
235 private:
237 };
238 
240  if (running_) {
241  return;
242  }
243  running_ = true;
244  QueueRunner *runner = new QueueRunner(this);
246  scheduler->Enqueue(runner);
247 }
248 
250  tbb::mutex::scoped_lock lock(mutex_);
251  MaybeStartRunnerUnlocked();
252 }
253 
255  tbb::mutex::scoped_lock lock(mutex_);
256  if (disable_ || (request_queue_.empty() && remove_queue_.empty())) {
257  running_ = false;
258  return true;
259  }
260 
261  running_ = true;
262  return false;
263 }
264 
265 DBPartition::DBPartition(DB *db, int partition_id)
266  : db_(db), work_queue_(new WorkQueue(this, partition_id)) {
267 }
268 
269 // The DBPartition destructor needs to be defined after WorkQueue has
270 // been declared.
272 }
273 
275  DBRequest *req) {
276  RequestQueueEntry *entry = new RequestQueueEntry(tpart, client, req);
277  return work_queue_->EnqueueRequest(entry);
278 }
279 
281  RemoveQueueEntry *entry = new RemoveQueueEntry(tpart, db_entry);
282  db_entry->SetOnRemoveQ();
283  work_queue_->EnqueueRemove(entry);
284 }
285 
286 // concurrency: called from DBPartition task.
288  work_queue_->SetActive(tablepart);
289 }
290 
292  return work_queue_->request_queue_len();
293 }
294 
296  return work_queue_->total_request_count();
297 }
298 
300  return work_queue_->max_request_queue_len();
301 }
302 
303 int DBPartition::task_id() const {
304  return db_->task_id();
305 }
DBEntryBase * db_entry
Definition: db_partition.cc:36
tbb::spin_rw_mutex & dbstate_mutex()
concurrent_queue< RemoveQueueEntry * > RemoveQueue
Definition: db_partition.cc:43
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
uint64_t total_request_count() const
bool DequeueRequest(RequestQueueEntry **req_entry)
Definition: db_partition.cc:76
bool EnqueueRequest(RequestQueueEntry *req_entry)
Definition: db_partition.cc:65
DBTablePartBase * tpart
Definition: db_partition.cc:35
bool IsDeleted() const
Definition: db_entry.h:49
WorkQueue(DBPartition *partition, int partition_id)
Definition: db_partition.cc:46
uint64_t total_request_count() const
int task_id() const
DISALLOW_COPY_AND_ASSIGN(WorkQueue)
DBTablePartBase * GetActiveTable()
DBTablePartBase * tpart
Definition: db_partition.cc:26
static const int kMaxIterations
RemoveQueueEntry(DBTablePartBase *tpart, DBEntryBase *db_entry)
Definition: db_partition.cc:32
static const int kThreshold
Definition: db_partition.cc:41
void Swap(DBRequest *rhs)
Definition: db_table.cc:43
bool IsDBQueueEmpty() const
Definition: db.h:24
QueueRunner(WorkQueue *queue)
void ClearOnRemoveQ()
Definition: db_entry.h:59
bool IsDBQueueEmpty() const
bool DequeueRemove(RemoveQueueEntry **rm_entry)
Definition: db_partition.cc:89
void SetOnRemoveQ()
Definition: db_entry.h:55
uint64_t max_request_queue_len() const
bool EnqueueRequest(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
TablePartList change_list_
std::unique_ptr< WorkQueue > work_queue_
Definition: db_partition.h:49
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
void SetQueueDisable(bool disable)
uint64_t max_request_queue_len() const
DBPartition * db_partition_
RequestQueueEntry(DBTablePartBase *tpart, DBClient *client, DBRequest *req)
Definition: db_partition.cc:22
void SetActive(DBTablePartBase *tpart)
concurrent_queue< RequestQueueEntry * > RequestQueue
Definition: db_partition.cc:42
atomic< long > request_count_
long request_queue_len() const
DBPartition(DB *db, int partition_id)
bool is_state_empty_unlocked(DBTablePartBase *tpart)
Definition: db_entry.cc:92
DBClient * client
Definition: db_partition.cc:27
int task_id() const
Definition: db.h:79
void EnqueueRemove(DBTablePartBase *tpart, DBEntryBase *db_entry)
std::string Description() const
bool is_onlist()
Definition: db_entry.h:53
virtual void Process(DBClient *client, DBRequest *req)=0
void EnqueueRemove(RemoveQueueEntry *rm_entry)
Definition: db_partition.cc:84
long request_queue_len() const
void set_disable(bool disable)
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
RequestQueue request_queue_
std::list< DBTablePartBase * > TablePartList
Definition: db_partition.cc:44
void OnTableChange(DBTablePartBase *tpart)
virtual void Remove(DBEntryBase *)=0