OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
bgp_update_sender.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef SRC_BGP_BGP_UPDATE_SENDER_H_
6 #define SRC_BGP_BGP_UPDATE_SENDER_H_
7 
8 #include <boost/ptr_container/ptr_list.hpp>
9 
10 #include <vector>
11 
12 #include "base/bitset.h"
13 #include "base/index_map.h"
14 #include "base/queue_task.h"
15 
16 class BgpServer;
17 class BgpUpdateSender;
18 class IPeerUpdate;
19 class RibOut;
20 class RibPeerSet;
21 
22 //
23 // This class maintains state to generate updates for a DB partition for all
24 // RibOuts and IPeerUpdates in a BgpServer. All BgpSenderPartitions work in
25 // parallel and maintain their own view of whether an IPeerUpdate is blocked,
26 // in sync etc.
27 //
28 // A BgpSenderPartition maintains two indexed maps for internal bookkeeping.
29 //
30 // o PeerStateMap allocates an index per IPeerUpdate to allow direct access to
31 // the corresponding PeerState using the index. The PeerState nested class
32 // is described elsewhere.
33 // o RibStateMap allocates a bit index per RibOut and allows direct access to
34 // the corresponding RibState using the index. The RibState nested class is
35 // described elsewhere.
36 //
37 // BgpSenderPartition contains a WorkQueue of WorkBase entries to represent
38 // pending work. A WorkBase can either be a WorkRibOut, which corresponds
39 // to a tail dequeue for a (RibOut, QueueId), or a WorkPeer, which corresponds
40 // to a peer dequeue.
41 //
42 // A mutex is used to control access to the WorkQueue between producers that
43 // need to enqueue WorkBase entries and the Worker which dequeues the entries
44 // and processes them. The producers are the BgpExport class which creates a
45 // WorkRibOut entry after adding a RouteUpdate to an empty UpdateQueue, and
46 // IPeerUpdate class which creates a WorkPeer entry when it becomes unblocked.
47 //
49 public:
52 
53  void Add(RibOut *ribout, IPeerUpdate *peer);
54  void Remove(RibOut *ribout, IPeerUpdate *peer);
55 
56  void RibOutActive(RibOut *ribout, int queue_id);
57 
58  void PeerSendReady(IPeerUpdate *peer);
59  bool PeerIsSendReady(IPeerUpdate *peer) const;
60  bool PeerIsRegistered(IPeerUpdate *peer) const;
61  bool PeerInSync(IPeerUpdate *peer) const;
62 
63  bool CheckInvariants() const;
64 
65  int task_id() const;
66  int index() const { return index_; }  // Read-only accessor for index_ (presumably this partition's index; confirm in the constructor).
67 
68  // For unit testing.
69  void set_disabled(bool disabled);
70 
71 private:
72  friend class BgpUpdateSenderTest;
73  friend class RibOutUpdatesTest;
74 
75  struct WorkBase {  // Common base for the entries stored in WorkQueue (a boost::ptr_list<WorkBase>).
76  enum Type {  // Kind tag for the concrete entry; the derived constructors pass WRibOut or WPeer.
79  };
80  explicit WorkBase(Type type)  // Records the kind tag supplied by the derived struct.
81  : type(type), valid(true) {  // Every entry starts out valid.
82  }
84  bool valid;  // Set to true on construction; presumably cleared by the Work*Invalidate helpers -- TODO confirm in the .cc.
85  };
86 
87  struct WorkRibOut : public WorkBase {  // Work entry identifying a (RibOut, queue_id) pair.
88  WorkRibOut(RibOut *ribout, int queue_id)  // Stores both arguments as given; no ownership transfer is visible here.
89  : WorkBase(WRibOut), ribout(ribout), queue_id(queue_id) {  // Tags the base as WRibOut.
90  }
92  int queue_id;  // Queue identifier passed to the constructor.
93  };
94 
95  struct WorkPeer : public WorkBase {  // Work entry identifying a single IPeerUpdate.
96  explicit WorkPeer(IPeerUpdate *peer)  // Stores the peer pointer as given.
97  : WorkBase(WPeer), peer(peer) {  // Tags the base as WPeer.
98  }
100  };
101 
102  class PeerState;
103  struct PeerRibState;
104  class RibState;
105  class Worker;
106 
107  typedef boost::ptr_list<WorkBase> WorkQueue;
110 
111  void MaybeStartWorker();
112  std::unique_ptr<WorkBase> WorkDequeue();  // NOTE(review): std::unique_ptr requires <memory>, which this header does not include directly -- confirm it is not relying on a transitive include.
113  void WorkEnqueue(WorkBase *wentry);
114  void WorkPeerEnqueue(IPeerUpdate *peer);
115  void WorkPeerInvalidate(IPeerUpdate *peer);
116  void WorkRibOutEnqueue(RibOut *ribout, int queue_id);
117  void WorkRibOutInvalidate(RibOut *ribout);
118 
119  void UpdateRibOut(RibOut *ribout, int queue_id);
120  void UpdatePeer(IPeerUpdate *peer);
121 
122  bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id);
123 
124  void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync);
125 
126  void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id,
127  const RibPeerSet &munsync);
128  void SetQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer);
129  bool IsQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer);
130  void SetSendBlocked(RibOut *ribout, int queue_id,
131  const RibPeerSet &blocked);
132  void SetSendBlocked(const RibOut *ribout, RibState *rs, int queue_id,
133  const RibPeerSet &blocked);
134  void SetQueueSync(PeerState *ps, int queue_id);
135 
137  int index_;
138  bool running_;
139  bool disabled_;
144 
146 };
147 
148 //
149 // This is a wrapper that hides the existence of multiple BgpSenderPartitions
150 // from client classes. It relays APIs to appropriate/all BgpSenderPartition
151 // instance(s).
152 //
153 // The send ready WorkQueue is needed to process send ready notifications for
154 // IPeerUpdates in the context of bgp::SendReadyTask. This ensures that there
155 // are no concurrency issues in case the IPeerUpdate gets unblocked while we
156 // are still processing the previous WorkBase which caused it to get blocked
157 // in the first place.
158 //
160 public:
161  explicit BgpUpdateSender(BgpServer *server);
163 
164  void Join(RibOut *ribout, IPeerUpdate *peer);
165  void Leave(RibOut *ribout, IPeerUpdate *peer);
166 
167  void RibOutActive(int index, RibOut *ribout, int queue_id);
168  void PeerSendReady(IPeerUpdate *peer);
169  bool PeerIsRegistered(IPeerUpdate *peer) const;
170  bool PeerInSync(IPeerUpdate *peer) const;
171 
172  int task_id() const { return task_id_; }  // Read-only accessor for task_id_.
173  bool CheckInvariants() const;
174 
175  // For unit testing.
176  void DisableProcessing();
177  void EnableProcessing();
178 
179 private:
180  friend class BgpTestPeer;
181  friend class BgpUpdateSenderTest;
182  friend class RibOutUpdatesTest;
183 
184  bool SendReadyCallback(IPeerUpdate *peer);
185  BgpSenderPartition *partition(int index) { return partitions_[index]; }  // Unchecked vector access: index must be within [0, partitions_.size()).
186 
188  int task_id_;
189  std::vector<BgpSenderPartition *> partitions_;
191 
193 };
194 
195 #endif // SRC_BGP_BGP_UPDATE_SENDER_H_
RibStateMap rib_state_imap_
bool SendReadyCallback(IPeerUpdate *peer)
IndexMap< RibOut *, RibState > RibStateMap
bool PeerIsRegistered(IPeerUpdate *peer) const
void UpdatePeer(IPeerUpdate *peer)
WorkRibOut(RibOut *ribout, int queue_id)
bool IsQueueActive(RibOut *ribout, int queue_id, IPeerUpdate *peer)
bool PeerInSync(IPeerUpdate *peer) const
void UpdateRibOut(RibOut *ribout, int queue_id)
PeerStateMap peer_state_imap_
void WorkRibOutInvalidate(RibOut *ribout)
void WorkPeerInvalidate(IPeerUpdate *peer)
bool CheckInvariants() const
void RibOutActive(RibOut *ribout, int queue_id)
BgpUpdateSender(BgpServer *server)
bool PeerInSync(IPeerUpdate *peer) const
void BuildSyncBitSet(const RibOut *ribout, RibState *rs, RibPeerSet *msync)
bool PeerIsSendReady(IPeerUpdate *peer) const
BgpUpdateSender * sender_
bool CheckInvariants() const
std::unique_ptr< WorkBase > WorkDequeue()
friend class BgpUpdateSenderTest
boost::ptr_list< WorkBase > WorkQueue
bool UpdatePeerQueue(IPeerUpdate *peer, PeerState *ps, int queue_id)
void Leave(RibOut *ribout, IPeerUpdate *peer)
friend class RibOutUpdatesTest
friend class BgpTestPeer
void Join(RibOut *ribout, IPeerUpdate *peer)
void WorkRibOutEnqueue(RibOut *ribout, int queue_id)
DISALLOW_COPY_AND_ASSIGN(BgpSenderPartition)
IndexMap< IPeerUpdate *, PeerState > PeerStateMap
void RibOutActive(int index, RibOut *ribout, int queue_id)
void SetSendBlocked(RibOut *ribout, int queue_id, const RibPeerSet &blocked)
int task_id() const
void PeerSendReady(IPeerUpdate *peer)
BgpSenderPartition(BgpUpdateSender *sender, int index)
void PeerSendReady(IPeerUpdate *peer)
void WorkEnqueue(WorkBase *wentry)
friend class BgpUpdateSenderTest
BgpSenderPartition * partition(int index)
void set_disabled(bool disabled)
void Remove(RibOut *ribout, IPeerUpdate *peer)
DISALLOW_COPY_AND_ASSIGN(BgpUpdateSender)
void SetQueueSync(PeerState *ps, int queue_id)
WorkQueue< IPeerUpdate * > send_ready_queue_
friend class RibOutUpdatesTest
void Add(RibOut *ribout, IPeerUpdate *peer)
std::vector< BgpSenderPartition * > partitions_
bool PeerIsRegistered(IPeerUpdate *peer) const
void SetQueueActive(const RibOut *ribout, RibState *rs, int queue_id, const RibPeerSet &munsync)
void WorkPeerEnqueue(IPeerUpdate *peer)