OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
flow_stats_manager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <boost/uuid/uuid_io.hpp>
6 
7 #include <db/db.h>
8 #include <base/util.h>
9 
10 #include <cmn/agent_cmn.h>
11 #include <boost/functional/factory.hpp>
12 #include <cmn/agent_factory.h>
13 #include <oper/interface_common.h>
14 #include <oper/mirror_table.h>
15 
16 #include <ksync/ksync_index.h>
17 #include <ksync/ksync_entry.h>
18 #include <ksync/ksync_object.h>
19 #include <ksync/ksync_netlink.h>
20 #include <ksync/ksync_sock.h>
21 #include <uve/agent_uve.h>
24 #include <uve/vn_uve_table.h>
25 #include <uve/vm_uve_table.h>
27 #include <algorithm>
28 #include <pkt/flow_proto.h>
30 #include <vrouter/flow_stats/flow_stats_types.h>
31 #include <oper/global_vrouter.h>
32 #include <init/agent_param.h>
33 
35  "FlowExportStats", 3000));
37 
38 void FlowStatsManager::UpdateThreshold(uint64_t new_value, bool check_oflow) {
39  if (check_oflow && new_value < threshold_) {
40  /* Retain the same value for threshold if it results in overflow */
41  return;
42  }
43  if (new_value < kMinFlowSamplingThreshold) {
45  } else {
46  threshold_ = new_value;
47  }
48 }
49 
// Sandesh request handler that changes the flow-stats interval.
// A positive get_interval() is multiplied by 1000 and handed to
// SetExpiryTime() (presumably seconds -> milliseconds; confirm against
// SetExpiryTime), and a FlowStatsCfgResp is sent. Any other value produces a
// FlowStatsCfgErrResp. The response always carries the request context.
50 void SetFlowStatsInterval_InSeconds::HandleRequest() const {
51  SandeshResponse *resp;
52  if (get_interval() > 0) {
// NOTE(review): the declaration of `obj` is not visible in this listing;
// presumably it is the manager's default collector object -- confirm.
// A NULL `obj` is tolerated: the success response is still sent.
55  if (obj) {
56  obj->SetExpiryTime(get_interval() * 1000);
57  }
58  resp = new FlowStatsCfgResp();
59  } else {
60  resp = new FlowStatsCfgErrResp();
61  }
62 
63  resp->set_context(context());
64  resp->Response();
65  return;
66 }
67 
68 void GetFlowStatsInterval::HandleRequest() const {
69  FlowStatsIntervalResp_InSeconds *resp =
70  new FlowStatsIntervalResp_InSeconds();
71  resp->set_flow_stats_interval((Agent::GetInstance()->flow_stats_manager()->
72  default_flow_stats_collector_obj()->GetExpiryTime())/1000);
73 
74  resp->set_context(context());
75  resp->Response();
76  return;
77 }
78 
// Constructs the manager for `agent`.
// Visible initialisers: the request work-queue is created with the
// "Agent::FlowStatsManager" task id and RequestHandler() as its callback,
// threshold_ starts at kDefaultFlowSamplingThreshold, the session export
// counters are value-initialised, a "FlowThresholdTimer" timer is created on
// the agent's io_service, and delete_short_flow_ starts out true.
// NOTE(review): several lines of this definition are not visible in this
// listing (the embedded line numbers skip); only visible parts are described.
79 FlowStatsManager::FlowStatsManager(Agent *agent) : agent_(agent),
80  request_queue_(agent_->task_scheduler()->GetTaskId("Agent::FlowStatsManager"),
82  boost::bind(&FlowStatsManager::RequestHandler, this, _1)),
83  prev_flow_export_rate_compute_time_(0),
84  threshold_(kDefaultFlowSamplingThreshold),
85  prev_cfg_flow_export_rate_(0), session_export_rate_(0),
86  session_export_count_(), session_sample_exports_(), session_msg_exports_(),
87  session_exports_(), session_export_disable_drops_(),
88  session_export_sampling_drops_(), session_export_without_sampling_(),
89  session_export_drops_(), session_global_slo_logging_drops_(),
90  session_slo_logging_drops_(),
91  timer_(TimerManager::CreateTimer(*(agent_->event_manager())->io_service(),
92  "FlowThresholdTimer",
93  TaskScheduler::GetInstance()->GetTaskId("Agent::FlowStatsManager"), 0)),
94  delete_short_flow_(true) {
98  session_exports_ = 0;
// Start with no protocol-wide collector registered for any protocol slot.
107  for (uint16_t i = 0; i < sizeof(protocol_list_)/sizeof(protocol_list_[0]);
108  i++) {
109  protocol_list_[i] = NULL;
110  }
// NOTE(review): the construction of `session_obj` is not visible here.
112  this));
113  session_stats_collector_obj_ = session_obj;
114 }
115 
117  assert(flow_aging_table_map_.size() == 0);
118 }
119 
// Callback bound to request_queue_ (see the constructor): dispatches a queued
// collector request on req->event to AddReqHandler(), DeleteReqHandler() or
// FreeReqHandler(). An event that matches none of the cases trips assert(0).
// Always returns true.
// NOTE(review): the `case` labels are not visible in this listing.
120 bool FlowStatsManager::RequestHandler(boost::shared_ptr<FlowStatsCollectorReq>
121  req) {
122  switch (req->event) {
124  AddReqHandler(req);
125  break;
126  }
127 
129  DeleteReqHandler(req);
130  break;
131  }
132 
134  FreeReqHandler(req);
135  break;
136  }
137 
138  default: {
139  assert(0);
140  break;
141  }
142  }
143  return true;
144 }
145 
// Handles an add request for the aging table identified by req->key.
// If the key is already present, only its flow age time is refreshed
// (req->flow_cache_timeout scaled by 1000000) and any pending delete mark is
// cleared. Otherwise a new table is inserted into flow_aging_table_map_.
146 void FlowStatsManager::AddReqHandler(boost::shared_ptr<FlowStatsCollectorReq>
147  req) {
148  FlowAgingTableMap::iterator it = flow_aging_table_map_.find(req->key);
149  if (it != flow_aging_table_map_.end()) {
150  it->second->SetFlowAgeTime(
151  1000000L * (uint64_t)req->flow_cache_timeout);
152  it->second->ClearDelete();
153  return;
154  }
155 
// NOTE(review): the line that constructs `aging_table` is not visible in
// this listing; only its trailing arguments appear below.
157  req.get(),
158  this));
159  flow_aging_table_map_.insert(FlowAgingTableEntry(req->key, aging_table));
// The catch-all protocol with port 0 becomes the default collector object.
160  if (req->key.proto == kCatchAllProto && req->key.port == 0) {
161  default_flow_stats_collector_obj_ = aging_table;
162  }
163 
// Only port-0 keys are recorded in protocol_list_, indexed by protocol.
164  if (req->key.port == 0) {
165  protocol_list_[req->key.proto] = aging_table.get();
166  }
167 }
168 
169 void FlowStatsManager::DeleteReqHandler(boost::shared_ptr<FlowStatsCollectorReq>
170  req) {
171  FlowAgingTableMap::iterator it = flow_aging_table_map_.find(req->key);
172  if (it == flow_aging_table_map_.end()) {
173  return;
174  }
175 
176  FlowAgingTablePtr flow_aging_table_ptr = it->second;
177  flow_aging_table_ptr->MarkDelete();
178 
179  if (flow_aging_table_ptr->CanDelete()) {
180  flow_aging_table_map_.erase(it);
181  protocol_list_[req->key.proto] = NULL;
182  }
183 }
184 
185 void FlowStatsManager::FreeReqHandler(boost::shared_ptr<FlowStatsCollectorReq>
186  req) {
187  FlowAgingTableMap::iterator it = flow_aging_table_map_.find(req->key);
188  if (it == flow_aging_table_map_.end()) {
189  return;
190  }
191 
192  FlowAgingTablePtr flow_aging_table_ptr = it->second;
193  if (flow_aging_table_ptr->IsDeleted() == false) {
194  return;
195  }
196  assert(flow_aging_table_ptr->CanDelete());
197  flow_aging_table_ptr->Shutdown();
198  flow_aging_table_map_.erase(it);
199  protocol_list_[req->key.proto] = NULL;
200 }
201 
203  uint64_t flow_stats_interval,
204  uint64_t flow_cache_timeout) {
205  boost::shared_ptr<FlowStatsCollectorReq>
206  req(new FlowStatsCollectorReq(
208  key, flow_stats_interval, flow_cache_timeout));
209  request_queue_.Enqueue(req);
210 }
211 
213  if (key.proto == kCatchAllProto) {
214  return;
215  }
216  boost::shared_ptr<FlowStatsCollectorReq>
217  req(new FlowStatsCollectorReq(
219  key));
220  request_queue_.Enqueue(req);
221 }
222 
224  boost::shared_ptr<FlowStatsCollectorReq>
225  req(new FlowStatsCollectorReq(
227  key));
228  request_queue_.Enqueue(req);
229 }
230 
232 FlowStatsManager::Find(uint32_t proto, uint32_t port) const {
233 
234  FlowAgingTableKey key1(proto, port);
235  FlowAgingTableMap::const_iterator key1_it = flow_aging_table_map_.find(key1);
236 
237  if (key1_it == flow_aging_table_map_.end()){
238  return NULL;
239  }
240 
241  return key1_it->second.get();
242 }
243 
246  FlowStatsCollectorObject* col = NULL;
247 
248  const FlowKey &key = flow->key();
249  FlowAgingTableKey key1(key.protocol, key.src_port);
250  FlowAgingTableMap::const_iterator key1_it =
251  flow_aging_table_map_.find(key1);
252 
253  if (key1_it != flow_aging_table_map_.end()) {
254  col = key1_it->second.get();
255  if (!col->IsDeleted())
256  return col;
257  }
258 
259  FlowAgingTableKey key2(key.protocol, key.dst_port);
260  FlowAgingTableMap::const_iterator key2_it =
261  flow_aging_table_map_.find(key2);
262  if (key2_it != flow_aging_table_map_.end()) {
263  col = key2_it->second.get();
264  if (!col->IsDeleted())
265  return col;
266  }
267 
268  if (protocol_list_[key.protocol] != NULL) {
269  col = protocol_list_[key.protocol];
270  if (!col->IsDeleted())
271  return col;
272  }
274 }
275 
278  /* If the reverse flow already has FlowStatsCollector assigned, return
279  * the same to ensure that forward and reverse flows go to same
280  * FlowStatsCollector */
281  const FlowEntry *rflow = flow->reverse_flow_entry();
282  if (rflow && rflow->fsc()) {
283  return rflow->fsc();
284  }
286 
287  return obj->FlowToCollector(flow);
288 }
289 
291  if (flow == NULL) {
292  return;
293  }
294 
295  FlowStatsCollector *fsc = NULL;
296  if (flow->fsc() == NULL) {
297  fsc = GetFlowStatsCollector(flow.get());
298  flow->set_fsc(fsc);
299  } else {
300  fsc = flow->fsc();
301  }
302 
303  fsc->AddEvent(flow);
304 
305  SessionStatsCollector *ssc = NULL;
306  ssc = session_stats_collector_obj_->FlowToCollector(flow.get());
307  if (ssc) {
308  ssc->AddEvent(flow);
309  }
310 }
311 
313  const RevFlowDepParams &params) {
314  if (flow == NULL) {
315  return;
316  }
317  FlowStatsCollector *fsc = flow->fsc();
318  /* Ignore delete requests if FlowStatsCollector is NULL */
319  if (fsc != NULL) {
320  fsc->DeleteEvent(flow, params);
321  flow->set_fsc(NULL);
322  }
323 
324  SessionStatsCollector *ssc = NULL;
325  ssc = session_stats_collector_obj_->FlowToCollector(flow.get());
326  if (ssc) {
327  ssc->DeleteEvent(flow, params);
328  }
329 }
330 
332  uint32_t bytes, uint32_t packets,
333  uint32_t oflow_bytes,
334  const boost::uuids::uuid &u) {
335  if (flow == NULL) {
336  return;
337  }
338 
339  FlowStatsCollector *fsc = flow->fsc();
340  if (fsc == NULL) {
341  /* Ignore stats update request, if the flow does not have any
342  * FlowStatsCollector associated with it */
343  return;
344  }
345 
346  fsc->UpdateStatsEvent(flow, bytes, packets, oflow_bytes, u);
347 
348  SessionStatsCollector *ssc = NULL;
349  ssc = session_stats_collector_obj_->FlowToCollector(flow.get());
350  if (ssc) {
351  ssc->UpdateSessionStatsEvent(flow, bytes, packets, oflow_bytes, u);
352  }
353 }
354 
356  return instance_table_.Insert(NULL);
357 }
358 
359 void FlowStatsManager::FreeIndex(uint32_t idx) {
360  instance_table_.Remove(idx);
361 }
362 
364  uint32_t protocol, uint32_t port, uint64_t timeout) {
365  if (timeout == 0) {
366  agent->flow_stats_manager()->Delete(
367  FlowAgingTableKey(protocol, port));
368  } else {
369  agent->flow_stats_manager()->Add(
370  FlowAgingTableKey(protocol, port),
371  agent->params()->flow_stats_interval(), timeout);
372  }
373 }
374 
376  session_stats_collector_obj_->RegisterDBClients();
377  return;
378 }
379 
380 
// Initialises the manager with the given flow-stats interval and flow cache
// timeout. When agent_->tsn_enabled() is true the function returns early
// (reasons are given in the comment inside the branch); otherwise a callback
// bound to UpdateSessionThreshold() is set up.
// NOTE(review): the statement that consumes the two arguments and the
// statements after the TSN check are only partly visible in this listing.
381 void FlowStatsManager::Init(uint64_t flow_stats_interval,
382  uint64_t flow_cache_timeout) {
384  flow_stats_interval, flow_cache_timeout);
385 
386  if (agent_->tsn_enabled()) {
387  /* In TSN mode, we don't support add/delete of FlowStatsCollector
388  * (so we don't invoke set_flow_stats_req_handler)
389  * Also, we don't export flows, so we don't start UpdateSessionThreshold
390  * timer */
391  return;
392  }
394 
396  boost::bind(&FlowStatsManager::UpdateSessionThreshold, this));
397 }
398 
400  AgentProfile *profile = agent_->oper_db()->agent_profile();
402  this, _1));
403 }
404 
408  session_stats_collector_obj_->Shutdown();
410  flow_aging_table_map_.clear();
411  protocol_list_[0] = NULL;
412  timer_->Cancel();
415 }
416 
// Sandesh request handler that lists the aging tables reachable through
// `fam`: one AgingConfig entry per element, carrying the key's protocol and
// port and the table's age time in seconds. The stats interval is always
// reported as 0.
// NOTE(review): the declaration of `fam` is not visible in this listing.
417 void ShowAgingConfig::HandleRequest() const {
418  SandeshResponse *resp;
419 
421  resp = new AgingConfigResponse();
422 
423  FlowStatsManager::FlowAgingTableMap::const_iterator it = fam->begin();
424  while (it != fam->end()) {
425  AgingConfig cfg;
426  cfg.set_protocol(it->first.proto);
427  cfg.set_port(it->first.port);
428  cfg.set_cache_timeout(it->second->GetAgeTimeInSeconds());
429  cfg.set_stats_interval(0);
// The list returned by get_aging_config_list() is cast to a non-const
// reference so that the entry can be appended to the response in place.
430  std::vector<AgingConfig> &list =
431  const_cast<std::vector<AgingConfig>&>(
432  ((AgingConfigResponse *)resp)->get_aging_config_list());
433  list.push_back(cfg);
434  it++;
435  }
436 
437  resp->set_context(context());
438  resp->Response();
439  return;
440 }
441 
// Sandesh request handler that adds an aging entry for the requested
// (protocol, port) pair with the requested stats interval and cache timeout,
// then replies with a FlowStatsCfgResp carrying the request context.
// NOTE(review): the declaration of `fam` is not visible in this listing.
442 void AddAgingConfig::HandleRequest() const {
444  fam->Add(FlowAgingTableKey(get_protocol(), get_port()),
445  get_stats_interval(), get_cache_timeout());
446  SandeshResponse *resp = new FlowStatsCfgResp();
447  resp->set_context(context());
448  resp->Response();
449  return;
450 }
451 
// Sandesh request handler that requests deletion of the aging entry for the
// requested (protocol, port) pair, then replies with a FlowStatsCfgResp
// carrying the request context.
// NOTE(review): the declaration of `fam` is not visible in this listing.
452 void DeleteAgingConfig::HandleRequest() const {
454  fam->Delete(FlowAgingTableKey(get_protocol(), get_port()));
455 
456  SandeshResponse *resp = new FlowStatsCfgResp();
457  resp->set_context(context());
458  resp->Response();
459  return;
460 }
461 
// Copies the counters of fsc's work queue (description, current length,
// enqueue and dequeue counts, maximum length, task starts, busy time) into
// *stats, then clears the queue statistics when agent->MeasureQueueDelay()
// returns true.
// NOTE(review): the end of the parameter list (the `stats` parameter) and one
// statement before the MeasureQueueDelay() check are not visible here.
462 static void SetQueueStats(Agent *agent, FlowStatsCollector *fsc,
464  stats->name_ = fsc->queue()->Description();
465  stats->queue_count_ = fsc->queue()->Length();
466  stats->enqueue_count_ = fsc->queue()->NumEnqueues();
467  stats->dequeue_count_ = fsc->queue()->NumDequeues();
468  stats->max_queue_count_ = fsc->queue()->max_queue_len();
469  stats->start_count_ = fsc->queue()->task_starts();
470  stats->busy_time_ = fsc->queue()->busy_time();
472  if (agent->MeasureQueueDelay())
473  fsc->queue()->ClearStats();
474 }
475 
477  uint32_t qsize = flow_aging_table_map_.size() *
479  data->flow_.flow_stats_queue_.resize(qsize);
480  int i = 0;
481  FlowAgingTableMap::iterator it = flow_aging_table_map_.begin();
482  while (it != flow_aging_table_map_.end()) {
483  FlowStatsCollectorObject *obj = it->second.get();
484  for (int j = 0; j < FlowStatsCollectorObject::kMaxCollectors; j++) {
485  SetQueueStats(agent(), obj->GetCollector(j),
486  &data->flow_.flow_stats_queue_[i]);
487  i++;
488  }
489  it++;
490  }
491 }
492 
494  session_sample_exports_ += count;
495 }
496 
498  session_msg_exports_ += count;
499 }
500 
502  bool first_export,
503  bool sampled) {
504  session_export_count_ += count;
505  if (first_export) {
506  session_exports_ += count;
507  }
508  if (!sampled) {
510  }
511 }
bool MeasureQueueDelay()
Definition: agent.cc:1136
boost::shared_ptr< SessionStatsCollectorObject > SessionStatsCollectorPtr
uint32_t task_starts() const
Definition: queue_task.h:376
tbb::atomic< uint64_t > session_exports_
tbb::atomic< bool > sessions_sampled_atleast_once_
bool tsn_enabled() const
Definition: agent.h:1162
tbb::atomic< uint64_t > session_slo_logging_drops_
FlowAgingTableMap flow_aging_table_map_
static Agent * GetInstance()
Definition: agent.h:436
FlowStatsCollector * FlowToCollector(const FlowEntry *flow)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
FlowStatsManager * flow_stats_manager() const
Definition: agent.cc:925
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void Free(const FlowAgingTableKey &key)
size_t max_queue_len() const
Definition: queue_task.h:377
void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams &params)
size_t NumDequeues() const
Definition: queue_task.h:364
void AddEvent(const FlowEntryPtr &flow)
void UpdateSessionStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void RegisterFlowStatsCb(ProfileCb cb)
boost::shared_ptr< FlowStatsCollectorObject > FlowAgingTablePtr
const Queue * queue() const
FlowAgingTableMap::iterator begin()
boost::uuids::uuid uuid
static void SetQueueStats(Agent *agent, FlowStatsCollector *fsc, ProfileData::WorkQueueStats *stats)
std::pair< const FlowAgingTableKey, FlowAgingTablePtr > FlowAgingTableEntry
tbb::atomic< uint64_t > session_export_disable_drops_
tbb::atomic< uint64_t > session_global_slo_logging_drops_
void Add(const FlowAgingTableKey &key, uint64_t flow_stats_interval, uint64_t flow_cache_timeout)
uint8_t protocol
Definition: flow_entry.h:215
tbb::atomic< uint64_t > session_export_drops_
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
FlowStatsCollector * GetCollector(uint8_t idx) const
void Init(uint64_t flow_stats_interval, uint64_t flow_cache_timeout)
SessionStatsCollectorPtr session_stats_collector_obj_
OperDB * oper_db() const
Definition: agent.cc:1013
FlowAgingTableMap::iterator end()
void AddReqHandler(boost::shared_ptr< FlowStatsCollectorReq > req)
const FlowKey & key() const
Definition: flow_entry.h:594
void Remove(size_t index)
Definition: index_vector.h:78
size_t Insert(EntryType entry)
Definition: index_vector.h:40
FlowAgingTablePtr default_flow_stats_collector_obj_
virtual void Response()
Definition: p/sandesh.h:502
Definition: agent.h:358
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams &params)
static const uint32_t kMinFlowSamplingThreshold
size_t Length() const
Definition: queue_task.h:356
void FreeReqHandler(boost::shared_ptr< FlowStatsCollectorReq > req)
bool RequestHandler(boost::shared_ptr< FlowStatsCollectorReq > req)
static void FlowStatsReqHandler(Agent *agent, uint32_t proto, uint32_t port, uint64_t protocol)
void UpdateSessionMsgExportStats(uint32_t count)
void UpdateSessionExportStats(uint32_t count, bool first_export, bool sampled)
static const uint8_t kCatchAllProto
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
WorkQueue< boost::shared_ptr< FlowStatsCollectorReq > > request_queue_
void Delete(const FlowAgingTableKey &key)
void AddEvent(FlowEntryPtr &flow)
void SetProfileData(ProfileData *data)
AgentParam * params() const
Definition: agent.h:1218
SandeshTraceBufferPtr FlowExportStatsTraceBuf
FlowStats flow_
void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void AddEvent(const FlowEntryPtr &flow)
bool Cancel()
Definition: timer.cc:150
std::string Description() const
Definition: queue_task.h:310
uint16_t src_port
Definition: flow_entry.h:216
FlowStatsCollector * GetFlowStatsCollector(const FlowEntry *p) const
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams &params)
tbb::atomic< uint64_t > session_msg_exports_
void ClearStats() const
Definition: queue_task.h:382
IndexVector< FlowStatsCollector * > instance_table_
FlowStatsManager(Agent *agent)
uint16_t dst_port
Definition: flow_entry.h:217
void set_context(std::string context)
Definition: p/sandesh.h:310
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
void UpdateThreshold(uint64_t new_value, bool check_oflow)
FlowEntry * reverse_flow_entry()
Definition: flow_entry.h:602
tbb::atomic< uint64_t > session_export_sampling_drops_
friend struct FlowStatsCollectorReq
FlowStatsCollectorObject * protocol_list_[256]
uint64_t busy_time() const
Definition: queue_task.h:380
size_t NumEnqueues() const
Definition: queue_task.h:360
tbb::atomic< uint64_t > session_sample_exports_
int flow_stats_interval() const
Definition: agent_param.h:321
tbb::atomic< uint32_t > session_export_count_
std::vector< WorkQueueStats > flow_stats_queue_
Definition: agent_profile.h:71
tbb::atomic< uint32_t > session_export_without_sampling_
FlowStatsCollectorObject * GetFlowStatsCollectorObject(const FlowEntry *flow) const
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
static const uint64_t FlowThresoldUpdateTime
void set_flow_stats_req_handler(FlowStatsReqHandler req)
Definition: agent.h:1336
AgentProfile * agent_profile() const
Definition: operdb_init.h:79
void DeleteReqHandler(boost::shared_ptr< FlowStatsCollectorReq > req)
void UpdateSessionSampleExportStats(uint32_t count)
const FlowStatsCollectorObject * Find(uint32_t proto, uint32_t port) const
void FreeIndex(uint32_t idx)
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
FlowStatsCollector * fsc() const
Definition: flow_entry.h:728
FlowStatsCollectorObject * default_flow_stats_collector_obj()
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
Definition: flow_entry.h:125