OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef ctrlplane_task_h
6 #define ctrlplane_task_h
7 
8 #include <boost/scoped_ptr.hpp>
9 #include <boost/intrusive/list.hpp>
10 #include <map>
11 #include <vector>
12 #include <tbb/mutex.h>
13 #include <tbb/reader_writer_lock.h>
14 #include <tbb/task.h>
15 #include <tbb/task_scheduler_init.h>
16 #include "base/util.h"
17 
18 class TaskGroup;
19 class TaskEntry;
20 class SandeshTaskScheduler;
21 class TaskTbbKeepAwake;
22 class EventManager;
23 class TaskMonitor;
24 class TaskScheduler;
25 
26 struct TaskStats {  // Counters handed out by the TaskScheduler GetTaskGroupStats()/GetTaskStats() accessors.
27  // Members declared at task.h:29, 32, 35 and 41 are elided from this listing:
30  //   int wait_count_                  - number of entries in waitq
33  //   int run_count_                   - number of entries currently running
36  //   int defer_count_                 - number of entries in deferq
38  uint64_t enqueue_count_;  // Number of tasks enqueued.
39 
42  //   uint64_t total_tasks_completed_  - total number of tasks run (elided, task.h:41)
44  uint64_t last_exit_time_;  // Timestamp of the latest exit (units are not shown in this listing).
45 };
46 
47 struct TaskExclusion {  // One entry of a task exclusion policy: a task id, optionally narrowed to one instance.
48  TaskExclusion(int task_id) : match_id(task_id), match_instance(-1) {}  // No instance given: match_instance becomes -1 (wildcard).
49  TaskExclusion(int task_id, int instance_id)  // Entry restricted to the given instance_id.
50  : match_id(task_id), match_instance(instance_id) {
51  }
52 
54  int match_id;  // Must be a valid id (>= 0).
55  // int match_instance (task.h:57) is elided from this listing: -1 (wildcard) or user specified id.
58 };
59 typedef std::vector<TaskExclusion> TaskPolicy;
60 
86 class Task {  // Task is a wrapper over tbb::task to support policies.
87 public:
89  enum State {  // Task states; the enumerators are elided from this listing.
93  };
94 
95  enum TbbState {  // The enumerators are elided from this listing.
100  };
101 
102  const static int kTaskInstanceAny = -1;
103  Task(int task_id, int task_instance);
104  Task(int task_id);
105  virtual ~Task() { };
106  // Code to execute. Returns true if task is completed. Return false to reschedule the task.
109  virtual bool Run() = 0;
110  // Called on task exit, if it is marked for cancellation. The default implementation is empty.
114  virtual void OnTaskCancel() { };
115 
116  // Accessor methods
117  State GetState() const { return state_; };
118  int GetTaskId() const { return task_id_; };
119  int GetTaskInstance() const { return task_instance_; };
120  uint64_t GetSeqno() const { return seqno_; };
121  friend std::ostream& operator<<(std::ostream& out, const Task &task);
122  // Returns a pointer to the current task the code is executing under.
125  static Task *Running();
126 
127  bool task_cancelled() const { return task_cancel_; };  // Returns the task_cancel_ flag.
128  virtual std::string Description() const = 0;
129 
130  uint64_t enqueue_time() const { return enqueue_time_; }
131  uint64_t schedule_time() const { return schedule_time_; }
132  uint32_t execute_delay() const { return execute_delay_; }
133  uint32_t schedule_delay() const { return schedule_delay_; }
134 
135 private:
136  friend class TaskEntry;
137  friend class TaskScheduler;
138  friend class TaskImpl;
139  void SetSeqNo(uint64_t seqno) {seqno_ = seqno;};
140  void SetTbbState(TbbState s) { tbb_state_ = s; };
141  void SetState(State s) { state_ = s; };
142  void SetTaskRecycle() { task_recycle_ = true; };  // Sets the task_recycle_ flag.
143  void SetTaskComplete() { task_recycle_ = false; };  // Clears the same task_recycle_ flag.
144  // Starts execution of a task.
146  void StartTask(TaskScheduler *scheduler);
147  // task_id_: the code path executed by the task.
149  int task_id_;
150  // Elided from this listing (task.h:152-155, 157-158): task_instance_ (the dataset id within a code path), task_impl_, state_, tbb_state_, task_recycle_, task_cancel_.
156  uint64_t seqno_;
159  uint64_t enqueue_time_;
160  uint64_t schedule_time_;
161  uint32_t execute_delay_;
162  uint32_t schedule_delay_;
163  // Hook in intrusive list for TaskEntry::waitq_
164  boost::intrusive::list_member_hook<> waitq_hook_;
165  // NOTE(review): DISALLOW_COPY_AND_ASSIGN(Task) is listed in the generated member index; presumably on the elided line 166 - confirm.
167 };
168 
179 public:  // Body of TaskScheduler; the class opening line (task.h:178) is elided from this listing.
180  typedef boost::function<void(const char *file_name, uint32_t line_no,
181  const Task *task, const char *description,
182  uint64_t delay)> LogFn;
183  // TaskScheduler constructor.
188  TaskScheduler(int thread_count = 0);
189  // Frees up the task_entry_db_ allocated for scheduler.
191  ~TaskScheduler();
192 
193  static void Initialize(uint32_t thread_count = 0, EventManager *evm = NULL);
194  static TaskScheduler *GetInstance();
195  // Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq.
201  void Enqueue(Task *task);
202 
203  void EnqueueUnLocked(Task *task);
204  // enum CancelReturnCode opens at task.h:205; its opening line and enumerators are elided, only the closing brace remains.
209  };
210 
215  // CancelReturnCode Cancel(Task *task) is elided from this listing: cancels a Task that can be in RUN/WAIT state.
225  void SetPolicy(int task_id, TaskPolicy &policy);  // Sets the task exclusion policy. Adds policy entries for the task.
226 
227  bool GetRunStatus() { return running_; };
228  int GetTaskId(const std::string &name);
229  std::string GetTaskName(int task_id) const;
230 
231  TaskStats *GetTaskGroupStats(int task_id);
232  TaskStats *GetTaskStats(int task_id);
233  TaskStats *GetTaskStats(int task_id, int instance_id);
234  void ClearTaskGroupStats(int task_id);
235  void ClearTaskStats(int task_id);
236  void ClearTaskStats(int task_id, int instance_id);
237  // Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
239  TaskGroup *GetTaskGroup(int task_id);
240  // Query TaskGroup for a task_id. Assumes valid entry is present for task_id.
243  TaskGroup *QueryTaskGroup(int task_id);
244  // Check if there are any Tasks in the given TaskGroup.
247  bool IsTaskGroupEmpty(int task_id) const;
248  // NOTE(review): the generated brief for GetTaskEntry repeats the GetTaskGroup text; presumably the TaskEntry counterpart - confirm in task.cc.
250  TaskEntry *GetTaskEntry(int task_id, int instance_id);
251  // Query TaskEntry for a task-id and task-instance.
253  TaskEntry *QueryTaskEntry(int task_id, int instance_id);
254  // Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
257  void OnTaskExit(Task *task);
258  // Stops scheduling of all tasks.
260  void Stop();
261  // Starts scheduling of all tasks.
263  void Start();
264  // Debug print routine.
266  void Print();
267  // Returns true if there are no tasks running and/or enqueued. If running_only is true, enqueued tasks are ignored. A disabled TaskGroup or TaskEntry is ignored.
272  bool IsEmpty(bool running_only = false);
273 
274  void Terminate();
275  // int HardwareThreadCount() (task.h:276) is elided from this listing.
277  // GetThreadCount: get number of tbb worker threads.
280  static int GetThreadCount(int thread_count = 0);
281  static bool ShouldUseSpawn();
282 
283  static int GetDefaultThreadCount();
284 
285  uint64_t enqueue_count() const { return enqueue_count_; }
286  uint64_t done_count() const { return done_count_; }
287  uint64_t cancel_count() const { return cancel_count_; }
288  // Force number of threads.
290  void SetMaxThreadCount(int n);
291  void GetSandeshData(SandeshTaskScheduler *resp, bool summary);
292  void Log(const char *file_name, uint32_t line_no, const Task *task,
293  const char *description, uint64_t delay);
294  void RegisterLog(LogFn fn);
295 
296  void SetTrackRunTime(bool value) { track_run_time_ = value; }
297  bool track_run_time() const { return track_run_time_; }
298  // Enable logging of tasks exceeding configured latency.
300  void EnableLatencyThresholds(uint32_t execute, uint32_t schedule);
301  uint32_t schedule_delay() const { return schedule_delay_; }
302  uint32_t execute_delay() const { return execute_delay_; }
303 
304  bool measure_delay() const { return measure_delay_; }
305  void SetLatencyThreshold(const std::string &name, uint32_t execute,
306  uint32_t schedule);
307  uint32_t schedule_delay(Task *task) const;
308  uint32_t execute_delay(Task *task) const;
310 
311  void DisableTaskGroup(int task_id);
312  void EnableTaskGroup(int task_id);
313  void DisableTaskEntry(int task_id, int instance_id);
314  void EnableTaskEntry(int task_id, int instance_id);
315 
316  void ModifyTbbKeepAwakeTimeout(uint32_t timeout);
317  // Enable Task monitoring.
319  void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec,
320  uint64_t inactivity_time_msec,
321  uint64_t poll_interval_msec);
322  const TaskMonitor *task_monitor() const { return task_monitor_; }
324  bool use_spawn() const { return use_spawn_; }
325  // SetThreadAmpFactor allows one to increase max num of threads used by TBB.
328  static void SetThreadAmpFactor(int n);
329 
330 private:
331  friend class ConcurrencyScope;
332  typedef std::vector<TaskGroup *> TaskGroupDb;
333  typedef std::map<std::string, int> TaskIdMap;
334 
335  static const int kVectorGrowSize = 16;
336  static boost::scoped_ptr<TaskScheduler> singleton_;
337 
338  // XXX
339  // Following two methods are only for Unit Testing to control
340  // current running task. Usage of this method would result in
341  // unexpected behavior.
342 
346  void SetRunningTask(Task *);
347  void ClearRunningTask();
349  // CountThreadsPerPid: platform-dependent subroutine (Linux and FreeBSD implementations).
359  int CountThreadsPerPid(pid_t pid);
360  // Elided from this listing: use_spawn_ (task.h:362, use spawn() to run a tbb::task instead of enqueue()) and stop_entry_ (task.h:363).
364 
365  tbb::task_scheduler_init task_scheduler_;
366  mutable tbb::mutex mutex_;
367  bool running_;
368  uint64_t seqno_;
370  // TaskGroupDb task_group_db_ (task.h:369) and TaskIdMap id_map_ (task.h:372) are elided from this listing.
371  tbb::reader_writer_lock id_map_mutex_;
373  int id_max_;
374  // Elided from this listing: log_fn_ (task.h:375), hw_thread_count_ (376), track_run_time_ (378), measure_delay_ (379).
377 
380 
382  uint32_t schedule_delay_;  // Log if time between enqueue and task-execute exceeds the delay.
383 
385  uint32_t execute_delay_;  // Log if time taken to execute exceeds the delay.
386 
387  uint64_t enqueue_count_;
388  uint64_t done_count_;
389  uint64_t cancel_count_;
391  // EventManager *evm_ (task.h:390) is elided from this listing.
394  static int ThreadAmpFactor_;  // Allows one to increase max num of threads used by TBB.
395  // Elided from this listing: tbb_awake_task_ (task.h:396) and task_monitor_ (task.h:397).
399 };
400 
401 #endif
int task_id_
The code path executed by the task.
Definition: task.h:149
void DisableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1032
bool measure_delay_
Definition: task.h:379
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
Definition: task.cc:511
std::vector< TaskGroup * > TaskGroupDb
Definition: task.h:332
int wait_count_
Number of entries in waitq.
Definition: task.h:29
bool task_recycle_
Definition: task.h:157
TaskMonitor * task_monitor_
Definition: task.h:397
const TaskTbbKeepAwake * tbb_awake_task() const
Definition: task.h:323
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
Definition: task.cc:588
static const int kVectorGrowSize
Definition: task.h:335
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
Definition: task.cc:1622
void EnableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1037
tbb::task * task_impl_
Definition: task.h:153
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
bool task_cancel_
Definition: task.h:158
int hw_thread_count_
Definition: task.h:376
static int GetDefaultThreadCount()
Definition: task.cc:429
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
Definition: task.cc:478
static Task * Running()
Returns a pointer to the current task the code is executing under.
Definition: task.cc:1562
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:583
uint64_t total_tasks_completed_
Total number of tasks run.
Definition: task.h:41
static const int kTaskInstanceAny
Definition: task.h:102
TaskIdMap id_map_
Definition: task.h:372
uint32_t schedule_delay() const
Definition: task.h:133
static boost::scoped_ptr< TaskScheduler > singleton_
Definition: task.h:336
TaskStats * GetTaskStats(int task_id)
Definition: task.cc:907
tbb::reader_writer_lock id_map_mutex_
Definition: task.h:371
DISALLOW_COPY_AND_ASSIGN(Task)
CancelReturnCode
Definition: task.h:205
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task. Examples:
Definition: task.cc:610
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
Definition: task.h:382
void SetTbbState(TbbState s)
Definition: task.h:140
TaskEntry * stop_entry_
Definition: task.h:363
uint32_t execute_delay() const
Definition: task.h:132
uint64_t enqueue_count_
Definition: task.h:387
DISALLOW_COPY_AND_ASSIGN(TaskScheduler)
uint32_t execute_delay() const
Definition: task.h:302
virtual ~Task()
Definition: task.h:105
bool measure_delay() const
Definition: task.h:304
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
Definition: task.h:114
TaskGroupDb task_group_db_
Definition: task.h:369
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id. Assumes valid entry is present for task_id.
Definition: task.cc:570
EventManager * evm_
Definition: task.h:390
void Start()
Starts scheduling of all tasks.
Definition: task.cc:798
Definition: task_int.h:10
uint32_t execute_delay_
Definition: task.h:161
TaskStats * GetTaskGroupStats(int task_id)
Definition: task.cc:899
uint64_t seqno_
Definition: task.h:156
TbbState
Definition: task.h:95
void set_event_manager(EventManager *evm)
Definition: task.cc:493
int run_count_
Number of entries currently running.
Definition: task.h:32
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
Definition: task.cc:444
TbbState tbb_state_
Definition: task.h:155
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Definition: task.cc:699
TaskGroup maintains per <task-id> information.
Definition: task.cc:222
int match_id
Must be a valid id (>= 0).
Definition: task.h:54
int GetTaskId(const std::string &name)
Definition: task.cc:856
A class maintaining information for every <task, instance>.
Definition: task.cc:95
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
Definition: task.cc:1542
int CountThreadsPerPid(pid_t pid)
Platform-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
Definition: task.cc:923
State
Task states.
Definition: task.h:89
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
Definition: task.cc:595
virtual bool Run()=0
Code to execute. Returns true if task is completed. Return false to reschedule the task...
bool track_run_time() const
Definition: task.h:297
uint64_t last_exit_time_
Timestamp of the latest exit.
Definition: task.h:44
void Print()
Debug print routine.
Definition: task.cc:808
void WaitForTerminateCompletion()
Definition: task.cc:952
void SetTrackRunTime(bool value)
Definition: task.h:296
void SetTaskComplete()
Definition: task.h:143
State state_
Definition: task.h:154
static void SetThreadAmpFactor(int n)
The following function allows one to increase the maximum number of threads used by TBB.
Definition: task.cc:1011
void RegisterLog(LogFn fn)
Definition: task.cc:531
const TaskMonitor * task_monitor() const
Definition: task.h:322
Definition: task.h:26
tbb::task_scheduler_init task_scheduler_
Definition: task.h:365
bool task_cancelled() const
Definition: task.h:127
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
A private class used to implement tbb::task An object is created when task is ready for execution and...
Definition: task.cc:47
LogFn log_fn_
Definition: task.h:375
boost::intrusive::list_member_hook waitq_hook_
Definition: task.h:164
virtual std::string Description() const =0
uint32_t schedule_delay_
Definition: task.h:162
TaskTbbKeepAwake * tbb_awake_task_
Definition: task.h:396
void ClearRunningTask()
Definition: task.cc:1006
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued. If running_only is true, enqueued tasks are ignored, i.e. return true if there are no running tasks. Ignore a TaskGroup or TaskEntry if it is disabled.
Definition: task.cc:823
void SetSeqNo(uint64_t seqno)
Definition: task.h:139
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Definition: task.cc:505
int GetTaskInstance() const
Definition: task.h:119
uint64_t cancel_count_
Definition: task.h:389
int defer_count_
Number of entries in deferq.
Definition: task.h:35
void SetTaskRecycle()
Definition: task.h:142
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
Definition: task.cc:1001
void ClearTaskGroupStats(int task_id)
Definition: task.cc:875
void SetMaxThreadCount(int n)
Force number of threads.
std::vector< TaskExclusion > TaskPolicy
Definition: task.h:59
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
Definition: task.cc:455
State GetState() const
Definition: task.h:117
uint64_t cancel_count() const
Definition: task.h:287
uint64_t done_count_
Definition: task.h:388
uint64_t enqueue_count_
Number of tasks enqueued.
Definition: task.h:38
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
Definition: task.h:385
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:554
uint64_t enqueue_time() const
Definition: task.h:130
bool use_spawn() const
Definition: task.h:324
int GetTaskId() const
Definition: task.h:118
std::string GetTaskName(int task_id) const
Definition: task.cc:846
int id_max_
Definition: task.h:373
uint64_t GetSeqno() const
Definition: task.h:120
void DisableTaskGroup(int task_id)
Definition: task.cc:1015
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
Definition: task.cc:523
bool GetRunStatus()
Definition: task.h:227
bool running_
Definition: task.h:367
static int ThreadAmpFactor_
The following variable allows one to increase the maximum number of threads used by TBB.
Definition: task.h:394
TaskExclusion(int task_id, int instance_id)
Definition: task.h:49
Definition: task.h:92
static bool ShouldUseSpawn()
Definition: task.cc:433
Task(int task_id, int task_instance)
Definition: task.cc:1529
void Stop()
Stops scheduling of all tasks.
Definition: task.cc:792
uint64_t schedule_time() const
Definition: task.h:131
TaskExclusion(int task_id)
Definition: task.h:48
int task_instance_
The dataset id within a code path.
Definition: task.h:152
void EnableTaskGroup(int task_id)
Definition: task.cc:1025
std::map< std::string, int > TaskIdMap
Definition: task.h:333
tbb::mutex mutex_
Definition: task.h:366
friend std::ostream & operator<<(std::ostream &out, const Task &task)
uint64_t schedule_time_
Definition: task.h:160
bool track_run_time_
Definition: task.h:378
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
Definition: task.h:362
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
Definition: task.cc:602
uint64_t done_count() const
Definition: task.h:286
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
Definition: task.cc:762
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
Definition: task.cc:574
void Terminate()
Definition: task.cc:978
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
uint64_t enqueue_time_
Definition: task.h:159
uint32_t schedule_delay() const
Definition: task.h:301
void EnqueueUnLocked(Task *task)
Definition: task.cc:642
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads...
Definition: task.cc:407
uint64_t enqueue_count() const
Definition: task.h:285
int HardwareThreadCount()
Definition: task.h:276
uint64_t seqno_
Definition: task.h:368
struct task_ task
static EventManager evm
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
Definition: task.h:182
int match_instance
-1 (wildcard) or user specified id.
Definition: task.h:57
void SetState(State s)
Definition: task.h:141
void ClearTaskStats(int task_id)
Definition: task.cc:883