OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
ifmap_update_sender.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 #include "base/task.h"
7 #include "ifmap/ifmap_client.h"
8 #include "ifmap/ifmap_server.h"
9 #include "ifmap/ifmap_exporter.h"
10 #include "ifmap/ifmap_log.h"
11 #include "ifmap/ifmap_log_types.h"
12 #include "ifmap/ifmap_update.h"
14 
15 using namespace std;
16 
18  IFMapUpdateQueue *queue)
19  : server_(server), queue_(queue), message_(new IFMapMessage()),
20  task_scheduled_(false), queue_active_(false) {
21 }
22 
24  delete(message_);
25 }
26 
28 public:
29  explicit SendTask(IFMapUpdateSender *sender)
30  : Task(TaskScheduler::GetInstance()->GetTaskId("db::IFMapTable"), 0),
31  sender_(sender) {
32  }
33  virtual bool Run() {
34  BitSet send_scheduled;
35  sender_->GetSendScheduled(&send_scheduled);
36  sender_->send_blocked_.Reset(send_scheduled);
37  for (size_t i = send_scheduled.find_first(); i != BitSet::npos;
38  i = send_scheduled.find_next(i)) {
39  // Dequeue from client marker (i).
40  IFMAP_UPD_SENDER_TRACE(IFMapUSSendScheduled, "Send scheduled for",
41  send_scheduled.ToNumberedString(), "client", i,
44  }
45  if (sender_->queue_active_) {
46  // Dequeue from tail marker.
47  // Reset queue_active_
48  IFMAP_UPD_SENDER_TRACE(IFMapUSQueueActive, "Queue active for",
51  sender_->queue_active_ = false;
52  }
53  return true;
54  }
55 
56  std::string Description() const { return "IFMapUpdateSender::SendTask"; }
57 private:
59 };
60 
62  if (!task_scheduled_) {
63  // create new task
64  SendTask *send_task = new SendTask(this);
66  scheduler->Enqueue(send_task);
67  task_scheduled_ = true;
68  }
69 }
70 
72  if (queue_active_) {
73  return;
74  }
75  queue_active_ = true;
76  tbb::mutex::scoped_lock lock(mutex_);
77  StartTask();
78 }
79 
81  tbb::mutex::scoped_lock lock(mutex_);
82  send_scheduled_.set(index);
83  StartTask();
84 }
85 
87  tbb::mutex::scoped_lock lock(mutex_);
88  *current = send_scheduled_;
90  task_scheduled_ = false;
91 }
92 
94  tbb::mutex::scoped_lock lock(mutex_);
95  send_scheduled_.reset(index);
96  send_blocked_.reset(index);
97 }
98 
99 // We return only under 2 conditions:
100 // 1. All the clients in the marker are blocked.
101 // 2. We have finished traversing the Q.
102 // Invariant: while we are traversing the Q, the marker that we are working
103 // with only has ready clients. As soon as a client blocks, we split it out and
104 // continue with the ready set.
106  IFMapMarker *marker = imarker;
107 
108  // Get the clients in this marker that are blocked. If all of the clients in
109  // this marker are blocked, we are done.
110  BitSet blocked_clients;
111  blocked_clients = (marker->mask & send_blocked_);
112  if (blocked_clients == marker->mask) {
113  return;
114  }
115 
116  // If any of the clients are blocked, create a new marker for the set of
117  // blocked clients, insert it before marker and continue with the ready
118  // set.
119  if (!blocked_clients.empty()) {
120  IFMAP_UPD_SENDER_TRACE(IFMapUSSplitBlocked, "Splitting blocked clients",
121  blocked_clients.ToNumberedString(), "from", marker->ToString());
122  queue_->MarkerSplitBefore(marker, marker, blocked_clients);
123  }
124 
125  IFMapListEntry *next = queue_->Next(marker);
126  BitSet base_send_set;
127 
128  // Start with the node after the 'marker'
129  for (IFMapListEntry *curr = next; curr != NULL; curr = next) {
130  next = queue_->Next(curr);
131 
132  if (curr->IsMarker()) {
133  IFMapMarker *next_marker = static_cast<IFMapMarker *>(curr);
134  // Processing the next_marker can change the send_set and all
135  // clients in the next_marker should have already seen the updates
136  // currently sitting in the buffer. So, flush the buffer to the
137  // existing client-set before processing the marker so that we don't
138  // send duplicates.
139  if (!message_->IsEmpty()) {
140  BitSet blocked_set;
141  SendUpdate(base_send_set, &blocked_set);
142  }
143  bool done;
144  marker = ProcessMarker(marker, next_marker, &done);
145  if (done) {
146  // All the clients in this marker are blocked. We are done.
147  return;
148  }
149  // marker has the ready clients. Continue as if we are starting
150  // fresh.
151  base_send_set.clear();
152  continue;
153  }
154 
155  // ...else it's an update or delete
156 
157  IFMapUpdate *update = static_cast<IFMapUpdate *>(curr);
158  BitSet send_set = update->advertise() & marker->mask;
159  if (send_set.empty()) {
160  continue;
161  }
162 
163  if (base_send_set.empty()) {
164  base_send_set = send_set;
165  }
166 
167  // Flush the message to all possible clients if:
168  // 1. The buffer is full OR
169  // 2. The send_set is changing and buffer is filled.
170  if (message_->IsFull() ||
171  ((base_send_set != send_set) && !message_->IsEmpty())) {
172 
173  BitSet blocked_set;
174  SendUpdate(base_send_set, &blocked_set);
175  if (!blocked_set.empty()) {
176  // All the clients in this marker are blocked. We are done.
177  if (blocked_set == marker->mask) {
178  IFMAP_UPD_SENDER_TRACE(IFMapUSAllBlocked, marker->ToString(),
179  "blocked before", curr->ToString());
180  queue_->MoveMarkerBefore(marker, curr);
181  return;
182  }
183  // Only a subset of clients in this marker are blocked. Insert
184  // a marker for them 'before' curr since they have seen
185  // everything before curr. Let the ready clients continue the
186  // traversal.
187  IFMAP_UPD_SENDER_TRACE(IFMapUSSubsetBlocked, "Clients",
188  blocked_set.ToNumberedString(), "blocked before",
189  curr->ToString(), "and split from", marker->ToString());
190  queue_->MarkerSplitBefore(marker, curr, blocked_set);
191  send_set.Reset(blocked_set);
192  }
193 
194  // The send_set for this marker is changing. Pick up the new one.
195  base_send_set = send_set;
196  }
197 
198  // base_send_set is same as send_set at this point.
199  ProcessUpdate(update, base_send_set);
200  }
201 
202  // The buffer will be filled in the common case of updates being added
203  // after the tail_marker.
204  BitSet blk_set;
205  if (!message_->IsEmpty()) {
206  SendUpdate(base_send_set, &blk_set);
207  }
208  // If the last node in the Q was the tail_marker, we would have already
209  // flushed the buffer and merged with it and we would be the last node in
210  // the Q.
211  IFMapListEntry *last = queue_->GetLast();
212  if (marker != last) {
213  // Since we have reached the end of the Q, we better be the tail_marker
214  assert(marker == queue_->tail_marker());
215  // If we have any blocked clients, splitting markers for them is not
216  // useful at this point. Just move the marker to the end of the Q,
217  // immediately after last, even if it has blocked clients. Being lazy
218  // is advantageous since by the time we get the next trigger, a blocked
219  // client could have become ready and splitting the marker now would be
220  // useless.
221  IFMAP_UPD_SENDER_TRACE(IFMapUSMoveAfterLast, "Moving", marker->ToString(),
222  "before", last->ToString(), "with blocked_set",
223  blk_set.ToNumberedString());
224  queue_->MoveMarkerAfter(marker, last);
225  }
226  return;
227 }
228 
230  const BitSet &base_send_set) {
231  LogAndCountSentUpdate(update, base_send_set);
232 
233  // Append the contents of the update-node to the message.
234  message_->EncodeUpdate(update);
235 
236  // Clean up the node if everybody has seen it.
237  update->AdvertiseReset(base_send_set);
238  if (update->advertise().empty()) {
239  queue_->Dequeue(update);
240  }
241  // Update may be freed.
242  server_->exporter()->StateUpdateOnDequeue(update, base_send_set,
243  update->IsDelete());
244 }
245 
// Sends the currently buffered message_ to every client whose bit is set in
// send_set, then resets message_ so it can be reused for the next batch.
//
// send_set:    clients to send to. Each must not be marked in send_blocked_
//              and must resolve to a non-null client via server_->GetClient()
//              (both are asserted).
// blocked_set: out-parameter. The bit of every client whose SendUpdate()
//              returned false is set in it. Bits are only ever set here, never
//              cleared; the callers visible in this file pass in a freshly
//              constructed BitSet.
//
// Precondition (asserted): message_ is not empty.
// Side effects: sets bits in send_blocked_ for failed clients and resets
// message_ before returning.
246 // blocked_set is a subset of send_set
247 void IFMapUpdateSender::SendUpdate(BitSet send_set, BitSet *blocked_set) {
248  IFMapClient *client;
249  bool send_result;
250 
251  assert(!message_->IsEmpty());
252 
253  for (size_t i = send_set.find_first(); i != BitSet::npos;
254  i = send_set.find_next(i)) {
255  assert(!send_blocked_.test(i));
256  client = server_->GetClient(i);
257  assert(client);
258 
259  message_->SetReceiverInMsg(client->identifier());
260  // Close the message to save the document as a string. This is done per client, after the receiver identifier has been set.
261  message_->Close();
262 
263  // Send the string version of the message to the client.
264  send_result = client->SendUpdate(message_->get_string());
265 
266  // Keep track of all the clients whose send failed (per the original note, their buffers are full): report them to the caller and mark them in send_blocked_.
267  if (!send_result) {
268  blocked_set->set(i);
269  send_blocked_.set(i);
270  }
271  }
272  // Reset the message once, after all clients have been handled, to init things for the next message.
273  message_->Reset();
274 }
275 
276 // marker is before next_marker in the Q. next_marker could be the tail_marker.
277 // 'done' is set to true only if all the clients in the union of the
278 // client-sets of the 2 markers are blocked.
280  IFMapMarker *next_marker,
281  bool *done) {
282  // There should never be a marker beyond the tail_marker
283  assert(marker != queue_->tail_marker());
284 
285  // Get the union (total_set) of the client-sets in the 2 markers. Then, get
286  // the subset of clients in the union that are blocked (blocked_set). The
287  // remaining subset of clients are ready (ready_set).
288  BitSet total_set = (marker->mask | next_marker->mask);
289  BitSet blocked_set = (total_set & send_blocked_);
290  BitSet ready_set;
291  ready_set.BuildComplement(total_set, blocked_set); // *this = lhs & ~rhs
292 
293  // If all the clients are ready or all are blocked, merge marker into
294  // next_marker. marker will be deleted.
295  if (blocked_set.empty() || ready_set.empty()) {
296  IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerMerge, "Merging", marker->ToString(),
297  "into", next_marker->ToString());
298  queue_->MarkerMerge(next_marker, marker, marker->mask);
299  assert(next_marker->mask == total_set);
300  } else {
301  // We have both, ready and blocked, clients. First, merge both the
302  // markers into next_marker so that next_marker has the total_set. Then
303  // split next_marker into 2 markers: first with the blocked_set and the
304  // second with the ready_set, with first(blocked) preceding the
305  // second(ready).
306  IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerMerge, "Merging", marker->ToString(),
307  "into", next_marker->ToString());
308  queue_->MarkerMerge(next_marker, marker, marker->mask);
309  assert(next_marker->mask == total_set);
310  IFMAP_UPD_SENDER_TRACE(IFMapUSMarkerSplit, "Splitting blocked clients",
311  blocked_set.ToNumberedString(), "from", next_marker->ToString());
312  queue_->MarkerSplitBefore(next_marker, next_marker, blocked_set);
313  }
314  if (ready_set.empty()) {
315  // If all the clients are blocked, we are done.
316  *done = true;
317  } else {
318  // At least some clients are ready to continue.
319  *done = false;
320  }
321 
322  // next_marker has the ready_set if done is false
323  return next_marker;
324 }
325 
327  const BitSet &base_send_set) {
328  size_t total = base_send_set.count();
329  // Avoid dealing with return value of BitSet::npos
330  if (total) {
331  string name = update->ConfigName();
332  string operation = update->TypeToString();
333  size_t client_id = base_send_set.find_first();
334  while (total--) {
335  IFMapClient *client = server_->GetClient(client_id);
336  if (client) {
337  IFMAP_DEBUG_ONLY(IFMapClientSendInfo, operation, name,
338  client->identifier(), client->name());
339  if (update->IsNode()) {
340  if (update->IsUpdate()) {
341  client->incr_update_nodes_sent();
342  } else if (update->IsDelete()) {
343  client->incr_delete_nodes_sent();
344  } else {
345  assert(0);
346  }
347  } else if (update->IsLink()) {
348  if (update->IsUpdate()) {
349  client->incr_update_links_sent();
350  } else if (update->IsDelete()) {
351  client->incr_delete_links_sent();
352  } else {
353  assert(0);
354  }
355  }
356  }
357  client_id = base_send_set.find_next(client_id);
358  }
359  }
360 }
361 
void ProcessUpdate(IFMapUpdate *update, const BitSet &base_send_set)
bool IsLink() const
Definition: ifmap_update.h:100
virtual const std::string & identifier() const =0
void incr_delete_nodes_sent()
Definition: ifmap_client.h:46
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
bool test(size_t pos) const
Definition: bitset.cc:146
#define IFMAP_UPD_SENDER_TRACE(obj,...)
Definition: ifmap_log.h:57
IFMapUpdateQueue * queue_
void MoveMarkerBefore(IFMapMarker *marker, IFMapListEntry *current)
virtual void QueueActive()
BitSet & reset(size_t pos)
Definition: bitset.cc:136
void incr_delete_links_sent()
Definition: ifmap_client.h:48
void MoveMarkerAfter(IFMapMarker *marker, IFMapListEntry *current)
void Reset(const BitSet &rhs)
Definition: bitset.cc:470
IFMapMarker * tail_marker()
size_t count() const
Definition: bitset.cc:194
IFMapListEntry * GetLast()
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
IFMapExporter * exporter()
Definition: ifmap_server.h:86
bool IsDelete() const
Definition: ifmap_update.h:66
const BitSet & advertise() const
Definition: ifmap_update.h:94
virtual void SendActive(int index)
const std::string & get_string() const
Definition: ifmap_encoder.h:28
void AdvertiseReset(const BitSet &set)
Definition: ifmap_update.cc:59
void SendUpdate(BitSet send_set, BitSet *blocked_set)
#define IFMAP_DEBUG_ONLY(obj,...)
Definition: ifmap_log.h:39
void EncodeUpdate(const IFMapUpdate *update)
bool empty() const
Definition: bitset.cc:165
IFMapMarker * GetMarker(int bit)
void Dequeue(IFMapUpdate *update)
void LogAndCountSentUpdate(IFMapUpdate *update, const BitSet &base_send_set)
void incr_update_links_sent()
Definition: ifmap_client.h:47
IFMapClient * GetClient(int index)
bool IsUpdate() const
Definition: ifmap_update.h:65
static const size_t npos
Definition: bitset.h:19
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
std::string TypeToString()
Definition: ifmap_update.h:67
IFMapMarker * ProcessMarker(IFMapMarker *marker, IFMapMarker *next_marker, bool *done)
void clear()
Definition: bitset.cc:158
std::string ConfigName()
Definition: ifmap_update.cc:43
IFMapMarker * MarkerSplitBefore(IFMapMarker *marker, IFMapListEntry *current, const BitSet &msplit)
Definition: bitset.h:17
BitSet & set(size_t pos)
Definition: bitset.cc:125
void StateUpdateOnDequeue(IFMapUpdate *update, const BitSet &dequeue_set, bool is_delete)
int GetTaskId() const
Definition: task.h:118
void MarkerMerge(IFMapMarker *dst, IFMapMarker *src, const BitSet &mmove)
size_t find_first() const
Definition: bitset.cc:242
void BuildComplement(const BitSet &lhs, const BitSet &rhs)
Definition: bitset.cc:486
size_t find_next(size_t pos) const
Definition: bitset.cc:255
virtual std::string ToString()
Definition: ifmap_update.cc:75
virtual const std::string & name() const
Definition: ifmap_client.h:28
SendTask(IFMapUpdateSender *sender)
IFMapUpdateSender(IFMapServer *server, IFMapUpdateQueue *queue)
virtual std::string ToString()
Definition: ifmap_update.h:61
void incr_update_nodes_sent()
Definition: ifmap_client.h:45
std::string ToNumberedString() const
Definition: bitset.cc:593
bool IsNode() const
Definition: ifmap_update.h:99
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void SetReceiverInMsg(const std::string &cli_identifier)
std::string Description() const
void CleanupClient(int index)
void Send(IFMapMarker *imarker)
void GetSendScheduled(BitSet *current)
IFMapMessage * message_
IFMapListEntry * Next(IFMapListEntry *current)