7 #include <boost/bind/bind.hpp>
8 #include <boost/foreach.hpp>
20 using std::unique_ptr;
25 using namespace boost::placeholders;
// Container type behind rib_set_: each entry holds a PeerRibState, keyed by
// the size_t value that is passed as rs->index() when an entry is inserted.
// The iterators in this listing walk it with lower_bound()/upper_bound().
69 typedef map<size_t, PeerRibState>
Map;
71 class iterator :
public boost::iterator_facade<
72 iterator, RibOut, boost::forward_traversal_tag> {
75 : indexmap_(indexmap), map_(map), index_(index) {
// Returns index_, the position this iterator currently refers to. The
// constructor initializer stores its `index` argument into index_.
77 size_t index()
const {
return index_; }
82 friend class boost::iterator_core_access;
84 Map::const_iterator loc = map_->upper_bound(index_);
85 if (loc == map_->end()) {
92 return index_ == rhs.
index_;
94 RibOut &dereference()
const;
102 circular_iterator, RibOut, boost::forward_traversal_tag> {
105 int start,
bool is_valid)
106 : indexmap_(indexmap), map_(map), index_(-1), match_(true) {
110 Map::const_iterator loc = map_->lower_bound(start);
111 if (loc == map_->end()) {
115 if (is_valid) match_ =
false;
// Returns index_ as an int. The constructor initializer starts index_ at -1.
117 int index()
const {
return index_; }
122 friend class boost::iterator_core_access;
125 assert(!map_->empty());
126 Map::const_iterator loc = map_->upper_bound(index_);
127 if (loc == map_->end()) {
133 return ((match_ == rhs.
match_) && (index_ == rhs.
index_));
135 RibOut &dereference()
const;
144 : key_(peer), index_(-1),
146 in_sync_(true), rib_iterator_(
BitSet::npos) {
158 return rib_bitset_.test(index);
162 Map::const_iterator it = rib_set_.begin();
163 size_t index = (it != rib_set_.end() ? it->first : -1);
164 return iterator(indexmap, &rib_set_, index);
167 return iterator(indexmap, &rib_set_, -1);
181 Map::iterator loc = rib_set_.find(rib_index);
182 assert(loc != rib_set_.end());
183 if (!
BitIsSet(loc->second.qactive, queue_id)) {
184 SetBit(loc->second.qactive, queue_id);
185 qactive_cnt_[queue_id]++;
191 Map::iterator loc = rib_set_.find(rib_index);
192 assert(loc != rib_set_.end());
193 if (
BitIsSet(loc->second.qactive, queue_id)) {
194 ClearBit(loc->second.qactive, queue_id);
195 qactive_cnt_[queue_id]--;
201 Map::iterator loc = rib_set_.find(rib_index);
202 assert(loc != rib_set_.end());
203 return BitIsSet(loc->second.qactive, queue_id);
// Returns the counter stored in qactive_cnt_ for queue_id. That counter is
// incremented when a rib's qactive bit for the queue is newly set and
// decremented when it is cleared. queue_id is used directly as a subscript;
// nothing here range-checks it.
206 int QueueCount(
int queue_id) {
return qactive_cnt_[queue_id]; }
// Returns index_. The constructor initializer sets index_ to -1.
// NOTE(review): if index_ is declared size_t, that -1 wraps to the largest
// size_t value — confirm against the member declaration.
210 size_t index()
const {
return index_; }
// Returns true when rib_set_ holds no entries.
219 bool empty()
const {
return rib_set_.empty(); }
255 iterator, PeerState, boost::forward_traversal_tag> {
258 const BitSet &set,
size_t index)
259 : indexmap_(indexmap), set_(set), index_(index) {
// Returns index_, the bit position this iterator currently refers to. The
// constructor initializer stores its `index` argument into index_, and
// advancing the iterator replaces it with set_.find_next(index_).
261 size_t index()
const {
return index_; }
264 friend class boost::iterator_core_access;
266 index_ = set_.find_next(index_);
269 return index_ == rhs.
index_;
272 return *indexmap_.At(index_);
280 : key_(ribout), index_(-1), in_sync_(
RibOutUpdates::QCOUNT, true) {
// Per-queue sync-state interface; these lines are declarations only, with no
// bodies.
// NOTE(review): the definitions look like the in_sync_[queue_id] read and the
// two in_sync_[queue_id] assignments (true / false) further down this
// listing — confirm against the full source.
286 bool QueueSync(
int queue_id);
287 void SetQueueSync(
int queue_id);
288 void SetQueueUnsync(
int queue_id);
293 return iterator(indexmap, peer_set_, peer_set_.find_first());
// Returns index_. The constructor initializer sets index_ to -1.
// NOTE(review): if index_ is declared size_t, that -1 wraps to the largest
// size_t value — confirm against the member declaration.
301 size_t index()
const {
return index_; }
// Returns the result of peer_set_.none(). Bits in peer_set_ are set and reset
// by ps->index() elsewhere in this listing.
// NOTE(review): presumably none() is true when no bit is set, as with
// bitset-style containers — confirm against the BitSet class.
303 bool empty()
const {
return peer_set_.none(); }
319 peer_set_.set(ps->
index());
324 peer_set_.reset(ps->
index());
329 return (in_sync_[queue_id]);
334 in_sync_[queue_id] =
true;
339 in_sync_[queue_id] =
false;
345 rib_set_.insert(make_pair(rs->
index(),
init));
346 rib_bitset_.set(rs->
index());
352 SetQueueInactive(rs->
index(), queue_id);
354 rib_set_.erase(rs->
index());
355 rib_bitset_.reset(rs->
index());
360 for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
361 assert(it->second.qactive == 0);
364 assert(qactive_cnt_[i] == 0);
370 return *indexmap_.At(index_)->ribout();
374 return *indexmap_.At(index_)->ribout();
380 :
Task(partition->task_id(), partition->index()),
381 partition_(partition) {
388 unique_ptr<WorkBase> wentry = partition_->WorkDequeue();
393 switch (wentry->type) {
394 case WorkBase::WRibOut: {
399 case WorkBase::WPeer: {
401 partition_->UpdatePeer(workpeer->
peer);
// Returns the fixed description string for this task,
// "BgpSenderPartition::Worker".
409 string Description()
const {
return "BgpSenderPartition::Worker"; }
532 return (ps ? ps->
in_sync() :
false);
556 unique_ptr<WorkBase> wentry;
572 "bgp::PeerMembership");
610 if (wpeer->
peer != peer)
612 wpeer->
valid =
false;
639 if (wribout->
ribout != ribout)
641 wribout->
valid =
false;
770 RibOut *ribout = it.operator->();
793 bool done = updates->
TailDequeue(queue_id, msync, &blocked, &munsync);
827 if (!
BitIsSet(it.peer_rib_state().qactive, queue_id))
831 RibOut *ribout = it.operator->();
834 bool done = updates->
PeerDequeue(queue_id, peer, &blocked);
916 int grp_peer_count = 0;
917 int peer_grp_count = 0;
960 task_id_(
TaskScheduler::GetInstance()->GetTaskId(
"bgp::SendUpdate")),
962 TaskScheduler::GetInstance()->GetTaskId(
"bgp::SendReadyTask"), 0,
1007 partitions_[index]->RibOutActive(ribout, queue_id);
bool equal(const circular_iterator &rhs) const
RibOut & dereference() const
const PeerRibState & peer_rib_state() const
circular_iterator(const RibStateMap &indexmap, Map *map, int start, bool is_valid)
const RibStateMap & indexmap_
bool equal(const iterator &rhs) const
const RibStateMap & indexmap_
iterator(const RibStateMap &indexmap, Map *map, size_t index)
RibOut & dereference() const
const PeerRibState & peer_rib_state() const
PeerState(IPeerUpdate *peer)
iterator end(const RibStateMap &indexmap)
void SetQueueInactive(size_t rib_index, int queue_id)
void set_send_ready(bool toggle)
vector< int > qactive_cnt_
bool IsQueueActive(size_t rib_index, int queue_id)
void SetIteratorStart(size_t start)
int QueueCount(int queue_id)
iterator begin(const RibStateMap &indexmap)
void set_index(size_t index)
void SetQueueActive(size_t rib_index, int queue_id)
circular_iterator circular_begin(const RibStateMap &indexmap)
map< size_t, PeerRibState > Map
circular_iterator circular_end(const RibStateMap &indexmap)
bool CheckInvariants() const
std::atomic< bool > send_ready_
DISALLOW_COPY_AND_ASSIGN(PeerState)
IPeerUpdate * peer() const
void Remove(RibState *rs)
bool IsMember(size_t index) const
PeerState & dereference() const
const PeerStateMap & indexmap_
bool equal(const iterator &rhs) const
iterator(const PeerStateMap &indexmap, const BitSet &set, size_t index)
DISALLOW_COPY_AND_ASSIGN(RibState)
void set_index(size_t index)
iterator begin(const PeerStateMap &indexmap)
bool QueueSync(int queue_id)
void SetQueueUnsync(int queue_id)
iterator end(const PeerStateMap &indexmap)
const BitSet & peer_set() const
void SetQueueSync(int queue_id)
void Remove(PeerState *ps)
Worker(BgpSenderPartition *partition)
virtual bool Run()
Code to execute in a task. Returns true if the task is completed; returns false to reschedule the task.
BgpSenderPartition * partition_
string Description() const
Gives a description of the task.
bool CheckInvariants() const
std::unique_ptr< WorkBase > WorkDequeue()
void RibOutActive(RibOut *ribout, int queue_id)
void SetSendBlocked(RibOut *ribout, int queue_id, const RibPeerSet &blocked)
void Remove(RibOut *ribout, IPeerUpdate *peer)
void Add(RibOut *ribout, IPeerUpdate *peer)
void WorkRibOutInvalidate(RibOut *ribout)
bool PeerIsSendReady(IPeerUpdate *peer) const
void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync)
void WorkRibOutEnqueue(RibOut *ribout, int queue_id)
void SetQueueSync(PeerState *ps, int queue_id)
void set_disabled(bool disabled)
bool IsQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer)
bool PeerIsRegistered(IPeerUpdate *peer) const
RibStateMap rib_state_imap_
void WorkPeerEnqueue(IPeerUpdate *peer)
BgpUpdateSender * sender_
void UpdatePeer(IPeerUpdate *peer)
void PeerSendReady(IPeerUpdate *peer)
void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id, const RibPeerSet &munsync)
bool PeerInSync(IPeerUpdate *peer) const
bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id)
PeerStateMap peer_state_imap_
BgpSenderPartition(BgpUpdateSender *sender, int index)
void WorkEnqueue(WorkBase *wentry)
void UpdateRibOut(RibOut *ribout, int queue_id)
void WorkPeerInvalidate(IPeerUpdate *peer)
bool PeerInSync(IPeerUpdate *peer) const
void Leave(RibOut *ribout, IPeerUpdate *peer)
bool PeerIsRegistered(IPeerUpdate *peer) const
BgpSenderPartition * partition(int index)
bool CheckInvariants() const
void Join(RibOut *ribout, IPeerUpdate *peer)
bool SendReadyCallback(IPeerUpdate *peer)
void RibOutActive(int index, RibOut *ribout, int queue_id)
BgpUpdateSender(BgpServer *server)
WorkQueue< IPeerUpdate * > send_ready_queue_
std::vector< BgpSenderPartition * > partitions_
void PeerSendReady(IPeerUpdate *peer)
size_t find_first() const
size_t find_next(size_t pos) const
bool Contains(const BitSet &rhs) const
bool test(size_t pos) const
static int PartitionCount()
virtual bool send_ready() const
ValueType * At(int index) const
const BitsetType & bits() const
ValueType * Locate(const KeyType &key)
void Remove(const KeyType &key, int index, bool clear_bit=true)
ValueType * Find(const KeyType &key) const
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer, RibPeerSet *blocked)
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync, RibPeerSet *blocked, RibPeerSet *unsync)
RibOutUpdates * updates(int idx)
IPeerUpdate * GetPeer(int index) const
int GetPeerIndex(IPeerUpdate *peer) const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Task is a class to describe a computational task within OpenSDN control plane applications....
bool Enqueue(QueueEntryT entry)
void Shutdown(bool delete_entries=true)
#define CHECK_CONCURRENCY(...)
void SetBit(IntType &value, size_t bit)
void ClearBit(IntType &value, size_t bit)
void STLDeleteValues(Container *container)
bool BitIsSet(IntType value, size_t bit)
#define CHECK_INVARIANT(Cond)