OpenSDN source code
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
flow_stats_collector.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <utility>
6 
7 #include <boost/uuid/uuid.hpp>
8 #include <boost/uuid/uuid_io.hpp>
9 
10 #include <db/db.h>
11 #include <base/util.h>
12 #include <base/string_util.h>
13 
14 #include <cmn/agent_cmn.h>
15 #include <init/agent_param.h>
16 #include <boost/functional/factory.hpp>
17 #include <cmn/agent_factory.h>
18 #include <oper/interface_common.h>
19 #include <oper/mirror_table.h>
20 #include <oper/global_vrouter.h>
21 
22 #include <ksync/ksync_index.h>
23 #include <ksync/ksync_entry.h>
24 #include <ksync/ksync_object.h>
25 #include <ksync/ksync_netlink.h>
26 #include <ksync/ksync_sock.h>
27 #include <uve/agent_uve.h>
29 #include <uve/vn_uve_table.h>
30 #include <uve/vm_uve_table.h>
32 #include <uve/vrouter_uve_entry.h>
33 #include <algorithm>
34 #include <pkt/flow_proto.h>
35 #include <pkt/flow_mgmt.h>
37 #include <vrouter/flow_stats/flow_stats_types.h>
38 
39 bool flow_ageing_debug_ = false;
40 FlowStatsCollector::FlowStatsCollector(boost::asio::io_context &io, int intvl,
41  uint32_t flow_cache_timeout,
42  AgentUveBase *uve,
43  uint32_t instance_id,
44  FlowAgingTableKey *key,
45  FlowStatsManager *aging_module,
47  StatsCollector(TaskScheduler::GetInstance()->GetTaskId
48  (kTaskFlowStatsCollector), instance_id,
49  io, kFlowStatsTimerInterval, "Flow stats collector"),
50  agent_uve_(uve),
51  task_id_(uve->agent()->task_scheduler()->GetTaskId
53  rand_gen_(boost::uuids::random_generator()),
54  flow_iteration_key_(NULL),
55  entries_to_visit_(0),
56  flow_tcp_syn_age_time_(FlowTcpSynAgeTime),
57  retry_delete_(true),
58  request_queue_(agent_uve_->agent()->task_scheduler()->
59  GetTaskId(kTaskFlowStatsCollector),
60  instance_id,
61  boost::bind(&FlowStatsCollector::RequestHandler,
62  this, _1)),
63  flow_aging_key_(*key), instance_id_(instance_id),
64  flow_stats_manager_(aging_module), parent_(obj), ageing_task_(NULL),
65  current_time_(GetCurrentTime()), ageing_task_starts_(0) {
66  if (flow_cache_timeout) {
67  // Convert to usec
68  flow_age_time_intvl_ = 1000000L * (uint64_t)flow_cache_timeout;
69  } else {
71  }
72  deleted_ = false;
73  request_queue_.set_name("Flow stats collector");
77  (boost::bind(&FlowStatsCollector::RequestHandlerEntry, this));
79  (boost::bind(&FlowStatsCollector::RequestHandlerExit, this, _1));
80  // Aging timer fires every kFlowStatsTimerInterval msec. Compute
81  // number of timer fires needed to scan complete table
83  InitDone();
84 }
85 
88 }
89 
91  return rand_gen_();
92 }
93 
95  return UTCTimestampUsec();
96 }
97 
99  assert(ageing_task_ == NULL);
102 }
103 
104 // We want to scan the flow table every 25% of configured ageing time.
105 // Compute number of timer fires needed to scan the flow-table once.
107  uint64_t scan_time_millisec;
108  /* Use Age Time itself as scan-time for flows */
109 
110  // Convert aging-time configured in micro-sec to millisecond
111  scan_time_millisec = flow_age_time_intvl_ / 1000;
112 
113  // Compute time in which we must scan the complete table to honor the
114  // kFlowScanTime
115  scan_time_millisec = (scan_time_millisec * kFlowScanTime) / 100;
116 
117  // Enforce min value on scan-time
118  if (scan_time_millisec < kFlowStatsTimerInterval) {
119  scan_time_millisec = kFlowStatsTimerInterval;
120  }
121 
122  // Number of timer fires needed to scan table once
123  return scan_time_millisec / kFlowStatsTimerInterval;
124 }
125 
126 // Update entries_to_visit_ based on total flows
127 // Timer fires every kFlowScanTime. Its possible that we may not have visited
128 // all entries by the time next timer fires. So, keep accumulating the number
129 // of entries to visit into entries_to_visit_
130 //
131 // A lower-bound and an upper-bound are enforced on entries_to_visit_
133  // Compute number of flows to visit per scan-time
134  uint32_t count = flow_export_info_list_.size();
135  uint32_t entries = count / timers_per_scan_;
136 
137  // Update number of entries to visit in flow.
138  // The scan for previous timer may still be in progress. So, accmulate
139  // number of entries to visit
140  entries_to_visit_ += entries;
141 
142  // Cap number of entries to visit to 25% of table
143  if (entries_to_visit_ > ((count * kFlowScanTime)/100))
144  entries_to_visit_ = (count * kFlowScanTime)/100;
145 
146  // Apply lower-limit
149 
150  return;
151 }
152 
154  const vr_flow_entry *k_flow,
155  const vr_flow_stats &k_stats,
156  uint64_t curr_time) {
157  FlowEntry *flow = info->flow();
158  //If both forward and reverse flow are marked
159  //as TCP closed then immediately remote the flow
160  if (k_flow != NULL) {
161  uint64_t k_flow_bytes, bytes;
162  k_flow_bytes = GetFlowStats(k_stats.flow_bytes_oflow,
163  k_stats.flow_bytes);
164  bytes = 0x0000ffffffffffffULL & info->bytes();
165  /* Don't account for agent overflow bits while comparing change in
166  * stats */
167  if (bytes < k_flow_bytes) {
168  return false;
169  }
170  }
171 
172  uint64_t diff_time = curr_time - info->last_modified_time();
173  if (diff_time < flow_age_time_intvl()) {
174  return false;
175  }
176 
178  return false;
179  }
180 
181  return true;
182 }
183 
184 uint64_t FlowStatsCollector::GetFlowStats(const uint16_t &oflow_data,
185  const uint32_t &data) {
186  uint64_t flow_stats = (uint64_t) oflow_data << (sizeof(uint32_t) * 8);
187  flow_stats |= data;
188  return flow_stats;
189 }
190 
192  uint64_t k_flow_bytes) {
193  uint64_t oflow_bytes = 0xffff000000000000ULL & stats->bytes();
194  uint64_t old_bytes = 0x0000ffffffffffffULL & stats->bytes();
195  if (old_bytes > k_flow_bytes) {
196  oflow_bytes += 0x0001000000000000ULL;
197  }
198  return (oflow_bytes |= k_flow_bytes);
199 }
200 
202  uint64_t k_flow_pkts) {
203  uint64_t oflow_pkts = 0xffffff0000000000ULL & stats->packets();
204  uint64_t old_pkts = 0x000000ffffffffffULL & stats->packets();
205  if (old_pkts > k_flow_pkts) {
206  oflow_pkts += 0x0000010000000000ULL;
207  }
208  return (oflow_pkts |= k_flow_pkts);
209 }
210 
212  uint64_t bytes, uint64_t pkts) {
214  FlowEntry *fe = flow->flow();
215  if (!fe) {
216  return;
217  }
218 
219  /* Ignore Non-Floating-IP flow */
220  if (!fe->fip() || fe->fip_vmi().uuid_ == boost::uuids::nil_uuid()) {
221  return;
222  }
223 
224  InterfaceUveStatsTable *table = static_cast<InterfaceUveStatsTable *>
226 
227  fip_info.bytes_ = bytes;
228  fip_info.packets_ = pkts;
229  fip_info.fip_ = fe->fip();
230  fip_info.fip_vmi_ = fe->fip_vmi();
234  fip_info.vn_ = fe->data().source_vn_match;
235 
236  fip_info.rev_fip_ = NULL;
237  if (fe->fip() != ReverseFlowFip(flow)) {
238  /* This is the case where Source and Destination VMs (part of
239  * same compute node) ping to each other to their respective
240  * Floating IPs. In this case for each flow we need to increment
241  * stats for both the VMs */
242  fip_info.rev_fip_ = ReverseFlowFipEntry(flow);
243  }
244 
245  table->UpdateFloatingIpStats(fip_info);
246 }
247 
249  (const FlowExportInfo *flow) {
250  uint32_t fip = ReverseFlowFip(flow);
251  VmInterfaceKey vmi = ReverseFlowFipVmi(flow);
252  Interface *intf = dynamic_cast<Interface *>
253  (agent_uve_->agent()->interface_table()->FindActiveEntry(&vmi));
254 
255  if (intf) {
256  InterfaceUveStatsTable *table = static_cast<InterfaceUveStatsTable *>
257  (agent_uve_->interface_uve_table());
258  const string &vn = flow->flow()->data().source_vn_match;
259  return table->FipEntry(fip, vn, intf);
260  }
261  return NULL;
262 }
263 
265  FlowEntry *rflow = info->reverse_flow();
266  if (rflow) {
267  return rflow->fip();
268  }
269  return 0;
270 }
271 
273  (const FlowExportInfo *info)
274 {
275  FlowEntry *rflow = info->reverse_flow();
276  if (rflow) {
277  return rflow->fip_vmi();
278  }
279  return VmInterfaceKey(
280  AgentKey::ADD_DEL_CHANGE, boost::uuids::nil_uuid(), "");
281 }
282 
284  uint64_t bytes, uint64_t pkts) {
285  FlowEntry *flow = info->flow();
286 
287  const Interface *itf = flow->intf_entry();
288  if (!itf) {
289  return;
290  }
291  if (itf->type() != Interface::VM_INTERFACE) {
292  return;
293  }
294  const VmInterface *vmi = static_cast<const VmInterface *>(itf);
295  const string &src_vn = flow->data().source_vn_match;
296  const string &dst_vn = flow->data().dest_vn_match;
297 
298  /* Ignore flows for which source VN or destination VN are not known */
299  if (!src_vn.length() || !dst_vn.length()) {
300  return;
301  }
302 
303  InterfaceUveStatsTable *itf_table = static_cast<InterfaceUveStatsTable *>
306  ep.vmi = vmi;
307  ep.local_tagset = flow->local_tagset();
308  ep.remote_tagset = flow->remote_tagset();
309  ep.remote_prefix = flow->RemotePrefix();
310  ep.policy = flow->fw_policy_name_uuid();
311  ep.diff_bytes = bytes;
312  ep.diff_pkts = pkts;
314  ep.action);
315  if (flow->is_flags_set(FlowEntry::LocalFlow)) {
316  /* When VM A talks to VM B which is in different compute nodes, the
317  * following flows are created
318  * (1) A-B, Ingress, Forward, pol1
319  * (2) B-A, Egress, Reverse, pol1
320  * (3) A-B, Egress, Forward, pol2
321  * (4) B-A, Inress, Reverse, pol2
322  * When both A and B are in single compute, we have only the following
323  * flows (Flows marked as LocalFlow)
324  * (1) A-B, Ingress, Forward, pol1
325  * (2) B-A, Inress, Reverse, pol2
326  * To simulate session stats similar to case where VMs are in different
327  * computes, for local flows, we do the following.
328  * (a) when "A-B, Ingress, Forward, pol1" flow is seen, we also
329  * update stats for "A-B, Egress, Forward, pol2". This is because
330  * diff stats for "A-B, Ingress, Forward, pol1" and
331  * "A-B, Egress, Forward, pol2" are same. Policy for implicit flow
332  * is picked from reverse flow
333  * (b) when "B-A, Ingress, Reverse, pol2" flow is seen, we also
334  * update stats for "B-A, Egress, Reverse, pol1". This is because diff
335  * stats for "B-A, Ingress, Reverse, pol2" and
336  * "B-A, Egress, Reverse, pol1" is same. Policy for implicit flow is
337  * picked from reverse flow
338  */
339  ep.local_vn = src_vn;
340  ep.remote_vn = dst_vn;
341  ep.in_stats = true;
342  bool egress_flow_is_client;
344  ep.client = false;
345  egress_flow_is_client = true;
346  } else {
347  ep.client = true;
348  egress_flow_is_client = false;
349  }
350  itf_table->UpdateVmiTagBasedStats(ep);
351 
352  /* Local flows will not have egress flows in the system. So we need to
353  * explicitly build stats for egress flow using the data available from
354  * ingress flow. Egress flow stats has to be updated on destination
355  * VMI. We skip updation if we are unable to pick destination VMI from
356  * reverse flow. */
357 
358  FlowEntry* rflow = info->reverse_flow();
359  if (rflow) {
360  const Interface *ritf = rflow->intf_entry();
361  if (ritf && (ritf->type() == Interface::VM_INTERFACE)) {
362  ep.local_tagset = flow->remote_tagset();
363  ep.remote_tagset = flow->local_tagset();
364  ep.local_vn = dst_vn;
365  ep.remote_vn = src_vn;
366  ep.policy = rflow->fw_policy_name_uuid();
367  ep.client = egress_flow_is_client;
368  ep.vmi = static_cast<const VmInterface *>(ritf);
369  ep.in_stats = false;
370  itf_table->UpdateVmiTagBasedStats(ep);
371  }
372  }
373  } else {
374  if (flow->is_flags_set(FlowEntry::IngressDir)) {
375  ep.local_vn = src_vn;
376  ep.remote_vn = dst_vn;
377  ep.in_stats = true;
379  ep.client = false;
380  } else {
381  ep.client = true;
382  }
383  } else {
384  ep.local_vn = dst_vn;
385  ep.remote_vn = src_vn;
386  ep.in_stats = false;
388  ep.client = true;
389  } else {
390  ep.client = false;
391  }
392  }
393  itf_table->UpdateVmiTagBasedStats(ep);
394  }
395 }
396 
398  uint64_t bytes, uint64_t pkts) {
399  FlowEntry *flow = info->flow();
400  string src_vn = flow->data().source_vn_match;
401  string dst_vn = flow->data().dest_vn_match;
402  VnUveTable *vn_table = static_cast<VnUveTable *>
404 
405  if (!src_vn.length())
406  src_vn = FlowHandler::UnknownVn();
407  if (!dst_vn.length())
408  dst_vn = FlowHandler::UnknownVn();
409 
410  /* When packet is going from src_vn to dst_vn it should be interpreted
411  * as ingress to vrouter and hence in-stats for src_vn w.r.t. dst_vn
412  * should be incremented. Similarly when the packet is egressing vrouter
413  * it should be considered as out-stats for dst_vn w.r.t. src_vn.
414  * Here the direction "in" and "out" should be interpreted w.r.t vrouter
415  */
416  if (flow->is_flags_set(FlowEntry::LocalFlow)) {
417  vn_table->UpdateInterVnStats(src_vn, dst_vn, bytes, pkts, false);
418  vn_table->UpdateInterVnStats(dst_vn, src_vn, bytes, pkts, true);
419  } else {
420  if (flow->is_flags_set(FlowEntry::IngressDir)) {
421  vn_table->UpdateInterVnStats(src_vn, dst_vn, bytes, pkts, false);
422  } else {
423  vn_table->UpdateInterVnStats(dst_vn, src_vn, bytes, pkts, true);
424  }
425  }
426 }
427 
429  uint64_t teardown_time) {
430  if (!info) {
431  return;
432  }
433  FlowEntry *fe = info->flow();
434  KSyncFlowMemory *ksync_obj = agent_uve_->agent()->ksync()->
435  ksync_flow_memory();
436  /* Fetch vrouter Flow entry using gen_id and flow_handle from FlowExportInfo
437  * to account for the case where FlowEntry's flow_handle/gen_id has changed
438  * during Delete processing by FlowStatsCollector */
439  vr_flow_stats k_stats;
440  const vr_flow_entry *k_flow = ksync_obj->GetKFlowStats(fe->key(),
441  info->flow_handle(),
442  info->gen_id(),
443  &k_stats);
444  if (k_flow) {
445  UpdateFlowStatsInternal(info, k_stats.flow_bytes,
446  k_stats.flow_bytes_oflow,
447  k_stats.flow_packets,
448  k_stats.flow_packets_oflow,
449  teardown_time, true);
450  return;
451  }
452 }
453 
455  flows_aged_++;
456  FlowEntry *fe = info->flow();
458  info->set_delete_enqueue_time(t);
459  FlowEntry *rflow = info->reverse_flow();
460  if (rflow) {
461  FlowExportInfo *rev_info = FindFlowExportInfo(rflow);
462  if (rev_info) {
463  rev_info->set_delete_enqueue_time(t);
464  }
465  }
466 }
467 
469  uint32_t flow_handle,
470  uint16_t gen_id) {
471  flows_evicted_++;
472  FlowEntry *fe = info->flow();
474  (fe, flow_handle, gen_id, (gen_id + 1));
475  info->set_evict_enqueue_time(t);
476 }
477 
479  uint32_t bytes,
480  uint16_t oflow_bytes,
481  uint32_t pkts,
482  uint16_t oflow_pkts,
483  uint64_t time,
484  bool teardown_time) {
485  FlowEntry *flow = info->flow();
486  FlowEntry *rflow = info->reverse_flow();
487  FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE);
488  UpdateFlowStatsInternal(info, bytes, oflow_bytes, pkts, oflow_pkts, time,
489  teardown_time);
490 }
491 
493  uint32_t bytes,
494  uint16_t oflow_bytes,
495  uint32_t pkts,
496  uint16_t oflow_pkts,
497  uint64_t time,
498  bool teardown_time) {
499  uint64_t k_bytes, k_packets, total_bytes, total_packets;
500  k_bytes = GetFlowStats(oflow_bytes, bytes);
501  k_packets = GetFlowStats(oflow_pkts, pkts);
502 
503  total_bytes = GetUpdatedFlowBytes(info, k_bytes);
504  total_packets = GetUpdatedFlowPackets(info, k_packets);
505  uint64_t diff_bytes = total_bytes - info->bytes();
506  uint64_t diff_pkts = total_packets - info->packets();
507  info->set_bytes(total_bytes);
508  info->set_packets(total_packets);
509  if (teardown_time) {
510  info->set_teardown_time(time);
511  } else {
512  info->set_last_modified_time(time);
513  }
514 
515  /* In TSN mode, we don't export flows or statistics based on flows */
516  if (agent_uve_->agent()->tsn_enabled()) {
517  return;
518  }
519  //Update Inter-VN stats
520  UpdateInterVnStats(info, diff_bytes, diff_pkts);
521  //Update Endpoint stats
522  UpdateVmiTagBasedStats(info, diff_bytes, diff_pkts);
523  //Update Floating-IP stats
524  UpdateFloatingIpStats(info, diff_bytes, diff_pkts);
525 }
526 
527 // Check if flow needs to be evicted
529  const vr_flow_entry *k_flow,
530  uint16_t k_flow_flags,
531  uint32_t flow_handle, uint16_t gen_id,
532  FlowExportInfo *info, uint64_t curr_time) {
533  FlowEntry *fe = info->flow();
534 
535  if ((fe->key().protocol != IPPROTO_TCP))
536  return false;
537 
538  if (ksync_obj->IsEvictionMarked(k_flow, k_flow_flags) == false)
539  return false;
540 
541  // Flow evict already enqueued? Re-Enqueue request after retry-time
542  uint64_t evict_time = info->evict_enqueue_time();
543  if (evict_time) {
544  if ((curr_time - evict_time) > kFlowDeleteRetryTime) {
545  FlowEvictEnqueue(info, curr_time, flow_handle, gen_id);
546  }
547  } else {
548  FlowEvictEnqueue(info, curr_time, flow_handle, gen_id);
549  }
550 
551  return true;
552 }
553 
555  const vr_flow_entry *k_flow,
556  const vr_flow_stats &k_stats,
557  const KFlowData& kinfo,
558  FlowExportInfo *info, uint64_t curr_time) {
559  FlowEntry *fe = info->flow();
560  FlowEntry *rfe = info->reverse_flow();
561 
562  // if we come across deleted entry, retry flow deletion after some time
563  // duplicate delete will be suppressed in flow_table
564  uint64_t delete_time = info->delete_enqueue_time();
565  if (delete_time) {
566  if ((curr_time - delete_time) > kFlowDeleteRetryTime) {
567  FlowDeleteEnqueue(info, curr_time);
568  }
569  return true;
570  }
571 
572  // Delete short flows
573  if ((flow_stats_manager_->delete_short_flow() == true) &&
575  FlowDeleteEnqueue(info, curr_time);
576  return true;
577  }
578 
579  bool deleted = false;
580  FlowExportInfo *rev_info = NULL;
581  // Can the flow be aged?
582  if (ShouldBeAged(info, k_flow, k_stats, curr_time)) {
583  rev_info = FindFlowExportInfo(rfe);
584  // ShouldBeAged looks at one flow only. So, check for both forward and
585  // reverse flows
586  if (rev_info) {
587  const vr_flow_entry *k_flow_rev = NULL;
588  vr_flow_stats k_rflow_stats;
589  k_flow_rev = ksync_obj->GetKFlowStats(rfe->key(),
590  rev_info->flow_handle(),
591  rev_info->gen_id(),
592  &k_rflow_stats);
593  if (ShouldBeAged(rev_info, k_flow_rev, k_rflow_stats, curr_time)) {
594  deleted = true;
595  }
596  } else {
597  deleted = true;
598  }
599  }
600 
601  if (deleted == true) {
602  FlowDeleteEnqueue(info, curr_time);
603  }
604 
605  // Update stats for flows not being deleted
606  // Stats for deleted flow are updated when we get DELETE message
607  if (deleted == false && k_flow) {
608  uint64_t k_bytes, bytes;
609 
610  k_bytes = GetFlowStats(k_stats.flow_bytes_oflow,
611  k_stats.flow_bytes);
612  bytes = 0x0000ffffffffffffULL & info->bytes();
613  /* Don't account for agent overflow bits while comparing change in
614  * stats */
615  if (bytes != k_bytes) {
617  k_stats.flow_bytes,
618  k_stats.flow_bytes_oflow,
619  k_stats.flow_packets,
620  k_stats.flow_packets_oflow,
621  curr_time, false);
622  }
623  }
624  return deleted;
625 }
626 
627 // Check if a flow is to be aged or evicted. Returns number of flows visited
628 uint32_t FlowStatsCollector::ProcessFlow(FlowExportInfoList::iterator &it,
629  KSyncFlowMemory *ksync_obj,
630  FlowExportInfo *info,
631  uint64_t curr_time) {
632  uint32_t count = 1;
633  FlowEntry *fe = info->flow();
634  /* Use flow-handle and gen-id from FlowExportInfo instead of FlowEntry.
635  * The stats that FlowExportInfo holds corresponds to a given
636  * (FlowKey, gen-id and FlowHandle). Since gen-id/flow-handle for a flow
637  * can change dynamically, we need to pick gen-id and flow-handle from
638  * FlowExportInfo. Otherwise stats will go wrong. Whenever gen-id/
639  * flow-handle changes, the stats will be reset as part of AddFlow API
640  */
641  uint32_t flow_handle = info->flow_handle();
642  uint16_t gen_id = info->gen_id();
643 
644  /* If Flow handle is still not populated in FlowStatsCollector, pick the
645  * value from FlowEntry
646  */
647  if (flow_handle == FlowEntry::kInvalidFlowHandle) {
648  {
649  FlowEntry *rflow = NULL;
651  // since flow processing and stats collector can run in parallel
652  // flow handle and gen id not being the key for flow entry can
653  // change while processing, so flow handle and gen id should be
654  // fetched by holding an lock.
655  flow_handle = fe->flow_handle();
656  gen_id = fe->gen_id();
657  info->CopyFlowInfo(fe);
658  }
659  }
660  const vr_flow_entry *k_flow = NULL;
661  vr_flow_stats k_stats;
662  KFlowData kinfo;
663 
664  /* Teardown time is set when Evicted flow stats update message is received.
665  * For flows whose teardown time is set, we need not read stats from
666  * vrouter
667  */
668  if (!info->teardown_time()) {
669  k_flow = ksync_obj->GetKFlowStatsAndInfo(fe->key(), flow_handle,
670  gen_id, &k_stats, &kinfo);
671 
672  // Flow evicted?
673  if (EvictFlow(ksync_obj, k_flow, kinfo.flags, flow_handle, gen_id,
674  info, curr_time) == true) {
675  // If retry_delete_ enabled, dont change flow_export_info_list_
676  if (retry_delete_ == true)
677  return count;
678 
679  // We dont want to retry delete-events, remove flow from ageing list
680  assert(info->is_linked());
681  FlowExportInfoList::iterator flow_it =
682  flow_export_info_list_.iterator_to(*info);
683  flow_export_info_list_.erase(flow_it);
684 
685  return count;
686  }
687  }
688 
689 
690  // Flow aged?
691  if (AgeFlow(ksync_obj, k_flow, k_stats, kinfo, info, curr_time) == false)
692  return count;
693 
694  // If retry_delete_ enabled, dont change flow_export_info_list_
695  if (retry_delete_ == false)
696  return count;
697 
698  // Flow aged, remove both forward and reverse flow
699  assert(info->is_linked());
700  FlowExportInfoList::iterator flow_it =
701  flow_export_info_list_.iterator_to(*info);
702  flow_export_info_list_.erase(flow_it);
703 
704  FlowEntry *rfe = info->reverse_flow();
705  FlowExportInfo *rev_info = FindFlowExportInfo(rfe);
706  if (rev_info) {
707  if (rev_info->is_linked()) {
708  FlowExportInfoList::iterator rev_flow_it =
709  flow_export_info_list_.iterator_to(*rev_info);
710  if (rev_flow_it == it) {
711  it++;
712  }
713  flow_export_info_list_.erase(rev_flow_it);
714  }
715  count++;
716  }
717  return count;
718 }
719 
720 uint32_t FlowStatsCollector::RunAgeing(uint32_t max_count) {
721  FlowExportInfoList::iterator it;
722  if (flow_iteration_key_ == NULL) {
723  it = flow_export_info_list_.begin();
724  } else {
725  FlowEntryTree::iterator tree_it = flow_tree_.find(flow_iteration_key_);
726  // Flow to iterate next is not found. Force stop this iteration.
727  // We will continue from begining on next timer
728  if (tree_it == flow_tree_.end()) {
729  flow_iteration_key_ = NULL;
730  return entries_to_visit_;
731  }
732  it = flow_export_info_list_.iterator_to(tree_it->second);
733  }
734 
735  KSyncFlowMemory *ksync_obj = agent_uve_->agent()->ksync()->
736  ksync_flow_memory();
737  uint64_t curr_time = GetCurrentTime();
738  uint32_t count = 0;
739  while (count < max_count) {
740  if (it == flow_export_info_list_.end()) {
741  break;
742  }
743 
744  FlowExportInfo *info = &(*it);
745  it++;
746  flows_visited_++;
747  count += ProcessFlow(it, ksync_obj, info, curr_time);
748  }
749 
750  // Update iterator for next pass
751  if (it == flow_export_info_list_.end()) {
752  flow_iteration_key_ = NULL;
753  } else {
754  flow_iteration_key_ = it->flow();
755  }
756 
757  return count;
758 }
759 
760 // Timer fired for ageing. Update the number of entries to visit and start the
761 // task if its already not ruuning
763  if (flow_tree_.size() == 0) {
764  return true;
765  }
766 
767  // Update number of entries to visit in flow.
769 
770  // Start task to scan the entries
771  if (ageing_task_ == NULL) {
773 
774  if (flow_ageing_debug_) {
775  LOG(DEBUG,
777  << " AgeingTasks Num " << ageing_task_starts_
778  << " Request count " << request_queue_.Length()
779  << " Tree size " << flow_tree_.size()
780  << " List size " << flow_export_info_list_.size()
781  << " flows visited " << flows_visited_
782  << " flows aged " << flows_aged_
783  << " flows evicted " << flows_evicted_);
784  }
785  flows_visited_ = 0;
786  flows_aged_ = 0;
787  flows_evicted_ = 0;
788  ageing_task_ = new AgeingTask(this);
790  }
791  return true;
792 }
793 
795  // Run ageing per task
796  uint32_t count = RunAgeing(kFlowsPerTask);
797  // Update number of entries visited
798  if (count < entries_to_visit_)
799  entries_to_visit_ -= count;
800  else
801  entries_to_visit_ = 0;
802  // Done with task if we reach end of tree or count is exceeded
803  if (flow_iteration_key_ == NULL || entries_to_visit_ == 0) {
804  entries_to_visit_ = 0;
805  ageing_task_ = NULL;
806  return true;
807  }
808 
809  // More entries to visit. Continue the task
810  return false;
811 }
812 
814 // Utility methods to enqueue events into work-queue
817  FlowExportInfo info(flow, GetCurrentTime());
818  boost::shared_ptr<FlowExportReq>
819  req(new FlowExportReq(FlowExportReq::ADD_FLOW, info));
820  request_queue_.Enqueue(req);
821 }
822 
824  const RevFlowDepParams &params) {
825  FlowExportInfo info(flow);
826  boost::shared_ptr<FlowExportReq>
828  GetCurrentTime(), params));
829  request_queue_.Enqueue(req);
830 }
831 
833  uint32_t bytes,
834  uint32_t packets,
835  uint32_t oflow_bytes,
836  const boost::uuids::uuid &u) {
837  FlowExportInfo info(flow);
838  boost::shared_ptr<FlowExportReq>
839  req(new FlowExportReq(FlowExportReq::UPDATE_FLOW_STATS, info, bytes,
840  packets, oflow_bytes, u));
841  request_queue_.Enqueue(req);
842 }
843 
846  return true;
847 }
848 
850 }
851 
852 bool FlowStatsCollector::RequestHandler(boost::shared_ptr<FlowExportReq> req) {
853  const FlowExportInfo &info = req->info();
854  FlowEntry *flow = info.flow();
855  FlowEntry *rflow = info.reverse_flow();
856  FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE);
857 
858  switch (req->event()) {
860  AddFlow(req->info());
861  break;
862  }
863 
865  FlowEntryTree::iterator it;
866  // Get the FlowExportInfo for flow
867  if (FindFlowExportInfo(flow, it) == false)
868  break;
869 
870  /* We don't export flows in TSN mode */
871  if (agent_uve_->agent()->tsn_enabled() == false) {
872  FlowExportInfo *info = &it->second;
873  /* While updating stats for evicted flows, we set the teardown_time
874  * and export the flow. So delete handling for evicted flows need
875  * not update stats and export flow */
876  if (!info->teardown_time()) {
877  UpdateFlowStats(info, req->time());
878  }
879  }
880  /* Remove the entry from our tree */
881  DeleteFlow(it);
882  break;
883  }
884 
886  /* We don't export flows in TSN mode */
887  if (agent_uve_->agent()->tsn_enabled() == false) {
888  EvictedFlowStatsUpdate(flow, req->bytes(), req->packets(),
889  req->oflow_bytes(), req->uuid());
890  }
891  break;
892  }
893 
894  default:
895  assert(0);
896  }
897 
898  if (deleted_ && parent_->CanDelete()) {
900  }
901 
902  return true;
903 }
904 
907  FlowEntryTree::iterator it = flow_tree_.find(fe);
908  if (it == flow_tree_.end()) {
909  return NULL;
910  }
911 
912  return &it->second;
913 }
914 
915 const FlowExportInfo *
917  FlowEntryTree::const_iterator it = flow_tree_.find(fe);
918  if (it == flow_tree_.end()) {
919  return NULL;
920  }
921 
922  return &it->second;
923 }
924 
926  FlowEntryTree::iterator &it) {
927  it = flow_tree_.find(fe);
928  if (it == flow_tree_.end()) {
929  return false;
930  }
931  return true;
932 }
933 
935  /* In TSN mode, we don't export flows or statistics based on flows */
936  if (agent_uve_->agent()->tsn_enabled()) {
937  return;
938  }
939  const FlowKey &key = flow->key();
940  uint8_t proto = key.protocol;
941  uint16_t sport = key.src_port;
942  uint16_t dport = key.dst_port;
943 
944  // Update vrouter port bitmap
945  VrouterUveEntry *vre = static_cast<VrouterUveEntry *>(
947  vre->UpdateBitmap(proto, sport, dport);
948 
949  // Update source-vn port bitmap
950  VnUveTable *vnte = static_cast<VnUveTable *>(agent_uve_->vn_uve_table());
951  vnte->UpdateBitmap(flow->data().source_vn_match, proto, sport, dport);
952  // Update dest-vn port bitmap
953  vnte->UpdateBitmap(flow->data().dest_vn_match, proto, sport, dport);
954 
955  const VmInterface *port = dynamic_cast<const VmInterface *>
956  (flow->intf_entry());
957  if (port == NULL) {
958  return;
959  }
960  const VmEntry *vm = port->vm();
961  if (vm == NULL) {
962  return;
963  }
964 
965  // update vm and interface (all interfaces of vm) bitmap
966  VmUveTable *vmt = static_cast<VmUveTable *>(agent_uve_->vm_uve_table());
967  vmt->UpdateBitmap(vm, proto, sport, dport);
968 }
969 
971  /* Before inserting update the gen_id and flow_handle in FlowExportInfo.
972  * Locks for accessing fields of flow are taken in calling function.
973  */
974  FlowEntry* fe = info.flow();
975  info.CopyFlowInfo(fe);
976  std::pair<FlowEntryTree::iterator, bool> ret =
977  flow_tree_.insert(make_pair(fe, info));
978  if (ret.second == false) {
979  FlowExportInfo &prev = ret.first->second;
980  if (prev.uuid() != fe->uuid()) {
981  /* Received ADD request for already added entry with a different
982  * UUID. Because of state-compression of messages to
983  * FlowStatsCollector in FlowMgmt, we have not received DELETE for
984  * previous UUID. Send FlowExport to indicate delete for the flow.
985  * This export need not be sent if teardown time is already set.
986  * Teardown time would be set if EvictedFlowStats update request
987  * comes before this duplicate add.
988  */
989  if (!prev.teardown_time()) {
990  UpdateFlowStats(&prev, info.last_modified_time());
991  }
992  /* After sending Delete to collector (if required), reset the stats
993  */
994  prev.ResetStats();
995  }
996  prev.CopyFlowInfo(fe);
997  prev.set_delete_enqueue_time(0);
998  prev.set_evict_enqueue_time(0);
999  prev.set_teardown_time(0);
1000  } else {
1001  NewFlow(info.flow());
1002  }
1003  if (ret.first->second.is_linked() == false) {
1004  flow_export_info_list_.push_back(ret.first->second);
1005  }
1006 }
1007 
1008 // The flow being deleted may be the first flow to visit in next ageing
1009 // iteration. Update the flow to visit next in such case
1011 (const FlowEntry *del_flow, FlowEntryTree::iterator &tree_it) {
1012  // Flow not found in tree is not a valid scenario. Lets be safe and
1013  // restart walk here
1014  if (tree_it == flow_tree_.end()) {
1015  flow_iteration_key_ = NULL;
1016  }
1017 
1018  if (flow_iteration_key_ == NULL) {
1019  return;
1020  }
1021 
1022  // The flow to visit next for ageing is being deleted. Update next flow to
1023  // visit
1024  FlowExportInfoList::iterator it =
1025  flow_export_info_list_.iterator_to(tree_it->second);
1026  ++it;
1027 
1028  // If this is end of list, start from begining again
1029  if (it == flow_export_info_list_.end())
1030  it = flow_export_info_list_.begin();
1031 
1032  if (it == flow_export_info_list_.end()) {
1033  flow_iteration_key_ = NULL;
1034  } else {
1035  flow_iteration_key_ = it->flow();
1036  }
1037 }
1038 
1039 void FlowStatsCollector::DeleteFlow(FlowEntryTree::iterator &it) {
1040  // Update flow_iteration_key_ if flow being deleted is flow to visit in
1041  // next ageing cycle
1042  // Nothing to do if flow being deleted is not the next-iteration key
1043  if (it->first == flow_iteration_key_) {
1044  UpdateFlowIterationKey(it->first, it);
1045  }
1046 
1047  if (it == flow_tree_.end())
1048  return;
1049 
1050  if (it->second.is_linked()) {
1051  FlowExportInfoList::iterator it1 =
1052  flow_export_info_list_.iterator_to(it->second);
1053  flow_export_info_list_.erase(it1);
1054  }
1055 
1056  flow_tree_.erase(it);
1057 }
1058 
1060  uint32_t bytes,
1061  uint32_t packets,
1062  uint32_t oflow_bytes,
1063  const boost::uuids::uuid &u) {
1064  FlowExportInfo *info = FindFlowExportInfo(flow.get());
1065  if (info) {
1066  /* Ignore stats update request for Evicted flow, if we don't have
1067  * FlowEntry corresponding to the Evicted Flow. The match is done using
1068  * UUID
1069  */
1070  if (info->uuid() != u) {
1071  return;
1072  }
1073  /* We are updating stats of evicted flow. Set teardown_time here.
1074  * When delete event is being handled we don't export flow if
1075  * teardown time is set */
1076  UpdateFlowStatsInternal(info, bytes, oflow_bytes & 0xFFFF,
1077  packets, oflow_bytes & 0xFFFF0000,
1078  GetCurrentTime(), true);
1079  }
1080 }
1081 
1083 // Introspect routines
1085 static void KeyToSandeshFlowKey(const FlowKey &key,
1086  SandeshFlowKey &skey) {
1087  skey.set_nh(key.nh);
1088  skey.set_sip(key.src_addr.to_string());
1089  skey.set_dip(key.dst_addr.to_string());
1090  skey.set_src_port(key.src_port);
1091  skey.set_dst_port(key.dst_port);
1092  skey.set_protocol(key.protocol);
1093 }
1094 
1095 static void FlowExportInfoToSandesh(const FlowExportInfo &value,
1096  SandeshFlowExportInfo &info) {
1097  SandeshFlowKey skey;
1098  FlowEntry *flow = value.flow();
1099  FlowEntry *rflow = value.reverse_flow();
1100  KeyToSandeshFlowKey(flow->key(), skey);
1101  info.set_key(skey);
1102  info.set_uuid(to_string(flow->uuid()));
1103  if (rflow) {
1104  info.set_rev_flow_uuid(to_string(rflow->uuid()));
1105  }
1106  if (!flow->data().origin_vn_src.empty()) {
1107  info.set_source_vn(flow->data().origin_vn_src);
1108  } else {
1109  info.set_source_vn(flow->data().source_vn_match);
1110  }
1111  if (!flow->data().origin_vn_dst.empty()) {
1112  info.set_dest_vn(flow->data().origin_vn_dst);
1113  } else {
1114  info.set_dest_vn(flow->data().dest_vn_match);
1115  }
1116  info.set_sg_rule_uuid(flow->sg_rule_uuid());
1117  info.set_nw_ace_uuid(flow->nw_ace_uuid());
1118  info.set_teardown_time(value.teardown_time());
1119  info.set_last_modified_time(value.last_modified_time());
1120  info.set_bytes(value.bytes());
1121  info.set_packets(value.packets());
1122  info.set_flow_handle(flow->flow_handle());
1123  std::vector<ActionStr> action_str_l;
1124  SetActionStr(flow->data().match_p.action_info, action_str_l);
1125  info.set_action(action_str_l);
1126  info.set_vm_cfg_name(flow->data().vm_cfg_name);
1127  info.set_peer_vrouter(flow->peer_vrouter());
1128  info.set_tunnel_type(flow->tunnel_type().ToString());
1129  const VmInterfaceKey &vmi = flow->fip_vmi();
1130  string vmi_str = to_string(vmi.uuid_) + vmi.name_;
1131  info.set_fip_vmi(vmi_str);
1132  Ip4Address ip(flow->fip());
1133  info.set_fip(ip.to_string());
1134  info.set_delete_enqueued(value.delete_enqueue_time() ? true : false);
1135 }
1136 
1137 void FlowStatsRecordsReq::HandleRequest() const {
1138  FlowStatsCollector::FlowEntryTree::iterator it;
1139  vector<FlowStatsRecord> list;
1140  FlowStatsRecordsResp *resp = new FlowStatsRecordsResp();
1141  for (int i = 0; i < FlowStatsCollectorObject::kMaxCollectors; i++) {
1143  flow_stats_manager()->default_flow_stats_collector_obj()->
1144  GetCollector(i);
1145  it = col->flow_tree_.begin();
1146  while (it != col->flow_tree_.end()) {
1147  const FlowExportInfo &value = it->second;
1148  ++it;
1149 
1150  SandeshFlowKey skey;
1151  KeyToSandeshFlowKey(value.flow()->key(), skey);
1152 
1153  SandeshFlowExportInfo info;
1154  FlowExportInfoToSandesh(value, info);
1155 
1156  FlowStatsRecord rec;
1157  rec.set_info(info);
1158  list.push_back(rec);
1159  }
1160  }
1161  resp->set_records_list(list);
1162 
1163  resp->set_context(context());
1164  resp->Response();
1165  return;
1166 }
1167 
1169 // Flow Stats Ageing task
1172  Task(fsc->task_id(), fsc->instance_id()), fsc_(fsc) {
1173 }
1174 
1176 }
1177 
1179  return "Flow Stats Collector Ageing Task";
1180 }
1181 
1183  return fsc_->RunAgeingTask();
1184 }
1185 
1187 // FlowStatsCollectorObject methods
1190  FlowStatsCollectorReq *req,
1191  FlowStatsManager *mgr) {
1192  FlowAgingTableKey *key = &(req->key);
1193  for (int i = 0; i < kMaxCollectors; i++) {
1194  uint32_t instance_id = mgr->AllocateIndex();
1195  boost::asio::io_context& io_ref =
1196  const_cast<boost::asio::io_context&>
1197  (*agent->event_manager()->io_service());
1198  collectors[i].reset(
1199  AgentStaticObjectFactory::CreateRef<FlowStatsCollector>(
1200  io_ref,
1202  agent->uve(), instance_id, key, mgr, this));
1203  }
1204 }
1205 
1207  if (idx >= 0 && idx < kMaxCollectors) {
1208  return collectors[idx].get();
1209  }
1210  return NULL;
1211 }
1212 
1214  for (int i = 0; i < kMaxCollectors; i++) {
1215  collectors[i]->set_expiry_time(time);
1216  }
1217 }
1218 
1220  /* Same expiry time would be configured for all the collectors. Pick value
1221  * from any one of them */
1222  return collectors[0]->expiry_time();
1223 }
1224 
1226  for (int i = 0; i < kMaxCollectors; i++) {
1227  collectors[i]->set_deleted(true);
1228  }
1229 }
1230 
1232  for (int i = 0; i < kMaxCollectors; i++) {
1233  collectors[i]->set_deleted(false);
1234  }
1235 }
1236 
1238  for (int i = 0; i < kMaxCollectors; i++) {
1239  if (!collectors[i]->deleted()) {
1240  return false;
1241  }
1242  }
1243  return true;
1244 }
1245 
1247  for (int i = 0; i < kMaxCollectors; i++) {
1248  collectors[i]->set_flow_age_time_intvl(value);
1249  }
1250 }
1251 
1253  /* Same age time would be configured for all the collectors. Pick value
1254  * from any one of them */
1255  return collectors[0]->flow_age_time_intvl();
1256 }
1257 
1259  for (int i = 0; i < kMaxCollectors; i++) {
1260  if (collectors[i]->flow_tree_.size() != 0 ||
1261  collectors[i]->request_queue_.IsQueueEmpty() == false) {
1262  return false;
1263  }
1264  }
1265  return true;
1266 }
1267 
1269  for (int i = 0; i < kMaxCollectors; i++) {
1270  collectors[i]->Shutdown();
1271  collectors[i].reset();
1272  }
1273 }
1274 
1276  (const FlowEntry *flow) {
1277  uint8_t idx = 0;
1278  FlowTable *table = flow->flow_table();
1279  if (table) {
1280  idx = table->table_index() % kMaxCollectors;
1281  }
1282  return collectors[idx].get();
1283 }
1284 
1286  for (int i = 0; i < kMaxCollectors; i++) {
1287  collectors[i]->UpdateFlowAgeTimeInSecs(age_time);
1288  }
1289 }
1290 
1292  /* Same age time would be configured for all the collectors. Pick value
1293  * from any one of them */
1294  return collectors[0]->flow_age_time_intvl_in_secs();
1295 }
1296 
1298  size_t size = 0;
1299  for (int i = 0; i < kMaxCollectors; i++) {
1300  size += collectors[i]->Size();
1301  }
1302  return size;
1303 }
bool MeasureQueueDelay()
Definition: agent.cc:1136
VmInterfaceKey ReverseFlowFipVmi(const FlowExportInfo *info)
VnUveTableBase * vn_uve_table() const
IpAddress src_addr
Definition: flow_entry.h:213
const TagList & local_tagset() const
Definition: flow_entry.cc:3502
void SetEntryCallback(TaskEntryCallback on_entry)
Definition: queue_task.h:299
Type type() const
Definition: interface.h:112
VrouterUveEntryBase * vrouter_uve_entry() const
bool tsn_enabled() const
Definition: agent.h:1162
const vr_flow_entry * GetKFlowStatsAndInfo(const FlowKey &key, uint32_t idx, uint8_t gen_id, vr_flow_stats *stats, KFlowData *info) const
static uint64_t GetFlowStats(const uint16_t &oflow_data, const uint32_t &data)
void set_packets(uint64_t value)
static Agent * GetInstance()
Definition: agent.h:436
uint8_t gen_id() const
FlowStatsCollector * FlowToCollector(const FlowEntry *flow)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
void set_teardown_time(uint64_t time)
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
void Free(const FlowAgingTableKey &key)
const std::string fw_policy_name_uuid() const
Definition: flow_entry.cc:3634
std::string dest_vn_match
Definition: flow_entry.h:295
uint64_t evict_enqueue_time() const
void set_last_modified_time(uint64_t time)
void EvictedFlowStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
FlowTable * flow_table() const
Definition: flow_entry.h:597
void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams &params)
IpAddress dst_addr
Definition: flow_entry.h:214
void UpdateFlowStatsInternal(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, uint32_t pkts, uint16_t oflow_pkts, uint64_t time, bool teardown_time)
std::string source_vn_match
Definition: flow_entry.h:294
AgentUveBase * uve() const
Definition: agent.cc:909
uint32_t ProcessFlow(FlowExportInfoList::iterator &it, KSyncFlowMemory *ksync_obj, FlowExportInfo *info, uint64_t curr_time)
const VmInterface * vmi
void AddEvent(const FlowEntryPtr &flow)
static void KeyToSandeshFlowKey(const FlowKey &key, SandeshFlowKey &skey)
FlowAgingTableKey flow_aging_key_
Definition: vm.h:32
static const uint32_t kFlowsPerTask
InterfaceUveTable::FloatingIp * FipEntry(uint32_t fip, const string &vn, Interface *intf)
bool AgeFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, const vr_flow_stats &k_stats, const KFlowData &kinfo, FlowExportInfo *info, uint64_t curr_time)
AgeingTask(FlowStatsCollector *fsc)
boost::asio::io_context * io_service()
Definition: event_manager.h:42
MatchPolicy match_p
Definition: flow_entry.h:309
void UpdateBitmap(uint8_t proto, uint16_t sport, uint16_t dport)
void UpdateFlowStats(FlowExportInfo *info, uint64_t teardown_time)
uint32_t RunAgeing(uint32_t max_count)
void CopyFlowInfo(FlowEntry *fe)
#define FLOW_LOCK(flow, rflow, flow_event)
Definition: flow_table.h:61
boost::uuids::random_generator rand_gen_
bool flow_ageing_debug_
boost::uuids::uuid uuid
uint16_t table_index() const
Definition: flow_table.h:198
bool ShouldBeAged(FlowExportInfo *info, const vr_flow_entry *k_flow, const vr_flow_stats &k_stats, uint64_t curr_time)
void UpdateFlowStatsInternalLocked(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, uint32_t pkts, uint16_t oflow_pkts, uint64_t time, bool teardown_time)
void SetActionStr(const FlowAction &action_info, std::vector< ActionStr > &action_str_l)
Definition: flow_entry.cc:3015
uint8_t protocol
Definition: flow_entry.h:215
void UpdateInterVnStats(FlowExportInfo *info, uint64_t bytes, uint64_t pkts)
const FlowEntry * flow_iteration_key_
FlowStatsCollector * GetCollector(uint8_t idx) const
FlowStatsCollectorObject(Agent *agent, FlowStatsCollectorReq *req, FlowStatsManager *mgr)
Agent * agent() const
bool EvictFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, uint16_t k_flow_flags, uint32_t flow_handle, uint16_t gen_id, FlowExportInfo *info, uint64_t curr_time)
uint64_t last_modified_time() const
uint32_t instance_id() const
uint64_t GetUpdatedFlowBytes(const FlowExportInfo *stats, uint64_t k_flow_bytes)
void UpdateFloatingIpStats(const FlowExportInfo *flow, uint64_t bytes, uint64_t pkts)
const std::string & sg_rule_uuid() const
Definition: flow_entry.h:633
void NewFlow(FlowEntry *flow)
FlowStatsCollector(boost::asio::io_context &io, int intvl, uint32_t flow_cache_timeout, AgentUveBase *uve, uint32_t instance_id, FlowAgingTableKey *key, FlowStatsManager *aging_module, FlowStatsCollectorObject *obj)
const std::string & peer_vrouter() const
Definition: flow_entry.h:642
void set_evict_enqueue_time(uint64_t value)
std::string name_
Definition: interface.h:245
TunnelType tunnel_type() const
Definition: flow_entry.h:643
static void FlowExportInfoToSandesh(const FlowExportInfo &value, SandeshFlowExportInfo &info)
const FlowKey & key() const
Definition: flow_entry.h:594
VmUveTableBase * vm_uve_table() const
static const std::string UnknownVn()
Definition: flow_handler.h:26
void UpdateAgeTimeInSeconds(uint32_t age_time)
bool is_flags_set(const FlowEntryFlags &flags) const
Definition: flow_entry.h:610
TaskScheduler * task_scheduler() const
Definition: agent.h:1120
void SetExitCallback(TaskExitCallback on_exit)
Definition: queue_task.h:303
Definition: agent.h:358
size_t Length() const
Definition: queue_task.h:356
FlowEntry * flow() const
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
const vr_flow_entry * GetKFlowStats(const FlowKey &key, uint32_t idx, uint8_t gen_id, vr_flow_stats *stats) const
const boost::uuids::uuid & uuid() const
Definition: flow_entry.h:631
const boost::uuids::uuid & uuid() const
KSync * ksync() const
Definition: agent.cc:901
VmInterfaceKey fip_vmi() const
Definition: flow_entry.h:627
EventManager * event_manager() const
Definition: agent.h:1103
std::string ToString() const
Definition: nexthop.h:260
void UpdateInterVnStats(const std::string &src, const std::string &dst, uint64_t bytes, uint64_t pkts, bool outgoing)
Definition: vn_uve_table.cc:99
void UpdateBitmap(const VmEntry *vm, uint8_t proto, uint16_t sport, uint16_t dport)
Definition: vm_uve_table.cc:23
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
std::string origin_vn_src
Definition: flow_entry.h:296
uint64_t delete_enqueue_time() const
void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id, uint8_t evict_gen_id)
Definition: flow_proto.cc:611
static void GetFlowSandeshActionParams(const FlowAction &action_info, std::string &action_str)
Definition: flow_table.cc:944
bool RequestHandler(boost::shared_ptr< FlowExportReq > req)
uint32_t fip() const
Definition: flow_entry.h:626
FlowAction action_info
Definition: flow_entry.h:277
static const uint32_t kFlowStatsTimerInterval
boost::asio::ip::address_v4 Ip4Address
Definition: address.h:14
#define kTaskFlowStatsCollector
Definition: agent.h:328
FlowProto * get_flow_proto() const
Definition: pkt_init.h:43
void SetFlowAgeTime(uint64_t value)
const Interface * intf_entry() const
Definition: flow_entry.h:652
std::string origin_vn_dst
Definition: flow_entry.h:297
uint16_t src_port
Definition: flow_entry.h:216
const TagList & remote_tagset() const
Definition: flow_entry.cc:3509
boost::uuids::uuid rand_gen()
InterfaceUveTable::FloatingIp * ReverseFlowFipEntry(const FlowExportInfo *flow)
InterfaceUveTable * interface_uve_table() const
std::string vm_cfg_name
Definition: flow_entry.h:357
void UpdateFloatingIpStats(const FipInfo &fip_info)
void UpdateVmiTagBasedStats(const EndpointStatsInfo &info)
uint32_t flow_handle() const
FlowExportInfoList flow_export_info_list_
void DeleteFlow(FlowEntryTree::iterator &it)
static const uint32_t kFlowScanTime
uint32_t flow_handle() const
Definition: flow_entry.h:600
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
FlowStatsCollectorObject * parent_
void DeleteFlowRequest(FlowEntry *flow)
Definition: flow_proto.cc:598
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
uint16_t dst_port
Definition: flow_entry.h:217
void UpdateVmiTagBasedStats(FlowExportInfo *info, uint64_t bytes, uint64_t pkts)
uint64_t packets() const
bool IsEvictionMarked(const vr_flow_entry *entry, uint16_t flags) const
static const uint32_t kInvalidFlowHandle
Definition: flow_entry.h:521
void AddFlow(FlowExportInfo info)
static const uint32_t kMinFlowsPerTimer
#define LOG(_Level, _Msg)
Definition: logging.h:33
void set_bytes(uint64_t value)
uint64_t bytes() const
bool delete_short_flow() const
const std::string RemotePrefix() const
Definition: flow_entry.cc:3535
uint64_t teardown_time() const
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
void UpdateFlowIterationKey(const FlowEntry *del_flow, FlowEntryTree::iterator &tree_it)
FlowData & data()
Definition: flow_entry.h:595
void FlowDeleteEnqueue(FlowExportInfo *info, uint64_t t)
const std::string & nw_ace_uuid() const
Definition: flow_entry.h:636
FlowAgingTableKey key
static const uint64_t kFlowDeleteRetryTime
tbb::atomic< bool > deleted_
void RequestHandlerExit(bool done)
static const uint64_t FlowAgeTime
uint8_t gen_id() const
Definition: flow_entry.h:599
PktModule * pkt() const
Definition: agent.cc:965
uint32_t nh
Definition: flow_entry.h:212
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
boost::uuids::uuid uuid_
Definition: interface.h:244
uint32_t ReverseFlowFip(const FlowExportInfo *info)
static uint64_t GetCurrentTime()
uint64_t GetUpdatedFlowPackets(const FlowExportInfo *stats, uint64_t k_flow_pkts)
bool FindFlowExportInfo(const FlowEntry *fe, FlowEntryTree::iterator &it)
void set_name(const std::string &name)
Definition: queue_task.h:307
const VmEntry * vm() const
FlowStatsManager * flow_stats_manager_
uint32_t GetAgeTimeInSeconds() const
void FreeIndex(uint32_t idx)
FlowEntry * reverse_flow() const
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
Definition: flow_entry.h:125
void UpdateBitmap(const std::string &vn, uint8_t proto, uint16_t sport, uint16_t dport)
Definition: vn_uve_table.cc:87
void set_delete_enqueue_time(uint64_t value)
void FlowEvictEnqueue(FlowExportInfo *info, uint64_t t, uint32_t flow_handle, uint16_t gen_id)
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54