OpenSDN source code
task.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <assert.h>
6 #include <fstream>
7 #include <map>
8 #include <iostream>
9 #include <atomic>
10 #include <boost/intrusive/set.hpp>
11 #include <boost/optional.hpp>
12 
13 #include "tbb/task.h"
14 #include "tbb/enumerable_thread_specific.h"
15 #include "base/logging.h"
16 #include "base/task.h"
17 #include "base/task_annotations.h"
18 #include "base/task_tbbkeepawake.h"
19 #include "base/task_monitor.h"
20 
21 #include <base/sandesh/task_types.h>
22 
23 using namespace std;
24 using tbb::task;
25 
27 class TaskEntry;
28 struct TaskDeferEntryCmp;
29 
30 typedef tbb::enumerable_thread_specific<Task *> TaskInfo;
31 
33 
34 // Vector of Task entries
35 typedef std::vector<TaskEntry *> TaskEntryList;
36 
37 boost::scoped_ptr<TaskScheduler> TaskScheduler::singleton_;
38 
// Forwards a trace record to scheduler->Log(), tagging it with the file and
// line of the call site. Every macro parameter is parenthesized so that
// arguments which are expressions (e.g. `&sched` or `a + b`) expand with the
// intended precedence. The do/while(false) wrapper makes the macro behave as
// a single statement.
#define TASK_TRACE(scheduler, task, msg, delay) \
    do { \
        (scheduler)->Log(__FILE__, __LINE__, (task), (msg), (delay)); \
    } while (false)
43 
47 class TaskImpl : public tbb::task {
48 public:
49  TaskImpl(Task *t) : parent_(t) {};
50 
54  virtual ~TaskImpl();
55 
56 private:
57 
61  tbb::task *execute();
62 
64 
66 };
67 
95 class TaskEntry {
96 public:
97  TaskEntry(int task_id);
98  TaskEntry(int task_id, int task_instance);
99  ~TaskEntry();
100 
101  void AddPolicy(TaskEntry *entry);
102  size_t WaitQSize() const { return waitq_.size(); };
103  void AddToWaitQ(Task *t);
104  bool DeleteFromWaitQ(Task *t);
105 
108  void AddToDeferQ(TaskEntry *entry);
109 
111  void DeleteFromDeferQ(TaskEntry &entry);
112 
113  TaskEntry *ActiveEntryInPolicy();
114  bool DeferOnPolicyFail(Task *t);
115 
118  void RunTask(Task *t);
119 
121  void RunDeferQ();
122 
125  void RunCombinedDeferQ();
126  void RunWaitQ();
127  void RunDeferEntry();
128 
131  void RunDeferQForGroupEnable();
132 
133  void TaskExited(Task *t, TaskGroup *group);
135  void ClearTaskStats();
136  void ClearQueues();
137 
143  boost::optional<uint64_t> GetTaskDeferEntrySeqno() const;
144 
146  int task_code_id() const { return task_code_id_; }
147 
149  int task_data_id() const { return task_data_id_; }
150 
152  int GetRunCount() const { return run_count_; }
153 
155  void SetDisable(bool disable) { disable_ = disable; }
156  bool IsDisabled() { return disable_; }
157  void GetSandeshData(SandeshTaskEntry *resp) const;
158 
159 private:
160  friend class TaskGroup;
161  friend class TaskScheduler;
162 
164  typedef boost::intrusive::member_hook<Task,
165  boost::intrusive::list_member_hook<>, &Task::waitq_hook_> WaitQHook;
166  typedef boost::intrusive::list<Task, WaitQHook> TaskWaitQ;
167 
168  boost::intrusive::set_member_hook<> task_defer_node;
169  typedef boost::intrusive::member_hook<TaskEntry,
170  boost::intrusive::set_member_hook<>,
172 
176  typedef boost::intrusive::set<TaskEntry, TaskDeferListOption,
177  boost::intrusive::compare<TaskDeferEntryCmp> > TaskDeferList;
178 
181 
184 
187 
190 
193 
198  bool disable_;
199 
202 
204 };
205 
208  bool operator() (const TaskEntry &lhs, const TaskEntry &rhs) const {
209  return (lhs.GetTaskDeferEntrySeqno() <
210  rhs.GetTaskDeferEntrySeqno());
211  }
212 };
213 
229 class TaskGroup {
230 public:
231  TaskGroup(int task_id);
232  ~TaskGroup();
233 
234  TaskEntry *QueryTaskEntry(int task_instance) const;
235  TaskEntry *GetTaskEntry(int task_instance);
236  void AddPolicy(TaskGroup *group);
237 
240  void AddToDeferQ(TaskEntry *entry);
241 
243  void AddToDisableQ(TaskEntry *entry);
244 
247  void AddEntriesToDisableQ();
248 
249  TaskEntry *GetDisableEntry() { return disable_entry_; }
250 
252  void DeleteFromDeferQ(TaskEntry &entry);
253  TaskGroup *ActiveGroupInPolicy();
254  bool DeferOnPolicyFail(TaskEntry *entry, Task *t);
255 
263  bool IsWaitQEmpty();
264 
265  int TaskRunCount() const {return run_count_;};
266 
268  void RunDeferQ();
269 
272  void RunDisableEntries();
273  void TaskExited(Task *t);
274  void PolicySet();
275  void TaskStarted() {run_count_++;};
276  void IncrementTotalRunTime(int64_t rtime) { total_run_time_ += rtime; }
277  TaskStats *GetTaskGroupStats();
279  TaskStats *GetTaskStats(int task_instance);
280  void ClearTaskGroupStats();
281  void ClearTaskStats();
282  void ClearTaskStats(int instance_id);
283  void SetDisable(bool disable) { disable_ = disable; }
284  bool IsDisabled() { return disable_; }
285  void GetSandeshData(SandeshTaskGroup *resp, bool summary) const;
286 
287  int task_id() const { return task_code_id_; }
288  size_t deferq_size() const { return deferq_.size(); }
289  size_t num_tasks() const {
290  size_t count = 0;
291  for (TaskEntryList::const_iterator it = task_entry_db_.begin();
292  it != task_entry_db_.end(); ++it) {
293  if (*it != NULL) {
294  count++;
295  }
296  }
297  return count;
298  }
299 
300 private:
301  friend class TaskEntry;
302  friend class TaskScheduler;
303 
305  typedef std::vector<TaskGroup *> TaskGroupPolicyList;
306  typedef boost::intrusive::member_hook<TaskEntry,
307  boost::intrusive::set_member_hook<>,
309 
313  typedef boost::intrusive::set<TaskEntry, TaskDeferListOption,
314  boost::intrusive::compare<TaskDeferEntryCmp> > TaskDeferList;
315 
316  static const int kVectorGrowSize = 16;
318 
321 
324  std::atomic<uint64_t> total_run_time_;
325 
328 
331 
334 
337 
340  uint32_t execute_delay_;
341  uint32_t schedule_delay_;
342  bool disable_;
343 
346 };
347 
349 // Implementation for class TaskImpl
351 
353  TaskInfo::reference running = task_running.local();
354  running = parent_;
355  parent_->tbb_state(Task::TBB_EXEC);
356  try {
357  uint64_t t = 0;
358  if (parent_->enqueue_time() != 0) {
359  t = ClockMonotonicUsec();
361  if ((t - parent_->enqueue_time()) >
362  scheduler->schedule_delay(parent_)) {
363  TASK_TRACE(scheduler, parent_, "TBB schedule time(in usec) ",
364  (t - parent_->enqueue_time()));
365  }
366  } else if (TaskScheduler::GetInstance()->track_run_time()) {
367  t = ClockMonotonicUsec();
368  }
369  bool is_complete = parent_->Run();
370  if (t != 0) {
371  int64_t delay = ClockMonotonicUsec() - t;
373  uint32_t execute_delay = scheduler->execute_delay(parent_);
374  if (execute_delay && delay > execute_delay) {
375  TASK_TRACE(scheduler, parent_, "Run time(in usec) ", delay);
376  }
377  if (scheduler->track_run_time()) {
378  TaskGroup *group =
379  scheduler->QueryTaskGroup(parent_->task_code_id());
380  group->IncrementTotalRunTime(delay);
381  }
382  }
383  running = NULL;
384  if (is_complete == true) {
385  parent_->set_task_complete();
386  } else {
387  parent_->set_task_recycle();
388  }
389  } catch (std::exception &e) {
390 
391  // Store exception information statically, to easily read exception
392  // information from the core.
393  static std::string what = e.what();
394 
395  LOG(ERROR, "!!!! ERROR !!!! Task caught fatal exception: " << what
396  << " TaskImpl: " << this);
397  assert(0);
398  } catch (...) {
399  LOG(ERROR, "!!!! ERROR !!!! Task caught fatal unknown exception"
400  << " TaskImpl: " << this);
401  assert(0);
402  }
403 
404  return NULL;
405 }
406 
408  assert(parent_ != NULL);
409 
411  sched->OnTaskExit(parent_);
412 }
413 
414 int TaskScheduler::GetThreadCount(int thread_count) {
415  static bool init_;
416  static int num_cores_;
417 
418  if (init_) {
419  return num_cores_ * ThreadAmpFactor_;
420  }
421 
422  char *num_cores_str = getenv("TBB_THREAD_COUNT");
423  if (!num_cores_str) {
424  if (thread_count == 0)
425  num_cores_ = tbb::task_scheduler_init::default_num_threads();
426  else
427  num_cores_ = thread_count;
428  } else {
429  num_cores_ = strtol(num_cores_str, NULL, 0);
430  }
431 
432  init_ = true;
433  return num_cores_ * ThreadAmpFactor_;
434 }
435 
437  return tbb::task_scheduler_init::default_num_threads();
438 }
439 
441  if (getenv("TBB_USE_SPAWN"))
442  return true;
443 
444  return false;
445 }
446 
448 // Implementation for class TaskScheduler
450 
453  running_(true), seqno_(0), id_max_(0), log_fn_(), track_run_time_(false),
455  enqueue_count_(0), done_count_(0), cancel_count_(0), evm_(NULL),
456  tbb_awake_task_(NULL), task_monitor_(NULL) {
457  hw_thread_count_ = GetThreadCount(task_count);
459  stop_entry_ = new TaskEntry(-1);
460 }
461 
463  TaskGroup *group;
464 
465  for (TaskGroupDb::iterator iter = task_group_db_.begin();
466  iter != task_group_db_.end(); ++iter) {
467  if ((group = *iter) == NULL) {
468  continue;
469  }
470  *iter = NULL;
471  delete group;
472  }
473 
474  for (TaskIdMap::iterator loc = id_map_.begin(); loc != id_map_.end();
475  id_map_.erase(loc++)) {
476  }
477 
478  delete stop_entry_;
479  stop_entry_ = NULL;
480  task_group_db_.clear();
481 
482  return;
483 }
484 
485 void TaskScheduler::Initialize(uint32_t thread_count, EventManager *evm) {
486  assert(singleton_.get() == NULL);
487  singleton_.reset(new TaskScheduler((int)thread_count));
488 
489  if (evm) {
490  singleton_.get()->evm_ = evm;
491  singleton_.get()->tbb_awake_task_ = new TaskTbbKeepAwake();
492  assert(singleton_.get()->tbb_awake_task_);
493 
494  singleton_.get()->tbb_awake_task_->StartTbbKeepAwakeTask(
495  singleton_.get(), evm,
496  "TaskScheduler::TbbKeepAwake");
497  }
498 }
499 
501  assert(evm);
502  evm_ = evm;
503  if (tbb_awake_task_ == NULL) {
505  assert(tbb_awake_task_);
506 
508  "TaskScheduler::TbbKeepAwake");
509  }
510 }
511 
513  if (tbb_awake_task_) {
515  }
516 }
517 
519  uint64_t tbb_keepawake_time_msec,
520  uint64_t inactivity_time_msec,
521  uint64_t poll_interval_msec) {
522  if (task_monitor_ != NULL)
523  return;
524 
525  task_monitor_ = new TaskMonitor(this, tbb_keepawake_time_msec,
526  inactivity_time_msec, poll_interval_msec);
528 }
529 
530 void TaskScheduler::Log(const char *file_name, uint32_t line_no,
531  const Task *task, const char *description,
532  uint64_t delay) {
533  if (log_fn_.empty() == false) {
534  log_fn_(file_name, line_no, task, description, delay);
535  }
536 }
537 
539  log_fn_ = fn;
540 }
541 
543  if (task->schedule_delay() > schedule_delay_)
544  return task->schedule_delay();
545  return schedule_delay_;
546 }
547 
549  if (task->execute_delay() > execute_delay_)
550  return task->execute_delay();
551  return execute_delay_;
552 }
553 
555  if (singleton_.get() == NULL) {
556  singleton_.reset(new TaskScheduler());
557  }
558  return singleton_.get();
559 }
560 
562  assert(task_id >= 0);
563  int size = task_group_db_.size();
564  if (size <= task_id) {
566  }
567 
568  TaskGroup *group = task_group_db_[task_id];
569  if (group == NULL) {
570  group = new TaskGroup(task_id);
571  task_group_db_[task_id] = group;
572  }
573 
574  return group;
575 }
576 
578  return task_group_db_[task_id];
579 }
580 
581 bool TaskScheduler::IsTaskGroupEmpty(int task_id) const {
582  CHECK_CONCURRENCY("bgp::Config");
583  std::scoped_lock lock(mutex_);
584  TaskGroup *group = task_group_db_[task_id];
585  assert(group);
586  assert(group->TaskRunCount() == 0);
587  return group->IsWaitQEmpty();
588 }
589 
590 TaskEntry *TaskScheduler::GetTaskEntry(int task_id, int task_instance) {
591  TaskGroup *group = GetTaskGroup(task_id);
592  return group->GetTaskEntry(task_instance);
593 }
594 
595 TaskEntry *TaskScheduler::QueryTaskEntry(int task_id, int task_instance) {
596  TaskGroup *group = QueryTaskGroup(task_id);
597  if (group == NULL)
598  return NULL;
599  return group->QueryTaskEntry(task_instance);
600 }
601 
603  uint32_t schedule) {
604  execute_delay_ = execute;
605  schedule_delay_ = schedule;
607 }
608 
609 void TaskScheduler::SetLatencyThreshold(const std::string &name,
610  uint32_t execute, uint32_t schedule) {
611  int task_id = GetTaskId(name);
612  TaskGroup *group = GetTaskGroup(task_id);
613  group->execute_delay_ = execute;
614  group->schedule_delay_ = schedule;
615 }
616 
617 void TaskScheduler::SetPolicy(int task_id, TaskPolicy &policy) {
618  std::scoped_lock lock(mutex_);
619 
620  TaskGroup *group = GetTaskGroup(task_id);
621  TaskEntry *group_entry = group->GetTaskEntry(-1);
622  group->PolicySet();
623 
624  for (const auto& pol_item: policy) {
625  if (pol_item.match_data_id == -1) {
626  TaskGroup *policy_group = GetTaskGroup(pol_item.match_code_id);
627  group->AddPolicy(policy_group);
628  policy_group->AddPolicy(group);
629  } else {
630  TaskEntry *entry = GetTaskEntry(task_id, pol_item.match_data_id);
631  TaskEntry *policy_entry = GetTaskEntry(pol_item.match_code_id,
632  pol_item.match_data_id);
633  entry->AddPolicy(policy_entry);
634  policy_entry->AddPolicy(entry);
635 
636  group_entry->AddPolicy(policy_entry);
637  policy_entry->AddPolicy(group_entry);
638  }
639  }
640 }
641 
643  std::scoped_lock lock(mutex_);
644  EnqueueUnLocked(t);
645 }
646 
648  if (measure_delay_) {
650  }
651  // Ensure that task is enqueued only once.
652  assert(t->seqno() == 0);
653  enqueue_count_++;
654  t->seqno(++seqno_);
655  TaskGroup *group = GetTaskGroup(t->task_code_id());
656  t->schedule_delay_ = group->schedule_delay_;
657  t->execute_delay_ = group->execute_delay_;
658  group->stats_.enqueue_count_++;
659 
660  TaskEntry *entry = GetTaskEntry(t->task_code_id(), t->task_data_id());
661  entry->stats_.enqueue_count_++;
662  // If either TaskGroup or TaskEntry is disabled for Unit-Test purposes,
663  // enqueue new task in waitq and update TaskGroup if needed.
664  if (group->IsDisabled() || entry->IsDisabled()) {
665  entry->AddToWaitQ(t);
666  if (group->IsDisabled()) {
667  group->AddToDisableQ(entry);
668  }
669  return;
670  }
671 
672  // Add task to waitq_ if its already populated
673  if (entry->WaitQSize() != 0) {
674  entry->AddToWaitQ(t);
675  return;
676  }
677 
678  // Is scheduler stopped? Dont add task to deferq_ if scheduler is stopped.
679  // TaskScheduler::Start() will run tasks from waitq_
680  if (!running_) {
681  entry->AddToWaitQ(t);
682  stop_entry_->AddToDeferQ(entry);
683  return;
684  }
685 
686  // Check Task Group policy. On policy violation, DeferOnPolicyFail()
687  // adds the Task to the TaskEntry's waitq_ and the TaskEntry will be
688  // added to deferq_ of the matching TaskGroup.
689  if (group->DeferOnPolicyFail(entry, t)) {
690  return;
691  }
692 
693  // Check Task Entry policy. On policy violation, DeferOnPolicyFail()
694  // adds the Task to the TaskEntry's waitq_ and the TaskEntry will be
695  // added to deferq_ of the matching TaskEntry.
696  if (entry->DeferOnPolicyFail(t)) {
697  return;
698  }
699 
700  entry->RunTask(t);
701  return;
702 }
703 
705  std::scoped_lock lock(mutex_);
706 
707  // If the task is in RUN state, mark the task for cancellation and return.
708  if (t->state_ == Task::RUN) {
709  t->task_cancel_ = true;
710  } else if (t->state_ == Task::WAIT) {
711  TaskEntry *entry = QueryTaskEntry(t->task_code_id(), t->task_data_id());
712  TaskGroup *group = QueryTaskGroup(t->task_code_id());
713  assert(entry->WaitQSize());
714  // Get the first entry in the waitq_
715  Task *first_wait_task = &(*entry->waitq_.begin());
716  TaskEntry *disable_entry = group->GetDisableEntry();
717  assert(entry->DeleteFromWaitQ(t) == true);
718  // If the waitq_ is empty, then remove the TaskEntry from the deferq.
719  if (!entry->WaitQSize()) {
720  if (entry->deferq_task_group_) {
721  assert(entry->deferq_task_entry_ == NULL);
722  entry->deferq_task_group_->DeleteFromDeferQ(*entry);
723  } else if (entry->deferq_task_entry_) {
724  entry->deferq_task_entry_->DeleteFromDeferQ(*entry);
725  } else if (group->IsDisabled()) {
726  // Remove TaskEntry from deferq of disable_entry
727  disable_entry->DeleteFromDeferQ(*entry);
728  } else {
729  if (!entry->IsDisabled()) {
730  assert(0);
731  }
732  }
733  } else if (t == first_wait_task) {
734  // TaskEntry is inserted in the deferq_ based on the Task seqno.
735  // deferq_ comparison function uses the seqno of the first entry in
736  // the waitq_. Therefore, if the task to be cancelled is the first
737  // entry in the waitq_, then delete the entry from the deferq_ and
738  // add it again.
739  TaskGroup *deferq_tgroup = entry->deferq_task_group_;
740  TaskEntry *deferq_tentry = entry->deferq_task_entry_;
741  if (deferq_tgroup) {
742  assert(deferq_tentry == NULL);
743  deferq_tgroup->DeleteFromDeferQ(*entry);
744  deferq_tgroup->AddToDeferQ(entry);
745  } else if (deferq_tentry) {
746  deferq_tentry->DeleteFromDeferQ(*entry);
747  deferq_tentry->AddToDeferQ(entry);
748  } else if (group->IsDisabled()) {
749  // Remove TaskEntry from deferq of disable_entry and add back
750  disable_entry->DeleteFromDeferQ(*entry);
751  disable_entry->AddToDeferQ(entry);
752  } else {
753  if (!entry->IsDisabled()) {
754  assert(0);
755  }
756  }
757  }
758  delete t;
759  cancel_count_++;
760  return CANCELLED;
761  } else {
762  return FAILED;
763  }
764  return QUEUED;
765 }
766 
768  std::scoped_lock lock(mutex_);
769  done_count_++;
770 
772  TaskEntry *entry = QueryTaskEntry(t->task_code_id(), t->task_data_id());
773  entry->TaskExited(t, GetTaskGroup(t->task_code_id()));
774 
775  //
776  // Delete the task it is not marked for recycling or already cancelled.
777  //
778  if ((t->task_recycle_ == false) || (t->task_cancel_ == true)) {
779  // Delete the container Task object, if the
780  // task is not marked to be recycled (or)
781  // if the task is marked for cancellation
782  if (t->task_cancel_ == true) {
783  t->OnTaskCancel();
784  }
785  delete t;
786  return;
787  }
788 
789  // Task is being recycled, reset the state, seq_no and TBB task handle
790  t->task_impl_ = NULL;
791  t->seqno(0);
792  t->state(Task::INIT);
794  EnqueueUnLocked(t);
795 }
796 
798  std::scoped_lock lock(mutex_);
799 
800  running_ = false;
801 }
802 
804  std::scoped_lock lock(mutex_);
805 
806  running_ = true;
807 
808  // Run all tasks that may be suspended
810  return;
811 }
812 
814  for (TaskGroupDb::iterator iter = task_group_db_.begin();
815  iter != task_group_db_.end(); ++iter) {
816  TaskGroup *group = *iter;
817  if (group == NULL) {
818  continue;
819  }
820 
821  cout << "id: " << group->task_id() <<
822  " run: " << group->TaskRunCount() << endl;
823  cout << "deferq: " << group->deferq_size() <<
824  " task count: " << group->num_tasks() << endl;
825  }
826 }
827 
828 bool TaskScheduler::IsEmpty(bool running_only) {
829  TaskGroup *group;
830 
831  std::scoped_lock lock(mutex_);
832 
833  for (TaskGroupDb::iterator it = task_group_db_.begin();
834  it != task_group_db_.end(); ++it) {
835  if ((group = *it) == NULL) {
836  continue;
837  }
838  if (group->TaskRunCount()) {
839  return false;
840  }
841  if (group->IsDisabled()) {
842  continue;
843  }
844  if ((false == running_only) && (false == group->IsWaitQEmpty())) {
845  return false;
846  }
847  }
848 
849  return true;
850 }
851 std::string TaskScheduler::GetTaskName(int task_id) const {
852  for (TaskIdMap::const_iterator it = id_map_.begin(); it != id_map_.end();
853  it++) {
854  if (task_id == it->second)
855  return it->first;
856  }
857 
858  return "ERROR";
859 }
860 
861 int TaskScheduler::GetTaskId(const string &name) {
862  {
863  // Grab read-only lock first. Most of the time, task-id already exists
864  // in the id_map_. Hence there should not be any contention for lock
865  // aquisition.
866  std::shared_lock<std::shared_mutex> lock(id_map_mutex_);
867  TaskIdMap::iterator loc = id_map_.find(name);
868  if (loc != id_map_.end()) {
869  return loc->second;
870  }
871  }
872 
873  // Grab read-write lock to allocate a new task id and insert into the map.
874  std::unique_lock<std::shared_mutex> lock(id_map_mutex_);
875  int tid = ++id_max_;
876  id_map_.insert(make_pair(name, tid));
877  return tid;
878 }
879 
881  TaskGroup *group = GetTaskGroup(task_id);
882  if (group == NULL)
883  return;
884 
885  group->ClearTaskGroupStats();
886 }
887 
888 void TaskScheduler::ClearTaskStats(int task_id) {
889  TaskGroup *group = GetTaskGroup(task_id);
890  if (group == NULL)
891  return;
892 
893  group->ClearTaskStats();
894 }
895 
896 void TaskScheduler::ClearTaskStats(int task_id, int instance_id) {
897  TaskGroup *group = GetTaskGroup(task_id);
898  if (group == NULL)
899  return;
900 
901  group->ClearTaskStats(instance_id);
902 }
903 
905  TaskGroup *group = GetTaskGroup(task_id);
906  if (group == NULL)
907  return NULL;
908 
909  return group->GetTaskGroupStats();
910 }
911 
913  TaskGroup *group = GetTaskGroup(task_id);
914  if (group == NULL)
915  return NULL;
916 
917  return group->GetTaskStats();
918 }
919 
920 TaskStats *TaskScheduler::GetTaskStats(int task_id, int instance_id) {
921  TaskGroup *group = GetTaskGroup(task_id);
922  if (group == NULL)
923  return NULL;
924 
925  return group->GetTaskStats(instance_id);
926 }
927 
929  int threads;
930  threads = 0;
931 
932 #if defined(__linux__)
933  std::ostringstream file_name;
934  std::string line;
935 
936  file_name << "/proc/" << pid << "/status";
937 
938  std::ifstream file(file_name.str().c_str());
939 
940  if(!file) {
941  LOG(ERROR, "opening /proc failed");
942  return -1;
943  }
944 
945  while (threads == 0 && file.good()) {
946  getline(file, line);
947  if (line == "Threads:\t1") threads = 1;
948  }
949  file.close();
950 #else
951 #error "TaskScheduler::CountThreadsPerPid() - unsupported platform."
952 #endif
953 
954  return threads;
955 }
956 
958  //
959  // Wait for a bit to give a chance for all the threads to exit
960  //
961  usleep(1000);
962 
963  int count = 0;
964  int threadsRunning;
965  pid_t pid = getpid();
966 
967  while (count++ < 12000) {
968  threadsRunning = CountThreadsPerPid(pid);
969 
970  if (threadsRunning == 1)
971  break;
972 
973  if (threadsRunning == -1) {
974  LOG(ERROR, "could not check if any thread is running");
975  usleep(10000);
976  break;
977  }
978 
979  usleep(10000);
980  }
981 }
982 
984  if (task_monitor_) {
986  delete task_monitor_;
987  task_monitor_ = NULL;
988  }
989 
990  for (int i = 0; i < 10000; i++) {
991  if (IsEmpty()) break;
992  usleep(1000);
993  }
994  assert(IsEmpty());
995  if (tbb_awake_task_) {
997  delete tbb_awake_task_;
998  tbb_awake_task_ = NULL;
999  }
1000  evm_ = NULL;
1001  singleton_->task_scheduler_.terminate();
1003  singleton_.reset(NULL);
1004 }
1005 
1007  TaskInfo::reference running = task_running.local();
1008  running = unit_test;
1009 }
1010 
1012  TaskInfo::reference running = task_running.local();
1013  running = NULL;
1014 }
1015 
1017  ThreadAmpFactor_ = n;
1018 }
1019 
1021  TaskGroup *group = GetTaskGroup(task_id);
1022  if (!group->IsDisabled()) {
1023  // Add TaskEntries(that contain enqueued tasks) which are already
1024  // disabled to disable_ entry maintained at TaskGroup.
1025  group->SetDisable(true);
1026  group->AddEntriesToDisableQ();
1027  }
1028 }
1029 
1031  TaskGroup *group = GetTaskGroup(task_id);
1032  group->SetDisable(false);
1033  // Run tasks that maybe suspended
1034  group->RunDisableEntries();
1035 }
1036 
1037 void TaskScheduler::DisableTaskEntry(int task_id, int instance_id) {
1038  TaskEntry *entry = GetTaskEntry(task_id, instance_id);
1039  entry->SetDisable(true);
1040 }
1041 
1042 void TaskScheduler::EnableTaskEntry(int task_id, int instance_id) {
1043  TaskEntry *entry = GetTaskEntry(task_id, instance_id);
1044  entry->SetDisable(false);
1045  TaskGroup *group = GetTaskGroup(task_id);
1046  // If group is still disabled, do not schedule the task. Task will be
1047  // scheduled for run when TaskGroup is enabled.
1048  if (group->IsDisabled()) {
1049  return;
1050  }
1051  // Run task instances that maybe suspended
1052  if (entry->WaitQSize() != 0) {
1053  entry->RunDeferEntry();
1054  }
1055 }
1056 
1058 // Implementation for class TaskGroup
1060 
1061 TaskGroup::TaskGroup(int task_id) : task_code_id_(task_id), policy_set_(false),
1062  run_count_(0), execute_delay_(0), schedule_delay_(0), disable_(false) {
1063  total_run_time_ = 0;
1065  task_entry_ = new TaskEntry(task_id);
1066  memset(&stats_, 0, sizeof(stats_));
1068 }
1069 
1071  policy_.clear();
1072  deferq_.clear();
1073 
1074  delete task_entry_;
1075  task_entry_ = NULL;
1076 
1077  for (size_t i = 0; i < task_entry_db_.size(); i++) {
1078  if (task_entry_db_[i] != NULL) {
1079  delete task_entry_db_[i];
1080  task_entry_db_[i] = NULL;
1081  }
1082  }
1083 
1084  delete disable_entry_;
1085  disable_entry_ = NULL;
1086  task_entry_db_.clear();
1087 }
1088 
1089 TaskEntry *TaskGroup::GetTaskEntry(int task_instance) {
1090  if (task_instance == -1)
1091  return task_entry_;
1092 
1093  int size = task_entry_db_.size();
1094  if (size <= task_instance) {
1095  task_entry_db_.resize(task_instance + TaskGroup::kVectorGrowSize);
1096  }
1097 
1098  TaskEntry *entry = task_entry_db_.at(task_instance);
1099  if (entry == NULL) {
1100  entry = new TaskEntry(task_code_id_, task_instance);
1101  task_entry_db_[task_instance] = entry;
1102  }
1103 
1104  return entry;
1105 }
1106 
1107 TaskEntry *TaskGroup::QueryTaskEntry(int task_instance) const {
1108  if (task_instance == -1) {
1109  return task_entry_;
1110  }
1111 
1112  if (task_instance >= (int)task_entry_db_.size())
1113  return NULL;
1114 
1115  return task_entry_db_[task_instance];
1116 }
1117 
1119  policy_.push_back(group);
1120 }
1121 
1123  for (TaskGroupPolicyList::iterator it = policy_.begin();
1124  it != policy_.end(); ++it) {
1125  if ((*it)->run_count_ != 0) {
1126  return (*it);
1127  }
1128  }
1129  return NULL;
1130 }
1131 
1133  TaskGroup *group;
1134  if ((group = ActiveGroupInPolicy()) != NULL) {
1135  // TaskEntry is inserted in the deferq_ based on the Task seqno.
1136  // deferq_ comparison function uses the seqno of the first Task queued
1137  // in the waitq_. Therefore, add the Task to waitq_ before adding
1138  // TaskEntry in the deferq_.
1139  if (0 == entry->WaitQSize()) {
1140  entry->AddToWaitQ(task);
1141  }
1142  group->AddToDeferQ(entry);
1143  return true;
1144  }
1145  return false;
1146 }
1147 
1149  stats_.defer_count_++;
1150  deferq_.insert(*entry);
1151  assert(entry->deferq_task_group_ == NULL);
1152  entry->deferq_task_group_ = this;
1153 }
1154 
1156  assert(this == entry.deferq_task_group_);
1157  deferq_.erase(deferq_.iterator_to(entry));
1158  entry.deferq_task_group_ = NULL;
1159 }
1160 
1162  disable_entry_->AddToDeferQ(entry);
1163 }
1164 
1166  assert(policy_set_ == false);
1167  policy_set_ = true;
1168 }
1169 
1171  TaskDeferList::iterator it;
1172 
1173  it = deferq_.begin();
1174  while (it != deferq_.end()) {
1175  TaskEntry &entry = *it;
1176  TaskDeferList::iterator it_work = it++;
1177  DeleteFromDeferQ(*it_work);
1178  entry.RunDeferEntry();
1179  }
1180 
1181  return;
1182 }
1183 
1184 inline void TaskGroup::TaskExited(Task *t) {
1185  run_count_--;
1187 }
1188 
1191 }
1192 
1194  TaskEntry *entry;
1195  if (task_entry_->WaitQSize()) {
1197  }
1198 
1199  // Walk thru the task_entry_db_ and add if waitq is non-empty
1200  for (TaskEntryList::iterator it = task_entry_db_.begin();
1201  it != task_entry_db_.end(); ++it) {
1202  if ((entry = *it) == NULL) {
1203  continue;
1204  }
1205  if (entry->WaitQSize()) {
1206  AddToDisableQ(entry);
1207  }
1208  }
1209 }
1210 
1212  TaskEntry *entry;
1213 
1214  // Check the waitq_ of the instance -1
1215  if (task_entry_->WaitQSize()) {
1216  return false;
1217  }
1218 
1219  // Walk thru the task_entry_db_ until waitq_ of any of the task is non-zero
1220  for (TaskEntryList::iterator it = task_entry_db_.begin();
1221  it != task_entry_db_.end(); ++it) {
1222  if ((entry = *it) == NULL) {
1223  continue;
1224  }
1225  if (entry->IsDisabled()) {
1226  continue;
1227  }
1228  if (entry->WaitQSize()) {
1229  return false;
1230  }
1231  }
1232 
1233  // Well, no task has been enqueued in this task group
1234  return true;
1235 }
1236 
1238  memset(&stats_, 0, sizeof(stats_));
1239 }
1240 
1243 }
1244 
1245 void TaskGroup::ClearTaskStats(int task_instance) {
1246  TaskEntry *entry = QueryTaskEntry(task_instance);
1247  if (entry != NULL)
1248  entry->ClearTaskStats();
1249 }
1250 
1252  return &stats_;
1253 }
1254 
1256  return task_entry_->GetTaskStats();
1257 }
1258 
1259 TaskStats *TaskGroup::GetTaskStats(int task_instance) {
1260  TaskEntry *entry = QueryTaskEntry(task_instance);
1261  return entry->GetTaskStats();
1262 }
1263 
1265 // Implementation for class TaskEntry
1267 
1268 TaskEntry::TaskEntry(int task_id, int task_instance) : task_code_id_(task_id),
1269  task_data_id_(task_instance), run_count_(0), run_task_(NULL),
1270  waitq_(), deferq_task_entry_(NULL), deferq_task_group_(NULL),
1271  disable_(false) {
1272  // When a new TaskEntry is created, adds an implicit rule into policyq_ to
1273  // ensure that only one Task of an instance is run at a time
1274  if (task_instance != -1) {
1275  policyq_.push_back(this);
1276  }
1277  memset(&stats_, 0, sizeof(stats_));
1278  // allocate memory for deferq
1279  deferq_ = new TaskDeferList;
1280 }
1281 
1282 TaskEntry::TaskEntry(int task_id) : task_code_id_(task_id),
1283  task_data_id_(-1), run_count_(0), run_task_(NULL),
1284  deferq_task_entry_(NULL), deferq_task_group_(NULL), disable_(false) {
1285  memset(&stats_, 0, sizeof(stats_));
1286  // allocate memory for deferq
1287  deferq_ = new TaskDeferList;
1288 }
1289 
1291  policyq_.clear();
1292 
1293  assert(0 == deferq_->size());
1294  delete deferq_;
1295 }
1296 
1298  policyq_.push_back(entry);
1299 }
1300 
1302  for (TaskEntryList::iterator it = policyq_.begin(); it != policyq_.end();
1303  ++it) {
1304  if ((*it)->run_count_ != 0) {
1305  return (*it);
1306  }
1307  }
1308 
1309  return NULL;
1310 }
1311 
1313  TaskEntry *policy_entry;
1314 
1315  if ((policy_entry = ActiveEntryInPolicy()) != NULL) {
1316  // TaskEntry is inserted in the deferq_ based on the Task seqno.
1317  // deferq_ comparison function uses the seqno of the first Task queued
1318  // in the waitq_. Therefore, add the Task to waitq_ before adding
1319  // TaskEntry in the deferq_.
1320  if (0 == WaitQSize()) {
1321  AddToWaitQ(task);
1322  }
1323  policy_entry->AddToDeferQ(this);
1324  return true;
1325  }
1326  return false;
1327 }
1328 
1330  t->state(Task::WAIT);
1331  stats_.wait_count_++;
1332  waitq_.push_back(*t);
1333 
1335  TaskGroup *group = scheduler->GetTaskGroup(task_code_id_);
1336  group->stats_.wait_count_++;
1337 }
1338 
1340  TaskWaitQ::iterator it = waitq_.iterator_to(*t);
1341  waitq_.erase(it);
1342  return true;
1343 }
1344 
1346  stats_.defer_count_++;
1347  deferq_->insert(*entry);
1348  assert(entry->deferq_task_entry_ == NULL);
1349  entry->deferq_task_entry_ = this;
1350 }
1351 
1353  assert(this == entry.deferq_task_entry_);
1354  deferq_->erase(deferq_->iterator_to(entry));
1355  entry.deferq_task_entry_ = NULL;
1356 }
1357 
1359  stats_.run_count_++;
1360  if (t->task_data_id() != -1) {
1361  assert(run_task_ == NULL);
1362  assert (run_count_ == 0);
1363  run_task_ = t;
1364  }
1365 
1366  run_count_++;
1368  TaskGroup *group = scheduler->QueryTaskGroup(t->task_code_id());
1369  group->TaskStarted();
1370 
1371  t->StartTask(scheduler);
1372 }
1373 
1375  if (waitq_.size() == 0)
1376  return;
1377 
1378  TaskWaitQ::iterator it = waitq_.begin();
1379 
1380  if (task_data_id_ != -1) {
1381  Task *t = &(*it);
1382  DeleteFromWaitQ(t);
1383  RunTask(t);
1384  // If there are more tasks in waitq_, put them in deferq_
1385  if (waitq_.size() != 0) {
1386  AddToDeferQ(this);
1387  }
1388  } else {
1389  // Run all instances in waitq_
1390  while (it != waitq_.end()) {
1391  Task *t = &(*it);
1392  DeleteFromWaitQ(t);
1393  RunTask(t);
1394  if (waitq_.size() == 0)
1395  break;
1396  it = waitq_.begin();
1397  }
1398  }
1399 }
1400 
// Body of TaskEntry::RunDeferEntry(): re-checks group and entry policies for
// the head task of waitq_ and runs the wait queue if neither defers it.
// NOTE(review): lines 1401-1402 (the signature and, presumably, the
// declaration of `scheduler`) are absent from this listing -- confirm against
// the original file.
1403  TaskGroup *group = scheduler->GetTaskGroup(task_code_id_);
1404 
1405  // Sanity check
1406  assert(waitq_.size());
1407  Task *task = &(*waitq_.begin());
1408 
1409  // Check Task group policies
1410  if (group->DeferOnPolicyFail(this, task)) {
1411  return;
1412  }
1413 
1414  // Check Task entry policies
1415  if (DeferOnPolicyFail(task)) {
1416  return;
1417  }
1418 
// Neither policy check deferred the entry, so its waiting tasks can start.
1419  RunWaitQ();
1420  return;
1421 }
1422 
// Body of TaskEntry::RunDeferQ() (signature line 1423 is absent from this
// listing). Drains this entry's deferq_, calling RunDeferEntry() on every
// entry removed from it.
1424  TaskDeferList::iterator it;
1425 
1426  it = deferq_->begin();
1427  while (it != deferq_->end()) {
1428  TaskEntry &entry = *it;
// Advance before erasing so the loop iterator never refers to the removed node.
1429  TaskDeferList::iterator it_work = it++;
1430  DeleteFromDeferQ(*it_work);
1431  entry.RunDeferEntry();
1432  }
1433 
1434  return;
1435 }
1436 
// Body of TaskEntry::RunDeferQForGroupEnable() (signature line 1437 is absent
// from this listing). Drains deferq_ like RunDeferQ(), but calls
// RunDeferEntry() only on entries that are not disabled; disabled entries are
// still removed from deferq_.
1438  TaskDeferList::iterator it;
1439 
1440  it = deferq_->begin();
1441  while (it != deferq_->end()) {
1442  TaskEntry &entry = *it;
// Advance before erasing so the loop iterator never refers to the removed node.
1443  TaskDeferList::iterator it_work = it++;
1444  DeleteFromDeferQ(*it_work);
1445  if (!entry.IsDisabled()) {
1446  entry.RunDeferEntry();
1447  }
1448  }
1449 
1450  return;
1451 }
1452 
// Body of TaskEntry::RunCombinedDeferQ(): merges the TaskGroup's deferq_ and
// this entry's deferq_, running deferred entries in the order given by
// TaskDeferEntryCmp.
// NOTE(review): lines 1453-1454 (the signature and, presumably, the
// declaration of `scheduler`) are absent from this listing -- confirm against
// the original file.
1455  TaskGroup *group = scheduler->QueryTaskGroup(task_code_id_);
1456  TaskDeferEntryCmp defer_entry_compare;
1457 
1458  TaskDeferList::iterator group_it = group->deferq_.begin();
1459  TaskDeferList::iterator entry_it = deferq_->begin();
1460 
1461  // Loop thru the deferq_ of TaskEntry and TaskGroup in the temporal order.
1462  // Exit the loop when any of the queues become empty.
1463  while ((group_it != group->deferq_.end()) &&
1464  (entry_it != deferq_->end())) {
1465  TaskEntry &g_entry = *group_it;
1466  TaskEntry &t_entry = *entry_it;
1467 
// Whichever head the comparator orders first is removed and run; the
// iterator is advanced before the erase so it stays valid.
1468  if (defer_entry_compare(g_entry, t_entry)) {
1469  TaskDeferList::iterator group_it_work = group_it++;
1470  group->DeleteFromDeferQ(*group_it_work);
1471  g_entry.RunDeferEntry();
1472  } else {
1473  TaskDeferList::iterator entry_it_work = entry_it++;
1474  DeleteFromDeferQ(*entry_it_work);
1475  t_entry.RunDeferEntry();
1476  }
1477  }
1478 
1479  // Now, walk thru the non-empty deferq_
1480  if (group_it != group->deferq_.end()) {
1481  group->RunDeferQ();
1482  } else if (entry_it != deferq_->end()) {
1483  RunDeferQ();
1484  }
1485 }
1486 
// Body of TaskEntry::TaskExited(Task *t, TaskGroup *group): updates run
// accounting after a task finishes and then runs whichever defer queues have
// become runnable.
// NOTE(review): the signature line (1487) and lines 1495-1496 and 1500 are
// absent from this listing -- confirm their content against the original file.
1488  if (task_data_id_ != -1) {
// An entry with a specific data id tracks its single running task.
1489  assert(run_task_ == t);
1490  run_task_ = NULL;
1491  assert(run_count_ == 1);
1492  }
1493 
1494  run_count_--;
1497  group->TaskExited(t);
1498 
// Both the group and this entry are idle. The statement for this branch
// (line 1500) is missing here; presumably it runs the combined defer queue --
// TODO confirm.
1499  if (!group->run_count_ && !run_count_) {
1501  } else if (!group->run_count_) {
1502  group->RunDeferQ();
1503  } else if (!run_count_) {
1504  RunDeferQ();
1505  }
1506 }
1507 
// Body of TaskEntry::ClearQueues() (signature line 1508 is absent from this
// listing). Empties the defer queue, the policy list and the wait queue of
// this entry.
1509  deferq_->clear();
1510  policyq_.clear();
1511  waitq_.clear();
1512 }
1513 
// Body of TaskEntry::ClearTaskStats() (signature line 1514 is absent from
// this listing). Zeroes every byte of stats_.
// NOTE(review): memset presumes TaskStats is trivially copyable -- confirm.
1515  memset(&stats_, 0, sizeof(stats_));
1516 }
1517 
// Body of TaskEntry::GetTaskStats() (signature line 1518 is absent from this
// listing). Returns a pointer to this entry's stats_ member; no copy is made.
1519  return &stats_;
1520 }
1521 
1522 boost::optional<uint64_t> TaskEntry::GetTaskDeferEntrySeqno() const {
1523  if(waitq_.size()) {
1524  const Task *task = &(*waitq_.begin());
1525  return task->seqno();
1526  }
1527 
1528  return boost::none;
1529 }
1530 
1532 // Implementation for class Task
1534 Task::Task(int task_id, int task_instance) : task_code_id_(task_id),
1535  task_data_id_(task_instance), task_impl_(NULL), state_(INIT),
1536  tbb_state_(TBB_INIT), seqno_(0), task_recycle_(false), task_cancel_(false),
1537  enqueue_time_(0), schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
1538 }
1539 
1540 Task::Task(int task_id) : task_code_id_(task_id),
1541  task_data_id_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
1542  seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
1543  schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
1544 }
1545 
1546 
// Starts execution of this task: optionally traces a schedule delay, moves the
// task to the RUN state and hands a freshly allocated TaskImpl to TBB.
// NOTE(review): lines 1549, 1553 and 1558 are absent from this listing, which
// leaves the TASK_TRACE call below unterminated -- confirm their content
// against the original file.
1547 void Task::StartTask(TaskScheduler *scheduler) {
// The delay check only applies when an enqueue time was recorded.
1548  if (enqueue_time_ != 0) {
1550  if ((schedule_time_ - enqueue_time_) >
1551  scheduler->schedule_delay(this)) {
1552  TASK_TRACE(scheduler, this, "Schedule delay(in usec) ",
1554  }
1555  }
// A task must not already have a TBB object attached when it is started.
1556  assert(task_impl_ == NULL);
1557  state(RUN);
1559  task_impl_ = new (task::allocate_root())TaskImpl(this);
// The scheduler configuration selects between spawn() and enqueue().
1560  if (scheduler->use_spawn()) {
1561  task::spawn(*task_impl_);
1562  } else {
1563  task::enqueue(*task_impl_);
1564  }
1565 }
1566 
// Body of Task::Running() (signature line 1567 is absent from this listing).
// Returns the Task pointer stored in the calling thread's slot of the
// thread-specific task_running container.
1568  TaskInfo::reference running = task_running.local();
1569  return running;
1570 }
1571 
1572 ostream& operator<<(ostream& out, const Task &t) {
1573  out << "Task <" << t.task_code_id_ << "," << t.task_data_id_ << ":"
1574  << t.seqno_ << "> ";
1575  return out;
1576 }
1577 
1579 // Implementation for sandesh APIs for Task
1581 void TaskEntry::GetSandeshData(SandeshTaskEntry *resp) const {
1582  resp->set_instance_id(task_data_id_);
1583  resp->set_tasks_created(stats_.enqueue_count_);
1584  resp->set_total_tasks_completed(stats_.total_tasks_completed_);
1585  resp->set_tasks_running(run_count_);
1586  resp->set_waitq_size(waitq_.size());
1587  resp->set_deferq_size(deferq_->size());
1588  resp->set_last_exit_time(stats_.last_exit_time_);
1589 }
// Fills a SandeshTaskGroup response: the total run time (when non-zero), one
// SandeshTaskEntry per task entry of the group and, unless `summary` is set,
// the group's policy list.
// NOTE(review): line 1615 is absent from this listing; it presumably declares
// `scheduler`, which is used below -- confirm against the original file.
1590 void TaskGroup::GetSandeshData(SandeshTaskGroup *resp, bool summary) const {
1591  if (total_run_time_)
1592  resp->set_total_run_time(duration_usecs_to_string(total_run_time_));
1593 
1594  std::vector<SandeshTaskEntry> list;
// The entry looked up with instance -1 is reported first, if it exists.
1595  TaskEntry *task_entry = QueryTaskEntry(-1);
1596  if (task_entry) {
1597  SandeshTaskEntry entry_resp;
1598  task_entry->GetSandeshData(&entry_resp);
1599  list.push_back(entry_resp);
1600  }
// task_entry_db_ may contain null slots; only populated entries are reported.
1601  for (TaskEntryList::const_iterator it = task_entry_db_.begin();
1602  it != task_entry_db_.end(); ++it) {
1603  task_entry = *it;
1604  if (task_entry) {
1605  SandeshTaskEntry entry_resp;
1606  task_entry->GetSandeshData(&entry_resp);
1607  list.push_back(entry_resp);
1608  }
1609  }
1610  resp->set_task_entry_list(list);
1611 
// A summary request stops here and omits the policy list.
1612  if (summary)
1613  return;
1614 
// Each policy entry carries the name and the running count of one group in policy_.
1616  std::vector<SandeshTaskPolicyEntry> policy_list;
1617  for (TaskGroupPolicyList::const_iterator it = policy_.begin();
1618  it != policy_.end(); ++it) {
1619  SandeshTaskPolicyEntry policy_entry;
1620  policy_entry.set_task_name(scheduler->GetTaskName((*it)->task_code_id_));
1621  policy_entry.set_tasks_running((*it)->run_count_);
1622  policy_list.push_back(policy_entry);
1623  }
1624  resp->set_task_policy_list(policy_list);
1625 }
1626 
1627 void TaskScheduler::GetSandeshData(SandeshTaskScheduler *resp, bool summary) {
1628  std::scoped_lock lock(mutex_);
1629 
1630  resp->set_running(running_);
1631  resp->set_use_spawn(use_spawn_);
1632  resp->set_total_count(seqno_);
1633  resp->set_thread_count(hw_thread_count_);
1634 
1635  std::vector<SandeshTaskGroup> list;
1636  for (TaskIdMap::const_iterator it = id_map_.begin(); it != id_map_.end();
1637  it++) {
1638  SandeshTaskGroup resp_group;
1639  TaskGroup *group = QueryTaskGroup(it->second);
1640  resp_group.set_task_id(it->second);
1641  resp_group.set_name(it->first);
1642  if (group)
1643  group->GetSandeshData(&resp_group, summary);
1644  list.push_back(resp_group);
1645  }
1646  resp->set_task_group_list(list);
1647 }
static void GetTaskStats(TaskProfileStats *stats, int index, ProfileData *data)
A class maintaining information for every <task, instance>
Definition: task.cc:95
boost::optional< uint64_t > GetTaskDeferEntrySeqno() const
Addition/deletion of TaskEntry in the deferq_ is based on the seqno. seqno of the first Task in the w...
Definition: task.cc:1522
bool DeleteFromWaitQ(Task *t)
Definition: task.cc:1339
TaskEntryList policyq_
Policy rules for a task.
Definition: task.cc:192
void SetDisable(bool disable)
Disables this task entry.
Definition: task.cc:155
bool IsDisabled()
Definition: task.cc:156
void RunDeferQForGroupEnable()
Starts executing tasks from deferq_ of TaskEntries which are enabled.
Definition: task.cc:1437
int task_code_id() const
Returns the code ID of this task entry.
Definition: task.cc:146
int task_data_id_
Definition: task.cc:180
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskEntry.
Definition: task.cc:1423
void AddPolicy(TaskEntry *entry)
Definition: task.cc:1297
TaskWaitQ waitq_
Tasks waiting to run on some condition.
Definition: task.cc:189
TaskEntry * ActiveEntryInPolicy()
Definition: task.cc:1301
TaskDeferList * deferq_
Tasks deferred for this to exit.
Definition: task.cc:195
int task_data_id() const
Returns the data ID of this task entry.
Definition: task.cc:149
TaskStats * GetTaskStats()
Definition: task.cc:1518
DISALLOW_COPY_AND_ASSIGN(TaskEntry)
void AddToWaitQ(Task *t)
Definition: task.cc:1329
boost::intrusive::list< Task, WaitQHook > TaskWaitQ
Definition: task.cc:166
void GetSandeshData(SandeshTaskEntry *resp) const
Definition: task.cc:1581
void ClearQueues()
Definition: task.cc:1508
Task * run_task_
Task currently running.
Definition: task.cc:186
void TaskExited(Task *t, TaskGroup *group)
Definition: task.cc:1487
TaskGroup * deferq_task_group_
Definition: task.cc:197
int run_count_
No. of tasks running.
Definition: task.cc:183
bool DeferOnPolicyFail(Task *t)
Definition: task.cc:1312
TaskStats stats_
Cumulative maintenance stats.
Definition: task.cc:201
void RunDeferEntry()
Definition: task.cc:1401
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
Definition: task.cc:171
void ClearTaskStats()
Definition: task.cc:1514
bool disable_
Definition: task.cc:198
TaskEntry(int task_id)
Definition: task.cc:1282
boost::intrusive::member_hook< Task, boost::intrusive::list_member_hook<>, &Task::waitq_hook_ > WaitQHook
List of Task's in waitq_.
Definition: task.cc:165
void DeleteFromDeferQ(TaskEntry &entry)
Deletes a task from deferq_.
Definition: task.cc:1352
void RunWaitQ()
Definition: task.cc:1374
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
Definition: task.cc:177
TaskEntry * deferq_task_entry_
Definition: task.cc:196
void RunCombinedDeferQ()
Starts executing tasks from deferq_ of TaskEntry and TaskGroup in the temporal order.
Definition: task.cc:1453
void AddToDeferQ(TaskEntry *entry)
Adds a task to deferq_. Only one task of a given instance goes into deferq_ for its policies.
Definition: task.cc:1345
void RunTask(Task *t)
Starts a task. If there are more entries in waitq_ add them to deferq_.
Definition: task.cc:1358
~TaskEntry()
Definition: task.cc:1290
size_t WaitQSize() const
Definition: task.cc:102
boost::intrusive::set_member_hook task_defer_node
Definition: task.cc:168
int task_code_id_
Definition: task.cc:179
int GetRunCount() const
Returns the count of runs for this task entry.
Definition: task.cc:152
TaskGroup maintains per <task-id> information including,.
Definition: task.cc:229
int task_code_id_
Definition: task.cc:317
uint32_t execute_delay_
Definition: task.cc:340
DISALLOW_COPY_AND_ASSIGN(TaskGroup)
void SetDisable(bool disable)
Definition: task.cc:283
void IncrementTotalRunTime(int64_t rtime)
Definition: task.cc:276
void ClearTaskStats()
Definition: task.cc:1241
bool disable_
Definition: task.cc:342
friend class TaskEntry
Definition: task.cc:301
void ClearTaskGroupStats()
Definition: task.cc:1237
int task_id() const
Definition: task.cc:287
bool DeferOnPolicyFail(TaskEntry *entry, Task *t)
Definition: task.cc:1132
void RunDisableEntries()
Run tasks that maybe suspended. Schedule tasks only for TaskEntries which are enabled.
Definition: task.cc:1189
void AddPolicy(TaskGroup *group)
Definition: task.cc:1118
TaskEntry * disable_entry_
Task entry for disabled group.
Definition: task.cc:336
size_t num_tasks() const
Definition: task.cc:289
int TaskRunCount() const
Definition: task.cc:265
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskGroup.
Definition: task.cc:1170
void TaskExited(Task *t)
Definition: task.cc:1184
void AddToDisableQ(TaskEntry *entry)
Enqueue TaskEntry in disable_entry's deferQ.
Definition: task.cc:1161
void DeleteFromDeferQ(TaskEntry &entry)
Delete task from deferq_.
Definition: task.cc:1155
static const int kVectorGrowSize
Definition: task.cc:316
uint32_t schedule_delay_
Definition: task.cc:341
bool IsWaitQEmpty()
Returns true if the waitq_ of every task in the group is empty.
Definition: task.cc:1211
void AddEntriesToDisableQ()
Add TaskEntries to disable_entry_ which have tasks enqueued and are already disabled.
Definition: task.cc:1193
bool IsDisabled()
Definition: task.cc:284
void GetSandeshData(SandeshTaskGroup *resp, bool summary) const
Definition: task.cc:1590
void AddToDeferQ(TaskEntry *entry)
Add task to deferq_ Only one task of a given instance goes into deferq_ for its policies.
Definition: task.cc:1148
std::vector< TaskGroup * > TaskGroupPolicyList
Vector of Task Group policies.
Definition: task.cc:305
TaskEntry * GetDisableEntry()
Definition: task.cc:249
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
Definition: task.cc:308
~TaskGroup()
Definition: task.cc:1070
TaskStats * GetTaskStats()
Definition: task.cc:1255
TaskDeferList deferq_
Tasks deferred till run_count_ is 0.
Definition: task.cc:330
bool policy_set_
Specifies if policy is already set.
Definition: task.cc:320
TaskEntry * GetTaskEntry(int task_instance)
Definition: task.cc:1089
TaskEntryList task_entry_db_
task-entries in this group
Definition: task.cc:339
std::atomic< uint64_t > total_run_time_
Definition: task.cc:324
TaskEntry * task_entry_
Tasks deferred till run_count_ is 0.
Definition: task.cc:333
TaskStats stats_
Definition: task.cc:344
TaskGroup(int task_id)
Definition: task.cc:1061
int run_count_
No. of tasks running in the group.
Definition: task.cc:323
TaskEntry * QueryTaskEntry(int task_instance) const
Definition: task.cc:1107
size_t deferq_size() const
Definition: task.cc:288
void TaskStarted()
Definition: task.cc:275
TaskGroupPolicyList policy_
Policy rules for the group.
Definition: task.cc:327
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
Definition: task.cc:314
void PolicySet()
Definition: task.cc:1165
TaskGroup * ActiveGroupInPolicy()
Definition: task.cc:1122
TaskStats * GetTaskGroupStats()
Definition: task.cc:1251
A private class used to implement tbb::task An object is created when task is ready for execution and...
Definition: task.cc:47
DISALLOW_COPY_AND_ASSIGN(TaskImpl)
virtual ~TaskImpl()
Destructor is called when a task execution is completed. Invoked implicitly by tbb::task....
Definition: task.cc:407
TaskImpl(Task *t)
Definition: task.cc:49
tbb::task * execute()
Method called from tbb::task to execute. Invoke Run() method of client. Supports task continuation wh...
Definition: task.cc:352
Task * parent_
Definition: task.cc:63
void Terminate()
Definition: task_monitor.cc:61
void Start(EventManager *evm)
Definition: task_monitor.cc:51
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void EnqueueUnLocked(Task *task)
Definition: task.cc:647
uint64_t enqueue_count_
Definition: task.h:513
uint64_t done_count_
Definition: task.h:514
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
Definition: task.cc:581
void Stop()
Stops scheduling of all tasks.
Definition: task.cc:797
bool measure_delay_
Definition: task.h:505
TaskTbbKeepAwake * tbb_awake_task_
Definition: task.h:522
TaskEntry * stop_entry_
Definition: task.h:489
uint64_t cancel_count_
Definition: task.h:515
int hw_thread_count_
Definition: task.h:502
static boost::scoped_ptr< TaskScheduler > singleton_
Definition: task.h:462
TaskIdMap id_map_
Definition: task.h:498
void Terminate()
Definition: task.cc:983
void EnableTaskGroup(int task_id)
Definition: task.cc:1030
int GetTaskId(const std::string &name)
Definition: task.cc:861
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads.
Definition: task.cc:414
CancelReturnCode
Definition: task.h:331
@ CANCELLED
Definition: task.h:332
std::mutex mutex_
Definition: task.h:492
tbb::task_scheduler_init task_scheduler_
Definition: task.h:491
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
Definition: task.cc:530
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id. Assumes valid entry is present for task_id.
Definition: task.cc:577
LogFn log_fn_
Definition: task.h:501
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
Definition: task.cc:1016
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
Definition: task.h:308
TaskMonitor * task_monitor_
Definition: task.h:523
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
Definition: task.cc:1627
int CountThreadsPerPid(pid_t pid)
Platform-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
Definition: task.cc:928
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
Definition: task.cc:451
void WaitForTerminateCompletion()
Definition: task.cc:957
void DisableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1037
void DisableTaskGroup(int task_id)
Definition: task.cc:1020
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
Definition: task.h:511
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Definition: task.cc:512
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static int GetDefaultThreadCount()
Definition: task.cc:436
TaskStats * GetTaskGroupStats(int task_id)
Definition: task.cc:904
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
Definition: task.cc:518
TaskGroupDb task_group_db_
Definition: task.h:495
EventManager * evm_
Definition: task.h:516
void ClearRunningTask()
Definition: task.cc:1011
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
Definition: task.cc:462
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
Definition: task.cc:485
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:617
std::string GetTaskName(int task_id) const
Definition: task.cc:851
void EnableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1042
int id_max_
Definition: task.h:499
uint64_t seqno_
Definition: task.h:494
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:590
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
Definition: task.cc:609
static TaskScheduler * GetInstance()
Definition: task.cc:554
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
Definition: task.h:520
void Start()
Starts scheduling of all tasks.
Definition: task.cc:803
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:561
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
Definition: task.cc:595
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
Definition: task.h:508
void ClearTaskStats(int task_id)
Definition: task.cc:888
TaskStats * GetTaskStats(int task_id)
Definition: task.cc:912
bool running_
Definition: task.h:493
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
Definition: task.cc:767
bool track_run_time_
Definition: task.h:504
bool track_run_time() const
Definition: task.h:423
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
Definition: task.h:488
void RegisterLog(LogFn fn)
Definition: task.cc:538
static bool ShouldUseSpawn()
Definition: task.cc:440
void set_event_manager(EventManager *evm)
Definition: task.cc:500
uint32_t schedule_delay() const
Definition: task.h:427
void Print()
Debug print routine.
Definition: task.cc:813
uint32_t execute_delay() const
Definition: task.h:428
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Definition: task.cc:704
static const int kVectorGrowSize
Definition: task.h:461
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true,...
Definition: task.cc:828
std::shared_mutex id_map_mutex_
Definition: task.h:497
bool use_spawn() const
Definition: task.h:450
void ClearTaskGroupStats(int task_id)
Definition: task.cc:880
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
Definition: task.cc:602
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
Definition: task.cc:1006
bool StartTbbKeepAwakeTask(TaskScheduler *ts, EventManager *event_mgr, const std::string task_name, uint32_t tbbKeepawakeTimeout=1000)
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
static Task * Running()
Returns a pointer to the current task the code is executing under.
Definition: task.cc:1567
uint64_t seqno() const
Returns the sequence number of this task.
Definition: task.h:139
bool task_recycle_
Determines if the task must be rescheduled (reused) after its completion.
Definition: task.h:218
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
Definition: task.cc:1547
void tbb_state(TbbState s)
Sets a TBB state for the task.
Definition: task.h:183
uint64_t seqno_
Stores the sequence number.
Definition: task.h:214
State state_
Stores a state of the task.
Definition: task.h:208
friend class TaskImpl
Gives access to private members for TaskImpl class.
Definition: task.h:177
int task_data_id_
The dataset id within a code path.
Definition: task.h:201
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
Definition: task.h:125
int task_code_id() const
Returns the code ID of this task.
Definition: task.h:133
tbb::task * task_impl_
A pointer to an Intel TBB object storing low-level information to manage the task.
Definition: task.h:205
int task_data_id() const
Returns the data ID of this task.
Definition: task.h:136
Task(int task_id, int task_data_id)
Creates a new task with the given values of task code ID and task data ID.
Definition: task.cc:1534
uint64_t schedule_time_
Contains the time when the task was started.
Definition: task.h:228
@ TBB_DONE
Definition: task.h:100
@ TBB_INIT
Definition: task.h:97
@ TBB_EXEC
Definition: task.h:99
@ TBB_ENQUEUED
Definition: task.h:98
bool task_cancel_
Determines if the task's execution was canceled.
Definition: task.h:221
uint32_t execute_delay_
Sets threshold for the task's execution time. If the threshold is exceeded, the event is logged.
Definition: task.h:232
State state() const
Returns a state value of a task.
Definition: task.h:130
uint32_t schedule_delay_
Sets threshold for delay between enqueueing and execution. If the threshold is exceeded,...
Definition: task.h:236
int task_code_id_
The code path executed by the task.
Definition: task.h:198
uint64_t enqueue_time_
Contains the time when the task was enqueued for execution.
Definition: task.h:225
@ WAIT
A task is waiting in a queue.
Definition: task.h:89
@ RUN
A task is being run.
Definition: task.h:92
@ INIT
A task was initialized.
Definition: task.h:86
boost::intrusive::list_member_hook waitq_hook_
Definition: task.h:239
static EventManager evm
#define LOG(_Level, _Msg)
Definition: logging.h:34
bool unit_test()
Definition: bgp_log.cc:53
Comparison routine for the TaskDeferList.
Definition: task.cc:207
uint64_t total_tasks_completed_
Number of total tasks ran.
Definition: task.h:289
uint64_t last_exit_time_
Timestamp of the latest exit.
Definition: task.h:292
int run_count_
Number of entries currently running.
Definition: task.h:280
int defer_count_
Number of entries in deferq.
Definition: task.h:283
int wait_count_
Number of entries in waitq.
Definition: task.h:277
uint64_t enqueue_count_
Number of tasks enqueued.
Definition: task.h:286
tbb::enumerable_thread_specific< Task * > TaskInfo
Definition: task.cc:28
#define TASK_TRACE(scheduler, task, msg, delay)
Definition: task.cc:39
std::vector< TaskEntry * > TaskEntryList
Definition: task.cc:35
ostream & operator<<(ostream &out, const Task &t)
Definition: task.cc:1572
static TaskInfo task_running
Definition: task.cc:32
std::vector< TaskExclusion > TaskPolicy
Defines a type to store an execution policy (a list of task exclusions).
Definition: task.h:270
#define CHECK_CONCURRENCY(...)
struct task_ task
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
static const std::string duration_usecs_to_string(const uint64_t usecs)
Definition: time_util.h:62
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13