OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
flow_sandesh_impl.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <sandesh/common/flow_types.h>
6 
7 bool SessionIpPortProtocol::operator < (const SessionIpPortProtocol &rhs) const {
8  if (service_port < rhs.service_port) {
9  return true;
10  } else if (service_port == rhs.service_port && protocol < rhs.protocol) {
11  return true;
12  } else if (service_port == rhs.service_port && protocol == rhs.protocol && local_ip < rhs.local_ip) {
13  return true;
14  } else {
15  return false;
16  }
17 }
18 
19 bool SessionIpPort::operator < (const SessionIpPort &rhs) const {
20  if (port < rhs.port) {
21  return true;
22  } else if (port == rhs.port && ip < rhs.ip) {
23  return true;
24  } else {
25  return false;
26  }
27 }
28 
29 /*
30  * print everything in SessionEndpoint structure other than the map
31  */
32 std::string SessionEndpointLog(const SessionEndpoint &sep) {
33  std::stringstream Xbuf;
34  //construct per session messages
35  Xbuf << "vmi" << " = " << sep.vmi;
36  Xbuf << " " << "vn" << " = " << sep.vn;
37  if (sep.__isset.deployment) {
38  Xbuf << " " << "deployment" << " = " << sep.deployment;
39  }
40  if (sep.__isset.tier) {
41  Xbuf << " " << "tier" << " = " << sep.tier;
42  }
43  if (sep.__isset.application) {
44  Xbuf << " " << "application" << " = " << sep.application;
45  }
46  if (sep.__isset.site) {
47  Xbuf << " " << "site" << " = " << sep.site;
48  }
49  if (sep.__isset.labels) {
50  Xbuf << " " << "labels" << "= [ ";
51  {
52  Xbuf << " " << "[";
53  std::set<std::string> ::const_iterator label_iter;
54  for (label_iter = sep.labels.begin();
55  label_iter != sep.labels.end(); ++label_iter) {
56  Xbuf << " " << "label val" << " = " << (*label_iter);
57  Xbuf << ", ";
58  }
59  Xbuf << " " << "]";
60  }
61  }
62  Xbuf << " ]";
63  if (sep.__isset.remote_deployment) {
64  Xbuf << " " << "remote_deployment" << " = " << sep.remote_deployment;
65  }
66  if (sep.__isset.remote_tier) {
67  Xbuf << " " << "remote_tier" << " = " << sep.remote_tier;
68  }
69  if (sep.__isset.remote_application) {
70  Xbuf << " " << "remote_application" << " = " << sep.remote_application;
71  }
72  if (sep.__isset.remote_site) {
73  Xbuf << " " << "remote_site" << " = " << sep.remote_site;
74  }
75  if (sep.__isset.remote_labels) {
76  Xbuf << " " << "remote_labels" << "= [ ";
77  {
78  Xbuf << " " << "[";
79  std::set<std::string> ::const_iterator label_iter;
80  for (label_iter = sep.remote_labels.begin();
81  label_iter != sep.remote_labels.end(); ++label_iter) {
82  Xbuf << " " << "(label val)" << " = " << (*label_iter);
83  Xbuf << ", ";
84  }
85  Xbuf << " " << "]";
86  }
87  Xbuf << " ]";
88  }
89  if (sep.__isset.security_policy_rule) {
90  Xbuf << " " << "security_policy_rule" << " = " <<
91  sep.security_policy_rule;
92  }
93  Xbuf << " " << "remote_vn" << " = " << sep.remote_vn;
94  Xbuf << " " << "is_client_session" << " = " <<
95  integerToString(sep.is_client_session);
96  Xbuf << " " << "is_si" << " = " << integerToString(sep.is_si);
97  if (sep.__isset.remote_prefix) {
98  Xbuf << " " << "remote_prefix" << " = " << sep.remote_prefix;
99  }
100  Xbuf << " " << "vrouter_ip" << " = " << sep.vrouter_ip;
101  return Xbuf.str();
102 }
103 
104 /*
105  * Print everything in SessionAggInfo struct otherthan the map
106  */
107 std::string SessionAggInfoLog(
108  const SessionAggInfo &sess_agg) {
109  std::stringstream Xbuf;
110  if (sess_agg.__isset.sampled_forward_bytes) {
111  Xbuf << "sampled_forward_bytes" << " = " << integerToString(sess_agg.sampled_forward_bytes);
112  }
113  if (sess_agg.__isset.sampled_forward_pkts) {
114  Xbuf << " " << "sampled_forward_pkts" << " = " << integerToString(sess_agg.sampled_forward_pkts);
115  }
116  if (sess_agg.__isset.sampled_reverse_bytes) {
117  Xbuf << " " << "sampled_reverse_bytes" << " = " << integerToString(sess_agg.sampled_reverse_bytes);
118  }
119  if (sess_agg.__isset.sampled_reverse_pkts) {
120  Xbuf << " " << "sampled_reverse_pkts" << " = " << integerToString(sess_agg.sampled_reverse_pkts);
121  }
122  if (sess_agg.__isset.logged_forward_bytes) {
123  Xbuf << " " << "logged_forward_bytes" << " = " << integerToString(sess_agg.logged_forward_bytes);
124  }
125  if (sess_agg.__isset.logged_forward_pkts) {
126  Xbuf << " " << "logged_forward_pkts" << " = " << integerToString(sess_agg.logged_forward_pkts);
127  }
128  if (sess_agg.__isset.logged_reverse_bytes) {
129  Xbuf << " " << "logged_reverse_bytes" << " = " << integerToString(sess_agg.logged_reverse_bytes);
130  }
131  return Xbuf.str();
132 }
133 
134 void SessionEndpointObject::LogUnrolled(std::string category,
135  SandeshLevel::type level,
136  const std::vector<SessionEndpoint> & session_data) {
137  log4cplus::LogLevel Xlog4level(SandeshLevelTolog4Level(level));
138  log4cplus::Logger Xlogger = Sandesh::sampled_logger();
139  log4cplus::Logger SLO_logger = Sandesh::slo_logger();
140  bool is_send_sample_to_logger_enabled = Sandesh::is_send_sampled_to_logger_enabled();
141  bool is_send_slo_to_logger_enabled = Sandesh::is_send_slo_to_logger_enabled();
142  log4cplus::tostringstream Xbuf;
143  std::vector<SessionEndpoint> ::const_iterator sep_iter;
144  for (sep_iter = session_data.begin(); sep_iter != session_data.end();
145  ++sep_iter) {
146  std::string sep_info = SessionEndpointLog(*sep_iter);
147  std::map<SessionIpPortProtocol, SessionAggInfo>::const_iterator
148  local_ep_iter;
149  for (local_ep_iter = sep_iter->sess_agg_info.begin();
150  local_ep_iter != sep_iter->sess_agg_info.end(); ++local_ep_iter++) {
151  std::map<SessionIpPort, SessionInfo>::const_iterator sessions_iter;
152  std::string local_ep_info = local_ep_iter->first.log();
153  std::string sess_agg_data = SessionAggInfoLog(local_ep_iter->second);
154  // for each of the individual session
155  for (sessions_iter = local_ep_iter->second.sessionMap.begin();
156  sessions_iter != local_ep_iter->second.sessionMap.end();
157  ++sessions_iter) {
158  Xbuf.clear();
159  Xbuf.str(std::string());
160  Xbuf << category << " [" << LevelToString(level) << "]: SessionData: ";
161  Xbuf << "[ ";
162  Xbuf << sep_info << " ";
163  // print sess_agg_info keys [ip, port, protocol of local]
164  Xbuf << local_ep_info << " ";
165  // print sess_agg_info values [aggregate logged and sampled data
166  // for one session endpoint]
167  Xbuf << sess_agg_data << " ";
168  // print SessionInfo keys [ip, port of the remote end]
169  Xbuf << sessions_iter->first.log() << " " ;
170  // print SessionInfo values [aggregate data for individual session]
171  Xbuf << sessions_iter->second.log() << " ";
172  Xbuf << " ]";
173  // If SLO session log to SLO_logger
174  const SessionInfo & sess_info = sessions_iter->second;
175  if (sess_info.forward_flow_info.__isset.logged_bytes ||
176  sess_info.reverse_flow_info.__isset.logged_bytes) {
177  if (is_send_slo_to_logger_enabled) {
178  SLO_logger.forcedLog(Xlog4level, Xbuf.str());
179  }
180  }
181  // If sampled session log to Xlogger
182  if (sess_info.forward_flow_info.__isset.sampled_bytes ||
183  sess_info.reverse_flow_info.__isset.sampled_bytes) {
184  if (is_send_sample_to_logger_enabled) {
185  Xlogger.forcedLog(Xlog4level, Xbuf.str());
186  }
187  }
188  }
189  }
190  }
191 }
192 
193 /*
194  * Remove the sessions from the SessionEndPoint struct
195  * if the session need not go to the collector
196  */
197 void SessionEndpointObject::adjust_session_end_point_objects(
198  std::vector<SessionEndpoint> &session_data) {
199  bool is_send_sampled_to_collector = Sandesh::is_send_sampled_to_collector_enabled();
200  bool is_send_slo_to_collector = Sandesh::is_send_slo_to_collector_enabled();
201  std::vector<SessionEndpoint> ::iterator sep_iter;
202  for (sep_iter = session_data.begin();
203  sep_iter != session_data.end(); ++sep_iter) {
204  std::map<SessionIpPortProtocol, SessionAggInfo>::iterator
205  local_ep_iter;
206  for (local_ep_iter = sep_iter->sess_agg_info.begin();
207  local_ep_iter != sep_iter->sess_agg_info.end();
208  local_ep_iter++) {
209  // Adjust the aggregate logged and sampled info
210  if (!is_send_slo_to_collector) {
211  local_ep_iter->second.__isset.logged_forward_bytes = false;
212  local_ep_iter->second.__isset.logged_forward_pkts = false;
213  local_ep_iter->second.__isset.logged_reverse_bytes = false;
214  local_ep_iter->second.__isset.logged_reverse_pkts = false;
215  }
216  if (!is_send_sampled_to_collector) {
217  local_ep_iter->second.__isset.sampled_forward_bytes = false;
218  local_ep_iter->second.__isset.sampled_forward_pkts = false;
219  local_ep_iter->second.__isset.sampled_reverse_bytes = false;
220  local_ep_iter->second.__isset.sampled_reverse_pkts = false;
221  }
222  // Iterate the individual sessions
223  std::map<SessionIpPort, SessionInfo>::iterator sessions_iter;
224  sessions_iter = local_ep_iter->second.sessionMap.begin();
225  while (sessions_iter != local_ep_iter->second.sessionMap.end()) {
226  bool erase_session = false;
227  SessionInfo & sess_info = sessions_iter->second;
228  // Handle if its a logged session
229  if (sess_info.forward_flow_info.__isset.logged_bytes ||
230  sess_info.reverse_flow_info.__isset.logged_bytes) {
231  if (!is_send_slo_to_collector) {
232  // dont erase if session is both sampled and logged
233  if (!sess_info.forward_flow_info.__isset.
234  sampled_bytes &&
235  !sess_info.reverse_flow_info.__isset.
236  sampled_bytes) {
237  erase_session = true;
238  } else {
239  sess_info.forward_flow_info.__isset.logged_bytes = false;
240  sess_info.forward_flow_info.__isset.logged_pkts = false;
241  sess_info.reverse_flow_info.__isset.logged_bytes = false;
242  sess_info.reverse_flow_info.__isset.logged_pkts = false;
243  }
244  }
245  }
246  // Handle if its a sampled session
247  if (sess_info.forward_flow_info.__isset.sampled_bytes ||
248  sess_info.reverse_flow_info.__isset.sampled_bytes) {
249  if (!is_send_sampled_to_collector) {
250  // dont erase if session is both sampled and logged
251  if (!sess_info.forward_flow_info.__isset.
252  logged_bytes &&
253  !sess_info.reverse_flow_info.__isset.
254  logged_bytes) {
255  erase_session = true;
256  } else {
257  sess_info.forward_flow_info.__isset.sampled_bytes = false;
258  sess_info.forward_flow_info.__isset.sampled_pkts = false;
259  sess_info.reverse_flow_info.__isset.sampled_bytes = false;
260  sess_info.reverse_flow_info.__isset.sampled_pkts = false;
261  }
262  }
263  }
264  std::map<SessionIpPort, SessionInfo>::iterator sessions_temp_iter = sessions_iter;
265  sessions_temp_iter++;
266  if (erase_session) {
267  local_ep_iter->second.sessionMap.
268  erase(sessions_iter);
269  }
270  sessions_iter = sessions_temp_iter;
271 
272  }
273  }
274  }
275 }
bool operator<(const WaterMarkInfo &lhs, const WaterMarkInfo &rhs)
Definition: watermark.h:32
log4cplus::LogLevel SandeshLevelTolog4Level(SandeshLevel::type slevel)
Definition: sandesh.cc:395
static bool is_send_sampled_to_logger_enabled()
Definition: p/sandesh.h:339
static log4cplus::Logger & sampled_logger()
Definition: p/sandesh.h:325
static bool is_send_slo_to_collector_enabled()
Definition: p/sandesh.h:336
static log4cplus::Logger & slo_logger()
Definition: p/sandesh.h:324
std::string SessionEndpointLog(const SessionEndpoint &sep)
uint8_t type
Definition: load_balance.h:109
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
static bool is_send_slo_to_logger_enabled()
Definition: p/sandesh.h:338
std::string SessionAggInfoLog(const SessionAggInfo &sess_agg)
static bool is_send_sampled_to_collector_enabled()
Definition: p/sandesh.h:337