5 #ifndef __WORK_PIPELINE_INL_H__
6 #define __WORK_PIPELINE_INL_H__
9 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT>
11 std::vector<std::pair<int,int> > tinfo,
13 stage_(std::numeric_limits<uint32_t>::max()),
23 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT>
26 const boost::shared_ptr<InputT> & inp) {
32 remainingInst_ = tinfo_.size();
33 for (uint32_t tk = 0 ; tk < tinfo_.size(); tk++) {
38 *inp_, tk, tinfo_[tk].first, tinfo_[tk].second)));
40 for (uint32_t tk = 0 ; tk < tinfo_.size(); tk++) {
41 workers_[tk]->Start();
46 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT>
47 boost::shared_ptr<ResultT>
49 if (!finished_)
return boost::shared_ptr<ResultT>();
53 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT>
57 assert(subRes_.size() == tinfo_.size());
59 for (uint32_t i = 0; i < subRes_.size(); i++) {
66 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT>
70 if (!(merger_)(subRes_, inp_, *res_)) {
80 template<
typename InputT,
typename ResultT,
typename ExternalT,
typename SubResultT>
83 uint32_t prev = remainingInst_.fetch_and_decrement();
85 assert(workers_.size() == tinfo_.size());
86 for (uint32_t i = 0; i < workers_.size(); i++) {
87 subRes_.push_back(workers_[i]->Result());
88 workers_[i]->Release();
90 StageProceed(boost::is_same<ResultT,SubResultT>());
91 if (merger_.empty()) {
95 res_.reset(
new ResultT);
106 template<
typename T0,
typename T1,
typename T2,
typename T3,
107 typename T4,
typename T5,
typename T6>
124 template<
typename T0,
typename T1,
typename T2,
typename T3,
125 typename T4,
typename T5,
typename T6>
131 g->Start(0, boost::bind(&SelfT::WorkStageCb,
135 template<
typename T0,
typename T1,
typename T2,
typename T3,
136 typename T4,
typename T5,
typename T6>
137 boost::shared_ptr<typename WorkPipeline<T0,T1,T2,T3,T4,T5,T6>::ResT>
139 if (!finished_)
return boost::shared_ptr<ResT>();
143 template<
typename T0,
typename T1,
typename T2,
typename T3,
144 typename T4,
typename T5,
typename T6>
179 template<
typename T0,
typename T1,
typename T2,
typename T3,
180 typename T4,
typename T5,
typename T6>
181 template<
int kS,
typename NextT>
184 boost::shared_ptr<NextT> res = boost::get<kS>(sg_)->Result();
186 boost::get<kS>(sg_)->Release();
187 if (boost::get<kS+1>(sg_)) {
190 g->Start(kS+1, boost::bind(&SelfT::WorkStageCb,
191 this, kS+1, _1),res);
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
boost::shared_ptr< ResT > Result() const
boost::function< void(bool)> FinFn
boost::function< bool(const std::vector< boost::shared_ptr< SubResultT > > &subs, const boost::shared_ptr< InputT > &inp, ResultT &res)> MergeFn
WorkStage(std::vector< std::pair< int, int > > tinfo, ExecuteFn efn, MergeFn mfn=0, int tid=0, int tinst=-1)
void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr< InputT > &inp)
void Start(FinFn finFn, const boost::shared_ptr< T0 > &inp)
WorkPipeline(WorkStageIf< T0, T1 > *s0, WorkStageIf< T1, T2 > *s1=NULL, WorkStageIf< T2, T3 > *s2=NULL, WorkStageIf< T3, T4 > *s3=NULL, WorkStageIf< T4, T5 > *s4=NULL, WorkStageIf< T5, T6 > *s5=NULL)
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
void WorkProcCb(uint32_t inst, bool ret_code)
void WorkStageCb(uint32_t stage, bool ret_code)
boost::shared_ptr< ResultT > Result() const