OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ifmap_update_queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 
7 #include <boost/checked_delete.hpp>
8 #include <boost/assign/list_of.hpp>
9 
10 #include <sandesh/request_pipeline.h>
11 
12 #include "ifmap/ifmap_exporter.h"
13 #include "ifmap/ifmap_link.h"
15 #include "ifmap/ifmap_server.h"
16 #include "ifmap/ifmap_server_show_types.h"
17 
18 // Convention for SetSequence():
19 // If we are inserting the item in the middle of the Q, we set its sequence to
20 // the same value as its successor. If it's the last item in the Q, it gets the
21 // next available sequence number.
23  IFMapListEntry *next = Next(item);
24  item->set_sequence(next ? next->get_sequence(): ++sequence_);
25 }
26 
27 // Insert 'item' at the end of the list.
29  list_.push_back(*item);
30  SetSequence(item);
32 }
33 
34 // Insert 'item' before 'ptr'.
36  IFMapListEntry *item) {
37  list_.insert(list_.iterator_to(*ptr), *item);
38  SetSequence(item);
40 }
41 
42 // Insert 'item' after 'ptr'.
44  IFMapListEntry *item) {
45  list_.insert(++list_.iterator_to(*ptr), *item);
46  SetSequence(item);
48 }
49 
51  list_.erase(list_.iterator_to(*item));
53 }
54 
56  sequence_(0) {
58 }
59 
62  boost::checked_delete(ptr);
63  }
64 };
65 
67  list_.clear_and_dispose(IFMapListEntryDisposer());
68 }
69 
73 }
74 
76  assert(!update->advertise().empty());
77  bool tm_last = false;
78  if (GetLast() == tail_marker()) {
79  tm_last = true;
80  }
81  PushbackIntoList(update);
82  return tm_last;
83 }
84 
86  EraseFromList(update);
87 }
88 
90  MarkerMap::iterator loc = marker_map_.find(bit);
91  if (loc == marker_map_.end()) {
92  return NULL;
93  }
94  return loc->second;
95 }
96 
97 void IFMapUpdateQueue::Join(int bit) {
98  IFMapMarker *marker = &tail_marker_;
99  marker->mask.set(bit);
100  marker_map_.insert(std::make_pair(bit, marker));
101 }
102 
103 void IFMapUpdateQueue::Leave(int bit) {
104  MarkerMap::iterator loc = marker_map_.find(bit);
105  assert(loc != marker_map_.end());
106  IFMapMarker *marker = loc->second;
107 
108  BitSet reset_bs;
109  reset_bs.set(bit);
110 
111  // Start with the first element after the client's marker
112  for (List::iterator iter = list_.iterator_to(*marker), next;
113  iter != list_.end(); iter = next) {
114  IFMapListEntry *item = iter.operator->();
115  next = ++iter;
116  if (item->IsMarker()) {
117  continue;
118  }
119  IFMapUpdate *update = static_cast<IFMapUpdate *>(item);
120  update->AdvertiseReset(reset_bs);
121  if (update->advertise().empty()) {
122  Dequeue(update);
123  }
124 
125  // Update may be freed.
126  server_->exporter()->StateUpdateOnDequeue(update, reset_bs, true);
127  }
128 
129  marker_map_.erase(loc);
130  marker->mask.reset(bit);
131  if ((marker != &tail_marker_) && (marker->mask.empty())) {
132  EraseFromList(marker);
133  delete marker;
134  }
135 }
136 
138  const BitSet &mmove) {
139  //
140  // Set the bits in dst and update the MarkerMap. Be sure to set the dst
141  // before we reset the src since bitset may be a reference to src->mask.
142  // Call to operator|=()
143  //
144  dst->mask |= mmove;
145  for (size_t i = mmove.find_first();
146  i != BitSet::npos; i = mmove.find_next(i)) {
147  MarkerMap::iterator loc = marker_map_.find(i);
148  assert(loc != marker_map_.end());
149  loc->second = dst;
150  }
151  // Reset the bits in the src and get rid of it in case it's now empty.
152  src->mask.Reset(mmove);
153  if (src->mask.empty()) {
154  assert(src != &tail_marker_);
155  EraseFromList(src);
156  delete src;
157  }
158 }
159 
161  IFMapListEntry *current,
162  const BitSet &msplit, bool before) {
163  assert(!msplit.empty());
164  IFMapMarker *new_marker = new IFMapMarker();
165 
166  // call to operator=()
167  new_marker->mask = msplit;
168  marker->mask.Reset(msplit);
169  assert(!marker->mask.empty());
170 
171  for (size_t i = msplit.find_first();
172  i != BitSet::npos; i = msplit.find_next(i)) {
173  MarkerMap::iterator loc = marker_map_.find(i);
174  assert(loc != marker_map_.end());
175  loc->second = new_marker;
176  }
177  if (before) {
178  // Insert new_marker before current
179  InsertIntoListBefore(current, new_marker);
180  } else {
181  // Insert new_marker after current
182  InsertIntoListAfter(current, new_marker);
183  }
184  return new_marker;
185 }
186 
188  IFMapListEntry *current,
189  const BitSet &msplit) {
190  bool before = true;
191  IFMapMarker *ret_marker = MarkerSplit(marker, current, msplit, before);
192  return ret_marker;
193 }
194 
196  IFMapListEntry *current,
197  const BitSet &msplit) {
198  bool before = false;
199  IFMapMarker *ret_marker = MarkerSplit(marker, current, msplit, before);
200  return ret_marker;
201 }
202 
203 // Insert marker before current
205  IFMapListEntry *current) {
206  if (marker != current) {
207  EraseFromList(marker);
208  InsertIntoListBefore(current, marker);
209  }
210 }
211 
212 // Insert marker after current
214  IFMapListEntry *current) {
215  if (marker != current) {
216  EraseFromList(marker);
217  InsertIntoListAfter(current, marker);
218  }
219 }
220 
222  List::iterator iter = list_.iterator_to(*current);
223  if (iter == list_.begin()) {
224  return NULL;
225  }
226  --iter;
227  return iter.operator->();
228 }
229 
231  // the list must always have the tail_marker
232  assert(!list_.empty());
233  List::reverse_iterator riter;
234  riter = list_.rbegin();
235  return riter.operator->();
236 }
237 
239  List::iterator iter = list_.iterator_to(*current);
240  if (++iter == list_.end()) {
241  return NULL;
242  }
243  return iter.operator->();
244 }
245 
247  return (list_.begin().operator->() == &tail_marker_) &&
248  (list_.rbegin().operator->() == &tail_marker_);
249 }
250 
252  return (int)list_.size();
253 }
254 
256  int i = 0;
257  IFMapListEntry *item;
258  List::iterator iter = list_.iterator_to(list_.front());
259  while (iter != list_.end()) {
260  item = iter.operator->();
261  if (item->IsMarker()) {
262  IFMapMarker *marker = static_cast<IFMapMarker *>(item);
263  if (marker == &tail_marker_) {
264  std::cout << i << ". Tail Marker: " << item;
265  } else {
266  std::cout << i << ". Marker: " << item;
267  }
268  std::cout << " clients:";
269  for (size_t j = marker->mask.find_first();
270  j != BitSet::npos; j = marker->mask.find_next(j)) {
271  std::cout << " " << j;
272  }
273  std::cout << std::endl;
274  }
275  if (item->IsUpdate()) {
276  std::cout << i << ". Update: " << item << " ";
277  }
278  if (item->IsDelete()) {
279  std::cout << i << ". Delete: " << item << " ";
280  }
281  if (item->IsUpdate() || item->IsDelete()) {
282  IFMapUpdate *update = static_cast<IFMapUpdate *>(item);
283  const IFMapObjectPtr ref = update->data();
284  if (ref.type == IFMapObjectPtr::NODE) {
285  std::cout << "node <";
286  std::cout << ref.u.node->name() << ">" << std::endl;
287  } else if (ref.type == IFMapObjectPtr::LINK) {
288  std::cout << ref.u.link->ToString() << std::endl;
289  }
290  }
291 
292  iter++;
293  i++;
294  }
295  std::cout << "**End of queue**" << std::endl;
296 }
297 
298 // almost everything in this class is static since we don't really want to
299 // instantiate this class
301 public:
302  static const int kMaxElementsPerRound = 50;
303 
305  std::vector<UpdateQueueShowEntry> send_buffer;
306  };
307 
309  return static_cast<RequestPipeline::InstData *>(new ShowData);
310  }
311 
313  // init as 1 indicates we need to init 'first' to begin() since there is
314  // no way to initialize an iterator here.
315  TrackerData() : init(1) { }
316  int init;
317  std::vector<UpdateQueueShowEntry>::const_iterator first;
318  };
319 
321  return static_cast<RequestPipeline::InstData *>(new TrackerData);
322  }
323 
324  static void CopyNode(UpdateQueueShowEntry *dest, IFMapListEntry *src,
325  IFMapUpdateQueue *queue);
326  static bool BufferStage(const Sandesh *sr,
327  const RequestPipeline::PipeSpec ps, int stage,
328  int instNum, RequestPipeline::InstData *data);
329  static bool SendStage(const Sandesh *sr, const RequestPipeline::PipeSpec ps,
330  int stage, int instNum,
332 };
333 
334 void ShowIFMapUpdateQueue::CopyNode(UpdateQueueShowEntry *dest,
335  IFMapListEntry *src,
336  IFMapUpdateQueue *queue) {
337  if (src->IsUpdate() || src->IsDelete()) {
338  IFMapUpdate *update = static_cast<IFMapUpdate *>(src);
339  const IFMapObjectPtr ref = update->data();
340  if (ref.type == IFMapObjectPtr::NODE) {
341  dest->node_name = "<![CDATA[" + ref.u.node->name() + "]]>";
342  } else if (ref.type == IFMapObjectPtr::LINK) {
343  dest->node_name = "<![CDATA[" + ref.u.link->ToString() + "]]>";
344  }
345  if (src->IsUpdate()) {
346  dest->qe_type = "Update";
347  }
348  if (src->IsDelete()) {
349  dest->qe_type = "Delete";
350  }
351  dest->qe_bitset = update->advertise().ToNumberedString();
352  }
353  if (src->IsMarker()) {
354  IFMapMarker *marker = static_cast<IFMapMarker *>(src);
355  dest->node_name = "Marker";
356  if (marker == queue->tail_marker()) {
357  dest->qe_type = "Tail-Marker";
358  } else {
359  dest->qe_type = "Marker";
360  }
361  dest->qe_bitset = marker->mask.ToNumberedString();
362  }
363  dest->queue_insert_ago = src->queue_insert_ago_str();
364  dest->sequence = src->get_sequence();
365 }
366 
368  const RequestPipeline::PipeSpec ps,
369  int stage, int instNum,
371  const IFMapUpdateQueueShowReq *request =
372  static_cast<const IFMapUpdateQueueShowReq *>(ps.snhRequest_.get());
373  IFMapSandeshContext *sctx =
374  static_cast<IFMapSandeshContext *>(request->module_context("IFMap"));
375  ShowData *show_data = static_cast<ShowData *>(data);
376 
377  IFMapUpdateQueue *queue = sctx->ifmap_server()->queue();
378  assert(queue);
379  show_data->send_buffer.reserve(queue->list_.size());
380 
381  IFMapUpdateQueue::List::iterator iter =
382  queue->list_.iterator_to(queue->list_.front());
383  while (iter != queue->list_.end()) {
384  IFMapListEntry *item = iter.operator->();
385 
386  UpdateQueueShowEntry dest;
387  CopyNode(&dest, item, queue);
388  show_data->send_buffer.push_back(dest);
389 
390  iter++;
391  }
392 
393  return true;
394 }
395 
396 // Can be called multiple times i.e. approx total/kMaxElementsPerRound
398  const RequestPipeline::PipeSpec ps,
399  int stage, int instNum,
401  const RequestPipeline::StageData *prev_stage_data = ps.GetStageData(0);
402  const ShowIFMapUpdateQueue::ShowData &show_data =
403  static_cast<const ShowIFMapUpdateQueue::ShowData &>
404  (prev_stage_data->at(0));
405  // Data for this stage
406  TrackerData *tracker_data = static_cast<TrackerData *>(data);
407 
408  std::vector<UpdateQueueShowEntry> dest_buffer;
409  std::vector<UpdateQueueShowEntry>::const_iterator first, last;
410  bool more = false;
411 
412  if (tracker_data->init) {
413  first = show_data.send_buffer.begin();
414  tracker_data->init = 0;
415  } else {
416  first = tracker_data->first;
417  }
418  int rem_num = show_data.send_buffer.end() - first;
419  int send_num = (rem_num < kMaxElementsPerRound) ? rem_num :
421  last = first + send_num;
422  copy(first, last, back_inserter(dest_buffer));
423  // Decide if we want to be called again.
424  if ((rem_num - send_num) > 0) {
425  more = true;
426  } else {
427  more = false;
428  }
429  const IFMapUpdateQueueShowReq *request =
430  static_cast<const IFMapUpdateQueueShowReq *>(ps.snhRequest_.get());
431  IFMapUpdateQueueShowResp *response = new IFMapUpdateQueueShowResp();
432  response->set_queue(dest_buffer);
433  response->set_context(request->context());
434  response->set_more(more);
435  response->Response();
436  tracker_data->first = first + send_num;
437 
438  // Return 'false' to be called again
439  return (!more);
440 }
441 
// Handle the update-queue show request by building a two-stage
// RequestPipeline: stage s0 gathers/reads the queue and stage s1 sends it.
// NOTE(review): the declarations of s0, s1 and scheduler are not visible in
// this listing - confirm them against the full source.
442 void IFMapUpdateQueueShowReq::HandleRequest() const {
443 
446 
447  // 2 stages - first: gather/read, second: send
448 
// Stage 0 uses the task id registered as "db::IFMapTable" and registers
// instance 0.
449  s0.taskId_ = scheduler->GetTaskId("db::IFMapTable");
452  s0.instances_.push_back(0);
453 
454  // control-node ifmap show command task
// Stage 1 uses the "ifmap::ShowCommandSendStage" task id and also registers
// instance 0.
455  s1.taskId_ = scheduler->GetTaskId("ifmap::ShowCommandSendStage");
458  s1.instances_.push_back(0);
459 
// Bind the pipeline spec to this request and install the stages in order
// (s0 first, then s1).
460  RequestPipeline::PipeSpec ps(this);
461  ps.stages_= boost::assign::list_of(s0)(s1)
462  .convert_to_container<vector<RequestPipeline::StageSpec> >();
// Constructing the pipeline from the spec presumably starts its execution -
// TODO confirm against RequestPipeline.
463  RequestPipeline rp(ps);
464 }
ObjectType type
Definition: ifmap_update.h:38
static RequestPipeline::InstData * AllocBuffer(int stage)
std::vector< int > instances_
std::vector< UpdateQueueShowEntry > send_buffer
IFMapServer * ifmap_server()
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
IFMapMarker * MarkerSplit(IFMapMarker *marker, IFMapListEntry *current, const BitSet &msplit, bool before)
boost::ptr_vector< InstData > StageData
static RequestPipeline::InstData * AllocTracker(int stage)
void MoveMarkerBefore(IFMapMarker *marker, IFMapListEntry *current)
void MoveMarkerAfter(IFMapMarker *marker, IFMapListEntry *current)
void Reset(const BitSet &rhs)
Definition: bitset.cc:470
IFMapMarker * tail_marker()
void SetSequence(IFMapListEntry *item)
IFMapListEntry * GetLast()
static const uint64_t NULL_SEQUENCE
IFMapExporter * exporter()
Definition: ifmap_server.h:86
bool IsDelete() const
Definition: ifmap_update.h:66
const BitSet & advertise() const
Definition: ifmap_update.h:94
void AdvertiseReset(const BitSet &set)
Definition: ifmap_update.cc:59
bool IsMarker() const
Definition: ifmap_update.h:64
bool empty() const
Definition: bitset.cc:165
int GetTaskId(const std::string &name)
Definition: task.cc:856
IFMapMarker * GetMarker(int bit)
void Dequeue(IFMapUpdate *update)
uint64_t get_sequence()
Definition: ifmap_update.h:82
IFMapMarker * MarkerSplitAfter(IFMapMarker *marker, IFMapListEntry *current, const BitSet &msplit)
bool IsUpdate() const
Definition: ifmap_update.h:65
static const size_t npos
Definition: bitset.h:19
static TaskScheduler * GetInstance()
Definition: task.cc:547
IFMapLink * link
Definition: ifmap_update.h:42
const IFMapObjectPtr & data() const
Definition: ifmap_update.h:96
IFMapNode * node
Definition: ifmap_update.h:41
std::vector< UpdateQueueShowEntry >::const_iterator first
IFMapMarker * MarkerSplitBefore(IFMapMarker *marker, IFMapListEntry *current, const BitSet &msplit)
IFMapUpdateQueue * queue()
Definition: ifmap_server.h:84
const std::string & name() const
Definition: ifmap_node.h:48
void set_sequence(uint64_t seq)
Definition: ifmap_update.h:81
Definition: bitset.h:17
void EraseFromList(IFMapListEntry *item)
BitSet & set(size_t pos)
Definition: bitset.cc:125
IFMapListEntry * Previous(IFMapListEntry *current)
void StateUpdateOnDequeue(IFMapUpdate *update, const BitSet &dequeue_set, bool is_delete)
void MarkerMerge(IFMapMarker *dst, IFMapMarker *src, const BitSet &mmove)
size_t find_first() const
Definition: bitset.cc:242
static void CopyNode(UpdateQueueShowEntry *dest, IFMapListEntry *src, IFMapUpdateQueue *queue)
static bool SendStage(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
IFMapServer * server_
boost::shared_ptr< const SandeshRequest > snhRequest_
size_t find_next(size_t pos) const
Definition: bitset.cc:255
static const int kMaxElementsPerRound
union IFMapObjectPtr::@5 u
void InsertIntoListBefore(IFMapListEntry *ptr, IFMapListEntry *item)
std::string queue_insert_ago_str()
Definition: ifmap_update.cc:29
IFMapMarker tail_marker_
void PushbackIntoList(IFMapListEntry *item)
std::string ToNumberedString() const
Definition: bitset.cc:593
void set_queue_insert_at_to_now()
Definition: ifmap_update.cc:25
const StageData * GetStageData(int stage) const
void operator()(IFMapListEntry *ptr)
static bool BufferStage(const Sandesh *sr, const RequestPipeline::PipeSpec ps, int stage, int instNum, RequestPipeline::InstData *data)
IFMapUpdateQueue(IFMapServer *server)
void InsertIntoListAfter(IFMapListEntry *ptr, IFMapListEntry *item)
IFMapListEntry * Next(IFMapListEntry *current)
bool Enqueue(IFMapUpdate *update)