OpenSDN source code
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
work_pipeline-inl.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef __WORK_PIPELINE_INL_H__
6 #define __WORK_PIPELINE_INL_H__
7 
8 
9 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT>
11  std::vector<std::pair<int,int> > tinfo,
12  ExecuteFn efn, MergeFn mfn, int tid, int tinst) :
13  stage_(std::numeric_limits<uint32_t>::max()),
14  merger_(mfn),
15  efn_(efn),
16  finished_(false),
17  running_(false),
18  tid_(tid),
19  tinst_(tinst),
20  tinfo_(tinfo) {}
21 
22 
23 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT>
24 void
26  const boost::shared_ptr<InputT> & inp) {
27  assert(!running_);
28  assert(!finished_);
29  stage_ = stage;
30  inp_ = inp;
31  finFn_ = finFn;
32  remainingInst_ = tinfo_.size();
33  for (uint32_t tk = 0 ; tk < tinfo_.size(); tk++) {
34  workers_.push_back(boost::shared_ptr<WorkProcessor<InputT,SubResultT,ExternalT> >(
37  this, tk, _1),
38  *inp_, tk, tinfo_[tk].first, tinfo_[tk].second)));
39  }
40  for (uint32_t tk = 0 ; tk < tinfo_.size(); tk++) {
41  workers_[tk]->Start();
42  }
43 
44 }
45 
46 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT>
47 boost::shared_ptr<ResultT>
49  if (!finished_) return boost::shared_ptr<ResultT>();
50  return res_;
51 }
52 
53 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT>
54 void
56  assert(finished_);
57  assert(subRes_.size() == tinfo_.size());
58  inp_.reset();
59  for (uint32_t i = 0; i < subRes_.size(); i++) {
60  subRes_[i].reset();
61  }
62  res_.reset();
63 }
64 
65 
66 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT>
67 bool
69  running_ = true;
70  if (!(merger_)(subRes_, inp_, *res_)) {
71  return false;
72  }
73  running_ = false;
74  finished_ = true;
75  finFn_(true);
76  return true;
77 }
78 
79 
80 template<typename InputT, typename ResultT, typename ExternalT, typename SubResultT>
81 void
83  uint32_t prev = remainingInst_.fetch_and_decrement();
84  if (prev == 1) {
85  assert(workers_.size() == tinfo_.size());
86  for (uint32_t i = 0; i < workers_.size(); i++) {
87  subRes_.push_back(workers_[i]->Result());
88  workers_[i]->Release();
89  }
90  StageProceed(boost::is_same<ResultT,SubResultT>());
91  if (merger_.empty()) {
92  finished_ = true;
93  finFn_(true);
94  } else {
95  res_.reset(new ResultT);
96  PipelineWorker *w = new PipelineWorker(tid_, tinst_,
98  this));
100  scheduler->Enqueue(w);
101  }
102  }
103 }
104 
105 
106 template<typename T0, typename T1, typename T2, typename T3,
107  typename T4, typename T5, typename T6>
109  WorkStageIf<T0,T1> * s0,
110  WorkStageIf<T1,T2> * s1,
111  WorkStageIf<T2,T3> * s2,
112  WorkStageIf<T3,T4> * s3,
113  WorkStageIf<T4,T5> * s4,
114  WorkStageIf<T5,T6> * s5) :
115  finished_(false),
116  sg_(
117  boost::shared_ptr<WorkStageIf<T0,T1> >(s0),
118  boost::shared_ptr<WorkStageIf<T1,T2> >(s1),
119  boost::shared_ptr<WorkStageIf<T2,T3> >(s2),
120  boost::shared_ptr<WorkStageIf<T3,T4> >(s3),
121  boost::shared_ptr<WorkStageIf<T4,T5> >(s4),
122  boost::shared_ptr<WorkStageIf<T5,T6> >(s5)) {}
123 
124 template<typename T0, typename T1, typename T2, typename T3,
125  typename T4, typename T5, typename T6>
126 void
127 WorkPipeline<T0,T1,T2,T3,T4,T5,T6>::Start(FinFn finFn, const boost::shared_ptr<T0> & inp) {
128  inp_ = inp;
129  finFn_ = finFn;
130  typename boost::tuples::element<0, sg_type>::type g(boost::get<0>(sg_));
131  g->Start(0, boost::bind(&SelfT::WorkStageCb,
132  this, 0, _1),inp_);
133 }
134 
135 template<typename T0, typename T1, typename T2, typename T3,
136  typename T4, typename T5, typename T6>
137 boost::shared_ptr<typename WorkPipeline<T0,T1,T2,T3,T4,T5,T6>::ResT>
139  if (!finished_) return boost::shared_ptr<ResT>();
140  return res_;
141 }
142 
143 template<typename T0, typename T1, typename T2, typename T3,
144  typename T4, typename T5, typename T6>
145 void
146 WorkPipeline<T0,T1,T2,T3,T4,T5,T6>::WorkStageCb(uint32_t stage, bool ret_code) {
147  switch(stage) {
148  case 0: {
149  NextStage<0,T1>();
150  }
151  break;
152  case 1: {
153  NextStage<1,T2>();
154  }
155  break;
156  case 2: {
157  NextStage<2,T3>();
158  }
159  break;
160  case 3: {
161  NextStage<3,T4>();
162  }
163  break;
164  case 4: {
165  NextStage<4,T5>();
166  }
167  break;
168  case 5: {
169  typename boost::tuples::element<5, sg_type>::type g(boost::get<5>(sg_));
170  res_ = g->Result();
171  g->Release();
172  finished_ = true;
173  finFn_(true);
174  }
175  break;
176  }
177 }
178 
179 template<typename T0, typename T1, typename T2, typename T3,
180  typename T4, typename T5, typename T6>
181 template<int kS,typename NextT>
182 void
184  boost::shared_ptr<NextT> res = boost::get<kS>(sg_)->Result();
186  boost::get<kS>(sg_)->Release();
187  if (boost::get<kS+1>(sg_)) {
188  res_.reset();
189  typename boost::tuples::element<kS+1, sg_type>::type g(boost::get<kS+1>(sg_));
190  g->Start(kS+1, boost::bind(&SelfT::WorkStageCb,
191  this, kS+1, _1),res);
192  } else {
193  finished_ = true;
194  finFn_(true);
195  }
196 }
197 
198 #endif
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
boost::shared_ptr< ResT > Result() const
boost::function< void(bool)> FinFn
Definition: work_pipeline.h:75
boost::function< bool(const std::vector< boost::shared_ptr< SubResultT > > &subs, const boost::shared_ptr< InputT > &inp, ResultT &res)> MergeFn
bool Runner(void)
WorkStage(std::vector< std::pair< int, int > > tinfo, ExecuteFn efn, MergeFn mfn=0, int tid=0, int tinst=-1)
void Start(uint32_t stage, FinFn finFn, const boost::shared_ptr< InputT > &inp)
uint8_t type
Definition: load_balance.h:109
void Start(FinFn finFn, const boost::shared_ptr< T0 > &inp)
WorkPipeline(WorkStageIf< T0, T1 > *s0, WorkStageIf< T1, T2 > *s1=NULL, WorkStageIf< T2, T3 > *s2=NULL, WorkStageIf< T3, T4 > *s3=NULL, WorkStageIf< T4, T5 > *s4=NULL, WorkStageIf< T5, T6 > *s5=NULL)
boost::function< ExternalBase::Efn(uint32_t inst, const std::vector< ExternalT * > &exts, const InputT &inp, SubResultT &subRes)> ExecuteFn
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
void WorkProcCb(uint32_t inst, bool ret_code)
void WorkStageCb(uint32_t stage, bool ret_code)
boost::shared_ptr< ResultT > Result() const