9 #include "sandesh/sandesh_trace.h"
12 #include "bgp/bgp_peer_types.h"
20 using std::unique_ptr;
33 for (
int i = 0; i <
QCOUNT; i++) {
105 bool need_tail_dequeue =
monitor_->EnqueueUpdate(db_entry, rt_update);
106 if (need_tail_dequeue) {
144 int queue_id = rt_update->
queue_id();
146 for (UpdateInfoSList::List::iterator iter = rt_update->
Updates()->begin();
147 iter != rt_update->
Updates()->end();) {
158 if (msgset.
empty()) {
176 bool msg_built = message->
Start(
181 UpdateSend(queue_id, message, msgset, &msg_blocked);
196 rt_blocked |= msg_blocked;
203 if (rt_blocked.
empty()) {
206 *blocked |= rt_blocked;
231 if (update.
get() == NULL) {
240 if (*unsync == start_marker->
members) {
246 if (!unsync->
empty()) {
256 for (; update.
get() != NULL; update = next_update) {
259 if (update->
empty()) {
269 next_update =
monitor_->GetNextUpdate(queue_id, update.
get());
272 if (update->
empty()) {
280 return (members != *blocked);
307 assert(start_marker);
316 if (!mready.
test(peer_idx)) {
317 blocked->
set(peer_idx);
325 if (!notready.
empty()) {
334 monitor_->GetNextEntry(queue_id, start_marker, &upentry);
342 for (; upentry != NULL; upentry = next_upentry, update = next_update) {
357 if (!
DequeueCommon(queue, start_marker, update.get(), blocked)) {
359 if (update->empty()) {
369 monitor_->GetNextEntry(queue_id, upentry, &next_upentry);
377 if (!mmove.
empty()) {
382 }
else if (update->empty()) {
391 return (members != *blocked);
417 monitor_->GetAttrNext(queue_id, start_uinfo, &uinfo);
418 for (; update.
get() != NULL; update = next_update, uinfo = next_uinfo) {
421 next_update =
monitor_->GetAttrNext(queue_id, uinfo, &next_uinfo);
425 if (!uinfo->target.Contains(msgset))
431 bool success = message->
AddRoute(update->
route(), &uinfo->roattr);
463 int ix_current = iter.
index();
466 const string *msg_str = NULL;
468 const uint8_t *data = message->
GetData(peer, &msgsize, &msg_str, &temp);
472 "Update size " << msgsize <<
479 bool more = peer->
SendUpdate(data, msgsize, msg_str);
481 blocked->
set(ix_current);
512 int ix_current = iter.
index();
516 blocked->
set(ix_current);
532 monitor_->SetEntryState(route, rstate);
568 if (last_rt_update) {
603 if (update_history) {
630 assert(!blocked->
empty());
631 int queue_id = rt_update->
queue_id();
636 if (marker->
members == *blocked) {
647 new_marker->members = *blocked;
657 if (!queue->
empty()) {
666 return queue->
size();
676 return queue->
Join(bit);
virtual bool AddRoute(const BgpRoute *route, const RibOutAttr *roattr)=0
uint64_t num_reach_routes() const
uint64_t peer_dequeue_count_
int GetPeerIndex(IPeerUpdate *peer) const
static MessageBuilder * GetInstance(RibExportPolicy::Encoding encoding)
bool DequeueCommon(UpdateQueue *queue, UpdateMarker *marker, RouteUpdate *rt_update, RibPeerSet *blocked)
void STLDeleteValues(Container *container)
uint64_t marker_move_count_
void StoreHistory(RouteUpdate *rt_update)
uint64_t tail_dequeue_count_
RouteUpdate * MakeRouteUpdate()
virtual void UpdateTxReachRoute(uint64_t count)=0
bool test(size_t pos) const
size_t queue_size(int queue_id) const
void Enqueue(DBEntryBase *db_entry, RouteUpdate *rt_update)
void ClearUpdate(RouteUpdatePtr *update)
void UpdatePack(int queue_id, Message *message, UpdateInfo *start_uinfo, const RibPeerSet &isect)
void Reset(const BitSet &rhs)
uint64_t messages_built_count_
virtual IPeerDebugStats * peer_stats()=0
RibOutUpdates(RibOut *ribout, int index)
virtual void UpdateTxUnreachRoute(uint64_t count)=0
void BuildIntersection(const BitSet &lhs, const BitSet &rhs)
void MarkerMerge(UpdateMarker *dst_marker, UpdateMarker *src_marker, const RibPeerSet &bitset)
static SandeshLevel::type LoggingLevel()
bool RemoveUpdateInfo(UpdateInfo *uinfo)
uint64_t messages_sent_count_
void QueueLeave(int queue_id, int bit)
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer, RibPeerSet *blocked)
size_t queue_marker_count(int queue_id) const
void ClearState(RouteUpdate *rt_update)
void AddMarker(UpdateMarker *marker, RouteUpdate *rt_update)
UpdateMarker * tail_marker()
bool UpdateMarkersOnBlocked(UpdateMarker *marker, RouteUpdate *rt_update, const RibPeerSet *blocked)
uint64_t num_unreach_routes() const
UpdateQueue * queue(int queue_id)
#define BGP_LOG_FLAG_SYSLOG
void UpdateFlush(const RibPeerSet &dst, RibPeerSet *blocked)
uint64_t marker_merge_count_
boost::scoped_ptr< RibUpdateMonitor > monitor_
UpdateMarker * GetMarker(int bit)
BgpUpdateSender * sender()
void MoveHistory(RouteState *rstate)
uint64_t marker_split_count_
#define CHECK_CONCURRENCY(...)
bool IsEncodingBgp() const
virtual const uint8_t * GetData(IPeerUpdate *peer_update, size_t *lenp, const std::string **msg_str, std::string *temp)=0
static std::vector< Message * > bgp_messages_
void AttrDequeue(UpdateInfo *current_uinfo)
size_t marker_count() const
bool QueueJoin(int queue_id, int bit)
virtual bool FlushUpdate()
virtual bool Start(const RibOut *ribout, bool cache_routes, const RibOutAttr *roattr, const BgpRoute *route)=0
void RibOutActive(int index, RibOut *ribout, int queue_id)
virtual bool SendUpdate(const uint8_t *msg, size_t msgsize)=0
void AddStatisticsInfo(int queue_id, Stats *stats) const
bool IsEncodingXmpp() const
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync, RibPeerSet *blocked, RibPeerSet *unsync)
void MoveMarker(UpdateMarker *marker, RouteUpdate *rt_update)
void BuildComplement(const BitSet &lhs, const BitSet &rhs)
void UpdateHistory(RibOut *ribout, const RibOutAttr *roattr, const RibPeerSet &bits)
bool IsAdvertised() const
void BuildSendReadyBitSet(const RibPeerSet &peerset, RibPeerSet *mready) const
static int PartitionCount()
void RemoveUpdate(RouteUpdate *rt_update)
static SandeshLevel::type LoggingUtLevel()
UpdateList * GetUpdateList(RibOut *ribout)
static std::vector< Message * > xmpp_messages_
virtual Message * Create() const =0
Message * GetMessage() const
void MarkerSplit(UpdateMarker *marker, const RibPeerSet &msplit)
void UpdateSend(int queue_id, Message *message, const RibPeerSet &dst, RibPeerSet *blocked)
#define BGP_LOG_PEER(type, peer, level, flags, dir, arg)
UpdateInfoSList & Updates()
bool ClearAdvertisedBits(RouteUpdate *rt_update, UpdateInfo *uinfo, const RibPeerSet &bits, bool update_history)