55 #ifndef __WORK_PIPELINE_H__
56 #define __WORK_PIPELINE_H__
59 #include <boost/function.hpp>
60 #include <boost/bind.hpp>
61 #include <boost/shared_ptr.hpp>
62 #include <boost/scoped_ptr.hpp>
63 #include <boost/assign/list_of.hpp>
64 #include <boost/type_traits.hpp>
65 #include <boost/mpl/assert.hpp>
66 #include <boost/tuple/tuple.hpp>
69 #include <tbb/atomic.h>
75 typedef boost::function<void(bool)>
FinFn;
89 typedef boost::function<bool(ExternalBase *)>
Efn;
92 template<
typename ExternalT>
94 virtual std::string
Key()
const = 0;
95 virtual void Response(std::unique_ptr<ExternalT>) = 0;
105 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT = ResultT>
121 typedef boost::function<
125 const std::vector<ExternalT*>
138 typedef boost::function<
140 const std::vector<boost::shared_ptr<SubResultT> >
142 const boost::shared_ptr<InputT> & inp,
150 WorkStage(std::vector<std::pair<int,int> > tinfo,
153 void Start(uint32_t stage,
FinFn finFn,
const boost::shared_ptr<InputT> & inp);
155 boost::shared_ptr<ResultT>
Result()
const;
163 boost::shared_ptr<InputT>
inp_;
164 boost::shared_ptr<ResultT>
res_;
165 std::vector<boost::shared_ptr<WorkProcessor<InputT,SubResultT,ExternalT> > >
workers_;
166 std::vector<boost::shared_ptr<SubResultT> >
subRes_;
173 const std::vector<std::pair<int,int> >
tinfo_;
176 void WorkProcCb(uint32_t inst,
bool ret_code);
183 template<
typename T0,
typename T1,
typename T2 = T1,
184 typename T3 = T2,
typename T4 = T3,
185 typename T5 = T4,
typename T6 = T5>
210 void Start(
FinFn finFn,
const boost::shared_ptr<T0> & inp);
216 boost::shared_ptr<ResT>
Result()
const;
219 typedef boost::tuple<
220 const boost::shared_ptr<WorkStageIf<T0,T1> >,
221 const boost::shared_ptr<WorkStageIf<T1,T2> >,
222 const boost::shared_ptr<WorkStageIf<T2,T3> >,
223 const boost::shared_ptr<WorkStageIf<T3,T4> >,
224 const boost::shared_ptr<WorkStageIf<T4,T5> >,
225 const boost::shared_ptr<WorkStageIf<T5,T6> > >
sg_type;
240 static void Do(
SelfT * wp) { assert(boost::get<kS+1>(wp->
sg_)); }
244 template<
int kS,
typename NextT>
void NextStage();
boost::tuple< const boost::shared_ptr< WorkStageIf< T0, T1 > >, const boost::shared_ptr< WorkStageIf< T1, T2 > >, const boost::shared_ptr< WorkStageIf< T2, T3 > >, const boost::shared_ptr< WorkStageIf< T3, T4 > >, const boost::shared_ptr< WorkStageIf< T4, T5 > >, const boost::shared_ptr< WorkStageIf< T5, T6 > > > sg_type
boost::shared_ptr< ResT > Result() const
boost::shared_ptr< ResultT > res_
boost::function< void(bool)> FinFn
WorkPipeline< T0, T1, T2, T3, T4, T5, T6 > SelfT
const std::vector< std::pair< int, int > > tinfo_
std::vector< boost::shared_ptr< WorkProcessor< InputT, SubResultT, ExternalT > > > workers_
boost::function< bool(const std::vector< boost::shared_ptr< SubResultT > > &subs, const boost::shared_ptr< InputT > &inp, ResultT &res)> MergeFn
std::vector< boost::shared_ptr< SubResultT > > subRes_
WorkStage(std::vector< std::pair< int, int > > tinfo, ExecuteFn efn, MergeFn mfn=0, int tid=0, int tinst=-1)
virtual void Response(std::unique_ptr< ExternalT >)=0
virtual ~ExternalProcIf()
tbb::atomic< uint32_t > remainingInst_
void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr< InputT > &inp)
void Start(FinFn finFn, const boost::shared_ptr< T0 > &inp)
WorkPipeline(WorkStageIf< T0, T1 > *s0, WorkStageIf< T1, T2 > *s1=NULL, WorkStageIf< T2, T3 > *s2=NULL, WorkStageIf< T3, T4 > *s3=NULL, WorkStageIf< T4, T5 > *s4=NULL, WorkStageIf< T5, T6 > *s5=NULL)
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
static bool Incomplete(void *)
boost::function< bool(ExternalBase *)> Efn
boost::shared_ptr< T0 > inp_
virtual std::string Key() const =0
void StageProceed(boost::true_type)
boost::shared_ptr< ResT > res_
static void Do(SelfT *wp)
void StageProceed(boost::false_type)
void WorkProcCb(uint32_t inst, bool ret_code)
void WorkStageCb(uint32_t stage, bool ret_code)
static void Do(SelfT *wp)
boost::shared_ptr< ResultT > Result() const
boost::shared_ptr< InputT > inp_