OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
work_processor-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef __WORK_PROCESSOR_INL_H__
6 #define __WORK_PROCESSOR_INL_H__
7 
8 #include <vector>
9 #include <boost/function.hpp>
10 #include <boost/bind.hpp>
11 #include <boost/shared_ptr.hpp>
12 #include <boost/scoped_ptr.hpp>
13 #include <boost/assign/list_of.hpp>
14 #include <boost/type_traits.hpp>
15 #include <boost/mpl/assert.hpp>
16 #include <boost/tuple/tuple.hpp>
17 #include <limits>
18 #include <sstream>
19 #include <tbb/atomic.h>
20 #include "base/task.h"
21 
22 struct PipelineWorker : public Task {
23  PipelineWorker(int tid, int tinst, boost::function<bool(void)> runner) :
24  Task(tid,tinst), runner_(runner) {}
25 
26  bool Run() {
27  if (!(runner_)()) return false;
28  return true;
29  }
30  std::string Description() const { return "PipelineWorker"; }
31 private:
32  const boost::function<bool(void)> runner_;
33 };
34 
35 template<typename InputT, typename SubResultT, typename ExternalT>
36 class WorkProcessor : public ExternalProcIf<ExternalT> {
37 public:
38  typedef boost::function<ExternalBase::Efn(
39  uint32_t inst,
40  const std::vector<ExternalT*> & exts, // Info for previous steps of this stage
41  const InputT & inp, // Info from previous stage
42  SubResultT & subRes // Access to final result of this instance
44 
45  WorkProcessor(uint32_t stage, ExecuteFn efn, FinFn finFn, const InputT & inp,
46  uint32_t inst, int tid, int tinst) :
47  stage_(stage),
48  finFn_(finFn),
49  efn_(efn),
50  inp_(inp),
51  finished_(false),
52  running_(false),
53  inst_(inst),
54  tid_(tid),
55  tinst_(tinst),
57  boost::bind(&WorkProcessor<InputT,SubResultT,ExternalT>::Runner,
58  this))) {
59  res_.reset(new SubResultT);
60  }
61 
62  void Start() {
64  scheduler->Enqueue(w_);
65  }
66 
67  boost::shared_ptr<SubResultT> Result() const {
68  if (!finished_) return boost::shared_ptr<SubResultT>();
69  return res_;
70  }
71 
72  void Release() {
73  assert(finished_);
74  res_.reset();
75  }
76 
77  std::string Key() const {
78  std::stringstream keystr;
79  keystr << "PROC-STAGE:" << stage_ << "-INST:" << inst_ <<
80  "-STEP:" << externals_.size();
81  return keystr.str();
82  }
83 
84 private:
85  uint32_t stage_;
87  const ExecuteFn efn_;
88  const InputT & inp_;
89  boost::shared_ptr<SubResultT> res_;
90  std::vector<ExternalT*> externals_;
91  bool finished_;
92  bool running_;
93  const uint32_t inst_;
94  const int tid_;
95  const int tinst_;
96  PipelineWorker * const w_;
97 
98  void WorkDone(bool ret_code) {
99  for (typename std::vector<ExternalT*>::iterator it = externals_.begin();
100  it!=externals_.end(); it++) {
101  delete (*it);
102  }
103  finished_ = true;
104  finFn_(ret_code);
105  }
106 
107  bool Runner(void) {
108  running_ = true;
110  running_ = false;
111  if (fn.empty()) {
112  WorkDone(true);
113  } else {
114  if (fn == &ExternalBase::Incomplete) return false;
115 
116  if (!fn(this)) {
117  WorkDone(false);
118  }
119  }
120  return true;
121  }
122 
123  void Response(std::unique_ptr<ExternalT> resp) {
124  ExternalT * msg = resp.get();
125  resp.release();
126  externals_.push_back(msg);
127  assert(!running_);
130  this));
132  scheduler->Enqueue(w);
133  }
134 };
135 
136 template <typename InputT, typename ResultT>
137 struct WorkStageIf {
138  virtual void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr<InputT> & inp) = 0;
139  virtual boost::shared_ptr<ResultT> Result() const = 0;
140  virtual void Release() = 0;
141  virtual ~WorkStageIf() {}
142 };
143 #endif
const ExecuteFn efn_
PipelineWorker *const w_
void WorkDone(bool ret_code)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
const uint32_t inst_
virtual ~WorkStageIf()
boost::function< void(bool)> FinFn
Definition: work_pipeline.h:75
virtual void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr< InputT > &inp)=0
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
std::string Key() const
WorkProcessor(uint32_t stage, ExecuteFn efn, FinFn finFn, const InputT &inp, uint32_t inst, int tid, int tinst)
virtual void Release()=0
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Response(std::unique_ptr< ExternalT > resp)
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
boost::shared_ptr< SubResultT > res_
static bool Incomplete(void *)
Definition: work_pipeline.h:90
boost::function< bool(ExternalBase *)> Efn
Definition: work_pipeline.h:89
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
PipelineWorker(int tid, int tinst, boost::function< bool(void)> runner)
virtual boost::shared_ptr< ResultT > Result() const =0
std::string Description() const
const boost::function< bool(void)> runner_
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
std::vector< ExternalT * > externals_
const InputT & inp_
boost::shared_ptr< SubResultT > Result() const