5 #include <boost/functional/hash.hpp>
24 add_tokens_(
"Add Tokens", this, agent->flow_add_tokens()),
25 ksync_tokens_(
"KSync` Tokens", this, agent->flow_ksync_tokens()),
26 del_tokens_(
"Delete Tokens", this, agent->flow_del_tokens()),
27 update_tokens_(
"Update Tokens", this, agent->flow_update_tokens()),
28 flow_update_queue_(agent, this, &update_tokens_,
29 agent->params()->flow_task_latency_limit(), 16),
30 use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(),
32 port_table_manager_(agent, agent->params()->fabric_snat_hash_table_size()),
34 (*(agent->event_manager())->io_service(),
"FlowStatsUpdateTimer",
41 for (uint8_t i = 0; i < table_count; i++) {
45 for (uint32_t i = 0; i < table_count; i++) {
67 if (::getenv(
"USE_VROUTER_HASH") != NULL) {
68 string opt = ::getenv(
"USE_VROUTER_HASH");
69 if (opt ==
"" || strcasecmp(opt.c_str(),
"false"))
125 boost::hash_combine(hash, val);
135 }
else if (ip.is_v4()) {
162 uint8_t proto, uint16_t sport,
163 uint16_t dport, uint32_t flow_handle)
const {
168 std::size_t hash = 0;
189 boost::asio::io_context &io) {
191 info->ip_proto, info->sport, info->dport,
192 info->agent_hdr.cmd_param);
204 "Flow : Non-IP packet. Dropping",
false);
214 uint32_t flow_handle)
const {
274 uint32_t *out_count) {
280 std::vector<FlowMgmtManager *>::const_iterator it =
283 (*it)->VnFlowCounters(vn, in_count, out_count);
309 switch (event->
event()) {
311 PktInfo *info =
event->pkt_info().get();
343 event->flow_handle());
387 FlowTable *table =
event->flow()->flow_table();
405 switch (req->
event()) {
459 new_flow->flow_table()->Add(new_flow.get(), NULL);
517 switch (req->
event()) {
546 switch (req->
event()) {
579 switch (req->
event()) {
612 uint8_t gen_id, uint8_t evict_gen_id) {
614 flow_handle, gen_id, evict_gen_id);
635 uint32_t flow_handle, uint8_t gen_id,
636 int ksync_error, uint64_t evict_flow_bytes,
637 uint64_t evict_flow_packets,
638 int32_t evict_flow_oflow,
639 uint32_t transaction_id) {
641 gen_id, ksync_error, evict_flow_bytes,
642 evict_flow_packets, evict_flow_oflow,
671 msg, NULL, table_index));
690 bool trace = flow->
trace();
692 trace |= rflow->
trace();
694 if (trace ==
false) {
702 trace = filter->
Match(&flow->
key());
703 if (rflow && trace ==
false) {
704 trace = filter->
Match(&rflow->
key());
755 if (pool_base == NULL)
786 switch (req->
event()) {
927 uint32_t *active_flows)
const {
933 std::vector<FlowMgmtManager *>::const_iterator it =
936 (*it)->InterfaceFlowCount(intf, created, aged, active_flows);
static const int kMinTableCount
void DisableFlowKSyncQueue(uint32_t index, bool disabled)
uint32_t task_starts() const
WorkQueueStats flow_update_queue_
void DisableFlowUpdateQueue(bool disabled)
std::vector< WorkQueueStats > flow_delete_queue_
bool ConcurrencyCheck(int task_id, bool check_task_instance)
bool RunProtoHandler(ProtoHandler *handler)
static void UpdateStats(FlowEvent *event, FlowStats *stats)
void STLDeleteValues(Container *container)
FlowTraceFilter ipv6_trace_filter_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void KSyncEventRequest(KSyncEntry *ksync_entry, KSyncEntry::KSyncEvent event, uint32_t flow_handle, uint8_t gen_id, int ksync_error, uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow, uint32_t transcation_id)
std::vector< FlowMgmtManager * >::const_iterator flow_mgmt_manager_iterator_end() const
FlowEntry * Find(const FlowKey &key, uint32_t table_index) const
void RegisterPktFlowStatsCb(ProfileCb cb)
size_t max_queue_len() const
FlowTable * flow_table() const
boost::asio::io_context & io_
size_t NumDequeues() const
UpdateFlowEventQueue flow_update_queue_
boost::asio::ip::address IpAddress
const FlowKey & get_flow_key() const
std::vector< DeleteFlowEventQueue * > flow_delete_queue_
AgentStats * stats() const
static void SetFlowEventQueueStats(Agent *agent, const FlowEventQueueBase::Queue *queue, ProfileData::WorkQueueStats *stats)
WorkQueueStats pkt_handler_queue_
uint64_t flow_created() const
#define FLOW_LOCK(flow, rflow, flow_event)
uint32_t flow_handle() const
uint16_t table_index() const
uint64_t vrouter_responses_
std::vector< WorkQueueStats > flow_ksync_queue_
std::vector< FlowEventQueue * > flow_event_queue_
FlowTableKSyncEntry * ksync_entry()
uint64_t max_queue_count_
static FlowEntry * Allocate(const FlowKey &key, FlowTable *flow_table)
tbb::atomic< int > linklocal_flow_count_
int flow_ksync_task_id() const
uint16_t flow_task_latency_limit() const
bool ProcessFlowEvent(const FlowEvent *req, FlowEntry *flow, FlowEntry *rflow)
FlowTokenPool ksync_tokens_
bool FlowUpdateHandler(FlowEvent *req)
bool FlowEventHandler(FlowEvent *req, FlowTable *table)
FlowTable * GetFlowTable(const FlowKey &key, uint32_t flow_handle) const
uint64_t flow_aged() const
const FlowKey & key() const
FlowTable * GetTable(uint16_t index) const
uint32_t table_index() const
FlowEntry * Find(const FlowKey &key)
static const int kMaxIterations
bool is_flags_set(const FlowEntryFlags &flags) const
FlowHandler * AllocProtoHandler(PktInfoPtr info, boost::asio::io_context &io)
std::vector< FlowTable * > flow_table_list_
static const int kMaxTableCount
uint64_t update_restarts_
void InitAuditFlow(uint32_t flow_idx, uint8_t gen_id)
WorkQueueStats flow_mgmt_queue_
PktInfoPtr pkt_info() const
uint16_t FlowTableIndex(const IpAddress &sip, const IpAddress &dip, uint8_t proto, uint16_t sport, uint16_t dport, uint32_t flow_handle) const
void MessageRequest(FlowEntry *flow)
void UpdateFlowMinMaxStats(uint64_t total_flows, FlowCounters &stat) const
void FreeBuffer(PktInfo *msg)
#define CHECK_CONCURRENCY(...)
uint64_t vrouter_responses_
void Init(bool enable, Address::Family family)
void Add(FlowEntry *flow, FlowEntry *rflow)
FlowTokenPool update_tokens_
void EnqueueUnResolvedFlowEntry(FlowEntry *flow)
void RegisterFlowCountFn(FlowCountFn cb)
void set_measure_busy_time(bool val) const
PktHandler * pkt_handler() const
bool Enqueue(PktInfoPtr msg)
#define FLOW_TRACE(obj,...)
std::vector< FlowEventQueue * > flow_tokenless_queue_
static void SetFlowMgmtQueueStats(Agent *agent, const FlowMgmtManager::FlowMgmtQueue *queue, ProfileData::WorkQueueStats *stats)
void MakeShortFlow(FlowShortReason reason)
std::vector< WorkQueueStats > flow_event_queue_
void Ip6AddressToU64Array(const Ip6Address &addr, uint64_t *arr, int size)
void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id, uint8_t evict_gen_id)
AgentParam * params() const
uint64_t restarts() const
#define kTaskFlowStatsUpdate
const DBEntry * db_entry() const
uint64_t failures() const
virtual void TokenAvailable(TokenPool *pool_base)
FlowTokenStats token_stats_
void DisableFlowEventQueue(uint32_t index, bool disabled)
std::string Description() const
FlowTraceFilter ipv4_trace_filter_
void SetProfileData(ProfileData *data)
bool FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table)
std::vector< KSyncFlowEventQueue * > flow_ksync_queue_
bool FreeDBState(const DBEntry *entry, uint32_t gen_id)
std::vector< FlowMgmtManager * >::const_iterator flow_mgmt_manager_iterator_begin() const
FlowTokenPool del_tokens_
uint64_t revaluate_count_
void set_disable(bool val)
static std::size_t HashCombine(std::size_t hash, uint64_t val)
bool Match(const FlowKey *key)
uint32_t FlowCount() const
static void SetPktHandlerQueueStats(Agent *agent, const PktHandler::PktHandlerQueue *queue, ProfileData::WorkQueueStats *stats)
uint32_t flow_handle() const
uint64_t update_failures_
void Enqueue(FlowEvent *event)
bool FlowDeleteHandler(FlowEvent *req, FlowTable *table)
void DeleteFlowRequest(FlowEntry *flow)
bool Validate(PktInfo *msg)
bool TokenCheck(const FlowTokenPool *pool) const
bool flow_trace_enable() const
uint64_t recompute_count_
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Timer * stats_update_timer_
FlowEntry * reverse_flow_entry()
KSyncEntry * ksync_entry() const
void EnqueueFlowEvent(FlowEvent *event)
void ForceEnqueueFreeFlowReference(FlowEntryPtr &flow)
int flow_delete_task_id() const
void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle, uint8_t gen_id)
FlowEntryPtr flow_entry() const
const PktHandlerQueue * work_queue() const
bool FlowStatsUpdate() const
uint64_t busy_time() const
size_t NumEnqueues() const
bool EnqueueReentrant(boost::shared_ptr< PktInfo > msg, uint8_t table_index)
FlowMgmtManagerList flow_mgmt_manager_list() const
boost::shared_ptr< Token > TokenPtr
uint64_t recompute_count_
static std::size_t HashIp(std::size_t hash, const IpAddress &ip)
bool ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow)
FlowMgmtManager * flow_mgmt_manager(uint16_t index) const
std::vector< WorkQueueStats > flow_tokenless_queue_
FlowProto(Agent *agent, boost::asio::io_context &io)
void DisableFlowDeleteQueue(uint32_t index, bool disabled)
FlowMgmtDbClient * flow_mgmt_dbclient() const
uint16_t flow_thread_count() const
int ksync_response_error() const
bool ProcessProto(boost::shared_ptr< PktInfo > msg_info)
bool UpdateFlow(FlowEntry *flow)
void SetFlowProto(FlowProto *proto)
size_t FlowUpdateQueueLength()
AgentProfile * agent_profile() const
void InterfaceFlowCount(const Interface *intf, uint64_t *created, uint64_t *aged, uint32_t *active_flows) const
TokenPtr GetToken(FlowEntry *entry)
FlowTokenPool add_tokens_
void VnFlowCounters(const VnEntry *vn, uint32_t *in_count, uint32_t *out_count)
int flow_stats_update_timeout() const
boost::shared_ptr< PktInfo > PktInfoPtr
bool AddFlow(FlowEntry *flow)
static bool DeleteTimer(Timer *Timer)
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
void GrowFreeListRequest(FlowTable *table)
void Update(FlowEntry *flow, FlowEntry *rflow)
TokenPtr GetToken(FlowEvent::Event event)