OpenSDN source code
request_pipeline.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // request_pipeline.cc
7 //
8 // Implementation of RequestPipeline
9 //
10 // - The PipeImpl class implements the Pipeline
11 // It contains multiple stages inside it.
12 //
13 // - The StageImpl class hold a given stage of the Pipeline
14 // It contains multiple instances inside it.
15 //
16 // - The StageWorker class is a Task which runs the Client's callback functions
17 //
18 
19 #include <boost/utility.hpp>
20 #include <atomic>
21 #include <utility>
22 #include "base/logging.h"
23 #include "base/task.h"
24 #include <queue>
25 #include <boost/assign/list_of.hpp>
26 
27 #include "sandesh/sandesh_types.h"
28 #include "sandesh.h"
29 #include "request_pipeline.h"
30 
31 using namespace std;
32 
34  snhRequest_ = sr->SharedPtr();
35 }
36 
38 public:
39  PipeImpl(const PipeSpec& spec);
40  ~PipeImpl() {}
41 
42  const StageData* GetStageData(int stage) const;
43 
44  bool RunInstance(int instNum);
45 
46  void DoneInstance(void);
47 
48 private:
49  bool NextStage(void);
50 
51  const PipeSpec spec_;
52  boost::ptr_vector<StageImpl> stageImpls_;
54 
55  static std::mutex mutex_;
56  static int activePipes_;
57  static queue<PipeImpl*> pendPipes_;
58 };
59 
61 public:
62  StageImpl(const StageSpec& spec, int stage);
64 
65  // Decrement the number of outstanding instances.
66  // Returns "true" when there are no instances left.
67  bool DecrInst() {
68  int prev = remainingInst_.fetch_sub(1);
69  return (prev == 1 ? true : false);
70  }
71 
73  std::atomic<int> remainingInst_;
74 };
75 
77 public:
78  StageWorker(PipeImpl& pImpl, int taskId, int instId, int instNum) :
79  Task(taskId, instId) , pImpl_(pImpl), instNum_(instNum) {}
80 
81  virtual bool Run() {
82 
83  if (!pImpl_.RunInstance(instNum_)) return false;
84 
85  pImpl_.DoneInstance();
86 
87  return true;
88  }
89  std::string Description() const { return "RequestPipeline::StageWorker"; }
90 private:
92  const int instNum_;
93 };
94 
97 queue<RequestPipeline::PipeImpl*> RequestPipeline::PipeImpl::pendPipes_;
98 
99 // This is the interface for the client callback function to look into
100 // the Client Data generated during earlier stages of the pipeline.
101 //
102 // Returns the StageData for a previous stage of the pipeline
105  if (impl_)
106  return impl_->GetStageData(stage);
107  else
108  return NULL;
109 }
110 
111 // Constructor of PipeImpl.
112 // If there are no active Pipelines, start it's first stage.
113 // If there are active Pipelines, queue this up for later
115  spec_(spec, this), currentStage_(-1) {
116 
117  std::scoped_lock lock(mutex_);
118  if (activePipes_ < 1) {
119  activePipes_++;
120  NextStage();
121  }
122  else {
123  pendPipes_.push(this);
124  }
125 }
126 
127 // This function moves the Pipeline to it's next stage.
128 // If we are the last stage,
129 // - delete the pipeline
130 // - release the Sandesh
131 // - if any Pipelines are queue up, start one pipeline from the queue
132 //
133 // Return true if we moved to the next stage of the given pipeline.
134 // false if reached the last stage.
135 bool
137  currentStage_++;
138  if (currentStage_ == static_cast<int>(spec_.stages_.size())) {
139  delete this;
140  {
141  std::scoped_lock lock(mutex_);
142  if (!pendPipes_.empty()) {
143  PipeImpl * pipe = pendPipes_.front();
144  pendPipes_.pop();
145  pipe->NextStage();
146  } else {
147  activePipes_--;
148  }
149  }
150  return false;
151  }
152  else {
153  stageImpls_.push_back(new StageImpl(
154  spec_.stages_[currentStage_],currentStage_));
155 
156  const StageSpec& stageSpec = spec_.stages_[currentStage_];
157  int insts = stageSpec.instances_.size();
158  for (int i=0; i < insts; i++) {
159  StageWorker * sw = new StageWorker(*this, stageSpec.taskId_,
160  stageSpec.instances_[i], i);
162  scheduler->Enqueue(sw);
163  }
164  return true;
165  }
166 }
167 
168 // The StageWorker calls this function when an Instance completes.
169 // If all instances of the current stage have completed execution,
170 // we can move to the next stage of the pipeline.
171 void
173  if (stageImpls_[currentStage_].DecrInst()) {
174  NextStage();
175  }
176 }
177 
178 // The StageWorker calls this function to drive execution
179 // of the client callback function.
180 //
181 // Returns true if this instance's execution is complete
182 // false if the callback needs to be scheduled again.
183 bool
185  const StageSpec &ss = spec_.stages_[currentStage_];
186  return ss.cbFn_(spec_.snhRequest_.get(), spec_,
187  currentStage_, instNum, (ss.allocFn_.empty() ?
188  NULL : &(stageImpls_[currentStage_].data_[instNum])));
189 }
190 
191 // This function allows the client callback function to look into
192 // the Client Data generated during earlier stages of the pipeline.
193 //
194 // Returns the StageData for a previous stage of the pipeline
197  if (stage >= currentStage_)
198  return NULL;
199  else
200  return &(stageImpls_[stage].data_);
201 }
202 
203 // Contructor for StageImpl
204 // Creates the StageWorker for each Instance of this Stage
205 // Also Creates the Client Data for each Instance.
207  remainingInst_ = spec.instances_.size();
208  for (int i=0; i<remainingInst_; i++) {
209  if (!spec.allocFn_.empty()) data_.push_back(spec.allocFn_(stage));
210  }
211 }
212 
214  impl_ = new PipeImpl(spec);
215 }
216 
const StageData * GetStageData(int stage) const
static queue< PipeImpl * > pendPipes_
bool RunInstance(int instNum)
PipeImpl(const PipeSpec &spec)
boost::ptr_vector< StageImpl > stageImpls_
StageImpl(const StageSpec &spec, int stage)
std::atomic< int > remainingInst_
std::string Description() const
Gives a description of the task.
virtual bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
StageWorker(PipeImpl &pImpl, int taskId, int instId, int instNum)
boost::ptr_vector< InstData > StageData
RequestPipeline(const PipeSpec &spec)
boost::shared_ptr< const SandeshRequest > SharedPtr() const
Definition: cpp/sandesh.h:489
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
const StageData * GetStageData(int stage) const
PipeSpec(const SandeshRequest *sr)
std::vector< int > instances_