7 #include <boost/bind.hpp>
8 #include <boost/foreach.hpp>
19 using std::unique_ptr;
67 typedef map<size_t, PeerRibState>
Map;
69 class iterator :
public boost::iterator_facade<
70 iterator, RibOut, boost::forward_traversal_tag> {
82 Map::const_iterator loc =
map_->upper_bound(
index_);
83 if (loc ==
map_->end()) {
100 circular_iterator, RibOut, boost::forward_traversal_tag> {
103 int start,
bool is_valid)
108 Map::const_iterator loc =
map_->lower_bound(start);
109 if (loc ==
map_->end()) {
113 if (is_valid)
match_ =
false;
123 assert(!
map_->empty());
124 Map::const_iterator loc =
map_->upper_bound(
index_);
125 if (loc ==
map_->end()) {
160 Map::const_iterator it =
rib_set_.begin();
179 Map::iterator loc =
rib_set_.find(rib_index);
181 if (!
BitIsSet(loc->second.qactive, queue_id)) {
182 SetBit(loc->second.qactive, queue_id);
189 Map::iterator loc =
rib_set_.find(rib_index);
191 if (
BitIsSet(loc->second.qactive, queue_id)) {
192 ClearBit(loc->second.qactive, queue_id);
199 Map::iterator loc =
rib_set_.find(rib_index);
201 return BitIsSet(loc->second.qactive, queue_id);
253 iterator, PeerState, boost::forward_traversal_tag> {
322 peer_set_.reset(ps->
index());
327 return (in_sync_[queue_id]);
332 in_sync_[queue_id] =
true;
337 in_sync_[queue_id] =
false;
343 rib_set_.insert(make_pair(rs->
index(),
init));
344 rib_bitset_.set(rs->
index());
350 SetQueueInactive(rs->
index(), queue_id);
352 rib_set_.erase(rs->
index());
353 rib_bitset_.reset(rs->
index());
358 for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
359 assert(it->second.qactive == 0);
362 assert(qactive_cnt_[i] == 0);
368 return *indexmap_.At(
index_)->ribout();
372 return *indexmap_.At(
index_)->ribout();
379 partition_(partition) {
386 unique_ptr<WorkBase> wentry = partition_->WorkDequeue();
391 switch (wentry->type) {
399 partition_->UpdatePeer(workpeer->
peer);
407 string Description()
const {
return "BgpSenderPartition::Worker"; }
530 return (ps ? ps->
in_sync() :
false);
554 unique_ptr<WorkBase> wentry;
570 "bgp::PeerMembership");
608 if (wpeer->
peer != peer)
610 wpeer->
valid =
false;
637 if (wribout->
ribout != ribout)
639 wribout->
valid =
false;
768 RibOut *ribout = it.operator->();
791 bool done = updates->
TailDequeue(queue_id, msync, &blocked, &munsync);
825 if (!
BitIsSet(it.peer_rib_state().qactive, queue_id))
829 RibOut *ribout = it.operator->();
832 bool done = updates->
PeerDequeue(queue_id, peer, &blocked);
914 int grp_peer_count = 0;
915 int peer_grp_count = 0;
958 task_id_(
TaskScheduler::GetInstance()->GetTaskId(
"bgp::SendUpdate")),
960 TaskScheduler::GetInstance()->GetTaskId(
"bgp::SendReadyTask"), 0,
983 partition->
Add(ribout, peer);
994 partition->
Remove(ribout, peer);
1005 partitions_[index]->RibOutActive(ribout, queue_id);
circular_iterator(const RibStateMap &indexmap, Map *map, int start, bool is_valid)
RibStateMap rib_state_imap_
bool SendReadyCallback(IPeerUpdate *peer)
void SetQueueActive(size_t rib_index, int queue_id)
int GetPeerIndex(IPeerUpdate *peer) const
RibOut & dereference() const
bool PeerIsRegistered(IPeerUpdate *peer) const
void STLDeleteValues(Container *container)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
bool equal(const iterator &rhs) const
map< size_t, PeerRibState > Map
void Shutdown(bool delete_entries=true)
bool test(size_t pos) const
void UpdatePeer(IPeerUpdate *peer)
vector< int > qactive_cnt_
bool IsQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer)
bool PeerInSync(IPeerUpdate *peer) const
circular_iterator circular_begin(const RibStateMap &indexmap)
DISALLOW_COPY_AND_ASSIGN(RibState)
void UpdateRibOut(RibOut *ribout, int queue_id)
PeerStateMap peer_state_imap_
void WorkRibOutInvalidate(RibOut *ribout)
void WorkPeerInvalidate(IPeerUpdate *peer)
string Description() const
int QueueCount(int queue_id)
bool QueueSync(int queue_id)
bool Contains(const BitSet &rhs) const
const PeerStateMap & indexmap_
bool CheckInvariants() const
ValueType * At(int index) const
void RibOutActive(RibOut *ribout, int queue_id)
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer, RibPeerSet *blocked)
BgpUpdateSender(BgpServer *server)
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
iterator end(const PeerStateMap &indexmap)
bool PeerInSync(IPeerUpdate *peer) const
tbb::atomic< bool > send_ready_
bool CheckInvariants() const
void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync)
const BitSet & peer_set() const
#define CHECK_INVARIANT(Cond)
const RibStateMap & indexmap_
bool PeerIsSendReady(IPeerUpdate *peer) const
iterator begin(const RibStateMap &indexmap)
BgpUpdateSender * sender_
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
bool CheckInvariants() const
void SetQueueInactive(size_t rib_index, int queue_id)
std::unique_ptr< WorkBase > WorkDequeue()
bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id)
void Leave(RibOut *ribout, IPeerUpdate *peer)
void SetQueueUnsync(int queue_id)
void set_send_ready(bool toggle)
void Remove(const KeyType &key, int index, bool clear_bit=true)
static TaskScheduler * GetInstance()
friend class boost::iterator_core_access
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
void set_index(size_t index)
void SetBit(IntType &value, size_t bit)
virtual bool send_ready() const
void SetQueueSync(int queue_id)
#define CHECK_CONCURRENCY(...)
IPeerUpdate * GetPeer(int index) const
Worker(BgpSenderPartition *partition)
void Join(RibOut *ribout, IPeerUpdate *peer)
void WorkRibOutEnqueue(RibOut *ribout, int queue_id)
iterator(const RibStateMap &indexmap, Map *map, size_t index)
friend class boost::iterator_core_access
void set_index(size_t index)
bool IsQueueActive(size_t rib_index, int queue_id)
const BitsetType & bits() const
bool IsMember(size_t index) const
const PeerRibState & peer_rib_state() const
void RibOutActive(int index, RibOut *ribout, int queue_id)
size_t find_first() const
void SetSendBlocked(RibOut *ribout, int queue_id, const RibPeerSet &blocked)
void Remove(PeerState *ps)
RibOut & dereference() const
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync, RibPeerSet *blocked, RibPeerSet *unsync)
iterator begin(const PeerStateMap &indexmap)
bool equal(const iterator &rhs) const
size_t find_next(size_t pos) const
bool equal(const circular_iterator &rhs) const
void PeerSendReady(IPeerUpdate *peer)
RibOutUpdates * updates(int idx)
DISALLOW_COPY_AND_ASSIGN(PeerState)
circular_iterator circular_end(const RibStateMap &indexmap)
static int PartitionCount()
PeerState(IPeerUpdate *peer)
BgpSenderPartition(BgpUpdateSender *sender, int index)
void SetIteratorStart(size_t start)
void PeerSendReady(IPeerUpdate *peer)
void WorkEnqueue(WorkBase *wentry)
BgpSenderPartition * partition(int index)
void set_disabled(bool disabled)
IPeerUpdate * peer() const
void Remove(RibOut *ribout, IPeerUpdate *peer)
BgpSenderPartition * partition_
bool Enqueue(QueueEntryT entry)
const PeerRibState & peer_rib_state() const
iterator end(const RibStateMap &indexmap)
Task is a wrapper over tbb::task to support policies.
void SetQueueSync(PeerState *ps, int queue_id)
PeerState & dereference() const
WorkQueue< IPeerUpdate * > send_ready_queue_
void ClearBit(IntType &value, size_t bit)
bool BitIsSet(IntType value, size_t bit)
const RibStateMap & indexmap_
ValueType * Find(const KeyType &key) const
void Remove(RibState *rs)
ValueType * Locate(const KeyType &key)
void Add(RibOut *ribout, IPeerUpdate *peer)
std::vector< BgpSenderPartition * > partitions_
iterator(const PeerStateMap &indexmap, const BitSet &set, size_t index)
bool PeerIsRegistered(IPeerUpdate *peer) const
void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id, const RibPeerSet &munsync)
void WorkPeerEnqueue(IPeerUpdate *peer)
friend class boost::iterator_core_access