19 #include <boost/utility.hpp>
25 #include <boost/assign/list_of.hpp>
27 #include "sandesh/sandesh_types.h"
// Const accessor: takes a stage index and returns a pointer to const StageData.
// NOTE(review): further down, this listing shows a `stage >= currentStage_`
// check followed by `return &(stageImpls_[stage].data_);` - presumably the
// definition of this member; what happens when the check is true is not
// visible here, so confirm before relying on it.
42 const StageData* GetStageData(
int stage)
const;
// Takes an instance number and returns a bool result.
// At the call site visible in this listing, a false result makes the caller
// return false immediately, before DoneInstance() is reached.
44 bool RunInstance(
int instNum);
// Takes no arguments and returns nothing.
// At the call site visible in this listing it is invoked only after
// RunInstance() has returned true.
46 void DoneInstance(
void);
68 int prev = remainingInst_.fetch_sub(1);
69 return (prev == 1 ?
true :
false);
79 Task(taskId, instId) , pImpl_(pImpl), instNum_(instNum) {}
83 if (!pImpl_.RunInstance(instNum_))
return false;
85 pImpl_.DoneInstance();
// Gives a description of the task: always returns the same fixed string
// literal and reads no object state.
89 std::string
Description()
const {
return "RequestPipeline::StageWorker"; }
106 return impl_->GetStageData(stage);
115 spec_(spec, this), currentStage_(-1) {
117 std::scoped_lock lock(
mutex_);
138 if (currentStage_ ==
static_cast<int>(spec_.stages_.size())) {
141 std::scoped_lock lock(mutex_);
142 if (!pendPipes_.empty()) {
143 PipeImpl * pipe = pendPipes_.front();
154 spec_.stages_[currentStage_],currentStage_));
156 const StageSpec& stageSpec = spec_.stages_[currentStage_];
158 for (
int i=0; i < insts; i++) {
173 if (stageImpls_[currentStage_].DecrInst()) {
185 const StageSpec &ss = spec_.stages_[currentStage_];
186 return ss.
cbFn_(spec_.snhRequest_.get(), spec_,
187 currentStage_, instNum, (ss.
allocFn_.empty() ?
188 NULL : &(stageImpls_[currentStage_].data_[instNum])));
197 if (stage >= currentStage_)
200 return &(stageImpls_[stage].data_);
208 for (
int i=0; i<remainingInst_; i++) {
const StageData * GetStageData(int stage) const
static queue< PipeImpl * > pendPipes_
bool RunInstance(int instNum)
PipeImpl(const PipeSpec &spec)
boost::ptr_vector< StageImpl > stageImpls_
StageImpl(const StageSpec &spec, int stage)
std::atomic< int > remainingInst_
std::string Description() const
Gives a description of the task.
virtual bool Run()
Code to execute in a task. Returns true if the task is completed; returns false to reschedule the task.
StageWorker(PipeImpl &pImpl, int taskId, int instId, int instNum)
boost::ptr_vector< InstData > StageData
RequestPipeline(const PipeSpec &spec)
boost::shared_ptr< const SandeshRequest > SharedPtr() const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
const StageData * GetStageData(int stage) const
PipeSpec(const SandeshRequest *sr)
std::vector< int > instances_