OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
derived_stats_algo.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 
6 // derived_stats_algo.h
7 //
8 // This file has the implementation of
9 // derived stats algorithms
10 //
11 #ifndef __DERIVED_STATS_ALGO_H__
12 #define __DERIVED_STATS_ALGO_H__
13 
14 #include <iostream>
15 #include <cstdlib>
16 #include <cmath>
17 #include <sstream>
18 #include <sandesh/derived_stats_results_types.h>
19 #include <sandesh/derived_stats.h>
20 #include <boost/assign/list_of.hpp>
21 #include <boost/scoped_ptr.hpp>
22 
23 using std::vector;
24 using std::map;
25 using std::pair;
26 using std::make_pair;
27 using std::string;
28 
29 namespace contrail {
30 namespace sandesh {
31 
32 // Interface for all Anomaly Detection algorithms
33 // That can be plugged into the DSAnomaly DerivedStat
34 template <typename ElemT>
35 class DSAnomalyIf {
36  public:
37  virtual DSReturnType FillResult(AnomalyResult& res) const = 0;
38  virtual ~DSAnomalyIf() { }
39  virtual void Update(const ElemT& raw) = 0;
40 };
41 
42 // Implementation of Exponential Weighted Mean
43 // algorithm for Anomaly Detection
44 template <typename ElemT>
45 class DSAnomalyEWM : public DSAnomalyIf<ElemT> {
46  public:
47  DSAnomalyEWM (const std::string &annotation,
48  std::string &errstr) : samples_(0),
49  mean_(0), variance_(0), sigma_(0), stddev_(0), psigma_(0) {
50  alpha_ = (double) strtod(annotation.c_str(), NULL);
51  if ((alpha_ <= 0) || (alpha_ > 1)) {
52  errstr = std::string("Invalid alpha ") + annotation;
53  alpha_ = 0;
54  }
55  }
56  uint64_t samples_;
57  double alpha_;
58  double mean_;
59  double variance_;
60  double sigma_;
61  double stddev_;
62  double psigma_;
63 
64  virtual DSReturnType FillResult(AnomalyResult& res) const {
65  assert(alpha_ != 0);
66  std::ostringstream meanstr, stddevstr;
67  meanstr << mean_;
68  stddevstr << stddev_;
69 
70  res.set_samples(samples_);
71  res.set_sigma(sigma_);
72  res.set_state(boost::assign::map_list_of(
73  std::string("mean"), meanstr.str())(
74  std::string("stddev"), stddevstr.str()));
75 
76  // We don't have enough samples to report an anomaly
77  if (samples_ < (uint64_t(1.0/alpha_))) return DSR_INVALID;
78  if (samples_ == (uint64_t(1.0/alpha_))) return DSR_OK;
79 
80  // If sigma was low, and is still low,
81  // we don't need to send anything
82  if ((psigma_ < 0.5) && (psigma_ > -0.5) &&
83  (sigma_ < 0.5) && (sigma_ > -0.5))
84  return DSR_SKIP;
85  else
86  return DSR_OK;
87  }
88 
89  virtual void Update(const ElemT& raw) {
90  assert(alpha_ != 0);
91  samples_++;
92  variance_ = (1-alpha_)*(variance_ + (alpha_*pow(raw-mean_,2)));
93  mean_ = ((1-alpha_)*mean_) + (alpha_*raw);
94  stddev_ = sqrt(variance_);
95  psigma_ = sigma_;
96  if (stddev_) sigma_ = (raw - mean_) / stddev_;
97  else sigma_ = 0;
98  }
99 };
100 
101 template <typename ElemT, class AnomalyResT>
102 class DSAnomaly {
103  public:
104  DSAnomaly(const std::string &annotation) {
105  size_t rpos = annotation.find(':');
106  started_ = false;
107  algo_ = annotation.substr(0,rpos);
108  config_ = annotation.substr(rpos+1, string::npos);
109  if (algo_.compare("EWM") == 0) {
111  if (!error_.empty()) {
112  impl_.reset();
113  }
114  return;
115  }
116  // No valid Anomaly Detection algorithm was found
117  impl_.reset(NULL);
118  }
119 
120  DSReturnType FillResult(AnomalyResT &res) const {
121  DSReturnType ret = DSR_OK;
122  if (impl_) {
123  ret = impl_->FillResult(res);
124  // We should have cleared impl_ if there was a parsing error
125  // with the DSAnomaly config
126  assert(error_.empty());
127  }
128  if (started_) res.set_metric(previous_);
129  res.set_algo(algo_);
130  res.set_config(config_);
131  if (!error_.empty()) res.set_error(error_);
132  return ret;
133  }
134 
135  void Update(const ElemT& raw, uint64_t mono_usec) {
136  if (impl_) {
137  impl_->Update(raw);
138  started_ = true;
139  previous_ = raw;
140  }
141  }
142 
143  std::string algo_;
144  std::string config_;
145  std::string error_;
146  ElemT previous_;
147  bool started_;
148  boost::scoped_ptr<DSAnomalyIf<ElemT> > impl_;
149 };
150 
151 template <typename ElemT, class EWMResT>
152 class DSEWM {
153  public:
154  DSEWM(const std::string &annotation):
155  mean_(0), variance_(0),
156  sigma_(0), stddev_(0), samples_(0) {
157  alpha_ = (double) strtod(annotation.c_str(), NULL);
158  if (alpha_ == 0) {
159  error_ = std::string("Disabled");
160  return;
161  }
162  if ((alpha_ < 0) || (alpha_ > 1)) {
163  error_ = std::string("Invalid alpha ") + annotation;
164  alpha_ = 0;
165  }
166  }
167 
168  double alpha_;
169  double mean_;
170  double variance_;
171  double sigma_;
172  double stddev_;
173  uint64_t samples_;
174  std::string error_;
175 
176  DSReturnType FillResult(EWMResT &res) const {
177  res.set_samples(samples_);
178  if (!error_.empty()) {
179  res.set_error(error_);
180  return DSR_OK;
181  }
182  res.set_mean(mean_);
183  res.set_stddev(stddev_);
184  res.set_sigma(sigma_);
185  return DSR_OK;
186  }
187  void Update(const ElemT& raw, uint64_t mono_usec) {
188  samples_++;
189  if (!error_.empty()) return;
190  variance_ = (1-alpha_)*(variance_ + (alpha_*pow(raw-mean_,2)));
191  mean_ = ((1-alpha_)*mean_) + (alpha_*raw);
192  stddev_ = sqrt(variance_);
193  if (stddev_) sigma_ = (raw - mean_) / stddev_;
194  else sigma_ = 0;
195  }
196 };
197 
198 template <typename ElemT, class NullResT>
199 class DSChange {
200  public:
201  DSChange(const std::string &annotation) : init_(false) {}
202 
203  bool init_;
204  ElemT value_;
205  ElemT prev_;
206 
207  DSReturnType FillResult(NullResT &res) const {
208  if (!init_) return DSR_INVALID;
209  res = value_;
210  if (prev_ == value_) return DSR_SKIP;
211  else return DSR_OK;
212  }
213 
214  void Update(const ElemT& raw, uint64_t mono_usec) {
215  if (init_) prev_ = value_;
216  value_ = raw;
217  init_ = true;
218  }
219 };
220 
221 template <typename ElemT, class NullResT>
222 class DSNon0 {
223  public:
224  DSNon0(const std::string &annotation) {}
225  ElemT value_;
226 
227  DSReturnType FillResult(NullResT &res) const {
228  res = value_;
229  if (value_ == 0) return DSR_SKIP;
230  else return DSR_OK;
231  }
232 
233  void Update(const ElemT& raw, uint64_t mono_usec) {
234  value_ = raw;
235  }
236 };
237 
238 template <typename ElemT, class NullResT>
239 class DSNone {
240  public:
241  DSNone(const std::string &annotation) {}
242  ElemT value_;
243 
244  DSReturnType FillResult(NullResT &res) const {
245  res = value_;
246  return DSR_OK;
247  }
248  void Update(const ElemT& raw, uint64_t mono_usec) {
249  value_ = raw;
250  }
251 };
252 
253 template <typename ElemT, class NullResT>
254 class DSNull {
255  public:
256  DSNull(const std::string &annotation): samples_(0) {}
257  ElemT value_;
258  uint64_t samples_;
259 
260  DSReturnType FillResult(NullResT &res) const {
261  res.set_samples(samples_);
262  res.set_value(value_);
263  return DSR_OK;
264  }
265  void Update(const ElemT& raw, uint64_t mono_usec) {
266  samples_++;
267  value_ = raw;
268  }
269 };
270 
271 template <typename ElemT, class SumResT>
272 class DSSum {
273  public:
274  DSSum(const std::string &annotation): samples_(0), shifter_(0),
275  start_tbin_(0), last_tbin_(0) {
276 
277  if (annotation.empty()) {
278  range_usecs_ = 0;
279  } else {
280  range_usecs_ = ((uint64_t) strtoul(annotation.c_str(), NULL, 10)) * 1000000;
281  // We want each time bucket to represent between
282  // 1/128th and 1/256th of the entire range
283  // This ensures that there will never be more than 256 buckets
284  while ((uint64_t)(1 << (shifter_ + 8)) < range_usecs_) shifter_++;
285  }
286  }
287  uint64_t samples_;
288  SumResT value_;
289  map<uint64_t, pair<uint64_t,ElemT> > history_buf_;
290  uint64_t range_usecs_;
291  uint8_t shifter_;
292  uint64_t start_tbin_;
293  uint64_t last_tbin_;
294 
295  virtual DSReturnType FillResult(SumResT &res) const {
296  static SumResT empty_val;
297  if (!samples_) {
298  res = empty_val;
299  return DSR_INVALID;
300  }
301  if (range_usecs_) {
302  // At least one time-range's worth of samples must be seen
303  // before we can report a SUM
304  uint64_t end_range =
306  if (end_range > last_tbin_) {
307  res = empty_val;
308  return DSR_INVALID;
309  } else {
310  }
311  }
312  res = value_;
313  return DSR_OK;
314  }
315 
316  virtual void Purge(uint64_t mono_usec) {
317 
318  for (typename map<uint64_t, pair<uint64_t,ElemT> >::iterator it =
319  history_buf_.begin(); it!= history_buf_.end(); ) {
320  uint64_t end_range = ((it->first + range_usecs_) >> shifter_) << shifter_;
321  if (end_range <= mono_usec) {
322  value_ = value_ - it->second.second;
323  samples_ = samples_ - it->second.first;
324  history_buf_.erase(it++);
325  } else {
326  ++it;
327  }
328  }
329  }
330 
331  virtual void Update(const ElemT& raw, uint64_t mono_usec) {
332  if (!samples_) {
333  value_ = raw;
334  } else {
335  value_ = value_ + raw;
336  }
337  if (range_usecs_) {
338  // if 'range_usecs_' is non-0, we need to aggregate only
339  // the last 'range_usecs_' worth of elements
340  uint64_t tbin = (mono_usec >> shifter_) << shifter_;
341  if (!start_tbin_) start_tbin_ = tbin;
342  last_tbin_ = tbin;
343 
344  // Subtract old entries, if there are any
345  Purge(tbin);
346 
347  // Record the new update, so we can subtract when needed
348  typename map<uint64_t, pair<uint64_t,ElemT> >::iterator ut =
349  history_buf_.find(tbin);
350  if (ut == history_buf_.end()) {
351  history_buf_[tbin] = make_pair(1,raw);
352  } else {
353  ut->second.second = ut->second.second + raw;
354  ut->second.first++;
355  }
356  }
357  samples_++;
358  }
359 };
360 
361 template <typename ElemT, class AvgResT>
362 class DSAvg : public DSSum<ElemT,AvgResT> {
363  public:
364  DSAvg(const std::string &annotation): DSSum<ElemT,AvgResT>(annotation) {}
365 
366  virtual DSReturnType FillResult(AvgResT &res) const {
367  AvgResT sres;
369  res = sres / DSSum<ElemT,AvgResT>::samples_;
370  return DSR_OK;
371  } else {
372  return DSR_INVALID;
373  }
374  }
375 };
376 
377 } // namespace sandesh
378 } // namespace contrail
379 
380 #endif // #define __DERIVED_STATS_ALGO_H__
void Update(const ElemT &raw, uint64_t mono_usec)
DSReturnType FillResult(NullResT &res) const
DSReturnType FillResult(NullResT &res) const
void Update(const ElemT &raw, uint64_t mono_usec)
DSReturnType FillResult(EWMResT &res) const
virtual void Update(const ElemT &raw)
virtual void Purge(uint64_t mono_usec)
void Update(const ElemT &raw, uint64_t mono_usec)
boost::scoped_ptr< DSAnomalyIf< ElemT > > impl_
DSNon0(const std::string &annotation)
virtual void Update(const ElemT &raw, uint64_t mono_usec)
void Update(const ElemT &raw, uint64_t mono_usec)
virtual DSReturnType FillResult(AnomalyResult &res) const =0
DSReturnType FillResult(NullResT &res) const
DSReturnType FillResult(NullResT &res) const
void Update(const ElemT &raw, uint64_t mono_usec)
DSSum(const std::string &annotation)
DSAnomaly(const std::string &annotation)
virtual void Update(const ElemT &raw)=0
DSChange(const std::string &annotation)
virtual DSReturnType FillResult(AnomalyResult &res) const
virtual DSReturnType FillResult(SumResT &res) const
DSEWM(const std::string &annotation)
DSAvg(const std::string &annotation)
void Update(const ElemT &raw, uint64_t mono_usec)
DSReturnType FillResult(AnomalyResT &res) const
DSNull(const std::string &annotation)
DSNone(const std::string &annotation)
DSAnomalyEWM(const std::string &annotation, std::string &errstr)
virtual DSReturnType FillResult(AvgResT &res) const
map< uint64_t, pair< uint64_t, ElemT > > history_buf_