7 #include <boost/uuid/uuid.hpp>
8 #include <boost/uuid/uuid_io.hpp>
16 #include <boost/functional/factory.hpp>
37 #include <vrouter/flow_stats/flow_stats_types.h>
41 uint32_t flow_cache_timeout,
49 io, kFlowStatsTimerInterval,
"Flow stats collector"),
51 task_id_(uve->agent()->task_scheduler()->GetTaskId
53 rand_gen_(boost::uuids::random_generator()),
54 flow_iteration_key_(NULL),
56 flow_tcp_syn_age_time_(FlowTcpSynAgeTime),
58 request_queue_(agent_uve_->agent()->task_scheduler()->
63 flow_aging_key_(*key), instance_id_(instance_id),
64 flow_stats_manager_(aging_module), parent_(obj), ageing_task_(NULL),
65 current_time_(GetCurrentTime()), ageing_task_starts_(0) {
66 if (flow_cache_timeout) {
107 uint64_t scan_time_millisec;
115 scan_time_millisec = (scan_time_millisec *
kFlowScanTime) / 100;
154 const vr_flow_entry *k_flow,
155 const vr_flow_stats &k_stats,
156 uint64_t curr_time) {
160 if (k_flow != NULL) {
161 uint64_t k_flow_bytes, bytes;
164 bytes = 0x0000ffffffffffffULL & info->
bytes();
167 if (bytes < k_flow_bytes) {
185 const uint32_t &data) {
186 uint64_t flow_stats = (uint64_t) oflow_data << (
sizeof(uint32_t) * 8);
192 uint64_t k_flow_bytes) {
193 uint64_t oflow_bytes = 0xffff000000000000ULL & stats->
bytes();
194 uint64_t old_bytes = 0x0000ffffffffffffULL & stats->
bytes();
195 if (old_bytes > k_flow_bytes) {
196 oflow_bytes += 0x0001000000000000ULL;
198 return (oflow_bytes |= k_flow_bytes);
202 uint64_t k_flow_pkts) {
203 uint64_t oflow_pkts = 0xffffff0000000000ULL & stats->
packets();
204 uint64_t old_pkts = 0x000000ffffffffffULL & stats->
packets();
205 if (old_pkts > k_flow_pkts) {
206 oflow_pkts += 0x0000010000000000ULL;
208 return (oflow_pkts |= k_flow_pkts);
212 uint64_t bytes, uint64_t pkts) {
250 uint32_t fip = ReverseFlowFip(flow);
253 (agent_uve_->agent()->interface_table()->FindActiveEntry(&vmi));
257 (agent_uve_->interface_uve_table());
259 return table->
FipEntry(fip, vn, intf);
284 uint64_t bytes, uint64_t pkts) {
299 if (!src_vn.length() || !dst_vn.length()) {
342 bool egress_flow_is_client;
345 egress_flow_is_client =
true;
348 egress_flow_is_client =
false;
367 ep.
client = egress_flow_is_client;
398 uint64_t bytes, uint64_t pkts) {
405 if (!src_vn.length())
407 if (!dst_vn.length())
429 uint64_t teardown_time) {
439 vr_flow_stats k_stats;
446 k_stats.flow_bytes_oflow,
447 k_stats.flow_packets,
448 k_stats.flow_packets_oflow,
449 teardown_time,
true);
469 uint32_t flow_handle,
474 (fe, flow_handle, gen_id, (gen_id + 1));
480 uint16_t oflow_bytes,
484 bool teardown_time) {
494 uint16_t oflow_bytes,
498 bool teardown_time) {
499 uint64_t k_bytes, k_packets, total_bytes, total_packets;
505 uint64_t diff_bytes = total_bytes - info->
bytes();
506 uint64_t diff_pkts = total_packets - info->
packets();
529 const vr_flow_entry *k_flow,
530 uint16_t k_flow_flags,
531 uint32_t flow_handle, uint16_t gen_id,
555 const vr_flow_entry *k_flow,
556 const vr_flow_stats &k_stats,
587 const vr_flow_entry *k_flow_rev = NULL;
588 vr_flow_stats k_rflow_stats;
593 if (
ShouldBeAged(rev_info, k_flow_rev, k_rflow_stats, curr_time)) {
601 if (deleted ==
true) {
607 if (deleted ==
false && k_flow) {
608 uint64_t k_bytes, bytes;
612 bytes = 0x0000ffffffffffffULL & info->
bytes();
615 if (bytes != k_bytes) {
618 k_stats.flow_bytes_oflow,
619 k_stats.flow_packets,
620 k_stats.flow_packets_oflow,
631 uint64_t curr_time) {
642 uint16_t gen_id = info->
gen_id();
660 const vr_flow_entry *k_flow = NULL;
661 vr_flow_stats k_stats;
670 gen_id, &k_stats, &kinfo);
673 if (
EvictFlow(ksync_obj, k_flow, kinfo.
flags, flow_handle, gen_id,
674 info, curr_time) ==
true) {
680 assert(info->is_linked());
681 FlowExportInfoList::iterator flow_it =
691 if (
AgeFlow(ksync_obj, k_flow, k_stats, kinfo, info, curr_time) ==
false)
699 assert(info->is_linked());
700 FlowExportInfoList::iterator flow_it =
707 if (rev_info->is_linked()) {
708 FlowExportInfoList::iterator rev_flow_it =
710 if (rev_flow_it == it) {
721 FlowExportInfoList::iterator it;
739 while (count < max_count) {
747 count +=
ProcessFlow(it, ksync_obj, info, curr_time);
818 boost::shared_ptr<FlowExportReq>
826 boost::shared_ptr<FlowExportReq>
835 uint32_t oflow_bytes,
838 boost::shared_ptr<FlowExportReq>
840 packets, oflow_bytes, u));
858 switch (req->event()) {
865 FlowEntryTree::iterator it;
889 req->oflow_bytes(), req->
uuid());
907 FlowEntryTree::iterator it =
flow_tree_.find(fe);
917 FlowEntryTree::const_iterator it =
flow_tree_.find(fe);
926 FlowEntryTree::iterator &it) {
976 std::pair<FlowEntryTree::iterator, bool> ret =
978 if (ret.second ==
false) {
1003 if (ret.first->second.is_linked() ==
false) {
1011 (
const FlowEntry *del_flow, FlowEntryTree::iterator &tree_it) {
1014 if (tree_it == flow_tree_.end()) {
1015 flow_iteration_key_ = NULL;
1018 if (flow_iteration_key_ == NULL) {
1024 FlowExportInfoList::iterator it =
1025 flow_export_info_list_.iterator_to(tree_it->second);
1029 if (it == flow_export_info_list_.end())
1030 it = flow_export_info_list_.begin();
1032 if (it == flow_export_info_list_.end()) {
1033 flow_iteration_key_ = NULL;
1035 flow_iteration_key_ = it->flow();
1050 if (it->second.is_linked()) {
1051 FlowExportInfoList::iterator it1 =
1062 uint32_t oflow_bytes,
1070 if (info->
uuid() != u) {
1077 packets, oflow_bytes & 0xFFFF0000,
1086 SandeshFlowKey &skey) {
1087 skey.set_nh(key.
nh);
1088 skey.set_sip(key.
src_addr.to_string());
1089 skey.set_dip(key.
dst_addr.to_string());
1096 SandeshFlowExportInfo &info) {
1097 SandeshFlowKey skey;
1102 info.set_uuid(to_string(flow->
uuid()));
1104 info.set_rev_flow_uuid(to_string(rflow->
uuid()));
1120 info.set_bytes(value.
bytes());
1121 info.set_packets(value.
packets());
1123 std::vector<ActionStr> action_str_l;
1125 info.set_action(action_str_l);
1130 string vmi_str = to_string(vmi.
uuid_) + vmi.
name_;
1131 info.set_fip_vmi(vmi_str);
1133 info.set_fip(ip.to_string());
1137 void FlowStatsRecordsReq::HandleRequest()
const {
1138 FlowStatsCollector::FlowEntryTree::iterator it;
1139 vector<FlowStatsRecord> list;
1140 FlowStatsRecordsResp *resp =
new FlowStatsRecordsResp();
1143 flow_stats_manager()->default_flow_stats_collector_obj()->
1150 SandeshFlowKey skey;
1153 SandeshFlowExportInfo info;
1156 FlowStatsRecord rec;
1158 list.push_back(rec);
1161 resp->set_records_list(list);
1163 resp->set_context(context());
1172 Task(fsc->task_id(), fsc->instance_id()), fsc_(fsc) {
1179 return "Flow Stats Collector Ageing Task";
1183 return fsc_->RunAgeingTask();
1193 for (
int i = 0; i < kMaxCollectors; i++) {
1195 boost::asio::io_context& io_ref =
1196 const_cast<boost::asio::io_context&
>
1198 collectors[i].reset(
1199 AgentStaticObjectFactory::CreateRef<FlowStatsCollector>(
1207 if (idx >= 0 && idx < kMaxCollectors) {
1208 return collectors[idx].get();
1214 for (
int i = 0; i < kMaxCollectors; i++) {
1215 collectors[i]->set_expiry_time(time);
1222 return collectors[0]->expiry_time();
1226 for (
int i = 0; i < kMaxCollectors; i++) {
1227 collectors[i]->set_deleted(
true);
1232 for (
int i = 0; i < kMaxCollectors; i++) {
1233 collectors[i]->set_deleted(
false);
1238 for (
int i = 0; i < kMaxCollectors; i++) {
1239 if (!collectors[i]->
deleted()) {
1247 for (
int i = 0; i < kMaxCollectors; i++) {
1248 collectors[i]->set_flow_age_time_intvl(value);
1255 return collectors[0]->flow_age_time_intvl();
1259 for (
int i = 0; i < kMaxCollectors; i++) {
1261 collectors[i]->request_queue_.IsQueueEmpty() ==
false) {
1269 for (
int i = 0; i < kMaxCollectors; i++) {
1270 collectors[i]->Shutdown();
1271 collectors[i].reset();
1282 return collectors[idx].get();
1286 for (
int i = 0; i < kMaxCollectors; i++) {
1287 collectors[i]->UpdateFlowAgeTimeInSecs(age_time);
1294 return collectors[0]->flow_age_time_intvl_in_secs();
1299 for (
int i = 0; i < kMaxCollectors; i++) {
1300 size += collectors[i]->Size();
static const int kMaxCollectors
uint64_t GetFlowAgeTime() const
VmInterfaceKey ReverseFlowFipVmi(const FlowExportInfo *info)
VnUveTableBase * vn_uve_table() const
uint64_t ageing_task_starts_
const TagList & local_tagset() const
void SetEntryCallback(TaskEntryCallback on_entry)
VrouterUveEntryBase * vrouter_uve_entry() const
const vr_flow_entry * GetKFlowStatsAndInfo(const FlowKey &key, uint32_t idx, uint8_t gen_id, vr_flow_stats *stats, KFlowData *info) const
static uint64_t GetFlowStats(const uint16_t &oflow_data, const uint32_t &data)
void set_packets(uint64_t value)
static Agent * GetInstance()
FlowStatsCollector * FlowToCollector(const FlowEntry *flow)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void set_teardown_time(uint64_t time)
void Shutdown(bool delete_entries=true)
void Free(const FlowAgingTableKey &key)
const std::string fw_policy_name_uuid() const
std::string dest_vn_match
uint64_t evict_enqueue_time() const
void set_last_modified_time(uint64_t time)
void EvictedFlowStatsUpdate(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
FlowTable * flow_table() const
void UpdateStatsEvent(const FlowEntryPtr &flow, uint32_t bytes, uint32_t packets, uint32_t oflow_bytes, const boost::uuids::uuid &u)
void DeleteEvent(const FlowEntryPtr &flow, const RevFlowDepParams ¶ms)
void UpdateFlowStatsInternal(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, uint32_t pkts, uint16_t oflow_pkts, uint64_t time, bool teardown_time)
std::string source_vn_match
AgentUveBase * uve() const
AgeingTask * ageing_task_
uint32_t ProcessFlow(FlowExportInfoList::iterator &it, KSyncFlowMemory *ksync_obj, FlowExportInfo *info, uint64_t curr_time)
void AddEvent(const FlowEntryPtr &flow)
static void KeyToSandeshFlowKey(const FlowKey &key, SandeshFlowKey &skey)
FlowAgingTableKey flow_aging_key_
static const uint32_t kFlowsPerTask
InterfaceUveTable::FloatingIp * FipEntry(uint32_t fip, const string &vn, Interface *intf)
bool AgeFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, const vr_flow_stats &k_stats, const KFlowData &kinfo, FlowExportInfo *info, uint64_t curr_time)
AgeingTask(FlowStatsCollector *fsc)
boost::asio::io_context * io_service()
void UpdateBitmap(uint8_t proto, uint16_t sport, uint16_t dport)
void UpdateFlowStats(FlowExportInfo *info, uint64_t teardown_time)
uint32_t RunAgeing(uint32_t max_count)
void CopyFlowInfo(FlowEntry *fe)
#define FLOW_LOCK(flow, rflow, flow_event)
boost::uuids::random_generator rand_gen_
uint64_t flow_age_time_intvl()
uint16_t table_index() const
bool ShouldBeAged(FlowExportInfo *info, const vr_flow_entry *k_flow, const vr_flow_stats &k_stats, uint64_t curr_time)
bool RequestHandlerEntry()
void UpdateFlowStatsInternalLocked(FlowExportInfo *info, uint32_t bytes, uint16_t oflow_bytes, uint32_t pkts, uint16_t oflow_pkts, uint64_t time, bool teardown_time)
void SetActionStr(const FlowAction &action_info, std::vector< ActionStr > &action_str_l)
void UpdateInterVnStats(FlowExportInfo *info, uint64_t bytes, uint64_t pkts)
const FlowEntry * flow_iteration_key_
FlowStatsCollector * GetCollector(uint8_t idx) const
FlowStatsCollectorObject(Agent *agent, FlowStatsCollectorReq *req, FlowStatsManager *mgr)
bool EvictFlow(KSyncFlowMemory *ksync_obj, const vr_flow_entry *k_flow, uint16_t k_flow_flags, uint32_t flow_handle, uint16_t gen_id, FlowExportInfo *info, uint64_t curr_time)
uint64_t last_modified_time() const
uint32_t instance_id() const
uint64_t GetUpdatedFlowBytes(const FlowExportInfo *stats, uint64_t k_flow_bytes)
void UpdateFloatingIpStats(const FlowExportInfo *flow, uint64_t bytes, uint64_t pkts)
const std::string & sg_rule_uuid() const
void NewFlow(FlowEntry *flow)
FlowStatsCollector(boost::asio::io_context &io, int intvl, uint32_t flow_cache_timeout, AgentUveBase *uve, uint32_t instance_id, FlowAgingTableKey *key, FlowStatsManager *aging_module, FlowStatsCollectorObject *obj)
const std::string & peer_vrouter() const
void set_evict_enqueue_time(uint64_t value)
TunnelType tunnel_type() const
static void FlowExportInfoToSandesh(const FlowExportInfo &value, SandeshFlowExportInfo &info)
const FlowKey & key() const
uint64_t flow_age_time_intvl_
VmUveTableBase * vm_uve_table() const
static const std::string UnknownVn()
void UpdateAgeTimeInSeconds(uint32_t age_time)
bool is_flags_set(const FlowEntryFlags &flags) const
std::string Description() const
TaskScheduler * task_scheduler() const
void SetExitCallback(TaskExitCallback on_exit)
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
const vr_flow_entry * GetKFlowStats(const FlowKey &key, uint32_t idx, uint8_t gen_id, vr_flow_stats *stats) const
const boost::uuids::uuid & uuid() const
const boost::uuids::uuid & uuid() const
virtual ~FlowStatsCollector()
VmInterfaceKey fip_vmi() const
EventManager * event_manager() const
std::string ToString() const
void UpdateInterVnStats(const std::string &src, const std::string &dst, uint64_t bytes, uint64_t pkts, bool outgoing)
void UpdateBitmap(const VmEntry *vm, uint8_t proto, uint16_t sport, uint16_t dport)
void set_measure_busy_time(bool val) const
std::string remote_prefix
int GetExpiryTime() const
void UpdateEntriesToVisit()
std::string origin_vn_src
uint64_t delete_enqueue_time() const
void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id, uint8_t evict_gen_id)
static void GetFlowSandeshActionParams(const FlowAction &action_info, std::string &action_str)
bool RequestHandler(boost::shared_ptr< FlowExportReq > req)
static const uint32_t kFlowStatsTimerInterval
boost::asio::ip::address_v4 Ip4Address
#define kTaskFlowStatsCollector
FlowProto * get_flow_proto() const
void SetFlowAgeTime(uint64_t value)
const Interface * intf_entry() const
std::string origin_vn_dst
const TagList & remote_tagset() const
boost::uuids::uuid rand_gen()
InterfaceUveTable::FloatingIp * ReverseFlowFipEntry(const FlowExportInfo *flow)
InterfaceUveTable * interface_uve_table() const
void UpdateFloatingIpStats(const FipInfo &fip_info)
void UpdateVmiTagBasedStats(const EndpointStatsInfo &info)
uint64_t flow_cache_timeout
uint32_t flow_handle() const
FlowExportInfoList flow_export_info_list_
void DeleteFlow(FlowEntryTree::iterator &it)
static const uint32_t kFlowScanTime
uint32_t flow_handle() const
static uint64_t UTCTimestampUsec()
FlowStatsCollectorObject * parent_
void DeleteFlowRequest(FlowEntry *flow)
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
AgentUveBase * agent_uve_
void UpdateVmiTagBasedStats(FlowExportInfo *info, uint64_t bytes, uint64_t pkts)
uint64_t flow_stats_interval
bool IsEvictionMarked(const vr_flow_entry *entry, uint16_t flags) const
static const uint32_t kInvalidFlowHandle
void AddFlow(FlowExportInfo info)
static const uint32_t kMinFlowsPerTimer
#define LOG(_Level, _Msg)
void set_bytes(uint64_t value)
bool delete_short_flow() const
const std::string RemotePrefix() const
uint64_t teardown_time() const
static uint64_t ClockMonotonicUsec()
void UpdateFlowIterationKey(const FlowEntry *del_flow, FlowEntryTree::iterator &tree_it)
void FlowDeleteEnqueue(FlowExportInfo *info, uint64_t t)
const std::string & nw_ace_uuid() const
static const uint64_t kFlowDeleteRetryTime
tbb::atomic< bool > deleted_
void RequestHandlerExit(bool done)
static const uint64_t FlowAgeTime
uint32_t timers_per_scan_
void SetExpiryTime(int time)
bool Enqueue(QueueEntryT entry)
Task is a wrapper over tbb::task to support policies.
uint32_t ReverseFlowFip(const FlowExportInfo *info)
static uint64_t GetCurrentTime()
uint32_t entries_to_visit_
uint64_t GetUpdatedFlowPackets(const FlowExportInfo *stats, uint64_t k_flow_pkts)
bool FindFlowExportInfo(const FlowEntry *fe, FlowEntryTree::iterator &it)
void set_name(const std::string &name)
const VmEntry * vm() const
FlowStatsManager * flow_stats_manager_
uint32_t GetAgeTimeInSeconds() const
void FreeIndex(uint32_t idx)
FlowEntry * reverse_flow() const
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
void UpdateBitmap(const std::string &vn, uint8_t proto, uint16_t sport, uint16_t dport)
void set_delete_enqueue_time(uint64_t value)
void FlowEvictEnqueue(FlowExportInfo *info, uint64_t t, uint32_t flow_handle, uint16_t gen_id)
static std::string UTCUsecToString(uint64_t tstamp)