OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
session_stats_collector.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <bitset>
6 #include <pkt/flow_table.h>
8 #include <uve/stats_collector.h>
10 #include <sandesh/common/flow_types.h>
11 #include <init/agent_param.h>
14 #include <oper/tag.h>
16 #include <oper/global_vrouter.h>
18 #include <vrouter/flow_stats/flow_stats_types.h>
19 #include <cmn/agent_factory.h>
20 
21 // setting work queue max size as 4M
22 #define DEFAULT_SSC_REQUEST_QUEUE_SIZE 4*1024*1024
23 #define MAX_SSC_REQUEST_QUEUE_ITERATIONS 256
24 
26  "SessionStats", 4000));
27 
28 bool session_debug_ = false;
30  AgentUveBase *uve,
31  uint32_t instance_id,
32  FlowStatsManager *aging_module,
34  StatsCollector(TaskScheduler::GetInstance()->GetTaskId
35  (kTaskSessionStatsCollector), instance_id,
36  io, kSessionStatsTimerInterval, "Session stats collector"),
37  agent_uve_(uve),
38  task_id_(uve->agent()->task_scheduler()->GetTaskId
40  session_ep_iteration_key_(), session_agg_iteration_key_(),
41  session_iteration_key_(),
42  request_queue_(agent_uve_->agent()->task_scheduler()->
44  instance_id,
45  boost::bind(&SessionStatsCollector::RequestHandler,
46  this, _1),
49  session_msg_list_(agent_uve_->agent()->params()->max_endpoints_per_session_msg(),
50  SessionEndpoint()),
51  session_msg_index_(0), instance_id_(instance_id),
52  flow_stats_manager_(aging_module), parent_(obj), session_task_(NULL),
53  current_time_(GetCurrentTime()), session_task_starts_(0) {
54  request_queue_.set_name("Session stats collector event queue");
58  (boost::bind(&SessionStatsCollector::RequestHandlerEntry, this));
60  (boost::bind(&SessionStatsCollector::RequestHandlerExit, this, _1));
62  InitDone();
63 }
64 
67 }
68 
70  return UTCTimestampUsec();
71 }
72 
76 }
77 
79  if (agent_uve_->agent()->slo_table()) {
81  boost::bind(&SessionStatsCollector::SloNotify, this,
82  _1, _2));
83  }
84 }
85 
87  if (session_endpoint_map_.size() == 0) {
88  return true;
89  }
90 
91  // Start task to scan the entries
92  if (session_task_ == NULL) {
94 
95  if (session_debug_) {
96  LOG(DEBUG,
98  << " SessionTasks Num " << session_task_starts_
99  << " session_ep visited " << session_ep_visited_
100  << " Request count " << request_queue_.Length());
101  }
103  session_task_ = new SessionTask(this);
105  }
106  return true;
107 }
108 
109 
111  uint64_t curr_time = FlowStatsCollector::GetCurrentTime();
112  bool export_rate_calculated = false;
113  uint32_t exp_rate_without_sampling = 0;
114 
115  /* If flows are not being exported, no need to update threshold */
116  if (!session_export_count_) {
117  return true;
118  }
119 
120  // Calculate Flow Export rate
122  uint64_t diff_secs = 0;
123  uint64_t diff_micro_secs = curr_time -
125  if (diff_micro_secs) {
126  diff_secs = diff_micro_secs/1000000;
127  }
128  if (diff_secs) {
130  session_export_rate_ = session_export_count/diff_secs;
131  exp_rate_without_sampling =
133  prev_flow_export_rate_compute_time_ = curr_time;
134  export_rate_calculated = true;
135  }
136  } else {
139  return true;
140  }
141 
142  uint32_t cfg_rate = agent_->oper_db()->global_vrouter()->
143  flow_export_rate();
144  /* No need to update threshold when flow_export_rate is NOT calculated
145  * and configured flow export rate has not changed */
146  if (!export_rate_calculated &&
147  (cfg_rate == prev_cfg_flow_export_rate_)) {
148  return true;
149  }
150  uint64_t cur_t = threshold(), new_t = 0;
151  // Update sampling threshold based on flow_export_rate_
152  if (session_export_rate_ < ((double)cfg_rate) * 0.8) {
153  /* There are two reasons why we can be here.
154  * 1. None of the flows were sampled because we never crossed
155  * 80% of configured flow-export-rate.
156  * 2. In scale setups, the threshold was updated to high value because
157  * of which flow-export-rate has dropped drastically.
158  * Threshold should be updated here depending on which of the above two
159  * situations we are in. */
162  } else {
163  if (session_export_rate_ < ((double)cfg_rate) * 0.5) {
164  UpdateThreshold((threshold_ / 4), false);
165  } else {
166  UpdateThreshold((threshold_ / 2), false);
167  }
168  }
169  } else if (session_export_rate_ > (cfg_rate * 3)) {
170  UpdateThreshold((threshold_ * 4), true);
171  } else if (session_export_rate_ > (cfg_rate * 2)) {
172  UpdateThreshold((threshold_ * 3), true);
173  } else if (session_export_rate_ > ((double)cfg_rate) * 1.25) {
174  UpdateThreshold((threshold_ * 2), true);
175  }
176  prev_cfg_flow_export_rate_ = cfg_rate;
177  new_t = threshold();
178  FLOW_EXPORT_STATS_TRACE(session_export_rate_, exp_rate_without_sampling,
179  cur_t, new_t);
180  return true;
181 }
182 
184 // Utility methods to enqueue events into work-queue
187  boost::shared_ptr<SessionStatsReq>
189  GetCurrentTime()));
190  request_queue_.Enqueue(req);
191 }
192 
194  const RevFlowDepParams &params) {
195  boost::shared_ptr<SessionStatsReq>
197  GetCurrentTime(), params));
198  request_queue_.Enqueue(req);
199 }
200 
202  uint32_t bytes,
203  uint32_t packets,
204  uint32_t oflow_bytes,
205  const boost::uuids::uuid &u) {
206  boost::shared_ptr<SessionStatsReq>
208  bytes, packets, oflow_bytes, u));
209  request_queue_.Enqueue(req);
210 }
211 
212 void SessionStatsCollector::DispatchSessionMsg(const std::vector<SessionEndpoint> &lst) {
215  SESSION_ENDPOINT_OBJECT_LOG("", SandeshLevel::SYS_INFO, lst);
216 }
217 
220  if (session_msg_index_ ==
223  session_msg_index_ = 0;
224  }
225 }
226 
228  if (session_msg_index_ == 0) {
229  return;
230  }
231 
232  vector<SessionEndpoint>::const_iterator first = session_msg_list_.begin();
233  vector<SessionEndpoint>::const_iterator last = session_msg_list_.begin() +
235  vector<SessionEndpoint> new_list(first, last);
236  DispatchSessionMsg(new_list);
237  session_msg_index_ = 0;
238 }
239 
241  SessionEndpoint &obj = session_msg_list_[session_msg_index_];
242  obj = SessionEndpoint();
243  return session_msg_index_;
244 }
245 
248  return true;
249 }
250 
252 }
253 
254 bool SessionStatsCollector::RequestHandler(boost::shared_ptr<SessionStatsReq> req) {
255  FlowEntry *flow = req->flow();
256  FlowEntry *rflow = req->reverse_flow();
257  FLOW_LOCK(flow, rflow, FlowEvent::FLOW_MESSAGE);
258 
259  switch (req->event()) {
261  AddSession(flow, req->time());
262  break;
263  }
264 
266  DeleteSession(flow, flow->uuid(), req->time(), &req->params());
267  break;
268  }
269 
271  EvictedSessionStatsUpdate(req->flow(), req->bytes(), req->packets(),
272  req->oflow_bytes(), req->uuid());
273  break;
274  }
275 
276  default:
277  assert(0);
278  }
279 
280  return true;
281 }
282 
284  if (vmi_cfg_name != rhs.vmi_cfg_name) {
285  return vmi_cfg_name < rhs.vmi_cfg_name;
286  }
287  if (local_vn != rhs.local_vn) {
288  return local_vn < rhs.local_vn;
289  }
290  if (remote_vn != rhs.remote_vn) {
291  return remote_vn < rhs.remote_vn;
292  }
293  if (local_tagset != rhs.local_tagset) {
294  return local_tagset < rhs.local_tagset;
295  }
296  if (remote_tagset != rhs.remote_tagset) {
297  return remote_tagset < rhs.remote_tagset;
298  }
299  if (remote_prefix != rhs.remote_prefix) {
300  return remote_prefix < rhs.remote_prefix;
301  }
302  if (match_policy != rhs.match_policy) {
303  return match_policy < rhs.match_policy;
304  }
307  }
308  return is_si < rhs.is_si;
309 }
310 
312  if (vmi_cfg_name != rhs.vmi_cfg_name) {
313  return false;
314  }
315  if (local_vn != rhs.local_vn) {
316  return false;
317  }
318  if (remote_vn != rhs.remote_vn) {
319  return false;
320  }
321  if (local_tagset != rhs.local_tagset) {
322  return false;
323  }
324  if (remote_tagset != rhs.remote_tagset) {
325  return false;
326  }
327  if (remote_prefix != rhs.remote_prefix) {
328  return false;
329  }
330  if (match_policy != rhs.match_policy) {
331  return false;
332  }
334  return false;
335  }
336  if (is_si != rhs.is_si) {
337  return false;
338  }
339  return true;
340 }
341 
343  vmi_cfg_name = "";
344  local_vn = "";
345  remote_vn = "";
346  local_tagset.clear();
347  remote_tagset.clear();
348  remote_prefix = "";
349  match_policy = "";
350  is_client_session = false;
351  is_si = false;
352 }
353 
354 bool SessionAggKey::IsLess(const SessionAggKey &rhs) const {
355  if (local_ip != rhs.local_ip) {
356  return local_ip < rhs.local_ip;
357  }
358  if (server_port != rhs.server_port) {
359  return server_port < rhs.server_port;
360  }
361  return proto < rhs.proto;
362 }
363 
364 bool SessionAggKey::IsEqual(const SessionAggKey &rhs) const {
365  if (local_ip != rhs.local_ip) {
366  return false;
367  }
368  if (server_port != rhs.server_port) {
369  return false;
370  }
371  if (proto != rhs.proto) {
372  return false;
373  }
374  return true;
375 }
376 
378  local_ip = IpAddress();
379  server_port = 0;
380  proto = 0;
381 }
382 
383 bool SessionKey::IsLess(const SessionKey &rhs) const {
384  if (remote_ip != rhs.remote_ip) {
385  return remote_ip < rhs.remote_ip;
386  }
387  if (client_port != rhs.client_port) {
388  return client_port < rhs.client_port;
389  }
390  return uuid < rhs.uuid;
391 }
392 
393 bool SessionKey::IsEqual(const SessionKey &rhs) const {
394  if (remote_ip != rhs.remote_ip) {
395  return false;
396  }
397  if (client_port != rhs.client_port) {
398  return false;
399  }
400  if (uuid != rhs.uuid) {
401  return false;
402  }
403  return true;
404 }
405 
407  remote_ip = IpAddress();
408  client_port = 0;
409  uuid = boost::uuids::nil_uuid();
410 }
411 
413  SessionAggKey &session_agg_key,
414  SessionKey &session_key,
415  SessionEndpointKey &session_endpoint_key) {
416  /*
417  * For non local flows always get the key for forward flow entry
418  * If vms are in same compute node (local route)
419  * Vrouter A (Client) <==> Vrouter B (Server)
420  * A->B (Ingress + Forwarding + Local)
421  * B->A (Ingress + Reverse + Local)
422  *
423  * If vms are in different compute nodes
424  * Vrouter A (Client) <==> Vrouter B (Server)
425  * A->B (Ingress + Forwarding) A->B (Egress + Forward)
426  * B->A (Egress + Reverse) B->A (Ingress + Reverse)
427  */
428 
429  /*
430  * If it is non local reverse flow then NOP, actual
431  * config will be handled in fwd flow
432  */
433  if ((!(fe->is_flags_set(FlowEntry::LocalFlow))) &&
435  return false;
436  }
437 
438  const Interface *itf = fe->intf_entry();
439  if (!itf) {
440  return false;
441  }
442 
443  if (itf->type() != Interface::VM_INTERFACE) {
444  return false;
445  }
446 
447  const VmInterface *vmi = static_cast<const VmInterface *>(itf);
448  if (vmi->cfg_name().empty()) {
449  return false;
450  }
451  const string &src_vn = !fe->data().origin_vn_src.empty() ?
452  fe->data().origin_vn_src :
453  fe->data().source_vn_match;
454  const string &dst_vn = !fe->data().origin_vn_dst.empty() ?
455  fe->data().origin_vn_dst :
456  fe->data().dest_vn_match;
457 
458  session_endpoint_key.vmi_cfg_name = vmi->cfg_name();
459  session_endpoint_key.local_tagset = fe->local_tagset();
460  session_endpoint_key.remote_tagset = fe->remote_tagset();
461  session_endpoint_key.remote_prefix = fe->RemotePrefix();
462  session_endpoint_key.match_policy = fe->fw_policy_name_uuid();
463  if (vmi->service_intf_type().empty()) {
464  session_endpoint_key.is_si = false;
465  } else {
466  session_endpoint_key.is_si= true;
467  }
468 
469  if (fe->IsClientFlow()) {
470  session_agg_key.local_ip = fe->key().src_addr;
471  session_agg_key.server_port = fe->key().dst_port;
472  session_key.remote_ip = fe->key().dst_addr;
473  session_key.client_port = fe->key().src_port;
474  session_endpoint_key.local_vn = src_vn;
475  session_endpoint_key.remote_vn = dst_vn;
476  session_endpoint_key.is_client_session = true;
477  } else if (fe->IsServerFlow()) {
478  /*
479  * If it is local flow, then reverse flow
480  * (Ingress + Reverse) will be used to create
481  * the server side session (Egress + Forward)
482  */
484  session_agg_key.local_ip = fe->key().src_addr;
485  session_agg_key.server_port = fe->key().src_port;
486  session_key.remote_ip = fe->key().dst_addr;
487  session_key.client_port = fe->key().dst_port;
488  session_endpoint_key.local_vn = src_vn;
489  session_endpoint_key.remote_vn = dst_vn;
490  } else {
491  session_agg_key.local_ip = fe->key().dst_addr;
492  session_agg_key.server_port = fe->key().dst_port;
493  session_key.remote_ip = fe->key().src_addr;
494  session_key.client_port = fe->key().src_port;
495  session_endpoint_key.local_vn = dst_vn;
496  session_endpoint_key.remote_vn = src_vn;
497  }
498  session_endpoint_key.is_client_session = false;
499  } else {
500  return false;
501  }
502  session_agg_key.proto = fe->key().protocol;
503  session_key.uuid = fe->uuid();
504  return true;
505 }
506 
508  SessionFlowStatsInfo *session_flow) const {
509  session_flow->flow = fe;
510  session_flow->gen_id= fe->gen_id();
511  session_flow->flow_handle = fe->flow_handle();
512  session_flow->uuid = fe->uuid();
513  session_flow->total_bytes = 0;
514  session_flow->total_packets = 0;
515 }
516 
518  uint64_t setup_time, SessionStatsInfo *session) const {
519  FlowEntry *rfe = fe->reverse_flow_entry();
520 
521  session->setup_time = setup_time;
522  session->teardown_time = 0;
523  session->exported_atleast_once = false;
524  UpdateSessionFlowStatsInfo(fe, &session->fwd_flow);
525  UpdateSessionFlowStatsInfo(rfe, &session->rev_flow);
526 }
527 
528 static void BuildTraceTagList(const TagList &slist, vector<string> *dlist) {
529  TagList::const_iterator it = slist.begin();
530  while (it != slist.end()) {
531  dlist->push_back(integerToString(*it));
532  ++it;
533  }
534 }
535 
536 static void TraceSession(const string &op, const SessionEndpointKey &ep,
537  const SessionAggKey &agg, const SessionKey &session,
538  bool rev_flow_params) {
539  SessionTraceInfo info;
540  info.vmi = ep.vmi_cfg_name;
541  info.local_vn = ep.local_vn;
542  info.remote_vn = ep.remote_vn;
543  BuildTraceTagList(ep.local_tagset, &info.local_tagset);
544  BuildTraceTagList(ep.remote_tagset, &info.remote_tagset);
545  info.remote_prefix = ep.remote_prefix;
546  info.match_policy = ep.match_policy;
547  info.is_si = ep.is_si;
548  info.is_client = ep.is_client_session;
549  info.local_ip = agg.local_ip.to_string();
550  info.server_port = agg.server_port;
551  info.protocol = agg.proto;
552  info.remote_ip = session.remote_ip.to_string();
553  info.client_port = session.client_port;
554  info.flow_uuid = to_string(session.uuid);
555  SESSION_STATS_TRACE(Trace, op, info, rev_flow_params);
556 }
557 
558 void SessionStatsCollector::AddSession(FlowEntry* fe, uint64_t setup_time) {
559  SessionAggKey session_agg_key;
560  SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
561  SessionPreAggInfo session_agg_info;
562  SessionStatsInfo session;
563  SessionKey session_key;
564  SessionPreAggInfo::SessionMap::iterator session_map_iter;
565  SessionEndpointInfo session_endpoint_info = {};
566  SessionEndpointKey session_endpoint_key;
567  SessionEndpointMap::iterator session_endpoint_map_iter;
568  FlowEntry *fe_fwd = fe;
569  bool success;
570 
571  if (NULL == fe->reverse_flow_entry()) {
572  return;
573  }
574 
575  if (!(fe->is_flags_set(FlowEntry::LocalFlow))) {
577  fe_fwd = fe->reverse_flow_entry();
578  }
579  }
580 
581  success = GetSessionKey(fe_fwd, session_agg_key, session_key,
582  session_endpoint_key);
583  if (!success) {
584  return;
585  }
586 
587  /*
588  * If the flow is part of session endpoint DB then
589  * delete the existing one
590  * Flow add comes for the existing flow for the following cases
591  * - flow uuid changes (add-delete-add : will be compressed)
592  * - any key changes from session endpoint, aggregate or session
593  */
594  FlowSessionMap::iterator flow_session_map_iter;
595  flow_session_map_iter = flow_session_map_.find(fe_fwd);
596  if (flow_session_map_iter != flow_session_map_.end()) {
597 
598  FlowToSessionMap &flow_to_session_map = flow_session_map_iter->second;
599  FlowToSessionMap rhs_flow_to_session_map(session_key,
600  session_agg_key,
601  session_endpoint_key);
602  if (!(flow_to_session_map.IsEqual(rhs_flow_to_session_map))) {
603  DeleteSession(fe_fwd, flow_to_session_map.session_key().uuid,
604  GetCurrentTime(), NULL);
605  }
606  }
607 
608  TraceSession("Add", session_endpoint_key, session_agg_key, session_key,
609  false);
610  UpdateSessionStatsInfo(fe_fwd, setup_time, &session);
611 
612  session_endpoint_map_iter = session_endpoint_map_.find(
613  session_endpoint_key);
614  if (session_endpoint_map_iter == session_endpoint_map_.end()) {
615  session_agg_info.session_map_.insert(make_pair(session_key, session));
616  session_endpoint_info.session_agg_map_.insert(
617  make_pair(session_agg_key, session_agg_info));
618  session_endpoint_map_.insert(make_pair(session_endpoint_key,
619  session_endpoint_info));
620  AddFlowToSessionMap(fe_fwd, session_key, session_agg_key,
621  session_endpoint_key);
622  } else {
623  session_agg_map_iter = session_endpoint_map_iter->
624  second.session_agg_map_.find(
625  session_agg_key);
626  if (session_agg_map_iter ==
627  session_endpoint_map_iter->second.session_agg_map_.end()) {
628  session_agg_info.session_map_.insert(make_pair(session_key, session));
629  session_endpoint_map_iter->second.session_agg_map_.insert(
630  make_pair(session_agg_key, session_agg_info));
631  AddFlowToSessionMap(fe_fwd, session_key, session_agg_key,
632  session_endpoint_key);
633  } else {
634  session_map_iter =
635  session_agg_map_iter->second.session_map_.find(session_key);
636  if (session_map_iter ==
637  session_agg_map_iter->second.session_map_.end()) {
638  session_agg_map_iter->second.session_map_.insert(
639  make_pair(session_key, session));
640  AddFlowToSessionMap(fe_fwd, session_key, session_agg_key,
641  session_endpoint_key);
642  } else {
643  /*
644  * existing flow should match with the incoming add flow
645  */
646  assert(session.fwd_flow.uuid == fe_fwd->uuid());
647  }
648  }
649  }
650 }
651 
653  const boost::uuids::uuid &del_uuid,
654  uint64_t teardown_time,
655  const RevFlowDepParams *params) {
656  SessionAggKey session_agg_key;
657  SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
658  SessionPreAggInfo session_agg_info;
659  SessionKey session_key;
660  SessionPreAggInfo::SessionMap::iterator session_map_iter;
661  SessionEndpointInfo session_endpoint_info;
662  SessionEndpointKey session_endpoint_key;
663  SessionEndpointMap::iterator session_endpoint_map_iter;
664  bool read_flow = true;
665 
666  if (del_uuid != fe->uuid()) {
667  read_flow = false;
668  }
669 
670  /*
671  * If the given flow uuid is different from the existing one
672  * then ignore the read from flow
673  */
674  FlowSessionMap::iterator flow_session_map_iter;
675 
676  flow_session_map_iter = flow_session_map_.find(fe);
677  if (flow_session_map_iter != flow_session_map_.end()) {
678  if (del_uuid != flow_session_map_iter->second.session_key().uuid) {
679  /* We had never seen ADD for del_uuid, ignore delete request. This
680  * can happen when the following events occur
681  * 1. Add with UUID x
682  * 2. Delete with UUID x
683  * 3. Add with UUID y
684  * 4. Delete with UUID y
685  * When events 2 and 3 are suppressed and we receive only 1 and 4
686  * In this case we initiate delete for the entry with UUID x */
687  read_flow = false;
688  /* Reset params because they correspond to entry with UUID y */
689  params = NULL;
690  }
691  session_endpoint_key = flow_session_map_iter->second.session_endpoint_key();
692  session_agg_key = flow_session_map_iter->second.session_agg_key();
693  session_key = flow_session_map_iter->second.session_key();
694  } else {
695  return;
696  }
697  if (params && params->action_info_.action == 0) {
698  params = NULL;
699  }
700 
701  bool params_valid = true;
702  if (params == NULL) {
703  params_valid = false;
704  }
705 
706  TraceSession("Del", session_endpoint_key, session_agg_key, session_key,
707  params_valid);
708  session_endpoint_map_iter = session_endpoint_map_.find(
709  session_endpoint_key);
710  if (session_endpoint_map_iter != session_endpoint_map_.end()) {
711  session_agg_map_iter = session_endpoint_map_iter->
712  second.session_agg_map_.find(
713  session_agg_key);
714  if (session_agg_map_iter !=
715  session_endpoint_map_iter->second.session_agg_map_.end()) {
716  session_map_iter =
717  session_agg_map_iter->second.session_map_.find(session_key);
718  if (session_map_iter !=
719  session_agg_map_iter->second.session_map_.end()) {
720  /*
721  * Process the stats collector
722  */
723  session_map_iter->second.teardown_time = teardown_time;
724  session_map_iter->second.deleted = true;
725  /* Don't read stats for evicted flow, during delete */
726  if (!session_map_iter->second.evicted) {
727  SessionStatsChangedUnlocked(session_map_iter,
728  &session_map_iter->second.del_stats);
729  }
730  if (read_flow) {
731  CopyFlowInfo(session_map_iter->second, params);
732  }
733 
734  assert(session_map_iter->second.fwd_flow.flow.get() == fe);
736  session_map_iter->second.fwd_flow.flow = NULL;
737  session_map_iter->second.rev_flow.flow = NULL;
738  }
739  }
740  }
741 }
742 
744  uint32_t bytes,
745  uint32_t packets,
746  uint32_t oflow_bytes,
747  const boost::uuids::uuid &u) {
748  FlowSessionMap::iterator flow_session_map_iter;
749  SessionInfo session_info;
750  SessionIpPort session_key;
751  SessionAggInfo session_agg_info;
752  SessionIpPortProtocol session_agg_key;
753  SessionEndpointMap::iterator session_ep_map_iter;
754  SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
755  SessionPreAggInfo::SessionMap::iterator session_map_iter;
756 
757  /* TODO: Evicted msg coming for reverse flow. We currently don't have
758  * mapping from reverse_flow to flow_session_map */
759  flow_session_map_iter = flow_session_map_.find(flow.get());
760  if (flow_session_map_iter == flow_session_map_.end()) {
761  return;
762  }
763 
764  if (flow_session_map_iter->second.session_key().uuid != u) {
765  return;
766  }
767 
768  SessionEndpointKey session_db_ep_key = flow_session_map_iter->second.session_endpoint_key();
769  SessionAggKey session_db_agg_key = flow_session_map_iter->second.session_agg_key();
770  SessionKey session_db_key = flow_session_map_iter->second.session_key();
771 
772  session_ep_map_iter = session_endpoint_map_.find(session_db_ep_key);
773  if (session_ep_map_iter != session_endpoint_map_.end()) {
774  session_agg_map_iter = session_ep_map_iter->
775  second.session_agg_map_.find(
776  session_db_agg_key);
777  if (session_agg_map_iter !=
778  session_ep_map_iter->second.session_agg_map_.end()) {
779  session_map_iter =
780  session_agg_map_iter->second.session_map_.find(session_db_key);
781  if (session_map_iter !=
782  session_agg_map_iter->second.session_map_.end()) {
783  /*
784  * update the latest statistics
785  */
786  SessionFlowStatsInfo &session_flow = session_map_iter->second.fwd_flow;
787  uint64_t k_bytes, total_bytes, diff_bytes = 0;
788  uint64_t k_packets, total_packets, diff_packets = 0;
789  k_bytes = FlowStatsCollector::GetFlowStats((oflow_bytes & 0xFFFF),
790  bytes);
791  k_packets = FlowStatsCollector::GetFlowStats((oflow_bytes & 0xFFFF0000),
792  packets);
793  total_bytes = GetUpdatedSessionFlowBytes(session_flow.total_bytes,
794  k_bytes);
795  total_packets = GetUpdatedSessionFlowPackets(session_flow.total_packets,
796  k_packets);
797  diff_bytes = total_bytes - session_flow.total_bytes;
798  diff_packets = total_packets - session_flow.total_packets;
799  session_flow.total_bytes = total_bytes;
800  session_flow.total_packets = total_packets;
801 
802  SessionStatsParams &estats = session_map_iter->second.
803  evict_stats;
804  session_map_iter->second.evicted = true;
805  estats.fwd_flow.valid = true;
806  estats.fwd_flow.diff_bytes = diff_bytes;
807  estats.fwd_flow.diff_packets = diff_packets;
808  }
809  }
810  }
811 }
812 
814  SessionKey session_key,
815  SessionAggKey session_agg_key,
816  SessionEndpointKey session_endpoint_key) {
817  FlowToSessionMap flow_to_session_map(session_key, session_agg_key,
818  session_endpoint_key);
819  std::pair<FlowSessionMap::iterator, bool> ret =
820  flow_session_map_.insert(make_pair(fe, flow_to_session_map));
821  if (ret.second == false) {
822  FlowToSessionMap &prev = ret.first->second;
823  assert(prev.session_key().uuid == fe->uuid());
824  }
825 }
826 
828  FlowSessionMap::iterator flow_session_map_iter;
829  flow_session_map_iter = flow_session_map_.find(fe);
830  if (flow_session_map_iter != flow_session_map_.end()) {
831  flow_session_map_.erase(flow_session_map_iter);
832  }
833 }
834 
836  const {
837  /* If rate is not configured, it will have -1 as value.
838  * In this case, pick the rate from SLO */
839  if (rate == -1) {
840  rate = slo->rate();
841  }
842  return rate;
843 }
844 
846  SessionSloState *state) {
847  vector<autogen::SecurityLoggingObjectRuleEntryType>::const_iterator it;
848  it = slo->rules().begin();
849  while (it != slo->rules().end()) {
850  state->UpdateSessionSloStateRuleEntry(it->rule_uuid, it->rate);
851  it++;
852  }
853 
854  SloRuleList::const_iterator acl_it = slo->firewall_policy_list().begin();
855  while (acl_it != slo->firewall_policy_list().end()) {
856  AclKey key(acl_it->uuid_);
857  AclDBEntry *acl = static_cast<AclDBEntry *>(agent_uve_->agent()->
858  acl_table()->FindActiveEntry(&key));
859  if (acl) {
860  int index = 0;
861  int rate = ComputeSloRate(acl_it->rate_, slo);
862  const AclEntry *ae = acl->GetAclEntryAtIndex(index);
863  while (ae != NULL) {
864  state->UpdateSessionSloStateRuleEntry(ae->uuid(), rate);
865  index++;
866  ae = acl->GetAclEntryAtIndex(index);
867  }
868  }
869  acl_it++;
870  }
871 
872  SloRuleList::const_iterator fw_rule_it;
873  fw_rule_it = slo->firewall_rule_list().begin();
874  while (fw_rule_it != slo->firewall_rule_list().end()) {
875  const SloRuleInfo &item = *fw_rule_it;
876  int rate = ComputeSloRate(item.rate_, slo);
877  state->UpdateSessionSloStateRuleEntry(to_string(item.uuid_), rate);
878  fw_rule_it++;
879  }
880 
881 }
882 
884  DBEntryBase *e) {
885  SecurityLoggingObject *slo = static_cast<SecurityLoggingObject *>(e);
886  SessionSloState *state =
887  static_cast<SessionSloState *>(slo->GetState(partition->parent(),
889  if (slo->IsDeleted()) {
890  if (!state)
891  return;
892  slo->ClearState(partition->parent(), slo_listener_id_);
893  delete state;
894  return;
895  }
896 
897  if (!state) {
898  state = new SessionSloState();
899  slo->SetState(slo->get_table(), slo_listener_id_, state);
900  }
901  UpdateSloStateRules(slo, state);
902 }
903 
905  int rate,
907  SessionSloRuleMap *slo_rule_map) {
908  SessionSloRuleEntry slo_rule_entry(rate, slo->uuid());
909  std::pair<SessionSloRuleMap::iterator, bool> ret;
910  ret = slo_rule_map->insert(make_pair(uuid,
911  slo_rule_entry));
912 }
913 
915  const std::vector<autogen::SecurityLoggingObjectRuleEntryType> &list,
917  SessionSloRuleMap *slo_rule_map) {
918  vector<autogen::SecurityLoggingObjectRuleEntryType>::const_iterator it;
919  it = list.begin();
920  while (it != list.end()) {
921  AddSessionSloRuleEntry(it->rule_uuid, it->rate, slo, slo_rule_map);
922  it++;
923  }
924 }
925 
927  SessionSloRuleMap *r_map) {
928  const SloRuleList &list = slo->firewall_policy_list();
929  SloRuleList::const_iterator acl_it = list.begin();
930  while (acl_it != list.end()) {
931  AclKey key(acl_it->uuid_);
932  AclDBEntry *acl = static_cast<AclDBEntry *>(agent_uve_->agent()->
933  acl_table()->FindActiveEntry(&key));
934  if (acl) {
935  int index = 0;
936  const AclEntry *ae = acl->GetAclEntryAtIndex(index);
937  int rate = ComputeSloRate(acl_it->rate_, slo);
938  while (ae != NULL) {
939  AddSessionSloRuleEntry(ae->uuid(), rate, slo, r_map);
940  index++;
941  ae = acl->GetAclEntryAtIndex(index);
942  }
943  }
944  acl_it++;
945  }
946 }
947 
949  SessionSloRuleMap *rule_map) {
950  const SloRuleList &list = slo->firewall_rule_list();
951  SloRuleList::const_iterator it = list.begin();
952  while (it != list.end()) {
953  int rate = ComputeSloRate(it->rate_, slo);
954  AddSessionSloRuleEntry(to_string(it->uuid_), rate, slo, rule_map);
955  it++;
956  }
957 }
958 
960  SessionSloRuleMap *slo_rule_map) {
961  AddSloRules(slo->rules(), slo, slo_rule_map);
962  AddSloFirewallPolicies(slo, slo_rule_map);
963  AddSloFirewallRules(slo, slo_rule_map);
964 }
965 
967  SessionSloRuleMap *slo_rule_map) {
968  SecurityLoggingObjectKey slo_key(uuid);
969  SecurityLoggingObject *slo = static_cast<SecurityLoggingObject *>
970  (agent_uve_->agent()->slo_table()->FindActiveEntry(&slo_key));
971  if (slo) {
972  if (slo->status()) {
973  AddSloEntryRules(slo, slo_rule_map);
974  }
975  }
976 }
977 
979  SessionSloRuleMap *slo_rule_map) {
980  UuidList::const_iterator sit = slo_list.begin();
981  while (sit != slo_list.end()) {
982  AddSloEntry(*sit, slo_rule_map);
983  sit++;
984  }
985 }
986 
988  SessionSloRuleMap *vmi_session_slo_rule_map,
989  SessionSloRuleMap *vn_session_slo_rule_map) {
990  if (fe == NULL) {
991  return;
992  }
993  const Interface *itf = fe->intf_entry();
994  if (!itf) {
995  return;
996  }
997  if (itf->type() != Interface::VM_INTERFACE) {
998  return;
999  }
1000  const VmInterface *vmi = static_cast<const VmInterface *>(itf);
1001  AddSloList(vmi->slo_list(), vmi_session_slo_rule_map);
1002  if (vmi->vn()) {
1003  AddSloList(vmi->vn()->slo_list(), vn_session_slo_rule_map);
1004  }
1005  return;
1006 }
1007 
1009  const SessionStatsInfo &stats_info,
1010  const FlowEntry *fe,
1011  SessionSloRuleMap *global_session_slo_rule_map,
1012  SessionSloRuleMap *vmi_session_slo_rule_map,
1013  SessionSloRuleMap *vn_session_slo_rule_map) {
1014 
1015  vmi_session_slo_rule_map->clear();
1016  vn_session_slo_rule_map->clear();
1017  global_session_slo_rule_map->clear();
1018 
1019  if (agent_uve_->agent()->oper_db()->global_vrouter()->slo_uuid() !=
1020  boost::uuids::nil_uuid()) {
1021  AddSloEntry(
1023  global_session_slo_rule_map);
1024  }
1025 
1026  if (stats_info.deleted) {
1027  AddSloList(stats_info.export_info.vmi_slo_list, vmi_session_slo_rule_map);
1028  AddSloList(stats_info.export_info.vn_slo_list, vn_session_slo_rule_map);
1029  } else {
1030  MakeSloList(fe, vmi_session_slo_rule_map, vn_session_slo_rule_map);
1031  }
1032 }
1033 
1035  const boost::uuids::uuid &slo_uuid,
1036  const std::string &match_uuid,
1037  bool *match) {
1038  SecurityLoggingObjectKey slo_key(slo_uuid);
1039  SecurityLoggingObject *slo = static_cast<SecurityLoggingObject *>
1040  (agent_uve_->agent()->slo_table()->FindActiveEntry(&slo_key));
1041  if (slo) {
1042  SessionSloState *state =
1043  static_cast<SessionSloState *>(slo->GetState(agent_uve_->agent()->slo_table(),
1044  slo_listener_id_));
1045  if (state) {
1046  return state->UpdateSessionSloStateRuleRefCount(match_uuid, match);
1047  }
1048  }
1049  *match = false;
1050  return false;
1051 }
1052 
1054  const std::string &policy_uuid,
1055  const bool &deleted_flag,
1056  bool *match,
1057  const bool &exported_once) {
1058  SessionSloRuleMap::const_iterator it;
1059  if (!policy_uuid.empty()) {
1060  it = map.find(policy_uuid);
1061  if (it != map.end()) {
1062  /* A torn-down session that was exported at least once earlier is
1063  * always logged; other torn-down sessions are reported subject to
1064  * SLO rate checking
1065  */
1066  if (deleted_flag && exported_once) {
1067  *match = true;
1068  return true;
1069  }
1070  return UpdateSloMatchRuleEntry(it->second.slo_uuid, policy_uuid, match);
1071  }
1072  }
1073  *match = false;
1074  return false;
1075 }
1076 
// FindSloMatchRule (name taken from the calls in MatchSloForFlow).
// Runs CheckPolicyMatch() for the firewall, network and security-group policy
// uuids against the same rule map. All three checks are always executed (the
// results are combined afterwards, not short-circuited).
//   *match  <- true if any of the three checks reported a match
//   return  <- true if any of the three checks returned true
1078  const std::string &fw_policy_uuid,
1079  const std::string &nw_policy_uuid,
1080  const std::string &sg_policy_uuid,
1081  const bool &deleted_flag,
1082  bool *match,
1083  const bool &exported_once) {
// NOTE(review): 'it' is declared but not used in this function.
1084  SessionSloRuleMap::const_iterator it;
1085  bool fw_logged = false, nw_logged = false, sg_logged = false;
1086  bool fw_match = false, nw_match = false, sg_match = false;
1087 
1088  fw_logged = CheckPolicyMatch(map, fw_policy_uuid, deleted_flag,
1089  &fw_match, exported_once);
1090  nw_logged = CheckPolicyMatch(map, nw_policy_uuid, deleted_flag,
1091  &nw_match, exported_once);
1092  sg_logged = CheckPolicyMatch(map, sg_policy_uuid, deleted_flag,
1093  &sg_match, exported_once);
1094 
1095  if (fw_match || nw_match || sg_match) {
1096  *match = true;
1097  } else {
1098  *match = false;
1099  }
1100 
1101  if (fw_logged || nw_logged || sg_logged) {
1102  return true;
1103  }
1104  return false;
1105 }
1106 
// MatchSloForFlow (name taken from the calls in FlowLogging and
// DeletedFlowLogging).
// Builds the global, VMI and VN SLO rule maps for the session via
// BuildSloList() and evaluates the three policy uuids against each map with
// FindSloMatchRule().
//   *logged <- set to true if any map evaluation returned true. It is never
//              set to false here, so the caller must initialise it.
//   return  <- true if any map evaluation reported a match.
1108  const SessionStatsInfo &stats_info,
1109  const FlowEntry *fe,
1110  const std::string &fw_policy_uuid,
1111  const std::string &nw_policy_uuid,
1112  const std::string &sg_policy_uuid,
1113  const bool &deleted_flag,
1114  bool *logged,
1115  const bool &exported_once) {
1116 
// These locals are not initialised here; each one is assigned by the
// corresponding FindSloMatchRule() call below before it is read.
1117  bool is_vmi_slo_logged, is_vn_slo_logged, is_global_slo_logged;
1118  bool vmi_slo_match, vn_slo_match, global_slo_match;
1119  SessionSloRuleMap vmi_session_slo_rule_map;
1120  SessionSloRuleMap vn_session_slo_rule_map;
1121  SessionSloRuleMap global_session_slo_rule_map;
1122 
1123  /*
1124  * Get the list of slos need to be matched for the given flow
1125  */
1126  BuildSloList(stats_info, fe,
1127  &global_session_slo_rule_map,
1128  &vmi_session_slo_rule_map,
1129  &vn_session_slo_rule_map);
1130  /*
1131  * Match each type of policy for the given flow against the slo list
1132  */
1133 
1134  is_vmi_slo_logged = FindSloMatchRule(vmi_session_slo_rule_map,
1135  fw_policy_uuid,
1136  nw_policy_uuid,
1137  sg_policy_uuid,
1138  deleted_flag,
1139  &vmi_slo_match,
1140  exported_once);
1141 
1142  is_vn_slo_logged = FindSloMatchRule(vn_session_slo_rule_map,
1143  fw_policy_uuid,
1144  nw_policy_uuid,
1145  sg_policy_uuid,
1146  deleted_flag,
1147  &vn_slo_match,
1148  exported_once);
1149 
1150  is_global_slo_logged = FindSloMatchRule(global_session_slo_rule_map,
1151  fw_policy_uuid,
1152  nw_policy_uuid,
1153  sg_policy_uuid,
1154  deleted_flag,
1155  &global_slo_match,
1156  exported_once);
1157  if ((is_vmi_slo_logged) ||
1158  (is_vn_slo_logged) ||
1159  (is_global_slo_logged)) {
1160  *logged = true;
1161  }
1162  if (vmi_slo_match || vn_slo_match || global_slo_match) {
1163  return true;
1164  } else {
1165  return false;
1166  }
1167 }
1168 
// GetPolicyIdFromDeletedFlow (name taken from the call in DeletedFlowLogging).
// Copies the rule uuids saved in the export info of a deleted flow into the
// three output strings:
//   fw_policy_uuid <- flow_info.aps_rule_uuid
//   sg_policy_uuid <- flow_info.sg_rule_uuid
//   nw_policy_uuid <- flow_info.nw_ace_uuid
1170  const SessionFlowExportInfo &flow_info,
1171  std::string &fw_policy_uuid,
1172  std::string &nw_policy_uuid,
1173  std::string &sg_policy_uuid) {
1174  fw_policy_uuid = flow_info.aps_rule_uuid;
1175  sg_policy_uuid = flow_info.sg_rule_uuid;
1176  nw_policy_uuid = flow_info.nw_ace_uuid;
1177  return;
1178 }
1179 
// Live-flow counterpart of GetPolicyIdFromDeletedFlow. The line carrying this
// function's name is not present in this listing; presumably it is the
// function called from FlowLogging -- TODO confirm.
// Reads the rule uuids directly from the FlowEntry accessors:
//   fw_policy_uuid <- fe->fw_policy_uuid()
//   sg_policy_uuid <- fe->sg_rule_uuid()
//   nw_policy_uuid <- fe->nw_ace_uuid()
// fe is dereferenced without a NULL check.
1181  const FlowEntry *fe,
1182  std::string &fw_policy_uuid,
1183  std::string &nw_policy_uuid,
1184  std::string &sg_policy_uuid) {
1185  fw_policy_uuid = fe->fw_policy_uuid();
1186  sg_policy_uuid = fe->sg_rule_uuid();
1187  nw_policy_uuid = fe->nw_ace_uuid();
1188  return;
1189 }
1190 
// FlowLogging (name taken from the calls in HandleFlowLogging).
// SLO evaluation for a flow that is still present: collects the three policy
// uuids for the flow and forwards them to MatchSloForFlow() with
// deleted_flag = false. Returns MatchSloForFlow()'s result; *logged is
// updated by MatchSloForFlow().
1192  const SessionStatsInfo &stats_info,
1193  const FlowEntry *fe,
1194  bool *logged,
1195  const bool &exported_once) {
1196 
1197  bool matched = false, deleted_flag=false;
1198  std::string fw_policy_uuid = "", nw_policy_uuid = "", sg_policy_uuid = "";
1199 
// The first line of the call that fills the three uuid strings is not present
// in this listing; only its trailing arguments follow.
1201  fw_policy_uuid,
1202  nw_policy_uuid,
1203  sg_policy_uuid);
1204 
1205  matched = MatchSloForFlow(stats_info,
1206  fe,
1207  fw_policy_uuid,
1208  nw_policy_uuid,
1209  sg_policy_uuid,
1210  deleted_flag,
1211  logged,
1212  exported_once);
1213 
1214  return matched;
1215 }
1216 
// DeletedFlowLogging (name taken from the calls in HandleDeletedFlowLogging).
// SLO evaluation for a flow that has already been deleted: the policy uuids
// come from the saved SessionFlowExportInfo, and MatchSloForFlow() is called
// with a NULL FlowEntry and deleted_flag = true. Returns MatchSloForFlow()'s
// result; *logged is updated by MatchSloForFlow().
1218  const SessionStatsInfo &stats_info,
1219  const SessionFlowExportInfo &flow_info,
1220  bool *logged,
1221  const bool &exported_once) {
1222 
1223  bool matched = false, deleted_flag = true;
1224  std::string fw_policy_uuid = "", nw_policy_uuid = "", sg_policy_uuid = "";
1225 
1226  GetPolicyIdFromDeletedFlow(flow_info,
1227  fw_policy_uuid,
1228  nw_policy_uuid,
1229  sg_policy_uuid);
1230 
1231  matched = MatchSloForFlow(stats_info,
1232  NULL,
1233  fw_policy_uuid,
1234  nw_policy_uuid,
1235  sg_policy_uuid,
1236  deleted_flag,
1237  logged,
1238  exported_once);
1239 
1240  return matched;
1241 }
1242 
// HandleDeletedFlowLogging (name taken from the call in CheckSessionLogging).
// Evaluates SLO logging for a deleted session using the saved export info:
// the forward flow is tried first, and the reverse flow only if the forward
// flow did not match.
// NOTE(review): CheckFlowLogging is not defined in the visible source. If it
// were an ordinary function its result would be discarded and this function
// would always return false; presumably it is a macro that returns from the
// enclosing function based on 'logged' -- TODO confirm.
1244  const SessionStatsInfo &stats_info) {
1245 
1246  bool logged = false;
1247  const SessionExportInfo &info = stats_info.export_info;
1248 
1249  /*
1250  * A deleted flow only needs to be checked for whether SLO rules match.
1251  * If an SLO is matched, it should be logged irrespective of the rate.
1252  */
1253  if (DeletedFlowLogging(stats_info,
1254  info.fwd_flow,
1255  &logged,
1256  stats_info.exported_atleast_once)) {
1257  CheckFlowLogging(logged);
1258  } else if (DeletedFlowLogging(stats_info,
1259  info.rev_flow,
1260  &logged,
1261  stats_info.exported_atleast_once)) {
1262  CheckFlowLogging(logged);
1263  }
1264  return false;
1265 }
1266 
// HandleFlowLogging (name taken from the call in CheckSessionLogging).
// Evaluates SLO logging for a live session: the forward FlowEntry is tried
// first, and the reverse FlowEntry only if the forward flow did not match.
// NOTE(review): CheckFlowLogging is not defined in the visible source. If it
// were an ordinary function its result would be discarded and this function
// would always return false; presumably it is a macro that returns from the
// enclosing function based on 'logged' -- TODO confirm.
// NOTE(review): the raw flow pointers are passed on without a NULL check
// here; whether the callee tolerates a NULL reverse flow is not visible.
1268  const SessionStatsInfo &stats_info) {
1269  bool logged = false;
1270 
1271  /*
1272  * FWD and REV flow of the Session need to be checked for SLO
1273  * separately. If FWD flow matches or logged then rev flow
1274  * is not required to check for SLO match.
1275  * REV flow will be checked for SLO only when FWD flow
1276  * is not matched for the SLO, since SLO is per session
1277  */
1278 
1279  if (FlowLogging(stats_info,
1280  stats_info.fwd_flow.flow.get(),
1281  &logged,
1282  stats_info.exported_atleast_once)) {
1283  CheckFlowLogging(logged);
1284  } else if (FlowLogging(stats_info,
1285  stats_info.rev_flow.flow.get(),
1286  &logged,
1287  stats_info.exported_atleast_once)) {
1288  CheckFlowLogging(logged);
1289  }
1290  return false;
1291 }
1292 
// CheckSessionLogging (name taken from the call in ProcessSessionEndpoint).
// Decides whether a session should be exported because of SLO logging.
// Returns false immediately when the agent's global_slo_status() is false.
// Otherwise dispatches to HandleDeletedFlowLogging() for deleted sessions and
// HandleFlowLogging() for live ones, returning true if the handler does.
// One statement before each 'return false' is not present in this listing.
1294  const SessionStatsInfo &stats_info) {
1295 
1296  if (!agent_uve_->agent()->global_slo_status()) {
1297  /* SLO is not enabled */
1299  return false;
1300  }
1301 
1302  /*
1303  * Deleted flow will be logged if SLO is configured.
1304  * Normal case will be logged only when there is a change in the
1305  * stats. If there is no change in the session stats, it will
1306  * not be considered to SLO match and rate. This will avoid logging
1307  * of each session at least once. Also, idle session will not be
1308  * considered for the rate count
1309  */
1310 
1311  if (stats_info.deleted) {
1312  if(HandleDeletedFlowLogging(stats_info)) {
1313  return true;
1314  }
1315  } else if(HandleFlowLogging(stats_info)) {
1316  return true;
1317  }
1318 
1320  return false;
1321 }
1322 
// GetUpdatedSessionFlowBytes (name taken from the call in FetchFlowStats).
// info_bytes is treated as two fields: the low 48 bits hold the previously
// stored byte count and the high 16 bits hold an overflow count. If the
// previously stored low 48 bits are greater than the new value k_flow_bytes,
// the overflow field is incremented by one. The result is the overflow field
// OR-ed with k_flow_bytes.
// NOTE(review): assumes k_flow_bytes fits in 48 bits; any higher bits would
// be OR-ed into the overflow field -- confirm against the caller.
1324  uint64_t k_flow_bytes) const {
1325  uint64_t oflow_bytes = 0xffff000000000000ULL & info_bytes;
1326  uint64_t old_bytes = 0x0000ffffffffffffULL & info_bytes;
1327  if (old_bytes > k_flow_bytes) {
1328  oflow_bytes += 0x0001000000000000ULL;
1329  }
1330  return (oflow_bytes |= k_flow_bytes);
1331 }
1332 
// GetUpdatedSessionFlowPackets (name taken from the call in FetchFlowStats).
// Packet-count counterpart of GetUpdatedSessionFlowBytes with a different
// split: the low 40 bits of info_packets hold the previously stored packet
// count and the high 24 bits hold an overflow count. If the previously stored
// low 40 bits are greater than k_flow_pkts, the overflow field is incremented
// by one. The result is the overflow field OR-ed with k_flow_pkts.
// NOTE(review): assumes k_flow_pkts fits in 40 bits -- confirm against the
// caller.
1334  uint64_t info_packets,
1335  uint64_t k_flow_pkts) const {
1336  uint64_t oflow_pkts = 0xffffff0000000000ULL & info_packets;
1337  uint64_t old_pkts = 0x000000ffffffffffULL & info_packets;
1338  if (old_pkts > k_flow_pkts) {
1339  oflow_pkts += 0x0000010000000000ULL;
1340  }
1341  return (oflow_pkts |= k_flow_pkts);
1342 }
1343 
// CopyFlowInfoInternal (name taken from the calls in the export-info snapshot
// function below).
// Copies per-flow export fields from a FlowEntry into 'info', but only when
// the FlowEntry's uuid equals u; otherwise returns without touching 'info'.
// Visible copies: sg_rule_uuid, nw_ace_uuid and fw_policy_uuid (stored as
// aps_rule_uuid). Several lines of this function (the start of the statement
// that fills info->action, and the lines before the final closing braces) are
// not present in this listing.
1345  const boost::uuids::uuid &u,
1346  FlowEntry *fe) const {
1347  if (fe->uuid() != u) {
1348  return;
1349  }
1351  info->action);
1352  info->sg_rule_uuid = fe->sg_rule_uuid();
1353  info->nw_ace_uuid = fe->nw_ace_uuid();
1354  info->aps_rule_uuid = fe->fw_policy_uuid();
1357  }
1358 }
1359 
// Snapshot of a session's flow-derived data into session.export_info. The
// line carrying this function's name is not present in this listing
// (presumably CopyFlowInfo -- TODO confirm).
// Populates, in order:
//   - info.valid = true
//   - vmi_slo_list / vn_slo_list from the forward flow's VM interface and its
//     VN, when the interface is a VM_INTERFACE
//   - vm_cfg_name from the forward flow if it is an ingress flow, otherwise
//     from the reverse flow when one is attached
//   - other_vrouter and underlay_proto
//   - forward flow info via CopyFlowInfoInternal()
//   - reverse flow info from 'params' when given, otherwise from the reverse
//     FlowEntry via CopyFlowInfoInternal()
// The forward FlowEntry (fe) is dereferenced without a NULL check.
1361  const RevFlowDepParams *params) {
1362  SessionExportInfo &info = session.export_info;
1363  FlowEntry *fe = session.fwd_flow.flow.get();
1364  FlowEntry *rfe = session.rev_flow.flow.get();
1365  info.valid = true;
1366 
1367  const Interface *itf = fe->intf_entry();
1368  if ((itf != NULL) && (itf->type() == Interface::VM_INTERFACE)) {
1369  const VmInterface *vmi = static_cast<const VmInterface *>(itf);
// itf is already known to be non-NULL here, so this check on the static_cast
// result cannot fail.
1370  if (vmi != NULL) {
1371  info.vmi_slo_list = vmi->slo_list();
1372  if (vmi->vn()) {
1373  info.vn_slo_list = vmi->vn()->slo_list();
1374  }
1375  }
1376  }
1377 
1378  if (fe->IsIngressFlow()) {
1379  info.vm_cfg_name = fe->data().vm_cfg_name;
1380  } else if (rfe) {
1381  /* TODO: vm_cfg_name should be passed in RevFlowDepParams because rfe
1382  * may now point to different UUID altogether */
1383  info.vm_cfg_name = rfe->data().vm_cfg_name;
1384  }
1385  string rid = agent_uve_->agent()->router_id().to_string();
// The condition selecting between the local router id and the flow's peer
// vrouter is not present in this listing.
1387  info.other_vrouter = rid;
1388  } else {
1389  info.other_vrouter = fe->peer_vrouter();
1390  }
1391  info.underlay_proto = fe->tunnel_type().GetType();
1392  CopyFlowInfoInternal(&info.fwd_flow, session.fwd_flow.uuid, fe);
1393  if (params) {
1395  info.rev_flow.action);
1396  info.rev_flow.sg_rule_uuid = params->sg_uuid_;
1397  info.rev_flow.nw_ace_uuid = params->nw_ace_uuid_;
1398  if (FlowEntry::ShouldDrop(params->action_info_.action)) {
1400  drop_reason_);
1401  }
1402  } else if (rfe) {
1403  CopyFlowInfoInternal(&info.rev_flow, session.rev_flow.uuid, rfe);
1404  }
1405 }
1406 
// FillSessionFlowStats (name taken from the calls in FillSessionInfoUnlocked).
// Copies one direction's stats into the outgoing SessionFlowInfo.
// Does nothing when stats.valid is false, so flow_info keeps whatever values
// it already had. Otherwise sets tcp_flags and underlay_source_port, and
// reports diff_packets/diff_bytes as sampled and/or logged counters depending
// on is_sampling / is_logging.
1408 (const SessionFlowStatsParams &stats, SessionFlowInfo *flow_info,
1409  bool is_sampling, bool is_logging) const {
1410  if (!stats.valid) {
1411  return;
1412  }
1413  flow_info->set_tcp_flags(stats.tcp_flags);
1414  flow_info->set_underlay_source_port(stats.underlay_src_port);
1415  if (is_sampling) {
1416  flow_info->set_sampled_pkts(stats.diff_packets);
1417  flow_info->set_sampled_bytes(stats.diff_bytes);
1418  }
1419  if (is_logging) {
1420  flow_info->set_logged_pkts(stats.diff_packets);
1421  flow_info->set_logged_bytes(stats.diff_bytes);
1422  }
1423 }
1424 
// FillSessionFlowInfo (name taken from the calls in FillSessionInfoUnlocked).
// Fills the identity/policy part of one direction's SessionFlowInfo:
//   - flow uuid and setup time always
//   - teardown time plus total bytes/packets when sinfo.teardown_time is set
//   - for a deleted session: action, SG rule uuid, NW ACE uuid and drop reason
//     from the saved export info 'einfo' (each only if non-empty); nothing
//     further is set when sinfo.export_info.valid is false
//   - for a live session: the same fields read from the FlowEntry 'fe'
// In the live branch 'fe' is dereferenced without a NULL check.
1426 (const SessionFlowStatsInfo &session_flow, const SessionStatsInfo &sinfo,
1427  const SessionFlowExportInfo &einfo, SessionFlowInfo *flow_info) const {
1428  FlowEntry *fe = session_flow.flow.get();
1429  std::string action_str, drop_reason = "";
1430 
1431  flow_info->set_flow_uuid(session_flow.uuid);
1432  flow_info->set_setup_time(sinfo.setup_time);
1433  if (sinfo.teardown_time) {
1434  flow_info->set_teardown_time(sinfo.teardown_time);
1435  flow_info->set_teardown_bytes(session_flow.total_bytes);
1436  flow_info->set_teardown_pkts(session_flow.total_packets);
1437  }
1438  if (sinfo.deleted) {
1439  if (!sinfo.export_info.valid) {
1440  return;
1441  }
1442  if (!einfo.action.empty()) {
1443  flow_info->set_action(einfo.action);
1444  }
1445  if (!einfo.sg_rule_uuid.empty()) {
1446  flow_info->set_sg_rule_uuid(StringToUuid(einfo.sg_rule_uuid));
1447  }
1448  if (!einfo.nw_ace_uuid.empty()) {
1449  flow_info->set_nw_ace_uuid(StringToUuid(einfo.nw_ace_uuid));
1450  }
1451  if (!einfo.drop_reason.empty()) {
1452  flow_info->set_drop_reason(einfo.drop_reason);
1453  }
1454  } else {
// The first line of the statement that produces action_str, and the condition
// guarding the drop-reason block, are not present in this listing.
1456  action_str);
1457  flow_info->set_action(action_str);
1458  flow_info->set_sg_rule_uuid(StringToUuid(fe->sg_rule_uuid()));
1459  flow_info->set_nw_ace_uuid(StringToUuid(fe->nw_ace_uuid()));
1461  drop_reason = FlowEntry::DropReasonStr(fe->data().drop_reason);
1462  flow_info->set_drop_reason(drop_reason);
1463  }
1464  }
1465 }
1466 
// CheckAndDeleteSessionStatsFlow (name taken from the call in
// ProcessSessionEndpoint).
// If the session's forward FlowEntry is marked deleted, calls DeleteSession()
// for it with the current time and returns true; otherwise returns false.
// One statement between the 'rfe' declaration and the deleted() check is not
// present in this listing (presumably where 'rfe' is used -- TODO confirm).
1468  SessionPreAggInfo::SessionMap::iterator session_map_iter) {
1469  FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1470  FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1472  if (fe->deleted()) {
1473  DeleteSession(fe, session_map_iter->first.uuid,
1474  GetCurrentTime(), NULL);
1475  return true;
1476  }
1477  return false;
1478 }
1479 
// SessionStatsChangedLocked (name taken from the call in
// ProcessSessionEndpoint).
// Wrapper around SessionStatsChangedUnlocked(). One statement after the
// fe/rfe declarations is not present in this listing; going by the
// Locked/Unlocked naming it presumably takes the flow lock(s) -- TODO confirm.
1481  (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1482  SessionStatsParams *params) const {
1483  FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1484  FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1486  return SessionStatsChangedUnlocked(session_map_iter, params);
1487 }
1488 
// SessionStatsChangedUnlocked (name taken from the call in
// SessionStatsChangedLocked).
// Refreshes stats for both directions of the session via FetchFlowStats() and
// returns true if either direction reported a change. Both fetches are always
// performed (results are stored first, then OR-ed), so params->fwd_flow and
// params->rev_flow are each given the chance to be filled.
1490  (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1491  SessionStatsParams *params) const {
1492 
1493  bool fwd_updated = FetchFlowStats(&session_map_iter->second.fwd_flow,
1494  &params->fwd_flow);
1495  bool rev_updated = FetchFlowStats(&session_map_iter->second.rev_flow,
1496  &params->rev_flow);
1497  return (fwd_updated || rev_updated);
1498 }
1499 
// FetchFlowStats (name taken from the calls in SessionStatsChangedUnlocked;
// the signature lines are not present in this listing; the body uses an
// 'info' pointer for the per-flow session state and a 'params' pointer for
// the output).
// Reads the flow's counters through KSyncFlowMemory::GetKFlowStatsAndInfo()
// and, if the byte count differs from the stored one, updates
// info->total_bytes / total_packets and fills params with the deltas,
// tcp_flags and underlay_src_port (params->valid = true). Returns true only
// in that case. Returns false when the lookup fails (after emitting a trace)
// or when the byte count is unchanged.
1502  vr_flow_stats k_stats;
1503  KFlowData kinfo;
1504  uint64_t k_bytes, bytes, k_packets;
1505  const vr_flow_entry *k_flow = NULL;
1506  KSyncFlowMemory *ksync_obj = agent_uve_->agent()->ksync()->
1507  ksync_flow_memory();
1508  /* Update gen-id and flow-handle before reading stats using them. For
1509  * reverse-flow, it is possible that flow-handle is not set yet
1510  */
1511  FlowEntry *fe = info->flow.get();
1512  if (fe && (info->uuid == fe->uuid())) {
1513  info->flow_handle = fe->flow_handle();
1514  info->gen_id = fe->gen_id();
1515  }
1516 
// NOTE(review): 'fe' is NULL-checked above, but info->flow is dereferenced
// unconditionally from here on (including in the error path) -- confirm that
// info->flow can never be NULL when this is called.
1517  k_flow = ksync_obj->GetKFlowStatsAndInfo(info->flow->key(),
1518  info->flow_handle,
1519  info->gen_id, &k_stats, &kinfo);
1520  if (!k_flow) {
1521  SandeshFlowKey skey;
1522  skey.set_nh(info->flow->key().nh);
1523  skey.set_sip(info->flow->key().src_addr.to_string());
1524  skey.set_dip(info->flow->key().dst_addr.to_string());
1525  skey.set_src_port(info->flow->key().src_port);
1526  skey.set_dst_port(info->flow->key().dst_port);
1527  skey.set_protocol(info->flow->key().protocol);
1528  SESSION_STATS_TRACE(Err, "Fetching stats failed", info->flow_handle,
1529  info->gen_id, skey);
1530  return false;
1531  }
1532 
1533  k_bytes = FlowStatsCollector::GetFlowStats(k_stats.flow_bytes_oflow,
1534  k_stats.flow_bytes);
1535  k_packets = FlowStatsCollector::GetFlowStats(k_stats.flow_packets_oflow,
1536  k_stats.flow_packets);
1537 
// Compare only the low 48 bits of the stored total (the part that mirrors the
// last value read) against the freshly read byte count.
1538  bytes = 0x0000ffffffffffffULL & info->total_bytes;
1539 
// NOTE(review): change detection is based on bytes only; the packet count is
// refreshed only when the byte count differs.
1540  if (bytes != k_bytes) {
1541  uint64_t total_bytes = GetUpdatedSessionFlowBytes(info->total_bytes,
1542  k_bytes);
1543  uint64_t total_packets = GetUpdatedSessionFlowPackets
1544  (info->total_packets, k_packets);
1545  params->diff_bytes = total_bytes - info->total_bytes;
1546  params->diff_packets = total_packets - info->total_packets;
1547  info->total_bytes = total_bytes;
1548  info->total_packets = total_packets;
1549  params->tcp_flags = kinfo.tcp_flags;
1550  params->underlay_src_port = kinfo.underlay_src_port;
1551  params->valid = true;
1552  return true;
1553  }
1554  return false;
1555 }
1556 
// FillSessionInfoLocked (name taken from the call in ProcessSessionEndpoint).
// Wrapper around FillSessionInfoUnlocked() that passes NULL for the
// RevFlowDepParams argument and true for read_flow. One statement after the
// fe/rfe declarations is not present in this listing; going by the
// Locked/Unlocked naming it presumably takes the flow lock(s) -- TODO confirm.
1558  (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1559  const SessionStatsParams &stats, SessionInfo *session_info,
1560  SessionIpPort *session_key, bool is_sampling, bool is_logging) const {
1561  FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1562  FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1564  FillSessionInfoUnlocked(session_map_iter, stats, session_info, session_key, NULL,
1565  true, is_sampling, is_logging);
1566 }
1567 
// FillSessionEvictStats (name taken from the call in FillSessionInfoUnlocked).
// Reports the stats stored in the session's evict_stats. Does nothing when
// the forward-flow evict stats are not valid. Forward-flow diff packets/bytes
// are reported as logged and/or sampled counters depending on the flags; the
// reverse-flow counters are explicitly reported as 0 (see the TODOs below).
1569  (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1570  SessionInfo *session_info, bool is_sampling, bool is_logging) const {
1571  const SessionStatsParams &estats = session_map_iter->second.evict_stats;
1572  if (!estats.fwd_flow.valid) {
1573  return;
1574  }
1575  if (is_logging) {
1576  session_info->forward_flow_info.set_logged_pkts(estats.fwd_flow.
1577  diff_packets);
1578  session_info->forward_flow_info.set_logged_bytes(estats.fwd_flow.
1579  diff_bytes);
1580  /* TODO: Evict stats for reverse flow is not supported yet */
1581  session_info->reverse_flow_info.set_logged_pkts(0);
1582  session_info->reverse_flow_info.set_logged_bytes(0);
1583  }
1584  if (is_sampling) {
1585  session_info->forward_flow_info.set_sampled_pkts(estats.fwd_flow.
1586  diff_packets);
1587  session_info->forward_flow_info.set_sampled_bytes(estats.fwd_flow.
1588  diff_bytes);
1589  /* TODO: Evict stats for reverse flow is not supported yet */
1590  session_info->reverse_flow_info.set_sampled_pkts(0);
1591  session_info->reverse_flow_info.set_sampled_bytes(0);
1592  }
1593 }
1594 
// FillSessionInfoUnlocked (name taken from the calls in FillSessionInfoLocked
// and ProcessSessionEndpoint).
// Builds the exported SessionInfo and its SessionIpPort key for one session:
//   1. key <- remote_ip / client_port of the session key
//   2. per-direction identity/policy info via FillSessionFlowInfo()
//   3. marks the session as exported (exported_atleast_once = true); although
//      this method is const, the session entry is modified through the
//      iterator
//   4. stats: evict_stats for evicted sessions; otherwise del_stats for
//      deleted sessions or the caller-supplied 'stats'; export counters are
//      updated on flow_stats_manager_
//   5. VM name, other vrouter IP and underlay protocol: from the saved export
//      info for deleted sessions, from the FlowEntry for live ones
// 'params' and 'read_flow' are not referenced in the lines visible here.
1596  (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1597  const SessionStatsParams &stats,
1598  SessionInfo *session_info,
1599  SessionIpPort *session_key,
1600  const RevFlowDepParams *params,
1601  bool read_flow, bool is_sampling, bool is_logging) const {
1602  string rid = agent_uve_->agent()->router_id().to_string();
1603  FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1604  FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1605  boost::system::error_code ec;
1606  /*
1607  * Fill the session Key
1608  */
1609  session_key->set_ip(session_map_iter->first.remote_ip);
1610  session_key->set_port(session_map_iter->first.client_port);
1611  FillSessionFlowInfo(session_map_iter->second.fwd_flow,
1612  session_map_iter->second,
1613  session_map_iter->second.export_info.fwd_flow,
1614  &session_info->forward_flow_info);
1615  FillSessionFlowInfo(session_map_iter->second.rev_flow,
1616  session_map_iter->second,
1617  session_map_iter->second.export_info.rev_flow,
1618  &session_info->reverse_flow_info);
1619  bool first_time_export = false;
1620  if (!session_map_iter->second.exported_atleast_once) {
1621  first_time_export = true;
1622  /* Mark the flow as exported */
1623  session_map_iter->second.exported_atleast_once = true;
1624  }
1625 
1626  const bool &evicted = session_map_iter->second.evicted;
1627  const bool &deleted = session_map_iter->second.deleted;
1628  if (evicted) {
1629  const SessionStatsParams &estats = session_map_iter->second.evict_stats;
1630  FillSessionEvictStats(session_map_iter, session_info, is_sampling,
1631  is_logging);
1632  flow_stats_manager_->UpdateSessionExportStats(1, first_time_export,
1633  estats.sampled);
1634  } else {
// For deleted (but not evicted) sessions the stats captured at delete time
// replace the caller-supplied ones.
1635  const SessionStatsParams *real_stats = &stats;
1636  if (deleted) {
1637  real_stats = &session_map_iter->second.del_stats;
1638  }
1639  FillSessionFlowStats(real_stats->fwd_flow,
1640  &session_info->forward_flow_info, is_sampling,
1641  is_logging);
1642  FillSessionFlowStats(real_stats->rev_flow,
1643  &session_info->reverse_flow_info, is_sampling,
1644  is_logging);
1645  flow_stats_manager_->UpdateSessionExportStats(1, first_time_export,
1646  real_stats->sampled);
1647  }
1648  if (deleted) {
1649  SessionExportInfo &info = session_map_iter->second.export_info;
1650  if (info.valid) {
1651  if (!info.vm_cfg_name.empty()) {
1652  session_info->set_vm(info.vm_cfg_name);
1653  }
1654  session_info->set_other_vrouter_ip(
1655  AddressFromString(info.other_vrouter, &ec));
1656  session_info->set_underlay_proto(info.underlay_proto);
1657  }
1658  } else {
1659  session_info->set_vm(fe->data().vm_cfg_name);
// The condition selecting between the local router id and the flow's peer
// vrouter is not present in this listing.
1661  session_info->set_other_vrouter_ip(AddressFromString(rid, &ec));
1662  } else {
1663  /* For Egress flows, pick VM name from reverse flow */
1664  if (!fe->IsIngressFlow() && rfe) {
1665  session_info->set_vm(rfe->data().vm_cfg_name);
1666  }
1667  session_info->set_other_vrouter_ip(
1668  AddressFromString(fe->peer_vrouter(), &ec));
1669  }
1670  session_info->set_underlay_proto(fe->tunnel_type().GetType());
1671  }
1672 }
1673 
// Accumulates one session's per-direction counters into the aggregate.
// When is_sampling is set, the session's sampled forward/reverse bytes and
// packets are added to the aggregate's sampled_* totals; when is_logging is
// set, the logged counters are added to the logged_* totals. The two flags
// are independent, so a session can contribute to both sets.
1674 void SessionStatsCollector::UpdateAggregateStats(const SessionInfo &sinfo,
1675  SessionAggInfo *agg_info,
1676  bool is_sampling,
1677  bool is_logging) const {
1678  if (is_sampling) {
1679  agg_info->set_sampled_forward_bytes(agg_info->get_sampled_forward_bytes() +
1680  sinfo.get_forward_flow_info().get_sampled_bytes());
1681  agg_info->set_sampled_forward_pkts(agg_info->get_sampled_forward_pkts() +
1682  sinfo.get_forward_flow_info().get_sampled_pkts());
1683  agg_info->set_sampled_reverse_bytes(agg_info->get_sampled_reverse_bytes() +
1684  sinfo.get_reverse_flow_info().get_sampled_bytes());
1685  agg_info->set_sampled_reverse_pkts(agg_info->get_sampled_reverse_pkts() +
1686  sinfo.get_reverse_flow_info().get_sampled_pkts());
1687  }
1688  if (is_logging) {
1689  agg_info->set_logged_forward_bytes(agg_info->get_logged_forward_bytes() +
1690  sinfo.get_forward_flow_info().get_logged_bytes());
1691  agg_info->set_logged_forward_pkts(agg_info->get_logged_forward_pkts() +
1692  sinfo.get_forward_flow_info().get_logged_pkts());
1693  agg_info->set_logged_reverse_bytes(agg_info->get_logged_reverse_bytes() +
1694  sinfo.get_reverse_flow_info().get_logged_bytes());
1695  agg_info->set_logged_reverse_pkts(agg_info->get_logged_reverse_pkts() +
1696  sinfo.get_reverse_flow_info().get_logged_pkts());
1697  }
1698 }
1699 
// FillSessionAggInfo (name taken from the call in ProcessSessionEndpoint).
// Fills the exported aggregate key from the aggregate map entry's key:
// local IP, server port (exported as service_port) and protocol.
1701 (SessionEndpointInfo::SessionAggMap::iterator it, SessionIpPortProtocol *key)
1702  const {
1703  /*
1704  * Fill the session agg key
1705  */
1706  key->set_local_ip(it->first.local_ip);
1707  key->set_service_port(it->first.server_port);
1708  key->set_protocol(it->first.proto);
1709 }
1710 
// FillSessionTags (name taken from the call in FillSessionEndpoint).
// Resolves the local tag list through AgentUveBase::BuildTagNamesFromList()
// and copies each resulting field (application, tier, site, deployment,
// labels, custom tags) onto the SessionEndpoint, skipping fields that came
// back empty.
1712  SessionEndpoint *ep) const {
1713  UveTagData tinfo(UveTagData::SET);
1714  agent_uve_->BuildTagNamesFromList(list, &tinfo);
1715  if (!tinfo.application.empty()) {
1716  ep->set_application(tinfo.application);
1717  }
1718  if (!tinfo.tier.empty()) {
1719  ep->set_tier(tinfo.tier);
1720  }
1721  if (!tinfo.site.empty()) {
1722  ep->set_site(tinfo.site);
1723  }
1724  if (!tinfo.deployment.empty()) {
1725  ep->set_deployment(tinfo.deployment);
1726  }
1727  if (tinfo.label_set.size() != 0) {
1728  ep->set_labels(tinfo.label_set);
1729  }
1730  if (tinfo.custom_tag_set.size() != 0) {
1731  ep->set_custom_tags(tinfo.custom_tag_set);
1732  }
1733 }
1734 
// FillSessionRemoteTags (name taken from the call in FillSessionEndpoint).
// Remote-side counterpart of FillSessionTags. Note that it resolves the list
// with BuildTagIdsFromList() rather than BuildTagNamesFromList(), and writes
// the remote_* fields of the SessionEndpoint. Empty fields are skipped.
1736  SessionEndpoint *ep) const {
1737  UveTagData tinfo(UveTagData::SET);
1738  agent_uve_->BuildTagIdsFromList(list, &tinfo);
1739  if (!tinfo.application.empty()){
1740  ep->set_remote_application(tinfo.application);
1741  }
1742  if (!tinfo.tier.empty()){
1743  ep->set_remote_tier(tinfo.tier);
1744  }
1745  if (!tinfo.site.empty()){
1746  ep->set_remote_site(tinfo.site);
1747  }
1748  if (!tinfo.deployment.empty()){
1749  ep->set_remote_deployment(tinfo.deployment);
1750  }
1751  if (tinfo.label_set.size() != 0) {
1752  ep->set_remote_labels(tinfo.label_set);
1753  }
1754  if (tinfo.custom_tag_set.size() != 0) {
1755  ep->set_remote_custom_tags(tinfo.custom_tag_set);
1756  }
1757 }
1758 
// Fills the endpoint-level fields of an outgoing SessionEndpoint message from
// the endpoint map key: VMI name, local/remote VN, client-session and
// service-instance flags, remote prefix (only if non-empty), the matched
// security policy rule, local and remote tags (only if the tag sets are
// non-empty), and finally this agent's router id as the vrouter IP.
1759 void SessionStatsCollector::FillSessionEndpoint(SessionEndpointMap::iterator it,
1760  SessionEndpoint *session_ep)
1761  const {
1762  string rid = agent_uve_->agent()->router_id().to_string();
1763  boost::system::error_code ec;
1764 
1765  session_ep->set_vmi(it->first.vmi_cfg_name);
1766  session_ep->set_vn(it->first.local_vn);
1767  session_ep->set_remote_vn(it->first.remote_vn);
1768  session_ep->set_is_client_session(it->first.is_client_session);
1769  session_ep->set_is_si(it->first.is_si);
1770  if (!it->first.remote_prefix.empty()) {
1771  session_ep->set_remote_prefix(it->first.remote_prefix);
1772  }
1773  session_ep->set_security_policy_rule(it->first.match_policy);
1774  if (it->first.local_tagset.size() > 0) {
1775  FillSessionTags(it->first.local_tagset, session_ep);
1776  }
1777  if (it->first.remote_tagset.size() > 0) {
1778  FillSessionRemoteTags(it->first.remote_tagset, session_ep);
1779  }
1780  session_ep->set_vrouter_ip(AddressFromString(rid, &ec));
1781 }
1782 
// ProcessSessionEndpoint (name taken from the call in
// RunSessionEndpointStats).
// Builds at most one SessionEndpoint message for the endpoint 'it', resuming
// from session_agg_iteration_key_ / session_iteration_key_.
//
// For every session visited it (a) detects deletion or stats change for live
// sessions, (b) applies sampling and SLO-logging decisions, (c) adds the
// session to the current aggregate of the message, and (d) erases sessions
// that are marked deleted once they have been handled. Aggregates whose
// session map becomes empty are erased as well.
//
// The walk stops early when either max_sessions_per_aggregate() sessions have
// been added to one aggregate or max_aggregates_per_session_endpoint()
// aggregates have been walked; in that case the iteration keys are stored so
// the next call resumes at the right place.
//
// Returns true when the endpoint has been walked to the end (ep_completed),
// false when more work remains for this endpoint.
1784  (const SessionEndpointMap::iterator &it) {
1785  SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
1786  SessionEndpointInfo::SessionAggMap::iterator prev_agg_iter;
1787  SessionPreAggInfo::SessionMap::iterator session_map_iter, prev;
1788 
// NOTE(review): session_info and session_key are declared once and reused for
// every session in this call. FillSessionFlowInfo/FillSessionFlowStats set
// some fields only conditionally, so fields set for an earlier session may
// carry over to a later one -- confirm whether that is intended.
1789  SessionInfo session_info;
1790  SessionIpPort session_key;
1791  uint32_t session_count = 0, session_agg_count = 0;
1792  bool exit = false, ep_completed = true;
1793 
1794  SessionEndpoint &session_ep = session_msg_list_[GetSessionMsgIdx()];
1795 
1796  session_agg_map_iter = it->second.session_agg_map_.
1797  lower_bound(session_agg_iteration_key_);
1798  while (session_agg_map_iter != it->second.session_agg_map_.end()) {
1799  SessionAggInfo session_agg_info;
1800  SessionIpPortProtocol session_agg_key;
1801  session_count = 0;
1802  session_map_iter = session_agg_map_iter->second.session_map_.
1803  lower_bound(session_iteration_key_);
1804  while (session_map_iter != session_agg_map_iter->second.session_map_.end()) {
// 'prev' remembers the current element so it can be erased after the iterator
// has been advanced past it.
1805  prev = session_map_iter;
1806  SessionStatsParams params;
1807  if (!session_map_iter->second.deleted &&
1808  !session_map_iter->second.evicted) {
1809  bool delete_marked = CheckAndDeleteSessionStatsFlow(session_map_iter);
1810  if (!delete_marked) {
1811  bool changed = SessionStatsChangedLocked(session_map_iter,
1812  &params);
// Unchanged sessions are skipped unless they have never been exported.
1813  if (!changed && session_map_iter->second.exported_atleast_once) {
1814  ++session_map_iter;
1815  continue;
1816  }
1817  }
1818  }
1819 
1820  bool is_sampling = true;
1821  if (IsSamplingEnabled()) {
1822  is_sampling = SampleSession(session_map_iter, &params);
1823  }
1824  bool is_logging = CheckSessionLogging(session_map_iter->second);
1825 
1826  /* Ignore session export if sampling & logging drop the session */
1827  if (!is_sampling && !is_logging) {
1828  ++session_map_iter;
1829  if (prev->second.deleted) {
1830  session_agg_map_iter->second.session_map_.erase(prev);
1831  }
1832  continue;
1833  }
1834  if (session_map_iter->second.deleted) {
1835  FillSessionInfoUnlocked(session_map_iter, params, &session_info,
1836  &session_key, NULL, true, is_sampling,
1837  is_logging);
1838  } else {
1839  FillSessionInfoLocked(session_map_iter, params, &session_info,
1840  &session_key, is_sampling, is_logging);
1841  }
1842  session_agg_info.sessionMap.insert(make_pair(session_key,
1843  session_info));
1844  UpdateAggregateStats(session_info, &session_agg_info, is_sampling,
1845  is_logging);
1846  ++session_map_iter;
1847  ++session_count;
1848  if (prev->second.deleted) {
1849  session_agg_map_iter->second.session_map_.erase(prev);
1850  }
1851  if (session_count ==
1852  agent_uve_->agent()->params()->max_sessions_per_aggregate()) {
1853  exit = true;
1854  break;
1855  }
1856  }
1857  if (session_count) {
1858  FillSessionAggInfo(session_agg_map_iter, &session_agg_key);
1859  session_ep.sess_agg_info.insert(make_pair(session_agg_key,
1860  session_agg_info));
1861  }
// On the per-aggregate session limit, leave both loops with
// session_agg_map_iter still pointing at the current aggregate; the resume
// logic after the loop relies on that.
1862  if (exit) {
1863  break;
1864  }
1865  session_iteration_key_.Reset();
1866  prev_agg_iter = session_agg_map_iter;
1867  session_agg_map_iter++;
1868  if (prev_agg_iter->second.session_map_.size() == 0) {
1869  it->second.session_agg_map_.erase(prev_agg_iter);
1870  }
1871  ++session_agg_count;
1872  if (session_agg_count ==
1873  agent_uve_->agent()->params()->max_aggregates_per_session_endpoint()) {
1874  break;
1875  }
1876  }
1877  /* Don't export SessionEndpoint if there are 0 aggregates */
1878  if (session_ep.sess_agg_info.size()) {
1879  FillSessionEndpoint(it, &session_ep);
1880  EnqueueSessionMsg();
1881  }
1882 
// Record where the next call has to resume. Case 1: the per-aggregate session
// limit was hit. Case 2: the per-endpoint aggregate limit was hit.
1883  if (session_count ==
1884  agent_uve_->agent()->params()->max_sessions_per_aggregate()) {
1885  ep_completed = false;
1886  session_ep_iteration_key_ = it->first;
1887  session_agg_iteration_key_ = session_agg_map_iter->first;
1888  if (session_map_iter == session_agg_map_iter->second.session_map_.end()) {
1889  prev_agg_iter = session_agg_map_iter;
1890  ++session_agg_map_iter;
1891  if (prev_agg_iter->second.session_map_.size() == 0) {
1892  it->second.session_agg_map_.erase(prev_agg_iter);
1893  }
1894  if (session_agg_map_iter == it->second.session_agg_map_.end()) {
1895  /* session_iteration_key_ and session_agg_iteration_key_ are
1896  * both reset when ep_completed is returned as true in the
1897  * calling function */
1898  ep_completed = true;
1899  } else {
1900  session_iteration_key_.Reset();
1901  session_agg_iteration_key_ = session_agg_map_iter->first;
1902  }
1903  } else {
1904  session_iteration_key_ = session_map_iter->first;
1905  }
1906  } else if (session_agg_count ==
1907  agent_uve_->agent()->params()->
1908  max_aggregates_per_session_endpoint()) {
1909  ep_completed = false;
1910  session_ep_iteration_key_ = it->first;
1911  if (session_agg_map_iter == it->second.session_agg_map_.end()) {
1912  /* session_iteration_key_ and session_agg_iteration_key_ are both
1913  * reset when ep_completed is returned as true in the calling
1914  * function */
1915  ep_completed = true;
1916  } else {
1917  session_agg_iteration_key_ = session_agg_map_iter->first;
1918  session_iteration_key_.Reset();
1919  }
1920  }
1921  return ep_completed;
1922 }
1923 
// RunSessionEndpointStats (name taken from the call in the session task's run
// fragment further down; the signature line is not present in this listing;
// the body uses a 'max_count' limit).
// Walks session_endpoint_map_ starting at session_ep_iteration_key_ (wrapping
// to the beginning when the key is past the last entry) and calls
// ProcessSessionEndpoint() up to max_count times. Each call counts as one,
// whether or not it finished its endpoint. When an endpoint is not completed,
// 'it' is not advanced, so the next loop iteration continues with the same
// endpoint. Completed endpoints with no aggregates left are erased.
// Returns the number of ProcessSessionEndpoint() calls made (0 when the map
// is empty). Several statements are not present in this listing.
1925  SessionEndpointMap::iterator it = session_endpoint_map_.
1926  lower_bound(session_ep_iteration_key_);
1927  if (it == session_endpoint_map_.end()) {
1928  it = session_endpoint_map_.begin();
1929  }
1930  if (it == session_endpoint_map_.end()) {
1931  return 0;
1932  }
1933 
1934  uint32_t count = 0;
1935  while (count < max_count) {
1936  if (it == session_endpoint_map_.end()) {
1937  break;
1938  }
1939 
1940  /* ProcessSessionEndpoint will build 1 SessionEndpoint message. This
1941  * may or may not include all aggregates and all sessions within each
1942  * aggregate. It returns true if the built message includes all
1943  * aggregates and all sessions of each aggregate */
1944  bool ep_completed = ProcessSessionEndpoint(it);
1945  ++count;
1946  if (ep_completed) {
// Advance before erasing so 'it' is already past the element being removed.
1947  SessionEndpointMap::iterator prev = it;
1948  ++it;
1952  if (prev->second.session_agg_map_.size() == 0) {
1953  session_endpoint_map_.erase(prev);
1954  }
1955  }
1956  }
1957 
1958  //Send any pending session export messages
1960 
1961  // Update iterator for next pass
1962  if (it == session_endpoint_map_.end()) {
1964  } else {
1965  session_ep_iteration_key_ = it->first;
1966  }
1967 
1968  session_task_ = NULL;
1969  return count;
1970 }
1972 // Introspect routines
1974 //TBD
1975 
1977 // Session Stats task
// Session task member functions. The signature line of each function below is
// not present in this listing; the descriptions cover only the visible bodies.
//
// Constructor: initialises the Task base with the collector's task id and
// instance id, and stores the collector pointer in ssc_.
1980  Task(ssc->task_id(), ssc->instance_id()), ssc_(ssc) {
1981 }
1982 
// Empty body (presumably the destructor -- TODO confirm).
1984 }
1985 
// Returns the fixed description string for this task.
1987  return "Session Stats Collector Task";
1988 }
1989 
// Task body: runs one pass of RunSessionEndpointStats() limited to
// kSessionsPerTask and returns true.
1991  ssc_->RunSessionEndpointStats(kSessionsPerTask);
1992  return true;
1993 }
1994 
// IsSamplingEnabled (name taken from the call in ProcessSessionEndpoint).
// Returns false when the global vrouter's flow_export_rate() equals
// GlobalVrouter::kDisableSampling, true for any other configured rate.
1996  int32_t cfg_rate = agent_uve_->agent()->oper_db()->global_vrouter()->
1997  flow_export_rate();
1998  if (cfg_rate == GlobalVrouter::kDisableSampling) {
1999  return false;
2000  }
2001  return true;
2002 }
2003 
2005  (SessionPreAggInfo::SessionMap::iterator session_map_iter,
2006  SessionStatsParams *params) const {
2007  int32_t cfg_rate = agent_uve_->agent()->oper_db()->global_vrouter()->
2008  flow_export_rate();
2009  /* If session export is disabled, update stats and return */
2010  if (!cfg_rate) {
2012  return false;
2013  }
2014  const bool &deleted = session_map_iter->second.deleted;
2015  const bool &evicted = session_map_iter->second.evicted;
2016  SessionStatsParams *stats = params;
2017  if (evicted) {
2018  stats = &session_map_iter->second.evict_stats;
2019  } else if (deleted) {
2020  stats = &session_map_iter->second.del_stats;
2021  }
2022  stats->sampled = false;
2023  /* For session-sampling diff_bytes should consider the diff bytes for both
2024  * forward and reverse flow */
2025  uint64_t diff_bytes = stats->fwd_flow.diff_bytes +
2026  stats->rev_flow.diff_bytes;
2027  const SessionStatsInfo &info = session_map_iter->second;
2028  /* Subject a flow to sampling algorithm only when all of below is met:-
2029  * a. actual session-export-rate is >= 80% of configured flow-export-rate.
2030  * This is done only for first time.
2031  * b. diff_bytes is lesser than the threshold
2032  * c. Flow-sample does not have teardown time or the sample for the flow is
2033  * not exported earlier.
2034  */
2035  bool subject_flows_to_algorithm = false;
2036  if ((diff_bytes < threshold()) &&
2037  (!info.teardown_time || !info.exported_atleast_once) &&
2039  flow_stats_manager_->session_export_rate() >= ((double)cfg_rate) * 0.8)
2041  subject_flows_to_algorithm = true;
2043  }
2044 
2045  if (subject_flows_to_algorithm) {
2046  double probability = diff_bytes/threshold();
2047  uint32_t num = rand() % threshold();
2048  if (num > diff_bytes) {
2049  /* Do not export the flow, if the random number generated is more
2050  * than the diff_bytes */
2052  /* The second part of the if condition below is not required but
2053  * added for better readability. It is not required because
2054  * exported_atleast_once() will always be false if teardown time is
2055  * set. If both teardown_time and exported_atleast_once are true we
2056  * will never be here */
2057  if (info.teardown_time && !info.exported_atleast_once) {
2058  /* This counter indicates the number of sessions that were
2059  * never exported */
2061  }
2062  return false;
2063  }
2064  stats->sampled = true;
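2065  /* Normalize the diff_bytes and diff_packets reported using the
2066  * probability value. NOTE(review): probability is assigned from diff_bytes/threshold(), an integer division of two unsigned values, and this path requires diff_bytes < threshold(), so it appears to always be 0 and only the zeroing branch below runs -- confirm whether a floating-point division was intended. */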
2067  if (probability == 0) {
2068  stats->fwd_flow.diff_bytes = 0;
2069  stats->fwd_flow.diff_packets = 0;
2070  stats->rev_flow.diff_bytes = 0;
2071  stats->rev_flow.diff_packets = 0;
2072  } else {
2073  stats->fwd_flow.diff_bytes = stats->fwd_flow.diff_bytes/
2074  probability;
2075  stats->fwd_flow.diff_packets = stats->fwd_flow.diff_packets/
2076  probability;
2077  stats->rev_flow.diff_bytes = stats->rev_flow.diff_bytes/
2078  probability;
2079  stats->rev_flow.diff_packets = stats->rev_flow.diff_packets/
2080  probability;
2081  }
2082  }
2083 
2084  return true;
2085 }
2086 
2088  return flow_stats_manager_->threshold();  // signature line is missing from this listing; the symbol index gives `uint64_t threshold() const`. The value is read from flow_stats_manager_ on every call rather than cached here.
2089 }
2090 
2092 // SessionStatsCollectorObject methods
2095  FlowStatsManager *mgr) {  // tail of the constructor signature; the symbol index gives SessionStatsCollectorObject(Agent *agent, FlowStatsManager *mgr)
2096  for (int i = 0; i < kMaxSessionCollectors; i++) {  // populate every slot of collectors[]
2097  uint32_t instance_id = mgr->AllocateIndex();  // each collector gets its own index allocated from mgr
2098  boost::asio::io_context& io_ref =  // the collector constructor takes a non-const io_context& (see symbol index)
2099  const_cast<boost::asio::io_context&>  // strips constness from the dereferenced io_service() result
2100  (*agent->event_manager()->io_service());
2101  collectors[i].reset(  // store the newly created collector in slot i
2102  AgentStaticObjectFactory::CreateRef<SessionStatsCollector>(
2103  io_ref,
2104  agent->uve(), instance_id, mgr, this));  // `this` is passed as the collector's parent object
2105  }
2106 }
2107 
2109  if (idx >= 0 && idx < kMaxSessionCollectors) {  // signature line missing; the symbol index gives GetCollector(uint8_t idx) const, so `idx >= 0` is always true and only the upper bound filters
2110  return collectors[idx].get();  // raw pointer to the collector in that slot
2111  }
2112  return NULL;  // out-of-range index
2113 }
2114 
2116  for (int i = 0; i < kMaxSessionCollectors; i++) {  // signature line missing from this listing; `time` is presumably its parameter
2117  collectors[i]->set_expiry_time(time);  // same expiry time applied to every collector; slots are dereferenced without a null check
2118  }
2119 }
2120 
2122  /* Same expiry time would be configured for all the collectors. Pick value
2123  * from any one of them */
2124  return collectors[0]->expiry_time();  // slot 0 is used as the representative and is dereferenced without a null check
2125 }
2126 
2128  (const FlowEntry *flow) {  // parameter list only; the symbol index gives SessionStatsCollector * FlowToCollector(const FlowEntry *flow)
2129  uint8_t idx = 0;  // slot 0 is the fallback when the flow has no flow table
2130  FlowTable *table = flow->flow_table();  // `flow` itself is dereferenced without a null check
2131  if (table) {
2132  idx = table->table_index() % kMaxSessionCollectors;  // map the flow-table index onto the available collector slots
2133  }
2134  return collectors[idx].get();
2135 }
2136 
2138  for (int i = 0; i < kMaxSessionCollectors; i++) {  // signature line missing from this listing
2139  collectors[i]->Shutdown();  // slot is dereferenced without a null check
2140  collectors[i].reset();  // release the slot after its collector has been shut down
2141  }
2142 }
2143 
2145  size_t size = 0;  // signature line missing from this listing; accumulates the total across collectors
2146  for (int i = 0; i < kMaxSessionCollectors; i++) {
2147  size += collectors[i]->Size();  // slot is dereferenced without a null check
2148  }
2149  return size;
2150 }
2151 
2153  for (int i = 0; i < kMaxSessionCollectors; i++) {  // signature line missing from this listing
2154  if (collectors[i].get()) {  // unlike the neighbouring loops, empty slots are skipped here
2155  collectors[i].get()->RegisterDBClients();  // forward the registration to each existing collector
2156  }
2157  }
2158 }
2159 
2161  FlowEntry *rflow = NULL;  // signature line missing; the symbol index gives FlowEntry * reverse_flow() const. NULL is returned when no flow is held.
2162  if (flow_.get()) {
2163  rflow = flow_->reverse_flow_entry();  // reverse entry of the held flow
2164  }
2165  return rflow;
2166 }
2167 
2169  if (!(session_key_.IsEqual(rhs.session_key()))) {  // signature line missing; the symbol index gives bool IsEqual(FlowToSessionMap &rhs). Equality requires all three keys to match.
2170  return false;
2171  }
2172  if (!(session_agg_key_.IsEqual(rhs.session_agg_key()))) {  // aggregate key
2173  return false;
2174  }
2175  if (!(session_endpoint_key_.IsEqual(rhs.session_endpoint_key()))) {  // endpoint key
2176  return false;
2177  }
2178  return true;
2179 }
2180 
2182  SessionSloRuleStateMap::iterator it;  // signature line missing; the symbol index gives void DeleteSessionSloStateRuleEntry(std::string uuid)
2183  it = session_rule_state_map_.find(uuid);
2184  if (it != session_rule_state_map_.end()) {  // an unknown uuid is silently ignored
2185  session_rule_state_map_.erase(it);
2186  }
2187 }
2188 
2190  int rate) {  // tail of the signature; the symbol index gives void UpdateSessionSloStateRuleEntry(std::string uuid, int rate)
2191  SessionSloRuleStateMap::iterator it;
2192  it = session_rule_state_map_.find(uuid);
2193  if (it == session_rule_state_map_.end()) {  // first time this uuid is seen: create its state
2194  SessionSloRuleState slo_state_rule_entry = {};  // empty-brace initialization; presumably leaves ref_count at 0 -- confirm against the struct definition
2195  slo_state_rule_entry.rate = rate;
2196  session_rule_state_map_.insert(make_pair(uuid, slo_state_rule_entry));
2197  } else {
2198  SessionSloRuleState &prev = it->second;
2199  if (prev.rate != rate) {  // an unchanged rate leaves the running count untouched
2200  prev.rate = rate;
2201  prev.ref_count = 0;  // a rate change restarts the count
2202  }
2203  }
2204 
2205 }
2206 
2208  const std::string &uuid,  // the line naming the function is missing; the symbol index gives bool UpdateSessionSloStateRuleRefCount(const std::string &uuid, bool *matc)
2209  bool *match) {
2210  SessionSloRuleStateMap::iterator it;
2211  bool is_logged = false;  // returned unchanged when uuid has no entry
2212  it = session_rule_state_map_.find(uuid);
2213  if (it != session_rule_state_map_.end()) {
2214  *match = true;  // *match is written only on a hit; it is left untouched when uuid is not found
2215  if (it->second.ref_count == 0) {  // the hit that finds the counter at 0 is the one reported as logged
2216  is_logged = true;
2217  }
2218  it->second.ref_count++;
2219  if (it->second.ref_count == it->second.rate) {  // wrap after `rate` hits so the next hit is reported again. NOTE(review): if rate is 0 this equality is never met after the increment -- confirm rate is always positive
2220  it->second.ref_count = 0;
2221  }
2222  }
2223  return is_logged;
2224 }
bool MeasureQueueDelay()
Definition: agent.cc:1136
bool FlowLogging(const SessionStatsInfo &stats_info, const FlowEntry *fe, bool *logged, const bool &exported_once)
FlowEntry * reverse_flow() const
uint32_t session_export_count() const
uint16_t max_endpoints_per_session_msg() const
Definition: agent_param.h:447
const UuidList & slo_list() const
Definition: vn.h:222
void SetBounded(bool bounded)
Definition: queue_task.h:200
uint64_t GetUpdatedSessionFlowBytes(uint64_t info_bytes, uint64_t k_flow_bytes) const
void UpdateSessionSloStateRuleEntry(std::string uuid, int rate)
SessionEndpointKey session_endpoint_key()
bool GetSessionKey(FlowEntry *fe, SessionAggKey &session_agg_key, SessionKey &session_key, SessionEndpointKey &session_endpoint_key)
bool IsEqual(const SessionEndpointKey &rhs) const
std::string application
IpAddress src_addr
Definition: flow_entry.h:213
const TagList & local_tagset() const
Definition: flow_entry.cc:3502
void SetEntryCallback(TaskEntryCallback on_entry)
Definition: queue_task.h:299
void GetPolicyIdFromDeletedFlow(const SessionFlowExportInfo &flow_info, std::string &fw_policy_uuid, std::string &nw_policy_uuid, std::string &sg_policy_uuid)
tbb::atomic< bool > sessions_sampled_atleast_once_
Type type() const
Definition: interface.h:112
tbb::atomic< uint64_t > session_slo_logging_drops_
const vr_flow_entry * GetKFlowStatsAndInfo(const FlowKey &key, uint32_t idx, uint8_t gen_id, vr_flow_stats *stats, KFlowData *info) const
bool SampleSession(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionStatsParams *params) const
static uint64_t GetFlowStats(const uint16_t &oflow_data, const uint32_t &data)
KSync * ksync() const
Definition: ksync_memory.h:49
std::vector< SessionEndpoint > session_msg_list_
static boost::uuids::uuid StringToUuid(const std::string &str)
Definition: string_util.h:145
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
#define MAX_SSC_REQUEST_QUEUE_ITERATIONS
bool HandleDeletedFlowLogging(const SessionStatsInfo &stats_info)
void Shutdown(bool delete_entries=true)
Definition: queue_task.h:152
#define CheckFlowLogging(logged)
std::string sg_uuid_
Definition: flow_entry.h:62
DBState * GetState(DBTableBase *tbl_base, ListenerId listener) const
Definition: db_entry.cc:37
const std::string fw_policy_name_uuid() const
Definition: flow_entry.cc:3634
std::string dest_vn_match
Definition: flow_entry.h:295
#define SESSION_STATS_TRACE(obj,...)
std::string tier
FlowTable * flow_table() const
Definition: flow_entry.h:597
int ComputeSloRate(int rate, SecurityLoggingObject *slo) const
DBTableBase * get_table() const
Definition: db_entry.cc:119
IpAddress dst_addr
Definition: flow_entry.h:214
SessionFlowStatsParams rev_flow
static void BuildTraceTagList(const TagList &slist, vector< string > *dlist)
std::string source_vn_match
Definition: flow_entry.h:294
AgentUveBase * uve() const
Definition: agent.cc:909
#define DEFAULT_SSC_REQUEST_QUEUE_SIZE
std::vector< SloRuleInfo > SloRuleList
void GetPolicyIdFromFlow(const FlowEntry *fe, std::string &fw_policy_uuid, std::string &nw_policy_uuid, std::string &sg_policy_uuid)
#define kTaskSessionStatsCollectorEvent
Definition: agent.h:330
bool IsEqual(FlowToSessionMap &rhs)
void UpdateSessionStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void BuildSloList(const SessionStatsInfo &stats_info, const FlowEntry *fe, SessionSloRuleMap *global_session_slo_rule_map, SessionSloRuleMap *vmi_session_slo_rule_map, SessionSloRuleMap *vn_session_slo_rule_map)
std::map< std::string, SessionSloRuleEntry > SessionSloRuleMap
DBTable::ListenerId slo_listener_id_
bool SessionStatsChangedLocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionStatsParams *params) const
bool IsDeleted() const
Definition: db_entry.h:49
void SetState(DBTableBase *tbl_base, ListenerId listener, DBState *state)
Definition: db_entry.cc:22
boost::asio::ip::address IpAddress
Definition: address.h:13
Type GetType() const
Definition: nexthop.h:303
Definition: acl.h:62
static const uint32_t kSessionsPerTask
const UuidList & slo_list() const
void AddSloRules(const std::vector< autogen::SecurityLoggingObjectRuleEntryType > &list, SecurityLoggingObject *slo, SessionSloRuleMap *slo_rule_map)
uint32_t session_export_without_sampling_reset()
virtual void DispatchSessionMsg(const std::vector< SessionEndpoint > &lst)
static bool ShouldDrop(uint32_t action)
Definition: flow_entry.cc:1112
boost::asio::io_context * io_service()
Definition: event_manager.h:42
void DeleteSession(FlowEntry *fe, const boost::uuids::uuid &del_uuid, uint64_t teardown_time, const RevFlowDepParams *params)
MatchPolicy match_p
Definition: flow_entry.h:309
SessionStatsCollector(boost::asio::io_context &io, AgentUveBase *uve, uint32_t instance_id, FlowStatsManager *fsm, SessionStatsCollectorObject *obj)
void FillSessionEvictStats(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionInfo *session_info, bool is_sampling, bool is_logging) const
void FillSessionInfoUnlocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, const SessionStatsParams &stats, SessionInfo *session_info, SessionIpPort *session_key, const RevFlowDepParams *params, bool read_flow, bool is_sampling, bool is_logging) const
DBTableBase * parent()
#define FLOW_LOCK(flow, rflow, flow_event)
Definition: flow_table.h:61
AgentDBEntry * FindActiveEntry(const DBEntry *key)
Definition: agent_db.cc:110
std::string site
void UpdateAggregateStats(const SessionInfo &sinfo, SessionAggInfo *agg_info, bool is_sampling, bool is_logging) const
boost::uuids::uuid uuid
void BuildTagIdsFromList(const TagList &tl, UveTagData *info) const
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
uint32_t action
Definition: acl.h:44
uint16_t table_index() const
Definition: flow_table.h:198
tbb::atomic< uint64_t > session_export_disable_drops_
bool CheckPolicyMatch(const SessionSloRuleMap &map, const std::string &policy_uuid, const bool &deleted_flag, bool *match, const bool &exported_once)
SessionEndpointKey session_ep_iteration_key_
void set_sessions_sampled_atleast_once()
tbb::atomic< uint64_t > session_global_slo_logging_drops_
uint8_t protocol
Definition: flow_entry.h:215
tbb::atomic< uint64_t > session_export_drops_
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
uint64_t threshold() const
SessionEndpointMap session_endpoint_map_
Agent * agent() const
#define kTaskSessionStatsCollector
Definition: agent.h:329
uint32_t session_export_count_reset()
void BuildTagNamesFromList(const TagList &tl, UveTagData *info) const
const std::string & sg_rule_uuid() const
Definition: flow_entry.h:633
const std::string & service_intf_type() const
OperDB * oper_db() const
Definition: agent.cc:1013
const std::string & peer_vrouter() const
Definition: flow_entry.h:642
boost::uuids::uuid uuid_
static const int32_t kDisableSampling
std::string nw_ace_uuid_
Definition: flow_entry.h:65
TunnelType tunnel_type() const
Definition: flow_entry.h:643
const FlowKey & key() const
Definition: flow_entry.h:594
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
Definition: db_table.cc:181
void AddSloFirewallRules(SecurityLoggingObject *slo, SessionSloRuleMap *rule_map)
bool IsEqual(const SessionKey &rhs) const
void SloNotify(DBTablePartBase *partition, DBEntryBase *e)
bool UpdateSessionSloStateRuleRefCount(const std::string &uuid, bool *matc)
uint16_t tcp_flags
bool HandleFlowLogging(const SessionStatsInfo &stats_info)
static const uint32_t kDefaultFlowSamplingThreshold
bool MatchSloForFlow(const SessionStatsInfo &stats_info, const FlowEntry *fe, const std::string &fw_policy_uuid, const std::string &nw_policy_uuid, const std::string &sg_policy_uuid, const bool &deleted_flag, bool *logged, const bool &exported_once)
uint32_t prev_cfg_flow_export_rate_
SloRuleList & firewall_policy_list()
GlobalVrouter * global_vrouter() const
Definition: operdb_init.h:54
void AddSloEntry(const boost::uuids::uuid &uuid, SessionSloRuleMap *slo_rule_map)
const std::vector< autogen::SecurityLoggingObjectRuleEntryType > & rules() const
bool is_flags_set(const FlowEntryFlags &flags) const
Definition: flow_entry.h:610
TaskScheduler * task_scheduler() const
Definition: agent.h:1120
void SetExitCallback(TaskExitCallback on_exit)
Definition: queue_task.h:303
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
SessionStatsCollector * FlowToCollector(const FlowEntry *flow)
Definition: agent.h:358
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams &params)
size_t Length() const
Definition: queue_task.h:356
const std::string fw_policy_uuid() const
Definition: flow_entry.cc:3630
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
const boost::uuids::uuid & uuid() const
Definition: flow_entry.h:631
Ip4Address router_id() const
Definition: agent.h:666
bool IsClientFlow()
Definition: flow_entry.cc:1421
bool CheckAndDeleteSessionStatsFlow(SessionPreAggInfo::SessionMap::iterator session_map_iter)
bool IsLess(const SessionAggKey &rhs) const
EventManager * event_manager() const
Definition: agent.h:1103
bool RequestHandler(boost::shared_ptr< SessionStatsReq > req)
void UpdateSessionMsgExportStats(uint32_t count)
void AddSessionSloRuleEntry(const std::string &uuid, int rate, SecurityLoggingObject *slo, SessionSloRuleMap *slo_rule_map)
void CopyFlowInfo(SessionStatsInfo &session, const RevFlowDepParams *params)
Definition: trace.h:220
SloRuleList & firewall_rule_list()
bool FetchFlowStats(SessionFlowStatsInfo *info, SessionFlowStatsParams *params) const
SessionFlowStatsInfo rev_flow
void AddSloList(const UuidList &slo_list, SessionSloRuleMap *slo_rule_map)
void DeleteFlowToSessionMap(FlowEntry *fe)
void FillSessionFlowInfo(const SessionFlowStatsInfo &session_flow, const SessionStatsInfo &sinfo, const SessionFlowExportInfo &einfo, SessionFlowInfo *flow_info) const
static std::string DropReasonStr(uint16_t reason)
Definition: flow_entry.cc:1477
std::string deployment
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
SessionStatsCollector * GetCollector(uint8_t idx) const
void FillSessionTags(const TagList &list, SessionEndpoint *ep) const
const VnEntry * vn() const
std::string origin_vn_src
Definition: flow_entry.h:296
AgentParam * params() const
Definition: agent.h:1218
static void GetFlowSandeshActionParams(const FlowAction &action_info, std::string &action_str)
Definition: flow_table.cc:944
static uint64_t GetCurrentTime()
SandeshTraceBufferPtr SessionStatsTraceBuf
FlowStatsManager * flow_stats_manager_
void AddSloEntryRules(SecurityLoggingObject *slo, SessionSloRuleMap *slo_rule_map)
uint16_t drop_reason
Definition: flow_entry.h:335
boost::uuids::uuid slo_uuid() const
FlowAction action_info
Definition: flow_entry.h:277
const boost::uuids::uuid & uuid() const
void FillSessionFlowStats(const SessionFlowStatsParams &stats, SessionFlowInfo *flow_info, bool is_sampling, bool is_logging) const
void ClearState(DBTableBase *tbl_base, ListenerId listener)
Definition: db_entry.cc:73
std::vector< boost::uuids::uuid > UuidList
Definition: agent.h:203
bool DeletedFlowLogging(const SessionStatsInfo &stats_info, const SessionFlowExportInfo &flow_info, bool *logged, const bool &exported_once)
void AddEvent(const FlowEntryPtr &flow)
void UpdateSessionFlowStatsInfo(FlowEntry *fe, SessionFlowStatsInfo *session_flow) const
std::set< string > custom_tag_set
const Interface * intf_entry() const
Definition: flow_entry.h:652
std::string origin_vn_dst
Definition: flow_entry.h:297
uint16_t src_port
Definition: flow_entry.h:216
bool session_debug_
const TagList & remote_tagset() const
Definition: flow_entry.cc:3509
bool global_slo_status() const
Definition: agent.h:1361
uint64_t prev_flow_export_rate_compute_time_
const AclEntry * GetAclEntryAtIndex(uint32_t) const
Definition: acl.cc:888
boost::uuids::uuid uuid
void AddFlowToSessionMap(FlowEntry *fe, SessionKey session_key, SessionAggKey session_agg_key, SessionEndpointKey session_endpoint_key)
void AddSloFirewallPolicies(SecurityLoggingObject *slo, SessionSloRuleMap *r_map)
std::string vm_cfg_name
Definition: flow_entry.h:357
void FillSessionInfoLocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, const SessionStatsParams &stats, SessionInfo *session_info, SessionIpPort *session_key, bool is_sampling, bool is_logging) const
SessionFlowStatsInfo fwd_flow
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
SessionFlowExportInfo fwd_flow
uint32_t flow_handle() const
Definition: flow_entry.h:600
void MakeSloList(const FlowEntry *fe, SessionSloRuleMap *vmi_session_slo_rule_map, SessionSloRuleMap *vn_session_slo_rule_map)
bool IsLess(const SessionKey &rhs) const
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
bool IsEqual(const SessionAggKey &rhs) const
SecurityLoggingObjectTable * slo_table() const
Definition: agent.h:547
uint16_t dst_port
Definition: flow_entry.h:217
void CopyFlowInfoInternal(SessionFlowExportInfo *info, const boost::uuids::uuid &u, FlowEntry *fe) const
void UpdateSessionStatsInfo(FlowEntry *fe, uint64_t setup_time, SessionStatsInfo *session) const
SessionFlowStatsParams fwd_flow
SessionFlowExportInfo rev_flow
uint32_t RunSessionEndpointStats(uint32_t max_count)
const std::string & cfg_name() const
void UpdateThreshold(uint64_t new_value, bool check_oflow)
FlowEntry * reverse_flow_entry()
Definition: flow_entry.h:602
tbb::atomic< uint64_t > session_export_sampling_drops_
bool UpdateSloMatchRuleEntry(const boost::uuids::uuid &slo_uuid, const std::string &match_uuid, bool *match)
#define LOG(_Level, _Msg)
Definition: logging.h:33
SessionAggKey session_agg_iteration_key_
const std::string RemotePrefix() const
Definition: flow_entry.cc:3535
uint64_t GetUpdatedSessionFlowPackets(uint64_t info_packets, uint64_t k_flow_pkts) const
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
FlowData & data()
Definition: flow_entry.h:595
bool SessionStatsChangedUnlocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionStatsParams *params) const
const std::string & nw_ace_uuid() const
Definition: flow_entry.h:636
bool deleted()
Definition: flow_entry.h:680
FlowAction action_info_
Definition: flow_entry.h:66
void AddSession(FlowEntry *fe, uint64_t setup_time)
tbb::atomic< uint32_t > session_export_count_
const std::string & uuid() const
Definition: acl_entry.h:121
bool IsServerFlow()
Definition: flow_entry.cc:1448
uint8_t gen_id() const
Definition: flow_entry.h:599
static void TraceSession(const string &op, const SessionEndpointKey &ep, const SessionAggKey &agg, const SessionKey &session, bool rev_flow_params)
void FillSessionAggInfo(SessionEndpointInfo::SessionAggMap::iterator it, SessionIpPortProtocol *session_agg_key) const
void UpdateSloStateRules(SecurityLoggingObject *slo, SessionSloState *state)
SessionStatsCollectorObject(Agent *agent, FlowStatsManager *mgr)
uint16_t underlay_src_port
uint32_t session_export_rate() const
void FillSessionEndpoint(SessionEndpointMap::iterator it, SessionEndpoint *session_ep) const
bool ProcessSessionEndpoint(const SessionEndpointMap::iterator &it)
bool Enqueue(QueueEntryT entry)
Definition: queue_task.h:248
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void FillSessionRemoteTags(const TagList &list, SessionEndpoint *ep) const
void UpdateSessionSampleExportStats(uint32_t count)
static uint64_t GetCurrentTime()
void DeleteSessionSloStateRuleEntry(std::string uuid)
void EvictedSessionStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void set_name(const std::string &name)
Definition: queue_task.h:307
bool CheckSessionLogging(const SessionStatsInfo &stats_info)
bool IsLess(const SessionEndpointKey &rhs) const
Definition: acl.h:92
bool IsIngressFlow() const
Definition: flow_entry.h:685
SessionAggKey session_agg_key()
SessionExportInfo export_info
std::set< string > label_set
void FreeIndex(uint32_t idx)
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
Definition: flow_entry.h:125
std::vector< int > TagList
Definition: agent.h:202
#define FLOW_EXPORT_STATS_TRACE(...)
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54
bool FindSloMatchRule(const SessionSloRuleMap &map, const std::string &fw_policy_uuid, const std::string &nw_policy_uuid, const std::string &sg_policy_uuid, const bool &deleted_flag, bool *match, const bool &exported_once)