OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
flow_event.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 #include <base/address_util.h>
5 #include <boost/functional/hash.hpp>
6 #include <init/agent_param.h>
7 #include <cmn/agent_stats.h>
8 #include <oper/agent_profile.h>
13 #include "flow_proto.h"
14 #include "flow_mgmt.h"
15 #include "flow_event.h"
16 
18 // FlowEventQueue routines
21  const std::string &name,
22  uint32_t task_id, int task_instance,
23  FlowTokenPool *pool,
24  uint16_t latency_limit,
25  uint32_t max_iterations) :
26  flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
27  events_processed_(0), latency_limit_(latency_limit) {
    // Create the work queue for (task_id, task_instance); each dequeued
    // event is dispatched to FlowEventQueueBase::Handler().
28  queue_ = new Queue(task_id, task_instance,
29  boost::bind(&FlowEventQueueBase::Handler, this, _1),
30  Queue::kMaxSize, max_iterations);
    // The queue is named "<name>-<task_instance>".
    // NOTE(review): sprintf() into a fixed 100-byte buffer is not bounded by
    // the length of name. The derived queues in this file pass short string
    // literals, but snprintf(buff, sizeof(buff), ...) would be safer.
31  char buff[100];
32  sprintf(buff, "%s-%d", name.c_str(), task_instance);
33  queue_->set_name(buff);
    // NOTE(review): the statements guarded by the two conditions below are
    // only partly visible in this listing (their first lines are missing).
    // They bind callbacks on `this`; presumably the token-check and
    // task entry/exit hooks -- TODO confirm against the real source.
34  if (token_pool_)
36  this));
38  if (latency_limit_) {
40  this));
42  this, _1));
43  }
44 }
45 
// Body of ~FlowEventQueueBase() (signature line missing from this listing):
// frees the work queue that the constructor allocated with new.
47  delete queue_;
48 }
49 
// Signature line missing from this listing; presumably
// FlowEventQueueBase::Shutdown() -- TODO confirm.
// Forwards to the work queue's Shutdown() with its default argument.
51  queue_->Shutdown();
52 }
53 
// Body of Enqueue(FlowEvent *event) (signature line missing from this
// listing). If CanEnqueue() rejects the event it is deleted here and never
// queued; otherwise it is handed to the work queue.
55  if (CanEnqueue(event) == false) {
56  delete event;
57  return;
58  }
59  queue_->Enqueue(event);
60 }
61 
64 }
65 
// Signature line missing from this listing; presumably the task-entry
// callback registered by the constructor -- TODO confirm name.
// Resets the per-run event counter and snapshots this thread's resource
// usage into rusage_, which TaskExit() later diffs against. Always returns
// true. (One statement between the two below is missing from this listing.)
67  count_ = 0;
69  getrusage(RUSAGE_THREAD, &rusage_);
70  return true;
71 }
72 
// Body of TaskExit(bool done) (signature line missing from this listing).
// Logs an error when the elapsed time since task_start_ reaches
// latency_limit_, reporting the number of events handled and the user/system
// CPU time consumed by this thread during the run.
    // No start timestamp recorded: nothing to measure.
74  if (task_start_ == 0)
75  return;
76 
77  uint64_t t = ClockMonotonicUsec();
    // Elapsed time is converted from usec to msec before the comparison.
78  if (((t - task_start_) / 1000) >= latency_limit_) {
79  struct rusage r;
80  getrusage(RUSAGE_THREAD, &r);
81 
    // CPU time deltas, in msec, relative to the snapshot held in rusage_.
82  uint32_t user = (r.ru_utime.tv_sec - rusage_.ru_utime.tv_sec) * 1000;
83  user += ((r.ru_utime.tv_usec - rusage_.ru_utime.tv_usec) / 1000);
84 
85  uint32_t sys = (r.ru_stime.tv_sec - rusage_.ru_stime.tv_sec) * 1000;
86  sys += ((r.ru_stime.tv_usec - rusage_.ru_stime.tv_usec) / 1000);
87 
88  LOG(ERROR, queue_->Description()
89  << " Time exceeded " << ((t - task_start_) / 1000)
90  << " Count " << count_
91  << " User " << user << " Sys " << sys);
92  }
93  return;
94 }
95 
// Body of Handler(FlowEvent *event) (signature line missing from this
// listing); this is the callback bound to the work queue in the constructor.
// Always returns true.
    // Take ownership so the event is freed on every return path.
97  std::unique_ptr<FlowEvent> event_ptr(event);
98  count_++;
    // Event rejected by CanProcess(): skip HandleEvent() and call
    // ProcessDone() with update_rev_flow=false.
99  if (CanProcess(event) == false) {
100  ProcessDone(event, false);
101  return true;
102  }
103 
    // Subclass-specific processing (HandleEvent is pure virtual in the base).
104  HandleEvent(event);
105 
106  ProcessDone(event, true);
107  return true;
108 }
109 
// Body of CanEnqueue(FlowEvent *event) (signature line and most case labels
// are missing from this listing).
// For the event types handled below, calls the matching Set*() method on the
// flow's pending-action object and returns its result; all other event types
// return true. Enqueue() deletes the event when this returns false.
111  FlowEntry *flow = event->flow();
112  bool ret = true;
113  switch (event->event()) {
114 
116  case FlowEvent::DELETE_FLOW: {
117  tbb::mutex::scoped_lock mutext(flow->mutex());
118  ret = flow->GetPendingAction()->SetDelete();
119  break;
120  }
121 
    // Unlike the other cases, no scoped_lock is acquired here.
122  // lock already taken for the flow
124  ret = flow->GetPendingAction()->SetRecompute();
125  break;
126  }
127 
129  tbb::mutex::scoped_lock mutext(flow->mutex());
130  ret = flow->GetPendingAction()->SetRecomputeDBEntry();
131  break;
132  }
133 
135  tbb::mutex::scoped_lock mutext(flow->mutex());
136  ret = flow->GetPendingAction()->SetRevaluate();
137  break;
138  }
139 
140  default:
141  break;
142  }
143 
144  return ret;
145 }
146 
// Body of CanProcess(FlowEvent *event) (signature line and most case labels
// are missing from this listing).
// For the event types handled below, takes the flow mutex and returns the
// result of the matching Can*() query on the flow's pending-action object;
// all other event types return true. Handler() skips HandleEvent() when this
// returns false.
148  FlowEntry *flow = event->flow();
149  bool ret = true;
150  switch (event->event()) {
151 
153  case FlowEvent::DELETE_FLOW: {
154  tbb::mutex::scoped_lock mutext(flow->mutex());
156  ret = flow->GetPendingAction()->CanDelete();
157  break;
158  }
159 
161  tbb::mutex::scoped_lock mutext(flow->mutex());
163  ret = flow->GetPendingAction()->CanRecompute();
164  break;
165  }
166 
168  tbb::mutex::scoped_lock mutext(flow->mutex());
170  ret = flow->GetPendingAction()->CanRecomputeDBEntry();
171  break;
172  }
173 
176  tbb::mutex::scoped_lock mutext(flow->mutex());
177  ret = flow->GetPendingAction()->CanRevaluate();
178  break;
179  }
180 
181  default:
182  break;
183  }
184 
185  return ret;
186 }
187 
// Clears the pending-action flag that corresponds to the event type once
// Handler() is finished with the event.
//   event           - the event that was handled or skipped.
//   update_rev_flow - when true (and the event has a flow), the reverse flow's
//                     pending-action flag is reset as well.
// Event types without a case below are left untouched.
// NOTE(review): most case labels and one statement are missing from this
// listing; only the visible statements are described here.
// NOTE(review): `flow` is null-checked before the reverse-flow lookup but is
// dereferenced unconditionally inside the cases -- presumably these event
// types always carry a flow; TODO confirm.
188 void FlowEventQueueBase::ProcessDone(FlowEvent *event, bool update_rev_flow) {
189  FlowEntry *flow = event->flow();
190  FlowEntry *rflow = NULL;
191  if (flow && update_rev_flow)
192  rflow = flow->reverse_flow_entry();
193 
194  switch (event->event()) {
195 
197  case FlowEvent::DELETE_FLOW: {
198  FLOW_LOCK(flow, rflow, event->event());
199  flow->GetPendingAction()->ResetDelete();
200  if (rflow)
201  rflow->GetPendingAction()->ResetDelete();
202  break;
203  }
204 
206  FLOW_LOCK(flow, rflow, event->event());
207  flow->GetPendingAction()->ResetRecompute();
208  if (rflow)
209  rflow->GetPendingAction()->ResetRecompute();
210  break;
211  }
212 
    // Only the forward flow's mutex is taken in this case; the statement
    // executed under it is missing from this listing.
214  tbb::mutex::scoped_lock mutext(flow->mutex());
216  break;
217  }
218 
220  FLOW_LOCK(flow, rflow, event->event());
221  flow->GetPendingAction()->ResetRevaluate();
222  if (rflow)
223  rflow->GetPendingAction()->ResetRevaluate();
224  break;
225  }
226 
227  default:
228  break;
229  }
230 
231  return;
232 }
233 
235  FlowTable *table, FlowTokenPool *pool,
236  uint16_t latency_limit,
237  uint32_t max_iterations) :
    // Base queue is named "Flow Event Queue", runs under the kTaskFlowEvent
    // task id, and uses the flow table's index as the task instance.
238  FlowEventQueueBase(proto, "Flow Event Queue",
239  agent->task_scheduler()->GetTaskId(kTaskFlowEvent),
240  table->table_index(), pool, latency_limit,
241  max_iterations),
242  flow_table_(table) {
243 }
244 
246 }
247 
// Body of HandleEvent(FlowEvent *event) for the flow-event queue (signature
// line missing from this listing): delegates to FlowProto::FlowEventHandler()
// with this queue's flow table and returns its result.
249  return flow_proto_->FlowEventHandler(event, flow_table_);
250 }
251 
253  FlowTable *table,
254  FlowTokenPool *pool,
255  uint16_t latency_limit,
256  uint32_t max_iterations) :
    // Base queue is named "Flow Delete Queue", runs under the kTaskFlowDelete
    // task id, and uses the flow table's index as the task instance.
257  FlowEventQueueBase(proto, "Flow Delete Queue",
258  agent->task_scheduler()->GetTaskId(kTaskFlowDelete),
259  table->table_index(), pool, latency_limit,
260  max_iterations),
261  flow_table_(table) {
262 }
263 
265 }
266 
// Body of HandleEvent(FlowEvent *event) for the flow-delete queue (signature
// line missing from this listing): delegates to
// FlowProto::FlowDeleteHandler() with this queue's flow table and returns its
// result.
268  return flow_proto_->FlowDeleteHandler(event, flow_table_);
269 }
270 
272  FlowTable *table,
273  FlowTokenPool *pool,
274  uint16_t latency_limit,
275  uint32_t max_iterations) :
    // Base queue is named "Flow KSync Queue", runs under the kTaskFlowKSync
    // task id, and uses the flow table's index as the task instance.
276  FlowEventQueueBase(proto, "Flow KSync Queue",
277  agent->task_scheduler()->GetTaskId(kTaskFlowKSync),
278  table->table_index(), pool, latency_limit,
279  max_iterations),
280  flow_table_(table) {
281 }
282 
284 }
285 
288 }
289 
291  FlowTokenPool *pool,
292  uint16_t latency_limit,
293  uint32_t max_iterations) :
    // Base queue is named "Flow Update Queue" and runs under the
    // kTaskFlowUpdate task id. Unlike the per-table queues in this file, the
    // task instance is the constant 0 and no flow table is stored.
294  FlowEventQueueBase(proto, "Flow Update Queue",
295  agent->task_scheduler()->GetTaskId(kTaskFlowUpdate), 0,
296  pool, latency_limit, max_iterations) {
297 }
298 
300 }
301 
// Body of HandleEvent(FlowEvent *event) for the flow-update queue (signature
// line missing from this listing): delegates to
// FlowProto::FlowUpdateHandler() and returns its result.
303  return flow_proto_->FlowUpdateHandler(event);
304 }
bool MeasureQueueDelay()
Definition: agent.cc:1136
UpdateFlowEventQueue(Agent *agent, FlowProto *proto, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
Definition: flow_event.cc:290
void SetEntryCallback(TaskEntryCallback on_entry)
Definition: queue_task.h:299
KSyncFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
Definition: flow_event.cc:271
void ResetRecomputeDBEntry()
Definition: flow_entry.cc:2981
DeleteFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
Definition: flow_event.cc:252
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
virtual bool Handler(FlowEvent *event)
Definition: flow_event.cc:96
#define kTaskFlowEvent
Definition: agent.h:321
bool HandleEvent(FlowEvent *event)
Definition: flow_event.cc:248
uint16_t latency_limit_
Definition: flow_event.h:305
FlowPendingAction * GetPendingAction()
Definition: flow_entry.h:751
virtual bool HandleEvent(FlowEvent *event)=0
bool HandleEvent(FlowEvent *event)
Definition: flow_event.cc:267
#define FLOW_LOCK(flow, rflow, flow_event)
Definition: flow_table.h:61
#define kTaskFlowUpdate
Definition: agent.h:323
bool CanRecomputeDBEntry()
Definition: flow_entry.cc:2986
bool HandleEvent(FlowEvent *event)
Definition: flow_event.cc:286
FlowProto * flow_proto_
Definition: flow_event.h:297
bool FlowUpdateHandler(FlowEvent *req)
Definition: flow_proto.cc:545
bool FlowEventHandler(FlowEvent *req, FlowTable *table)
Definition: flow_proto.cc:401
void TaskExit(bool done)
Definition: flow_event.cc:73
bool CanProcess(FlowEvent *event)
Definition: flow_event.cc:147
void ProcessDone(FlowEvent *event, bool update_rev_flow)
Definition: flow_event.cc:188
virtual ~DeleteFlowEventQueue()
Definition: flow_event.cc:264
void SetExitCallback(TaskExitCallback on_exit)
Definition: queue_task.h:303
static const int kMaxSize
Definition: queue_task.h:111
Definition: agent.h:358
FlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
Definition: flow_event.cc:234
uint64_t task_start_
Definition: flow_event.h:299
virtual ~FlowEventQueue()
Definition: flow_event.cc:245
FlowEventQueueBase(FlowProto *proto, const std::string &name, uint32_t task_id, int task_instance, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
Definition: flow_event.cc:20
#define kTaskFlowKSync
Definition: agent.h:322
bool CanEnqueue(FlowEvent *event)
Definition: flow_event.cc:110
#define kTaskFlowDelete
Definition: agent.h:324
FlowTable * flow_table_
Definition: flow_event.h:342
uint64_t events_processed_
Definition: flow_event.h:304
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
tbb::mutex & mutex()
Definition: flow_entry.h:650
std::string Description() const
Definition: queue_task.h:310
bool FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table)
Definition: flow_proto.cc:510
bool HandleEvent(FlowEvent *event)
Definition: flow_event.cc:302
void Enqueue(FlowEvent *event)
Definition: flow_event.cc:54
bool FlowDeleteHandler(FlowEvent *req, FlowTable *table)
Definition: flow_proto.cc:571
bool TokenCheck(const FlowTokenPool *pool) const
Definition: flow_proto.cc:749
FlowEntry * reverse_flow_entry()
Definition: flow_entry.h:602
virtual ~FlowEventQueueBase()
Definition: flow_event.cc:46
#define LOG(_Level, _Msg)
Definition: logging.h:33
bool SetRecomputeDBEntry()
Definition: flow_entry.cc:2973
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
virtual ~KSyncFlowEventQueue()
Definition: flow_event.cc:283
void SetStartRunnerFunc(StartRunnerFunc start_runner_fn)
Definition: queue_task.h:192
FlowTokenPool * token_pool_
Definition: flow_event.h:298
WorkQueue< FlowEvent * > Queue
Definition: flow_event.h:269
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
Event event() const
Definition: flow_event.h:146
void set_name(const std::string &name)
Definition: queue_task.h:307
FlowTable * flow_table_
Definition: flow_event.h:330
virtual ~UpdateFlowEventQueue()
Definition: flow_event.cc:299
struct rusage rusage_
Definition: flow_event.h:306
FlowTable * flow_table_
Definition: flow_event.h:318