7 #include <boost/foreach.hpp>
13 #include "bgp/bgp_peer_types.h"
31 TaskScheduler::GetInstance()->GetTaskId(
"bgp::PeerMembership"), 0,
48 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
65 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
91 for (PeerRegistrationListenerList::iterator iter =
96 (callback)(peer, table, unregister);
122 "bgp::StateMachine",
"xmpp::StateMachine");
124 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
162 "bgp::StateMachine",
"xmpp::StateMachine");
164 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
192 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
219 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
239 "Unregister table requested for action " << prs->
action());
251 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
289 tbb::spin_rw_mutex::scoped_lock write_lock(
rw_mutex_,
true);
298 table,
"Walk table requested for action " << prs->
action());
307 int *instance_id, uint64_t *subscription_gen_id)
const {
308 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
314 if (subscription_gen_id)
324 int instance_id, uint64_t subscription_gen_id) {
325 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
338 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
348 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
358 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
368 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
382 list<BgpTable *> *table_list)
const {
383 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
395 ShowRoutingInstanceTable *srit,
const BgpTable *table)
const {
396 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
407 BgpNeighborResp *resp)
const {
408 tbb::spin_rw_mutex::scoped_lock read_lock(
rw_mutex_,
false);
409 assert(resp->get_routing_tables().empty());
414 resp->set_send_state(
415 sender->
PeerInSync(nc_peer) ?
"in sync" :
"not in sync");
417 resp->set_send_state(
"not advertising");
475 const IPeer *peer)
const {
617 IPeer *peer =
event->peer;
637 table,
"Register table requested for action " << prs->
action());
645 IPeer *peer =
event->peer;
652 table,
"Register table completed for action " << prs->
action());
663 IPeer *peer =
event->peer;
677 table,
"Unregister table requested for action " << prs->
action());
684 IPeer *peer =
event->peer;
697 table,
"Unregister table completed for action " << prs->
action());
711 IPeer *peer =
event->peer;
719 table,
"Walk table completed for action " << prs->
action());
723 table,
"Unregister table completed for action " << prs->
action());
775 : event_type(event_type),
786 : event_type(event_type),
790 instance_id(instance_id) {
806 assert(rib_map_.empty());
814 PeerRibStateMap::iterator loc = rib_map_.find(rs);
815 if (loc == rib_map_.end()) {
817 rib_map_.insert(make_pair(rs, prs));
829 PeerRibStateMap::iterator loc = rib_map_.find(rs);
830 return (loc != rib_map_.end() ? loc->second : NULL);
839 PeerRibStateMap::const_iterator loc = rib_map_.find(rs);
840 return (loc != rib_map_.end() ? loc->second : NULL);
848 PeerRibStateMap::iterator loc = rib_map_.find(prs->
rib_state());
849 if (loc != rib_map_.end())
851 return rib_map_.empty();
858 list<BgpTable *> *table_list)
const {
859 for (PeerRibStateMap::const_iterator loc = rib_map_.begin();
860 loc != rib_map_.end(); ++loc) {
862 table_list->push_back(rs->
table());
870 BgpNeighborResp *resp)
const {
871 vector<BgpNeighborRoutingTable> table_list;
872 for (PeerRibStateMap::const_iterator loc = rib_map_.begin();
873 loc != rib_map_.end(); ++loc) {
875 BgpNeighborRoutingTable table;
877 table.set_current_state(
"subscribed");
878 table_list.push_back(table);
880 resp->set_routing_tables(table_list);
892 table_delete_ref_(this, table->
deleter()) {
899 assert(peer_rib_list_.empty());
900 assert(pending_peer_rib_list_.empty());
908 pending_peer_rib_list_.insert(prs);
909 manager_->EnqueueRibState(
this);
916 pending_peer_rib_list_.clear();
923 peer_rib_list_.insert(prs);
930 peer_rib_list_.erase(prs);
931 return peer_rib_list_.empty();
938 ShowRoutingInstanceTable *srit)
const {
939 ShowTableMembershipInfo stmi;
940 stmi.set_requests(request_count_);
941 stmi.set_walks(walk_count_);
942 vector<ShowMembershipPeerInfo> peers;
943 for (PeerRibList::const_iterator it = peer_rib_list_.begin();
944 it != peer_rib_list_.end(); ++it) {
946 ShowMembershipPeerInfo smpi;
948 peers.push_back(smpi);
950 stmi.set_peers(peers);
951 srit->set_membership(stmi);
965 ribin_registered_(false),
966 ribout_registered_(false),
968 subscription_gen_id_(0) {
976 assert(ribout_index_ == -1);
978 assert(!ribin_registered_);
979 assert(!ribout_registered_);
980 assert(instance_id_ == -1);
981 assert(subscription_gen_id_ == 0);
997 ribout_ = rs_->table()->RibOutLocate(sender, policy);
998 ribout_->RegisterListener();
999 ribout_->Register(ps_->peer());
1000 ribout_index_ = ribout_->GetPeerIndex(ps_->peer());
1001 ribout_registered_ =
true;
1002 rs_->EnqueuePeerRibState(
this);
1018 ribout_->Deactivate(ps_->peer());
1019 rs_->EnqueuePeerRibState(
this);
1021 assert(ribout_index_ == -1);
1022 ribout_registered_ =
false;
1023 manager_->TriggerUnregisterRibCompleteEvent(ps_->peer(), rs_->table());
1040 assert(ribout_index_ != -1);
1041 ribout_->Unregister(ps_->peer());
1044 ribout_registered_ =
false;
1051 rs_->EnqueuePeerRibState(
this);
1058 rs_->EnqueuePeerRibState(
this);
1065 ShowMembershipPeerInfo *smpi)
const {
1066 smpi->set_peer(ps_->peer()->ToString());
1067 smpi->set_ribin_registered(ribin_registered_);
1068 smpi->set_ribout_registered(ribout_registered_);
1069 smpi->set_instance_id(instance_id_);
1070 smpi->set_generation_id(subscription_gen_id_);
1077 : manager_(manager),
1080 TaskScheduler::GetInstance()->GetTaskId(
"bgp::PeerMembership"), 0)),
1081 postpone_walk_(false),
1082 walk_started_(false),
1083 walk_completed_(false),
1085 rib_state_list_size_(0),
1086 ribout_state_list_size_(0) {
1093 assert(rib_state_set_.empty());
1094 assert(rib_state_list_.empty());
1095 assert(!postpone_walk_);
1097 assert(walk_ref_ == NULL);
1098 assert(peer_rib_list_.empty());
1099 assert(peer_list_.empty());
1100 assert(ribout_state_map_.empty());
1101 assert(ribout_state_list_.empty());
1109 if (rib_state_set_.find(rs) != rib_state_set_.end())
1111 rib_state_set_.insert(rs);
1112 rib_state_list_.push_back(rs);
1113 rib_state_list_size_++;
1122 return (rib_state_list_.empty() && !trigger_->IsSet() && !rs_);
1130 RibOutStateMap::iterator loc = ribout_state_map_.find(ribout);
1131 if (loc == ribout_state_map_.end()) {
1133 ribout_state_map_.insert(make_pair(ribout, ros));
1134 ribout_state_list_.push_back(ros);
1135 ribout_state_list_size_++;
1150 for (RibOutStateList::iterator it = ribout_state_list_.begin();
1151 it != ribout_state_list_.end(); ++it) {
1159 if (peer_list_.empty())
1163 bool notify =
false;
1165 for (Route::PathList::iterator it = route->
GetPathList().begin(), next = it;
1181 if (dynamic_cast<BgpSecondaryPath *>(path))
1185 if (!peer || peer_list_.find(peer) == peer_list_.end())
1191 rs_->table()->InputCommonPostProcess(tpart, route, notify);
1202 assert(rs_->table() == table_base);
1203 walk_completed_ =
true;
1214 assert(walk_ref_ == NULL);
1216 assert(peer_rib_list_.empty());
1217 assert(peer_list_.empty());
1218 assert(ribout_state_map_.empty());
1219 assert(ribout_state_list_.empty());
1220 assert(rib_state_list_size_ == rib_state_set_.size());
1223 if (rib_state_list_.empty())
1227 rs_ = rib_state_list_.front();
1228 rib_state_list_.pop_front();
1229 rib_state_list_size_--;
1230 assert(rib_state_set_.erase(rs_) == 1);
1237 peer_rib_list_.insert(prs);
1250 peer_list_.insert(peer);
1256 peer_list_.insert(peer);
1271 rs_->ClearPeerRibStateList();
1274 rs_->increment_walk_count();
1279 walk_started_ =
true;
1280 if (!postpone_walk_)
1294 assert(walk_ref_ != NULL);
1296 assert(!peer_rib_list_.empty());
1297 assert(!peer_list_.empty() || !ribout_state_map_.empty());
1298 assert(rib_state_list_size_ == rib_state_set_.size());
1299 assert(ribout_state_list_size_ == ribout_state_map_.size());
1302 for (PeerRibList::iterator it = peer_rib_list_.begin();
1303 it != peer_rib_list_.end(); ++it) {
1309 manager_->TriggerRegisterRibCompleteEvent(peer, table);
1313 manager_->TriggerWalkRibCompleteEvent(peer, table);
1317 manager_->TriggerUnregisterRibCompleteEvent(peer, table);
1325 table->ReleaseWalker(walk_ref_);
1327 peer_rib_list_.clear();
1329 ribout_state_list_.clear();
1330 ribout_state_list_size_ = 0;
1333 walk_started_ =
false;
1334 walk_completed_ =
false;
1345 if (!walk_started_) {
1346 assert(!walk_completed_);
1348 }
else if (walk_completed_) {
1362 trigger_->set_disable();
1364 trigger_->set_enable();
1373 assert(!walk_started_);
1374 assert(walk_ref_ == NULL);
1375 postpone_walk_ =
true;
1383 assert(walk_started_);
1384 assert(!walk_completed_);
1385 assert(walk_ref_ != NULL);
1386 postpone_walk_ =
false;
PeerState(BgpMembershipManager *manager, IPeer *peer)
virtual void UnregisterRibOut(IPeer *peer, BgpTable *table)
void ProcessRegisterRibEvent(Event *event)
void ProcessUnregisterRibEvent(Event *event)
boost::function< void(IPeer *, BgpTable *, bool)> PeerRegistrationCallback
bool PeerIsRegistered(IPeerUpdate *peer) const
void ProcessUnregisterRibCompleteEvent(Event *event)
void EnqueuePeerRibState(PeerRibState *prs)
bool WalkCallback(DBTablePartBase *tpart, DBEntryBase *db_entry)
virtual bool EventCallbackInternal(Event *event)
void WalkTable(DBTableWalkRef walk)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
size_t GetMembershipCount() const
void RegisterRibOut(const RibExportPolicy &policy)
#define BGP_LOG_PEER_TABLE(peer, level, flags, tbl, arg)
virtual bool IsXmppPeer() const =0
void EnqueueEvent(Event *event)
tbb::atomic< uint64_t > total_jobs_count_
bool PeerInSync(IPeerUpdate *peer) const
boost::dynamic_bitset registration_bmap_
bool IsQueueEmpty() const
PeerRibState * LocatePeerRibState(RibState *rs)
void EnqueueRibState(RibState *rs)
tbb::spin_rw_mutex rw_mutex_
PeerStateMap peer_state_map_
bool IsRegistered(const IPeer *peer, const BgpTable *table) const
BgpMembershipManager(BgpServer *server)
DBTableWalkRef AllocWalker(WalkFn walk_fn, WalkCompleteFn walk_complete)
const BgpTable * table() const
bool Join(DBTablePartBase *root, const RibPeerSet &mjoin, DBEntryBase *db_entry)
virtual void Register(IPeer *peer, BgpTable *table, const RibExportPolicy &policy, int instance_id=-1)
bool IsQueueEmpty() const
PeerRegistrationListenerList registration_callbacks_
void TriggerWalkRibCompleteEvent(IPeer *peer, BgpTable *table)
size_t GetMembershipCount() const
void FillPeerMembershipInfo(const IPeer *peer, BgpNeighborResp *resp) const
bool RemovePeerRibState(PeerRibState *prs)
virtual bool IsInGRTimerWaitState() const =0
void DestroyPeerState(PeerState *ps)
bool GetRegistrationInfo(const IPeer *peer, const BgpTable *table, int *instance_id=NULL, uint64_t *subscription_gen_id=NULL) const
uint32_t GetQueueSize() const
void TriggerUnregisterRibCompleteEvent(IPeer *peer, BgpTable *table)
PeerRibState * LocatePeerRibState(IPeer *peer, BgpTable *table)
const RibPeerSet & leave_bitset()
void FillPeerMembershipInfo(BgpNeighborResp *resp) const
RibStateMap rib_state_map_
void WalkRibIn(IPeer *peer, BgpTable *table)
boost::scoped_ptr< Walker > walker_
RibState(BgpMembershipManager *manager, BgpTable *table)
const IPeer * peer() const
RibOutState * LocateRibOutState(RibOut *ribout)
#define BGP_LOG_FLAG_SYSLOG
void set_ribin_registered(bool value)
void LeavePeer(int index)
void DestroyPeerRibState(PeerRibState *prs)
virtual bool AssertRegisterRibIn(PeerRibState *prs, IPeer *peer, bool do_assert=true)
virtual ~BgpMembershipManager()
bool ribout_registered() const
void RegisterRibIn(IPeer *peer, BgpTable *table)
void SetQueueDisable(bool value)
bool RemovePeerRibState(PeerRibState *prs)
void UnregisterPeerRegistrationCallback(int id)
void STLDeleteElements(Container *container)
#define CHECK_CONCURRENCY(...)
void DestroyRibState(RibState *ps)
virtual void Unregister(IPeer *peer, BgpTable *table)
boost::scoped_ptr< WorkQueue< Event * > > event_queue_
void set_ribout_registered(bool value)
const std::string & name() const
void UnregisterRibIn(IPeer *peer, BgpTable *table)
RibState * LocateRibState(BgpTable *table)
Walker(BgpMembershipManager *manager)
void TriggerRegisterRibCompleteEvent(IPeer *peer, BgpTable *table)
bool ribin_registered() const
void SetRegistrationInfo(const IPeer *peer, const BgpTable *table, int instance_id, uint64_t subscription_gen_id)
void FillRoutingInstanceTableInfo(ShowRoutingInstanceTable *srit, const BgpTable *table) const
void GetRegisteredRibs(std::list< BgpTable * > *table_list) const
void UnregisterRibInUnlocked(PeerRibState *prs)
void WalkDoneCallback(DBTableBase *table)
uint64_t subscription_gen_id() const
void set_instance_id(int instance_id)
BgpUpdateSender * update_sender()
void ProcessWalkRibCompleteEvent(Event *event)
PeerRibState(BgpMembershipManager *manager, PeerState *ps, RibState *rs)
PeerRibState * FindPeerRibState(const IPeer *peer, const BgpTable *table)
virtual void MembershipRequestCallback(BgpTable *table)=0
virtual bool AssertRegister(PeerRibState *prs, bool do_assert=true)
virtual bool AssertWalkRibIn(PeerRibState *prs, bool do_assert=true)
bool Leave(DBTablePartBase *root, const RibPeerSet &mleave, DBEntryBase *db_entry)
void Enqueue(RibState *rs)
bool IsRibInRegistered(const IPeer *peer, const BgpTable *table) const
void FillRoutingInstanceTableInfo(ShowRoutingInstanceTable *srit) const
RibState * FindRibState(const BgpTable *table)
bool IsRibOutRegistered(const IPeer *peer, const BgpTable *table) const
PeerState * LocatePeerState(IPeer *peer)
tbb::atomic< uint64_t > current_jobs_count_
void set_subscription_gen_id(uint64_t subscription_gen_id)
void ProcessRegisterRibCompleteEvent(Event *event)
PeerRibState * FindPeerRibState(const RibState *rs)
Event(EventType event_type, IPeer *peer, BgpTable *table)
virtual bool MembershipPathCallback(DBTablePartBase *tpart, BgpRoute *route, BgpPath *path)=0
PeerState * FindPeerState(const IPeer *peer)
void ClearPeerRibStateList()
void GetRegisteredRibs(const IPeer *peer, std::list< BgpTable * > *table_list) const
PeerRibList::iterator iterator
void NotifyPeerRegistration(IPeer *peer, BgpTable *table, bool unregister)
BgpMembershipManager::Action action() const
int RegisterPeerRegistrationCallback(PeerRegistrationCallback callback)
virtual bool AssertUnregister(PeerRibState *prs, bool do_assert=true)
uint32_t GetRibOutQueueDepth(const IPeer *peer, const BgpTable *table) const
void set_action(BgpMembershipManager::Action action)
bool EventCallback(Event *event)
void FillMembershipInfo(ShowMembershipPeerInfo *smpi) const
const RibPeerSet & join_bitset()
const PathList & GetPathList() const
void InsertPeerRibState(PeerRibState *prs)