5 #include <boost/functional/hash.hpp>
21 const std::string &name,
22 uint32_t task_id,
int task_instance,
24 uint16_t latency_limit,
25 uint32_t max_iterations) :
26 flow_proto_(proto), token_pool_(pool), task_start_(0), count_(0),
27 events_processed_(0), latency_limit_(latency_limit) {
32 sprintf(buff,
"%s-%d", name.c_str(), task_instance);
69 getrusage(RUSAGE_THREAD, &
rusage_);
80 getrusage(RUSAGE_THREAD, &r);
82 uint32_t user = (r.ru_utime.tv_sec -
rusage_.ru_utime.tv_sec) * 1000;
83 user += ((r.ru_utime.tv_usec -
rusage_.ru_utime.tv_usec) / 1000);
85 uint32_t sys = (r.ru_stime.tv_sec -
rusage_.ru_stime.tv_sec) * 1000;
86 sys += ((r.ru_stime.tv_usec -
rusage_.ru_stime.tv_usec) / 1000);
91 <<
" User " << user <<
" Sys " << sys);
97 std::unique_ptr<FlowEvent> event_ptr(event);
113 switch (event->
event()) {
117 tbb::mutex::scoped_lock mutext(flow->
mutex());
129 tbb::mutex::scoped_lock mutext(flow->
mutex());
135 tbb::mutex::scoped_lock mutext(flow->
mutex());
150 switch (event->
event()) {
154 tbb::mutex::scoped_lock mutext(flow->
mutex());
161 tbb::mutex::scoped_lock mutext(flow->
mutex());
168 tbb::mutex::scoped_lock mutext(flow->
mutex());
176 tbb::mutex::scoped_lock mutext(flow->
mutex());
191 if (flow && update_rev_flow)
194 switch (event->
event()) {
214 tbb::mutex::scoped_lock mutext(flow->
mutex());
236 uint16_t latency_limit,
237 uint32_t max_iterations) :
240 table->table_index(), pool, latency_limit,
255 uint16_t latency_limit,
256 uint32_t max_iterations) :
259 table->table_index(), pool, latency_limit,
274 uint16_t latency_limit,
275 uint32_t max_iterations) :
278 table->table_index(), pool, latency_limit,
292 uint16_t latency_limit,
293 uint32_t max_iterations) :
296 pool, latency_limit, max_iterations) {
UpdateFlowEventQueue(Agent *agent, FlowProto *proto, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
void SetEntryCallback(TaskEntryCallback on_entry)
KSyncFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
void ResetRecomputeDBEntry()
DeleteFlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
void Shutdown(bool delete_entries=true)
virtual bool Handler(FlowEvent *event)
bool HandleEvent(FlowEvent *event)
FlowPendingAction * GetPendingAction()
virtual bool HandleEvent(FlowEvent *event)=0
bool HandleEvent(FlowEvent *event)
#define FLOW_LOCK(flow, rflow, flow_event)
bool CanRecomputeDBEntry()
bool HandleEvent(FlowEvent *event)
bool FlowUpdateHandler(FlowEvent *req)
bool FlowEventHandler(FlowEvent *req, FlowTable *table)
bool CanProcess(FlowEvent *event)
void ProcessDone(FlowEvent *event, bool update_rev_flow)
virtual ~DeleteFlowEventQueue()
void SetExitCallback(TaskExitCallback on_exit)
static const int kMaxSize
FlowEventQueue(Agent *agent, FlowProto *proto, FlowTable *table, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
virtual ~FlowEventQueue()
FlowEventQueueBase(FlowProto *proto, const std::string &name, uint32_t task_id, int task_instance, FlowTokenPool *pool, uint16_t latency_limit, uint32_t max_iterations)
bool CanEnqueue(FlowEvent *event)
uint64_t events_processed_
void set_measure_busy_time(bool val) const
std::string Description() const
bool FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table)
bool HandleEvent(FlowEvent *event)
void Enqueue(FlowEvent *event)
bool FlowDeleteHandler(FlowEvent *req, FlowTable *table)
bool TokenCheck(const FlowTokenPool *pool) const
FlowEntry * reverse_flow_entry()
virtual ~FlowEventQueueBase()
#define LOG(_Level, _Msg)
bool SetRecomputeDBEntry()
static uint64_t ClockMonotonicUsec()
virtual ~KSyncFlowEventQueue()
void SetStartRunnerFunc(StartRunnerFunc start_runner_fn)
FlowTokenPool * token_pool_
WorkQueue< FlowEvent * > Queue
bool Enqueue(QueueEntryT entry)
void set_name(const std::string &name)
virtual ~UpdateFlowEventQueue()