OpenSDN source code
work_processor-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef __WORK_PROCESSOR_INL_H__
6 #define __WORK_PROCESSOR_INL_H__
7 
8 #include <vector>
9 #include <boost/function.hpp>
10 #include <boost/bind/bind.hpp>
11 #include <boost/shared_ptr.hpp>
12 #include <boost/scoped_ptr.hpp>
13 #include <boost/assign/list_of.hpp>
14 #include <boost/type_traits.hpp>
15 #include <boost/mpl/assert.hpp>
16 #include <boost/tuple/tuple.hpp>
17 #include <limits>
18 #include <sstream>
19 #include "base/task.h"
20 
21 using namespace boost::placeholders;
22 
23 struct PipelineWorker : public Task {
24  PipelineWorker(int tid, int tinst, boost::function<bool(void)> runner) :
25  Task(tid,tinst), runner_(runner) {}
26 
27  bool Run() {
28  if (!(runner_)()) return false;
29  return true;
30  }
31  std::string Description() const { return "PipelineWorker"; }
32 private:
33  const boost::function<bool(void)> runner_;
34 };
35 
36 template<typename InputT, typename SubResultT, typename ExternalT>
37 class WorkProcessor : public ExternalProcIf<ExternalT> {
38 public:
39  typedef boost::function<ExternalBase::Efn(
40  uint32_t inst,
41  const std::vector<ExternalT*> & exts, // Info for previous steps of this stage
42  const InputT & inp, // Info from previous stage
43  SubResultT & subRes // Access to final result of this instance
45 
46  WorkProcessor(uint32_t stage, ExecuteFn efn, FinFn finFn, const InputT & inp,
47  uint32_t inst, int tid, int tinst) :
48  stage_(stage),
49  finFn_(finFn),
50  efn_(efn),
51  inp_(inp),
52  finished_(false),
53  running_(false),
54  inst_(inst),
55  tid_(tid),
56  tinst_(tinst),
57  w_(new PipelineWorker(tid_, tinst_,
58  boost::bind(&WorkProcessor<InputT,SubResultT,ExternalT>::Runner,
59  this))) {
60  res_.reset(new SubResultT);
61  }
62 
63  void Start() {
65  scheduler->Enqueue(w_);
66  }
67 
68  boost::shared_ptr<SubResultT> Result() const {
69  if (!finished_) return boost::shared_ptr<SubResultT>();
70  return res_;
71  }
72 
73  void Release() {
74  assert(finished_);
75  res_.reset();
76  }
77 
78  std::string Key() const {
79  std::stringstream keystr;
80  keystr << "PROC-STAGE:" << stage_ << "-INST:" << inst_ <<
81  "-STEP:" << externals_.size();
82  return keystr.str();
83  }
84 
85 private:
86  uint32_t stage_;
88  const ExecuteFn efn_;
89  const InputT & inp_;
90  boost::shared_ptr<SubResultT> res_;
91  std::vector<ExternalT*> externals_;
92  bool finished_;
93  bool running_;
94  const uint32_t inst_;
95  const int tid_;
96  const int tinst_;
97  PipelineWorker * const w_;
98 
99  void WorkDone(bool ret_code) {
100  for (typename std::vector<ExternalT*>::iterator it = externals_.begin();
101  it!=externals_.end(); it++) {
102  delete (*it);
103  }
104  finished_ = true;
105  finFn_(ret_code);
106  }
107 
108  bool Runner(void) {
109  running_ = true;
110  ExternalBase::Efn fn = (efn_)(inst_, externals_, inp_, *res_);
111  running_ = false;
112  if (fn.empty()) {
113  WorkDone(true);
114  } else {
115  if (fn == &ExternalBase::Incomplete) return false;
116 
117  if (!fn(this)) {
118  WorkDone(false);
119  }
120  }
121  return true;
122  }
123 
124  void Response(std::unique_ptr<ExternalT> resp) {
125  ExternalT * msg = resp.get();
126  resp.release();
127  externals_.push_back(msg);
128  assert(!running_);
129  PipelineWorker *w = new PipelineWorker(tid_, tinst_,
131  this));
133  scheduler->Enqueue(w);
134  }
135 };
136 
137 template <typename InputT, typename ResultT>
138 struct WorkStageIf {
139  virtual void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr<InputT> & inp) = 0;
140  virtual boost::shared_ptr<ResultT> Result() const = 0;
141  virtual void Release() = 0;
142  virtual ~WorkStageIf() {}
143 };
144 #endif
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void Enqueue(Task *task)
Enqueues a task for running. Starts the task if all policy rules are met; otherwise puts the task in the wait queue....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
void Response(std::unique_ptr< ExternalT > resp)
const InputT & inp_
const uint32_t inst_
void WorkDone(bool ret_code)
std::string Key() const
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
std::vector< ExternalT * > externals_
boost::shared_ptr< SubResultT > res_
const ExecuteFn efn_
PipelineWorker *const w_
boost::shared_ptr< SubResultT > Result() const
WorkProcessor(uint32_t stage, ExecuteFn efn, FinFn finFn, const InputT &inp, uint32_t inst, int tid, int tinst)
static bool Incomplete(void *)
Definition: work_pipeline.h:93
boost::function< bool(ExternalBase *)> Efn
Definition: work_pipeline.h:92
bool Run()
Code to execute in a task. Returns true if the task is completed. Returns false to reschedule the task.
std::string Description() const
Gives a description of the task.
const boost::function< bool(void)> runner_
PipelineWorker(int tid, int tinst, boost::function< bool(void)> runner)
virtual ~WorkStageIf()
virtual void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr< InputT > &inp)=0
virtual boost::shared_ptr< ResultT > Result() const =0
virtual void Release()=0
boost::function< void(bool)> FinFn
Definition: work_pipeline.h:78