OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
bgp_ribout_updates.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 
7 #include <string>
8 
9 #include "sandesh/sandesh_trace.h"
10 #include "base/task_annotations.h"
11 #include "bgp/bgp_log.h"
12 #include "bgp/bgp_peer_types.h"
13 #include "bgp/bgp_ribout.h"
14 #include "bgp/bgp_route.h"
15 #include "bgp/bgp_update_queue.h"
16 #include "bgp/bgp_update_monitor.h"
17 #include "bgp/bgp_update_sender.h"
18 #include "bgp/message_builder.h"
19 
20 using std::unique_ptr;
21 using std::vector;
22 
23 vector<Message *> RibOutUpdates::bgp_messages_;
24 vector<Message *> RibOutUpdates::xmpp_messages_;
25 
26 //
27 // Create a new RibOutUpdates. Also create the necessary UpdateQueues and
28 // add them to the vector.
29 //
31  : ribout_(ribout),
32  index_(index) {
33  for (int i = 0; i < QCOUNT; i++) {
34  UpdateQueue *queue = new UpdateQueue(ribout, i);
35  queue_vec_.push_back(queue);
36  }
37  monitor_.reset(new RibUpdateMonitor(ribout, &queue_vec_));
38  memset(&stats_, 0, sizeof(stats_));
39 }
40 
41 //
42 // Destructor. Get rid of all the UpdateQueues.
43 //
46 }
47 
48 //
49 // Initialize static vectors of bgp/xmpp message pointers to NULL.
50 //
52  bgp_messages_.resize(DB::PartitionCount(), NULL);
53  xmpp_messages_.resize(DB::PartitionCount(), NULL);
54 }
55 
56 //
57 // Free any memory allocated for bgp/xmpp messages.
58 //
62 }
63 
64 //
65 // Create if needed, and return the bgp/xmpp message for this RibOutUpdates.
66 // Note that we use static vectors of bgp/xmpp messages, one per partition,
67 // so that we don't need to allocate and free messages repeatedly.
68 //
70  if (ribout_->IsEncodingBgp()) {
71  if (!bgp_messages_[index_]) {
72  MessageBuilder *builder =
74  Message *message = builder->Create();
75  bgp_messages_[index_] = message;
76  }
77  return bgp_messages_[index_];
78  }
79  if (ribout_->IsEncodingXmpp()) {
80  if (!xmpp_messages_[index_]) {
81  MessageBuilder *builder =
83  Message *message = builder->Create();
84  xmpp_messages_[index_] = message;
85  }
86  return xmpp_messages_[index_];
87  }
88  return NULL;
89 }
90 
91 //
92 // Concurrency: Called in the context of the routing table partition task.
93 //
94 // Enqueue the RouteUpdate corresponding to the DBEntryBase into the queue.
95 // This is called in the context of the routing table partition task. All
96 // the concurrency issues are handled by going through the monitor.
97 //
98 // If the UpdateQueue corresponding to the RouteUpdate previously had no
99 // updates after the tail marker, we kick the BgpUpdateSender to perform
100 // a tail dequeue for the RibOut.
101 //
102 void RibOutUpdates::Enqueue(DBEntryBase *db_entry, RouteUpdate *rt_update) {
103  CHECK_CONCURRENCY("db::DBTable");
104 
     // The enqueue itself is delegated to the monitor. Its boolean result is
     // the only signal used here to decide whether the sender must be told
     // that this RibOut has become active.
105  bool need_tail_dequeue = monitor_->EnqueueUpdate(db_entry, rt_update);
106  if (need_tail_dequeue) {
     // Identify the work to the sender by (index_, ribout_, queue id of the
     // RouteUpdate that was just enqueued).
107  ribout_->sender()->RibOutActive(index_, ribout_, rt_update->queue_id());
108  }
109 }
110 
111 //
112 // Concurrency: Called in the context of the bgp::SendUpdate task.
113 //
114 // Common dequeue routine invoked by tail dequeue and peer dequeue. It builds
115 // and sends updates for each UpdateInfo element in the list hanging off the
116 // RouteUpdate. For each update that it builds, it also includes prefixes for
117 // other UpdateInfo elements that share the same attributes, provided that
118 // the associated RouteUpdate was enqueued after the original one.
119 
120 // Each update is targeted at the peers in the RibPeerSet of the UpdateMarker
121 // passed in to us. This set of peers is subsequently culled based on the
122 // RibPeerSet in each UpdateInfo. IOW, the update is sent only to the set
123 // of peers in the intersection of the UpdateMarker and the UpdateInfo. Note
124 // that the UpdateMarker could specify a single peer if we are called from
125 // peer dequeue.
126 //
127 // Return false if all the peers in the marker get blocked. In any case, the
128 // blocked parameter is populated with the set of peers that are send blocked.
129 //
131  RouteUpdate *rt_update, RibPeerSet *blocked) {
132  CHECK_CONCURRENCY("bgp::SendUpdate");
133 
134  // Pass a hint to the Message telling it whether it needs to cache the
135  // formatted version of each route. This is used only for xmpp messages.
136  // Heuristic is to cache if there are markers other than the tail marker.
137  // The reasoning is that the cached version can be used later when the
138  // markers in question are being processed. Put another way - there's
139  // no need to cache if route is not going to be advertised to any peers
140  // other than the ones in the given UpdateMarker.
141  bool cache_routes = queue->marker_count() != 0;
142 
143  // Go through all UpdateInfo elements for the RouteUpdate.
144  int queue_id = rt_update->queue_id();
145  RibPeerSet rt_blocked;
146  for (UpdateInfoSList::List::iterator iter = rt_update->Updates()->begin();
147  iter != rt_update->Updates()->end();) {
148  // Get the UpdateInfo and move the iterator to next one before doing
149  // any processing, since we may delete the UpdateInfo further down.
150  UpdateInfo *uinfo = iter.operator->();
151  ++iter;
152 
153  // Skip if there's no overlap between the UpdateMarker and the targets
154  // for the UpdateInfo. The intersection is the set of peers to which
155  // the message we are about to build will be sent.
156  RibPeerSet msgset;
157  msgset.BuildIntersection(uinfo->target, marker->members);
158  if (msgset.empty()) {
159  continue;
160  }
161 
162  // Generate the update, merge additional updates into that message and
163  // send the message to the target RibPeerSet.
164  //
165  // In the rare case that the first route and its attributes don't fit
166  // into the message, clear the target bits in the UpdateInfo to ensure
167  // that the UpdateQueue doesn't get wedged. However, don't update the
168  // history bits in the RouteUpdate since the message did not get sent.
169  //
170  // The Create routine has the responsibility of logging an error and
171  // incrementing any counters.
172  RibPeerSet msg_blocked;
173  stats_[queue_id].messages_built_count_++;
174  Message *message = GetMessage();
175  assert(message);
176  bool msg_built = message->Start(
177  ribout_, cache_routes, &uinfo->roattr, rt_update->route());
178  if (msg_built) {
179  UpdatePack(queue_id, message, uinfo, msgset);
180  message->Finish();
181  UpdateSend(queue_id, message, msgset, &msg_blocked);
182  }
183 
184  // Reset bits in the UpdateInfo. Note that this has already been done
185  // via UpdatePack for all the other UpdateInfo elements that we packed
186  // into this message.
187  bool empty = ClearAdvertisedBits(rt_update, uinfo, msgset, msg_built);
188  if (empty) {
189  rt_update->RemoveUpdateInfo(uinfo);
190  }
191 
192  // Update RibPeerSet of peers that got blocked while processing this
193  // RouteUpdate. Since there's no overlap of peers between UpdateInfos
194  // for the same RouteUpdate, we can update the markers for all blocked
195  // peers in one shot i.e. outside this loop.
196  rt_blocked |= msg_blocked;
197  }
198 
199  // Update the markers for any peers that got blocked while processing this
200  // RouteUpdate. If all peers in the UpdateMarker got blocked, we shouldn't
201  // build any more update messages. Return false to let the callers know
202  // that this has happened.
203  if (rt_blocked.empty()) {
204  return true;
205  } else {
206  *blocked |= rt_blocked;
207  return !UpdateMarkersOnBlocked(marker, rt_update, &rt_blocked);
208  }
209 }
210 
211 //
212 // Concurrency: Called in the context of the bgp::SendUpdate task.
213 //
214 // Dequeue and build updates for the in-sync peers in the RibPeerSet of the
215 // tail marker for the given queue id.
216 //
217 // Return false if all the peers in the marker get blocked. In any case, the
218 // blocked parameter is populated with the set of peers that are send blocked
219 // and the unsync parameter is populated with the set of peers from the tail
220 // marker that are not in the msync set passed in to the method.
221 //
222 bool RibOutUpdates::TailDequeue(int queue_id, const RibPeerSet &msync,
223  RibPeerSet *blocked, RibPeerSet *unsync) {
224  CHECK_CONCURRENCY("bgp::SendUpdate");
225 
     // queue_id is used unchecked as an index into stats_ and queue_vec_.
226  stats_[queue_id].tail_dequeue_count_++;
227  UpdateQueue *queue = queue_vec_[queue_id];
228  UpdateMarker *start_marker = queue->tail_marker();
229  RouteUpdatePtr update = monitor_->GetNextUpdate(queue_id, start_marker);
230 
     // No RouteUpdate was returned for the tail marker, so there is nothing
     // to build. This is reported as success (true); *blocked and *unsync
     // are left untouched on this path.
231  if (update.get() == NULL) {
232  return true;
233  }
234 
235  // Intersect marker membership and in-sync peers to come up with the
236  // unsync peers. If all the peers are unsync return right away. The
237  // BgpUpdateSender will take care of triggering a TailDequeue again
238  // when at least one peer becomes in-sync.
239  unsync->BuildComplement(start_marker->members, msync);
240  if (*unsync == start_marker->members) {
241  return false;
242  }
243 
244  // Split the unsync peers from the tail marker. Note that this updates
245  // the RibPeerSet in the tail marker.
246  if (!unsync->empty()) {
247  stats_[queue_id].marker_split_count_++;
248  queue->MarkerSplit(start_marker, *unsync);
249  }
250 
251  // Update send loop. Select next update to send, format a message.
252  // Add other updates with the same attributes and replicate the
253  // packet.
     // members is a copy of the marker membership taken before the loop; it
     // is this copy that is flushed and compared against *blocked at the end.
254  RibPeerSet members = start_marker->members;
255  RouteUpdatePtr next_update;
256  for (; update.get() != NULL; update = next_update) {
257  if (!DequeueCommon(queue, start_marker, update.get(), blocked)) {
258  // Be sure to get rid of the RouteUpdate if it's empty.
259  if (update->empty()) {
260  ClearUpdate(&update);
261  }
262 
     // This early exit returns without calling UpdateFlush.
263  return false;
264  }
265 
266  // Iterate to the next update before we potentially delete the
267  // current one. If there are no more updates in the queue, the
268  // marker will get moved so that it's after the current update.
269  next_update = monitor_->GetNextUpdate(queue_id, update.get());
270 
271  // Be sure to get rid of the RouteUpdate if it's empty.
272  if (update->empty()) {
273  ClearUpdate(&update);
274  }
275  }
276 
277  // Request peers to flush accumulated update messages.
278  // Return false if all peers got blocked.
279  UpdateFlush(members, blocked);
280  return (members != *blocked);
281 }
282 
283 //
284 // Concurrency: Called in the context of the bgp::SendUpdate task.
285 //
286 // Dequeue and build updates for all the peers that share the same marker as
287 // the specified peer. This routine has some extra intelligence beyond the
288 // TailDequeue. As it encounters update markers, it merges in any send ready
289 // peers from those with the marker being processed for dequeue. This is done
290 // to reduce the number of times we build an update message containing the
291 // same information.
292 //
293 // Return false if all the peers in the marker get blocked. In any case, the
294 // blocked parameter is populated with the set of peers that are send blocked.
295 //
296 bool RibOutUpdates::PeerDequeue(int queue_id, IPeerUpdate *peer,
297  RibPeerSet *blocked) {
298  CHECK_CONCURRENCY("bgp::SendUpdate");
299 
     // queue_id is used unchecked as an index into stats_ and queue_vec_.
300  stats_[queue_id].peer_dequeue_count_++;
301  UpdateQueue *queue = queue_vec_[queue_id];
302  int peer_idx = ribout_->GetPeerIndex(peer);
303  UpdateMarker *start_marker = queue->GetMarker(peer_idx);
304 
305  // We're done if this is the same as the tail marker. Updates will be
306  // built subsequently via TailDequeue.
307  assert(start_marker);
308  if (start_marker == queue->tail_marker()) {
309  return true;
310  }
311 
312  // We're done if the lead peer is not send ready. This can happen if
313  // the peer got blocked when processing updates in another partition.
314  RibPeerSet mready;
315  ribout_->BuildSendReadyBitSet(start_marker->members, &mready);
316  if (!mready.test(peer_idx)) {
     // Only the lead peer's bit is added to *blocked on this path.
317  blocked->set(peer_idx);
318  return false;
319  }
320 
321  // Split out any peers from the marker that are not send ready. Note that
322  // this updates the RibPeerSet in the marker.
323  RibPeerSet notready;
324  notready.BuildComplement(start_marker->members, mready);
325  if (!notready.empty()) {
326  stats_[queue_id].marker_split_count_++;
327  queue->MarkerSplit(start_marker, notready);
328  }
329 
330  // Get the encapsulator for the first RouteUpdate. Even if there's no
331  // RouteUpdate, we should find another marker or the tail marker.
332  UpdateEntry *upentry;
333  RouteUpdatePtr update =
334  monitor_->GetNextEntry(queue_id, start_marker, &upentry);
335  assert(upentry);
336 
337  // Update loop. Keep going till we reach the tail marker or till all the
338  // peers get blocked.
     // members starts as the start marker's peers and grows (members |= mmove)
     // as send-ready peers are merged in from markers met along the way.
339  RibPeerSet members = start_marker->members;
340  RouteUpdatePtr next_update;
341  UpdateEntry *next_upentry;
342  for (; upentry != NULL; upentry = next_upentry, update = next_update) {
     // marker stays NULL for RouteUpdate entries; it is only dereferenced in
     // the second IsMarker() branch further down.
343  UpdateMarker *marker = NULL;
344  if (upentry->IsMarker()) {
345  // The queue entry is a marker. We're done if we've reached the
346  // tail marker. Updates will be built later via TailDequeue.
347  marker = static_cast<UpdateMarker *>(upentry);
348  if (marker == queue->tail_marker()) {
349  stats_[queue_id].marker_merge_count_++;
350  queue->MarkerMerge(queue->tail_marker(), start_marker,
351  start_marker->members);
352  break;
353  }
354  } else {
355  // The queue entry is a RouteUpdate. Go ahead and build an update
356  // message. Bail if all the peers in the marker get blocked.
357  if (!DequeueCommon(queue, start_marker, update.get(), blocked)) {
358  // Be sure to get rid of the RouteUpdate if it's empty.
359  if (update->empty()) {
360  ClearUpdate(&update);
361  }
     // Unlike the early returns above, this break still reaches the
     // UpdateFlush call after the loop.
362  break;
363  }
364  }
365 
366  // Iterate to the next element before we potentially delete the
367  // current one.
368  next_update =
369  monitor_->GetNextEntry(queue_id, upentry, &next_upentry);
370 
371  if (upentry->IsMarker()) {
372  // As the entry is a marker, merge send-ready peers from it
373  // with the marker that is being processed for dequeue. Note
374  // that this updates the RibPeerSet in the marker.
375  RibPeerSet mmove;
376  ribout_->BuildSendReadyBitSet(marker->members, &mmove);
377  if (!mmove.empty()) {
378  stats_[queue_id].marker_merge_count_++;
379  queue->MarkerMerge(start_marker, marker, mmove);
380  members |= mmove;
381  }
382  } else if (update->empty()) {
383  // Be sure to get rid of the RouteUpdate since it's empty.
384  ClearUpdate(&update);
385  }
386  }
387 
388  // Request peers to flush accumulated update messages.
389  // Return false if all peers got blocked.
390  UpdateFlush(members, blocked);
391  return (members != *blocked);
392 }
393 
394 //
395 // Concurrency: Called in the context of the bgp::SendUpdate task.
396 //
397 // Go through all the UpdateInfo elements that have the same attribute as
398 // the start parameter and pack the corresponding prefixes into the Message.
399 // The attributes and the prefix associated with start are already in the
400 // Message when this method is invoked.
401 //
402 // The set of peers for which this update is being built is represented by
403 // the msgset parameter. As the msgset has already been determined by the
404 // caller, we should only add prefixes that need to go to all the peers in
405 // the msgset.
406 //
407 void RibOutUpdates::UpdatePack(int queue_id, Message *message,
408  UpdateInfo *start_uinfo, const RibPeerSet &msgset) {
409  CHECK_CONCURRENCY("bgp::SendUpdate");
410 
411  UpdateInfo *uinfo, *next_uinfo;
412  RouteUpdatePtr next_update;
413 
414  // Walk through all the UpdateInfo elements with the same attribute in
415  // enqueue order.
     // The walk starts at the element after start_uinfo; start_uinfo itself is
     // never passed to AddRoute or ClearAdvertisedBits in this method.
416  RouteUpdatePtr update =
417  monitor_->GetAttrNext(queue_id, start_uinfo, &uinfo);
418  for (; update.get() != NULL; update = next_update, uinfo = next_uinfo) {
419  // Iterate to the next element before we potentially delete the
420  // current one.
421  next_update = monitor_->GetAttrNext(queue_id, uinfo, &next_uinfo);
422 
423  // Skip if the msgset RibPeerSet is not a subset of the target in
424  // UpdateInfo.
     // Skipped elements keep their target bits and stay queued.
425  if (!uinfo->target.Contains(msgset))
426  continue;
427 
428  // Go ahead and add the route to the message. Terminate the loop
429  // if the message doesn't have room for the route. The route will
430  // get included in another update message.
431  bool success = message->AddRoute(update->route(), &uinfo->roattr);
432  if (!success) {
     // The bits for this element have not been cleared yet, so it is left
     // intact when the loop terminates here.
433  break;
434  }
435 
436  // First clear the advertised bits as represented by msgset from
437  // the target RibPeerSet in the UpdateInfo. If the target is now
438  // empty, remove the UpdateInfo from the list container in the
439  // underlying RouteUpdate.
440  //
441  // If the RouteUpdate itself is now empty i.e. there are no more
442  // UpdateInfo elements associated with it, we can get rid of it.
     // The trailing 'true' asks ClearAdvertisedBits to update the history,
     // since the route was successfully added to the message.
443  bool empty = ClearAdvertisedBits(update.get(), uinfo, msgset, true);
444  if (empty && update->RemoveUpdateInfo(uinfo)) {
445  ClearUpdate(&update);
446  }
447  }
448 }
449 
450 //
451 // Concurrency: Called in the context of the bgp::SendUpdate task.
452 //
453 // Go through all the peers in the specified RibPeerSet and send the given
454 // message to each of them. Update the blocked RibPeerSet with peers that
455 // become blocked after sending the message.
456 //
457 void RibOutUpdates::UpdateSend(int queue_id, Message *message,
458  const RibPeerSet &dst, RibPeerSet *blocked) {
459  CHECK_CONCURRENCY("bgp::SendUpdate");
460 
461  RibOut::PeerIterator iter(ribout_, dst);
462  while (iter.HasNext()) {
     // The index is read before Next() is called so that it refers to the
     // peer returned by that call (presumably Next() advances the iterator).
463  int ix_current = iter.index();
464  IPeerUpdate *peer = iter.Next();
465  size_t msgsize = 0;
466  const string *msg_str = NULL;
467  string temp;
468  const uint8_t *data = message->GetData(peer, &msgsize, &msg_str, &temp);
     // NOTE(review): listing lines 469-471 are absent from this extract. The
     // three lines below are the tail of a statement that starts there,
     // presumably a conditional trace/log of the update - confirm against the
     // original file.
472  "Update size " << msgsize <<
473  " reach " << message->num_reach_routes() <<
474  " unreach " << message->num_unreach_routes());
475  }
     // The per-queue counters are bumped once per peer the message is sent to.
476  stats_[queue_id].messages_sent_count_++;
477  stats_[queue_id].reach_count_ += message->num_reach_routes();
478  stats_[queue_id].unreach_count_ += message->num_unreach_routes();
     // A false return from SendUpdate marks the peer as blocked.
479  bool more = peer->SendUpdate(data, msgsize, msg_str);
480  if (!more) {
481  blocked->set(ix_current);
482  }
     // Per-peer debug stats are updated only when the peer is also an IPeer
     // and provides a stats object; otherwise they are silently skipped.
483  IPeer *ipeer = dynamic_cast<IPeer *>(peer);
484  if (!ipeer) {
485  continue;
486  }
487  IPeerDebugStats *stats = ipeer->peer_stats();
488  if (stats) {
489  stats->UpdateTxReachRoute(message->num_reach_routes());
490  stats->UpdateTxUnreachRoute(message->num_unreach_routes());
491  }
492  }
493 }
494 
495 //
496 // Concurrency: Called in the context of the bgp::SendUpdate task.
497 //
498 // Go through all the peers in the specified RibPeerSet and ask them to flush
499 // i.e. send immediately, any accumulated updates. Update blocked RibPeerSet
500 // with peers that become blocked after flushing.
501 //
502 // Skip if the RibOut is XMPP.
503 //
505  CHECK_CONCURRENCY("bgp::SendUpdate");
506 
507  if (ribout_->IsEncodingXmpp())
508  return;
509 
510  RibOut::PeerIterator iter(ribout_, dst);
511  while (iter.HasNext()) {
512  int ix_current = iter.index();
513  IPeerUpdate *peer = iter.Next();
514  bool more = peer->FlushUpdate();
515  if (!more) {
516  blocked->set(ix_current);
517  }
518  }
519 }
520 
521 //
522 // Take the AdvertisedInfo history in the RouteUpdate and move it to a new
523 // RouteState. Go through the monitor to associate the new RouteState as the
524 // listener state for the Route.
525 //
527  CHECK_CONCURRENCY("bgp::SendUpdate");
528 
529  BgpRoute *route = rt_update->route();
530  RouteState *rstate = new RouteState();
531  rt_update->MoveHistory(rstate);
532  monitor_->SetEntryState(route, rstate);
533 }
534 
535 //
536 // Go through the monitor to clear the listener state for the underlying Route
537 // of the RouteUpdate.
538 //
540  CHECK_CONCURRENCY("bgp::SendUpdate");
541 
542  BgpRoute *route = rt_update->route();
543  monitor_->ClearEntryState(route);
544 }
545 
546 //
547 // Called when the RouteUpdate encapsulated by the RouteUpdatePtr has no more
548 // UpdateInfo elements. Releases ownership of the RouteUpdate and deletes the
549 // RouteUpdate, as well as any associated UpdateList if appropriate.
550 //
552  CHECK_CONCURRENCY("bgp::SendUpdate");
553 
554  // Dequeue the route update.
555  RouteUpdate *rt_update = update->get();
556  monitor_->DequeueUpdate(rt_update);
557 
558  if (rt_update->OnUpdateList()) {
559  // Remove the route update from the update list and check if the list
560  // can now be downgraded back to a route update.
561  UpdateList *uplist = rt_update->GetUpdateList(ribout_);
562  uplist->RemoveUpdate(rt_update);
563  RouteUpdate *last_rt_update = uplist->MakeRouteUpdate();
564 
565  // If we were able to downgrade, set the DBEntry to point to the last
566  // remaining route update and get rid of the current route update and
567  // the update list. Otherwise, just get rid of the route update.
568  if (last_rt_update) {
569  monitor_->SetEntryState(rt_update->route(), last_rt_update);
570  update->release();
571  delete rt_update;
572  delete uplist;
573  } else {
574  update->release();
575  delete rt_update;
576  }
577  } else {
578  // Store the history from the route update or clear the state for the
579  // DBEntry depending on whether we advertised the route. In either
580  // case, get rid of the route update.
581  if (rt_update->IsAdvertised()) {
582  StoreHistory(rt_update);
583  } else {
584  ClearState(rt_update);
585  }
586  update->release();
587  delete rt_update;
588  }
589 }
590 
591 //
592 // Clear the advertised bits specified by isect from the target RibPeerSet in
593 // the UpdateInfo. If the target is now empty, remove the UpdateInfo from the
594 // set container in the UpdateQueue. Note that the UpdateInfo will still be
595 // on the SList in the RouteUpdate.
596 //
597 // Return true if the UpdateInfo was removed from the set container.
598 //
600  UpdateInfo *uinfo, const RibPeerSet &isect, bool update_history) {
601  CHECK_CONCURRENCY("bgp::SendUpdate");
602 
603  if (update_history) {
604  rt_update->UpdateHistory(ribout_, &uinfo->roattr, isect);
605  }
606  uinfo->target.Reset(isect);
607  bool empty = uinfo->target.empty();
608  if (empty) {
609  UpdateQueue *queue = queue_vec_[rt_update->queue_id()];
610  queue->AttrDequeue(uinfo);
611  }
612  return empty;
613 }
614 
615 //
616 // Concurrency: Called in the context of the bgp::SendUpdate task.
617 //
618 // Update the markers for all the peers in the blocked RibPeerSet. In the
619 // general case we clear the blocked RibPeerSet from the UpdateMarker and
620 // create a new UpdateMarker for the blocked peers.
621 //
622 // Return true in the special case where all peers in the UpdateMarker have
623 // become blocked.
624 //
626  RouteUpdate *rt_update,
627  const RibPeerSet *blocked) {
628  CHECK_CONCURRENCY("bgp::SendUpdate");
629 
630  assert(!blocked->empty());
631  int queue_id = rt_update->queue_id();
632  UpdateQueue *queue = queue_vec_[queue_id];
633 
634  // If all the peers in the UpdateMarker are blocked, we simply move the
635  // marker after the RouteUpdate.
636  if (marker->members == *blocked) {
637  stats_[queue_id].marker_move_count_++;
638  queue->MoveMarker(marker, rt_update);
639  return true;
640  }
641 
642  // Reset bits in the specified UpdateMarker, create a new one for the
643  // blocked peers and insert the new one after the RouteUpdate.
644  marker->members.Reset(*blocked);
645  assert(!marker->members.empty());
646  UpdateMarker *new_marker = new UpdateMarker();
647  new_marker->members = *blocked;
648  stats_[queue_id].marker_split_count_++;
649  queue->AddMarker(new_marker, rt_update);
650 
651  return false;
652 }
653 
// Return true only if every one of the QCOUNT update queues reports empty().
654 bool RibOutUpdates::Empty() const {
655  for (int i = 0; i < RibOutUpdates::QCOUNT; ++i) {
     // NOTE(review): listing line 656 is absent from this extract; it
     // presumably declares 'queue' as queue_vec_[i] - confirm against the
     // original file.
657  if (!queue->empty()) {
658  return false;
659  }
660  }
661  return true;
662 }
663 
// Return size() of the UpdateQueue stored at queue_vec_[queue_id]. The
// queue_id is used as an index without any range check.
664 size_t RibOutUpdates::queue_size(int queue_id) const {
665  const UpdateQueue *queue = queue_vec_[queue_id];
666  return queue->size();
667 }
668 
// Return marker_count() of the UpdateQueue stored at queue_vec_[queue_id].
// The queue_id is used as an index without any range check.
669 size_t RibOutUpdates::queue_marker_count(int queue_id) const {
670  const UpdateQueue *queue = queue_vec_[queue_id];
671  return queue->marker_count();
672 }
673 
// Forward a Join request for the given bit to the UpdateQueue stored at
// queue_vec_[queue_id] and return that queue's result unchanged. The queue_id
// is used as an index without any range check.
674 bool RibOutUpdates::QueueJoin(int queue_id, int bit) {
675  UpdateQueue *queue = queue_vec_[queue_id];
676  return queue->Join(bit);
677 }
678 
// Forward a Leave request for the given bit to the UpdateQueue stored at
// queue_vec_[queue_id]. The queue_id is used as an index without any range
// check.
679 void RibOutUpdates::QueueLeave(int queue_id, int bit) {
680  UpdateQueue *queue = queue_vec_[queue_id];
681  queue->Leave(bit);
682 }
683 
684 //
685 // Add statistics information to the provided Stats structure.
686 //
687 void RibOutUpdates::AddStatisticsInfo(int queue_id, Stats *stats) const {
     // Counters are accumulated with += into *stats rather than assigned.
     // NOTE(review): listing lines 688-689 are absent from this extract;
     // presumably they accumulate messages_built_count_ and
     // messages_sent_count_, which are incremented elsewhere in this file but
     // not added here - confirm against the original file.
690  stats->reach_count_ += stats_[queue_id].reach_count_;
691  stats->unreach_count_ += stats_[queue_id].unreach_count_;
692  stats->tail_dequeue_count_ += stats_[queue_id].tail_dequeue_count_;
693  stats->peer_dequeue_count_ += stats_[queue_id].peer_dequeue_count_;
694  stats->marker_split_count_ += stats_[queue_id].marker_split_count_;
695  stats->marker_merge_count_ += stats_[queue_id].marker_merge_count_;
696  stats->marker_move_count_ += stats_[queue_id].marker_move_count_;
697 }
virtual bool AddRoute(const BgpRoute *route, const RibOutAttr *roattr)=0
uint64_t num_reach_routes() const
int GetPeerIndex(IPeerUpdate *peer) const
Definition: bgp_ribout.cc:536
int index() const
Definition: bgp_ribout.h:276
static MessageBuilder * GetInstance(RibExportPolicy::Encoding encoding)
virtual void Finish()=0
bool DequeueCommon(UpdateQueue *queue, UpdateMarker *marker, RouteUpdate *rt_update, RibPeerSet *blocked)
void STLDeleteValues(Container *container)
Definition: util.h:101
void StoreHistory(RouteUpdate *rt_update)
RouteUpdate * MakeRouteUpdate()
Definition: bgp_update.cc:492
RibOutAttr roattr
Definition: bgp_update.h:100
virtual void UpdateTxReachRoute(uint64_t count)=0
bool test(size_t pos) const
Definition: bitset.cc:146
size_t queue_size(int queue_id) const
void Enqueue(DBEntryBase *db_entry, RouteUpdate *rt_update)
void ClearUpdate(RouteUpdatePtr *update)
void UpdatePack(int queue_id, Message *message, UpdateInfo *start_uinfo, const RibPeerSet &isect)
void Reset(const BitSet &rhs)
Definition: bitset.cc:470
virtual IPeerDebugStats * peer_stats()=0
RibOutUpdates(RibOut *ribout, int index)
virtual void UpdateTxUnreachRoute(uint64_t count)=0
int queue_id() const
Definition: bgp_update.h:229
void BuildIntersection(const BitSet &lhs, const BitSet &rhs)
Definition: bitset.cc:509
void MarkerMerge(UpdateMarker *dst_marker, UpdateMarker *src_marker, const RibPeerSet &bitset)
static SandeshLevel::type LoggingLevel()
Definition: p/sandesh.h:221
bool OnUpdateList()
Definition: bgp_update.h:220
bool RemoveUpdateInfo(UpdateInfo *uinfo)
Definition: bgp_update.cc:60
bool Empty() const
bool HasNext() const
Definition: bgp_ribout.h:268
RibPeerSet members
Definition: bgp_update.h:55
void QueueLeave(int queue_id, int bit)
virtual bool PeerDequeue(int queue_id, IPeerUpdate *peer, RibPeerSet *blocked)
Definition: ipeer.h:186
size_t queue_marker_count(int queue_id) const
void ClearState(RouteUpdate *rt_update)
void AddMarker(UpdateMarker *marker, RouteUpdate *rt_update)
bool empty() const
Definition: bitset.cc:165
UpdateMarker * tail_marker()
bool UpdateMarkersOnBlocked(UpdateMarker *marker, RouteUpdate *rt_update, const RibPeerSet *blocked)
uint64_t num_unreach_routes() const
bool IsMarker()
Definition: bgp_update.h:36
UpdateQueue * queue(int queue_id)
void Leave(int bit)
bool Join(int bit)
#define BGP_LOG_FLAG_SYSLOG
Definition: bgp_log.h:42
void UpdateFlush(const RibPeerSet &dst, RibPeerSet *blocked)
boost::scoped_ptr< RibUpdateMonitor > monitor_
UpdateMarker * GetMarker(int bit)
RouteUpdate * get()
RibPeerSet target
Definition: bgp_update.h:103
BgpUpdateSender * sender()
Definition: bgp_ribout.h:306
void MoveHistory(RouteState *rstate)
Definition: bgp_update.cc:296
#define BGP_PEER_DIR_OUT
Definition: bgp_log.h:138
#define CHECK_CONCURRENCY(...)
static void Initialize()
bool IsEncodingBgp() const
Definition: bgp_ribout.h:331
virtual const uint8_t * GetData(IPeerUpdate *peer_update, size_t *lenp, const std::string **msg_str, std::string *temp)=0
static std::vector< Message * > bgp_messages_
void AttrDequeue(UpdateInfo *current_uinfo)
size_t marker_count() const
bool QueueJoin(int queue_id, int bit)
BgpRoute * route()
Definition: bgp_update.h:227
virtual bool FlushUpdate()
Definition: ipeer.h:38
BitSet & set(size_t pos)
Definition: bitset.cc:125
size_t size() const
virtual bool Start(const RibOut *ribout, bool cache_routes, const RibOutAttr *roattr, const BgpRoute *route)=0
void RibOutActive(int index, RibOut *ribout, int queue_id)
virtual bool SendUpdate(const uint8_t *msg, size_t msgsize)=0
void AddStatisticsInfo(int queue_id, Stats *stats) const
bool IsEncodingXmpp() const
Definition: bgp_ribout.h:328
virtual bool TailDequeue(int queue_id, const RibPeerSet &msync, RibPeerSet *blocked, RibPeerSet *unsync)
void MoveMarker(UpdateMarker *marker, RouteUpdate *rt_update)
void BuildComplement(const BitSet &lhs, const BitSet &rhs)
Definition: bitset.cc:486
bool empty() const
Stats stats_[QCOUNT]
void UpdateHistory(RibOut *ribout, const RibOutAttr *roattr, const RibPeerSet &bits)
Definition: bgp_update.cc:319
bool empty() const
Definition: bgp_update.h:235
bool IsAdvertised() const
Definition: bgp_update.cc:375
void BuildSendReadyBitSet(const RibPeerSet &peerset, RibPeerSet *mready) const
Definition: bgp_ribout.cc:426
static int PartitionCount()
Definition: db.cc:32
static void Terminate()
void RemoveUpdate(RouteUpdate *rt_update)
Definition: bgp_update.cc:467
static SandeshLevel::type LoggingUtLevel()
Definition: p/sandesh.h:222
UpdateList * GetUpdateList(RibOut *ribout)
Definition: bgp_update.cc:405
static std::vector< Message * > xmpp_messages_
IPeerUpdate * Next()
Definition: bgp_ribout.h:271
virtual Message * Create() const =0
Message * GetMessage() const
void MarkerSplit(UpdateMarker *marker, const RibPeerSet &msplit)
void UpdateSend(int queue_id, Message *message, const RibPeerSet &dst, RibPeerSet *blocked)
#define BGP_LOG_PEER(type, peer, level, flags, dir, arg)
Definition: bgp_log.h:159
UpdateInfoSList & Updates()
Definition: bgp_update.h:214
virtual ~RibOutUpdates()
bool ClearAdvertisedBits(RouteUpdate *rt_update, UpdateInfo *uinfo, const RibPeerSet &bits, bool update_history)