OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
flowtable_ksync.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <sys/socket.h>
6 #if defined(__linux__)
7 #include <linux/netlink.h>
8 #endif
9 #include <fcntl.h>
10 #include <sys/mman.h>
11 #include <sys/types.h>
12 #include <sys/ipc.h>
13 #include <sys/shm.h>
14 #include <asm/types.h>
15 #include <boost/asio.hpp>
16 #include <boost/asio/buffer.hpp>
17 
18 #include <base/address_util.h>
19 #include <cmn/agent_cmn.h>
20 #include <ksync/ksync_index.h>
21 #include <ksync/ksync_entry.h>
22 #include <ksync/ksync_object.h>
23 #include <ksync/ksync_netlink.h>
24 #include <ksync/ksync_sock.h>
25 #include <ksync/ksync_netlink.h>
26 #include <ksync/ksync_types.h>
27 #include <vrouter/ksync/agent_ksync_types.h>
30 #include <filter/traffic_action.h>
31 #include <vr_types.h>
32 #include <nl_util.h>
33 #include <vr_flow.h>
34 #include <vr_genetlink.h>
35 #include <ksync/ksync_sock_user.h>
37 
38 #include <pkt/flow_proto.h>
39 #include <oper/agent_types.h>
40 #include <services/services_init.h>
42 #include <uve/stats_collector.h>
43 
49 
50 using namespace boost::asio::ip;
51 
52 static uint16_t GetDropReason(uint16_t dr) {
53  switch (dr) {
55  return VR_FLOW_DR_UNAVIALABLE_INTF;
57  return VR_FLOW_DR_IPv4_FWD_DIS;
59  return VR_FLOW_DR_UNAVAILABLE_VRF;
61  return VR_FLOW_DR_NO_SRC_ROUTE;
63  return VR_FLOW_DR_NO_DST_ROUTE;
65  return VR_FLOW_DR_AUDIT_ENTRY;
67  return VR_FLOW_DR_VRF_CHANGE;
69  return VR_FLOW_DR_NO_REVERSE_FLOW;
71  return VR_FLOW_DR_REVERSE_FLOW_CHANGE;
73  return VR_FLOW_DR_NAT_CHANGE;
75  return VR_FLOW_DR_FLOW_LIMIT;
77  return VR_FLOW_DR_LINKLOCAL_SRC_NAT;
79  return VR_FLOW_DR_NO_MIRROR_ENTRY;
81  return VR_FLOW_DR_SAME_FLOW_RFLOW_KEY;
83  return VR_FLOW_DR_PORT_MAP_DROP;
85  return VR_FLOW_DR_NO_SRC_ROUTE_L2RPF;
87  return VR_FLOW_DR_FAT_FLOW_NAT_CONFLICT;
89  return VR_FLOW_DR_POLICY;
91  return VR_FLOW_DR_OUT_POLICY;
92  case FlowEntry::DROP_SG:
93  return VR_FLOW_DR_SG;
95  return VR_FLOW_DR_OUT_SG;
97  return VR_FLOW_DR_REVERSE_SG;
99  return VR_FLOW_DR_REVERSE_OUT_SG;
101  return VR_FLOW_DR_FW_POLICY;
103  return VR_FLOW_DR_OUT_FW_POLICY;
105  return VR_FLOW_DR_REVERSE_FW_POLICY;
107  return VR_FLOW_DR_REVERSE_OUT_FW_POLICY;
109  return VR_FLOW_DR_FWAAS_POLICY;
111  return VR_FLOW_DR_OUT_FWAAS_POLICY;
113  return VR_FLOW_DR_REVERSE_FWAAS_POLICY;
115  return VR_FLOW_DR_REVERSE_OUT_FWAAS_POLICY;
116  default:
117  break;
118  }
119  return VR_FLOW_DR_UNKNOWN;
120 }
121 
123  Reset();
124  ksync_obj_ = obj;
125 }
126 
128  FlowEntry *flow, uint32_t hash_id) {
129  Reset();
130  Reset(flow, hash_id);
131  ksync_obj_ = obj;
132 }
133 
135 }
136 
139  flow_entry_ = NULL;
141  gen_id_ = 0;
142  evict_gen_id_ = 0;
143  vrouter_gen_id_ = 0;
144  vrouter_hash_id_ = FlowEntry::kInvalidFlowHandle;
145  old_reverse_flow_id_ = FlowEntry::kInvalidFlowHandle;
146  old_action_ = 0;
147  old_component_nh_idx_ = 0xFFFF;
148  old_first_mirror_index_ = 0xFFFF;
149  old_second_mirror_index_ = 0xFFFF;
150  trap_flow_ = false;
151  old_drop_reason_ = 0;
152  ecmp_ = false;
153  enable_rpf_ = true;
154  src_nh_id_ = NextHopTable::kRpfDiscardIndex;
155  last_event_ = FlowEvent::INVALID;
156  token_.reset();
157  ksync_response_info_.Reset();
158  qos_config_idx = AgentQosConfigTable::kInvalidIndex;
159  transaction_id_ = 0;
160  underlay_gw_index = -1;
161 }
162 
164  flow_entry_ = flow;
165  hash_id_ = hash_id;
166  gen_id_ = flow->gen_id();
167  transaction_id_ = flow->GetTransactionId();
168 }
169 
171  return ksync_obj_;
172 }
173 
175  FlowTableKSyncObject *obj =
176  static_cast<FlowTableKSyncObject *>(GetObject());
177  return (obj->flow_table()->table_index());
178 }
180  if (token_.get())
181  token_.reset();
182 }
183 
185  std::vector<int8_t> &data) {
186  data.clear();
187  uint32_t addr = ksync_obj_->ksync()->agent()->router_id().to_ulong();
188  data.push_back(FlowEntry::PCAP_CAPTURE_HOST);
189  data.push_back(0x4);
190  data.push_back(((addr >> 24) & 0xFF));
191  data.push_back(((addr >> 16) & 0xFF));
192  data.push_back(((addr >> 8) & 0xFF));
193  data.push_back(((addr) & 0xFF));
194 
195  data.push_back(FlowEntry::PCAP_FLAGS);
196  data.push_back(0x4);
197  uint32_t action;
198  action = fe->match_p().action_info.action;
199  if (fe->is_flags_set(FlowEntry::IngressDir)) {
200  // Set 31st bit for ingress
201  action |= 0x40000000;
202  }
203  data.push_back((action >> 24) & 0xFF);
204  data.push_back((action >> 16) & 0xFF);
205  data.push_back((action >> 8) & 0xFF);
206  data.push_back((action) & 0xFF);
207 
208  data.push_back(FlowEntry::PCAP_SOURCE_VN);
209  data.push_back(fe->data().source_vn_match.size());
210  data.insert(data.end(), fe->data().source_vn_match.begin(),
211  fe->data().source_vn_match.end());
212  data.push_back(FlowEntry::PCAP_DEST_VN);
213  data.push_back(fe->data().dest_vn_match.size());
214  data.insert(data.end(), fe->data().dest_vn_match.begin(),
215  fe->data().dest_vn_match.end());
216  data.push_back(FlowEntry::PCAP_TLV_END);
217  data.push_back(0x0);
218 }
219 
220 static void EncodeKSyncIp(vr_flow_req *req, const IpAddress &sip,
221  const IpAddress &dip) {
222  uint64_t supper, dupper, slower, dlower;
223 
224 
225  IpToU64(sip, dip, &supper, &slower, &dupper, &dlower);
226  req->set_fr_flow_sip_l(slower);
227  req->set_fr_flow_sip_u(supper);
228  req->set_fr_flow_dip_l(dlower);
229  req->set_fr_flow_dip_u(dupper);
230 
231 }
232 
233 int FlowTableKSyncEntry::Encode(sandesh_op::type op, char *buf, int buf_len) {
234  vr_flow_req &req = ksync_obj_->flow_req();
235  int encode_len;
236  int error;
237  uint16_t action = 0;
238  uint16_t drop_reason = VR_FLOW_DR_UNKNOWN;
239 
240  // currently vrouter does not guarantee gen id to always start from 0
241  // on vrouter-agent restart
242  // TODO(prabhjot) need to move last gen id seen by vrouter in KSync
243  // Index Manager
244  if (gen_id_ != evict_gen_id_) {
245  // skip sending update to vrouter for evicted entry
246  flow_entry_->LogFlow(FlowEventLog::FLOW_MSG_SKIP_EVICTED, this,
247  hash_id_, evict_gen_id_);
248  return 0;
249  }
250 
251  req.set_fr_op(flow_op::FLOW_SET);
252  req.set_fr_rid(0);
253  req.set_fr_index(hash_id_);
254  req.set_fr_gen_id(gen_id_);
255  const FlowKey *fe_key = &flow_entry_->key();
256  EncodeKSyncIp(&req, fe_key->src_addr, fe_key->dst_addr);
257  req.set_fr_flow_proto(fe_key->protocol);
258  req.set_fr_flow_sport(htons(fe_key->src_port));
259  req.set_fr_flow_dport(htons(fe_key->dst_port));
260  req.set_fr_flow_nh_id(fe_key->nh);
261  if (flow_entry_->key().family == Address::INET)
262  req.set_fr_family(AF_INET);
263  else
264  req.set_fr_family(AF_INET6);
265  req.set_fr_flow_vrf(flow_entry_->data().vrf);
266  uint16_t flags = 0;
267  uint16_t flags1 = 0;
268 
269  if (op == sandesh_op::DEL) {
270  if (hash_id_ == FlowEntry::kInvalidFlowHandle) {
271  return 0;
272  }
273 
274  req.set_fr_flags(0);
275  req.set_fr_flags1(0);
276  // Sync() is not called in case of delete. Copy the event to use
277  // the right token
278  last_event_ = (FlowEvent::Event)flow_entry_->last_event();
279  } else {
280  flags = VR_FLOW_FLAG_ACTIVE;
281  uint32_t fe_action = flow_entry_->match_p().action_info.action;
282  if ((fe_action) & (1 << TrafficAction::PASS)) {
283  action = VR_FLOW_ACTION_FORWARD;
284  }
285 
286  if ((fe_action) & (1 << TrafficAction::DENY)) {
287  action = VR_FLOW_ACTION_DROP;
288  drop_reason = GetDropReason(flow_entry_->data().drop_reason);
289  }
290 
291  if (action == VR_FLOW_ACTION_FORWARD &&
292  flow_entry_->is_flags_set(FlowEntry::NatFlow)) {
293  action = VR_FLOW_ACTION_NAT;
294  }
295 
296  if (action == VR_FLOW_ACTION_NAT &&
297  flow_entry_->reverse_flow_entry() == NULL) {
298  action = VR_FLOW_ACTION_DROP;
299  }
300 
301  if ((fe_action) & (1 << TrafficAction::HBS)) {
302  if (flow_entry_->is_flags_set(FlowEntry::HbfFlow)) {
303  if (flow_entry_->GetHbsInterface() ==
305  flags1 |= VR_FLOW_FLAG1_HBS_RIGHT;
306  } else {
307  flags1 |= VR_FLOW_FLAG1_HBS_LEFT;
308  }
309  }
310  }
311 
312  if ((fe_action) & (1 << TrafficAction::MIRROR)) {
313  flags |= VR_FLOW_FLAG_MIRROR;
314  req.set_fr_mir_id(-1);
315  req.set_fr_sec_mir_id(-1);
316  if (flow_entry_->match_p().action_info.mirror_l.size() >
318  FLOW_TRACE(Err, hash_id_,
319  "Don't support more than two mirrors/analyzers per "
320  "flow:" + integerToString
321  (flow_entry_->
322  data().match_p.action_info.mirror_l.size()));
323  }
324  // Lookup for first and second mirror entries
325  std::vector<MirrorActionSpec>::const_iterator it;
326  it = flow_entry_->match_p().action_info.mirror_l.begin();
327  MirrorKSyncObject* obj = ksync_obj_->ksync()->agent()->ksync()->
328  mirror_ksync_obj();
329  uint16_t idx_1 = obj->GetIdx((*it).analyzer_name);
330  req.set_fr_mir_id(idx_1);
331  FLOW_TRACE(ModuleInfo, "Mirror index first: " +
332  integerToString(idx_1));
333  ++it;
334  if (it != flow_entry_->match_p().action_info.mirror_l.end()) {
335  uint16_t idx_2 = obj->GetIdx((*it).analyzer_name);
336  if (idx_1 != idx_2) {
337  req.set_fr_sec_mir_id(idx_2);
338  FLOW_TRACE(ModuleInfo, "Mirror index second: " +
339  integerToString(idx_2));
340  } else {
341  FLOW_TRACE(Err, hash_id_,
342  "Both Mirror indexes are same, hence didn't set "
343  "the second mirror dest.");
344  }
345  }
346  req.set_fr_mir_vrf(flow_entry_->data().mirror_vrf);
347  req.set_fr_mir_sip(htonl(ksync_obj_->ksync()->agent()->
348  router_id().to_ulong()));
349  req.set_fr_mir_sport(htons(ksync_obj_->ksync()->agent()->
350  mirror_port()));
351  std::vector<int8_t> pcap_data;
352  SetPcapData(flow_entry_, pcap_data);
353  req.set_fr_pcap_meta_data(pcap_data);
354  }
355 
356  if (flow_entry_->data().component_nh_idx !=
358  req.set_fr_ecmp_nh_index(flow_entry_->data().component_nh_idx);
359  } else {
360  req.set_fr_ecmp_nh_index(-1);
361  }
362 
363  if (action == VR_FLOW_ACTION_NAT) {
364  FlowEntry *nat_flow = flow_entry_->reverse_flow_entry();
365  const FlowKey *nat_key = &nat_flow->key();
366 
367  if (flow_entry_->key().src_addr != nat_key->dst_addr) {
368  flags |= VR_FLOW_FLAG_SNAT;
369  }
370  if (flow_entry_->key().dst_addr != nat_key->src_addr) {
371  flags |= VR_FLOW_FLAG_DNAT;
372  }
373 
374  if (flow_entry_->key().protocol == IPPROTO_TCP ||
375  flow_entry_->key().protocol == IPPROTO_UDP) {
376  if (flow_entry_->key().src_port != nat_key->dst_port) {
377  flags |= VR_FLOW_FLAG_SPAT;
378  }
379  if (flow_entry_->key().dst_port != nat_key->src_port) {
380  flags |= VR_FLOW_FLAG_DPAT;
381  }
382  }
383 
384  //Link local, flag determines relaxed policy
386  flags |= VR_FLOW_FLAG_LINK_LOCAL;
387  }
388 
389  //Bgp service, flag determines relaxed policy
390  if (nat_flow->is_flags_set(FlowEntry::BgpRouterService)) {
391  flags |= VR_FLOW_BGP_SERVICE;
392  }
393 
394  if (nat_flow->allocated_port()) {
395  flags |= VR_FLOW_BGP_SERVICE;
396  }
397 
398  flags |= VR_FLOW_FLAG_VRFT;
399  req.set_fr_flow_dvrf(flow_entry_->data().dest_vrf);
400  } else if (flow_entry_->is_flags_set(FlowEntry::AliasIpFlow)) {
401  flags |= VR_FLOW_FLAG_VRFT;
402  req.set_fr_flow_dvrf(flow_entry_->data().dest_vrf);
403  }
404 
405  if (fe_action & (1 << TrafficAction::VRF_TRANSLATE)) {
406  flags |= VR_FLOW_FLAG_VRFT;
407  req.set_fr_flow_dvrf(flow_entry_->data().dest_vrf);
408  }
409 
410  if (flow_entry_->is_flags_set(FlowEntry::Trap)) {
411  action = VR_FLOW_ACTION_HOLD;
412  }
413 
414  if (enable_rpf_) {
415  req.set_fr_src_nh_index(src_nh_id_);
416  } else {
417  //Set to discard, vrouter ignores RPF check if
418  //nexthop is set to discard
419  req.set_fr_src_nh_index(0);
420  }
421 
422  FlowEntry *rev_flow = flow_entry_->reverse_flow_entry();
423  if (rev_flow) {
424  flags |= VR_RFLOW_VALID;
425  req.set_fr_rindex(rev_flow->flow_handle());
426  if (rev_flow->flow_handle() == FlowEntry::kInvalidFlowHandle) {
427  const FlowKey &rkey = rev_flow->key();
428  req.set_fr_rflow_nh_id(rkey.nh);
429  uint64_t supper, dupper, slower, dlower;
430 
431  IpToU64(rkey.src_addr, rkey.dst_addr, &supper, &slower,
432  &dupper, &dlower);
433  req.set_fr_rflow_sip_l(slower);
434  req.set_fr_rflow_sip_u(supper);
435  req.set_fr_rflow_dip_l(dlower);
436  req.set_fr_rflow_dip_u(dupper);
437 
438  req.set_fr_rflow_sport(htons(rkey.src_port));
439  req.set_fr_rflow_dport(htons(rkey.dst_port));
440  }
441  }
442 
443  if (flow_entry_->IsShortFlow()) {
444  action = VR_FLOW_ACTION_DROP;
445  }
446 
447  req.set_fr_flags(flags);
448  req.set_fr_flags1(flags1);
449  req.set_fr_action(action);
450  req.set_fr_drop_reason(drop_reason);
451  req.set_fr_qos_id(qos_config_idx);
452  req.set_fr_ttl(flow_entry_->data().ttl);
453  req.set_fr_underlay_ecmp_index(underlay_gw_index);
454  }
455 
456  FlowProto *proto = ksync_obj_->ksync()->agent()->pkt()->get_flow_proto();
457  token_ = proto->GetToken(last_event_);
458  encode_len = req.WriteBinary((uint8_t *)buf, buf_len, &error);
459  return encode_len;
460 }
461 
463  bool changed = false;
464 
465  last_event_ = (FlowEvent::Event)flow_entry_->last_event();
466  FlowEntry *rev_flow = flow_entry_->reverse_flow_entry();
467  if (rev_flow) {
468  if (old_reverse_flow_id_ != rev_flow->flow_handle()) {
469  if (old_reverse_flow_id_ != FlowEntry::kInvalidFlowHandle)
470  changed = true;
471  old_reverse_flow_id_ = rev_flow->flow_handle();
472  }
473  }
474 
475  if (flow_entry_->match_p().action_info.action != old_action_) {
476  old_action_ = flow_entry_->match_p().action_info.action;
477  changed = true;
478  }
479 
480  if (flow_entry_->data().drop_reason != old_drop_reason_) {
481  old_drop_reason_ = flow_entry_->data().drop_reason;
482  changed = true;
483  }
484  if (flow_entry_->data().component_nh_idx != old_component_nh_idx_) {
485  old_component_nh_idx_ = flow_entry_->data().component_nh_idx;
486  changed = true;
487  }
488 
489  if (vrouter_gen_id_ != gen_id_) {
490  vrouter_gen_id_ = gen_id_;
491  changed = true;
492  }
493 
494  if (vrouter_hash_id_ != hash_id_) {
495  vrouter_hash_id_ = hash_id_;
496  changed = true;
497  }
498 
499  MirrorKSyncObject* obj = ksync_obj_->ksync()->mirror_ksync_obj();
500  // Lookup for first and second mirror entries
501  std::vector<MirrorActionSpec>::const_iterator it;
502  it = flow_entry_->match_p().action_info.mirror_l.begin();
503  if (it != flow_entry_->match_p().action_info.mirror_l.end()) {
504  uint16_t idx = obj->GetIdx((*it).analyzer_name);
505  if (!((*it).analyzer_name.empty()) &&
506  (idx == MirrorTable::kInvalidIndex)) {
507  // run timer to update flow entry
508  ksync_obj_->UpdateUnresolvedFlowEntry(flow_entry_);
509  } else if (old_first_mirror_index_ != idx) {
510  old_first_mirror_index_ = idx;
511  changed = true;
512  }
513  ++it;
514  if (it != flow_entry_->match_p().action_info.mirror_l.end()) {
515  idx = obj->GetIdx((*it).analyzer_name);
516  if (!((*it).analyzer_name.empty()) &&
517  (idx == MirrorTable::kInvalidIndex)) {
518  // run timer to update flow entry
519  ksync_obj_->UpdateUnresolvedFlowEntry(flow_entry_);
520  } else if (old_second_mirror_index_ != idx) {
521  old_second_mirror_index_ = idx;
522  changed = true;
523  }
524  }
525  }
526 
527  //Trap reverse flow
528  if (trap_flow_ != flow_entry_->is_flags_set(FlowEntry::Trap)) {
529  trap_flow_ = flow_entry_->is_flags_set(FlowEntry::Trap);
530  changed = true;
531  }
532 
533  if (ecmp_ != flow_entry_->is_flags_set(FlowEntry::EcmpFlow)) {
534  ecmp_ = flow_entry_->is_flags_set(FlowEntry::EcmpFlow);
535  changed = true;
536  }
537 
538  if (enable_rpf_ != flow_entry_->data().enable_rpf) {
539  enable_rpf_ = flow_entry_->data().enable_rpf;
540  changed = true;
541  }
542 
543  uint32_t nh_id = NextHopTable::kRpfDiscardIndex;
544  if (flow_entry_->data().rpf_nh.get()) {
545  nh_id = flow_entry_->data().rpf_nh.get()->id();
546  }
547  if (src_nh_id_ != nh_id) {
548  src_nh_id_ = nh_id;
549  changed = true;
550  }
551 
552  if (qos_config_idx != flow_entry_->data().qos_config_idx) {
553  qos_config_idx = flow_entry_->data().qos_config_idx;
554  changed = true;
555  }
556  if (transaction_id_ != flow_entry_->GetTransactionId()) {
557  transaction_id_ = flow_entry_->GetTransactionId();
558  changed = true;
559  }
560  if (underlay_gw_index != flow_entry_->data().underlay_gw_index_) {
561  underlay_gw_index = flow_entry_->data().underlay_gw_index_;
562  changed = true;
563  }
564  return changed;
565 }
566 
568  // KSync Flow being triggered from parallel threads due to
569  // table partition does not allow safe usage of
570  // UnresolvedReference. Please avoid any dependency handling
571  // for KSync Flow
572  return NULL;
573 }
574 
575 int FlowTableKSyncEntry::AddMsg(char *buf, int buf_len) {
576  return Encode(sandesh_op::ADD, buf, buf_len);
577 }
578 
579 int FlowTableKSyncEntry::ChangeMsg(char *buf, int buf_len) {
580  return Encode(sandesh_op::ADD, buf, buf_len);
581 }
582 
583 int FlowTableKSyncEntry::DeleteMsg(char *buf, int buf_len) {
584  return Encode(sandesh_op::DEL, buf, buf_len);
585 }
586 
587 std::string FlowTableKSyncEntry::ToString() const {
588  std::ostringstream str;
589  const FlowKey *fe_key = &flow_entry_->key();
590  str << "Flow : " << hash_id_
591  << " with Source IP: " << fe_key->src_addr.to_string()
592  << " Source port: " << fe_key->src_port
593  << " Destination IP: " << fe_key->dst_addr.to_string()
594  << " Destination port: " << fe_key->dst_port
595  << " Protocol "<< (uint16_t)fe_key->protocol;
596  return str.str();
597 }
598 
599 bool FlowTableKSyncEntry::IsLess(const KSyncEntry &rhs) const {
600  const FlowTableKSyncEntry &entry = static_cast
601  <const FlowTableKSyncEntry &>(rhs);
602  /*
603  * Ksync Flow Table should have the same key as vrouter flow table,
604  * so that all the flow entries present in vrouter can be represented
605  * in Ksync. This will also ensure that the index change for a flow
606  * entry will be sync'ed appropriately in vrouter.
607  */
608  if (hash_id_ != entry.hash_id_) {
609  return hash_id_ < entry.hash_id_;
610  }
611  return flow_entry_ < entry.flow_entry_;
612 }
613 
614 void FlowTableKSyncEntry::ErrorHandler(int err, uint32_t seq_no,
615  KSyncEvent event) const {
616  if (err == ENOSPC || err == EBADF) {
617  KSYNC_ERROR(VRouterError, "VRouter operation failed. Error <", err,
618  ":", VrouterError(err), ">. Object <", ToString(),
619  ">. Operation <", AckOperationString(event),
620  ">. Message number :", seq_no);
621  }
622  return;
623 }
624 
625 std::string FlowTableKSyncEntry::VrouterError(uint32_t error) const {
626  if (error == EBADF)
627  return "Flow gen id Mismatch";
628  else if (error == ENOSPC)
629  return "Flow Table bucket full";
630  else if (error == EFAULT)
631  return "Flow Key Mismatch with same gen id";
632  else return KSyncEntry::VrouterError(error);
633 }
634 
636  FlowEntry *flow_entry = flowptr.get();
637  if (!flow_entry->IsShortFlow() && !flow_entry->IsOnUnresolvedList()) {
638  unresolved_flow_list_.push_back(flow_entry);
639  flow_entry->SetUnResolvedList(true);
640  StartTimer();
641  }
642 }
643 /*
644  * The timer will be triggered once after adding an unresolved entry.
645  * It will be stopped once the list becomes empty.
646  */
648  if (timer_ == NULL) {
649  timer_ = TimerManager::CreateTimer(
650  *(ksync_->agent()->event_manager())->io_service(),
651  "flow dep sync timer",
652  ksync_->agent()->task_scheduler()->GetTaskId(kTaskFlowEvent),
653  flow_table()->table_index());
654  }
655  timer_->Start(kFlowDepSyncTimeout,
656  boost::bind(&FlowTableKSyncObject::TimerExpiry, this));
657 }
658 
659 /*
660  * This function will be triggered on a 1 sec delay.
661  * If the entry is marked deleted, the ksync update will not be called.
662  * If the number of attempts is more than 4, the flow will be marked as a short flow.
663  */
664 
666  uint16_t count = 0;
667  while (!unresolved_flow_list_.empty() && count < KFlowUnresolvedListYield) {
668  FlowEntryPtr flow = unresolved_flow_list_.front();
669  FlowEntry *flow_entry = flow.get();
670  unresolved_flow_list_.pop_front();
671  flow_entry->SetUnResolvedList(false);
672  count++;
673  if (!flow_entry->deleted()) {
674  FlowProto *proto = ksync()->agent()->pkt()->get_flow_proto();
675  proto->EnqueueUnResolvedFlowEntry(flow.get());
676  }
677  }
678  if (!unresolved_flow_list_.empty())
679  return true;
680  return false;
681 }
682 
684  KSyncObject("KSync FlowTable"), ksync_(ksync), free_list_(this),
685  timer_(NULL) {
686 }
687 
689  KSyncObject("KSync FlowTable", max_index), ksync_(ksync), free_list_(this) {
690 }
691 
694 }
695 
696 KSyncEntry *FlowTableKSyncObject::Alloc(const KSyncEntry *key, uint32_t index) {
697  const FlowTableKSyncEntry *entry =
698  static_cast<const FlowTableKSyncEntry *>(key);
699  return free_list_.Allocate(entry);
700 }
701 
703  FlowTableKSyncEntry *ksync = static_cast<FlowTableKSyncEntry *>(entry);
704  free_list_.Free(ksync);
705 }
706 
708  FlowTableKSyncEntry entry(this, key, key->flow_handle());
709  KSyncObject *obj = static_cast<KSyncObject *>(this);
710  return static_cast<FlowTableKSyncEntry *>(obj->Find(&entry));
711 }
712 
713 void FlowTableKSyncObject::UpdateKey(KSyncEntry *entry, uint32_t flow_handle) {
714  static_cast<FlowTableKSyncEntry *>(entry)->set_hash_id(flow_handle);
715 }
716 
718  return static_cast<FlowTableKSyncEntry *>(entry)->hash_id();
719 }
720 
722  uint32_t flow_handle) {
723  ChangeKey(entry, flow_handle);
724 }
725 
727 }
728 
730 // KSyncFlowEntryFreeList implementation
733  object_(object), max_count_(0), grow_pending_(false), total_alloc_(0),
734  total_free_(0), free_list_() {
735 
736  uint32_t count = kInitCount;
737  if (object->ksync()->agent()->test_mode()) {
738  count = kTestInitCount;
739  }
740  while (max_count_ < count) {
741  free_list_.push_back(*new FlowTableKSyncEntry(object_));
742  max_count_++;
743  }
744 }
745 
747  while (free_list_.empty() == false) {
748  FreeList::iterator it = free_list_.begin();
749  FlowTableKSyncEntry *flow = &(*it);
750  free_list_.erase(it);
751  delete flow;
752  }
753 }
754 
755 // Allocate a chunk of FlowEntries
757  grow_pending_ = false;
758  if (free_list_.size() >= kMinThreshold)
759  return;
760 
761  for (uint32_t i = 0; i < kGrowSize; i++) {
762  free_list_.push_front(*new FlowTableKSyncEntry(object_));
763  max_count_++;
764  }
765 }
766 
768  const FlowTableKSyncEntry *flow_key =
769  static_cast<const FlowTableKSyncEntry *>(key);
770  FlowTableKSyncEntry *flow = NULL;
771  if (free_list_.size() == 0) {
772  flow = new FlowTableKSyncEntry(object_);
773  max_count_++;
774  } else {
775  FreeList::iterator it = free_list_.begin();
776  flow = &(*it);
777  free_list_.erase(it);
778  }
779 
780  if (grow_pending_ == false && free_list_.size() < kMinThreshold) {
781  grow_pending_ = true;
782  FlowProto *proto = object_->ksync()->agent()->pkt()->get_flow_proto();
783  proto->GrowFreeListRequest(flow_key->flow_entry()->flow_table());
784  }
785 
786  // Do post allocation initialization
787  flow->Reset(flow_key->flow_entry().get(), flow_key->hash_id());
788  flow->set_evict_gen_id(flow_key->evict_gen_id_);
789  total_alloc_++;
790  return flow;
791 }
792 
794  total_free_++;
795  flow->Reset();
796  if (free_list_.size() < kMaxThreshold)
797  free_list_.push_back(*flow);
798  else {
799  delete flow;
800  --max_count_;
801  }
802 }
803 
805  free_list_.Grow();
806 }
807 
808 // We want to handle KSync transitions for flow from Flow task context.
809 // KSync allows the NetlinkAck API to be overridden for custom handling.
810 // Provide an implementation to enqueue a request.
812  KSyncEntry::KSyncEvent event) {
813  FlowProto *proto = ksync()->agent()->pkt()->get_flow_proto();
814  const FlowTableKSyncEntry *flow_ksync_entry =
815  static_cast<const FlowTableKSyncEntry *>(entry);
816  const FlowKSyncResponseInfo *resp =
817  flow_ksync_entry->ksync_response_info();
818  proto->KSyncEventRequest(entry, event, resp->flow_handle_,
819  resp->gen_id_, resp->ksync_error_,
821  resp->evict_flow_oflow_,
822  flow_ksync_entry->get_transaction_id());
823 }
824 
826  KSyncEntry::KSyncEvent event) {
827  KSyncObject::NetlinkAck(entry, event);
828 }
IpAddress src_addr
Definition: flow_entry.h:213
int hash_id
FlowTable * flow_table() const
void KSyncEventRequest(KSyncEntry *ksync_entry, KSyncEntry::KSyncEvent event, uint32_t flow_handle, uint8_t gen_id, int ksync_error, uint64_t evict_flow_bytes, uint64_t evict_flow_packets, int32_t evict_flow_oflow, uint32_t transcation_id)
Definition: flow_proto.cc:633
FlowTableKSyncObject(KSync *ksync)
static const uint32_t kInvalidComponentNHIdx
Definition: nexthop.h:1777
void SetUnResolvedList(bool added)
Definition: flow_entry.h:750
bool test_mode() const
Definition: agent.h:1191
static const uint32_t kTestInitCount
void set_evict_gen_id(uint8_t gen_id)
#define kTaskFlowEvent
Definition: agent.h:321
uint32_t GetTransactionId()
Definition: flow_entry.h:770
Agent * agent() const
Definition: ksync_init.h:39
KSyncEntry * Alloc(const KSyncEntry *key, uint32_t index)
IpAddress dst_addr
Definition: flow_entry.h:214
static void EncodeKSyncIp(vr_flow_req *req, const IpAddress &sip, const IpAddress &dip)
virtual KSyncEntry * UnresolvedReference()
KSync * ksync() const
Definition: mirror_ksync.h:65
KSyncFlowEntryFreeList free_list_
void IpToU64(const IpAddress &sip, const IpAddress &dip, uint64_t *sip_u, uint64_t *sip_l, uint64_t *dip_u, uint64_t *dip_l)
uint32_t hash_id() const
boost::asio::ip::address IpAddress
Definition: address.h:13
std::string ToString() const
static const uint32_t kInvalidIndex
Definition: qos_config.h:144
#define KSYNC_ERROR(obj,...)
Definition: ksync_entry.h:16
uint16_t table_index() const
Definition: flow_table.h:198
FlowTableKSyncObject * object_
static const uint32_t kMaxThreshold
FlowEntryPtr flow_entry_
uint8_t protocol
Definition: flow_entry.h:215
virtual void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
void ChangeKey(KSyncEntry *entry, uint32_t arg)
uint16_t allocated_port()
Definition: flow_entry.h:766
void SetPcapData(FlowEntryPtr fe, std::vector< int8_t > &data)
void GenerateKSyncEvent(FlowTableKSyncEntry *entry, KSyncEntry::KSyncEvent event)
const FlowKey & key() const
Definition: flow_entry.h:594
static string ToString(PhysicalDevice::ManagementProtocol proto)
void UpdateKey(KSyncEntry *entry, uint32_t flow_handle)
uint32_t GetKey(KSyncEntry *entry)
virtual void ErrorHandler(int, uint32_t, KSyncEvent) const
static const uint8_t kInvalidIndex
Definition: mirror_table.h:119
static const uint32_t kRpfDiscardIndex
Definition: nexthop.h:1947
virtual ~FlowTableKSyncEntry()
bool is_flags_set(const FlowEntryFlags &flags) const
Definition: flow_entry.h:610
uint8_t type
Definition: load_balance.h:109
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
void Free(FlowTableKSyncEntry *flow)
MirrorKSyncObject * mirror_ksync_obj() const
Definition: ksync_init.h:40
void UpdateUnresolvedFlowEntry(FlowEntryPtr flowptr)
uint32_t get_transaction_id() const
void Free(KSyncEntry *key)
KSync * ksync() const
Definition: agent.cc:901
FlowTableKSyncEntry * Allocate(const KSyncEntry *key)
void EnqueueUnResolvedFlowEntry(FlowEntry *flow)
Definition: flow_proto.cc:676
static const uint32_t kMinThreshold
#define FLOW_TRACE(obj,...)
Definition: flow_mgmt.h:377
const FlowKSyncResponseInfo * ksync_response_info() const
virtual std::string VrouterError(uint32_t error) const
static Timer * CreateTimer(boost::asio::io_context &service, const std::string &name, int task_id=Timer::GetTimerTaskId(), int task_instance=Timer::GetTimerInstanceId(), bool delete_on_completion=false)
Definition: timer.cc:201
FlowProto * get_flow_proto() const
Definition: pkt_init.h:43
uint16_t src_port
Definition: flow_entry.h:216
static uint16_t GetDropReason(uint16_t dr)
KSync * ksync() const
void NetlinkAck(KSyncEntry *entry, KSyncEntry::KSyncEvent event)
virtual std::string VrouterError(uint32_t error) const
uint32_t flow_handle() const
Definition: flow_entry.h:600
int AddMsg(char *buf, int buf_len)
bool IsLess(const KSyncEntry &rhs) const
static const uint8_t kMaxMirrorsPerFlow
Definition: flow_entry.h:522
uint16_t dst_port
Definition: flow_entry.h:217
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
static const uint32_t kInvalidFlowHandle
Definition: flow_entry.h:521
bool IsOnUnresolvedList()
Definition: flow_entry.h:749
FlowTableKSyncEntry(FlowTableKSyncObject *obj)
FlowEntry * reverse_flow_entry()
Definition: flow_entry.h:602
bool IsShortFlow() const
Definition: flow_entry.h:682
virtual uint32_t GetTableIndex() const
FlowEntryPtr flow_entry() const
int DeleteMsg(char *buf, int buf_len)
KSyncFlowEntryFreeList(FlowTableKSyncObject *object)
bool deleted()
Definition: flow_entry.h:680
int ChangeMsg(char *buf, int buf_len)
uint8_t gen_id() const
Definition: flow_entry.h:599
PktModule * pkt() const
Definition: agent.cc:965
uint32_t nh
Definition: flow_entry.h:212
uint32_t GetIdx(std::string analyzer_name)
static const uint32_t kGrowSize
static const uint32_t kInitCount
void UpdateFlowHandle(FlowTableKSyncEntry *entry, uint32_t flow_handle)
int Encode(sandesh_op::type op, char *buf, int buf_len)
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
boost::intrusive_ptr< FlowEntry > FlowEntryPtr
Definition: flow_entry.h:125
void GrowFreeListRequest(FlowTable *table)
Definition: flow_proto.cc:627
void Reset()
Definition: ksync_entry.h:82
KSyncObject * GetObject() const
TokenPtr GetToken(FlowEvent::Event event)
Definition: flow_proto.cc:714
FlowTableKSyncEntry * Find(FlowEntry *key)