19 #include <boost/utility.hpp>
21 #include <tbb/atomic.h>
25 #include <boost/assign/list_of.hpp>
26 #include "tbb/mutex.h"
28 #include "sandesh/sandesh_types.h"
43 const StageData* GetStageData(
int stage)
const;
45 bool RunInstance(
int instNum);
47 void DoneInstance(
void);
69 int prev = remainingInst_.fetch_and_decrement();
70 return (prev == 1 ?
true :
false);
80 Task(taskId, instId) , pImpl_(pImpl), instNum_(instNum) {}
84 if (!pImpl_.RunInstance(instNum_))
return false;
86 pImpl_.DoneInstance();
90 std::string
Description()
const {
return "RequestPipeline::StageWorker"; }
107 return impl_->GetStageData(stage);
116 spec_(spec, this), currentStage_(-1) {
118 tbb::mutex::scoped_lock lock(
mutex_);
139 if (currentStage_ == static_cast<int>(spec_.stages_.size())) {
142 tbb::mutex::scoped_lock lock(mutex_);
143 if (!pendPipes_.empty()) {
144 PipeImpl * pipe = pendPipes_.front();
155 spec_.stages_[currentStage_],currentStage_));
157 const StageSpec& stageSpec = spec_.stages_[currentStage_];
159 for (
int i=0; i < insts; i++) {
174 if (stageImpls_[currentStage_].DecrInst()) {
186 const StageSpec &ss = spec_.stages_[currentStage_];
187 return ss.
cbFn_(spec_.snhRequest_.get(), spec_,
188 currentStage_, instNum, (ss.
allocFn_.empty() ?
189 NULL : &(stageImpls_[currentStage_].data_[instNum])));
198 if (stage >= currentStage_)
201 return &(stageImpls_[stage].data_);
209 for (
int i=0; i<remainingInst_; i++) {
bool RunInstance(int instNum)
std::vector< int > instances_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
boost::ptr_vector< InstData > StageData
std::string Description() const
static queue< PipeImpl * > pendPipes_
boost::shared_ptr< const SandeshRequest > SharedPtr() const
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
PipeSpec(const SandeshRequest *sr)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
StageWorker(PipeImpl &pImpl, int taskId, int instId, int instNum)
tbb::atomic< int > remainingInst_
boost::ptr_vector< StageImpl > stageImpls_
PipeImpl(const PipeSpec &spec)
const StageData * GetStageData(int stage) const
Task is a wrapper over tbb::task to support policies.
RequestPipeline(const PipeSpec &spec)
StageImpl(const StageSpec &spec, int stage)
const StageData * GetStageData(int stage) const