9 #include <boost/intrusive/set.hpp>
10 #include <boost/optional.hpp>
12 #include "tbb/atomic.h"
14 #include "tbb/enumerable_thread_specific.h"
21 #include <base/sandesh/task_types.h>
30 typedef tbb::enumerable_thread_specific<Task *>
TaskInfo;
39 #define TASK_TRACE(scheduler, task, msg, delay)\
41 scheduler->Log(__FILE__, __LINE__, task, msg, delay);\
98 TaskEntry(
int task_id,
int task_instance);
103 void AddToWaitQ(
Task *t);
104 bool DeleteFromWaitQ(
Task *t);
114 bool DeferOnPolicyFail(
Task *t);
118 void RunTask(
Task *t);
125 void RunCombinedDeferQ();
127 void RunDeferEntry();
131 void RunDeferQForGroupEnable();
135 void ClearTaskStats();
143 boost::optional<uint64_t> GetTaskDeferEntrySeqno()
const;
157 void GetSandeshData(SandeshTaskEntry *resp)
const;
164 typedef boost::intrusive::member_hook<
Task,
166 typedef boost::intrusive::list<Task, WaitQHook>
TaskWaitQ;
169 typedef boost::intrusive::member_hook<
TaskEntry,
170 boost::intrusive::set_member_hook<>,
234 TaskEntry *QueryTaskEntry(
int task_instance)
const;
235 TaskEntry *GetTaskEntry(
int task_instance);
247 void AddEntriesToDisableQ();
272 void RunDisableEntries();
273 void TaskExited(
Task *t);
280 void ClearTaskGroupStats();
281 void ClearTaskStats();
282 void ClearTaskStats(
int instance_id);
285 void GetSandeshData(SandeshTaskGroup *resp,
bool summary)
const;
291 for (TaskEntryList::const_iterator it = task_entry_db_.begin();
292 it != task_entry_db_.end(); ++it) {
306 typedef boost::intrusive::member_hook<
TaskEntry,
307 boost::intrusive::set_member_hook<>,
358 if (parent_->enqueue_time() != 0) {
361 if ((t - parent_->enqueue_time()) >
363 TASK_TRACE(scheduler, parent_,
"TBB schedule time(in usec) ",
364 (t - parent_->enqueue_time()));
369 bool is_complete = parent_->Run();
375 TASK_TRACE(scheduler, parent_,
"Run time(in usec) ", delay);
384 if (is_complete ==
true) {
385 parent_->set_task_complete();
387 parent_->set_task_recycle();
389 }
catch (std::exception &e) {
393 static std::string what = e.what();
395 LOG(ERROR,
"!!!! ERROR !!!! Task caught fatal exception: " << what
396 <<
" TaskImpl: " <<
this);
399 LOG(ERROR,
"!!!! ERROR !!!! Task caught fatal unknown exception"
400 <<
" TaskImpl: " <<
this);
408 assert(parent_ != NULL);
416 static int num_cores_;
422 char *num_cores_str = getenv(
"TBB_THREAD_COUNT");
423 if (!num_cores_str) {
424 if (thread_count == 0)
425 num_cores_ = tbb::task_scheduler_init::default_num_threads();
427 num_cores_ = thread_count;
429 num_cores_ = strtol(num_cores_str, NULL, 0);
437 return tbb::task_scheduler_init::default_num_threads();
441 if (getenv(
"TBB_USE_SPAWN"))
467 if ((group = *iter) == NULL) {
474 for (TaskIdMap::iterator loc =
id_map_.begin(); loc !=
id_map_.end();
494 singleton_.get()->tbb_awake_task_->StartTbbKeepAwakeTask(
496 "TaskScheduler::TbbKeepAwake");
508 "TaskScheduler::TbbKeepAwake");
519 uint64_t tbb_keepawake_time_msec,
520 uint64_t inactivity_time_msec,
521 uint64_t poll_interval_msec) {
526 inactivity_time_msec, poll_interval_msec);
531 const Task *
task,
const char *description,
533 if (
log_fn_.empty() ==
false) {
534 log_fn_(file_name, line_no,
task, description, delay);
544 return task->schedule_delay();
550 return task->execute_delay();
562 assert(task_id >= 0);
564 if (size <= task_id) {
583 tbb::mutex::scoped_lock lock(
mutex_);
610 uint32_t execute, uint32_t schedule) {
618 tbb::mutex::scoped_lock lock(
mutex_);
624 for (
const auto& pol_item: policy) {
625 if (pol_item.match_data_id == -1) {
632 pol_item.match_data_id);
643 tbb::mutex::scoped_lock lock(
mutex_);
653 assert(t->
seqno() == 0);
706 tbb::mutex::scoped_lock lock(
mutex_);
716 Task *first_wait_task = &(*entry->
waitq_.begin());
734 }
else if (t == first_wait_task) {
743 assert(deferq_tentry == NULL);
746 }
else if (deferq_tentry) {
769 tbb::mutex::scoped_lock lock(
mutex_);
799 tbb::mutex::scoped_lock lock(
mutex_);
805 tbb::mutex::scoped_lock lock(
mutex_);
822 cout <<
"id: " << group->
task_id() <<
825 " task count: " << group->
num_tasks() << endl;
832 tbb::mutex::scoped_lock lock(
mutex_);
836 if ((group = *it) == NULL) {
845 if ((
false == running_only) && (
false == group->
IsWaitQEmpty())) {
853 for (TaskIdMap::const_iterator it =
id_map_.begin(); it !=
id_map_.end();
855 if (task_id == it->second)
867 tbb::reader_writer_lock::scoped_lock_read lock(
id_map_mutex_);
868 TaskIdMap::iterator loc =
id_map_.find(name);
877 id_map_.insert(make_pair(name, tid));
933 #if defined(__linux__)
934 std::ostringstream file_name;
937 file_name <<
"/proc/" << pid <<
"/status";
939 std::ifstream file(file_name.str().c_str());
942 LOG(ERROR,
"opening /proc failed");
946 while (threads == 0 && file.good()) {
948 if (line ==
"Threads:\t1") threads = 1;
952 #error "TaskScheduler::CountThreadsPerPid() - unsupported platform."
966 pid_t pid = getpid();
968 while (count++ < 12000) {
971 if (threadsRunning == 1)
974 if (threadsRunning == -1) {
975 LOG(ERROR,
"could not check if any thread is running");
991 for (
int i = 0; i < 10000; i++) {
1091 if (task_instance == -1)
1095 if (size <= task_instance) {
1100 if (entry == NULL) {
1109 if (task_instance == -1) {
1124 for (TaskGroupPolicyList::iterator it =
policy_.begin();
1126 if ((*it)->run_count_ != 0) {
1172 TaskDeferList::iterator it;
1177 TaskDeferList::iterator it_work = it++;
1203 if ((entry = *it) == NULL) {
1223 if ((entry = *it) == NULL) {
1270 task_data_id_(task_instance), run_count_(0), run_task_(NULL),
1271 waitq_(), deferq_task_entry_(NULL), deferq_task_group_(NULL),
1275 if (task_instance != -1) {
1284 task_data_id_(-1), run_count_(0), run_task_(NULL),
1285 deferq_task_entry_(NULL), deferq_task_group_(NULL), disable_(false) {
1305 if ((*it)->run_count_ != 0) {
1341 TaskWaitQ::iterator it =
waitq_.iterator_to(*t);
1379 TaskWaitQ::iterator it =
waitq_.begin();
1386 if (
waitq_.size() != 0) {
1391 while (it !=
waitq_.end()) {
1425 TaskDeferList::iterator it;
1428 while (it !=
deferq_->end()) {
1430 TaskDeferList::iterator it_work = it++;
1439 TaskDeferList::iterator it;
1442 while (it !=
deferq_->end()) {
1444 TaskDeferList::iterator it_work = it++;
1459 TaskDeferList::iterator group_it = group->
deferq_.begin();
1460 TaskDeferList::iterator entry_it =
deferq_->begin();
1464 while ((group_it != group->
deferq_.end()) &&
1465 (entry_it !=
deferq_->end())) {
1469 if (defer_entry_compare(g_entry, t_entry)) {
1470 TaskDeferList::iterator group_it_work = group_it++;
1474 TaskDeferList::iterator entry_it_work = entry_it++;
1481 if (group_it != group->
deferq_.end()) {
1483 }
else if (entry_it !=
deferq_->end()) {
1526 return task->seqno();
1535 Task::Task(
int task_id,
int task_instance) : task_code_id_(task_id),
1536 task_data_id_(task_instance), task_impl_(NULL), state_(INIT),
1537 tbb_state_(TBB_INIT),
seqno_(0), task_recycle_(false), task_cancel_(false),
1542 task_data_id_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
1543 seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
1553 TASK_TRACE(scheduler,
this,
"Schedule delay(in usec) ",
1587 resp->set_waitq_size(
waitq_.size());
1588 resp->set_deferq_size(
deferq_->size());
1595 std::vector<SandeshTaskEntry> list;
1598 SandeshTaskEntry entry_resp;
1600 list.push_back(entry_resp);
1606 SandeshTaskEntry entry_resp;
1608 list.push_back(entry_resp);
1611 resp->set_task_entry_list(list);
1617 std::vector<SandeshTaskPolicyEntry> policy_list;
1618 for (TaskGroupPolicyList::const_iterator it =
policy_.begin();
1620 SandeshTaskPolicyEntry policy_entry;
1621 policy_entry.set_task_name(scheduler->
GetTaskName((*it)->task_code_id_));
1622 policy_entry.set_tasks_running((*it)->run_count_);
1623 policy_list.push_back(policy_entry);
1625 resp->set_task_policy_list(policy_list);
1629 tbb::mutex::scoped_lock lock(
mutex_);
1633 resp->set_total_count(
seqno_);
1636 std::vector<SandeshTaskGroup> list;
1637 for (TaskIdMap::const_iterator it =
id_map_.begin(); it !=
id_map_.end();
1639 SandeshTaskGroup resp_group;
1641 resp_group.set_task_id(it->second);
1642 resp_group.set_name(it->first);
1645 list.push_back(resp_group);
1647 resp->set_task_group_list(list);
static void GetTaskStats(TaskProfileStats *stats, int index, ProfileData *data)
A class maintaining information for every <task, instance>.
boost::optional< uint64_t > GetTaskDeferEntrySeqno() const
Addition/deletion of TaskEntry in the deferq_ is based on the seqno. seqno of the first Task in the w...
bool DeleteFromWaitQ(Task *t)
TaskEntryList policyq_
Policy rules for a task.
void SetDisable(bool disable)
Disables this task entry.
void RunDeferQForGroupEnable()
Starts executing tasks from deferq_ of TaskEntries which are enabled.
int task_code_id() const
Returns the code ID of this task entry.
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskEntry.
void AddPolicy(TaskEntry *entry)
TaskWaitQ waitq_
Tasks waiting to run on some condition.
TaskEntry * ActiveEntryInPolicy()
TaskDeferList * deferq_
Tasks deferred for this to exit.
int task_data_id() const
Returns the data ID of this task entry.
TaskStats * GetTaskStats()
DISALLOW_COPY_AND_ASSIGN(TaskEntry)
boost::intrusive::list< Task, WaitQHook > TaskWaitQ
void GetSandeshData(SandeshTaskEntry *resp) const
Task * run_task_
Task currently running.
void TaskExited(Task *t, TaskGroup *group)
TaskGroup * deferq_task_group_
int run_count_
No. of tasks running.
bool DeferOnPolicyFail(Task *t)
TaskStats stats_
Cumulative maintenance stats.
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
boost::intrusive::member_hook< Task, boost::intrusive::list_member_hook<>, &Task::waitq_hook_ > WaitQHook
List of Task's in waitq_.
void DeleteFromDeferQ(TaskEntry &entry)
Deletes a task from deferq_.
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
TaskEntry * deferq_task_entry_
void RunCombinedDeferQ()
Starts executing tasks from deferq_ of TaskEntry and TaskGroup in the temporal order.
void AddToDeferQ(TaskEntry *entry)
Adds a task to deferq_. Only one task of a given instance goes into deferq_ for its policies.
void RunTask(Task *t)
Starts a task. If there are more entries in waitq_ add them to deferq_.
boost::intrusive::set_member_hook task_defer_node
int GetRunCount() const
Returns the count of runs for this task entry.
TaskGroup maintains per <task-id> information, including...
DISALLOW_COPY_AND_ASSIGN(TaskGroup)
void SetDisable(bool disable)
void IncrementTotalRunTime(int64_t rtime)
void ClearTaskGroupStats()
bool DeferOnPolicyFail(TaskEntry *entry, Task *t)
void RunDisableEntries()
Run tasks that may be suspended. Schedule tasks only for TaskEntries which are enabled.
void AddPolicy(TaskGroup *group)
TaskEntry * disable_entry_
Task entry for disabled group.
tbb::atomic< uint64_t > total_run_time_
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskGroup.
void AddToDisableQ(TaskEntry *entry)
Enqueue TaskEntry in disable_entry's deferQ.
void DeleteFromDeferQ(TaskEntry &entry)
Delete task from deferq_.
static const int kVectorGrowSize
bool IsWaitQEmpty()
Returns true if the waitq_ of every task in the group is empty.
void AddEntriesToDisableQ()
Add TaskEntries to disable_entry_ which have tasks enqueued and are already disabled.
void GetSandeshData(SandeshTaskGroup *resp, bool summary) const
void AddToDeferQ(TaskEntry *entry)
Add task to deferq_. Only one task of a given instance goes into deferq_ for its policies.
std::vector< TaskGroup * > TaskGroupPolicyList
Vector of Task Group policies.
TaskEntry * GetDisableEntry()
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
TaskStats * GetTaskStats()
TaskDeferList deferq_
Tasks deferred till run_count_ is 0.
bool policy_set_
Specifies if policy is already set.
TaskEntry * GetTaskEntry(int task_instance)
TaskEntryList task_entry_db_
task-entries in this group
TaskEntry * task_entry_
Tasks deferred till run_count_ is 0.
int run_count_
No. of tasks running in the group.
TaskEntry * QueryTaskEntry(int task_instance) const
size_t deferq_size() const
TaskGroupPolicyList policy_
Policy rules for the group.
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
TaskGroup * ActiveGroupInPolicy()
TaskStats * GetTaskGroupStats()
A private class used to implement tbb::task An object is created when task is ready for execution and...
DISALLOW_COPY_AND_ASSIGN(TaskImpl)
virtual ~TaskImpl()
Destructor is called when a task execution is completed. Invoked implicitly by tbb::task....
tbb::task * execute()
Method called from tbb::task to execute. Invoke Run() method of client. Supports task continuation wh...
void Start(EventManager *evm)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void EnqueueUnLocked(Task *task)
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
void Stop()
Stops scheduling of all tasks.
TaskTbbKeepAwake * tbb_awake_task_
tbb::reader_writer_lock id_map_mutex_
static boost::scoped_ptr< TaskScheduler > singleton_
void EnableTaskGroup(int task_id)
int GetTaskId(const std::string &name)
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads.
tbb::task_scheduler_init task_scheduler_
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id. Assumes a valid entry is present for task_id.
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
TaskMonitor * task_monitor_
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
int CountThreadsPerPid(pid_t pid)
Platform-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
void WaitForTerminateCompletion()
void DisableTaskEntry(int task_id, int instance_id)
void DisableTaskGroup(int task_id)
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static int GetDefaultThreadCount()
TaskStats * GetTaskGroupStats(int task_id)
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
TaskGroupDb task_group_db_
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task. Examples:
std::string GetTaskName(int task_id) const
void EnableTaskEntry(int task_id, int instance_id)
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskEntry for a task-id and task-instance. Grows task_entry_db_ if necessary.
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
static TaskScheduler * GetInstance()
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
void Start()
Starts scheduling of all tasks.
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
void ClearTaskStats(int task_id)
TaskStats * GetTaskStats(int task_id)
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
bool track_run_time() const
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
void RegisterLog(LogFn fn)
static bool ShouldUseSpawn()
void set_event_manager(EventManager *evm)
uint32_t schedule_delay() const
void Print()
Debug print routine.
uint32_t execute_delay() const
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
static const int kVectorGrowSize
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued. If running_only is true,...
void ClearTaskGroupStats(int task_id)
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
bool StartTbbKeepAwakeTask(TaskScheduler *ts, EventManager *event_mgr, const std::string task_name, uint32_t tbbKeepawakeTimeout=1000)
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
void ShutTbbKeepAwakeTask()
Task is a class to describe a computational task within OpenSDN control plane applications....
static Task * Running()
Returns a pointer to the current task the code is executing under.
uint64_t seqno() const
Returns the sequence number of this task.
bool task_recycle_
Determines if the task must be rescheduled (reused) after its completion.
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
void tbb_state(TbbState s)
Sets a TBB state for the task.
uint64_t seqno_
Stores the sequence number.
State state_
Stores a state of the task.
friend class TaskImpl
Gives access to private members for TaskImpl class.
int task_data_id_
The dataset id within a code path.
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
int task_code_id() const
Returns the code ID of this task.
tbb::task * task_impl_
A pointer to an Intel TBB object storing low-level information to manage the task.
int task_data_id() const
Returns the data ID of this task.
Task(int task_id, int task_data_id)
Creates a new task with the given values of task code ID and task data ID.
uint64_t schedule_time_
Contains the time when the task was started.
bool task_cancel_
Determines if the task's execution was canceled.
uint32_t execute_delay_
Sets threshold for the task's execution time. If the threshold is exceeded, the event is logged.
State state() const
Returns a state value of a task.
uint32_t schedule_delay_
Sets threshold for delay between enqueueing and execution. If the threshold is exceeded,...
int task_code_id_
The code path executed by the task.
uint64_t enqueue_time_
Contains the time when the task was enqueued for execution.
@ WAIT
A task is waiting in a queue.
@ RUN
A task is being run.
@ INIT
A task was initialized.
boost::intrusive::list_member_hook waitq_hook_
#define LOG(_Level, _Msg)
Comparison routine for the TaskDeferList.
uint64_t total_tasks_completed_
Total number of tasks run.
uint64_t last_exit_time_
Timestamp of the latest exit.
int run_count_
Number of entries currently running.
int defer_count_
Number of entries in deferq.
int wait_count_
Number of entries in waitq.
uint64_t enqueue_count_
Number of tasks enqueued.
tbb::enumerable_thread_specific< Task * > TaskInfo
#define TASK_TRACE(scheduler, task, msg, delay)
std::vector< TaskEntry * > TaskEntryList
ostream & operator<<(ostream &out, const Task &t)
static TaskInfo task_running
std::vector< TaskExclusion > TaskPolicy
Defines a type to store an execution policy (a list of task exclusions).
#define CHECK_CONCURRENCY(...)
static uint64_t ClockMonotonicUsec()
static const std::string duration_usecs_to_string(const uint64_t usecs)
static uint64_t UTCTimestampUsec()