7 #include <boost/bind.hpp>
8 #include <boost/foreach.hpp>
19 using std::unique_ptr;
67 typedef map<size_t, PeerRibState>
Map;
69 class iterator :
public boost::iterator_facade<
70 iterator, RibOut, boost::forward_traversal_tag> {
82 Map::const_iterator loc =
map_->upper_bound(
index_);
83 if (loc ==
map_->end()) {
100 circular_iterator, RibOut, boost::forward_traversal_tag> {
103 int start,
bool is_valid)
108 Map::const_iterator loc =
map_->lower_bound(start);
109 if (loc ==
map_->end()) {
113 if (is_valid)
match_ =
false;
123 assert(!
map_->empty());
124 Map::const_iterator loc =
map_->upper_bound(
index_);
125 if (loc ==
map_->end()) {
160 Map::const_iterator it =
rib_set_.begin();
179 Map::iterator loc =
rib_set_.find(rib_index);
181 if (!
BitIsSet(loc->second.qactive, queue_id)) {
182 SetBit(loc->second.qactive, queue_id);
189 Map::iterator loc =
rib_set_.find(rib_index);
191 if (
BitIsSet(loc->second.qactive, queue_id)) {
192 ClearBit(loc->second.qactive, queue_id);
199 Map::iterator loc =
rib_set_.find(rib_index);
201 return BitIsSet(loc->second.qactive, queue_id);
253 iterator, PeerState, boost::forward_traversal_tag> {
322 peer_set_.reset(ps->
index());
327 return (in_sync_[queue_id]);
332 in_sync_[queue_id] =
true;
337 in_sync_[queue_id] =
false;
343 rib_set_.insert(make_pair(rs->
index(),
init));
344 rib_bitset_.set(rs->
index());
350 SetQueueInactive(rs->
index(), queue_id);
352 rib_set_.erase(rs->
index());
353 rib_bitset_.reset(rs->
index());
358 for (Map::iterator it = rib_set_.begin(); it != rib_set_.end(); ++it) {
359 assert(it->second.qactive == 0);
362 assert(qactive_cnt_[i] == 0);
368 return *indexmap_.At(
index_)->ribout();
372 return *indexmap_.At(
index_)->ribout();
391 switch (wentry->type) {
407 string Description()
const {
return "BgpSenderPartition::Worker"; }
530 return (ps ? ps->
in_sync() :
false);
554 unique_ptr<WorkBase> wentry;
570 "bgp::PeerMembership");
608 if (wpeer->
peer != peer)
610 wpeer->
valid =
false;
637 if (wribout->
ribout != ribout)
639 wribout->
valid =
false;
768 RibOut *ribout = it.operator->();
791 bool done = updates->
TailDequeue(queue_id, msync, &blocked, &munsync);
825 if (!
BitIsSet(it.peer_rib_state().qactive, queue_id))
829 RibOut *ribout = it.operator->();
832 bool done = updates->
PeerDequeue(queue_id, peer, &blocked);
914 int grp_peer_count = 0;
915 int peer_grp_count = 0;
958 task_id_(
TaskScheduler::GetInstance()->GetTaskId(
"bgp::SendUpdate")),
960 TaskScheduler::GetInstance()->GetTaskId(
"bgp::SendReadyTask"), 0,
1005 partitions_[index]->RibOutActive(ribout, queue_id);
bool equal(const circular_iterator &rhs) const
RibOut & dereference() const
const PeerRibState & peer_rib_state() const
circular_iterator(const RibStateMap &indexmap, Map *map, int start, bool is_valid)
friend class boost::iterator_core_access
const RibStateMap & indexmap_
bool equal(const iterator &rhs) const
const RibStateMap & indexmap_
iterator(const RibStateMap &indexmap, Map *map, size_t index)
RibOut & dereference() const
friend class boost::iterator_core_access
const PeerRibState & peer_rib_state() const
PeerState(IPeerUpdate *peer)
iterator end(const RibStateMap &indexmap)
void SetQueueInactive(size_t rib_index, int queue_id)
void set_send_ready(bool toggle)
vector< int > qactive_cnt_
bool IsQueueActive(size_t rib_index, int queue_id)
void SetIteratorStart(size_t start)
int QueueCount(int queue_id)
iterator begin(const RibStateMap &indexmap)
tbb::atomic< bool > send_ready_
void set_index(size_t index)
void SetQueueActive(size_t rib_index, int queue_id)
circular_iterator circular_begin(const RibStateMap &indexmap)
map< size_t, PeerRibState > Map
circular_iterator circular_end(const RibStateMap &indexmap)
bool CheckInvariants() const
DISALLOW_COPY_AND_ASSIGN(PeerState)
IPeerUpdate * peer() const
void Remove(RibState *rs)
bool IsMember(size_t index) const
PeerState & dereference() const
const PeerStateMap & indexmap_
bool equal(const iterator &rhs) const
iterator(const PeerStateMap &indexmap, const BitSet &set, size_t index)
friend class boost::iterator_core_access
DISALLOW_COPY_AND_ASSIGN(RibState)
void set_index(size_t index)
iterator begin(const PeerStateMap &indexmap)
bool QueueSync(int queue_id)
void SetQueueUnsync(int queue_id)
iterator end(const PeerStateMap &indexmap)
const BitSet & peer_set() const
void SetQueueSync(int queue_id)
void Remove(PeerState *ps)
Worker(BgpSenderPartition *partition)
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
BgpSenderPartition * partition_
string Description() const
bool CheckInvariants() const
std::unique_ptr< WorkBase > WorkDequeue()
void RibOutActive(RibOut *ribout, int queue_id)
void SetSendBlocked(RibOut *ribout, int queue_id, const RibPeerSet &blocked)
void Remove(RibOut *ribout, IPeerUpdate *peer)
void Add(RibOut *ribout, IPeerUpdate *peer)
void WorkRibOutInvalidate(RibOut *ribout)
bool PeerIsSendReady(IPeerUpdate *peer) const
void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync)
void WorkRibOutEnqueue(RibOut *ribout, int queue_id)
void SetQueueSync(PeerState *ps, int queue_id)
void set_disabled(bool disabled)
bool IsQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer)
bool PeerIsRegistered(IPeerUpdate *peer) const
RibStateMap rib_state_imap_
void WorkPeerEnqueue(IPeerUpdate *peer)
BgpUpdateSender * sender_
void UpdatePeer(IPeerUpdate *peer)
void PeerSendReady(IPeerUpdate *peer)
void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id, const RibPeerSet &munsync)
bool PeerInSync(IPeerUpdate *peer) const
bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id)
PeerStateMap peer_state_imap_
BgpSenderPartition(BgpUpdateSender *sender, int index)
void WorkEnqueue(WorkBase *wentry)
void UpdateRibOut(RibOut *ribout, int queue_id)
void WorkPeerInvalidate(IPeerUpdate *peer)
bool PeerInSync(IPeerUpdate *peer) const
void Leave(RibOut *ribout, IPeerUpdate *peer)
bool PeerIsRegistered(IPeerUpdate *peer) const
BgpSenderPartition * partition(int index)
bool CheckInvariants() const
void Join(RibOut *ribout, IPeerUpdate *peer)
bool SendReadyCallback(IPeerUpdate *peer)
void RibOutActive(int index, RibOut *ribout, int queue_id)
BgpUpdateSender(BgpServer *server)
WorkQueue< IPeerUpdate * > send_ready_queue_
std::vector< BgpSenderPartition * > partitions_
void PeerSendReady(IPeerUpdate *peer)
size_t find_first() const
size_t find_next(size_t pos) const
bool Contains(const BitSet &rhs) const
bool test(size_t pos) const
static int PartitionCount()
virtual bool send_ready() const
ValueType * At(int index) const
const BitsetType & bits() const
ValueType * Locate(const KeyType &key)
void Remove(const KeyType &key, int index, bool clear_bit=true)
ValueType * Find(const KeyType &key) const
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer, RibPeerSet *blocked)
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync, RibPeerSet *blocked, RibPeerSet *unsync)
RibOutUpdates * updates(int idx)
IPeerUpdate * GetPeer(int index) const
int GetPeerIndex(IPeerUpdate *peer) const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Task is a wrapper over tbb::task to support policies.
bool Enqueue(QueueEntryT entry)
void Shutdown(bool delete_entries=true)
#define CHECK_CONCURRENCY(...)
void SetBit(IntType &value, size_t bit)
void ClearBit(IntType &value, size_t bit)
void STLDeleteValues(Container *container)
bool BitIsSet(IntType value, size_t bit)
#define CHECK_INVARIANT(Cond)