OpenSDN source code
bgp_update_sender.cc
/*
 * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
 */

#include "bgp/bgp_update_sender.h"

#include <boost/bind/bind.hpp>
#include <boost/foreach.hpp>

#include <map>
#include <string>
#include <atomic>

#include "base/task_annotations.h"
#include "bgp/ipeer.h"
#include "bgp/bgp_ribout.h"
#include "bgp/bgp_ribout_updates.h"
#include "db/db.h"

using std::unique_ptr;
using std::make_pair;
using std::map;
using std::string;
using std::vector;
using namespace boost::placeholders;
//
// This struct represents RibOut specific state for a PeerState. There's one
// instance of this for each RibOut that an IPeerUpdate has joined.
//
// The PeerRibState contains a bit mask to keep track of the QueueIds that are
// currently active for the RibOut for the IPeerUpdate.
//
struct BgpSenderPartition::PeerRibState {
    PeerRibState() : qactive(0) { }
    uint8_t qactive;
};

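//
// Illustrative sketch (not part of the original file): the qactive mask is
// manipulated with the bit helpers from base/util.h, assuming the QueueId
// values are small enough (QCOUNT <= 8) to fit in a uint8_t:
//
//     uint8_t qactive = 0;
//     SetBit(qactive, queue_id);         // (RibOut, QueueId) became active
//     if (BitIsSet(qactive, queue_id)) {
//         ClearBit(qactive, queue_id);   // queue drained, clear the bit
//     }
//
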
//
// This nested class represents IPeerUpdate related state that's specific to
// the BgpSenderPartition.
//
// A PeerState contains a Map of the index for a RibState to a PeerRibState.
// Each entry in the map logically represents the state of the peer for the
// ribout.
//
// The Map is used in conjunction with the RibStateMap in BgpSenderPartition
// to implement regular and circular iterator classes that are used to walk
// through all the RibState entries for a peer.
//
// A PeerState maintains the in_sync and send_ready state for the IPeerUpdate.
//
// The PeerState is considered to be send_ready when the underlying socket is
// writable. Note that the send_ready state in the PeerState may be out of
// date with the actual socket state because the socket could have got blocked
// when writing from another partition. Hence IPeerUpdate::send_ready() is the
// more authoritative source.
//
// The PeerState is considered to be in_sync if it's send_ready and the marker
// for the IPeerUpdate has merged with the tail marker for all QueueIds in
// all RibOuts to which the IPeerUpdate is subscribed.
//
// The PeerState keeps count of the number of active RibOuts for each QueueId.
// A (RibOut, QueueId) pair is considered to be active if the PeerState isn't
// send_ready and there are RouteUpdates for the pair.
//
class BgpSenderPartition::PeerState {
public:
    typedef map<size_t, PeerRibState> Map;

    class iterator : public boost::iterator_facade<
        iterator, RibOut, boost::forward_traversal_tag> {
    public:
        explicit iterator(const RibStateMap &indexmap, Map *map, size_t index)
            : indexmap_(indexmap), map_(map), index_(index) {
        }
        size_t index() const { return index_; }
        RibState *rib_state() { return indexmap_.At(index_); }
        const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }

    private:
        friend class boost::iterator_core_access;
        void increment() {
            Map::const_iterator loc = map_->upper_bound(index_);
            if (loc == map_->end()) {
                index_ = -1;
            } else {
                index_ = loc->first;
            }
        }
        bool equal(const iterator &rhs) const {
            return index_ == rhs.index_;
        }
        RibOut &dereference() const;

        const RibStateMap &indexmap_;
        Map *map_;
        size_t index_;
    };

    class circular_iterator : public boost::iterator_facade<
        circular_iterator, RibOut, boost::forward_traversal_tag> {
    public:
        explicit circular_iterator(const RibStateMap &indexmap, Map *map,
            int start, bool is_valid)
            : indexmap_(indexmap), map_(map), index_(-1), match_(true) {
            if (map_->empty()) {
                return;
            }
            Map::const_iterator loc = map_->lower_bound(start);
            if (loc == map_->end()) {
                loc = map_->begin();
            }
            index_ = loc->first;
            if (is_valid) match_ = false;
        }
        int index() const { return index_; }
        RibState *rib_state() { return indexmap_.At(index_); }
        const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }

    private:
        friend class boost::iterator_core_access;
        void increment() {
            match_ = true;
            assert(!map_->empty());
            Map::const_iterator loc = map_->upper_bound(index_);
            if (loc == map_->end()) {
                loc = map_->begin();
            }
            index_ = loc->first;
        }
        bool equal(const circular_iterator &rhs) const {
            return ((match_ == rhs.match_) && (index_ == rhs.index_));
        }
        RibOut &dereference() const;

        const RibStateMap &indexmap_;
        Map *map_;
        int index_;
        bool match_;
    };

    explicit PeerState(IPeerUpdate *peer)
        : key_(peer), index_(-1),
          qactive_cnt_(RibOutUpdates::QCOUNT),
          in_sync_(true), rib_iterator_(BitSet::npos) {
        send_ready_ = true;
        for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
            qactive_cnt_[i] = 0;
        }
    }

    void Add(RibState *rs);

    void Remove(RibState *rs);

    bool IsMember(size_t index) const {
        return rib_bitset_.test(index);
    }

    iterator begin(const RibStateMap &indexmap) {
        Map::const_iterator it = rib_set_.begin();
        size_t index = (it != rib_set_.end() ? it->first : -1);
        return iterator(indexmap, &rib_set_, index);
    }
    iterator end(const RibStateMap &indexmap) {
        return iterator(indexmap, &rib_set_, -1);
    }

    circular_iterator circular_begin(const RibStateMap &indexmap) {
        return circular_iterator(indexmap, &rib_set_, rib_iterator_, true);
    }
    circular_iterator circular_end(const RibStateMap &indexmap) {
        return circular_iterator(indexmap, &rib_set_, rib_iterator_, false);
    }

    void SetIteratorStart(size_t start) { rib_iterator_ = start; }

    void SetQueueActive(size_t rib_index, int queue_id) {
        CHECK_CONCURRENCY("bgp::SendUpdate");
        Map::iterator loc = rib_set_.find(rib_index);
        assert(loc != rib_set_.end());
        if (!BitIsSet(loc->second.qactive, queue_id)) {
            SetBit(loc->second.qactive, queue_id);
            qactive_cnt_[queue_id]++;
        }
    }

    void SetQueueInactive(size_t rib_index, int queue_id) {
        CHECK_CONCURRENCY("bgp::SendUpdate", "bgp::PeerMembership");
        Map::iterator loc = rib_set_.find(rib_index);
        assert(loc != rib_set_.end());
        if (BitIsSet(loc->second.qactive, queue_id)) {
            ClearBit(loc->second.qactive, queue_id);
            qactive_cnt_[queue_id]--;
        }
    }

    bool IsQueueActive(size_t rib_index, int queue_id) {
        CHECK_CONCURRENCY("bgp::SendUpdate");
        Map::iterator loc = rib_set_.find(rib_index);
        assert(loc != rib_set_.end());
        return BitIsSet(loc->second.qactive, queue_id);
    }

    int QueueCount(int queue_id) { return qactive_cnt_[queue_id]; }

    IPeerUpdate *peer() const { return key_; }
    void set_index(size_t index) { index_ = index; }
    size_t index() const { return index_; }

    bool in_sync() const { return in_sync_; }
    void clear_sync() { in_sync_ = false; }
    void SetSync();

    bool send_ready() const { return send_ready_; }
    void set_send_ready(bool toggle) { send_ready_ = toggle; }

    bool empty() const { return rib_set_.empty(); }

    bool CheckInvariants() const {
        for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
            CHECK_INVARIANT(qactive_cnt_[i] <= (int) rib_set_.size());
        }
        return true;
    }

private:
    IPeerUpdate *key_;
    size_t index_;          // assigned from PeerStateMap
    Map rib_set_;           // list of RibOuts advertised by the peer.
    BitSet rib_bitset_;     // bitset of RibOuts advertised by the peer
    vector<int> qactive_cnt_;
    bool in_sync_;          // whether the peer may dequeue tail markers.
    std::atomic<bool> send_ready_;  // whether the peer may send updates.
    size_t rib_iterator_;   // index of last processed rib.

    DISALLOW_COPY_AND_ASSIGN(PeerState);
};

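//
// Illustrative sketch (not part of the original file): how the circular
// iterator is used to resume a walk over a peer's RibOuts from where the
// previous pass left off. The walk starts at rib_iterator_ and wraps around
// the Map, visiting each RibOut once per pass (see UpdatePeerQueue below):
//
//     for (PeerState::circular_iterator it =
//          ps->circular_begin(rib_state_imap_);
//          it != ps->circular_end(rib_state_imap_); ++it) {
//         RibOut *ribout = it.operator->();
//         // ... process ribout; if the peer blocks, record the resume
//         // point with ps->SetIteratorStart(it.index()) and bail.
//     }
//
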
//
// This nested class represents RibOut related state that's specific to the
// BgpSenderPartition.
//
// A RibState contains a BitSet of all the peers that are advertising the
// RibOut associated with the RibState.
//
// The BitSet is used in conjunction with PeerStateMap in BgpSenderPartition
// to implement an iterator that is used to walk through all the PeerState
// entries for the ribout.
//
class BgpSenderPartition::RibState {
public:
    class iterator : public boost::iterator_facade<
        iterator, PeerState, boost::forward_traversal_tag> {
    public:
        explicit iterator(const PeerStateMap &indexmap,
            const BitSet &set, size_t index)
            : indexmap_(indexmap), set_(set), index_(index) {
        }
        size_t index() const { return index_; }

    private:
        friend class boost::iterator_core_access;
        void increment() {
            index_ = set_.find_next(index_);
        }
        bool equal(const iterator &rhs) const {
            return index_ == rhs.index_;
        }
        PeerState &dereference() const {
            return *indexmap_.At(index_);
        }
        const PeerStateMap &indexmap_;
        const BitSet &set_;
        size_t index_;
    };

    explicit RibState(RibOut *ribout)
        : key_(ribout), index_(-1), in_sync_(RibOutUpdates::QCOUNT, true) {
    }

    void Add(PeerState *ps);
    void Remove(PeerState *ps);

    bool QueueSync(int queue_id);
    void SetQueueSync(int queue_id);
    void SetQueueUnsync(int queue_id);

    RibOut *ribout() { return key_; }

    iterator begin(const PeerStateMap &indexmap) {
        return iterator(indexmap, peer_set_, peer_set_.find_first());
    }

    iterator end(const PeerStateMap &indexmap) {
        return iterator(indexmap, peer_set_, BitSet::npos);
    }

    void set_index(size_t index) { index_ = index; }
    size_t index() const { return index_; }

    bool empty() const { return peer_set_.none(); }

    const BitSet &peer_set() const { return peer_set_; }

private:
    RibOut *key_;
    size_t index_;
    BitSet peer_set_;
    vector<bool> in_sync_;

    DISALLOW_COPY_AND_ASSIGN(RibState);
};

void BgpSenderPartition::RibState::Add(PeerState *ps) {
    CHECK_CONCURRENCY("bgp::PeerMembership");
    peer_set_.set(ps->index());
}

void BgpSenderPartition::RibState::Remove(PeerState *ps) {
    CHECK_CONCURRENCY("bgp::PeerMembership");
    peer_set_.reset(ps->index());
}

bool BgpSenderPartition::RibState::QueueSync(int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");
    return (in_sync_[queue_id]);
}

void BgpSenderPartition::RibState::SetQueueSync(int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");
    in_sync_[queue_id] = true;
}

void BgpSenderPartition::RibState::SetQueueUnsync(int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");
    in_sync_[queue_id] = false;
}

void BgpSenderPartition::PeerState::Add(RibState *rs) {
    CHECK_CONCURRENCY("bgp::PeerMembership");
    PeerRibState init;
    rib_set_.insert(make_pair(rs->index(), init));
    rib_bitset_.set(rs->index());
}

void BgpSenderPartition::PeerState::Remove(RibState *rs) {
    CHECK_CONCURRENCY("bgp::PeerMembership");
    for (int queue_id = 0; queue_id < RibOutUpdates::QCOUNT; queue_id++) {
        SetQueueInactive(rs->index(), queue_id);
    }
    rib_set_.erase(rs->index());
    rib_bitset_.reset(rs->index());
}

void BgpSenderPartition::PeerState::SetSync() {
    CHECK_CONCURRENCY("bgp::SendUpdate");
    for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
        assert(it->second.qactive == 0);
    }
    for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
        assert(qactive_cnt_[i] == 0);
    }
    in_sync_ = true;
}

RibOut &BgpSenderPartition::PeerState::iterator::dereference() const {
    return *indexmap_.At(index_)->ribout();
}

RibOut &BgpSenderPartition::PeerState::circular_iterator::dereference() const {
    return *indexmap_.At(index_)->ribout();
}

class BgpSenderPartition::Worker : public Task {
public:
    explicit Worker(BgpSenderPartition *partition)
        : Task(partition->task_id(), partition->index()),
          partition_(partition) {
    }

    virtual bool Run() {
        CHECK_CONCURRENCY("bgp::SendUpdate");

        while (true) {
            unique_ptr<WorkBase> wentry = partition_->WorkDequeue();
            if (!wentry.get())
                break;
            if (!wentry->valid)
                continue;
            switch (wentry->type) {
            case WorkBase::WRibOut: {
                WorkRibOut *workrib = static_cast<WorkRibOut *>(wentry.get());
                partition_->UpdateRibOut(workrib->ribout, workrib->queue_id);
                break;
            }
            case WorkBase::WPeer: {
                WorkPeer *workpeer = static_cast<WorkPeer *>(wentry.get());
                partition_->UpdatePeer(workpeer->peer);
                break;
            }
            }
        }

        return true;
    }
    string Description() const { return "BgpSenderPartition::Worker"; }

private:
    BgpSenderPartition *partition_;
};

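//
// For reference, a minimal sketch of the WorkBase hierarchy that the Worker
// dispatches on. The real definitions live in the header; the field names
// below are inferred from their use in Worker::Run above:
//
//     struct WorkBase {
//         enum Type { WRibOut, WPeer };
//         Type type;
//         bool valid;
//     };
//     struct WorkRibOut : public WorkBase {
//         RibOut *ribout;
//         int queue_id;
//     };
//     struct WorkPeer : public WorkBase {
//         IPeerUpdate *peer;
//     };
//
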
BgpSenderPartition::BgpSenderPartition(BgpUpdateSender *sender, int index)
    : sender_(sender),
      index_(index),
      running_(false),
      disabled_(false),
      worker_task_(NULL) {
}

BgpSenderPartition::~BgpSenderPartition() {
    if (worker_task_) {
        TaskScheduler *scheduler = TaskScheduler::GetInstance();
        scheduler->Cancel(worker_task_);
    }
    assert(peer_state_imap_.empty());
    assert(rib_state_imap_.empty());
}

int BgpSenderPartition::task_id() const {
    return sender_->task_id();
}

//
// Add the (RibOut, IPeerUpdate) combo to the BgpSenderPartition.
// Find or create the corresponding RibState and PeerState and set up the
// cross-linkage.
//
void BgpSenderPartition::Add(RibOut *ribout, IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::PeerMembership");

    RibState *rs = rib_state_imap_.Locate(ribout);
    PeerState *ps = peer_state_imap_.Locate(peer);
    rs->Add(ps);
    ps->Add(rs);
}

//
// Remove the (RibOut, IPeerUpdate) combo from the BgpSenderPartition.
// Decouple cross linkage between the corresponding RibState and PeerState and
// get rid of the RibState and/or PeerState if they are no longer needed.
//
void BgpSenderPartition::Remove(RibOut *ribout, IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::PeerMembership");

    RibState *rs = rib_state_imap_.Find(ribout);
    PeerState *ps = peer_state_imap_.Find(peer);
    assert(rs != NULL);
    assert(ps != NULL);
    rs->Remove(ps);
    ps->Remove(rs);
    if (rs->empty()) {
        WorkRibOutInvalidate(ribout);
        rib_state_imap_.Remove(ribout, rs->index());
    }
    if (ps->empty()) {
        WorkPeerInvalidate(peer);
        peer_state_imap_.Remove(peer, ps->index());
    }
}

//
// Create and enqueue new WorkRibOut entry since the RibOut is now
// active.
//
void BgpSenderPartition::RibOutActive(RibOut *ribout, int queue_id) {
    CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");

    WorkRibOutEnqueue(ribout, queue_id);
}

//
// Mark an IPeerUpdate to be send ready.
//
void BgpSenderPartition::PeerSendReady(IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::SendReadyTask");

    // The IPeerUpdate may not be registered if it has not reached Established
    // state or it may already have been unregistered by the time we get around
    // to processing the notification.
    PeerState *ps = peer_state_imap_.Find(peer);
    if (!ps)
        return;

    // Nothing to do if the IPeerUpdate's already in that state.
    if (ps->send_ready())
        return;

    // Create and enqueue new WorkPeer entry.
    ps->set_send_ready(true);
    WorkPeerEnqueue(peer);
}

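//
// Illustrative sketch (hypothetical caller, not part of this file): a peer's
// socket write-ready callback is expected to funnel through BgpUpdateSender,
// whose WorkQueue defers the notification to bgp::SendReadyTask context
// before this method runs:
//
//     void SomePeer::WriteReadyCb() {     // hypothetical peer callback
//         update_sender_->PeerSendReady(this);
//     }
//
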
//
// Return true if the IPeer is send ready.
//
bool BgpSenderPartition::PeerIsSendReady(IPeerUpdate *peer) const {
    CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");

    PeerState *ps = peer_state_imap_.Find(peer);
    return ps->send_ready();
}

//
// Return true if the IPeer is registered.
//
bool BgpSenderPartition::PeerIsRegistered(IPeerUpdate *peer) const {
    CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");

    return (peer_state_imap_.Find(peer) != NULL);
}

//
// Return true if the IPeer is in sync.
//
bool BgpSenderPartition::PeerInSync(IPeerUpdate *peer) const {
    CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");

    PeerState *ps = peer_state_imap_.Find(peer);
    return (ps ? ps->in_sync() : false);
}

//
// Create a Worker if warranted and enqueue it to the TaskScheduler.
// Assumes that the caller holds the BgpSenderPartition mutex.
//
void BgpSenderPartition::MaybeStartWorker() {
    if (!running_ && !disabled_) {
        worker_task_ = new Worker(this);
        TaskScheduler *scheduler = TaskScheduler::GetInstance();
        scheduler->Enqueue(worker_task_);
        running_ = true;
    }
}

//
// Dequeue the first WorkBase item from the work queue and return a
// unique_ptr to it. Clear out Worker related state if the work queue
// is empty.
//
unique_ptr<BgpSenderPartition::WorkBase> BgpSenderPartition::WorkDequeue() {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    unique_ptr<WorkBase> wentry;
    if (work_queue_.empty()) {
        worker_task_ = NULL;
        running_ = false;
    } else {
        wentry.reset(work_queue_.pop_front().release());
    }
    return wentry;
}

//
// Enqueue a WorkBase entry into the work queue and start a new Worker
// task if required.
//
void BgpSenderPartition::WorkEnqueue(WorkBase *wentry) {
    CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::SendReadyTask",
                      "bgp::PeerMembership");

    work_queue_.push_back(wentry);
    MaybeStartWorker();
}

//
// Disable or enable the worker.
// For unit testing.
//
void BgpSenderPartition::set_disabled(bool disabled) {
    disabled_ = disabled;
    MaybeStartWorker();
}

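//
// Illustrative test usage (sketch, assuming a test fixture with access to
// the partition): while disabled, WorkEnqueue still queues entries but no
// Worker is started; re-enabling kicks MaybeStartWorker to drain the queue.
//
//     partition->set_disabled(true);
//     // ... trigger RibOutActive/PeerSendReady, inspect state ...
//     partition->set_disabled(false);    // schedules a Worker again
//
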
//
// Enqueue a WorkPeer to the work queue.
//
void BgpSenderPartition::WorkPeerEnqueue(IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::SendReadyTask");

    WorkBase *wentry = new WorkPeer(peer);
    WorkEnqueue(wentry);
}

//
// Invalidate all WorkBases for the given IPeerUpdate.
// Used when an IPeerUpdate is removed.
//
void BgpSenderPartition::WorkPeerInvalidate(IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::PeerMembership");

    for (WorkQueue::iterator it = work_queue_.begin();
         it != work_queue_.end(); ++it) {
        WorkBase *wentry = it.operator->();
        if (wentry->type != WorkBase::WPeer)
            continue;
        WorkPeer *wpeer = static_cast<WorkPeer *>(wentry);
        if (wpeer->peer != peer)
            continue;
        wpeer->valid = false;
    }
}

//
// Enqueue a WorkRibOut to the work queue.
//
void BgpSenderPartition::WorkRibOutEnqueue(RibOut *ribout, int queue_id) {
    CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");

    WorkBase *wentry = new WorkRibOut(ribout, queue_id);
    WorkEnqueue(wentry);
}

//
// Invalidate all WorkBases for the given RibOut.
// Used when a RibOut is removed.
//
void BgpSenderPartition::WorkRibOutInvalidate(RibOut *ribout) {
    CHECK_CONCURRENCY("bgp::PeerMembership");

    for (WorkQueue::iterator it = work_queue_.begin();
         it != work_queue_.end(); ++it) {
        WorkBase *wentry = it.operator->();
        if (wentry->type != WorkBase::WRibOut)
            continue;
        WorkRibOut *wribout = static_cast<WorkRibOut *>(wentry);
        if (wribout->ribout != ribout)
            continue;
        wribout->valid = false;
    }
}

//
// Build the RibPeerSet of IPeers for the RibOut that are in sync. Note that
// we need to use bit indices that are specific to the RibOut, not the ones
// from the BgpSenderPartition.
//
void BgpSenderPartition::BuildSyncBitSet(const RibOut *ribout, RibState *rs,
    RibPeerSet *msync) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    for (RibState::iterator it = rs->begin(peer_state_imap_);
         it != rs->end(peer_state_imap_); ++it) {
        PeerState *ps = it.operator->();

        // If the PeerState is in sync but the IPeerUpdate is not send ready
        // then update the sync and send ready state in the PeerState. Note
        // that the RibOut queue for the PeerState will get marked active via
        // the call to SetQueueActive from UpdateRibOut.
        if (ps->in_sync()) {
            if (ps->peer()->send_ready()) {
                int rix = ribout->GetPeerIndex(ps->peer());
                msync->set(rix);
            } else {
                ps->clear_sync();
                ps->set_send_ready(false);
            }
        }
    }
}

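//
// Note on the two index spaces involved (illustrative, not part of the
// original file): PeerState::index() is the peer's partition-wide bit in
// the PeerStateMap, while RibOut::GetPeerIndex() yields the RibOut-local
// bit used in a RibPeerSet:
//
//     int rix = ribout->GetPeerIndex(ps->peer());  // RibOut-local bit
//     msync->set(rix);                             // RibPeerSet position
//     rs->peer_set().test(ps->index());            // partition-wide bit
//
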
//
// Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
// Note that bit indices in the RibPeerSet are specific to the RibOut.
//
void BgpSenderPartition::SetSendBlocked(const RibOut *ribout, RibState *rs,
    int queue_id, const RibPeerSet &blocked) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    for (size_t bit = blocked.find_first(); bit != RibPeerSet::npos;
         bit = blocked.find_next(bit)) {
        IPeerUpdate *peer = ribout->GetPeer(bit);
        PeerState *ps = peer_state_imap_.Find(peer);
        ps->SetQueueActive(rs->index(), queue_id);
        ps->clear_sync();
        ps->set_send_ready(false);
    }
}

//
// For unit testing only.
// Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
//
void BgpSenderPartition::SetSendBlocked(RibOut *ribout,
    int queue_id, const RibPeerSet &blocked) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    RibState *rs = rib_state_imap_.Find(ribout);
    assert(rs);
    SetSendBlocked(ribout, rs, queue_id, blocked);
}

//
// Concurrency: called from bgp send task.
//
// Take the RibPeerSet of unsync IPeers and update the relevant PeerStates.
// Note that bit indices in the RibPeerSet are specific to the RibOut.
//
void BgpSenderPartition::SetQueueActive(const RibOut *ribout, RibState *rs,
    int queue_id, const RibPeerSet &munsync) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    for (size_t bit = munsync.find_first(); bit != RibPeerSet::npos;
         bit = munsync.find_next(bit)) {
        IPeerUpdate *peer = ribout->GetPeer(bit);
        PeerState *ps = peer_state_imap_.Find(peer);
        ps->SetQueueActive(rs->index(), queue_id);
    }
}

//
// Concurrency: called from bgp send task.
//
// Mark the PeerRibState corresponding to the given IPeerUpdate and RibOut
// as active.
//
// Used by unit test code.
//
void BgpSenderPartition::SetQueueActive(RibOut *ribout, int queue_id,
    IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    PeerState *ps = peer_state_imap_.Find(peer);
    RibState *rs = rib_state_imap_.Find(ribout);
    ps->SetQueueActive(rs->index(), queue_id);
}

//
// Concurrency: called from bgp send task.
//
// Check if the queue corresponding to IPeerUpdate, RibOut and queue id is
// active.
//
// Used by unit test code.
//
bool BgpSenderPartition::IsQueueActive(RibOut *ribout, int queue_id,
    IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    PeerState *ps = peer_state_imap_.Find(peer);
    RibState *rs = rib_state_imap_.Find(ribout);
    return ps->IsQueueActive(rs->index(), queue_id);
}

//
// Concurrency: called from bgp send task.
//
// Mark all the RibStates for the given peer and queue id as being in sync
// and trigger a tail dequeue.
//
void BgpSenderPartition::SetQueueSync(PeerState *ps, int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    for (PeerState::iterator it = ps->begin(rib_state_imap_);
         it != ps->end(rib_state_imap_); ++it) {
        RibState *rs = it.rib_state();
        if (!rs->QueueSync(queue_id)) {
            RibOut *ribout = it.operator->();
            RibOutActive(ribout, queue_id);
            rs->SetQueueSync(queue_id);
        }
    }
}

//
// Drain the queue until there are no more updates or all the members become
// blocked.
//
void BgpSenderPartition::UpdateRibOut(RibOut *ribout, int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    RibOutUpdates *updates = ribout->updates(index_);
    RibState *rs = rib_state_imap_.Find(ribout);
    RibPeerSet msync;

    // Convert group in-sync list to rib specific bitset.
    BuildSyncBitSet(ribout, rs, &msync);

    // Drain the queue till we can do no more.
    RibPeerSet blocked, munsync;
    bool done = updates->TailDequeue(queue_id, msync, &blocked, &munsync);
    assert(msync.Contains(blocked));

    // Mark peers as send blocked.
    SetSendBlocked(ribout, rs, queue_id, blocked);

    // Set the queue to be active for any unsync peers. If we don't do this,
    // we will forget to mark the (RibOut,QueueId) as active for these peers
    // since the blocked RibPeerSet does not contain peers that are already
    // out of sync. Note that the unsync peers would have been split from
    // the tail marker in TailDequeue.
    SetQueueActive(ribout, rs, queue_id, munsync);

    // If all peers are blocked, mark the queue as unsync in the RibState. We
    // will trigger tail dequeue for the (RibOut,QueueId) when any peer that
    // is interested in the RibOut becomes in sync.
    if (!done)
        rs->SetQueueUnsync(queue_id);
}

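//
// For clarity, the TailDequeue contract assumed above: msync is the set of
// peers (in RibOut-local bit positions) that may advance with the tail
// marker; on return, blocked holds the subset that hit socket limits and
// munsync holds peers that had previously been split from the tail marker.
// The return value is false when the queue could not be fully drained:
//
//     RibPeerSet blocked, munsync;
//     bool done = updates->TailDequeue(queue_id, msync, &blocked, &munsync);
//     assert(msync.Contains(blocked));
//
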
//
// Go through all RibOuts for the IPeerUpdate and drain the given queue till
// it is up to date or it becomes blocked. If it's blocked, select the next
// RibOut to be processed when the IPeerUpdate becomes send ready.
//
// Return false if the IPeerUpdate got blocked.
//
bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
    int queue_id) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    for (PeerState::circular_iterator it =
             ps->circular_begin(rib_state_imap_);
         it != ps->circular_end(rib_state_imap_); ++it) {
        // Skip if this queue is not active in the PeerRibState.
        if (!BitIsSet(it.peer_rib_state().qactive, queue_id))
            continue;

        // Drain the queue till we can do no more.
        RibOut *ribout = it.operator->();
        RibOutUpdates *updates = ribout->updates(index_);
        RibPeerSet blocked;
        bool done = updates->PeerDequeue(queue_id, peer, &blocked);

        // Process blocked mask.
        RibState *rs = it.rib_state();
        SetSendBlocked(ribout, rs, queue_id, blocked);

        // If the peer is still send_ready, mark the queue as inactive for
        // the peer. Need to check send_ready because the return value of
        // PeerDequeue only tells that *some* peer was merged with the tail
        // marker.
        // If the peer got blocked, remember where to start next time and
        // stop processing. We don't want to continue processing for other
        // merged peers if the lead peer is blocked. Processing for other
        // peers will continue when their own WorkPeer items are processed.
        if (ps->send_ready()) {
            assert(done);
            ps->SetQueueInactive(rs->index(), queue_id);
        } else {
            ps->SetIteratorStart(it.index());
            return false;
        }
    }

    return true;
}

//
// Drain the queue of all updates for this IPeerUpdate, until it is up to
// date or it becomes blocked.
//
void BgpSenderPartition::UpdatePeer(IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::SendUpdate");

    // Bail if the PeerState is not send ready.
    PeerState *ps = peer_state_imap_.Find(peer);
    if (!ps->send_ready()) {
        return;
    }

    // Update the PeerState and bail if the IPeerUpdate is not send ready.
    // This happens if the IPeerUpdate gets blocked while processing some
    // other partition.
    if (!peer->send_ready()) {
        ps->set_send_ready(false);
        return;
    }

    // Go through all queues and drain them if there's anything on them.
    for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
        if (ps->QueueCount(queue_id) == 0) {
            continue;
        }
        if (!UpdatePeerQueue(peer, ps, queue_id)) {
            assert(!ps->send_ready());
            return;
        }
    }

    // Checking the return value of UpdatePeerQueue above is not sufficient as
    // that only tells us that *some* peer(s) got merged with the tail marker.
    // Need to make sure that the IPeerUpdate that we are processing is still
    // send ready.
    if (!ps->send_ready()) {
        return;
    }

    // Mark the peer as being in sync across all tables.
    ps->SetSync();

    // Mark all RibStates for the peer as being in sync. This triggers a tail
    // dequeue for the corresponding (RibOut, QueueId) if necessary. This in
    // turn ensures that we do not get stuck in the case where all peers get
    // blocked and then get back in sync.
    for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
        SetQueueSync(ps, queue_id);
    }
}

//
// Check invariants for the BgpSenderPartition.
//
bool BgpSenderPartition::CheckInvariants() const {
    int grp_peer_count = 0;
    int peer_grp_count = 0;
    for (size_t i = 0; i < rib_state_imap_.size(); i++) {
        if (!rib_state_imap_.bits().test(i)) {
            continue;
        }
        RibState *rs = rib_state_imap_.At(i);
        assert(rs != NULL);
        CHECK_INVARIANT(rs->index() == i);
        for (RibState::iterator it = rs->begin(peer_state_imap_);
             it != rs->end(peer_state_imap_); ++it) {
            PeerState *ps = it.operator->();
            CHECK_INVARIANT(ps->IsMember(i));
            grp_peer_count++;
        }
    }
    for (size_t i = 0; i < peer_state_imap_.size(); i++) {
        if (!peer_state_imap_.bits().test(i)) {
            continue;
        }
        PeerState *ps = peer_state_imap_.At(i);
        assert(ps != NULL);
        CHECK_INVARIANT(ps->index() == i);
        if (!ps->CheckInvariants()) {
            return false;
        }
        for (PeerState::iterator it = ps->begin(rib_state_imap_);
             it != ps->end(rib_state_imap_); ++it) {
            RibState *rs = it.rib_state();
            CHECK_INVARIANT(rs->peer_set().test(i));
            peer_grp_count++;
        }
    }

    CHECK_INVARIANT(grp_peer_count == peer_grp_count);
    return true;
}

//
// Constructor for BgpUpdateSender.
// Initialize send ready WorkQueue and allocate BgpSenderPartitions.
//
BgpUpdateSender::BgpUpdateSender(BgpServer *server)
    : server_(server),
      task_id_(TaskScheduler::GetInstance()->GetTaskId("bgp::SendUpdate")),
      send_ready_queue_(
          TaskScheduler::GetInstance()->GetTaskId("bgp::SendReadyTask"), 0,
          boost::bind(&BgpUpdateSender::SendReadyCallback, this, _1)) {
    for (int idx = 0; idx < DB::PartitionCount(); ++idx) {
        partitions_.push_back(new BgpSenderPartition(this, idx));
    }
}

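//
// Illustrative sketch (assumption, not from this file): there is one
// BgpSenderPartition per DB partition, so update generation parallelizes
// along DB partition boundaries. A table shard would activate the matching
// partition via the index-based entry point, e.g.:
//
//     // queue_id is assumed to be a RibOutUpdates::QueueId value
//     sender->RibOutActive(db_partition_index, ribout, queue_id);
//
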
//
// Destructor for BgpUpdateSender.
// Shutdown the WorkQueue and delete all BgpSenderPartitions.
//
BgpUpdateSender::~BgpUpdateSender() {
    send_ready_queue_.Shutdown();
    STLDeleteValues(&partitions_);
}

//
// Handle the join of an IPeerUpdate to a RibOut.
//
void BgpUpdateSender::Join(RibOut *ribout, IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::PeerMembership");

    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        partition->Add(ribout, peer);
    }
}

//
// Handle the leave of an IPeerUpdate from a RibOut.
//
void BgpUpdateSender::Leave(RibOut *ribout, IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::PeerMembership");

    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        partition->Remove(ribout, peer);
    }
}

//
// Inform the specified BgpSenderPartition that it needs to schedule a tail
// dequeue for the given RibOut queue.
//
void BgpUpdateSender::RibOutActive(int index, RibOut *ribout, int queue_id) {
    CHECK_CONCURRENCY("db::DBTable", "bgp::PeerMembership");

    partitions_[index]->RibOutActive(ribout, queue_id);
}

//
// Concurrency: called from arbitrary task.
//
// Enqueue the IPeerUpdate to the send ready processing work queue.
// The callback is invoked in the context of bgp::SendReadyTask.
//
void BgpUpdateSender::PeerSendReady(IPeerUpdate *peer) {
    send_ready_queue_.Enqueue(peer);
}

//
// Return true if the IPeer is registered.
//
bool BgpUpdateSender::PeerIsRegistered(IPeerUpdate *peer) const {
    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        if (partition->PeerIsRegistered(peer))
            return true;
    }
    return false;
}

//
// Return true if the IPeer is in sync.
//
bool BgpUpdateSender::PeerInSync(IPeerUpdate *peer) const {
    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        if (!partition->PeerInSync(peer))
            return false;
    }
    return true;
}

//
// Callback to handle send ready notification for IPeerUpdate. Processing it
// in the context of bgp::SendReadyTask ensures that there are no concurrency
// issues w.r.t. the BgpSenderPartition working on the IPeerUpdate while we
// are processing the notification.
//
bool BgpUpdateSender::SendReadyCallback(IPeerUpdate *peer) {
    CHECK_CONCURRENCY("bgp::SendReadyTask");

    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        partition->PeerSendReady(peer);
    }
    return true;
}

//
// Check invariants for the BgpUpdateSender.
//
bool BgpUpdateSender::CheckInvariants() const {
    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        if (!partition->CheckInvariants())
            return false;
    }
    return true;
}

//
// Disable all BgpSenderPartitions.
//
void BgpUpdateSender::DisableProcessing() {
    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        partition->set_disabled(true);
    }
}

//
// Enable all BgpSenderPartitions.
//
void BgpUpdateSender::EnableProcessing() {
    BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
        partition->set_disabled(false);
    }
}