OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
request_pipeline.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 //
6 // request_pipeline.cc
7 //
8 // Implementation of RequestPipeline
9 //
10 // - The PipeImpl class implements the Pipeline
11 // It contains multiple stages inside it.
12 //
13 // - The StageImpl class holds a given stage of the Pipeline
14 // It contains multiple instances inside it.
15 //
16 // - The StageWorker class is a Task which runs the Client's callback functions
17 //
18 
19 #include <boost/utility.hpp>
20 #include <utility>
21 #include <tbb/atomic.h>
22 #include "base/logging.h"
23 #include "base/task.h"
24 #include <queue>
25 #include <boost/assign/list_of.hpp>
26 #include "tbb/mutex.h"
27 
28 #include "sandesh/sandesh_types.h"
29 #include "sandesh.h"
30 #include "request_pipeline.h"
31 
32 using namespace std;
33 
35  snhRequest_ = sr->SharedPtr();
36 }
37 
39 public:
40  PipeImpl(const PipeSpec& spec);
41  ~PipeImpl() {}
42 
43  const StageData* GetStageData(int stage) const;
44 
45  bool RunInstance(int instNum);
46 
47  void DoneInstance(void);
48 
49 private:
50  bool NextStage(void);
51 
52  const PipeSpec spec_;
53  boost::ptr_vector<StageImpl> stageImpls_;
55 
56  static tbb::mutex mutex_;
57  static int activePipes_;
58  static queue<PipeImpl*> pendPipes_;
59 };
60 
62 public:
63  StageImpl(const StageSpec& spec, int stage);
65 
66  // Decrement the number of outstanding instances.
67  // Returns "true" when there are no instances left.
68  bool DecrInst() {
69  const int before = remainingInst_.fetch_and_decrement();  // count as it was just before this decrement (per the TBB atomic API)
70  return before == 1;  // going from 1 to 0 means this call retired the last outstanding instance
71  }
72 
74  tbb::atomic<int> remainingInst_;
75 };
76 
78 public:
79  StageWorker(PipeImpl& pImpl, int taskId, int instId, int instNum) :  // instNum is the index later passed to PipeImpl::RunInstance() by Run()
80  Task(taskId, instId) , pImpl_(pImpl), instNum_(instNum) {}  // taskId/instId are forwarded to the Task base; nothing else happens at construction
81 
82  virtual bool Run() {
83  // A false result from RunInstance() means the callback has not finished yet.
84  const bool finished = pImpl_.RunInstance(instNum_);
85  // Report completion to the pipeline only once this instance has finished.
86  if (finished) pImpl_.DoneInstance();
87 
88  return finished;
89  }
90  std::string Description() const { return "RequestPipeline::StageWorker"; }  // constant description string for this worker
91 private:
93  const int instNum_;
94 };
95 
98 queue<RequestPipeline::PipeImpl*> RequestPipeline::PipeImpl::pendPipes_;
99 
100 // This is the interface for the client callback function to look into
101 // the Client Data generated during earlier stages of the pipeline.
102 //
103 // Returns the StageData for a previous stage of the pipeline
106  if (impl_)
107  return impl_->GetStageData(stage);
108  else
109  return NULL;
110 }
111 
112 // Constructor of PipeImpl.
113 // If there are no active Pipelines, start its first stage.
114 // If there are active Pipelines, queue this up for later
116  spec_(spec, this), currentStage_(-1) {
117 
118  tbb::mutex::scoped_lock lock(mutex_);
119  if (activePipes_ < 1) {
120  activePipes_++;
121  NextStage();
122  }
123  else {
124  pendPipes_.push(this);
125  }
126 }
127 
128 // This function moves the Pipeline to its next stage.
129 // If we are the last stage,
130 // - delete the pipeline
131 // - release the Sandesh
132 // - if any Pipelines are queued up, start one pipeline from the queue
133 //
134 // Return true if we moved to the next stage of the given pipeline.
135 // false if reached the last stage.
136 bool
138  currentStage_++;
139  if (currentStage_ == static_cast<int>(spec_.stages_.size())) {
140  delete this;
141  {
142  tbb::mutex::scoped_lock lock(mutex_);
143  if (!pendPipes_.empty()) {
144  PipeImpl * pipe = pendPipes_.front();
145  pendPipes_.pop();
146  pipe->NextStage();
147  } else {
148  activePipes_--;
149  }
150  }
151  return false;
152  }
153  else {
154  stageImpls_.push_back(new StageImpl(
155  spec_.stages_[currentStage_],currentStage_));
156 
157  const StageSpec& stageSpec = spec_.stages_[currentStage_];
158  int insts = stageSpec.instances_.size();
159  for (int i=0; i < insts; i++) {
160  StageWorker * sw = new StageWorker(*this, stageSpec.taskId_,
161  stageSpec.instances_[i], i);
163  scheduler->Enqueue(sw);
164  }
165  return true;
166  }
167 }
168 
169 // The StageWorker calls this function when an Instance completes.
170 // If all instances of the current stage have completed execution,
171 // we can move to the next stage of the pipeline.
172 void
174  if (stageImpls_[currentStage_].DecrInst()) {
175  NextStage();
176  }
177 }
178 
179 // The StageWorker calls this function to drive execution
180 // of the client callback function.
181 //
182 // Returns true if this instance's execution is complete
183 // false if the callback needs to be scheduled again.
184 bool
186  const StageSpec &ss = spec_.stages_[currentStage_];
187  return ss.cbFn_(spec_.snhRequest_.get(), spec_,
188  currentStage_, instNum, (ss.allocFn_.empty() ?
189  NULL : &(stageImpls_[currentStage_].data_[instNum])));
190 }
191 
192 // This function allows the client callback function to look into
193 // the Client Data generated during earlier stages of the pipeline.
194 //
195 // Returns the StageData for a previous stage of the pipeline
198  if (stage >= currentStage_)
199  return NULL;
200  else
201  return &(stageImpls_[stage].data_);
202 }
203 
204 // Constructor for StageImpl
205 // Records the number of Instances of this Stage that are still outstanding.
206 // Also creates the Client Data for each Instance when an allocFn_ is provided.
208  remainingInst_ = spec.instances_.size();
209  for (int i=0; i<remainingInst_; i++) {
210  if (!spec.allocFn_.empty()) data_.push_back(spec.allocFn_(stage));
211  }
212 }
213 
215  impl_ = new PipeImpl(spec);
216 }
217 
bool RunInstance(int instNum)
std::vector< int > instances_
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
boost::ptr_vector< InstData > StageData
std::string Description() const
static queue< PipeImpl * > pendPipes_
boost::shared_ptr< const SandeshRequest > SharedPtr() const
Definition: p/sandesh.h:487
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
PipeSpec(const SandeshRequest *sr)
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
StageWorker(PipeImpl &pImpl, int taskId, int instId, int instNum)
tbb::atomic< int > remainingInst_
boost::ptr_vector< StageImpl > stageImpls_
PipeImpl(const PipeSpec &spec)
const StageData * GetStageData(int stage) const
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
RequestPipeline(const PipeSpec &spec)
StageImpl(const StageSpec &spec, int stage)
const StageData * GetStageData(int stage) const