10 #include <sandesh/common/flow_types.h>
18 #include <vrouter/flow_stats/flow_stats_types.h>
22 #define DEFAULT_SSC_REQUEST_QUEUE_SIZE 4*1024*1024
23 #define MAX_SSC_REQUEST_QUEUE_ITERATIONS 256
26 "SessionStats", 4000));
36 io, kSessionStatsTimerInterval,
"Session stats collector"),
38 task_id_(uve->agent()->task_scheduler()->GetTaskId
40 session_ep_iteration_key_(), session_agg_iteration_key_(),
41 session_iteration_key_(),
42 request_queue_(agent_uve_->agent()->task_scheduler()->
49 session_msg_list_(agent_uve_->agent()->params()->max_endpoints_per_session_msg(),
51 session_msg_index_(0), instance_id_(instance_id),
52 flow_stats_manager_(aging_module), parent_(obj), session_task_(NULL),
53 current_time_(GetCurrentTime()), session_task_starts_(0) {
112 bool export_rate_calculated =
false;
113 uint32_t exp_rate_without_sampling = 0;
122 uint64_t diff_secs = 0;
123 uint64_t diff_micro_secs = curr_time -
125 if (diff_micro_secs) {
126 diff_secs = diff_micro_secs/1000000;
131 exp_rate_without_sampling =
133 prev_flow_export_rate_compute_time_ = curr_time;
134 export_rate_calculated =
true;
146 if (!export_rate_calculated &&
187 boost::shared_ptr<SessionStatsReq>
195 boost::shared_ptr<SessionStatsReq>
204 uint32_t oflow_bytes,
206 boost::shared_ptr<SessionStatsReq>
208 bytes, packets, oflow_bytes, u));
215 SESSION_ENDPOINT_OBJECT_LOG(
"", SandeshLevel::SYS_INFO, lst);
235 vector<SessionEndpoint> new_list(first, last);
242 obj = SessionEndpoint();
259 switch (req->event()) {
272 req->oflow_bytes(), req->uuid());
409 uuid = boost::uuids::nil_uuid();
464 session_endpoint_key.
is_si =
false;
466 session_endpoint_key.
is_si=
true;
474 session_endpoint_key.
local_vn = src_vn;
488 session_endpoint_key.
local_vn = src_vn;
495 session_endpoint_key.
local_vn = dst_vn;
509 session_flow->
flow = fe;
529 TagList::const_iterator it = slist.begin();
530 while (it != slist.end()) {
538 bool rev_flow_params) {
539 SessionTraceInfo info;
547 info.is_si = ep.
is_si;
549 info.local_ip = agg.
local_ip.to_string();
551 info.protocol = agg.
proto;
552 info.remote_ip = session.
remote_ip.to_string();
554 info.flow_uuid = to_string(session.
uuid);
560 SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
564 SessionPreAggInfo::SessionMap::iterator session_map_iter;
567 SessionEndpointMap::iterator session_endpoint_map_iter;
581 success =
GetSessionKey(fe_fwd, session_agg_key, session_key,
582 session_endpoint_key);
594 FlowSessionMap::iterator flow_session_map_iter;
601 session_endpoint_key);
602 if (!(flow_to_session_map.
IsEqual(rhs_flow_to_session_map))) {
608 TraceSession(
"Add", session_endpoint_key, session_agg_key, session_key,
613 session_endpoint_key);
615 session_agg_info.
session_map_.insert(make_pair(session_key, session));
617 make_pair(session_agg_key, session_agg_info));
619 session_endpoint_info));
621 session_endpoint_key);
623 session_agg_map_iter = session_endpoint_map_iter->
624 second.session_agg_map_.find(
626 if (session_agg_map_iter ==
627 session_endpoint_map_iter->second.session_agg_map_.end()) {
628 session_agg_info.
session_map_.insert(make_pair(session_key, session));
629 session_endpoint_map_iter->second.session_agg_map_.insert(
630 make_pair(session_agg_key, session_agg_info));
632 session_endpoint_key);
635 session_agg_map_iter->second.session_map_.find(session_key);
636 if (session_map_iter ==
637 session_agg_map_iter->second.session_map_.end()) {
638 session_agg_map_iter->second.session_map_.insert(
639 make_pair(session_key, session));
641 session_endpoint_key);
654 uint64_t teardown_time,
657 SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
660 SessionPreAggInfo::SessionMap::iterator session_map_iter;
663 SessionEndpointMap::iterator session_endpoint_map_iter;
664 bool read_flow =
true;
666 if (del_uuid != fe->
uuid()) {
674 FlowSessionMap::iterator flow_session_map_iter;
678 if (del_uuid != flow_session_map_iter->second.session_key().uuid) {
691 session_endpoint_key = flow_session_map_iter->second.session_endpoint_key();
692 session_agg_key = flow_session_map_iter->second.session_agg_key();
693 session_key = flow_session_map_iter->second.session_key();
701 bool params_valid =
true;
702 if (params == NULL) {
703 params_valid =
false;
706 TraceSession(
"Del", session_endpoint_key, session_agg_key, session_key,
709 session_endpoint_key);
711 session_agg_map_iter = session_endpoint_map_iter->
712 second.session_agg_map_.find(
714 if (session_agg_map_iter !=
715 session_endpoint_map_iter->second.session_agg_map_.end()) {
717 session_agg_map_iter->second.session_map_.find(session_key);
718 if (session_map_iter !=
719 session_agg_map_iter->second.session_map_.end()) {
723 session_map_iter->second.teardown_time = teardown_time;
724 session_map_iter->second.deleted =
true;
726 if (!session_map_iter->second.evicted) {
728 &session_map_iter->second.del_stats);
734 assert(session_map_iter->second.fwd_flow.flow.get() == fe);
736 session_map_iter->second.fwd_flow.flow = NULL;
737 session_map_iter->second.rev_flow.flow = NULL;
746 uint32_t oflow_bytes,
748 FlowSessionMap::iterator flow_session_map_iter;
749 SessionInfo session_info;
750 SessionIpPort session_key;
751 SessionAggInfo session_agg_info;
752 SessionIpPortProtocol session_agg_key;
753 SessionEndpointMap::iterator session_ep_map_iter;
754 SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
755 SessionPreAggInfo::SessionMap::iterator session_map_iter;
764 if (flow_session_map_iter->second.session_key().uuid != u) {
768 SessionEndpointKey session_db_ep_key = flow_session_map_iter->second.session_endpoint_key();
769 SessionAggKey session_db_agg_key = flow_session_map_iter->second.session_agg_key();
770 SessionKey session_db_key = flow_session_map_iter->second.session_key();
774 session_agg_map_iter = session_ep_map_iter->
775 second.session_agg_map_.find(
777 if (session_agg_map_iter !=
778 session_ep_map_iter->second.session_agg_map_.end()) {
780 session_agg_map_iter->second.session_map_.find(session_db_key);
781 if (session_map_iter !=
782 session_agg_map_iter->second.session_map_.end()) {
787 uint64_t k_bytes, total_bytes, diff_bytes = 0;
788 uint64_t k_packets, total_packets, diff_packets = 0;
797 diff_bytes = total_bytes - session_flow.
total_bytes;
804 session_map_iter->second.evicted =
true;
818 session_endpoint_key);
819 std::pair<FlowSessionMap::iterator, bool> ret =
821 if (ret.second ==
false) {
828 FlowSessionMap::iterator flow_session_map_iter;
847 vector<autogen::SecurityLoggingObjectRuleEntryType>::const_iterator it;
848 it = slo->
rules().begin();
849 while (it != slo->
rules().end()) {
856 AclKey key(acl_it->uuid_);
858 acl_table()->FindActiveEntry(&key));
872 SloRuleList::const_iterator fw_rule_it;
909 std::pair<SessionSloRuleMap::iterator, bool> ret;
910 ret = slo_rule_map->insert(make_pair(uuid,
915 const std::vector<autogen::SecurityLoggingObjectRuleEntryType> &list,
918 vector<autogen::SecurityLoggingObjectRuleEntryType>::const_iterator it;
920 while (it != list.end()) {
929 SloRuleList::const_iterator acl_it = list.begin();
930 while (acl_it != list.end()) {
931 AclKey key(acl_it->uuid_);
933 acl_table()->FindActiveEntry(&key));
951 SloRuleList::const_iterator it = list.begin();
952 while (it != list.end()) {
980 UuidList::const_iterator sit = slo_list.begin();
981 while (sit != slo_list.end()) {
1015 vmi_session_slo_rule_map->clear();
1016 vn_session_slo_rule_map->clear();
1017 global_session_slo_rule_map->clear();
1020 boost::uuids::nil_uuid()) {
1023 global_session_slo_rule_map);
1030 MakeSloList(fe, vmi_session_slo_rule_map, vn_session_slo_rule_map);
1036 const std::string &match_uuid,
1054 const std::string &policy_uuid,
1055 const bool &deleted_flag,
1057 const bool &exported_once) {
1058 SessionSloRuleMap::const_iterator it;
1059 if (!policy_uuid.empty()) {
1060 it = map.find(policy_uuid);
1061 if (it != map.end()) {
1066 if (deleted_flag && exported_once) {
1078 const std::string &fw_policy_uuid,
1079 const std::string &nw_policy_uuid,
1080 const std::string &sg_policy_uuid,
1081 const bool &deleted_flag,
1083 const bool &exported_once) {
1084 SessionSloRuleMap::const_iterator it;
1085 bool fw_logged =
false, nw_logged =
false, sg_logged =
false;
1086 bool fw_match =
false, nw_match =
false, sg_match =
false;
1089 &fw_match, exported_once);
1091 &nw_match, exported_once);
1093 &sg_match, exported_once);
1095 if (fw_match || nw_match || sg_match) {
1101 if (fw_logged || nw_logged || sg_logged) {
1110 const std::string &fw_policy_uuid,
1111 const std::string &nw_policy_uuid,
1112 const std::string &sg_policy_uuid,
1113 const bool &deleted_flag,
1115 const bool &exported_once) {
1117 bool is_vmi_slo_logged, is_vn_slo_logged, is_global_slo_logged;
1118 bool vmi_slo_match, vn_slo_match, global_slo_match;
1127 &global_session_slo_rule_map,
1128 &vmi_session_slo_rule_map,
1129 &vn_session_slo_rule_map);
1157 if ((is_vmi_slo_logged) ||
1158 (is_vn_slo_logged) ||
1159 (is_global_slo_logged)) {
1162 if (vmi_slo_match || vn_slo_match || global_slo_match) {
1171 std::string &fw_policy_uuid,
1172 std::string &nw_policy_uuid,
1173 std::string &sg_policy_uuid) {
1182 std::string &fw_policy_uuid,
1183 std::string &nw_policy_uuid,
1184 std::string &sg_policy_uuid) {
1195 const bool &exported_once) {
1197 bool matched =
false, deleted_flag=
false;
1198 std::string fw_policy_uuid =
"", nw_policy_uuid =
"", sg_policy_uuid =
"";
1221 const bool &exported_once) {
1223 bool matched =
false, deleted_flag =
true;
1224 std::string fw_policy_uuid =
"", nw_policy_uuid =
"", sg_policy_uuid =
"";
1246 bool logged =
false;
1269 bool logged =
false;
1324 uint64_t k_flow_bytes)
const {
1325 uint64_t oflow_bytes = 0xffff000000000000ULL & info_bytes;
1326 uint64_t old_bytes = 0x0000ffffffffffffULL & info_bytes;
1327 if (old_bytes > k_flow_bytes) {
1328 oflow_bytes += 0x0001000000000000ULL;
1330 return (oflow_bytes |= k_flow_bytes);
1334 uint64_t info_packets,
1335 uint64_t k_flow_pkts)
const {
1336 uint64_t oflow_pkts = 0xffffff0000000000ULL & info_packets;
1337 uint64_t old_pkts = 0x000000ffffffffffULL & info_packets;
1338 if (old_pkts > k_flow_pkts) {
1339 oflow_pkts += 0x0000010000000000ULL;
1341 return (oflow_pkts |= k_flow_pkts);
1347 if (fe->
uuid() != u) {
1409 bool is_sampling,
bool is_logging)
const {
1413 flow_info->set_tcp_flags(stats.
tcp_flags);
1417 flow_info->set_sampled_bytes(stats.
diff_bytes);
1421 flow_info->set_logged_bytes(stats.
diff_bytes);
1429 std::string action_str, drop_reason =
"";
1431 flow_info->set_flow_uuid(session_flow.
uuid);
1435 flow_info->set_teardown_bytes(session_flow.
total_bytes);
1442 if (!einfo.
action.empty()) {
1443 flow_info->set_action(einfo.
action);
1457 flow_info->set_action(action_str);
1462 flow_info->set_drop_reason(drop_reason);
1468 SessionPreAggInfo::SessionMap::iterator session_map_iter) {
1469 FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1470 FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1481 (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1483 FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1484 FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1486 return SessionStatsChangedUnlocked(session_map_iter, params);
1490 (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1493 bool fwd_updated = FetchFlowStats(&session_map_iter->second.fwd_flow,
1495 bool rev_updated = FetchFlowStats(&session_map_iter->second.rev_flow,
1497 return (fwd_updated || rev_updated);
1502 vr_flow_stats k_stats;
1504 uint64_t k_bytes, bytes, k_packets;
1505 const vr_flow_entry *k_flow = NULL;
1507 ksync_flow_memory();
1512 if (fe && (info->
uuid == fe->
uuid())) {
1519 info->
gen_id, &k_stats, &kinfo);
1521 SandeshFlowKey skey;
1522 skey.set_nh(info->
flow->key().nh);
1523 skey.set_sip(info->
flow->key().src_addr.to_string());
1524 skey.set_dip(info->
flow->key().dst_addr.to_string());
1525 skey.set_src_port(info->
flow->key().src_port);
1526 skey.set_dst_port(info->
flow->key().dst_port);
1527 skey.set_protocol(info->
flow->key().protocol);
1534 k_stats.flow_bytes);
1536 k_stats.flow_packets);
1538 bytes = 0x0000ffffffffffffULL & info->
total_bytes;
1540 if (bytes != k_bytes) {
1541 uint64_t total_bytes = GetUpdatedSessionFlowBytes(info->
total_bytes,
1543 uint64_t total_packets = GetUpdatedSessionFlowPackets
1551 params->
valid =
true;
1558 (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1560 SessionIpPort *session_key,
bool is_sampling,
bool is_logging)
const {
1561 FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1562 FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1564 FillSessionInfoUnlocked(session_map_iter, stats, session_info, session_key, NULL,
1565 true, is_sampling, is_logging);
1569 (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1570 SessionInfo *session_info,
bool is_sampling,
bool is_logging)
const {
1576 session_info->forward_flow_info.set_logged_pkts(estats.
fwd_flow.
1578 session_info->forward_flow_info.set_logged_bytes(estats.
fwd_flow.
1581 session_info->reverse_flow_info.set_logged_pkts(0);
1582 session_info->reverse_flow_info.set_logged_bytes(0);
1585 session_info->forward_flow_info.set_sampled_pkts(estats.
fwd_flow.
1587 session_info->forward_flow_info.set_sampled_bytes(estats.
fwd_flow.
1590 session_info->reverse_flow_info.set_sampled_pkts(0);
1591 session_info->reverse_flow_info.set_sampled_bytes(0);
1596 (SessionPreAggInfo::SessionMap::iterator session_map_iter,
1598 SessionInfo *session_info,
1599 SessionIpPort *session_key,
1601 bool read_flow,
bool is_sampling,
bool is_logging)
const {
1602 string rid = agent_uve_->agent()->router_id().to_string();
1603 FlowEntry *fe = session_map_iter->second.fwd_flow.flow.get();
1604 FlowEntry *rfe = session_map_iter->second.rev_flow.flow.get();
1605 boost::system::error_code ec;
1609 session_key->set_ip(session_map_iter->first.remote_ip);
1610 session_key->set_port(session_map_iter->first.client_port);
1611 FillSessionFlowInfo(session_map_iter->second.fwd_flow,
1612 session_map_iter->second,
1613 session_map_iter->second.export_info.fwd_flow,
1614 &session_info->forward_flow_info);
1615 FillSessionFlowInfo(session_map_iter->second.rev_flow,
1616 session_map_iter->second,
1617 session_map_iter->second.export_info.rev_flow,
1618 &session_info->reverse_flow_info);
1619 bool first_time_export =
false;
1620 if (!session_map_iter->second.exported_atleast_once) {
1621 first_time_export =
true;
1623 session_map_iter->second.exported_atleast_once =
true;
1626 const bool &evicted = session_map_iter->second.evicted;
1627 const bool &deleted = session_map_iter->second.deleted;
1630 FillSessionEvictStats(session_map_iter, session_info, is_sampling,
1632 flow_stats_manager_->UpdateSessionExportStats(1, first_time_export,
1637 real_stats = &session_map_iter->second.del_stats;
1639 FillSessionFlowStats(real_stats->
fwd_flow,
1640 &session_info->forward_flow_info, is_sampling,
1642 FillSessionFlowStats(real_stats->
rev_flow,
1643 &session_info->reverse_flow_info, is_sampling,
1645 flow_stats_manager_->UpdateSessionExportStats(1, first_time_export,
1654 session_info->set_other_vrouter_ip(
1667 session_info->set_other_vrouter_ip(
1675 SessionAggInfo *agg_info,
1677 bool is_logging)
const {
1679 agg_info->set_sampled_forward_bytes(agg_info->get_sampled_forward_bytes() +
1680 sinfo.get_forward_flow_info().get_sampled_bytes());
1681 agg_info->set_sampled_forward_pkts(agg_info->get_sampled_forward_pkts() +
1682 sinfo.get_forward_flow_info().get_sampled_pkts());
1683 agg_info->set_sampled_reverse_bytes(agg_info->get_sampled_reverse_bytes() +
1684 sinfo.get_reverse_flow_info().get_sampled_bytes());
1685 agg_info->set_sampled_reverse_pkts(agg_info->get_sampled_reverse_pkts() +
1686 sinfo.get_reverse_flow_info().get_sampled_pkts());
1689 agg_info->set_logged_forward_bytes(agg_info->get_logged_forward_bytes() +
1690 sinfo.get_forward_flow_info().get_logged_bytes());
1691 agg_info->set_logged_forward_pkts(agg_info->get_logged_forward_pkts() +
1692 sinfo.get_forward_flow_info().get_logged_pkts());
1693 agg_info->set_logged_reverse_bytes(agg_info->get_logged_reverse_bytes() +
1694 sinfo.get_reverse_flow_info().get_logged_bytes());
1695 agg_info->set_logged_reverse_pkts(agg_info->get_logged_reverse_pkts() +
1696 sinfo.get_reverse_flow_info().get_logged_pkts());
1701 (SessionEndpointInfo::SessionAggMap::iterator it, SessionIpPortProtocol *key)
1706 key->set_local_ip(it->first.local_ip);
1707 key->set_service_port(it->first.server_port);
1708 key->set_protocol(it->first.proto);
1712 SessionEndpoint *ep)
const {
1718 if (!tinfo.
tier.empty()) {
1719 ep->set_tier(tinfo.
tier);
1721 if (!tinfo.
site.empty()) {
1722 ep->set_site(tinfo.
site);
1736 SessionEndpoint *ep)
const {
1742 if (!tinfo.
tier.empty()){
1743 ep->set_remote_tier(tinfo.
tier);
1745 if (!tinfo.
site.empty()){
1746 ep->set_remote_site(tinfo.
site);
1760 SessionEndpoint *session_ep)
1763 boost::system::error_code ec;
1765 session_ep->set_vmi(it->first.vmi_cfg_name);
1766 session_ep->set_vn(it->first.local_vn);
1767 session_ep->set_remote_vn(it->first.remote_vn);
1768 session_ep->set_is_client_session(it->first.is_client_session);
1769 session_ep->set_is_si(it->first.is_si);
1770 if (!it->first.remote_prefix.empty()) {
1771 session_ep->set_remote_prefix(it->first.remote_prefix);
1773 session_ep->set_security_policy_rule(it->first.match_policy);
1774 if (it->first.local_tagset.size() > 0) {
1777 if (it->first.remote_tagset.size() > 0) {
1784 (
const SessionEndpointMap::iterator &it) {
1785 SessionEndpointInfo::SessionAggMap::iterator session_agg_map_iter;
1786 SessionEndpointInfo::SessionAggMap::iterator prev_agg_iter;
1787 SessionPreAggInfo::SessionMap::iterator session_map_iter, prev;
1789 SessionInfo session_info;
1790 SessionIpPort session_key;
1791 uint32_t session_count = 0, session_agg_count = 0;
1792 bool exit =
false, ep_completed =
true;
1794 SessionEndpoint &session_ep = session_msg_list_[GetSessionMsgIdx()];
1796 session_agg_map_iter = it->second.session_agg_map_.
1797 lower_bound(session_agg_iteration_key_);
1798 while (session_agg_map_iter != it->second.session_agg_map_.end()) {
1799 SessionAggInfo session_agg_info;
1800 SessionIpPortProtocol session_agg_key;
1802 session_map_iter = session_agg_map_iter->second.session_map_.
1803 lower_bound(session_iteration_key_);
1804 while (session_map_iter != session_agg_map_iter->second.session_map_.end()) {
1805 prev = session_map_iter;
1807 if (!session_map_iter->second.deleted &&
1808 !session_map_iter->second.evicted) {
1809 bool delete_marked = CheckAndDeleteSessionStatsFlow(session_map_iter);
1810 if (!delete_marked) {
1811 bool changed = SessionStatsChangedLocked(session_map_iter,
1813 if (!changed && session_map_iter->second.exported_atleast_once) {
1820 bool is_sampling =
true;
1821 if (IsSamplingEnabled()) {
1822 is_sampling = SampleSession(session_map_iter, ¶ms);
1824 bool is_logging = CheckSessionLogging(session_map_iter->second);
1827 if (!is_sampling && !is_logging) {
1829 if (prev->second.deleted) {
1830 session_agg_map_iter->second.session_map_.erase(prev);
1834 if (session_map_iter->second.deleted) {
1835 FillSessionInfoUnlocked(session_map_iter, params, &session_info,
1836 &session_key, NULL,
true, is_sampling,
1839 FillSessionInfoLocked(session_map_iter, params, &session_info,
1840 &session_key, is_sampling, is_logging);
1842 session_agg_info.sessionMap.insert(make_pair(session_key,
1844 UpdateAggregateStats(session_info, &session_agg_info, is_sampling,
1848 if (prev->second.deleted) {
1849 session_agg_map_iter->second.session_map_.erase(prev);
1851 if (session_count ==
1852 agent_uve_->agent()->params()->max_sessions_per_aggregate()) {
1857 if (session_count) {
1858 FillSessionAggInfo(session_agg_map_iter, &session_agg_key);
1859 session_ep.sess_agg_info.insert(make_pair(session_agg_key,
1865 session_iteration_key_.Reset();
1866 prev_agg_iter = session_agg_map_iter;
1867 session_agg_map_iter++;
1868 if (prev_agg_iter->second.session_map_.size() == 0) {
1869 it->second.session_agg_map_.erase(prev_agg_iter);
1871 ++session_agg_count;
1872 if (session_agg_count ==
1873 agent_uve_->agent()->params()->max_aggregates_per_session_endpoint()) {
1878 if (session_ep.sess_agg_info.size()) {
1879 FillSessionEndpoint(it, &session_ep);
1880 EnqueueSessionMsg();
1883 if (session_count ==
1884 agent_uve_->agent()->params()->max_sessions_per_aggregate()) {
1885 ep_completed =
false;
1886 session_ep_iteration_key_ = it->first;
1887 session_agg_iteration_key_ = session_agg_map_iter->first;
1888 if (session_map_iter == session_agg_map_iter->second.session_map_.end()) {
1889 prev_agg_iter = session_agg_map_iter;
1890 ++session_agg_map_iter;
1891 if (prev_agg_iter->second.session_map_.size() == 0) {
1892 it->second.session_agg_map_.erase(prev_agg_iter);
1894 if (session_agg_map_iter == it->second.session_agg_map_.end()) {
1898 ep_completed =
true;
1900 session_iteration_key_.Reset();
1901 session_agg_iteration_key_ = session_agg_map_iter->first;
1904 session_iteration_key_ = session_map_iter->first;
1906 }
else if (session_agg_count ==
1907 agent_uve_->agent()->params()->
1908 max_aggregates_per_session_endpoint()) {
1909 ep_completed =
false;
1910 session_ep_iteration_key_ = it->first;
1911 if (session_agg_map_iter == it->second.session_agg_map_.end()) {
1915 ep_completed =
true;
1917 session_agg_iteration_key_ = session_agg_map_iter->first;
1918 session_iteration_key_.Reset();
1921 return ep_completed;
1935 while (count < max_count) {
1947 SessionEndpointMap::iterator prev = it;
1952 if (prev->second.session_agg_map_.size() == 0) {
1980 Task(ssc->task_id(), ssc->instance_id()), ssc_(ssc) {
1987 return "Session Stats Collector Task";
2005 (SessionPreAggInfo::SessionMap::iterator session_map_iter,
2014 const bool &deleted = session_map_iter->second.deleted;
2015 const bool &evicted = session_map_iter->second.evicted;
2018 stats = &session_map_iter->second.evict_stats;
2019 }
else if (deleted) {
2020 stats = &session_map_iter->second.del_stats;
2035 bool subject_flows_to_algorithm =
false;
2041 subject_flows_to_algorithm =
true;
2045 if (subject_flows_to_algorithm) {
2046 double probability = diff_bytes/
threshold();
2048 if (num > diff_bytes) {
2067 if (probability == 0) {
2096 for (
int i = 0; i < kMaxSessionCollectors; i++) {
2098 boost::asio::io_context& io_ref =
2099 const_cast<boost::asio::io_context&
>
2101 collectors[i].reset(
2102 AgentStaticObjectFactory::CreateRef<SessionStatsCollector>(
2109 if (idx >= 0 && idx < kMaxSessionCollectors) {
2110 return collectors[idx].get();
2116 for (
int i = 0; i < kMaxSessionCollectors; i++) {
2117 collectors[i]->set_expiry_time(time);
2124 return collectors[0]->expiry_time();
2132 idx = table->
table_index() % kMaxSessionCollectors;
2134 return collectors[idx].get();
2138 for (
int i = 0; i < kMaxSessionCollectors; i++) {
2139 collectors[i]->Shutdown();
2140 collectors[i].reset();
2146 for (
int i = 0; i < kMaxSessionCollectors; i++) {
2147 size += collectors[i]->Size();
2153 for (
int i = 0; i < kMaxSessionCollectors; i++) {
2154 if (collectors[i].
get()) {
2155 collectors[i].get()->RegisterDBClients();
2182 SessionSloRuleStateMap::iterator it;
2183 it = session_rule_state_map_.find(uuid);
2184 if (it != session_rule_state_map_.end()) {
2185 session_rule_state_map_.erase(it);
2191 SessionSloRuleStateMap::iterator it;
2192 it = session_rule_state_map_.find(uuid);
2193 if (it == session_rule_state_map_.end()) {
2195 slo_state_rule_entry.
rate = rate;
2196 session_rule_state_map_.insert(make_pair(uuid, slo_state_rule_entry));
2199 if (prev.
rate != rate) {
2208 const std::string &
uuid,
2210 SessionSloRuleStateMap::iterator it;
2211 bool is_logged =
false;
2212 it = session_rule_state_map_.find(uuid);
2213 if (it != session_rule_state_map_.end()) {
2215 if (it->second.ref_count == 0) {
2218 it->second.ref_count++;
2219 if (it->second.ref_count == it->second.rate) {
2220 it->second.ref_count = 0;
bool FlowLogging(const SessionStatsInfo &stats_info, const FlowEntry *fe, bool *logged, const bool &exported_once)
FlowEntry * reverse_flow() const
uint32_t session_export_count() const
uint16_t max_endpoints_per_session_msg() const
const UuidList & slo_list() const
uint8_t GetSessionMsgIdx()
void SetBounded(bool bounded)
uint64_t GetUpdatedSessionFlowBytes(uint64_t info_bytes, uint64_t k_flow_bytes) const
void UpdateSessionSloStateRuleEntry(std::string uuid, int rate)
SessionEndpointKey session_endpoint_key()
bool GetSessionKey(FlowEntry *fe, SessionAggKey &session_agg_key, SessionKey &session_key, SessionEndpointKey &session_endpoint_key)
bool IsEqual(const SessionEndpointKey &rhs) const
bool IsSamplingEnabled() const
const TagList & local_tagset() const
void SetEntryCallback(TaskEntryCallback on_entry)
void GetPolicyIdFromDeletedFlow(const SessionFlowExportInfo &flow_info, std::string &fw_policy_uuid, std::string &nw_policy_uuid, std::string &sg_policy_uuid)
tbb::atomic< bool > sessions_sampled_atleast_once_
uint8_t session_msg_index_
tbb::atomic< uint64_t > session_slo_logging_drops_
const vr_flow_entry * GetKFlowStatsAndInfo(const FlowKey &key, uint32_t idx, uint8_t gen_id, vr_flow_stats *stats, KFlowData *info) const
bool SampleSession(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionStatsParams *params) const
static uint64_t GetFlowStats(const uint16_t &oflow_data, const uint32_t &data)
AgentUveBase * agent_uve_
std::vector< SessionEndpoint > session_msg_list_
static boost::uuids::uuid StringToUuid(const std::string &str)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
#define MAX_SSC_REQUEST_QUEUE_ITERATIONS
bool HandleDeletedFlowLogging(const SessionStatsInfo &stats_info)
void SetExpiryTime(int time)
void Shutdown(bool delete_entries=true)
#define CheckFlowLogging(logged)
DBState * GetState(DBTableBase *tbl_base, ListenerId listener) const
SessionKey session_iteration_key_
const std::string fw_policy_name_uuid() const
std::string dest_vn_match
#define SESSION_STATS_TRACE(obj,...)
FlowTable * flow_table() const
int ComputeSloRate(int rate, SecurityLoggingObject *slo) const
DBTableBase * get_table() const
uint32_t instance_id() const
SessionFlowStatsParams rev_flow
static void BuildTraceTagList(const TagList &slist, vector< string > *dlist)
std::string source_vn_match
AgentUveBase * uve() const
#define DEFAULT_SSC_REQUEST_QUEUE_SIZE
std::vector< SloRuleInfo > SloRuleList
void GetPolicyIdFromFlow(const FlowEntry *fe, std::string &fw_policy_uuid, std::string &nw_policy_uuid, std::string &sg_policy_uuid)
#define kTaskSessionStatsCollectorEvent
bool IsEqual(FlowToSessionMap &rhs)
void UpdateSessionStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void BuildSloList(const SessionStatsInfo &stats_info, const FlowEntry *fe, SessionSloRuleMap *global_session_slo_rule_map, SessionSloRuleMap *vmi_session_slo_rule_map, SessionSloRuleMap *vn_session_slo_rule_map)
std::map< std::string, SessionSloRuleEntry > SessionSloRuleMap
DBTable::ListenerId slo_listener_id_
bool SessionStatsChangedLocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionStatsParams *params) const
void SetState(DBTableBase *tbl_base, ListenerId listener, DBState *state)
boost::asio::ip::address IpAddress
static const uint32_t kSessionsPerTask
const UuidList & slo_list() const
void AddSloRules(const std::vector< autogen::SecurityLoggingObjectRuleEntryType > &list, SecurityLoggingObject *slo, SessionSloRuleMap *slo_rule_map)
uint32_t session_export_without_sampling_reset()
virtual void DispatchSessionMsg(const std::vector< SessionEndpoint > &lst)
static bool ShouldDrop(uint32_t action)
boost::asio::io_context * io_service()
void DeleteSession(FlowEntry *fe, const boost::uuids::uuid &del_uuid, uint64_t teardown_time, const RevFlowDepParams *params)
SessionStatsCollector(boost::asio::io_context &io, AgentUveBase *uve, uint32_t instance_id, FlowStatsManager *fsm, SessionStatsCollectorObject *obj)
void FillSessionEvictStats(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionInfo *session_info, bool is_sampling, bool is_logging) const
void FillSessionInfoUnlocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, const SessionStatsParams &stats, SessionInfo *session_info, SessionIpPort *session_key, const RevFlowDepParams *params, bool read_flow, bool is_sampling, bool is_logging) const
#define FLOW_LOCK(flow, rflow, flow_event)
AgentDBEntry * FindActiveEntry(const DBEntry *key)
void UpdateAggregateStats(const SessionInfo &sinfo, SessionAggInfo *agg_info, bool is_sampling, bool is_logging) const
void BuildTagIdsFromList(const TagList &tl, UveTagData *info) const
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
uint16_t table_index() const
uint64_t session_task_starts_
tbb::atomic< uint64_t > session_export_disable_drops_
bool CheckPolicyMatch(const SessionSloRuleMap &map, const std::string &policy_uuid, const bool &deleted_flag, bool *match, const bool &exported_once)
SessionEndpointKey session_ep_iteration_key_
void set_sessions_sampled_atleast_once()
uint32_t session_ep_visited_
tbb::atomic< uint64_t > session_global_slo_logging_drops_
tbb::atomic< uint64_t > session_export_drops_
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
uint64_t threshold() const
SessionEndpointMap session_endpoint_map_
uint32_t session_export_rate_
#define kTaskSessionStatsCollector
uint32_t session_export_count_reset()
void BuildTagNamesFromList(const TagList &tl, UveTagData *info) const
const std::string & sg_rule_uuid() const
const std::string & service_intf_type() const
const std::string & peer_vrouter() const
static const int32_t kDisableSampling
TunnelType tunnel_type() const
const FlowKey & key() const
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
void AddSloFirewallRules(SecurityLoggingObject *slo, SessionSloRuleMap *rule_map)
bool IsEqual(const SessionKey &rhs) const
void SloNotify(DBTablePartBase *partition, DBEntryBase *e)
bool UpdateSessionSloStateRuleRefCount(const std::string &uuid, bool *matc)
bool HandleFlowLogging(const SessionStatsInfo &stats_info)
static const uint32_t kDefaultFlowSamplingThreshold
bool MatchSloForFlow(const SessionStatsInfo &stats_info, const FlowEntry *fe, const std::string &fw_policy_uuid, const std::string &nw_policy_uuid, const std::string &sg_policy_uuid, const bool &deleted_flag, bool *logged, const bool &exported_once)
uint32_t prev_cfg_flow_export_rate_
SloRuleList & firewall_policy_list()
FlowSessionMap flow_session_map_
GlobalVrouter * global_vrouter() const
void AddSloEntry(const boost::uuids::uuid &uuid, SessionSloRuleMap *slo_rule_map)
const std::vector< autogen::SecurityLoggingObjectRuleEntryType > & rules() const
bool is_flags_set(const FlowEntryFlags &flags) const
bool UpdateSessionThreshold(void)
TaskScheduler * task_scheduler() const
std::string other_vrouter
void SetExitCallback(TaskExitCallback on_exit)
static const std::string integerToString(const NumberType &num)
SessionStatsCollector * FlowToCollector(const FlowEntry *flow)
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams ¶ms)
const std::string fw_policy_uuid() const
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
const boost::uuids::uuid & uuid() const
Ip4Address router_id() const
bool CheckAndDeleteSessionStatsFlow(SessionPreAggInfo::SessionMap::iterator session_map_iter)
bool IsLess(const SessionAggKey &rhs) const
EventManager * event_manager() const
bool RequestHandler(boost::shared_ptr< SessionStatsReq > req)
void UpdateSessionMsgExportStats(uint32_t count)
void AddSessionSloRuleEntry(const std::string &uuid, int rate, SecurityLoggingObject *slo, SessionSloRuleMap *slo_rule_map)
void CopyFlowInfo(SessionStatsInfo &session, const RevFlowDepParams *params)
bool exported_atleast_once
SloRuleList & firewall_rule_list()
bool FetchFlowStats(SessionFlowStatsInfo *info, SessionFlowStatsParams *params) const
SessionFlowStatsInfo rev_flow
void AddSloList(const UuidList &slo_list, SessionSloRuleMap *slo_rule_map)
void DeleteFlowToSessionMap(FlowEntry *fe)
void FillSessionFlowInfo(const SessionFlowStatsInfo &session_flow, const SessionStatsInfo &sinfo, const SessionFlowExportInfo &einfo, SessionFlowInfo *flow_info) const
static std::string DropReasonStr(uint16_t reason)
void set_measure_busy_time(bool val) const
SessionStatsCollector * GetCollector(uint8_t idx) const
void FillSessionTags(const TagList &list, SessionEndpoint *ep) const
const VnEntry * vn() const
std::string origin_vn_src
AgentParam * params() const
static void GetFlowSandeshActionParams(const FlowAction &action_info, std::string &action_str)
static uint64_t GetCurrentTime()
SandeshTraceBufferPtr SessionStatsTraceBuf
FlowStatsManager * flow_stats_manager_
void AddSloEntryRules(SecurityLoggingObject *slo, SessionSloRuleMap *slo_rule_map)
boost::uuids::uuid slo_uuid() const
const boost::uuids::uuid & uuid() const
void FillSessionFlowStats(const SessionFlowStatsParams &stats, SessionFlowInfo *flow_info, bool is_sampling, bool is_logging) const
void ClearState(DBTableBase *tbl_base, ListenerId listener)
std::string aps_rule_uuid
std::vector< boost::uuids::uuid > UuidList
bool DeletedFlowLogging(const SessionStatsInfo &stats_info, const SessionFlowExportInfo &flow_info, bool *logged, const bool &exported_once)
void AddEvent(const FlowEntryPtr &flow)
void UpdateSessionFlowStatsInfo(FlowEntry *fe, SessionFlowStatsInfo *session_flow) const
std::set< string > custom_tag_set
const Interface * intf_entry() const
std::string origin_vn_dst
int GetExpiryTime() const
const TagList & remote_tagset() const
bool global_slo_status() const
uint64_t prev_flow_export_rate_compute_time_
const AclEntry * GetAclEntryAtIndex(uint32_t) const
void AddFlowToSessionMap(FlowEntry *fe, SessionKey session_key, SessionAggKey session_agg_key, SessionEndpointKey session_endpoint_key)
void AddSloFirewallPolicies(SecurityLoggingObject *slo, SessionSloRuleMap *r_map)
void FillSessionInfoLocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, const SessionStatsParams &stats, SessionInfo *session_info, SessionIpPort *session_key, bool is_sampling, bool is_logging) const
SessionTask * session_task_
SessionFlowStatsInfo fwd_flow
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
SessionFlowExportInfo fwd_flow
uint32_t flow_handle() const
void MakeSloList(const FlowEntry *fe, SessionSloRuleMap *vmi_session_slo_rule_map, SessionSloRuleMap *vn_session_slo_rule_map)
bool IsLess(const SessionKey &rhs) const
static uint64_t UTCTimestampUsec()
bool IsEqual(const SessionAggKey &rhs) const
SecurityLoggingObjectTable * slo_table() const
void CopyFlowInfoInternal(SessionFlowExportInfo *info, const boost::uuids::uuid &u, FlowEntry *fe) const
void UpdateSessionStatsInfo(FlowEntry *fe, uint64_t setup_time, SessionStatsInfo *session) const
SessionFlowStatsParams fwd_flow
SessionFlowExportInfo rev_flow
uint32_t RunSessionEndpointStats(uint32_t max_count)
const std::string & cfg_name() const
void UpdateThreshold(uint64_t new_value, bool check_oflow)
FlowEntry * reverse_flow_entry()
tbb::atomic< uint64_t > session_export_sampling_drops_
bool UpdateSloMatchRuleEntry(const boost::uuids::uuid &slo_uuid, const std::string &match_uuid, bool *match)
void DispatchPendingSessionMsg()
#define LOG(_Level, _Msg)
std::string remote_prefix
uint64_t threshold() const
SessionAggKey session_agg_iteration_key_
const std::string RemotePrefix() const
uint64_t GetUpdatedSessionFlowPackets(uint64_t info_packets, uint64_t k_flow_pkts) const
SessionAggMap session_agg_map_
static uint64_t ClockMonotonicUsec()
bool SessionStatsChangedUnlocked(SessionPreAggInfo::SessionMap::iterator session_map_iter, SessionStatsParams *params) const
const std::string & nw_ace_uuid() const
void AddSession(FlowEntry *fe, uint64_t setup_time)
tbb::atomic< uint32_t > session_export_count_
bool RequestHandlerEntry()
const std::string & uuid() const
static void TraceSession(const string &op, const SessionEndpointKey &ep, const SessionAggKey &agg, const SessionKey &session, bool rev_flow_params)
void FillSessionAggInfo(SessionEndpointInfo::SessionAggMap::iterator it, SessionIpPortProtocol *session_agg_key) const
uint16_t underlay_src_port
SessionTask(SessionStatsCollector *ssc)
void UpdateSloStateRules(SecurityLoggingObject *slo, SessionSloState *state)
SessionStatsCollectorObject(Agent *agent, FlowStatsManager *mgr)
uint16_t underlay_src_port
uint32_t session_export_rate() const
void FillSessionEndpoint(SessionEndpointMap::iterator it, SessionEndpoint *session_ep) const
bool ProcessSessionEndpoint(const SessionEndpointMap::iterator &it)
bool Enqueue(QueueEntryT entry)
Task is a wrapper over tbb::task to support policies.
void FillSessionRemoteTags(const TagList &list, SessionEndpoint *ep) const
void UpdateSessionSampleExportStats(uint32_t count)
void RequestHandlerExit(bool done)
static uint64_t GetCurrentTime()
void DeleteSessionSloStateRuleEntry(std::string uuid)
void EvictedSessionStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void set_name(const std::string &name)
bool CheckSessionLogging(const SessionStatsInfo &stats_info)
bool IsLess(const SessionEndpointKey &rhs) const
bool IsIngressFlow() const
std::string Description() const
SessionAggKey session_agg_key()
virtual ~SessionStatsCollector()
SessionExportInfo export_info
std::set< string > label_set
void FreeIndex(uint32_t idx)
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
std::vector< int > TagList
#define FLOW_EXPORT_STATS_TRACE(...)
static std::string UTCUsecToString(uint64_t tstamp)
bool FindSloMatchRule(const SessionSloRuleMap &map, const std::string &fw_policy_uuid, const std::string &nw_policy_uuid, const std::string &sg_policy_uuid, const bool &deleted_flag, bool *match, const bool &exported_once)