OpenSDN source code
work_pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 /*
6  * work_pipeline.h
7  *
8  * This file provides some templates that can be used to manage processing.
9  *
10  * The user creates a WorkPipeline, with multiple WorkStages inside it.
11  *
12  * The stages will execute in sequence. A stage has an execution phase, where
13  * multiple threads (instances) can do processing in parallel, and an optional
14  * merge phase where the results of the parallel execution can be consolidated.
15  * For each stage, the user provides the execution function (of type ExecuteFn)
16  * and (optionally) a merge function (of type MergeFn).
17  *
18  * The execution function runs in parallel over a number of instances. Each
19  * instance executes in steps. During a step, the ExecuteFn is expected to
20  * provide the external command that needs to be run to get information (e.g.
21  * cassandra or redis, or another WorkPipeline). ExecuteFn will be called again
22  * (as a next step) when the results of the external command are available. In
23  * that next step, ExecuteFn can consolidate the external result into the
24  * instance's subresult. The user can run multiple such steps within the
25  * instance.
26  *
27  * After the subresults from all instances of that stage are available, MergeFn
28  * will run which can consolidate the subresults into the final result for
29  * that stage. If a stage has a single instance, then MergeFn is not needed
30  * for that stage.
31  *
32  * After all stages have been executed, the user's callback function is called.
33  * The user can then access the final result of the WorkPipeline.
34  *
35  * Here are some of the features:
36  *
37  * - Buffers are shared and transferred wherever possible (instead of copied)
38  *
39  * - A generic mechanism is provided to get information from any source.
40  * Both asynchronous and synchronous processing are allowed for the external sources.
41  *
42  * - Strong type checking for the Input, SubResult and Result types
43  * across stages.
44  *
45  * - TaskId and TaskInstance can be customized for ExecuteFn (for every
46  * instance of every stage) and MergeFn (for every stage)
47  *
48  *
49  * This file has the interfaces provided by WorkPipeline and WorkStage.
50  * Most of the implementation is split across two other files -
51  * work_processor-inl.h and work_pipeline-inl.h
52  *
53  */
54 
55 #ifndef __WORK_PIPELINE_H__
56 #define __WORK_PIPELINE_H__
57 
58 #include <vector>
59 #include <limits>
60 #include <sstream>
61 #include <atomic>
62 
63 #include <boost/function.hpp>
64 #include <boost/bind/bind.hpp>
65 #include <boost/shared_ptr.hpp>
66 #include <boost/scoped_ptr.hpp>
67 #include <boost/assign/list_of.hpp>
68 #include <boost/type_traits.hpp>
69 #include <boost/mpl/assert.hpp>
70 #include <boost/tuple/tuple.hpp>
71 #include "base/task.h"
72 
73 using namespace boost::placeholders;
74 
75 // When the WorkPipeline is done, it will call this function back
76 // to indicate that the result is ready.
77 // The bool indicates success or failure.
78 typedef boost::function<void(bool)> FinFn;
79 
80 
81 // This is the interface to the external command execution that
82 // can be requested by the user in ExecuteFn
83 //
84 // The user provides the function ExternalBase::Efn that the
85 // WorkStage can call with a single argument (ExternalBase *)
86 //
87 // The user should ensure that when the command is done, the
88 // argument is cast into ExternalProcIf *, and the
89 // Response function is called with the result of the the
90 // external command
91 struct ExternalBase {
92  typedef boost::function<bool(ExternalBase *)> Efn;
93  static bool Incomplete(void *) { assert(0); }
94 };
95 template<typename ExternalT>
96 struct ExternalProcIf : public ExternalBase {
97  virtual std::string Key() const = 0;
98  virtual void Response(std::unique_ptr<ExternalT>) = 0;
99  virtual ~ExternalProcIf() {}
100 };
101 
102 #include "work_processor-inl.h"
103 
104 // When creating the WorkPipeline, the user needs to supply it
105 // constructed WorkStage objects.
106 // The Constructor of this class is the only function
107 // that the client needs to call.
108 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT = ResultT>
109 class WorkStage : public WorkStageIf<InputT, ResultT> {
110 public:
111 
112  // A WorkStage executes multiple instances in parallel by calling ExecuteFn
113  // Each execution run of ExecuteFn is called a "step".
114  // - If the client returns ExternalBase::Incomplete, the step is incomplete.
115  // After yielding, ExecuteFn will be called again to continue the step.
116  // - If the client returns NULL, the step ends, and so does the instance.
117  // - Otherwise, WorkStage will call the returned callback function
118  // and move to the next step when the client calls ExternalProcIf's
119  // Response function. The buffer passed in the Response function is added
120  // to the exts vector before calling ExecuteFn for the next step.
121  //
122  // The client should fill in the final result in subRes before the
123  // instance ends.
124  typedef boost::function<
125  ExternalBase::Efn( // Client returns External fn to call and
126  // whether processing is incomplete.
127  uint32_t inst, // Instance num. (the instances execute in parallel)
128  const std::vector<ExternalT*>
129  & exts, // Info for previous steps of this stage
130  const InputT & inp, // Info from previous stage
131  SubResultT & subRes // Access to final result of this instance
133 
134  // When all the ExecuteFn instances end, the Execute phase of the stage
135  // is over. At that time, WorkStage calls MergeFn. MergeFn is expected to
136  // aggregate the SubResults from the ExecuteFn instances into the final
137  // Result for the stage.
138  // If the SubResult type and Result type is the same, MergeFn is optional.
139  // If no MergeFn is supplied, the SubResult of the 1st instance will be
140  // used as the final result of the stage.
141  typedef boost::function<
142  bool( // Client returns whether processing is incomplete.
143  const std::vector<boost::shared_ptr<SubResultT> >
144  & subs, // Subresults from this stage's instances
145  const boost::shared_ptr<InputT> & inp, // Info from previous stage
146  ResultT & res // Final result to be reported by this stage
148 
149  // The client needs to constuct WorkStages.
150  // tinfo is a vector of TaskId,TaskInstance pairs for ExecuteFn
151  // ExecuteFn and MergeFn were explained above
152  // tid and tinst are the TaskId and TaskInstance used for MergeFn
153  WorkStage(std::vector<std::pair<int,int> > tinfo,
154  ExecuteFn efn, MergeFn mfn = 0, int tid = 0, int tinst = -1);
155 
156  void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr<InputT> & inp);
157 
158  boost::shared_ptr<ResultT> Result() const;
159 
160  void Release();
161 
162 private:
163  uint32_t stage_;
164  std::atomic<uint32_t> remainingInst_;
166  boost::shared_ptr<InputT> inp_;
167  boost::shared_ptr<ResultT> res_;
168  std::vector<boost::shared_ptr<WorkProcessor<InputT,SubResultT,ExternalT> > > workers_;
169  std::vector<boost::shared_ptr<SubResultT> > subRes_;
172  bool finished_;
173  bool running_;
174  const int tid_;
175  const int tinst_;
176  const std::vector<std::pair<int,int> > tinfo_;
177 
178  bool Runner(void);
179  void WorkProcCb(uint32_t inst, bool ret_code);
180  void StageProceed(boost::true_type) { res_ = subRes_[0]; }
181  void StageProceed(boost::false_type) { assert(!merger_.empty()); }
182 
183 };
184 
185 
186 template<typename T0, typename T1, typename T2 = T1,
187  typename T3 = T2, typename T4 = T3,
188  typename T5 = T4, typename T6 = T5>
190 public:
191  typedef T6 ResT;
193 
194  // The client should instantiate the WorkPipeline with instances
195  // of WorkStage. The WorkPipeline accomodates a minimum of 1 and
196  // maximum of 6 stages. Each stage has an Input and Output type.
197  // The Output type of a stage must match the Input type of the
198  // next stage. This sequence of types must be used to instantiate
199  // the WorkPipeline template.
200  WorkPipeline(
201  WorkStageIf<T0,T1> * s0,
202  WorkStageIf<T1,T2> * s1 = NULL,
203  WorkStageIf<T2,T3> * s2 = NULL,
204  WorkStageIf<T3,T4> * s3 = NULL,
205  WorkStageIf<T4,T5> * s4 = NULL,
206  WorkStageIf<T5,T6> * s5 = NULL);
207 
208  // The client should call this function to start the WorkPipeline
209  // "finFn" is the callback function that WorkPipeline will call when
210  // the last stage ends. "inp" is the Input to the 1st stage.
211  // The WorkPipeline will retain a shared_ptr to the initial
212  // input for the lifetime of the WorkPipeline object.
213  void Start(FinFn finFn, const boost::shared_ptr<T0> & inp);
214 
215  // After "finFn" has been called by WorkPipeline, the client can
216  // access the final result by calling this function
217  // The WorkPipeline will retain a shared_ptr to the final result
218  // for the lifetime of the WorkPipeline object.
219  boost::shared_ptr<ResT> Result() const;
220 
221 private:
222  typedef boost::tuple<
223  const boost::shared_ptr<WorkStageIf<T0,T1> >,
224  const boost::shared_ptr<WorkStageIf<T1,T2> >,
225  const boost::shared_ptr<WorkStageIf<T2,T3> >,
226  const boost::shared_ptr<WorkStageIf<T3,T4> >,
227  const boost::shared_ptr<WorkStageIf<T4,T5> >,
228  const boost::shared_ptr<WorkStageIf<T5,T6> > > sg_type;
229 
230  bool finished_;
232  boost::shared_ptr<T0> inp_;
233  boost::shared_ptr<ResT> res_;
235 
236  template<int kS, bool same> struct PipeProceed {};
237 
238  template<int kS> struct PipeProceed<kS, true> {
239  static void Do(SelfT * wp) { wp->res_ = boost::get<kS>(wp->sg_)->Result(); }
240  };
241 
242  template<int kS> struct PipeProceed<kS, false> {
243  static void Do(SelfT * wp) { assert(boost::get<kS+1>(wp->sg_)); }
244  };
245 
246  void WorkStageCb(uint32_t stage, bool ret_code);
247  template<int kS,typename NextT> void NextStage();
248 
249 };
250 
251 #include "work_pipeline-inl.h"
252 
253 
254 #endif
boost::tuple< const boost::shared_ptr< WorkStageIf< T0, T1 > >, const boost::shared_ptr< WorkStageIf< T1, T2 > >, const boost::shared_ptr< WorkStageIf< T2, T3 > >, const boost::shared_ptr< WorkStageIf< T3, T4 > >, const boost::shared_ptr< WorkStageIf< T4, T5 > >, const boost::shared_ptr< WorkStageIf< T5, T6 > > > sg_type
boost::shared_ptr< T0 > inp_
WorkPipeline< T0, T1, T2, T3, T4, T5, T6 > SelfT
boost::shared_ptr< ResT > res_
uint32_t stage_
boost::shared_ptr< InputT > inp_
const MergeFn merger_
void StageProceed(boost::true_type)
FinFn finFn_
void StageProceed(boost::false_type)
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
std::atomic< uint32_t > remainingInst_
const std::vector< std::pair< int, int > > tinfo_
const int tinst_
const ExecuteFn efn_
std::vector< boost::shared_ptr< SubResultT > > subRes_
std::vector< boost::shared_ptr< WorkProcessor< InputT, SubResultT, ExternalT > > > workers_
bool finished_
const int tid_
boost::function< bool(const std::vector< boost::shared_ptr< SubResultT > > &subs, const boost::shared_ptr< InputT > &inp, ResultT &res)> MergeFn
boost::shared_ptr< ResultT > res_
static bool Incomplete(void *)
Definition: work_pipeline.h:93
boost::function< bool(ExternalBase *)> Efn
Definition: work_pipeline.h:92
virtual ~ExternalProcIf()
Definition: work_pipeline.h:99
virtual void Response(std::unique_ptr< ExternalT >)=0
virtual std::string Key() const =0
boost::function< void(bool)> FinFn
Definition: work_pipeline.h:78