OpenSDN source code
task.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef ctrlplane_task_h
6 #define ctrlplane_task_h
7 
8 #include <boost/scoped_ptr.hpp>
9 #include <boost/intrusive/list.hpp>
10 #include <map>
11 #include <shared_mutex>
12 #include <vector>
13 #include <mutex>
14 // TODO: change deprecated tbb::task to something else
15 #define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
16 #include <tbb/task.h>
17 #include <tbb/task_scheduler_init.h>
18 #include "base/util.h"
19 
20 class TaskGroup;
21 class TaskEntry;
22 class SandeshTaskScheduler;
23 class TaskTbbKeepAwake;
24 class EventManager;
25 class TaskMonitor;
26 class TaskScheduler;
27 
79 class Task {
80 public:
81 
83  enum State {
84 
87 
90 
92  RUN
93  };
94 
96  enum TbbState {
100  TBB_DONE
101  };
102 
104  const static int kTaskInstanceAny = -1;
105 
108  Task(int task_id, int task_data_id);
109 
110 
113  Task(int task_id);
114 
116  virtual ~Task() { };
117 
120  virtual bool Run() = 0;
121 
125  virtual void OnTaskCancel() { };
126 
127  // Accessor methods
128 
130  State state() const { return state_; };
131 
133  int task_code_id() const { return task_code_id_; };
134 
136  int task_data_id() const { return task_data_id_; };
137 
139  uint64_t seqno() const { return seqno_; };
140 
143  friend std::ostream& operator<<(std::ostream& out, const Task &task);
144 
147  static Task *Running();
148 
150  bool task_cancelled() const { return task_cancel_; };
151 
153  virtual std::string Description() const = 0;
154 
156  uint64_t enqueue_time() const { return enqueue_time_; }
157 
159  uint64_t schedule_time() const { return schedule_time_; }
160 
162  uint32_t execute_delay() const { return execute_delay_; }
163 
166  uint32_t schedule_delay() const { return schedule_delay_; }
167 
168 private:
169 
171  friend class TaskEntry;
172 
174  friend class TaskScheduler;
175 
177  friend class TaskImpl;
178 
180  void seqno(uint64_t seqno) {seqno_ = seqno;};
181 
183  void tbb_state(TbbState s) { tbb_state_ = s; };
184 
186  void state(State s) { state_ = s; };
187 
189  void set_task_recycle() { task_recycle_ = true; };
190 
192  void set_task_complete() { task_recycle_ = false; };
193 
195  void StartTask(TaskScheduler *scheduler);
196 
199 
202 
206 
209 
212 
214  uint64_t seqno_;
215 
219 
222 
225  uint64_t enqueue_time_;
226 
228  uint64_t schedule_time_;
229 
232  uint32_t execute_delay_;
233 
236  uint32_t schedule_delay_;
237 
238  // Hook in intrusive list for TaskEntry::waitq_
239  boost::intrusive::list_member_hook<> waitq_hook_;
240 
242 };
243 
247 
250  TaskExclusion(int task_code_id)
251  : match_code_id(task_code_id), match_data_id(-1) {}
252 
255  TaskExclusion(int task_code_id, int task_data_id)
256  : match_code_id(task_code_id), match_data_id(task_data_id) {
257  }
258 
262 
266 };
267 
270 typedef std::vector<TaskExclusion> TaskPolicy;
271 
274 struct TaskStats {
275 
278 
281 
284 
286  uint64_t enqueue_count_;
287 
290 
292  uint64_t last_exit_time_;
293 };
294 
305 public:
306  typedef boost::function<void(const char *file_name, uint32_t line_no,
307  const Task *task, const char *description,
308  uint64_t delay)> LogFn;
309 
314  TaskScheduler(int thread_count = 0);
315 
317  ~TaskScheduler();
318 
319  static void Initialize(uint32_t thread_count = 0, EventManager *evm = NULL);
320  static TaskScheduler *GetInstance();
321 
327  void Enqueue(Task *task);
328 
329  void EnqueueUnLocked(Task *task);
330 
335  };
336 
341 
351  void SetPolicy(int task_id, TaskPolicy &policy);
352 
353  bool GetRunStatus() { return running_; };
354  int GetTaskId(const std::string &name);
355  std::string GetTaskName(int task_id) const;
356 
357  TaskStats *GetTaskGroupStats(int task_id);
358  TaskStats *GetTaskStats(int task_id);
359  TaskStats *GetTaskStats(int task_id, int instance_id);
360  void ClearTaskGroupStats(int task_id);
361  void ClearTaskStats(int task_id);
362  void ClearTaskStats(int task_id, int instance_id);
363 
365  TaskGroup *GetTaskGroup(int task_id);
366 
369  TaskGroup *QueryTaskGroup(int task_id);
370 
373  bool IsTaskGroupEmpty(int task_id) const;
374 
376  TaskEntry *GetTaskEntry(int task_id, int instance_id);
377 
379  TaskEntry *QueryTaskEntry(int task_id, int instance_id);
380 
383  void OnTaskExit(Task *task);
384 
386  void Stop();
387 
389  void Start();
390 
392  void Print();
393 
398  bool IsEmpty(bool running_only = false);
399 
400  void Terminate();
401 
403 
406  static int GetThreadCount(int thread_count = 0);
407  static bool ShouldUseSpawn();
408 
409  static int GetDefaultThreadCount();
410 
411  uint64_t enqueue_count() const { return enqueue_count_; }
412  uint64_t done_count() const { return done_count_; }
413  uint64_t cancel_count() const { return cancel_count_; }
414 
416  void SetMaxThreadCount(int n);
417  void GetSandeshData(SandeshTaskScheduler *resp, bool summary);
418  void Log(const char *file_name, uint32_t line_no, const Task *task,
419  const char *description, uint64_t delay);
420  void RegisterLog(LogFn fn);
421 
422  void SetTrackRunTime(bool value) { track_run_time_ = value; }
423  bool track_run_time() const { return track_run_time_; }
424 
426  void EnableLatencyThresholds(uint32_t execute, uint32_t schedule);
427  uint32_t schedule_delay() const { return schedule_delay_; }
428  uint32_t execute_delay() const { return execute_delay_; }
429 
430  bool measure_delay() const { return measure_delay_; }
431  void SetLatencyThreshold(const std::string &name, uint32_t execute,
432  uint32_t schedule);
433  uint32_t schedule_delay(Task *task) const;
434  uint32_t execute_delay(Task *task) const;
436 
437  void DisableTaskGroup(int task_id);
438  void EnableTaskGroup(int task_id);
439  void DisableTaskEntry(int task_id, int instance_id);
440  void EnableTaskEntry(int task_id, int instance_id);
441 
442  void ModifyTbbKeepAwakeTimeout(uint32_t timeout);
443 
445  void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec,
446  uint64_t inactivity_time_msec,
447  uint64_t poll_interval_msec);
448  const TaskMonitor *task_monitor() const { return task_monitor_; }
450  bool use_spawn() const { return use_spawn_; }
451 
454  static void SetThreadAmpFactor(int n);
455 
456 private:
457  friend class ConcurrencyScope;
458  typedef std::vector<TaskGroup *> TaskGroupDb;
459  typedef std::map<std::string, int> TaskIdMap;
460 
461  static const int kVectorGrowSize = 16;
462  static boost::scoped_ptr<TaskScheduler> singleton_;
463 
464  // XXX
465  // Following two methods are only for Unit Testing to control
466  // current running task. Usage of this method would result in
467  // unexpected behavior.
468 
472  void SetRunningTask(Task *);
473  void ClearRunningTask();
475 
485  int CountThreadsPerPid(pid_t pid);
486 
490 
491  tbb::task_scheduler_init task_scheduler_;
492  mutable std::mutex mutex_;
493  bool running_;
494  uint64_t seqno_;
496 
497  std::shared_mutex id_map_mutex_;
499  int id_max_;
500 
503 
506 
508  uint32_t schedule_delay_;
509 
511  uint32_t execute_delay_;
512 
513  uint64_t enqueue_count_;
514  uint64_t done_count_;
515  uint64_t cancel_count_;
517 
520  static int ThreadAmpFactor_;
521 
525 };
526 
527 #endif
A class maintaning information for every <task, instance>
Definition: task.cc:95
TaskGroup maintains per <task-id> information including,.
Definition: task.cc:229
A private class used to implement tbb::task An object is created when task is ready for execution and...
Definition: task.cc:47
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void EnqueueUnLocked(Task *task)
Definition: task.cc:647
uint64_t enqueue_count_
Definition: task.h:513
uint64_t done_count_
Definition: task.h:514
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
Definition: task.cc:581
void Stop()
Stops scheduling of all tasks.
Definition: task.cc:797
bool measure_delay_
Definition: task.h:505
TaskTbbKeepAwake * tbb_awake_task_
Definition: task.h:522
TaskEntry * stop_entry_
Definition: task.h:489
uint64_t cancel_count_
Definition: task.h:515
int hw_thread_count_
Definition: task.h:502
static boost::scoped_ptr< TaskScheduler > singleton_
Definition: task.h:462
TaskIdMap id_map_
Definition: task.h:498
void Terminate()
Definition: task.cc:983
void EnableTaskGroup(int task_id)
Definition: task.cc:1030
int GetTaskId(const std::string &name)
Definition: task.cc:861
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads.
Definition: task.cc:414
CancelReturnCode
Definition: task.h:331
@ CANCELLED
Definition: task.h:332
std::mutex mutex_
Definition: task.h:492
uint64_t enqueue_count() const
Definition: task.h:411
tbb::task_scheduler_init task_scheduler_
Definition: task.h:491
void SetMaxThreadCount(int n)
Force number of threads.
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
Definition: task.cc:530
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id.Assumes valid entry is present for task_id.
Definition: task.cc:577
LogFn log_fn_
Definition: task.h:501
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
Definition: task.cc:1016
bool measure_delay() const
Definition: task.h:430
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
Definition: task.h:308
uint64_t cancel_count() const
Definition: task.h:413
TaskMonitor * task_monitor_
Definition: task.h:523
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
Definition: task.cc:1627
int CountThreadsPerPid(pid_t pid)
Platfrom-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
Definition: task.cc:928
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
Definition: task.cc:451
void WaitForTerminateCompletion()
Definition: task.cc:957
void DisableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1037
void DisableTaskGroup(int task_id)
Definition: task.cc:1020
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
Definition: task.h:511
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Definition: task.cc:512
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static int GetDefaultThreadCount()
Definition: task.cc:436
void SetTrackRunTime(bool value)
Definition: task.h:422
TaskStats * GetTaskGroupStats(int task_id)
Definition: task.cc:904
std::map< std::string, int > TaskIdMap
Definition: task.h:459
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
Definition: task.cc:518
TaskGroupDb task_group_db_
Definition: task.h:495
EventManager * evm_
Definition: task.h:516
void ClearRunningTask()
Definition: task.cc:1011
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
Definition: task.cc:462
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
Definition: task.cc:485
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:617
std::string GetTaskName(int task_id) const
Definition: task.cc:851
void EnableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1042
uint64_t done_count() const
Definition: task.h:412
int id_max_
Definition: task.h:499
uint64_t seqno_
Definition: task.h:494
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:590
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
Definition: task.cc:609
DISALLOW_COPY_AND_ASSIGN(TaskScheduler)
static TaskScheduler * GetInstance()
Definition: task.cc:554
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
Definition: task.h:520
void Start()
Starts scheduling of all tasks.
Definition: task.cc:803
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:561
std::vector< TaskGroup * > TaskGroupDb
Definition: task.h:458
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
Definition: task.cc:595
bool GetRunStatus()
Definition: task.h:353
int HardwareThreadCount()
Definition: task.h:402
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
Definition: task.h:508
void ClearTaskStats(int task_id)
Definition: task.cc:888
TaskStats * GetTaskStats(int task_id)
Definition: task.cc:912
bool running_
Definition: task.h:493
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
Definition: task.cc:767
bool track_run_time_
Definition: task.h:504
bool track_run_time() const
Definition: task.h:423
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
Definition: task.h:488
void RegisterLog(LogFn fn)
Definition: task.cc:538
static bool ShouldUseSpawn()
Definition: task.cc:440
void set_event_manager(EventManager *evm)
Definition: task.cc:500
uint32_t schedule_delay() const
Definition: task.h:427
void Print()
Debug print routine.
Definition: task.cc:813
uint32_t execute_delay() const
Definition: task.h:428
const TaskMonitor * task_monitor() const
Definition: task.h:448
const TaskTbbKeepAwake * tbb_awake_task() const
Definition: task.h:449
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Definition: task.cc:704
static const int kVectorGrowSize
Definition: task.h:461
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true,...
Definition: task.cc:828
std::shared_mutex id_map_mutex_
Definition: task.h:497
bool use_spawn() const
Definition: task.h:450
void ClearTaskGroupStats(int task_id)
Definition: task.cc:880
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
Definition: task.cc:602
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
Definition: task.cc:1006
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
static Task * Running()
Returns a pointer to the current task the code is executing under.
Definition: task.cc:1567
friend std::ostream & operator<<(std::ostream &out, const Task &task)
Provides access to private members of a task for the output stream redirection operator.
static const int kTaskInstanceAny
Specifies value for wildcard (any or *) task data ID.
Definition: task.h:104
uint64_t seqno() const
Returns the sequence number of this task.
Definition: task.h:139
bool task_recycle_
Determines if the task must be rescheduled (reused) after its completion.
Definition: task.h:218
void state(State s)
Sets a state for this task.
Definition: task.h:186
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
Definition: task.cc:1547
TbbState tbb_state_
Stores a state of the TBB object.
Definition: task.h:211
void tbb_state(TbbState s)
Sets a TBB state for the task.
Definition: task.h:183
uint64_t seqno_
Stores the sequence number.
Definition: task.h:214
bool task_cancelled() const
Returns true if the task has been canceled.
Definition: task.h:150
DISALLOW_COPY_AND_ASSIGN(Task)
uint64_t schedule_time() const
Returns the time when the task execution was started.
Definition: task.h:159
State state_
Stores a state of the task.
Definition: task.h:208
virtual ~Task()
Destroys a task.
Definition: task.h:116
void set_task_recycle()
Marks this task for recycle.
Definition: task.h:189
int task_data_id_
The dataset id within a code path.
Definition: task.h:201
uint32_t schedule_delay() const
Returns the time threshold for time difference between moments when the task was started and when it ...
Definition: task.h:166
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
Definition: task.h:125
int task_code_id() const
Returns the code ID of this task.
Definition: task.h:133
tbb::task * task_impl_
A pointer to an Intel TBB object storing low-level information to manage the task.
Definition: task.h:205
int task_data_id() const
Returns the data ID of this task.
Definition: task.h:136
uint64_t enqueue_time() const
Returns the time when the task was enqueued for execution.
Definition: task.h:156
Task(int task_id, int task_data_id)
Creates a new task with the given values of task code ID and task data ID.
Definition: task.cc:1534
uint64_t schedule_time_
Contains the time when the task was started.
Definition: task.h:228
TbbState
Describes states of a task according to TBB library.
Definition: task.h:96
@ TBB_DONE
Definition: task.h:100
@ TBB_INIT
Definition: task.h:97
@ TBB_EXEC
Definition: task.h:99
@ TBB_ENQUEUED
Definition: task.h:98
void set_task_complete()
Marks this task as completed (forbids recycling)
Definition: task.h:192
void seqno(uint64_t seqno)
Sets sequence number of the task.
Definition: task.h:180
virtual bool Run()=0
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
virtual std::string Description() const =0
Gives a description of the task.
bool task_cancel_
Determines if the task's execution was canceled.
Definition: task.h:221
uint32_t execute_delay_
Sets threshold for the task's execution time. If the threshold is exceeded, the event is logged.
Definition: task.h:232
State state() const
Returns a state value of a task.
Definition: task.h:130
uint32_t schedule_delay_
Sets threshold for delay between enqueueing and execution. If the threshold is exceeded,...
Definition: task.h:236
uint32_t execute_delay() const
Returns the threshold for the task execution duration.
Definition: task.h:162
int task_code_id_
The code path executed by the task.
Definition: task.h:198
uint64_t enqueue_time_
Contains the time when the task was enqueued for execution.
Definition: task.h:225
State
Task states.
Definition: task.h:83
@ WAIT
A task is waiting in a queue.
Definition: task.h:89
@ RUN
A task is being run.
Definition: task.h:92
@ INIT
A task was initialized.
Definition: task.h:86
boost::intrusive::list_member_hook waitq_hook_
Definition: task.h:239
static EventManager evm
The class is used to specify a Task label for formulating a task exclusion list (an execution policy)...
Definition: task.h:246
int match_data_id
Specifies task data ID for a task execution policy. The value of -1 corresponds to wildcard (any).
Definition: task.h:265
TaskExclusion(int task_code_id)
Creates a new task exclusion from the given task code ID value and wildcard for task data ID.
Definition: task.h:250
TaskExclusion(int task_code_id, int task_data_id)
Creates a new task exclusion from the given task code ID and task data ID values.
Definition: task.h:255
int match_code_id
Specifies task code ID (must be a valid id >= 0) for a task execution policy.
Definition: task.h:261
uint64_t total_tasks_completed_
Number of total tasks ran.
Definition: task.h:289
uint64_t last_exit_time_
Number of time stamp of latest exist.
Definition: task.h:292
int run_count_
Number of entries currently running.
Definition: task.h:280
int defer_count_
Number of entries in deferq.
Definition: task.h:283
int wait_count_
Number of entries in waitq.
Definition: task.h:277
uint64_t enqueue_count_
Number of tasks enqueued.
Definition: task.h:286
Definition: task_int.h:10
std::vector< TaskExclusion > TaskPolicy
Defines a type to store an execution policy (a list of task exclusions).
Definition: task.h:270
struct task_ task