OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
flow_proto.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 #include <base/address_util.h>
5 #include <boost/functional/hash.hpp>
6 #include <init/agent_param.h>
7 #include <cmn/agent_stats.h>
8 #include <oper/agent_profile.h>
13 #include "flow_proto.h"
15 #include "flow_mgmt.h"
16 #include "flow_event.h"
17 #include <strings.h>
18 #include "flow_entry.h"
19 
20 static void UpdateStats(FlowEvent *event, FlowStats *stats);
21 
22 FlowProto::FlowProto(Agent *agent, boost::asio::io_context &io) :
23  Proto(agent, kTaskFlowEvent, PktHandler::FLOW, io),
24  add_tokens_("Add Tokens", this, agent->flow_add_tokens()),
25  ksync_tokens_("KSync` Tokens", this, agent->flow_ksync_tokens()),
26  del_tokens_("Delete Tokens", this, agent->flow_del_tokens()),
27  update_tokens_("Update Tokens", this, agent->flow_update_tokens()),
28  flow_update_queue_(agent, this, &update_tokens_,
29  agent->params()->flow_task_latency_limit(), 16),
30  use_vrouter_hash_(false), ipv4_trace_filter_(), ipv6_trace_filter_(),
31  stats_(),
32  port_table_manager_(agent, agent->params()->fabric_snat_hash_table_size()),
33  stats_update_timer_(TimerManager::CreateTimer
34  (*(agent->event_manager())->io_service(), "FlowStatsUpdateTimer",
35  TaskScheduler::GetInstance()->GetTaskId(kTaskFlowStatsUpdate), 0)) {
37  agent->SetFlowProto(this);
38  set_trace(false);
39  uint16_t table_count = agent->flow_thread_count();
40  assert(table_count >= kMinTableCount && table_count <= kMaxTableCount);
41  for (uint8_t i = 0; i < table_count; i++) {
42  flow_table_list_.push_back(new FlowTable(agent_, i));
43  }
44 
45  for (uint32_t i = 0; i < table_count; i++) {
46  uint16_t latency = agent->params()->flow_task_latency_limit();
47  flow_event_queue_.push_back
48  (new FlowEventQueue(agent, this, flow_table_list_[i],
49  &add_tokens_, latency,
51 
52  flow_tokenless_queue_.push_back
53  (new FlowEventQueue(agent, this, flow_table_list_[i],
54  NULL, latency,
56 
57  flow_delete_queue_.push_back
58  (new DeleteFlowEventQueue(agent, this, flow_table_list_[i],
59  &del_tokens_, latency,
61 
62  flow_ksync_queue_.push_back
63  (new KSyncFlowEventQueue(agent, this, flow_table_list_[i],
64  &ksync_tokens_, latency,
66  }
67  if (::getenv("USE_VROUTER_HASH") != NULL) {
68  string opt = ::getenv("USE_VROUTER_HASH");
69  if (opt == "" || strcasecmp(opt.c_str(), "false"))
70  use_vrouter_hash_ = false;
71  else
72  use_vrouter_hash_ = true;
73  }
74 }
75 
82 }
83 
86  this));
87  for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
88  flow_table_list_[i]->Init();
89  }
90 
91  AgentProfile *profile = agent_->oper_db()->agent_profile();
93  this, _1));
94 
97 }
98 
100  for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
101  flow_table_list_[i]->InitDone();
102  }
104  boost::bind(&FlowProto::FlowStatsUpdate, this));
105 }
106 
108  for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
109  flow_table_list_[i]->Shutdown();
110  }
111  for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
112  flow_event_queue_[i]->Shutdown();
113  flow_tokenless_queue_[i]->Shutdown();
114  flow_delete_queue_[i]->Shutdown();
115  flow_ksync_queue_[i]->Shutdown();
116  }
118  if (stats_update_timer_) {
121  }
122 }
123 
124 static std::size_t HashCombine(std::size_t hash, uint64_t val) {
125  boost::hash_combine(hash, val);
126  return hash;
127 }
128 
129 static std::size_t HashIp(std::size_t hash, const IpAddress &ip) {
130  if (ip.is_v6()) {
131  uint64_t val[2];
132  Ip6AddressToU64Array(ip.to_v6(), val, 2);
133  hash = HashCombine(hash, val[0]);
134  hash = HashCombine(hash, val[1]);
135  } else if (ip.is_v4()) {
136  hash = HashCombine(hash, ip.to_v4().to_ulong());
137  } else {
138  assert(0);
139  }
140  return hash;
141 }
142 
143 // Get the thread to be used for the flow. We *try* to map forward and reverse
144 // flow to same thread with following,
145 // if (sip < dip)
146 // ip1 = sip
147 // ip2 = dip
148 // else
149 // ip1 = dip
150 // ip2 = sip
151 // if (sport < dport)
152 // port1 = sport
153 // port2 = dport
154 // else
155 // port1 = dport
156 // port2 = sport
157 // field5 = proto
158 // hash = HASH(ip1, ip2, port1, port2, proto)
159 //
160 // The algorithm above cannot ensure NAT flows belong to same thread.
161 uint16_t FlowProto::FlowTableIndex(const IpAddress &sip, const IpAddress &dip,
162  uint8_t proto, uint16_t sport,
163  uint16_t dport, uint32_t flow_handle) const {
164  if (use_vrouter_hash_) {
165  return (flow_handle/flow_table_list_.size()) % flow_table_list_.size();
166  }
167 
168  std::size_t hash = 0;
169  if (sip < dip) {
170  hash = HashIp(hash, sip);
171  hash = HashIp(hash, dip);
172  } else {
173  hash = HashIp(hash, dip);
174  hash = HashIp(hash, sip);
175  }
176 
177  if (sport < dport) {
178  hash = HashCombine(hash, sport);
179  hash = HashCombine(hash, dport);
180  } else {
181  hash = HashCombine(hash, dport);
182  hash = HashCombine(hash, sport);
183  }
184  hash = HashCombine(hash, proto);
185  return (hash % (flow_event_queue_.size()));
186 }
187 
189  boost::asio::io_context &io) {
190  uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
191  info->ip_proto, info->sport, info->dport,
192  info->agent_hdr.cmd_param);
193  return new FlowHandler(agent(), info, io, this, index);
194 }
195 
197  if (msg->ip == NULL && msg->ip6 == NULL && msg->type != PktType::MESSAGE) {
198  if (msg->family == Address::INET || msg->family == Address::INET6) {
199  FLOW_TRACE(DetailErr, msg->agent_hdr.cmd_param,
200  msg->agent_hdr.ifindex,
201  msg->agent_hdr.vrf,
202  msg->ip_saddr.to_string(),
203  msg->ip_daddr.to_string(),
204  "Flow : Non-IP packet. Dropping", false);
205  } else {
206  assert(0);
207  }
208  return false;
209  }
210  return true;
211 }
212 
214  uint32_t flow_handle) const {
215  uint32_t index = FlowTableIndex(key.src_addr, key.dst_addr, key.protocol,
216  key.src_port, key.dst_port, flow_handle);
217  return flow_table_list_[index];
218 }
219 
221  if (Validate(msg.get()) == false) {
222  return true;
223  }
224 
225  FreeBuffer(msg.get());
227  return true;
228 }
229 
230 void FlowProto::DisableFlowEventQueue(uint32_t index, bool disabled) {
231  flow_event_queue_[index]->set_disable(disabled);
232  flow_tokenless_queue_[index]->set_disable(disabled);
233  flow_delete_queue_[index]->set_disable(disabled);
234 }
235 
238 }
239 
240 void FlowProto::DisableFlowKSyncQueue(uint32_t index, bool disabled) {
241  flow_ksync_queue_[index]->set_disable(disabled);
242 }
243 
245  return flow_update_queue_.Length();
246 }
247 
248 void FlowProto::DisableFlowDeleteQueue(uint32_t index, bool disabled) {
249  flow_delete_queue_[index]->set_disable(disabled);
250 }
251 
253 // FlowTable related routines
256  for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
257  flow_table_list_[i]->DeleteAll();
258  }
259 }
260 
261 FlowTable *FlowProto::GetTable(uint16_t index) const {
262  return flow_table_list_[index];
263 }
264 
265 uint32_t FlowProto::FlowCount() const {
266  uint32_t count = 0;
267  for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
268  count += flow_table_list_[i]->Size();
269  }
270  return count;
271 }
272 
273 void FlowProto::VnFlowCounters(const VnEntry *vn, uint32_t *in_count,
274  uint32_t *out_count) {
275  *in_count = 0;
276  *out_count = 0;
277  if (vn == NULL)
278  return;
279 
280  std::vector<FlowMgmtManager *>::const_iterator it =
282  while (it != agent_->pkt()->flow_mgmt_manager_iterator_end()) {
283  (*it)->VnFlowCounters(vn, in_count, out_count);
284  it++;
285  }
286 }
287 
288 FlowEntry *FlowProto::Find(const FlowKey &key, uint32_t table_index) const {
289  return GetTable(table_index)->Find(key);
290 }
291 
293  FlowTable *table = flow->flow_table();
294  table->Add(flow, flow->reverse_flow_entry());
295  return true;
296 }
297 
299  FlowTable *table = flow->flow_table();
300  table->Update(flow, flow->reverse_flow_entry());
301  return true;
302 }
303 
305 // Flow Control Event routines
308  FlowEventQueueBase *queue = NULL;
309  switch (event->event()) {
311  PktInfo *info = event->pkt_info().get();
312  uint32_t index = FlowTableIndex(info->ip_saddr, info->ip_daddr,
313  info->ip_proto, info->sport,
314  info->dport,
315  info->agent_hdr.cmd_param);
316  queue = flow_event_queue_[index];
317  break;
318  }
319 
321  FlowEntry *flow = event->flow();
322  FlowTable *table = flow->flow_table();
323  queue = flow_event_queue_[table->table_index()];
324  break;
325  }
326 
327  case FlowEvent::EVICT_FLOW: {
328  FlowEntry *flow = event->flow();
329  FlowTable *table = flow->flow_table();
330  queue = flow_ksync_queue_[table->table_index()];
331  break;
332  }
333 
335  FlowEntry *flow = event->flow();
336  FlowTable *table = flow->flow_table();
337  queue = flow_tokenless_queue_[table->table_index()];
338  break;
339  }
340 
341  case FlowEvent::AUDIT_FLOW: {
342  FlowTable *table = GetFlowTable(event->get_flow_key(),
343  event->flow_handle());
344  queue = flow_event_queue_[table->table_index()];
345  break;
346  }
347 
349  queue = flow_tokenless_queue_[event->table_index()];
350  break;
351  }
352 
353  case FlowEvent::KSYNC_EVENT: {
354  FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(event);
355  FlowTableKSyncEntry *ksync_entry =
356  (static_cast<FlowTableKSyncEntry *> (ksync_event->ksync_entry()));
357  FlowEntry *flow = ksync_entry->flow_entry().get();
358  FlowTable *table = flow->flow_table();
359  queue = flow_ksync_queue_[table->table_index()];
360  break;
361  }
362 
363  case FlowEvent::REENTRANT: {
364  queue = flow_event_queue_[event->table_index()];
365  break;
366  }
367 
368  case FlowEvent::DELETE_FLOW: {
369  FlowEntry *flow = event->flow();
370  queue = flow_delete_queue_[flow->flow_table()->table_index()];
371  break;
372  }
373 
375  queue = flow_tokenless_queue_[0];
376  break;
377  }
378 
382  queue = &flow_update_queue_;
383  break;
384  }
385 
387  FlowTable *table = event->flow()->flow_table();
388  queue = flow_event_queue_[table->table_index()];
389  break;
390  }
391  default:
392  assert(0);
393  break;
394  }
395 
396  UpdateStats(event, &stats_);
397  queue->Enqueue(event);
398  return;
399 }
400 
402  // concurrency check to ensure all request are in right partitions
403  assert(table->ConcurrencyCheck(table->flow_task_id()) == true);
404 
405  switch (req->event()) {
407  ProcessProto(req->pkt_info());
408  break;
409  }
410 
411  case FlowEvent::REENTRANT: {
412  FlowHandler *handler = new FlowHandler(agent(), req->pkt_info(), io_,
413  this, table->table_index());
414  RunProtoHandler(handler);
415  break;
416  }
417 
419  FlowEntry *flow = req->flow();
420  // process event only for forward flow with same gen_id
421  // it may happen that after enqueued for recompute,
422  // flow become reverse flow when the following sequence of
423  // events occur.
424  // 1. route is changed , flow is enqueued for recompute
425  // 2. flow get evicted in vrouter
426  // 3. traffic is received for reverse flow and get the same flow handle
427  // 4. since flow handle is same , existing flow entries in agent won't
428  // be deleted but forward flow become reverse and vice versa
429  // added check to process events only if gen id matches,
430  // otherwise ignore it. added assertion not to process reverseflow
431  // at this stage as we only enqueue forward flows.
432 
433  if ((flow->flow_handle() == req->flow_handle()) &&
434  (flow->gen_id() == req->gen_id()) &&
435  (flow->is_flags_set(FlowEntry::ReverseFlow) == false)) {
436  FlowTaskMsg *flow_msg = new FlowTaskMsg(flow);
437  PktInfoPtr pkt_info(new PktInfo(PktHandler::FLOW, flow_msg));
438  FlowHandler *handler = new FlowHandler(agent(), pkt_info, io_,
439  this, table->table_index());
440  RunProtoHandler(handler);
441  }
442  break;
443  }
444 
446  break;
447 
449  table->GrowFreeList();
450  break;
451  }
452 
453  case FlowEvent::AUDIT_FLOW: {
454  FlowEntryPtr flow_ref = table->Find(req->get_flow_key());
455  FlowEntry *flow = flow_ref.get();
456  if (flow == NULL) {
457  FlowEntryPtr new_flow = FlowEntry::Allocate(req->get_flow_key(), table);
458  new_flow->InitAuditFlow(req->flow_handle(), req->gen_id());
459  new_flow->flow_table()->Add(new_flow.get(), NULL);
460  } else {
461  // scenario: forward flow trap is received , before installing
462  // reverse flow, traffic received for reverse flow and trap is
463  // dropped and not received in agent. vrouter returns
464  // EEXIST error for reverse flow. flow entry is present
465  // in flow table but it is in hold state.
466  // take lock in forward and reverse flow order to avoid
467  // deadlock.
468  // EEXIST is seen only for reverse flows,
469  if (flow && flow->is_flags_set(FlowEntry::ReverseFlow)) {
470  FLOW_LOCK(flow->reverse_flow_entry(), flow, req->event());
471  if (!(flow->deleted()) &&
472  flow->ksync_entry() &&
473  flow->ksync_entry()->ksync_response_error() == EEXIST) {
475  }
476  }
477  }
478  break;
479  }
480 
481  // Check if flow-handle changed. This can happen if vrouter tries to
482  // setup the flow which was evicted earlier
484  FlowEntry *flow = req->flow();
485  flow->flow_table()->ProcessFlowEvent(req, flow,
486  flow->reverse_flow_entry());
487  break;
488  }
489 
490  case FlowEvent::KSYNC_EVENT: {
491  return FlowKSyncMsgHandler(req, table);
492  }
493 
496  req->table_index());
497  mgr->flow_mgmt_dbclient()->FreeDBState(req->db_entry(), req->gen_id());
498  break;
499  }
500 
501  default: {
502  assert(0);
503  break;
504  }
505  }
506 
507  return true;
508 }
509 
511  FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);
512 
513  // concurrency check to ensure all request are in right partitions
514  assert((table->ConcurrencyCheck(table->flow_ksync_task_id()) == true) ||
515  (table->ConcurrencyCheck(table->flow_task_id()) == true));
516 
517  switch (req->event()) {
518  // Flow was waiting for an index. Index is available now. Retry acquiring
519  // the index
520  case FlowEvent::KSYNC_EVENT: {
521  FlowTableKSyncEntry *ksync_entry =
522  (static_cast<FlowTableKSyncEntry *> (ksync_event->ksync_entry()));
523  FlowEntry *flow = ksync_entry->flow_entry().get();
524  flow->flow_table()->ProcessFlowEvent(req, flow,
525  flow->reverse_flow_entry());
526  break;
527  }
528 
529  case FlowEvent::EVICT_FLOW: {
530  FlowEntry *flow = req->flow();
531  flow->flow_table()->ProcessFlowEvent(req, flow,
532  flow->reverse_flow_entry());
533  break;
534  }
535 
536  default: {
537  assert(0);
538  break;
539  }
540  }
541 
542  return true;
543 }
544 
546  switch (req->event()) {
549  FlowEntry *flow = req->flow();
550  flow->flow_table()->ProcessFlowEvent(req, flow,
551  flow->reverse_flow_entry());
552  break;
553  }
554 
556  FlowEntry *flow = req->flow();
557  flow->flow_table()->ProcessFlowEvent(req, flow,
558  flow->reverse_flow_entry());
559  break;
560  }
561 
562  default: {
563  assert(0);
564  break;
565  }
566  }
567 
568  return true;
569 }
570 
572 // concurrency check to ensure all requests are in the right partitions.
573 // flow-update-queue doesn't have a table pointer. Skip concurrency check
574 // for flow-update-queue
575  if (table) {
576  assert(table->ConcurrencyCheck(table->flow_delete_task_id()) == true);
577  }
578 
579  switch (req->event()) {
580  case FlowEvent::DELETE_FLOW: {
581  FlowEntry *flow = req->flow();
582  table->ProcessFlowEvent(req, flow, flow->reverse_flow_entry());
583  break;
584  }
585 
586  default: {
587  assert(0);
588  break;
589  }
590  }
591 
592  return true;
593 }
594 
596 // Utility methods to generate events
600  return;
601 }
602 
605  FlowEntry *flow = Find(key, 0);
606  if (flow) {
608  }
609 }
610 
611 void FlowProto::EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle,
612  uint8_t gen_id, uint8_t evict_gen_id) {
613  FlowEvent *event = new FlowEvent(FlowEvent::EVICT_FLOW, flow,
614  flow_handle, gen_id, evict_gen_id);
615  EnqueueFlowEvent(event);
616  return;
617 }
618 
619 void FlowProto::CreateAuditEntry(const FlowKey &key, uint32_t flow_handle,
620  uint8_t gen_id) {
621  EnqueueFlowEvent(new FlowEvent(FlowEvent::AUDIT_FLOW, key, flow_handle,
622  gen_id));
623  return;
624 }
625 
626 
629  table->table_index()));
630  return;
631 }
632 
635  uint32_t flow_handle, uint8_t gen_id,
636  int ksync_error, uint64_t evict_flow_bytes,
637  uint64_t evict_flow_packets,
638  int32_t evict_flow_oflow,
639  uint32_t transaction_id) {
640  EnqueueFlowEvent(new FlowEventKSync(ksync_entry, event, flow_handle,
641  gen_id, ksync_error, evict_flow_bytes,
642  evict_flow_packets, evict_flow_oflow,
643  transaction_id));
644 }
645 
648  flow->flow_handle(), flow->gen_id()));
649  return;
650 }
651 
652 // Flow management runs in parallel to flow processing. As a result,
653 // we need to ensure that last reference for flow will go away from
654 // kTaskFlowEvent context only. This is ensured by following 2 actions
655 //
656 // 1. On return from here reference to the flow is removed which can
657 // potentially be last reference. So, enqueue a dummy request to
658 // flow-table queue.
659 // 2. Due to OS scheduling, its possible that the request we are
660 // enqueuing completes even before this function is returned. So,
661 // drop the reference immediately after allocating the event
664  flow.get());
665  flow.reset();
666  EnqueueFlowEvent(event);
667 }
668 
669 bool FlowProto::EnqueueReentrant(PktInfoPtr msg, uint8_t table_index) {
671  msg, NULL, table_index));
672  return true;
673 }
674 
675 // Enqueue event to force revaluation of KSync entry
678  EnqueueFlowEvent(event);
679 }
680 
681 // Apply trace-filter for flow. Will not allow true-false transistions.
682 // That is, if flows are already marked for tracing, action is retained
683 bool FlowProto::ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow) {
684  // Handle case where flow is NULL. It can happen if Update is called
685  // and flow is deleted between event-processing and calling
686  // FlowTable::Update
687  if (flow == NULL)
688  return false;
689 
690  bool trace = flow->trace();
691  if (rflow)
692  trace |= rflow->trace();
693 
694  if (trace == false) {
695  FlowTraceFilter *filter;
696  if (flow->key().family == Address::INET) {
697  filter = &ipv4_trace_filter_;
698  } else {
699  filter = &ipv6_trace_filter_;
700  }
701 
702  trace = filter->Match(&flow->key());
703  if (rflow && trace == false) {
704  trace = filter->Match(&rflow->key());
705  }
706  }
707 
708  return trace;
709 }
710 
712 // Token Management routines
715  switch (event) {
719  return add_tokens_.GetToken(NULL);
720  break;
721 
723  return ksync_tokens_.GetToken(NULL);
724  break;
725 
730  return update_tokens_.GetToken(NULL);
731  break;
732 
734  return del_tokens_.GetToken(NULL);
735  break;
736 
738  case FlowEvent::INVALID:
739  break;
740 
741  default:
742  assert(0);
743  break;
744  }
745 
746  return add_tokens_.GetToken(NULL);
747 }
748 
749 bool FlowProto::TokenCheck(const FlowTokenPool *pool) const {
750  return pool->TokenCheck();
751 }
752 
754  FlowTokenPool *pool = dynamic_cast<FlowTokenPool *>(pool_base);
755  if (pool_base == NULL)
756  return;
757 
758  pool->IncrementRestarts();
759  if (pool == &add_tokens_) {
760  for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
761  flow_event_queue_[i]->MayBeStartRunner();
762  }
763  }
764 
765  if (pool == &ksync_tokens_) {
766  for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
767  flow_ksync_queue_[i]->MayBeStartRunner();
768  }
769  }
770 
771  if (pool == &del_tokens_) {
772  for (uint32_t i = 0; i < flow_event_queue_.size(); i++) {
773  flow_delete_queue_[i]->MayBeStartRunner();
774  }
775  }
776 
777  if (pool == &update_tokens_) {
779  }
780 }
781 
783 // Set profile information
785 void UpdateStats(FlowEvent *req, FlowStats *stats) {
786  switch (req->event()) {
788  stats->add_count_++;
789  break;
791  stats->flow_messages_++;
792  break;
794  stats->delete_count_++;
795  break;
797  stats->audit_count_++;
798  break;
800  stats->recompute_count_++;
801  break;
803  stats->revaluate_count_++;
804  break;
806  stats->evict_count_++;
807  break;
808  case FlowEvent::KSYNC_EVENT: {
809  stats->vrouter_responses_++;
810  FlowEventKSync *ksync_event = static_cast<FlowEventKSync *>(req);
811  if (ksync_event->ksync_error())
812  stats->vrouter_error_++;
813  break;
814  }
815  default:
816  break;
817  }
818 }
819 
820 static void SetFlowEventQueueStats(Agent *agent,
821  const FlowEventQueueBase::Queue *queue,
823  stats->name_ = queue->Description();
824  stats->queue_count_ = queue->Length();
825  stats->enqueue_count_ = queue->NumEnqueues();
826  stats->dequeue_count_ = queue->NumDequeues();
827  stats->max_queue_count_ = queue->max_queue_len();
828  stats->start_count_ = queue->task_starts();
829  stats->busy_time_ = queue->busy_time();
830  queue->set_measure_busy_time(agent->MeasureQueueDelay());
831  if (agent->MeasureQueueDelay()) {
832  queue->ClearStats();
833  }
834 }
835 
836 static void SetFlowMgmtQueueStats(Agent *agent,
837  const FlowMgmtManager::FlowMgmtQueue *queue,
839  stats->name_ = queue->Description();
840  stats->queue_count_ = queue->Length();
841  stats->enqueue_count_ = queue->NumEnqueues();
842  stats->dequeue_count_ = queue->NumDequeues();
843  stats->max_queue_count_ = queue->max_queue_len();
844  stats->start_count_ = queue->task_starts();
845  stats->busy_time_ = queue->busy_time();
846  queue->set_measure_busy_time(agent->MeasureQueueDelay());
847  if (agent->MeasureQueueDelay())
848  queue->ClearStats();
849 }
850 
851 static void SetPktHandlerQueueStats(Agent *agent,
852  const PktHandler::PktHandlerQueue *queue,
854  stats->name_ = queue->Description();
855  stats->queue_count_ = queue->Length();
856  stats->enqueue_count_ = queue->NumEnqueues();
857  stats->dequeue_count_ = queue->NumDequeues();
858  stats->max_queue_count_ = queue->max_queue_len();
859  stats->start_count_ = queue->task_starts();
860  stats->busy_time_ = queue->busy_time();
861  queue->set_measure_busy_time(agent->MeasureQueueDelay());
862  if (agent->MeasureQueueDelay())
863  queue->ClearStats();
864 }
865 
867  data->flow_.flow_count_ = FlowCount();
876 
877  PktModule *pkt = agent()->pkt();
878  std::vector<FlowMgmtManager *> mgr_list = pkt->flow_mgmt_manager_list();
879 
880  data->flow_.flow_event_queue_.resize(flow_table_list_.size());
881  data->flow_.flow_delete_queue_.resize(flow_table_list_.size());
882  data->flow_.flow_tokenless_queue_.resize(flow_table_list_.size());
883  data->flow_.flow_ksync_queue_.resize(flow_table_list_.size());
884  for (uint16_t i = 0; i < flow_table_list_.size(); i++) {
885  SetFlowMgmtQueueStats(agent(), mgr_list[i]->request_queue(),
886  &data->flow_.flow_mgmt_queue_);
888  &data->flow_.flow_event_queue_[i]);
890  &data->flow_.flow_delete_queue_[i]);
892  &data->flow_.flow_tokenless_queue_[i]);
894  &data->flow_.flow_ksync_queue_[i]);
895  }
897  &data->flow_.flow_update_queue_);
898  const PktHandler::PktHandlerQueue *pkt_queue =
899  pkt->pkt_handler()->work_queue();
900  SetPktHandlerQueueStats(agent(), pkt_queue,
901  &data->flow_.pkt_handler_queue_);
902 
915 }
916 
919  agent_->stats()->added());
921  agent_->stats()->deleted());
922  return true;
923 }
924 
925 void FlowProto::InterfaceFlowCount(const Interface *intf, uint64_t *created,
926  uint64_t *aged,
927  uint32_t *active_flows) const {
928  *created = 0;
929  *aged = 0;
930  *active_flows = 0;
931  if (intf == NULL)
932  return;
933  std::vector<FlowMgmtManager *>::const_iterator it =
935  while (it != agent_->pkt()->flow_mgmt_manager_iterator_end()) {
936  (*it)->InterfaceFlowCount(intf, created, aged, active_flows);
937  it++;
938  }
939 }
static const int kMinTableCount
Definition: flow_proto.h:48
bool MeasureQueueDelay()
Definition: agent.cc:1136
uint32_t dport
Definition: pkt_handler.h:398
void DisableFlowKSyncQueue(uint32_t index, bool disabled)
Definition: flow_proto.cc:240
IpAddress ip_saddr
Definition: pkt_handler.h:394
uint32_t task_starts() const
Definition: queue_task.h:376
Address::Family family
Definition: pkt_handler.h:386
WorkQueueStats flow_update_queue_
Definition: agent_profile.h:66
void DisableFlowUpdateQueue(bool disabled)
Definition: flow_proto.cc:236
std::vector< WorkQueueStats > flow_delete_queue_
Definition: agent_profile.h:69
bool ConcurrencyCheck(int task_id, bool check_task_instance)
Definition: flow_table.cc:95
IpAddress src_addr
Definition: flow_entry.h:213
bool RunProtoHandler(ProtoHandler *handler)
static void UpdateStats(FlowEvent *event, FlowStats *stats)
Definition: flow_proto.cc:785
virtual ~FlowProto()
Definition: flow_proto.cc:76
void STLDeleteValues(Container *container)
Definition: util.h:101
uint32_t ifindex
Definition: pkt_handler.h:180
FlowTraceFilter ipv6_trace_filter_
Definition: flow_proto.h:156
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
void KSyncEventRequest(KSyncEntry *ksync_entry, KSyncEntry::KSyncEvent event, uint32_t flow_handle, uint8_t gen_id, int ksync_error, uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow, uint32_t transcation_id)
Definition: flow_proto.cc:633
std::vector< FlowMgmtManager * >::const_iterator flow_mgmt_manager_iterator_end() const
Definition: pkt_init.h:53
struct ip * ip
Definition: pkt_handler.h:423
FlowEntry * Find(const FlowKey &key, uint32_t table_index) const
Definition: flow_proto.cc:288
void RegisterPktFlowStatsCb(ProfileCb cb)
size_t max_queue_len() const
Definition: queue_task.h:377
int flow_task_id() const
Definition: flow_table.h:252
#define kTaskFlowEvent
Definition: agent.h:321
FlowTable * flow_table() const
Definition: flow_entry.h:597
IpAddress dst_addr
Definition: flow_entry.h:214
boost::asio::io_context & io_
size_t NumDequeues() const
Definition: queue_task.h:364
uint8_t ip_proto
Definition: pkt_handler.h:396
UpdateFlowEventQueue flow_update_queue_
Definition: flow_proto.h:152
boost::asio::ip::address IpAddress
Definition: address.h:13
const FlowKey & get_flow_key() const
Definition: flow_event.h:154
std::vector< DeleteFlowEventQueue * > flow_delete_queue_
Definition: flow_proto.h:149
AgentStats * stats() const
Definition: agent.cc:881
void Shutdown()
Definition: flow_proto.cc:107
static void SetFlowEventQueueStats(Agent *agent, const FlowEventQueueBase::Queue *queue, ProfileData::WorkQueueStats *stats)
Definition: flow_proto.cc:820
WorkQueueStats pkt_handler_queue_
Definition: agent_profile.h:64
uint64_t flow_created() const
Definition: agent_stats.h:100
#define FLOW_LOCK(flow, rflow, flow_event)
Definition: flow_table.h:61
uint32_t flow_handle() const
Definition: flow_event.h:156
uint16_t table_index() const
Definition: flow_table.h:198
uint8_t protocol
Definition: flow_entry.h:215
std::vector< WorkQueueStats > flow_ksync_queue_
Definition: agent_profile.h:70
std::vector< FlowEventQueue * > flow_event_queue_
Definition: flow_proto.h:147
FlowTableKSyncEntry * ksync_entry()
Definition: flow_entry.h:727
uint64_t audit_count_
Definition: flow_proto.h:28
static FlowEntry * Allocate(const FlowKey &key, FlowTable *flow_table)
Definition: flow_entry.cc:514
tbb::atomic< int > linklocal_flow_count_
Definition: flow_proto.h:153
int flow_ksync_task_id() const
Definition: flow_table.h:255
uint16_t flow_task_latency_limit() const
Definition: agent_param.h:436
bool ProcessFlowEvent(const FlowEvent *req, FlowEntry *flow, FlowEntry *rflow)
Definition: flow_table.cc:842
FlowTokenPool ksync_tokens_
Definition: flow_proto.h:144
bool FlowUpdateHandler(FlowEvent *req)
Definition: flow_proto.cc:545
OperDB * oper_db() const
Definition: agent.cc:1013
void FlushFlows()
Definition: flow_proto.cc:255
bool FlowEventHandler(FlowEvent *req, FlowTable *table)
Definition: flow_proto.cc:401
FlowTable * GetFlowTable(const FlowKey &key, uint32_t flow_handle) const
Definition: flow_proto.cc:213
uint32_t sport
Definition: pkt_handler.h:397
uint64_t flow_aged() const
Definition: agent_stats.h:107
const FlowKey & key() const
Definition: flow_entry.h:594
FlowTable * GetTable(uint16_t index) const
Definition: flow_proto.cc:261
uint32_t table_index() const
Definition: flow_event.h:157
FlowEntry * Find(const FlowKey &key)
Definition: flow_table.cc:136
static const int kMaxIterations
Definition: queue_task.h:112
uint64_t evict_count_
Definition: flow_proto.h:31
bool is_flags_set(const FlowEntryFlags &flags) const
Definition: flow_entry.h:610
void GrowFreeList()
Definition: flow_table.cc:960
FlowHandler * AllocProtoHandler(PktInfoPtr info, boost::asio::io_context &io)
Definition: flow_proto.cc:188
Definition: agent.h:358
size_t Length() const
Definition: queue_task.h:356
std::vector< FlowTable * > flow_table_list_
Definition: flow_proto.h:151
static const int kMaxTableCount
Definition: flow_proto.h:49
void InitAuditFlow(uint32_t flow_idx, uint8_t gen_id)
Definition: flow_entry.cc:871
WorkQueueStats flow_mgmt_queue_
Definition: agent_profile.h:65
PktInfoPtr pkt_info() const
Definition: flow_event.h:155
void MayBeStartRunner()
Definition: flow_event.h:286
uint16_t FlowTableIndex(const IpAddress &sip, const IpAddress &dip, uint8_t proto, uint16_t sport, uint16_t dport, uint32_t flow_handle) const
Definition: flow_proto.cc:161
void MessageRequest(FlowEntry *flow)
Definition: flow_proto.cc:646
uint32_t gen_id() const
Definition: flow_event.h:152
uint32_t Length()
Definition: flow_event.h:285
void UpdateFlowMinMaxStats(uint64_t total_flows, FlowCounters &stat) const
Definition: agent_stats.cc:139
void FreeBuffer(PktInfo *msg)
#define CHECK_CONCURRENCY(...)
uint64_t vrouter_responses_
Definition: flow_proto.h:29
void Init(bool enable, Address::Family family)
uint32_t vrf
Definition: pkt_handler.h:181
void Add(FlowEntry *flow, FlowEntry *rflow)
Definition: flow_table.cc:175
FlowTokenPool update_tokens_
Definition: flow_proto.h:146
void EnqueueUnResolvedFlowEntry(FlowEntry *flow)
Definition: flow_proto.cc:676
void RegisterFlowCountFn(FlowCountFn cb)
Definition: agent_stats.cc:158
uint64_t delete_count_
Definition: flow_proto.h:24
void set_measure_busy_time(bool val) const
Definition: queue_task.h:379
PktHandler * pkt_handler() const
Definition: pkt_init.h:31
FlowCounters & added()
Definition: agent_stats.h:222
bool Enqueue(PktInfoPtr msg)
Definition: flow_proto.cc:220
#define FLOW_TRACE(obj,...)
Definition: flow_mgmt.h:377
FlowStats stats_
Definition: flow_proto.h:157
std::vector< FlowEventQueue * > flow_tokenless_queue_
Definition: flow_proto.h:148
static void SetFlowMgmtQueueStats(Agent *agent, const FlowMgmtManager::FlowMgmtQueue *queue, ProfileData::WorkQueueStats *stats)
Definition: flow_proto.cc:836
void MakeShortFlow(FlowShortReason reason)
Definition: flow_entry.cc:2869
std::vector< WorkQueueStats > flow_event_queue_
Definition: agent_profile.h:67
void Ip6AddressToU64Array(const Ip6Address &addr, uint64_t *arr, int size)
void EvictFlowRequest(FlowEntry *flow, uint32_t flow_handle, uint8_t gen_id, uint8_t evict_gen_id)
Definition: flow_proto.cc:611
AgentParam * params() const
Definition: agent.h:1218
bool trace() const
Definition: flow_entry.h:752
uint64_t restarts() const
Definition: flow_token.h:35
FlowStats flow_
#define kTaskFlowStatsUpdate
Definition: agent.h:331
const DBEntry * db_entry() const
Definition: flow_event.h:150
uint64_t failures() const
Definition: flow_token.h:33
virtual void TokenAvailable(TokenPool *pool_base)
Definition: flow_proto.cc:753
FlowTokenStats token_stats_
Definition: agent_profile.h:63
uint64_t flow_messages_
Definition: flow_proto.h:25
uint64_t add_count_
Definition: flow_proto.h:23
bool Cancel()
Definition: timer.cc:150
Definition: vn.h:151
void DisableFlowEventQueue(uint32_t index, bool disabled)
Definition: flow_proto.cc:230
std::string Description() const
Definition: queue_task.h:310
FlowTraceFilter ipv4_trace_filter_
Definition: flow_proto.h:155
void SetProfileData(ProfileData *data)
Definition: flow_proto.cc:866
bool FlowKSyncMsgHandler(FlowEvent *req, FlowTable *table)
Definition: flow_proto.cc:510
std::vector< KSyncFlowEventQueue * > flow_ksync_queue_
Definition: flow_proto.h:150
uint16_t src_port
Definition: flow_entry.h:216
void InitDone()
Definition: flow_proto.cc:99
bool FreeDBState(const DBEntry *entry, uint32_t gen_id)
std::vector< FlowMgmtManager * >::const_iterator flow_mgmt_manager_iterator_begin() const
Definition: pkt_init.h:49
bool use_vrouter_hash_
Definition: flow_proto.h:154
FlowTokenPool del_tokens_
Definition: flow_proto.h:145
uint64_t revaluate_count_
Definition: flow_proto.h:26
void set_disable(bool val)
Definition: flow_event.h:284
static std::size_t HashCombine(std::size_t hash, uint64_t val)
Definition: flow_proto.cc:124
bool Match(const FlowKey *key)
int ksync_error() const
Definition: flow_event.h:214
void ClearStats() const
Definition: queue_task.h:382
Address::Family family
Definition: flow_entry.h:211
uint32_t FlowCount() const
Definition: flow_proto.cc:265
static void SetPktHandlerQueueStats(Agent *agent, const PktHandler::PktHandlerQueue *queue, ProfileData::WorkQueueStats *stats)
Definition: flow_proto.cc:851
uint32_t flow_handle() const
Definition: flow_entry.h:600
FlowCounters & deleted()
Definition: agent_stats.h:223
void Enqueue(FlowEvent *event)
Definition: flow_event.cc:54
struct ip6_hdr * ip6
Definition: pkt_handler.h:424
bool FlowDeleteHandler(FlowEvent *req, FlowTable *table)
Definition: flow_proto.cc:571
void DeleteFlowRequest(FlowEntry *flow)
Definition: flow_proto.cc:598
bool Validate(PktInfo *msg)
Definition: flow_proto.cc:196
uint16_t dst_port
Definition: flow_entry.h:217
bool TokenCheck(const FlowTokenPool *pool) const
Definition: flow_proto.cc:749
bool flow_trace_enable() const
Definition: agent.h:1203
FlowEntry * flow() const
Definition: flow_event.h:147
int token_count() const
Definition: flow_token.h:31
uint64_t recompute_count_
Definition: flow_proto.h:27
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
Timer * stats_update_timer_
Definition: flow_proto.h:159
FlowEntry * reverse_flow_entry()
Definition: flow_entry.h:602
void Init()
Definition: flow_proto.cc:84
KSyncEntry * ksync_entry() const
Definition: flow_event.h:212
void EnqueueFlowEvent(FlowEvent *event)
Definition: flow_proto.cc:307
void ForceEnqueueFreeFlowReference(FlowEntryPtr &flow)
Definition: flow_proto.cc:662
int flow_delete_task_id() const
Definition: flow_table.h:254
void CreateAuditEntry(const FlowKey &key, uint32_t flow_handle, uint8_t gen_id)
Definition: flow_proto.cc:619
FlowEntryPtr flow_entry() const
const PktHandlerQueue * work_queue() const
Definition: pkt_handler.h:322
bool FlowStatsUpdate() const
Definition: flow_proto.cc:917
uint64_t busy_time() const
Definition: queue_task.h:380
size_t NumEnqueues() const
Definition: queue_task.h:360
IpAddress ip_daddr
Definition: pkt_handler.h:395
uint32_t cmd_param
Definition: pkt_handler.h:183
Queue * queue() const
Definition: flow_event.h:287
bool deleted()
Definition: flow_entry.h:680
AgentHdr agent_hdr
Definition: pkt_handler.h:388
bool EnqueueReentrant(boost::shared_ptr< PktInfo > msg, uint8_t table_index)
Definition: flow_proto.cc:669
FlowMgmtManagerList flow_mgmt_manager_list() const
Definition: pkt_init.h:36
boost::shared_ptr< Token > TokenPtr
Definition: flow_token.h:11
static std::size_t HashIp(std::size_t hash, const IpAddress &ip)
Definition: flow_proto.cc:129
uint8_t gen_id() const
Definition: flow_entry.h:599
bool ShouldTrace(const FlowEntry *flow, const FlowEntry *rflow)
Definition: flow_proto.cc:683
FlowMgmtManager * flow_mgmt_manager(uint16_t index) const
Definition: pkt_init.h:39
PktModule * pkt() const
Definition: agent.cc:965
std::vector< WorkQueueStats > flow_tokenless_queue_
Definition: agent_profile.h:68
FlowProto(Agent *agent, boost::asio::io_context &io)
Definition: flow_proto.cc:22
void DisableFlowDeleteQueue(uint32_t index, bool disabled)
Definition: flow_proto.cc:248
FlowMgmtDbClient * flow_mgmt_dbclient() const
Definition: flow_mgmt.h:306
uint16_t flow_thread_count() const
Definition: agent.h:1202
void IncrementRestarts()
Definition: flow_token.h:34
int ksync_response_error() const
bool ProcessProto(boost::shared_ptr< PktInfo > msg_info)
bool UpdateFlow(FlowEntry *flow)
Definition: flow_proto.cc:298
void SetFlowProto(FlowProto *proto)
Definition: agent.h:1000
size_t FlowUpdateQueueLength()
Definition: flow_proto.cc:244
AgentProfile * agent_profile() const
Definition: operdb_init.h:79
void InterfaceFlowCount(const Interface *intf, uint64_t *created, uint64_t *aged, uint32_t *active_flows) const
Definition: flow_proto.cc:925
Event event() const
Definition: flow_event.h:146
TokenPtr GetToken(FlowEntry *entry)
Definition: flow_token.cc:48
FlowTokenPool add_tokens_
Definition: flow_proto.h:143
void VnFlowCounters(const VnEntry *vn, uint32_t *in_count, uint32_t *out_count)
Definition: flow_proto.cc:273
int flow_stats_update_timeout() const
Definition: agent_stats.h:109
bool TokenCheck() const
Definition: flow_token.cc:32
boost::shared_ptr< PktInfo > PktInfoPtr
Definition: pkt_handler.h:61
bool AddFlow(FlowEntry *flow)
Definition: flow_proto.cc:292
uint64_t vrouter_error_
Definition: flow_proto.h:30
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
Definition: flow_entry.h:125
void GrowFreeListRequest(FlowTable *table)
Definition: flow_proto.cc:627
void Update(FlowEntry *flow, FlowEntry *rflow)
Definition: flow_table.cc:184
TokenPtr GetToken(FlowEvent::Event event)
Definition: flow_proto.cc:714
PktType::Type type
Definition: pkt_handler.h:387