9 #include <boost/intrusive/set.hpp>
10 #include <boost/optional.hpp>
12 #include "tbb/atomic.h"
14 #include "tbb/enumerable_thread_specific.h"
21 #include <base/sandesh/task_types.h>
30 typedef tbb::enumerable_thread_specific<Task *>
TaskInfo;
39 #define TASK_TRACE(scheduler, task, msg, delay)\
41 scheduler->Log(__FILE__, __LINE__, task, msg, delay);\
98 TaskEntry(
int task_id,
int task_instance);
103 void AddToWaitQ(
Task *t);
104 bool DeleteFromWaitQ(
Task *t);
114 bool DeferOnPolicyFail(
Task *t);
118 void RunTask(
Task *t);
125 void RunCombinedDeferQ();
127 void RunDeferEntry();
131 void RunDeferQForGroupEnable();
135 void ClearTaskStats();
143 boost::optional<uint64_t> GetTaskDeferEntrySeqno()
const;
150 void GetSandeshData(SandeshTaskEntry *resp)
const;
157 typedef boost::intrusive::member_hook<
Task,
159 typedef boost::intrusive::list<Task, WaitQHook>
TaskWaitQ;
162 typedef boost::intrusive::member_hook<
TaskEntry,
163 boost::intrusive::set_member_hook<>,
227 TaskEntry *QueryTaskEntry(
int task_instance)
const;
228 TaskEntry *GetTaskEntry(
int task_instance);
240 void AddEntriesToDisableQ();
265 void RunDisableEntries();
266 void TaskExited(
Task *t);
273 void ClearTaskGroupStats();
274 void ClearTaskStats();
275 void ClearTaskStats(
int instance_id);
278 void GetSandeshData(SandeshTaskGroup *resp,
bool summary)
const;
284 for (TaskEntryList::const_iterator it = task_entry_db_.begin();
285 it != task_entry_db_.end(); ++it) {
299 typedef boost::intrusive::member_hook<
TaskEntry,
300 boost::intrusive::set_member_hook<>,
351 if (parent_->enqueue_time() != 0) {
354 if ((t - parent_->enqueue_time()) >
356 TASK_TRACE(scheduler, parent_,
"TBB schedule time(in usec) ",
357 (t - parent_->enqueue_time()));
362 bool is_complete = parent_->Run();
367 if (execute_delay && delay > execute_delay) {
368 TASK_TRACE(scheduler, parent_,
"Run time(in usec) ", delay);
377 if (is_complete ==
true) {
378 parent_->SetTaskComplete();
380 parent_->SetTaskRecycle();
382 }
catch (std::exception &e) {
386 static std::string what = e.what();
388 LOG(ERROR,
"!!!! ERROR !!!! Task caught fatal exception: " << what
389 <<
" TaskImpl: " <<
this);
392 LOG(ERROR,
"!!!! ERROR !!!! Task caught fatal unknown exception"
393 <<
" TaskImpl: " <<
this);
401 assert(parent_ != NULL);
409 static int num_cores_;
415 char *num_cores_str = getenv(
"TBB_THREAD_COUNT");
416 if (!num_cores_str) {
417 if (thread_count == 0)
418 num_cores_ = tbb::task_scheduler_init::default_num_threads();
420 num_cores_ = thread_count;
422 num_cores_ = strtol(num_cores_str, NULL, 0);
430 return tbb::task_scheduler_init::default_num_threads();
434 if (getenv(
"TBB_USE_SPAWN"))
460 if ((group = *iter) == NULL) {
467 for (TaskIdMap::iterator loc =
id_map_.begin(); loc !=
id_map_.end();
487 singleton_.get()->tbb_awake_task_->StartTbbKeepAwakeTask(
489 "TaskScheduler::TbbKeepAwake");
501 "TaskScheduler::TbbKeepAwake");
512 uint64_t tbb_keepawake_time_msec,
513 uint64_t inactivity_time_msec,
514 uint64_t poll_interval_msec) {
519 inactivity_time_msec, poll_interval_msec);
524 const Task *
task,
const char *description,
526 if (
log_fn_.empty() ==
false) {
527 log_fn_(file_name, line_no, task, description, delay);
555 assert(task_id >= 0);
557 if (size <= task_id) {
576 tbb::mutex::scoped_lock lock(
mutex_);
603 uint32_t execute, uint32_t schedule) {
611 tbb::mutex::scoped_lock lock(
mutex_);
617 for (TaskPolicy::iterator it = policy.begin(); it != policy.end(); ++it) {
619 if (it->match_instance == -1) {
637 tbb::mutex::scoped_lock lock(
mutex_);
700 tbb::mutex::scoped_lock lock(
mutex_);
710 Task *first_wait_task = &(*entry->
waitq_.begin());
711 TaskEntry *disable_entry = group->GetDisableEntry();
720 }
else if (group->IsDisabled()) {
728 }
else if (t == first_wait_task) {
737 assert(deferq_tentry == NULL);
740 }
else if (deferq_tentry) {
743 }
else if (group->IsDisabled()) {
763 tbb::mutex::scoped_lock lock(
mutex_);
793 tbb::mutex::scoped_lock lock(
mutex_);
799 tbb::mutex::scoped_lock lock(
mutex_);
816 cout <<
"id: " << group->
task_id() <<
819 " task count: " << group->
num_tasks() << endl;
826 tbb::mutex::scoped_lock lock(
mutex_);
830 if ((group = *it) == NULL) {
839 if ((
false == running_only) && (
false == group->
IsWaitQEmpty())) {
847 for (TaskIdMap::const_iterator it =
id_map_.begin(); it !=
id_map_.end();
849 if (task_id == it->second)
861 tbb::reader_writer_lock::scoped_lock_read lock(
id_map_mutex_);
862 TaskIdMap::iterator loc =
id_map_.find(name);
871 id_map_.insert(make_pair(name, tid));
927 #if defined(__linux__)
928 std::ostringstream file_name;
931 file_name <<
"/proc/" << pid <<
"/status";
933 std::ifstream file(file_name.str().c_str());
936 LOG(ERROR,
"opening /proc failed");
940 while (threads == 0 && file.good()) {
942 if (line ==
"Threads:\t1") threads = 1;
946 #error "TaskScheduler::CountThreadsPerPid() - unsupported platform."
960 pid_t pid = getpid();
962 while (count++ < 12000) {
965 if (threadsRunning == 1)
968 if (threadsRunning == -1) {
969 LOG(ERROR,
"could not check if any thread is running");
985 for (
int i = 0; i < 10000; i++) {
1085 if (task_instance == -1)
1089 if (size <= task_instance) {
1094 if (entry == NULL) {
1103 if (task_instance == -1) {
1118 for (TaskGroupPolicyList::iterator it =
policy_.begin();
1120 if ((*it)->run_count_ != 0) {
1166 TaskDeferList::iterator it;
1171 TaskDeferList::iterator it_work = it++;
1197 if ((entry = *it) == NULL) {
1217 if ((entry = *it) == NULL) {
1264 task_instance_(task_instance), run_count_(0), run_task_(NULL),
1265 waitq_(), deferq_task_entry_(NULL), deferq_task_group_(NULL),
1269 if (task_instance != -1) {
1278 task_instance_(-1), run_count_(0), run_task_(NULL),
1279 deferq_task_entry_(NULL), deferq_task_group_(NULL), disable_(false) {
1299 if ((*it)->run_count_ != 0) {
1335 TaskWaitQ::iterator it =
waitq_.iterator_to(*t);
1373 TaskWaitQ::iterator it =
waitq_.begin();
1380 if (
waitq_.size() != 0) {
1385 while (it !=
waitq_.end()) {
1419 TaskDeferList::iterator it;
1422 while (it !=
deferq_->end()) {
1424 TaskDeferList::iterator it_work = it++;
1433 TaskDeferList::iterator it;
1436 while (it !=
deferq_->end()) {
1438 TaskDeferList::iterator it_work = it++;
1453 TaskDeferList::iterator group_it = group->
deferq_.begin();
1454 TaskDeferList::iterator entry_it =
deferq_->begin();
1458 while ((group_it != group->
deferq_.end()) &&
1459 (entry_it !=
deferq_->end())) {
1463 if (defer_entry_compare(g_entry, t_entry)) {
1464 TaskDeferList::iterator group_it_work = group_it++;
1468 TaskDeferList::iterator entry_it_work = entry_it++;
1475 if (group_it != group->
deferq_.end()) {
1477 }
else if (entry_it !=
deferq_->end()) {
1530 task_instance_(task_instance), task_impl_(NULL), state_(INIT),
1531 tbb_state_(TBB_INIT),
seqno_(0), task_recycle_(false), task_cancel_(false),
1536 task_instance_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
1537 seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
1547 TASK_TRACE(scheduler,
this,
"Schedule delay(in usec) ",
1581 resp->set_waitq_size(
waitq_.size());
1582 resp->set_deferq_size(
deferq_->size());
1589 std::vector<SandeshTaskEntry> list;
1592 SandeshTaskEntry entry_resp;
1594 list.push_back(entry_resp);
1600 SandeshTaskEntry entry_resp;
1602 list.push_back(entry_resp);
1605 resp->set_task_entry_list(list);
1611 std::vector<SandeshTaskPolicyEntry> policy_list;
1612 for (TaskGroupPolicyList::const_iterator it =
policy_.begin();
1614 SandeshTaskPolicyEntry policy_entry;
1615 policy_entry.set_task_name(scheduler->
GetTaskName((*it)->task_id_));
1616 policy_entry.set_tasks_running((*it)->run_count_);
1617 policy_list.push_back(policy_entry);
1619 resp->set_task_policy_list(policy_list);
1623 tbb::mutex::scoped_lock lock(
mutex_);
1627 resp->set_total_count(
seqno_);
1630 std::vector<SandeshTaskGroup> list;
1631 for (TaskIdMap::const_iterator it =
id_map_.begin(); it !=
id_map_.end();
1633 SandeshTaskGroup resp_group;
1635 resp_group.set_task_id(it->second);
1636 resp_group.set_name(it->first);
1639 list.push_back(resp_group);
1641 resp->set_task_group_list(list);
int task_id_
The code path executed by the task.
void DisableTaskEntry(int task_id, int instance_id)
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
TaskGroupPolicyList policy_
Policy rules for the group.
int wait_count_
Number of entries in waitq.
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
TaskMonitor * task_monitor_
std::vector< TaskGroup * > TaskGroupPolicyList
Vector of Task Group policies.
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
static const int kVectorGrowSize
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
std::vector< TaskEntry * > TaskEntryList
Task * run_task_
Task currently running.
void EnableTaskEntry(int task_id, int instance_id)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
static int GetDefaultThreadCount()
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskGroup.
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
static Task * Running()
Returns a pointer to the current task the code is executing under.
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
uint64_t total_tasks_completed_
Number of total tasks ran.
int run_count_
No. of tasks running.
void AddToDeferQ(TaskEntry *entry)
Add task to deferq_ Only one task of a given instance goes into deferq_ for its policies.
bool IsWaitQEmpty()
Returns true, if the waiq_ of all the tasks in the group are empty.
boost::optional< uint64_t > GetTaskDeferEntrySeqno() const
Addition/deletion of TaskEntry in the deferq_ is based on the seqno. seqno of the first Task in the w...
void RunDeferQForGroupEnable()
Starts executing tasks from deferq_ of TaskEntries which are enabled.
uint32_t schedule_delay() const
static boost::scoped_ptr< TaskScheduler > singleton_
TaskEntry * ActiveEntryInPolicy()
TaskStats * GetTaskStats(int task_id)
tbb::reader_writer_lock id_map_mutex_
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
void SetTbbState(TbbState s)
uint32_t execute_delay() const
std::ostream & operator<<(std::ostream &out, BFDState state)
void AddToDisableQ(TaskEntry *entry)
Enqueue TaskEntry in disable_entry's deferQ.
int run_count_
No. of tasks running in the group.
uint32_t execute_delay() const
TaskEntry * deferq_task_entry_
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
TaskGroupDb task_group_db_
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id.Assumes valid entry is present for task_id.
void Start()
Starts scheduling of all tasks.
void DeleteFromDeferQ(TaskEntry &entry)
Deletes a task from deferq_.
TaskEntry * QueryTaskEntry(int task_instance) const
TaskStats * GetTaskGroupStats(int task_id)
void set_event_manager(EventManager *evm)
TaskEntryList policyq_
Policy rules for a task.
int run_count_
Number of entries currently running.
void SetDisable(bool disable)
void RunCombinedDeferQ()
Starts executing tasks from deferq_ of TaskEntry and TaskGroup in the temporal order.
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
void IncrementTotalRunTime(int64_t rtime)
void SetDisable(bool disable)
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
void RunDisableEntries()
Run tasks that maybe suspended. Schedule tasks only for TaskEntries which are enabled.
bool DeleteFromWaitQ(Task *t)
TaskGroup maintains per <task-id> information including,.
Comparison routine for the TaskDeferList.
int GetTaskId(const std::string &name)
A class maintaning information for every <task, instance>
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
int CountThreadsPerPid(pid_t pid)
Platfrom-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
TaskEntry * disable_entry_
Task entry for disabled group.
bool track_run_time() const
uint64_t last_exit_time_
Number of time stamp of latest exist.
TaskWaitQ waitq_
Tasks waiting to run on some condition.
void Print()
Debug print routine.
void WaitForTerminateCompletion()
TaskGroup * deferq_task_group_
TaskEntry * GetTaskEntry(int task_instance)
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
void RegisterLog(LogFn fn)
TaskStats * GetTaskGroupStats()
TaskStats stats_
Cummulative Maintenance stats.
bool policy_set_
Specifies if policy is already set.
TaskStats * GetTaskStats()
tbb::task_scheduler_init task_scheduler_
tbb::task * execute()
Method called from tbb::task to execute. Invoke Run() method of client. Supports task continuation wh...
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
A private class used to implement tbb::task An object is created when task is ready for execution and...
void AddPolicy(TaskEntry *entry)
boost::intrusive::list_member_hook waitq_hook_
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
boost::intrusive::list< Task, WaitQHook > TaskWaitQ
#define CHECK_CONCURRENCY(...)
int GetTaskInstance() const
void AddEntriesToDisableQ()
Add TaskEntries to disable_entry_ which have tasks enqueued and are already disabled.
static const std::string duration_usecs_to_string(const uint64_t usecs)
TaskTbbKeepAwake * tbb_awake_task_
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true, enqueued tasks are ignored i.e. return true if there are no running tasks. Ignore TaskGroup or TaskEntry if it is disabled.
#define DISALLOW_COPY_AND_ASSIGN(_Class)
void SetSeqNo(uint64_t seqno)
int GetTaskInstance() const
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
bool StartTbbKeepAwakeTask(TaskScheduler *ts, EventManager *event_mgr, const std::string task_name, uint32_t tbbKeepawakeTimeout=1000)
boost::intrusive::member_hook< Task, boost::intrusive::list_member_hook<>,&Task::waitq_hook_ > WaitQHook
List of Task's in waitq_.
void AddToDeferQ(TaskEntry *entry)
Adds a task to deferq_. Only one task of a given instance goes into deferq_ for its policies...
static void GetTaskStats(TaskProfileStats *stats, int index, ProfileData *data)
boost::intrusive::set_member_hook task_defer_node
int defer_count_
Number of entries in deferq.
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
void ClearTaskGroupStats(int task_id)
std::vector< TaskExclusion > TaskPolicy
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
virtual ~TaskImpl()
Destructor is called when a task execution is compeleted. Invoked implicitly by tbb::task. Invokes OnTaskExit to schedule tasks pending tasks.
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>,&TaskEntry::task_defer_node > TaskDeferListOption
bool DeferOnPolicyFail(Task *t)
void Start(EventManager *evm)
uint64_t enqueue_count_
Number of tasks enqueued.
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskEntry.
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
TaskEntry * GetDisableEntry()
std::string GetTaskName(int task_id) const
TaskDeferList * deferq_
Tasks deferred for this to exit.
TaskStats * GetTaskStats()
static uint64_t UTCTimestampUsec()
uint64_t GetSeqno() const
void DisableTaskGroup(int task_id)
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
#define TASK_TRACE(scheduler, task, msg, delay)
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
void RunTask(Task *t)
Starts a task. If there are more entries in waitq_ add them to deferq_.
static const int kVectorGrowSize
static bool ShouldUseSpawn()
#define LOG(_Level, _Msg)
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
void ShutTbbKeepAwakeTask()
Task(int task_id, int task_instance)
static uint64_t ClockMonotonicUsec()
void Stop()
Stops scheduling of all tasks.
size_t deferq_size() const
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>,&TaskEntry::task_defer_node > TaskDeferListOption
int task_instance_
The dataset id within a code path.
void EnableTaskGroup(int task_id)
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
TaskDeferList deferq_
Tasks deferred till run_count_ is 0.
bool DeferOnPolicyFail(TaskEntry *entry, Task *t)
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
void DeleteFromDeferQ(TaskEntry &entry)
Delete task from deferq_.
tbb::atomic< uint64_t > total_run_time_
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
void TaskExited(Task *t, TaskGroup *group)
TaskGroup * ActiveGroupInPolicy()
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
void GetSandeshData(SandeshTaskEntry *resp) const
static TaskInfo task_running
void AddPolicy(TaskGroup *group)
Task is a wrapper over tbb::task to support policies.
TaskEntry * task_entry_
Tasks deferred till run_count_ is 0.
uint32_t schedule_delay() const
void EnqueueUnLocked(Task *task)
void ClearTaskGroupStats()
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads...
TaskEntryList task_entry_db_
task-entries in this group
void GetSandeshData(SandeshTaskGroup *resp, bool summary) const
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
void ClearTaskStats(int task_id)
tbb::enumerable_thread_specific< Task * > TaskInfo