9 #include <boost/intrusive/set.hpp>
10 #include <boost/optional.hpp>
12 #include "tbb/atomic.h"
14 #include "tbb/enumerable_thread_specific.h"
21 #include <base/sandesh/task_types.h>
30 typedef tbb::enumerable_thread_specific<Task *>
TaskInfo;
39 #define TASK_TRACE(scheduler, task, msg, delay)\
41 scheduler->Log(__FILE__, __LINE__, task, msg, delay);\
98 TaskEntry(
int task_id,
int task_instance);
103 void AddToWaitQ(
Task *t);
104 bool DeleteFromWaitQ(
Task *t);
114 bool DeferOnPolicyFail(
Task *t);
118 void RunTask(
Task *t);
125 void RunCombinedDeferQ();
127 void RunDeferEntry();
131 void RunDeferQForGroupEnable();
135 void ClearTaskStats();
143 boost::optional<uint64_t> GetTaskDeferEntrySeqno()
const;
150 void GetSandeshData(SandeshTaskEntry *resp)
const;
157 typedef boost::intrusive::member_hook<
Task,
159 typedef boost::intrusive::list<Task, WaitQHook>
TaskWaitQ;
162 typedef boost::intrusive::member_hook<
TaskEntry,
163 boost::intrusive::set_member_hook<>,
227 TaskEntry *QueryTaskEntry(
int task_instance)
const;
228 TaskEntry *GetTaskEntry(
int task_instance);
240 void AddEntriesToDisableQ();
265 void RunDisableEntries();
266 void TaskExited(
Task *t);
273 void ClearTaskGroupStats();
274 void ClearTaskStats();
275 void ClearTaskStats(
int instance_id);
278 void GetSandeshData(SandeshTaskGroup *resp,
bool summary)
const;
284 for (TaskEntryList::const_iterator it = task_entry_db_.begin();
285 it != task_entry_db_.end(); ++it) {
299 typedef boost::intrusive::member_hook<
TaskEntry,
300 boost::intrusive::set_member_hook<>,
351 if (parent_->enqueue_time() != 0) {
354 if ((t - parent_->enqueue_time()) >
356 TASK_TRACE(scheduler, parent_,
"TBB schedule time(in usec) ",
357 (t - parent_->enqueue_time()));
362 bool is_complete = parent_->Run();
368 TASK_TRACE(scheduler, parent_,
"Run time(in usec) ", delay);
377 if (is_complete ==
true) {
378 parent_->SetTaskComplete();
380 parent_->SetTaskRecycle();
382 }
catch (std::exception &e) {
386 static std::string what = e.what();
388 LOG(ERROR,
"!!!! ERROR !!!! Task caught fatal exception: " << what
389 <<
" TaskImpl: " <<
this);
392 LOG(ERROR,
"!!!! ERROR !!!! Task caught fatal unknown exception"
393 <<
" TaskImpl: " <<
this);
401 assert(parent_ != NULL);
409 static int num_cores_;
415 char *num_cores_str = getenv(
"TBB_THREAD_COUNT");
416 if (!num_cores_str) {
417 if (thread_count == 0)
418 num_cores_ = tbb::task_scheduler_init::default_num_threads();
420 num_cores_ = thread_count;
422 num_cores_ = strtol(num_cores_str, NULL, 0);
430 return tbb::task_scheduler_init::default_num_threads();
434 if (getenv(
"TBB_USE_SPAWN"))
460 if ((group = *iter) == NULL) {
467 for (TaskIdMap::iterator loc =
id_map_.begin(); loc !=
id_map_.end();
487 singleton_.get()->tbb_awake_task_->StartTbbKeepAwakeTask(
489 "TaskScheduler::TbbKeepAwake");
501 "TaskScheduler::TbbKeepAwake");
512 uint64_t tbb_keepawake_time_msec,
513 uint64_t inactivity_time_msec,
514 uint64_t poll_interval_msec) {
519 inactivity_time_msec, poll_interval_msec);
524 const Task *
task,
const char *description,
526 if (
log_fn_.empty() ==
false) {
527 log_fn_(file_name, line_no,
task, description, delay);
537 return task->schedule_delay();
543 return task->execute_delay();
555 assert(task_id >= 0);
557 if (size <= task_id) {
576 tbb::mutex::scoped_lock lock(
mutex_);
603 uint32_t execute, uint32_t schedule) {
611 tbb::mutex::scoped_lock lock(
mutex_);
617 for (TaskPolicy::iterator it = policy.begin(); it != policy.end(); ++it) {
619 if (it->match_instance == -1) {
637 tbb::mutex::scoped_lock lock(
mutex_);
700 tbb::mutex::scoped_lock lock(
mutex_);
710 Task *first_wait_task = &(*entry->
waitq_.begin());
728 }
else if (t == first_wait_task) {
737 assert(deferq_tentry == NULL);
740 }
else if (deferq_tentry) {
763 tbb::mutex::scoped_lock lock(
mutex_);
793 tbb::mutex::scoped_lock lock(
mutex_);
799 tbb::mutex::scoped_lock lock(
mutex_);
816 cout <<
"id: " << group->
task_id() <<
819 " task count: " << group->
num_tasks() << endl;
826 tbb::mutex::scoped_lock lock(
mutex_);
830 if ((group = *it) == NULL) {
839 if ((
false == running_only) && (
false == group->
IsWaitQEmpty())) {
847 for (TaskIdMap::const_iterator it =
id_map_.begin(); it !=
id_map_.end();
849 if (task_id == it->second)
861 tbb::reader_writer_lock::scoped_lock_read lock(
id_map_mutex_);
862 TaskIdMap::iterator loc =
id_map_.find(name);
871 id_map_.insert(make_pair(name, tid));
927 #if defined(__linux__)
928 std::ostringstream file_name;
931 file_name <<
"/proc/" << pid <<
"/status";
933 std::ifstream file(file_name.str().c_str());
936 LOG(ERROR,
"opening /proc failed");
940 while (threads == 0 && file.good()) {
942 if (line ==
"Threads:\t1") threads = 1;
946 #error "TaskScheduler::CountThreadsPerPid() - unsupported platform."
960 pid_t pid = getpid();
962 while (count++ < 12000) {
965 if (threadsRunning == 1)
968 if (threadsRunning == -1) {
969 LOG(ERROR,
"could not check if any thread is running");
985 for (
int i = 0; i < 10000; i++) {
1085 if (task_instance == -1)
1089 if (size <= task_instance) {
1094 if (entry == NULL) {
1103 if (task_instance == -1) {
1118 for (TaskGroupPolicyList::iterator it =
policy_.begin();
1120 if ((*it)->run_count_ != 0) {
1166 TaskDeferList::iterator it;
1171 TaskDeferList::iterator it_work = it++;
1197 if ((entry = *it) == NULL) {
1217 if ((entry = *it) == NULL) {
1264 task_instance_(task_instance), run_count_(0), run_task_(NULL),
1265 waitq_(), deferq_task_entry_(NULL), deferq_task_group_(NULL),
1269 if (task_instance != -1) {
1278 task_instance_(-1), run_count_(0), run_task_(NULL),
1279 deferq_task_entry_(NULL), deferq_task_group_(NULL), disable_(false) {
1299 if ((*it)->run_count_ != 0) {
1335 TaskWaitQ::iterator it =
waitq_.iterator_to(*t);
1373 TaskWaitQ::iterator it =
waitq_.begin();
1380 if (
waitq_.size() != 0) {
1385 while (it !=
waitq_.end()) {
1419 TaskDeferList::iterator it;
1422 while (it !=
deferq_->end()) {
1424 TaskDeferList::iterator it_work = it++;
1433 TaskDeferList::iterator it;
1436 while (it !=
deferq_->end()) {
1438 TaskDeferList::iterator it_work = it++;
1453 TaskDeferList::iterator group_it = group->
deferq_.begin();
1454 TaskDeferList::iterator entry_it =
deferq_->begin();
1458 while ((group_it != group->
deferq_.end()) &&
1459 (entry_it !=
deferq_->end())) {
1463 if (defer_entry_compare(g_entry, t_entry)) {
1464 TaskDeferList::iterator group_it_work = group_it++;
1468 TaskDeferList::iterator entry_it_work = entry_it++;
1475 if (group_it != group->
deferq_.end()) {
1477 }
else if (entry_it !=
deferq_->end()) {
1520 return task->GetSeqno();
1530 task_instance_(task_instance), task_impl_(NULL), state_(INIT),
1531 tbb_state_(TBB_INIT),
seqno_(0), task_recycle_(false), task_cancel_(false),
1536 task_instance_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
1537 seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
1547 TASK_TRACE(scheduler,
this,
"Schedule delay(in usec) ",
1581 resp->set_waitq_size(
waitq_.size());
1582 resp->set_deferq_size(
deferq_->size());
1589 std::vector<SandeshTaskEntry> list;
1592 SandeshTaskEntry entry_resp;
1594 list.push_back(entry_resp);
1600 SandeshTaskEntry entry_resp;
1602 list.push_back(entry_resp);
1605 resp->set_task_entry_list(list);
1611 std::vector<SandeshTaskPolicyEntry> policy_list;
1612 for (TaskGroupPolicyList::const_iterator it =
policy_.begin();
1614 SandeshTaskPolicyEntry policy_entry;
1615 policy_entry.set_task_name(scheduler->
GetTaskName((*it)->task_id_));
1616 policy_entry.set_tasks_running((*it)->run_count_);
1617 policy_list.push_back(policy_entry);
1619 resp->set_task_policy_list(policy_list);
1623 tbb::mutex::scoped_lock lock(
mutex_);
1627 resp->set_total_count(
seqno_);
1630 std::vector<SandeshTaskGroup> list;
1631 for (TaskIdMap::const_iterator it =
id_map_.begin(); it !=
id_map_.end();
1633 SandeshTaskGroup resp_group;
1635 resp_group.set_task_id(it->second);
1636 resp_group.set_name(it->first);
1639 list.push_back(resp_group);
1641 resp->set_task_group_list(list);
static void GetTaskStats(TaskProfileStats *stats, int index, ProfileData *data)
A class maintaning information for every <task, instance>
boost::optional< uint64_t > GetTaskDeferEntrySeqno() const
Addition/deletion of TaskEntry in the deferq_ is based on the seqno. seqno of the first Task in the w...
bool DeleteFromWaitQ(Task *t)
TaskEntryList policyq_
Policy rules for a task.
void SetDisable(bool disable)
void RunDeferQForGroupEnable()
Starts executing tasks from deferq_ of TaskEntries which are enabled.
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskEntry.
void AddPolicy(TaskEntry *entry)
int GetTaskInstance() const
TaskWaitQ waitq_
Tasks waiting to run on some condition.
TaskEntry * ActiveEntryInPolicy()
TaskDeferList * deferq_
Tasks deferred for this to exit.
TaskStats * GetTaskStats()
DISALLOW_COPY_AND_ASSIGN(TaskEntry)
boost::intrusive::list< Task, WaitQHook > TaskWaitQ
void GetSandeshData(SandeshTaskEntry *resp) const
Task * run_task_
Task currently running.
void TaskExited(Task *t, TaskGroup *group)
TaskGroup * deferq_task_group_
int run_count_
No. of tasks running.
bool DeferOnPolicyFail(Task *t)
TaskStats stats_
Cummulative Maintenance stats.
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
boost::intrusive::member_hook< Task, boost::intrusive::list_member_hook<>, &Task::waitq_hook_ > WaitQHook
List of Task's in waitq_.
void DeleteFromDeferQ(TaskEntry &entry)
Deletes a task from deferq_.
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
TaskEntry * deferq_task_entry_
void RunCombinedDeferQ()
Starts executing tasks from deferq_ of TaskEntry and TaskGroup in the temporal order.
void AddToDeferQ(TaskEntry *entry)
Adds a task to deferq_. Only one task of a given instance goes into deferq_ for its policies.
void RunTask(Task *t)
Starts a task. If there are more entries in waitq_ add them to deferq_.
boost::intrusive::set_member_hook task_defer_node
TaskGroup maintains per <task-id> information including,.
DISALLOW_COPY_AND_ASSIGN(TaskGroup)
void SetDisable(bool disable)
void IncrementTotalRunTime(int64_t rtime)
void ClearTaskGroupStats()
bool DeferOnPolicyFail(TaskEntry *entry, Task *t)
void RunDisableEntries()
Run tasks that maybe suspended. Schedule tasks only for TaskEntries which are enabled.
void AddPolicy(TaskGroup *group)
TaskEntry * disable_entry_
Task entry for disabled group.
tbb::atomic< uint64_t > total_run_time_
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskGroup.
void AddToDisableQ(TaskEntry *entry)
Enqueue TaskEntry in disable_entry's deferQ.
void DeleteFromDeferQ(TaskEntry &entry)
Delete task from deferq_.
static const int kVectorGrowSize
bool IsWaitQEmpty()
Returns true, if the waiq_ of all the tasks in the group are empty.
void AddEntriesToDisableQ()
Add TaskEntries to disable_entry_ which have tasks enqueued and are already disabled.
void GetSandeshData(SandeshTaskGroup *resp, bool summary) const
void AddToDeferQ(TaskEntry *entry)
Add task to deferq_ Only one task of a given instance goes into deferq_ for its policies.
std::vector< TaskGroup * > TaskGroupPolicyList
Vector of Task Group policies.
TaskEntry * GetDisableEntry()
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
TaskStats * GetTaskStats()
TaskDeferList deferq_
Tasks deferred till run_count_ is 0.
bool policy_set_
Specifies if policy is already set.
TaskEntry * GetTaskEntry(int task_instance)
TaskEntryList task_entry_db_
task-entries in this group
TaskEntry * task_entry_
Tasks deferred till run_count_ is 0.
int run_count_
No. of tasks running in the group.
TaskEntry * QueryTaskEntry(int task_instance) const
size_t deferq_size() const
TaskGroupPolicyList policy_
Policy rules for the group.
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
TaskGroup * ActiveGroupInPolicy()
TaskStats * GetTaskGroupStats()
A private class used to implement tbb::task An object is created when task is ready for execution and...
DISALLOW_COPY_AND_ASSIGN(TaskImpl)
virtual ~TaskImpl()
Destructor is called when a task execution is compeleted. Invoked implicitly by tbb::task....
tbb::task * execute()
Method called from tbb::task to execute. Invoke Run() method of client. Supports task continuation wh...
void Start(EventManager *evm)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void EnqueueUnLocked(Task *task)
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
void Stop()
Stops scheduling of all tasks.
TaskTbbKeepAwake * tbb_awake_task_
tbb::reader_writer_lock id_map_mutex_
static boost::scoped_ptr< TaskScheduler > singleton_
void EnableTaskGroup(int task_id)
int GetTaskId(const std::string &name)
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads.
tbb::task_scheduler_init task_scheduler_
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id.Assumes valid entry is present for task_id.
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
TaskMonitor * task_monitor_
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
int CountThreadsPerPid(pid_t pid)
Platfrom-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
void WaitForTerminateCompletion()
void DisableTaskEntry(int task_id, int instance_id)
void DisableTaskGroup(int task_id)
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static int GetDefaultThreadCount()
TaskStats * GetTaskGroupStats(int task_id)
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
TaskGroupDb task_group_db_
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
std::string GetTaskName(int task_id) const
void EnableTaskEntry(int task_id, int instance_id)
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
static TaskScheduler * GetInstance()
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
void Start()
Starts scheduling of all tasks.
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
void ClearTaskStats(int task_id)
TaskStats * GetTaskStats(int task_id)
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
bool track_run_time() const
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
void RegisterLog(LogFn fn)
static bool ShouldUseSpawn()
void set_event_manager(EventManager *evm)
uint32_t schedule_delay() const
void Print()
Debug print routine.
uint32_t execute_delay() const
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
static const int kVectorGrowSize
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true,...
void ClearTaskGroupStats(int task_id)
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
bool StartTbbKeepAwakeTask(TaskScheduler *ts, EventManager *event_mgr, const std::string task_name, uint32_t tbbKeepawakeTimeout=1000)
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
void ShutTbbKeepAwakeTask()
Task is a wrapper over tbb::task to support policies.
int GetTaskInstance() const
static Task * Running()
Returns a pointer to the current task the code is executing under.
void SetTbbState(TbbState s)
int task_id_
The code path executed by the task.
int task_instance_
The dataset id within a code path.
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
void SetSeqNo(uint64_t seqno)
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
uint64_t GetSeqno() const
Task(int task_id, int task_instance)
boost::intrusive::list_member_hook waitq_hook_
#define LOG(_Level, _Msg)
Comparison routine for the TaskDeferList.
uint64_t total_tasks_completed_
Number of total tasks ran.
uint64_t last_exit_time_
Number of time stamp of latest exist.
int run_count_
Number of entries currently running.
int defer_count_
Number of entries in deferq.
int wait_count_
Number of entries in waitq.
uint64_t enqueue_count_
Number of tasks enqueued.
tbb::enumerable_thread_specific< Task * > TaskInfo
#define TASK_TRACE(scheduler, task, msg, delay)
std::vector< TaskEntry * > TaskEntryList
ostream & operator<<(ostream &out, const Task &t)
static TaskInfo task_running
std::vector< TaskExclusion > TaskPolicy
#define CHECK_CONCURRENCY(...)
static uint64_t ClockMonotonicUsec()
static const std::string duration_usecs_to_string(const uint64_t usecs)
static uint64_t UTCTimestampUsec()