OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
work_pipeline.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 /*
6  * work_pipeline.h
7  *
8  * This file provides some templates that can be used to manage processing.
9  *
10  * The user creates a WorkPipeline, with multiple WorkStage's inside it.
11  *
12  * The stages will execute in sequence. A stage has an execution phase, where
13  * multiple threads (instances) can do processing in parallel, and an optional
14  * merge phase where the results of the parallel execution can be consolidated.
15  * For each stage, the user provides the execution function (of type ExecuteFn)
16  * (optionally) a merge function (of type MergeFn).
17  *
18  * The execution function runs in parallel over a number of instances. Each
19  * instance executes in steps. During a step, the ExecuteFn is expected to
20  * provide the external command that needs to be run to get information (e.g.
21  * cassandra or redis, or another WorkPipleline) . ExecuteFn will called again
22  * (as a next step) when the results of the external command are available. In
23  * that next step, ExecuteFn can consolidate the external result into the
24  * instances' subresult. The user can run multiple such steps within the
25  * instance.
26  *
27  * After the subresults from all instances of that stage are available, MergeFn
28  * will run which can consolidate the subresults into the final result for
29  * that stage. If a stage has a single instance, then MergeFn is not needed
30  * for that stage.
31  *
32  * After all stages have been executed, the user's callback function is called.
33  * The user can then access the final result of the WorkPipeline.
34  *
35  * Here are some of the features:
36  *
37  * - Buffers are shared and transferred wherever possible (instead of copy)
38  *
39  * - A generic mechanism is provided to get information from any source.
40  * Both Async and Sync processing is allowed for the external sources.
41  *
42  * - Strong typechecking for Input Type, Subresult Type and Result type
43  * across stages.
44  *
45  * - TaskId and TaskInstance can be customized for ExecuteFn (for every
46  * instance of every stage) and MergeFn (for every stage)
47  *
48  *
49  * This file has the interfaces provided by WorkPipeline and WorkStage.
50  * Most of the implementation is split across two other files -
51  * work_processor-inl.h and work_pipeline-inl.h
52  *
53  */
54 
55 #ifndef __WORK_PIPELINE_H__
56 #define __WORK_PIPELINE_H__
57 
58 #include <vector>
59 #include <boost/function.hpp>
60 #include <boost/bind.hpp>
61 #include <boost/shared_ptr.hpp>
62 #include <boost/scoped_ptr.hpp>
63 #include <boost/assign/list_of.hpp>
64 #include <boost/type_traits.hpp>
65 #include <boost/mpl/assert.hpp>
66 #include <boost/tuple/tuple.hpp>
67 #include <limits>
68 #include <sstream>
69 #include <tbb/atomic.h>
70 #include "base/task.h"
71 
72 // When the WorkPipeline is done, it will call this function back
73 // to indicate that the result is ready.
74 // The bool indicates success or failure.
75 typedef boost::function<void(bool)> FinFn;
76 
77 
78 // This is the interface to the external command execution that
79 // can be requested by the user in ExecuteFn
80 //
81 // The user provides the function ExternalBase::Efn that the
82 // WorkStage can call with a single argument (ExternalBase *)
83 //
84 // The user should ensure that when the command is done, the
85 // argument is cast into ExternalProcIf *, and the
86 // Response function is called with the result of the the
87 // external command
88 struct ExternalBase {
89  typedef boost::function<bool(ExternalBase *)> Efn;
90  static bool Incomplete(void *) { assert(0); }
91 };
92 template<typename ExternalT>
93 struct ExternalProcIf : public ExternalBase {
94  virtual std::string Key() const = 0;
95  virtual void Response(std::unique_ptr<ExternalT>) = 0;
96  virtual ~ExternalProcIf() {}
97 };
98 
99 #include "work_processor-inl.h"
100 
101 // When creating the WorkPipeline, the user needs to supply it
102 // constructed WorkStage objects.
103 // The Constructor of this class is the only function
104 // that the client needs to call.
105 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT = ResultT>
106 class WorkStage : public WorkStageIf<InputT, ResultT> {
107 public:
108 
109  // A WorkStage executes multiple instances in parallel by calling ExecuteFn
110  // Each execution run of ExecuteFn is called a "step".
111  // - If the client returns ExternalBase::Incomplete, the step is incomplete.
112  // After yielding, ExecuteFn will be called again to continue the step.
113  // - If the client returns NULL, the step ends, and so does the instance.
114  // - Otherwise, WorkStage will call the returned callback function
115  // and move to the next step when the client calls ExternalProcIf's
116  // Response function. The buffer passed in the Response function is added
117  // to the exts vector before calling ExecuteFn for the next step.
118  //
119  // The client should fill in the final result in subRes before the
120  // instance ends.
121  typedef boost::function<
122  ExternalBase::Efn( // Client returns External fn to call and
123  // whether processing is incomplete.
124  uint32_t inst, // Instance num. (the instances execute in parallel)
125  const std::vector<ExternalT*>
126  & exts, // Info for previous steps of this stage
127  const InputT & inp, // Info from previous stage
128  SubResultT & subRes // Access to final result of this instance
130 
131  // When all the ExecuteFn instances end, the Execute phase of the stage
132  // is over. At that time, WorkStage calls MergeFn. MergeFn is expected to
133  // aggregate the SubResults from the ExecuteFn instances into the final
134  // Result for the stage.
135  // If the SubResult type and Result type is the same, MergeFn is optional.
136  // If no MergeFn is supplied, the SubResult of the 1st instance will be
137  // used as the final result of the stage.
138  typedef boost::function<
139  bool( // Client returns whether processing is incomplete.
140  const std::vector<boost::shared_ptr<SubResultT> >
141  & subs, // Subresults from this stage's instances
142  const boost::shared_ptr<InputT> & inp, // Info from previous stage
143  ResultT & res // Final result to be reported by this stage
145 
146  // The client needs to constuct WorkStages.
147  // tinfo is a vector of TaskId,TaskInstance pairs for ExecuteFn
148  // ExecuteFn and MergeFn were explained above
149  // tid and tinst are the TaskId and TaskInstance used for MergeFn
150  WorkStage(std::vector<std::pair<int,int> > tinfo,
151  ExecuteFn efn, MergeFn mfn = 0, int tid = 0, int tinst = -1);
152 
153  void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr<InputT> & inp);
154 
155  boost::shared_ptr<ResultT> Result() const;
156 
157  void Release();
158 
159 private:
160  uint32_t stage_;
161  tbb::atomic<uint32_t> remainingInst_;
163  boost::shared_ptr<InputT> inp_;
164  boost::shared_ptr<ResultT> res_;
165  std::vector<boost::shared_ptr<WorkProcessor<InputT,SubResultT,ExternalT> > > workers_;
166  std::vector<boost::shared_ptr<SubResultT> > subRes_;
169  bool finished_;
170  bool running_;
171  const int tid_;
172  const int tinst_;
173  const std::vector<std::pair<int,int> > tinfo_;
174 
175  bool Runner(void);
176  void WorkProcCb(uint32_t inst, bool ret_code);
177  void StageProceed(boost::true_type) { res_ = subRes_[0]; }
178  void StageProceed(boost::false_type) { assert(!merger_.empty()); }
179 
180 };
181 
182 
183 template<typename T0, typename T1, typename T2 = T1,
184  typename T3 = T2, typename T4 = T3,
185  typename T5 = T4, typename T6 = T5>
187 public:
188  typedef T6 ResT;
190 
191  // The client should instantiate the WorkPipeline with instances
192  // of WorkStage. The WorkPipeline accomodates a minimum of 1 and
193  // maximum of 6 stages. Each stage has an Input and Output type.
194  // The Output type of a stage must match the Input type of the
195  // next stage. This sequence of types must be used to instantiate
196  // the WorkPipeline template.
197  WorkPipeline(
198  WorkStageIf<T0,T1> * s0,
199  WorkStageIf<T1,T2> * s1 = NULL,
200  WorkStageIf<T2,T3> * s2 = NULL,
201  WorkStageIf<T3,T4> * s3 = NULL,
202  WorkStageIf<T4,T5> * s4 = NULL,
203  WorkStageIf<T5,T6> * s5 = NULL);
204 
205  // The client should call this function to start the WorkPipeline
206  // "finFn" is the callback function that WorkPipeline will call when
207  // the last stage ends. "inp" is the Input to the 1st stage.
208  // The WorkPipeline will retain a shared_ptr to the initial
209  // input for the lifetime of the WorkPipeline object.
210  void Start(FinFn finFn, const boost::shared_ptr<T0> & inp);
211 
212  // After "finFn" has been called by WorkPipeline, the client can
213  // access the final result by calling this function
214  // The WorkPipeline will retain a shared_ptr to the final result
215  // for the lifetime of the WorkPipeline object.
216  boost::shared_ptr<ResT> Result() const;
217 
218 private:
219  typedef boost::tuple<
220  const boost::shared_ptr<WorkStageIf<T0,T1> >,
221  const boost::shared_ptr<WorkStageIf<T1,T2> >,
222  const boost::shared_ptr<WorkStageIf<T2,T3> >,
223  const boost::shared_ptr<WorkStageIf<T3,T4> >,
224  const boost::shared_ptr<WorkStageIf<T4,T5> >,
225  const boost::shared_ptr<WorkStageIf<T5,T6> > > sg_type;
226 
227  bool finished_;
229  boost::shared_ptr<T0> inp_;
230  boost::shared_ptr<ResT> res_;
232 
233  template<int kS, bool same> struct PipeProceed {};
234 
235  template<int kS> struct PipeProceed<kS, true> {
236  static void Do(SelfT * wp) { wp->res_ = boost::get<kS>(wp->sg_)->Result(); }
237  };
238 
239  template<int kS> struct PipeProceed<kS, false> {
240  static void Do(SelfT * wp) { assert(boost::get<kS+1>(wp->sg_)); }
241  };
242 
243  void WorkStageCb(uint32_t stage, bool ret_code);
244  template<int kS,typename NextT> void NextStage();
245 
246 };
247 
248 #include "work_pipeline-inl.h"
249 
250 
251 #endif
boost::tuple< const boost::shared_ptr< WorkStageIf< T0, T1 > >, const boost::shared_ptr< WorkStageIf< T1, T2 > >, const boost::shared_ptr< WorkStageIf< T2, T3 > >, const boost::shared_ptr< WorkStageIf< T3, T4 > >, const boost::shared_ptr< WorkStageIf< T4, T5 > >, const boost::shared_ptr< WorkStageIf< T5, T6 > > > sg_type
boost::shared_ptr< ResT > Result() const
boost::shared_ptr< ResultT > res_
const int tid_
boost::function< void(bool)> FinFn
Definition: work_pipeline.h:75
WorkPipeline< T0, T1, T2, T3, T4, T5, T6 > SelfT
FinFn finFn_
const std::vector< std::pair< int, int > > tinfo_
std::vector< boost::shared_ptr< WorkProcessor< InputT, SubResultT, ExternalT > > > workers_
boost::function< bool(const std::vector< boost::shared_ptr< SubResultT > > &subs, const boost::shared_ptr< InputT > &inp, ResultT &res)> MergeFn
const MergeFn merger_
uint32_t stage_
std::vector< boost::shared_ptr< SubResultT > > subRes_
bool Runner(void)
WorkStage(std::vector< std::pair< int, int > > tinfo, ExecuteFn efn, MergeFn mfn=0, int tid=0, int tinst=-1)
virtual void Response(std::unique_ptr< ExternalT >)=0
virtual ~ExternalProcIf()
Definition: work_pipeline.h:96
tbb::atomic< uint32_t > remainingInst_
void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr< InputT > &inp)
const int tinst_
void Start(FinFn finFn, const boost::shared_ptr< T0 > &inp)
WorkPipeline(WorkStageIf< T0, T1 > *s0, WorkStageIf< T1, T2 > *s1=NULL, WorkStageIf< T2, T3 > *s2=NULL, WorkStageIf< T3, T4 > *s3=NULL, WorkStageIf< T4, T5 > *s4=NULL, WorkStageIf< T5, T6 > *s5=NULL)
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
static bool Incomplete(void *)
Definition: work_pipeline.h:90
boost::function< bool(ExternalBase *)> Efn
Definition: work_pipeline.h:89
boost::shared_ptr< T0 > inp_
bool finished_
virtual std::string Key() const =0
void StageProceed(boost::true_type)
boost::shared_ptr< ResT > res_
void StageProceed(boost::false_type)
const ExecuteFn efn_
void WorkProcCb(uint32_t inst, bool ret_code)
void WorkStageCb(uint32_t stage, bool ret_code)
boost::shared_ptr< ResultT > Result() const
boost::shared_ptr< InputT > inp_