OpenSDN source code
bgp_update_sender.cc
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "bgp/bgp_update_sender.h"
6 
7 #include <boost/bind.hpp>
8 #include <boost/foreach.hpp>
9 
10 #include <map>
11 #include <string>
12 
13 #include "base/task_annotations.h"
14 #include "bgp/ipeer.h"
15 #include "bgp/bgp_ribout.h"
16 #include "bgp/bgp_ribout_updates.h"
17 #include "db/db.h"
18 
19 using std::unique_ptr;
20 using std::make_pair;
21 using std::map;
22 using std::string;
23 using std::vector;
24 
25 //
26 // This struct represents RibOut specific state for a PeerState. There's one
27 // instance of this for each RibOut that an IPeerUpdate has joined.
28 //
29 // The PeerRibState contains a bit mask to keep track of the QueueIds that are
30 // currently active for the RibOut for the IPeerUpdate.
31 //
32 struct BgpSenderPartition::PeerRibState {
33  PeerRibState() : qactive(0) { }
34  uint8_t qactive;
35 };
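
// The qactive field above is a per-queue bit mask: bit i is set while the
// (RibOut, QueueId i) pair still has updates pending for this peer. Below is
// a minimal standalone sketch of that bookkeeping using plain bit operations;
// the queue ids and helper names are illustrative stand-ins, not the
// templated helpers from base/util.h that this file actually uses.
#include <cassert>
#include <cstddef>
#include <cstdint>

namespace qactive_sketch {

enum QueueId { QBULK = 0, QUPDATE = 1, QCOUNT = 2 };

inline bool BitIsSet(uint8_t value, size_t bit) { return (value & (1 << bit)) != 0; }
inline void SetBit(uint8_t &value, size_t bit) { value |= (1 << bit); }
inline void ClearBit(uint8_t &value, size_t bit) { value &= ~(1 << bit); }

inline void Demo() {
    uint8_t qactive = 0;                  // same role as PeerRibState::qactive
    SetBit(qactive, QUPDATE);             // (RibOut, QUPDATE) now has pending work
    assert(BitIsSet(qactive, QUPDATE) && !BitIsSet(qactive, QBULK));
    ClearBit(qactive, QUPDATE);           // peer caught up; queue inactive again
    assert(qactive == 0);
}

}  // namespace qactive_sketch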
36 
37 //
38 // This nested class represents IPeerUpdate related state that's specific to
39 // the BgpSenderPartition.
40 //
41 // A PeerState contains a Map of the index for a RibState to a PeerRibState.
42 // Each entry in the map logically represents the state of the peer for the
43 // ribout.
44 //
45 // The Map is used in conjunction with the RibStateMap in BgpSenderPartition
46 // to implement regular and circular iterator classes that are used to walk
47 // through all the RibState entries for a peer.
48 //
49 // A PeerState maintains the in_sync and send_ready state for the IPeerUpdate.
50 //
51 // The PeerState is considered to be send_ready when the underlying socket
52 // is writable. Note that the send_ready state in the PeerState may be out of
53 // date with the actual socket state because the socket could have got blocked
54 // when writing from another partition. Hence IPeerUpdate::send_ready() is the
55 // more authoritative source.
56 //
57 // The PeerState is considered to be in_sync if it's send_ready and the marker
58 // for the IPeerUpdate has merged with the tail marker for all QueueIds in
59 // all RibOuts for which the IPeerUpdate is subscribed.
60 //
61 // The PeerState keeps count of the number of active RibOuts for each QueueId.
62 // A (RibOut, QueueId) pair is considered to be active if the PeerState isn't
63 // send_ready and there are RouteUpdates for the pair.
64 //
65 class BgpSenderPartition::PeerState {
66 public:
67  typedef map<size_t, PeerRibState> Map;
68 
69  class iterator : public boost::iterator_facade<
70  iterator, RibOut, boost::forward_traversal_tag> {
71  public:
72  explicit iterator(const RibStateMap &indexmap, Map *map, size_t index)
73  : indexmap_(indexmap), map_(map), index_(index) {
74  }
75  size_t index() const { return index_; }
76  RibState *rib_state() { return indexmap_.At(index_); }
77  const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }
78 
79  private:
80  friend class boost::iterator_core_access;
81  void increment() {
82  Map::const_iterator loc = map_->upper_bound(index_);
83  if (loc == map_->end()) {
84  index_ = -1;
85  } else {
86  index_ = loc->first;
87  }
88  }
89  bool equal(const iterator &rhs) const {
90  return index_ == rhs.index_;
91  }
92  RibOut &dereference() const;
93 
94  const RibStateMap &indexmap_;
95  Map *map_;
96  size_t index_;
97  };
98 
99  class circular_iterator : public boost::iterator_facade<
100  circular_iterator, RibOut, boost::forward_traversal_tag> {
101  public:
102  explicit circular_iterator(const RibStateMap &indexmap, Map *map,
103  int start, bool is_valid)
104  : indexmap_(indexmap), map_(map), index_(-1), match_(true) {
105  if (map_->empty()) {
106  return;
107  }
108  Map::const_iterator loc = map_->lower_bound(start);
109  if (loc == map_->end()) {
110  loc = map_->begin();
111  }
112  index_ = loc->first;
113  if (is_valid) match_ = false;
114  }
115  int index() const { return index_; }
116  RibState *rib_state() { return indexmap_.At(index_); }
117  const PeerRibState &peer_rib_state() const { return (*map_)[index_]; }
118 
119  private:
120  friend class boost::iterator_core_access;
121  void increment() {
122  match_ = true;
123  assert(!map_->empty());
124  Map::const_iterator loc = map_->upper_bound(index_);
125  if (loc == map_->end()) {
126  loc = map_->begin();
127  }
128  index_ = loc->first;
129  }
130  bool equal(const circular_iterator &rhs) const {
131  return ((match_ == rhs.match_) && (index_ == rhs.index_));
132  }
133  RibOut &dereference() const;
134 
135  const RibStateMap &indexmap_;
136  Map *map_;
137  int index_;
138  bool match_;
139  };
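
// circular_iterator starts at the remembered position (rib_iterator_) and
// wraps around the map, so a peer that blocked partway through its RibOuts
// resumes where it left off. Below is a minimal standalone sketch of that
// wrap-around walk over a std::map; the key and value types are illustrative,
// this is not the iterator defined above.
#include <cstdio>
#include <map>
#include <string>

namespace circular_sketch {

// Visit every entry of `ribs` exactly once, starting at the entry at or after
// `start` and wrapping around to begin() when the end is reached.
inline void CircularWalk(const std::map<size_t, std::string> &ribs,
                         size_t start) {
    if (ribs.empty())
        return;
    std::map<size_t, std::string>::const_iterator loc = ribs.lower_bound(start);
    if (loc == ribs.end())
        loc = ribs.begin();                     // wrap if start is past the last key
    const size_t first = loc->first;
    do {
        printf("rib index %zu: %s\n", loc->first, loc->second.c_str());
        loc = ribs.upper_bound(loc->first);     // step, as increment() does above
        if (loc == ribs.end())
            loc = ribs.begin();                 // wrap around
    } while (loc->first != first);              // stop after one full circle
}

}  // namespace circular_sketch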
140 
141  explicit PeerState(IPeerUpdate *peer)
142  : key_(peer), index_(-1),
143  qactive_cnt_(RibOutUpdates::QCOUNT),
144  in_sync_(true), rib_iterator_(BitSet::npos) {
145  send_ready_ = true;
146  for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
147  qactive_cnt_[i] = 0;
148  }
149  }
150 
151  void Add(RibState *rs);
152 
153  void Remove(RibState *rs);
154 
155  bool IsMember(size_t index) const {
156  return rib_bitset_.test(index);
157  }
158 
159  iterator begin(const RibStateMap &indexmap) {
160  Map::const_iterator it = rib_set_.begin();
161  size_t index = (it != rib_set_.end() ? it->first : -1);
162  return iterator(indexmap, &rib_set_, index);
163  }
164  iterator end(const RibStateMap &indexmap) {
165  return iterator(indexmap, &rib_set_, -1);
166  }
167 
168  circular_iterator circular_begin(const RibStateMap &indexmap) {
169  return circular_iterator(indexmap, &rib_set_, rib_iterator_, true);
170  }
171  circular_iterator circular_end(const RibStateMap &indexmap) {
172  return circular_iterator(indexmap, &rib_set_, rib_iterator_, false);
173  }
174 
175  void SetIteratorStart(size_t start) { rib_iterator_ = start; }
176 
177  void SetQueueActive(size_t rib_index, int queue_id) {
178  CHECK_CONCURRENCY("bgp::SendUpdate");
179  Map::iterator loc = rib_set_.find(rib_index);
180  assert(loc != rib_set_.end());
181  if (!BitIsSet(loc->second.qactive, queue_id)) {
182  SetBit(loc->second.qactive, queue_id);
183  qactive_cnt_[queue_id]++;
184  }
185  }
186 
187  void SetQueueInactive(size_t rib_index, int queue_id) {
188  CHECK_CONCURRENCY("bgp::SendUpdate", "bgp::PeerMembership");
189  Map::iterator loc = rib_set_.find(rib_index);
190  assert(loc != rib_set_.end());
191  if (BitIsSet(loc->second.qactive, queue_id)) {
192  ClearBit(loc->second.qactive, queue_id);
193  qactive_cnt_[queue_id]--;
194  }
195  }
196 
197  bool IsQueueActive(size_t rib_index, int queue_id) {
198  CHECK_CONCURRENCY("bgp::SendUpdate");
199  Map::iterator loc = rib_set_.find(rib_index);
200  assert(loc != rib_set_.end());
201  return BitIsSet(loc->second.qactive, queue_id);
202  }
203 
204  int QueueCount(int queue_id) { return qactive_cnt_[queue_id]; }
205 
206  IPeerUpdate *peer() const { return key_; }
207  void set_index(size_t index) { index_ = index; }
208  size_t index() const { return index_; }
209 
210  bool in_sync() const { return in_sync_; }
211  void clear_sync() { in_sync_ = false; }
212  void SetSync();
213 
214  bool send_ready() const { return send_ready_; }
215  void set_send_ready(bool toggle) { send_ready_ = toggle; }
216 
217  bool empty() const { return rib_set_.empty(); }
218 
219  bool CheckInvariants() const {
220  for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
221  CHECK_INVARIANT(qactive_cnt_[i] <= (int) rib_set_.size());
222  }
223  return true;
224  }
225 
226 private:
227  IPeerUpdate *key_;
228  size_t index_; // assigned from PeerStateMap
229  Map rib_set_; // list of RibOuts advertised by the peer.
230  BitSet rib_bitset_; // bitset of RibOuts advertised by the peer
231  vector<int> qactive_cnt_;
232  bool in_sync_; // whether the peer may dequeue tail markers.
233  tbb::atomic<bool> send_ready_; // whether the peer may send updates.
234  size_t rib_iterator_; // index of last processed rib.
235 
236  DISALLOW_COPY_AND_ASSIGN(PeerState);
237 };
238 
239 //
240 // This nested class represents RibOut related state that's specific to the
241 // BgpSenderPartition.
242 //
243 // A RibState contains a BitSet of all the peers that are advertising the
244 // RibOut associated with the RibState.
245 //
246 // The BitSet is used in conjunction with PeerStateMap in BgpSenderPartition
247 // to implement an iterator that is used to walk through all the PeerState
248 // entries for the ribout.
249 //
250 class BgpSenderPartition::RibState {
251 public:
252  class iterator : public boost::iterator_facade<
253  iterator, PeerState, boost::forward_traversal_tag> {
254  public:
255  explicit iterator(const PeerStateMap &indexmap,
256  const BitSet &set, size_t index)
257  : indexmap_(indexmap), set_(set), index_(index) {
258  }
259  size_t index() const { return index_; }
260 
261  private:
262  friend class boost::iterator_core_access;
263  void increment() {
264  index_ = set_.find_next(index_);
265  }
266  bool equal(const iterator &rhs) const {
267  return index_ == rhs.index_;
268  }
269  PeerState &dereference() const {
270  return *indexmap_.At(index_);
271  }
272  const PeerStateMap &indexmap_;
273  const BitSet &set_;
274  size_t index_;
275  };
276 
277  explicit RibState(RibOut *ribout)
278  : key_(ribout), index_(-1), in_sync_(RibOutUpdates::QCOUNT, true) {
279  }
280 
281  void Add(PeerState *ps);
282  void Remove(PeerState *ps);
283 
284  bool QueueSync(int queue_id);
285  void SetQueueSync(int queue_id);
286  void SetQueueUnsync(int queue_id);
287 
288  RibOut *ribout() { return key_; }
289 
290  iterator begin(const PeerStateMap &indexmap) {
291  return iterator(indexmap, peer_set_, peer_set_.find_first());
292  }
293 
294  iterator end(const PeerStateMap &indexmap) {
295  return iterator(indexmap, peer_set_, BitSet::npos);
296  }
297 
298  void set_index(size_t index) { index_ = index; }
299  size_t index() const { return index_; }
300 
301  bool empty() const { return peer_set_.none(); }
302 
303  const BitSet &peer_set() const { return peer_set_; }
304 
305 private:
306  RibOut *key_;
307  size_t index_;
308  BitSet peer_set_;
309  vector<bool> in_sync_;
310 
311  DISALLOW_COPY_AND_ASSIGN(RibState);
312 };
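
// RibState::iterator walks peer_set_ with the find_first()/find_next() idiom
// provided by the project's BitSet. Below is a minimal standalone sketch of
// the same walk using boost::dynamic_bitset, whose interface is similar; the
// peer indices are illustrative only.
#include <cstdio>
#include <boost/dynamic_bitset.hpp>

namespace peer_set_sketch {

inline void WalkPeerSet() {
    boost::dynamic_bitset<> peer_set(8);
    peer_set.set(1);
    peer_set.set(5);                            // peers with indices 1 and 5 joined
    for (size_t i = peer_set.find_first();
         i != boost::dynamic_bitset<>::npos; i = peer_set.find_next(i)) {
        printf("peer index %zu advertises this ribout\n", i);
    }
}

}  // namespace peer_set_sketch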
313 
314 
315 void BgpSenderPartition::RibState::Add(PeerState *ps) {
316  CHECK_CONCURRENCY("bgp::PeerMembership");
317  peer_set_.set(ps->index());
318 }
319 
320 void BgpSenderPartition::RibState::Remove(PeerState *ps) {
321  CHECK_CONCURRENCY("bgp::PeerMembership");
322  peer_set_.reset(ps->index());
323 }
324 
325 bool BgpSenderPartition::RibState::QueueSync(int queue_id) {
326  CHECK_CONCURRENCY("bgp::SendUpdate");
327  return (in_sync_[queue_id]);
328 }
329 
330 void BgpSenderPartition::RibState::SetQueueSync(int queue_id) {
331  CHECK_CONCURRENCY("bgp::SendUpdate");
332  in_sync_[queue_id] = true;
333 }
334 
335 void BgpSenderPartition::RibState::SetQueueUnsync(int queue_id) {
336  CHECK_CONCURRENCY("bgp::SendUpdate");
337  in_sync_[queue_id] = false;
338 }
339 
340 void BgpSenderPartition::PeerState::Add(RibState *rs) {
341  CHECK_CONCURRENCY("bgp::PeerMembership");
342  PeerRibState init;
343  rib_set_.insert(make_pair(rs->index(), init));
344  rib_bitset_.set(rs->index());
345 }
346 
347 void BgpSenderPartition::PeerState::Remove(RibState *rs) {
348  CHECK_CONCURRENCY("bgp::PeerMembership");
349  for (int queue_id = 0; queue_id < RibOutUpdates::QCOUNT; queue_id++) {
350  SetQueueInactive(rs->index(), queue_id);
351  }
352  rib_set_.erase(rs->index());
353  rib_bitset_.reset(rs->index());
354 }
355 
356 void BgpSenderPartition::PeerState::SetSync() {
357  CHECK_CONCURRENCY("bgp::SendUpdate");
358  for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
359  assert(it->second.qactive == 0);
360  }
361  for (int i = 0; i < RibOutUpdates::QCOUNT; i++) {
362  assert(qactive_cnt_[i] == 0);
363  }
364  in_sync_ = true;
365 }
366 
367 RibOut &BgpSenderPartition::PeerState::iterator::dereference() const {
368  return *indexmap_.At(index_)->ribout();
369 }
370 
371 RibOut &BgpSenderPartition::PeerState::circular_iterator::dereference() const {
372  return *indexmap_.At(index_)->ribout();
373 }
374 
375 class BgpSenderPartition::Worker : public Task {
376 public:
377  explicit Worker(BgpSenderPartition *partition)
378  : Task(partition->task_id(), partition->index()),
379  partition_(partition) {
380  }
381 
382  virtual bool Run() {
383  CHECK_CONCURRENCY("bgp::SendUpdate");
384 
385  while (true) {
386  unique_ptr<WorkBase> wentry = partition_->WorkDequeue();
387  if (!wentry.get())
388  break;
389  if (!wentry->valid)
390  continue;
391  switch (wentry->type) {
392  case WorkBase::WRibOut: {
393  WorkRibOut *workrib = static_cast<WorkRibOut *>(wentry.get());
394  partition_->UpdateRibOut(workrib->ribout, workrib->queue_id);
395  break;
396  }
397  case WorkBase::WPeer: {
398  WorkPeer *workpeer = static_cast<WorkPeer *>(wentry.get());
399  partition_->UpdatePeer(workpeer->peer);
400  break;
401  }
402  }
403  }
404 
405  return true;
406  }
407  string Description() const { return "BgpSenderPartition::Worker"; }
408 
409 private:
410  BgpSenderPartition *partition_;
411 };
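
// Worker::Run above drains WorkBase entries until WorkDequeue returns an empty
// pointer, skipping entries that were invalidated. Below is a minimal
// standalone sketch of that drain loop over a queue of owned entries; the
// Work struct and std::deque are illustrative stand-ins, not the
// WorkBase/WorkQueue types used in this file.
#include <cstdio>
#include <deque>
#include <memory>

namespace worker_sketch {

struct Work {
    enum Type { WRibOut, WPeer };
    Type type;
    bool valid;
};

// Pop the front entry, or return an empty pointer when the queue is drained.
inline std::unique_ptr<Work> Dequeue(std::deque<std::unique_ptr<Work> > *queue) {
    std::unique_ptr<Work> entry;
    if (!queue->empty()) {
        entry = std::move(queue->front());
        queue->pop_front();
    }
    return entry;
}

// Same shape as Worker::Run: keep dequeuing until empty, skip invalidated
// entries, and dispatch on the entry type.
inline void Drain(std::deque<std::unique_ptr<Work> > *queue) {
    while (true) {
        std::unique_ptr<Work> entry = Dequeue(queue);
        if (!entry)
            break;                              // queue drained; worker would exit
        if (!entry->valid)
            continue;                           // invalidated by a Remove/Leave
        if (entry->type == Work::WRibOut)
            printf("drain one ribout queue\n");
        else
            printf("drain all queues for one peer\n");
    }
}

}  // namespace worker_sketch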
412 
413 BgpSenderPartition::BgpSenderPartition(BgpUpdateSender *sender, int index)
414  : sender_(sender),
415  index_(index),
416  running_(false),
417  disabled_(false),
418  worker_task_(NULL) {
419 }
420 
421 BgpSenderPartition::~BgpSenderPartition() {
422  if (worker_task_) {
423  TaskScheduler *scheduler = TaskScheduler::GetInstance();
424  scheduler->Cancel(worker_task_);
425  }
426  assert(peer_state_imap_.empty());
427  assert(rib_state_imap_.empty());
428 }
429 
430 int BgpSenderPartition::task_id() const {
431  return sender_->task_id();
432 }
433 
434 //
435 // Add the (RibOut, IPeerUpdate) combo to the BgpSenderPartition.
436 // Find or create the corresponding RibState and PeerState and set up the
437 // cross-linkage.
438 //
439 void BgpSenderPartition::Add(RibOut *ribout, IPeerUpdate *peer) {
440  CHECK_CONCURRENCY("bgp::PeerMembership");
441 
442  RibState *rs = rib_state_imap_.Locate(ribout);
443  PeerState *ps = peer_state_imap_.Locate(peer);
444  rs->Add(ps);
445  ps->Add(rs);
446 }
447 
448 //
449 // Remove the (RibOut, IPeerUpdate) combo from the BgpSenderPartition.
450 // Decouple cross linkage between the corresponding RibState and PeerState and
451 // get rid of the RibState and/or PeerState if they are no longer needed.
452 //
453 void BgpSenderPartition::Remove(RibOut *ribout, IPeerUpdate *peer) {
454  CHECK_CONCURRENCY("bgp::PeerMembership");
455 
456  RibState *rs = rib_state_imap_.Find(ribout);
457  PeerState *ps = peer_state_imap_.Find(peer);
458  assert(rs != NULL);
459  assert(ps != NULL);
460  rs->Remove(ps);
461  ps->Remove(rs);
462  if (rs->empty()) {
463  WorkRibOutInvalidate(ribout);
464  rib_state_imap_.Remove(ribout, rs->index());
465  }
466  if (ps->empty()) {
467  WorkPeerInvalidate(peer);
468  peer_state_imap_.Remove(peer, ps->index());
469  }
470 }
471 
472 //
473 // Create and enqueue new WorkRibOut entry since the RibOut is now
474 // active.
475 //
476 void BgpSenderPartition::RibOutActive(RibOut *ribout, int queue_id) {
477  CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");
478 
479  WorkRibOutEnqueue(ribout, queue_id);
480 }
481 
482 //
483 // Mark an IPeerUpdate to be send ready.
484 //
485 void BgpSenderPartition::PeerSendReady(IPeerUpdate *peer) {
486  CHECK_CONCURRENCY("bgp::SendReadyTask");
487 
488  // The IPeerUpdate may not be registered if it has not reached Established
489  // state or it may already have been unregistered by the time we get around
490  // to processing the notification.
491  PeerState *ps = peer_state_imap_.Find(peer);
492  if (!ps)
493  return;
494 
495  // Nothing to do if the IPeerUpdate's already in that state.
496  if (ps->send_ready())
497  return;
498 
499  // Create and enqueue new WorkPeer entry.
500  ps->set_send_ready(true);
501  WorkPeerEnqueue(peer);
502 }
503 
504 //
505 // Return true if the IPeer is send ready.
506 //
507 bool BgpSenderPartition::PeerIsSendReady(IPeerUpdate *peer) const {
508  CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
509 
510  PeerState *ps = peer_state_imap_.Find(peer);
511  return ps->send_ready();
512 }
513 
514 //
515 // Return true if the IPeer is registered.
516 //
517 bool BgpSenderPartition::PeerIsRegistered(IPeerUpdate *peer) const {
518  CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
519 
520  return (peer_state_imap_.Find(peer) != NULL);
521 }
522 
523 //
524 // Return true if the IPeer is in sync.
525 //
526 bool BgpSenderPartition::PeerInSync(IPeerUpdate *peer) const {
527  CHECK_CONCURRENCY("bgp::PeerMembership", "bgp::ShowCommand");
528 
529  PeerState *ps = peer_state_imap_.Find(peer);
530  return (ps ? ps->in_sync() : false);
531 }
532 
533 //
534 // Create a Worker if warranted and enqueue it to the TaskScheduler.
535 // Assumes that the caller holds the BgpSenderPartition mutex.
536 //
537 void BgpSenderPartition::MaybeStartWorker() {
538  if (!running_ && !disabled_) {
539  worker_task_ = new Worker(this);
540  TaskScheduler *scheduler = TaskScheduler::GetInstance();
541  scheduler->Enqueue(worker_task_);
542  running_ = true;
543  }
544 }
545 
546 //
547 // Dequeue the first WorkBase item from the work queue and return a
548 // unique_ptr to it. Clear out Worker related state if the work queue
549 // is empty.
550 //
551 unique_ptr<BgpSenderPartition::WorkBase> BgpSenderPartition::WorkDequeue() {
552  CHECK_CONCURRENCY("bgp::SendUpdate");
553 
554  unique_ptr<WorkBase> wentry;
555  if (work_queue_.empty()) {
556  worker_task_ = NULL;
557  running_ = false;
558  } else {
559  wentry.reset(work_queue_.pop_front().release());
560  }
561  return wentry;
562 }
563 
564 //
565 // Enqueue a WorkBase entry into the work queue and start a new Worker
566 // task if required.
567 //
568 void BgpSenderPartition::WorkEnqueue(WorkBase *wentry) {
569  CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::SendReadyTask",
570  "bgp::PeerMembership");
571 
572  work_queue_.push_back(wentry);
573  MaybeStartWorker();
574 }
575 
576 //
577 // Disable or enable the worker.
578 // For unit testing.
579 //
580 void BgpSenderPartition::set_disabled(bool disabled) {
581  disabled_ = disabled;
582  MaybeStartWorker();
583 }
584 
585 //
586 // Enqueue a WorkPeer to the work queue.
587 //
588 void BgpSenderPartition::WorkPeerEnqueue(IPeerUpdate *peer) {
589  CHECK_CONCURRENCY("bgp::SendReadyTask");
590 
591  WorkBase *wentry = new WorkPeer(peer);
592  WorkEnqueue(wentry);
593 }
594 
595 //
596 // Invalidate all WorkBases for the given IPeerUpdate.
597 // Used when an IPeerUpdate is removed.
598 //
599 void BgpSenderPartition::WorkPeerInvalidate(IPeerUpdate *peer) {
600  CHECK_CONCURRENCY("bgp::PeerMembership");
601 
602  for (WorkQueue::iterator it = work_queue_.begin();
603  it != work_queue_.end(); ++it) {
604  WorkBase *wentry = it.operator->();
605  if (wentry->type != WorkBase::WPeer)
606  continue;
607  WorkPeer *wpeer = static_cast<WorkPeer *>(wentry);
608  if (wpeer->peer != peer)
609  continue;
610  wpeer->valid = false;
611  }
612 }
613 
614 //
615 // Enqueue a WorkRibOut to the work queue.
616 //
617 void BgpSenderPartition::WorkRibOutEnqueue(RibOut *ribout, int queue_id) {
618  CHECK_CONCURRENCY("db::DBTable", "bgp::SendUpdate", "bgp::PeerMembership");
619 
620  WorkBase *wentry = new WorkRibOut(ribout, queue_id);
621  WorkEnqueue(wentry);
622 }
623 
624 //
625 // Invalidate all WorkBases for the given RibOut.
626 // Used when a RibOut is removed.
627 //
628 void BgpSenderPartition::WorkRibOutInvalidate(RibOut *ribout) {
629  CHECK_CONCURRENCY("bgp::PeerMembership");
630 
631  for (WorkQueue::iterator it = work_queue_.begin();
632  it != work_queue_.end(); ++it) {
633  WorkBase *wentry = it.operator->();
634  if (wentry->type != WorkBase::WRibOut)
635  continue;
636  WorkRibOut *wribout = static_cast<WorkRibOut *>(wentry);
637  if (wribout->ribout != ribout)
638  continue;
639  wribout->valid = false;
640  }
641 }
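
// Both invalidation routines above flag matching entries rather than erasing
// them; the Worker simply skips entries whose valid flag is cleared when it
// dequeues. Below is a minimal standalone sketch of that mark-and-skip
// pattern; the PendingWork type and the integer `target` key are illustrative
// stand-ins for the WorkRibOut entry and its RibOut pointer.
#include <deque>

namespace invalidate_sketch {

struct PendingWork {
    int ribout_id;              // stand-in for the WorkRibOut::ribout pointer
    bool valid;
};

// Flag matching entries instead of erasing them; the consumer skips entries
// with valid == false on dequeue, so no mid-queue erasure is needed.
inline void InvalidateRibOut(std::deque<PendingWork> *queue, int target) {
    for (std::deque<PendingWork>::iterator it = queue->begin();
         it != queue->end(); ++it) {
        if (it->ribout_id == target)
            it->valid = false;
    }
}

}  // namespace invalidate_sketch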
642 
643 //
644 // Build the RibPeerSet of IPeers for the RibOut that are in sync. Note that
645 // we need to use bit indices that are specific to the RibOut, not the ones
646 // from the BgpSenderPartition.
647 //
648 void BgpSenderPartition::BuildSyncBitSet(const RibOut *ribout, RibState *rs,
649  RibPeerSet *msync) {
650  CHECK_CONCURRENCY("bgp::SendUpdate");
651 
652  for (RibState::iterator it = rs->begin(peer_state_imap_);
653  it != rs->end(peer_state_imap_); ++it) {
654  PeerState *ps = it.operator->();
655 
656  // If the PeerState is in sync but the IPeerUpdate is not send ready
657  // then update the sync and send ready state in the PeerState. Note
658  // that the RibOut queue for the PeerState will get marked active via
659 // the call to SetQueueActive from UpdateRibOut.
660  if (ps->in_sync()) {
661  if (ps->peer()->send_ready()) {
662  int rix = ribout->GetPeerIndex(ps->peer());
663  msync->set(rix);
664  } else {
665  ps->clear_sync();
666  ps->set_send_ready(false);
667  }
668  }
669  }
670 }
671 
672 //
673 // Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
674 // Note that bit indices in the RibPeerSet are specific to the RibOut.
675 //
676 void BgpSenderPartition::SetSendBlocked(const RibOut *ribout, RibState *rs,
677  int queue_id, const RibPeerSet &blocked) {
678  CHECK_CONCURRENCY("bgp::SendUpdate");
679 
680  for (size_t bit = blocked.find_first(); bit != RibPeerSet::npos;
681  bit = blocked.find_next(bit)) {
682  IPeerUpdate *peer = ribout->GetPeer(bit);
683  PeerState *ps = peer_state_imap_.Find(peer);
684  ps->SetQueueActive(rs->index(), queue_id);
685  ps->clear_sync();
686  ps->set_send_ready(false);
687  }
688 }
689 
690 //
691 // For unit testing only.
692 // Take the RibPeerSet of blocked IPeers and update the relevant PeerStates.
693 //
694 void BgpSenderPartition::SetSendBlocked(RibOut *ribout,
695  int queue_id, const RibPeerSet &blocked) {
696  CHECK_CONCURRENCY("bgp::SendUpdate");
697 
698  RibState *rs = rib_state_imap_.Find(ribout);
699  assert(rs);
700  SetSendBlocked(ribout, rs, queue_id, blocked);
701 }
702 
703 //
704 // Concurrency: called from bgp send task.
705 //
706 // Take the RibPeerSet of unsync IPeers and update the relevant PeerStates.
707 // Note that bit indices in the RibPeerSet are specific to the RibOut.
708 //
709 void BgpSenderPartition::SetQueueActive(const RibOut *ribout, RibState *rs,
710  int queue_id, const RibPeerSet &munsync) {
711  CHECK_CONCURRENCY("bgp::SendUpdate");
712 
713  for (size_t bit = munsync.find_first(); bit != RibPeerSet::npos;
714  bit = munsync.find_next(bit)) {
715  IPeerUpdate *peer = ribout->GetPeer(bit);
716  PeerState *ps = peer_state_imap_.Find(peer);
717  ps->SetQueueActive(rs->index(), queue_id);
718  }
719 }
720 
721 //
722 // Concurrency: called from bgp send task.
723 //
724 // Mark the PeerRibState corresponding to the given IPeerUpdate and RibOut
725 // as active.
726 //
727 // Used by unit test code.
728 //
729 void BgpSenderPartition::SetQueueActive(RibOut *ribout, int queue_id,
730  IPeerUpdate *peer) {
731  CHECK_CONCURRENCY("bgp::SendUpdate");
732 
733  PeerState *ps = peer_state_imap_.Find(peer);
734  RibState *rs = rib_state_imap_.Find(ribout);
735  ps->SetQueueActive(rs->index(), queue_id);
736 }
737 
738 //
739 // Concurrency: called from bgp send task.
740 //
741 // Check if the queue corresponding to IPeerUpdate, Ribout and queue id is
742 // active.
743 //
744 // Used by unit test code.
745 //
746 bool BgpSenderPartition::IsQueueActive(RibOut *ribout, int queue_id,
747  IPeerUpdate *peer) {
748  CHECK_CONCURRENCY("bgp::SendUpdate");
749 
750  PeerState *ps = peer_state_imap_.Find(peer);
751  RibState *rs = rib_state_imap_.Find(ribout);
752  return ps->IsQueueActive(rs->index(), queue_id);
753 }
754 
755 //
756 // Concurrency: called from bgp send task.
757 //
758 // Mark all the RibStates for the given peer and queue id as being in sync
759 // and trigger a tail dequeue.
760 //
761 void BgpSenderPartition::SetQueueSync(PeerState *ps, int queue_id) {
762  CHECK_CONCURRENCY("bgp::SendUpdate");
763 
764  for (PeerState::iterator it = ps->begin(rib_state_imap_);
765  it != ps->end(rib_state_imap_); ++it) {
766  RibState *rs = it.rib_state();
767  if (!rs->QueueSync(queue_id)) {
768  RibOut *ribout = it.operator->();
769  RibOutActive(ribout, queue_id);
770  rs->SetQueueSync(queue_id);
771  }
772  }
773 }
774 
775 //
776 // Drain the queue until there are no more updates or all the members become
777 // blocked.
778 //
779 void BgpSenderPartition::UpdateRibOut(RibOut *ribout, int queue_id) {
780  CHECK_CONCURRENCY("bgp::SendUpdate");
781 
782  RibOutUpdates *updates = ribout->updates(index_);
783  RibState *rs = rib_state_imap_.Find(ribout);
784  RibPeerSet msync;
785 
786  // Convert group in-sync list to rib specific bitset.
787  BuildSyncBitSet(ribout, rs, &msync);
788 
789  // Drain the queue till we can do no more.
790  RibPeerSet blocked, munsync;
791  bool done = updates->TailDequeue(queue_id, msync, &blocked, &munsync);
792  assert(msync.Contains(blocked));
793 
794  // Mark peers as send blocked.
795  SetSendBlocked(ribout, rs, queue_id, blocked);
796 
797  // Set the queue to be active for any unsync peers. If we don't do this,
798  // we will forget to mark the (RibOut,QueueId) as active for these peers
799  // since the blocked RibPeerSet does not contain peers that are already
800  // out of sync. Note that the unsync peers would have been split from
801  // the tail marker in TailDequeue.
802  SetQueueActive(ribout, rs, queue_id, munsync);
803 
804  // If all peers are blocked, mark the queue as unsync in the RibState. We
805  // will trigger tail dequeue for the (RibOut,QueueId) when any peer that
806  // is interested in the RibOut becomes in sync.
807  if (!done)
808  rs->SetQueueUnsync(queue_id);
809 }
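
// UpdateRibOut above passes the in-sync set to TailDequeue and then sorts the
// members into blocked and already-unsync groups. Below is a minimal
// standalone sketch of that set bookkeeping; boost::dynamic_bitset stands in
// for RibPeerSet, and all sets are assumed to use the same RibOut-local bit
// indices.
#include <boost/dynamic_bitset.hpp>

namespace dequeue_sketch {

// Compute who is still in sync after the dequeue and who needs the
// (RibOut, QueueId) pair marked active in their PeerState.
inline void ClassifyPeers(const boost::dynamic_bitset<> &msync,
                          const boost::dynamic_bitset<> &blocked,
                          const boost::dynamic_bitset<> &munsync,
                          boost::dynamic_bitset<> *still_in_sync,
                          boost::dynamic_bitset<> *needs_queue_active) {
    *still_in_sync = msync & ~blocked;          // blocked peers drop out of sync
    *needs_queue_active = blocked | munsync;    // both groups wait for this queue
}

}  // namespace dequeue_sketch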
810 
811 //
812 // Go through all RibOuts for the IPeerUpdate and drain the given queue till it
813 // is up-to-date or it becomes blocked. If it's blocked, select the next RibOut
814 // to be processed when the IPeerUpdate becomes send ready.
815 //
816 // Return false if the IPeerUpdate got blocked.
817 //
818 bool BgpSenderPartition::UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps,
819  int queue_id) {
820  CHECK_CONCURRENCY("bgp::SendUpdate");
821 
822  for (PeerState::circular_iterator it = ps->circular_begin(rib_state_imap_);
823  it != ps->circular_end(rib_state_imap_); ++it) {
824  // Skip if this queue is not active in the PeerRibState.
825  if (!BitIsSet(it.peer_rib_state().qactive, queue_id))
826  continue;
827 
828  // Drain the queue till we can do no more.
829  RibOut *ribout = it.operator->();
830  RibOutUpdates *updates = ribout->updates(index_);
831  RibPeerSet blocked;
832  bool done = updates->PeerDequeue(queue_id, peer, &blocked);
833 
834  // Process blocked mask.
835  RibState *rs = it.rib_state();
836  SetSendBlocked(ribout, rs, queue_id, blocked);
837 
838  // If the peer is still send_ready, mark the queue as inactive for
839  // the peer. Need to check send_ready because the return value of
840  // PeerDequeue only tells that *some* peer was merged with the tail
841  // marker.
842  // If the peer got blocked, remember where to start next time and
843  // stop processing. We don't want to continue processing for other
844  // merged peers if the lead peer is blocked. Processing for other
845  // peers will continue when their own WorkPeer items are processed.
846  if (ps->send_ready()) {
847  assert(done);
848  ps->SetQueueInactive(rs->index(), queue_id);
849  } else {
850  ps->SetIteratorStart(it.index());
851  return false;
852  }
853  }
854 
855  return true;
856 }
857 
858 //
859 // Drain the queue of all updates for this IPeerUpdate, until it is up-to-date
860 // or it becomes blocked.
861 //
862 void BgpSenderPartition::UpdatePeer(IPeerUpdate *peer) {
863  CHECK_CONCURRENCY("bgp::SendUpdate");
864 
865  // Bail if the PeerState is not send ready.
866  PeerState *ps = peer_state_imap_.Find(peer);
867  if (!ps->send_ready()) {
868  return;
869  }
870 
871  // Update the PeerState and bail if the IPeerUpdate is not send ready.
872  // This happens if the IPeerUpdate gets blocked while processing some
873  // other partition.
874  if (!peer->send_ready()) {
875  ps->set_send_ready(false);
876  return;
877  }
878 
879  // Go through all queues and drain them if there's anything on them.
880  for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
881  if (ps->QueueCount(queue_id) == 0) {
882  continue;
883  }
884  if (!UpdatePeerQueue(peer, ps, queue_id)) {
885  assert(!ps->send_ready());
886  return;
887  }
888  }
889 
890  // Checking the return value of UpdatePeerQueue above is not sufficient as
891  // that only tells us that *some* peer(s) got merged with the tail marker.
892  // Need to make sure that the IPeerUpdate that we are processing is still
893  // send ready.
894  if (!ps->send_ready()) {
895  return;
896  }
897 
898  // Mark the peer as being in sync across all tables.
899  ps->SetSync();
900 
901  // Mark all RibStates for the peer as being in sync. This triggers a tail
902  // dequeue for the corresponding (RibOut, QueueId) if necessary. This in
903  // turn ensures that we do not get stuck in the case where all peers get
904  // blocked and then get back in sync.
905  for (int queue_id = RibOutUpdates::QCOUNT - 1; queue_id >= 0; --queue_id) {
906  SetQueueSync(ps, queue_id);
907  }
908 }
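
// UpdatePeer above walks the queues from the highest id down and bails out as
// soon as the peer blocks. Below is a minimal standalone sketch of that
// control flow; the PeerLike interface is an illustrative stand-in, not the
// PeerState class defined in this file.
namespace update_peer_sketch {

struct PeerLike {
    virtual ~PeerLike() {}
    virtual int QueueCount(int queue_id) const = 0;  // pending (RibOut, queue) pairs
    virtual bool DrainQueue(int queue_id) = 0;       // false if the peer got blocked
    virtual void SetSync() = 0;
};

// Visit queues from the highest id down, skip queues with nothing pending,
// stop as soon as the peer blocks, and mark the peer in sync only if every
// queue drained completely.
inline void UpdatePeer(PeerLike *peer, int queue_count) {
    for (int queue_id = queue_count - 1; queue_id >= 0; --queue_id) {
        if (peer->QueueCount(queue_id) == 0)
            continue;
        if (!peer->DrainQueue(queue_id))
            return;                 // blocked; resumes when send ready again
    }
    peer->SetSync();
}

}  // namespace update_peer_sketch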
909 
910 //
911 // Check invariants for the BgpSenderPartition.
912 //
913 bool BgpSenderPartition::CheckInvariants() const {
914  int grp_peer_count = 0;
915  int peer_grp_count = 0;
916  for (size_t i = 0; i < rib_state_imap_.size(); i++) {
917  if (!rib_state_imap_.bits().test(i)) {
918  continue;
919  }
920  RibState *rs = rib_state_imap_.At(i);
921  assert(rs != NULL);
922  CHECK_INVARIANT(rs->index() == i);
923  for (RibState::iterator it = rs->begin(peer_state_imap_);
924  it != rs->end(peer_state_imap_); ++it) {
925  PeerState *ps = it.operator->();
926  CHECK_INVARIANT(ps->IsMember(i));
927  grp_peer_count++;
928  }
929  }
930  for (size_t i = 0; i < peer_state_imap_.size(); i++) {
931  if (!peer_state_imap_.bits().test(i)) {
932  continue;
933  }
934  PeerState *ps = peer_state_imap_.At(i);
935  assert(ps != NULL);
936  CHECK_INVARIANT(ps->index() == i);
937  if (!ps->CheckInvariants()) {
938  return false;
939  }
940  for (PeerState::iterator it = ps->begin(rib_state_imap_);
941  it != ps->end(rib_state_imap_); ++it) {
942  RibState *rs = it.rib_state();
943  CHECK_INVARIANT(rs->peer_set().test(i));
944  peer_grp_count++;
945  }
946  }
947 
948  CHECK_INVARIANT(grp_peer_count == peer_grp_count);
949  return true;
950 }
951 
952 //
953 // Constructor for BgpUpdateSender.
954 // Initialize send ready WorkQueue and allocate BgpSenderPartitions.
955 //
956 BgpUpdateSender::BgpUpdateSender(BgpServer *server)
957  : server_(server),
958  task_id_(TaskScheduler::GetInstance()->GetTaskId("bgp::SendUpdate")),
959  send_ready_queue_(
960  TaskScheduler::GetInstance()->GetTaskId("bgp::SendReadyTask"), 0,
961  boost::bind(&BgpUpdateSender::SendReadyCallback, this, _1)) {
962  for (int idx = 0; idx < DB::PartitionCount(); ++idx) {
963  partitions_.push_back(new BgpSenderPartition(this, idx));
964  }
965 }
966 
967 //
968 // Destructor for BgpUpdateSender.
969 // Shutdown the WorkQueue and delete all BgpSenderPartitions.
970 //
971 BgpUpdateSender::~BgpUpdateSender() {
972  send_ready_queue_.Shutdown();
973  STLDeleteValues(&partitions_);
974 }
975 
976 //
977 // Handle the join of an IPeerUpdate to a RibOut.
978 //
979 void BgpUpdateSender::Join(RibOut *ribout, IPeerUpdate *peer) {
980  CHECK_CONCURRENCY("bgp::PeerMembership");
981 
982  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
983  partition->Add(ribout, peer);
984  }
985 }
986 
987 //
988 // Handle the leave of an IPeerUpdate from a RibOut.
989 //
990 void BgpUpdateSender::Leave(RibOut *ribout, IPeerUpdate *peer) {
991  CHECK_CONCURRENCY("bgp::PeerMembership");
992 
993  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
994  partition->Remove(ribout, peer);
995  }
996 }
997 
998 //
999 // Inform the specified BgpSenderPartition that it needs to schedule a tail
1000 // dequeue for the given RibOut queue.
1001 //
1002 void BgpUpdateSender::RibOutActive(int index, RibOut *ribout, int queue_id) {
1003  CHECK_CONCURRENCY("db::DBTable", "bgp::PeerMembership");
1004 
1005  partitions_[index]->RibOutActive(ribout, queue_id);
1006 }
1007 
1008 //
1009 // Concurrency: called from arbitrary task.
1010 //
1011 // Enqueue the IPeerUpdate to the send ready processing work queue.
1012 // The callback is invoked in the context of bgp::SendReadyTask.
1013 //
1014 void BgpUpdateSender::PeerSendReady(IPeerUpdate *peer) {
1015  send_ready_queue_.Enqueue(peer);
1016 }
1017 
1018 //
1019 // Return true if the IPeer is registered.
1020 //
1021 bool BgpUpdateSender::PeerIsRegistered(IPeerUpdate *peer) const {
1022  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1023  if (partition->PeerIsRegistered(peer))
1024  return true;
1025  }
1026  return false;
1027 }
1028 
1029 //
1030 // Return true if the IPeer is in sync.
1031 //
1032 bool BgpUpdateSender::PeerInSync(IPeerUpdate *peer) const {
1033  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1034  if (!partition->PeerInSync(peer))
1035  return false;
1036  }
1037  return true;
1038 }
1039 
1040 //
1041 // Callback to handle send ready notification for IPeerUpdate. Processing it
1042 // in the context of bgp::SendReadyTask ensures that there are no concurrency
1043 // issues w.r.t. the BgpSenderPartition working on the IPeerUpdate while we are
1044 // processing the notification.
1045 //
1046 bool BgpUpdateSender::SendReadyCallback(IPeerUpdate *peer) {
1047  CHECK_CONCURRENCY("bgp::SendReadyTask");
1048 
1049  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1050  partition->PeerSendReady(peer);
1051  }
1052  return true;
1053 }
1054 
1055 //
1056 // Check invariants for the BgpUpdateSender.
1057 //
1058 bool BgpUpdateSender::CheckInvariants() const {
1059  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1060  if (!partition->CheckInvariants())
1061  return false;
1062  }
1063  return true;
1064 }
1065 
1066 //
1067 // Disable all BgpSenderPartitions.
1068 //
1069 void BgpUpdateSender::DisableProcessing() {
1070  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1071  partition->set_disabled(true);
1072  }
1073 }
1074 
1075 //
1076 // Enable all BgpSenderPartitions.
1077 //
1078 void BgpUpdateSender::EnableProcessing() {
1079  BOOST_FOREACH(BgpSenderPartition *partition, partitions_) {
1080  partition->set_disabled(false);
1081  }
1082 }