5 #include <boost/functional/hash.hpp>
24 add_tokens_(
"Add Tokens", this, agent->flow_add_tokens()),
25 ksync_tokens_(
"KSync` Tokens", this, agent->flow_ksync_tokens()),
26 del_tokens_(
"Delete Tokens", this, agent->flow_del_tokens()),
27 update_tokens_(
"Update Tokens", this, agent->flow_update_tokens()),
28 flow_update_queue_(agent, this, &update_tokens_,
29 agent->params()->flow_task_latency_limit(), 16),
30 use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(),
32 port_table_manager_(agent, agent->params()->fabric_snat_hash_table_size()),
34 (*(agent->event_manager())->io_service(),
"FlowStatsUpdateTimer",
41 for (uint8_t i = 0; i < table_count; i++) {
45 for (uint32_t i = 0; i < table_count; i++) {
67 if (::getenv(
"USE_VROUTER_HASH") != NULL) {
68 string opt = ::getenv(
"USE_VROUTER_HASH");
69 if (opt ==
"" || strcasecmp(opt.c_str(),
"false"))
125 boost::hash_combine(hash, val);
135 }
else if (ip.is_v4()) {
162 uint8_t proto, uint16_t sport,
163 uint16_t dport, uint32_t flow_handle)
const {
168 std::size_t hash = 0;
189 boost::asio::io_context &
io) {
191 info->ip_proto, info->sport, info->dport,
192 info->agent_hdr.cmd_param);
204 "Flow : Non-IP packet. Dropping",
false);
214 uint32_t flow_handle)
const {
274 uint32_t *out_count) {
280 std::vector<FlowMgmtManager *>::const_iterator it =
283 (*it)->VnFlowCounters(vn, in_count, out_count);
309 switch (event->
event()) {
311 PktInfo *info =
event->pkt_info().get();
387 FlowTable *table =
event->flow()->flow_table();
405 switch (req->
event()) {
459 new_flow->flow_table()->Add(new_flow.get(), NULL);
517 switch (req->
event()) {
546 switch (req->
event()) {
579 switch (req->
event()) {
612 uint8_t gen_id, uint8_t evict_gen_id) {
614 flow_handle, gen_id, evict_gen_id);
635 uint32_t flow_handle, uint8_t gen_id,
636 int ksync_error, uint64_t evict_flow_bytes,
637 uint64_t evict_flow_packets,
638 int32_t evict_flow_oflow,
639 uint32_t transaction_id) {
641 gen_id, ksync_error, evict_flow_bytes,
642 evict_flow_packets, evict_flow_oflow,
671 msg, NULL, table_index));
690 bool trace = flow->
trace();
692 trace |= rflow->
trace();
694 if (trace ==
false) {
702 trace = filter->
Match(&flow->
key());
703 if (rflow && trace ==
false) {
704 trace = filter->
Match(&rflow->
key());
755 if (pool_base == NULL)
786 switch (req->
event()) {
927 uint32_t *active_flows)
const {
933 std::vector<FlowMgmtManager *>::const_iterator it =
936 (*it)->InterfaceFlowCount(intf, created, aged, active_flows);
boost::asio::ip::address IpAddress
void Ip6AddressToU64Array(const Ip6Address &addr, uint64_t *arr, int size)
#define kTaskFlowStatsUpdate
uint16_t flow_task_latency_limit() const
void RegisterPktFlowStatsCb(ProfileCb cb)
int flow_stats_update_timeout() const
uint64_t flow_created() const
uint64_t flow_aged() const
void RegisterFlowCountFn(FlowCountFn cb)
void UpdateFlowMinMaxStats(uint64_t total_flows, FlowCounters &stat) const
bool flow_trace_enable() const
void SetFlowProto(FlowProto *proto)
AgentParam * params() const
uint16_t flow_thread_count() const
AgentStats * stats() const
FlowEntry * reverse_flow_entry()
FlowTableKSyncEntry * ksync_entry()
const FlowKey & key() const
bool is_flags_set(const FlowEntryFlags &flags) const
uint32_t flow_handle() const
static FlowEntry * Allocate(const FlowKey &key, FlowTable *flow_table)
FlowTable * flow_table() const
void MakeShortFlow(FlowShortReason reason)
KSyncEntry * ksync_entry() const
void set_disable(bool val)
void Enqueue(FlowEvent *event)
const DBEntry * db_entry() const
PktInfoPtr pkt_info() const
uint32_t flow_handle() const
const FlowKey & get_flow_key() const
uint32_t table_index() const
bool FreeDBState(const DBEntry *entry, uint32_t gen_id)
FlowMgmtDbClient * flow_mgmt_dbclient() const
TokenPtr GetToken(FlowEvent::Event event)
void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id, uint8_t evict_gen_id)
FlowTokenPool del_tokens_
bool FlowStatsUpdate() const
std::vector< DeleteFlowEventQueue * > flow_delete_queue_
void DeleteFlowRequest(FlowEntry *flow)
bool FlowDeleteHandler(FlowEvent *req, FlowTable *table)
void GrowFreeListRequest(FlowTable *table)
void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle, uint8_t gen_id)
void ForceEnqueueFreeFlowReference(FlowEntryPtr &flow)
bool FlowUpdateHandler(FlowEvent *req)
void DisableFlowDeleteQueue(uint32_t index, bool disabled)
FlowTable * GetTable(uint16_t index) const
Timer * stats_update_timer_
void DisableFlowUpdateQueue(bool disabled)
FlowTraceFilter ipv4_trace_filter_
std::vector< FlowTable * > flow_table_list_
void InterfaceFlowCount(const Interface *intf, uint64_t *created, uint64_t *aged, uint32_t *active_flows) const
uint32_t FlowCount() const
void VnFlowCounters(const VnEntry *vn, uint32_t *in_count, uint32_t *out_count)
FlowHandler * AllocProtoHandler(PktInfoPtr info, boost::asio::io_context &io)
tbb::atomic< int > linklocal_flow_count_
virtual void TokenAvailable(TokenPool *pool_base)
void EnqueueUnResolvedFlowEntry(FlowEntry *flow)
bool ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow)
FlowTable * GetFlowTable(const FlowKey &key, uint32_t flow_handle) const
FlowProto(Agent *agent, boost::asio::io_context &io)
void EnqueueFlowEvent(FlowEvent *event)
static const int kMaxTableCount
void DisableFlowKSyncQueue(uint32_t index, bool disabled)
FlowTokenPool add_tokens_
std::vector< FlowEventQueue * > flow_tokenless_queue_
FlowTokenPool update_tokens_
FlowTraceFilter ipv6_trace_filter_
static const int kMinTableCount
FlowTokenPool ksync_tokens_
void MessageRequest(FlowEntry *flow)
bool FlowEventHandler(FlowEvent *req, FlowTable *table)
FlowEntry * Find(const FlowKey &key, uint32_t table_index) const
bool TokenCheck(const FlowTokenPool *pool) const
void KSyncEventRequest(KSyncEntry *ksync_entry, KSyncEntry::KSyncEvent event, uint32_t flow_handle, uint8_t gen_id, int ksync_error, uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow, uint32_t transcation_id)
void DisableFlowEventQueue(uint32_t index, bool disabled)
bool UpdateFlow(FlowEntry *flow)
UpdateFlowEventQueue flow_update_queue_
void SetProfileData(ProfileData *data)
std::vector< KSyncFlowEventQueue * > flow_ksync_queue_
bool EnqueueReentrant(boost::shared_ptr< PktInfo > msg, uint8_t table_index)
bool FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table)
uint16_t FlowTableIndex(const IpAddress &sip, const IpAddress &dip, uint8_t proto, uint16_t sport, uint16_t dport, uint32_t flow_handle) const
size_t FlowUpdateQueueLength()
bool Enqueue(PktInfoPtr msg)
std::vector< FlowEventQueue * > flow_event_queue_
bool Validate(PktInfo *msg)
bool AddFlow(FlowEntry *flow)
int ksync_response_error() const
FlowEntryPtr flow_entry() const
int flow_ksync_task_id() const
FlowEntry * Find(const FlowKey &key)
int flow_delete_task_id() const
void Add(FlowEntry *flow, FlowEntry *rflow)
bool ProcessFlowEvent(const FlowEvent *req, FlowEntry *flow, FlowEntry *rflow)
uint16_t table_index() const
void Update(FlowEntry *flow, FlowEntry *rflow)
bool ConcurrencyCheck(int task_id, bool check_task_instance)
TokenPtr GetToken(FlowEntry *entry)
AgentProfile * agent_profile() const
const PktHandlerQueue * work_queue() const
FlowMgmtManagerList flow_mgmt_manager_list() const
FlowMgmtManager * flow_mgmt_manager(uint16_t index) const
PktHandler * pkt_handler() const
std::vector< FlowMgmtManager * >::const_iterator flow_mgmt_manager_iterator_begin() const
std::vector< FlowMgmtManager * >::const_iterator flow_mgmt_manager_iterator_end() const
bool ProcessProto(boost::shared_ptr< PktInfo > msg_info)
bool RunProtoHandler(ProtoHandler *handler)
void FreeBuffer(PktInfo *msg)
boost::asio::io_context & io_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
static bool DeleteTimer(Timer *Timer)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
uint64_t failures() const
uint64_t restarts() const
static const int kMaxIterations
size_t max_queue_len() const
size_t NumDequeues() const
size_t NumEnqueues() const
void set_measure_busy_time(bool val) const
uint64_t busy_time() const
uint32_t task_starts() const
std::string Description() const
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
#define FLOW_TRACE(obj,...)
static void SetFlowMgmtQueueStats(Agent *agent, const FlowMgmtManager::FlowMgmtQueue *queue, ProfileData::WorkQueueStats *stats)
static void SetPktHandlerQueueStats(Agent *agent, const PktHandler::PktHandlerQueue *queue, ProfileData::WorkQueueStats *stats)
static std::size_t HashCombine(std::size_t hash, uint64_t val)
static void UpdateStats(FlowEvent *event, FlowStats *stats)
static std::size_t HashIp(std::size_t hash, const IpAddress &ip)
static void SetFlowEventQueueStats(Agent *agent, const FlowEventQueueBase::Queue *queue, ProfileData::WorkQueueStats *stats)
#define FLOW_LOCK(flow, rflow, flow_event)
boost::shared_ptr< Token > TokenPtr
boost::shared_ptr< PktInfo > PktInfoPtr
uint64_t revaluate_count_
uint64_t vrouter_responses_
uint64_t recompute_count_
bool Match(const FlowKey *key)
void Init(bool enable, Address::Family family)
FlowTokenStats token_stats_
std::vector< WorkQueueStats > flow_delete_queue_
uint64_t recompute_count_
std::vector< WorkQueueStats > flow_tokenless_queue_
uint64_t vrouter_responses_
std::vector< WorkQueueStats > flow_event_queue_
WorkQueueStats flow_mgmt_queue_
WorkQueueStats pkt_handler_queue_
std::vector< WorkQueueStats > flow_ksync_queue_
WorkQueueStats flow_update_queue_
uint64_t update_restarts_
uint64_t update_failures_
uint64_t max_queue_count_
#define CHECK_CONCURRENCY(...)
void STLDeleteValues(Container *container)