OpenSDN source code
task.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <assert.h>
6 #include <fstream>
7 #include <map>
8 #include <iostream>
9 #include <boost/intrusive/set.hpp>
10 #include <boost/optional.hpp>
11 
12 #include "tbb/atomic.h"
13 #include "tbb/task.h"
14 #include "tbb/enumerable_thread_specific.h"
15 #include "base/logging.h"
16 #include "base/task.h"
17 #include "base/task_annotations.h"
18 #include "base/task_tbbkeepawake.h"
19 #include "base/task_monitor.h"
20 
21 #include <base/sandesh/task_types.h>
22 
23 using namespace std;
24 using tbb::task;
25 
27 class TaskEntry;
28 struct TaskDeferEntryCmp;
29 
30 typedef tbb::enumerable_thread_specific<Task *> TaskInfo;
31 
33 
34 // Vector of Task entries
35 typedef std::vector<TaskEntry *> TaskEntryList;
36 
37 boost::scoped_ptr<TaskScheduler> TaskScheduler::singleton_;
38 
39 #define TASK_TRACE(scheduler, task, msg, delay)\
40  do {\
41  scheduler->Log(__FILE__, __LINE__, task, msg, delay);\
42  } while (false)
43 
47 class TaskImpl : public tbb::task {
48 public:
49  TaskImpl(Task *t) : parent_(t) {};
50 
54  virtual ~TaskImpl();
55 
56 private:
57 
61  tbb::task *execute();
62 
64 
66 };
67 
95 class TaskEntry {
96 public:
97  TaskEntry(int task_id);
98  TaskEntry(int task_id, int task_instance);
99  ~TaskEntry();
100 
101  void AddPolicy(TaskEntry *entry);
102  size_t WaitQSize() const { return waitq_.size(); };
103  void AddToWaitQ(Task *t);
104  bool DeleteFromWaitQ(Task *t);
105 
108  void AddToDeferQ(TaskEntry *entry);
109 
111  void DeleteFromDeferQ(TaskEntry &entry);
112 
113  TaskEntry *ActiveEntryInPolicy();
114  bool DeferOnPolicyFail(Task *t);
115 
118  void RunTask(Task *t);
119 
121  void RunDeferQ();
122 
125  void RunCombinedDeferQ();
126  void RunWaitQ();
127  void RunDeferEntry();
128 
131  void RunDeferQForGroupEnable();
132 
133  void TaskExited(Task *t, TaskGroup *group);
135  void ClearTaskStats();
136  void ClearQueues();
137 
143  boost::optional<uint64_t> GetTaskDeferEntrySeqno() const;
144 
146  int task_code_id() const { return task_code_id_; }
147 
149  int task_data_id() const { return task_data_id_; }
150 
152  int GetRunCount() const { return run_count_; }
153 
155  void SetDisable(bool disable) { disable_ = disable; }
156  bool IsDisabled() { return disable_; }
157  void GetSandeshData(SandeshTaskEntry *resp) const;
158 
159 private:
160  friend class TaskGroup;
161  friend class TaskScheduler;
162 
164  typedef boost::intrusive::member_hook<Task,
165  boost::intrusive::list_member_hook<>, &Task::waitq_hook_> WaitQHook;
166  typedef boost::intrusive::list<Task, WaitQHook> TaskWaitQ;
167 
168  boost::intrusive::set_member_hook<> task_defer_node;
169  typedef boost::intrusive::member_hook<TaskEntry,
170  boost::intrusive::set_member_hook<>,
172 
176  typedef boost::intrusive::set<TaskEntry, TaskDeferListOption,
177  boost::intrusive::compare<TaskDeferEntryCmp> > TaskDeferList;
178 
181 
184 
187 
190 
193 
198  bool disable_;
199 
202 
204 };
205 
208  bool operator() (const TaskEntry &lhs, const TaskEntry &rhs) const {
209  return (lhs.GetTaskDeferEntrySeqno() <
210  rhs.GetTaskDeferEntrySeqno());
211  }
212 };
213 
229 class TaskGroup {
230 public:
231  TaskGroup(int task_id);
232  ~TaskGroup();
233 
234  TaskEntry *QueryTaskEntry(int task_instance) const;
235  TaskEntry *GetTaskEntry(int task_instance);
236  void AddPolicy(TaskGroup *group);
237 
240  void AddToDeferQ(TaskEntry *entry);
241 
243  void AddToDisableQ(TaskEntry *entry);
244 
247  void AddEntriesToDisableQ();
248 
249  TaskEntry *GetDisableEntry() { return disable_entry_; }
250 
252  void DeleteFromDeferQ(TaskEntry &entry);
253  TaskGroup *ActiveGroupInPolicy();
254  bool DeferOnPolicyFail(TaskEntry *entry, Task *t);
255 
263  bool IsWaitQEmpty();
264 
265  int TaskRunCount() const {return run_count_;};
266 
268  void RunDeferQ();
269 
272  void RunDisableEntries();
273  void TaskExited(Task *t);
274  void PolicySet();
275  void TaskStarted() {run_count_++;};
276  void IncrementTotalRunTime(int64_t rtime) { total_run_time_ += rtime; }
277  TaskStats *GetTaskGroupStats();
279  TaskStats *GetTaskStats(int task_instance);
280  void ClearTaskGroupStats();
281  void ClearTaskStats();
282  void ClearTaskStats(int instance_id);
283  void SetDisable(bool disable) { disable_ = disable; }
284  bool IsDisabled() { return disable_; }
285  void GetSandeshData(SandeshTaskGroup *resp, bool summary) const;
286 
287  int task_id() const { return task_code_id_; }
288  size_t deferq_size() const { return deferq_.size(); }
289  size_t num_tasks() const {
290  size_t count = 0;
291  for (TaskEntryList::const_iterator it = task_entry_db_.begin();
292  it != task_entry_db_.end(); ++it) {
293  if (*it != NULL) {
294  count++;
295  }
296  }
297  return count;
298  }
299 
300 private:
301  friend class TaskEntry;
302  friend class TaskScheduler;
303 
305  typedef std::vector<TaskGroup *> TaskGroupPolicyList;
306  typedef boost::intrusive::member_hook<TaskEntry,
307  boost::intrusive::set_member_hook<>,
309 
313  typedef boost::intrusive::set<TaskEntry, TaskDeferListOption,
314  boost::intrusive::compare<TaskDeferEntryCmp> > TaskDeferList;
315 
316  static const int kVectorGrowSize = 16;
318 
321 
324  tbb::atomic<uint64_t> total_run_time_;
325 
328 
331 
334 
337 
340  uint32_t execute_delay_;
341  uint32_t schedule_delay_;
342  bool disable_;
343 
346 };
347 
349 // Implementation for class TaskImpl
351 
353  TaskInfo::reference running = task_running.local();
354  running = parent_;
355  parent_->tbb_state(Task::TBB_EXEC);
356  try {
357  uint64_t t = 0;
358  if (parent_->enqueue_time() != 0) {
359  t = ClockMonotonicUsec();
361  if ((t - parent_->enqueue_time()) >
362  scheduler->schedule_delay(parent_)) {
363  TASK_TRACE(scheduler, parent_, "TBB schedule time(in usec) ",
364  (t - parent_->enqueue_time()));
365  }
366  } else if (TaskScheduler::GetInstance()->track_run_time()) {
367  t = ClockMonotonicUsec();
368  }
369  bool is_complete = parent_->Run();
370  if (t != 0) {
371  int64_t delay = ClockMonotonicUsec() - t;
373  uint32_t execute_delay = scheduler->execute_delay(parent_);
374  if (execute_delay && delay > execute_delay) {
375  TASK_TRACE(scheduler, parent_, "Run time(in usec) ", delay);
376  }
377  if (scheduler->track_run_time()) {
378  TaskGroup *group =
379  scheduler->QueryTaskGroup(parent_->task_code_id());
380  group->IncrementTotalRunTime(delay);
381  }
382  }
383  running = NULL;
384  if (is_complete == true) {
385  parent_->set_task_complete();
386  } else {
387  parent_->set_task_recycle();
388  }
389  } catch (std::exception &e) {
390 
391  // Store exception information statically, to easily read exception
392  // information from the core.
393  static std::string what = e.what();
394 
395  LOG(ERROR, "!!!! ERROR !!!! Task caught fatal exception: " << what
396  << " TaskImpl: " << this);
397  assert(0);
398  } catch (...) {
399  LOG(ERROR, "!!!! ERROR !!!! Task caught fatal unknown exception"
400  << " TaskImpl: " << this);
401  assert(0);
402  }
403 
404  return NULL;
405 }
406 
408  assert(parent_ != NULL);
409 
411  sched->OnTaskExit(parent_);
412 }
413 
414 int TaskScheduler::GetThreadCount(int thread_count) {
415  static bool init_;
416  static int num_cores_;
417 
418  if (init_) {
419  return num_cores_ * ThreadAmpFactor_;
420  }
421 
422  char *num_cores_str = getenv("TBB_THREAD_COUNT");
423  if (!num_cores_str) {
424  if (thread_count == 0)
425  num_cores_ = tbb::task_scheduler_init::default_num_threads();
426  else
427  num_cores_ = thread_count;
428  } else {
429  num_cores_ = strtol(num_cores_str, NULL, 0);
430  }
431 
432  init_ = true;
433  return num_cores_ * ThreadAmpFactor_;
434 }
435 
437  return tbb::task_scheduler_init::default_num_threads();
438 }
439 
441  if (getenv("TBB_USE_SPAWN"))
442  return true;
443 
444  return false;
445 }
446 
448 // Implementation for class TaskScheduler
450 
453  running_(true), seqno_(0), id_max_(0), log_fn_(), track_run_time_(false),
455  enqueue_count_(0), done_count_(0), cancel_count_(0), evm_(NULL),
456  tbb_awake_task_(NULL), task_monitor_(NULL) {
457  hw_thread_count_ = GetThreadCount(task_count);
459  stop_entry_ = new TaskEntry(-1);
460 }
461 
463  TaskGroup *group;
464 
465  for (TaskGroupDb::iterator iter = task_group_db_.begin();
466  iter != task_group_db_.end(); ++iter) {
467  if ((group = *iter) == NULL) {
468  continue;
469  }
470  *iter = NULL;
471  delete group;
472  }
473 
474  for (TaskIdMap::iterator loc = id_map_.begin(); loc != id_map_.end();
475  id_map_.erase(loc++)) {
476  }
477 
478  delete stop_entry_;
479  stop_entry_ = NULL;
480  task_group_db_.clear();
481 
482  return;
483 }
484 
485 void TaskScheduler::Initialize(uint32_t thread_count, EventManager *evm) {
486  assert(singleton_.get() == NULL);
487  singleton_.reset(new TaskScheduler((int)thread_count));
488 
489  if (evm) {
490  singleton_.get()->evm_ = evm;
491  singleton_.get()->tbb_awake_task_ = new TaskTbbKeepAwake();
492  assert(singleton_.get()->tbb_awake_task_);
493 
494  singleton_.get()->tbb_awake_task_->StartTbbKeepAwakeTask(
495  singleton_.get(), evm,
496  "TaskScheduler::TbbKeepAwake");
497  }
498 }
499 
501  assert(evm);
502  evm_ = evm;
503  if (tbb_awake_task_ == NULL) {
505  assert(tbb_awake_task_);
506 
508  "TaskScheduler::TbbKeepAwake");
509  }
510 }
511 
513  if (tbb_awake_task_) {
515  }
516 }
517 
519  uint64_t tbb_keepawake_time_msec,
520  uint64_t inactivity_time_msec,
521  uint64_t poll_interval_msec) {
522  if (task_monitor_ != NULL)
523  return;
524 
525  task_monitor_ = new TaskMonitor(this, tbb_keepawake_time_msec,
526  inactivity_time_msec, poll_interval_msec);
528 }
529 
530 void TaskScheduler::Log(const char *file_name, uint32_t line_no,
531  const Task *task, const char *description,
532  uint64_t delay) {
533  if (log_fn_.empty() == false) {
534  log_fn_(file_name, line_no, task, description, delay);
535  }
536 }
537 
539  log_fn_ = fn;
540 }
541 
543  if (task->schedule_delay() > schedule_delay_)
544  return task->schedule_delay();
545  return schedule_delay_;
546 }
547 
549  if (task->execute_delay() > execute_delay_)
550  return task->execute_delay();
551  return execute_delay_;
552 }
553 
555  if (singleton_.get() == NULL) {
556  singleton_.reset(new TaskScheduler());
557  }
558  return singleton_.get();
559 }
560 
562  assert(task_id >= 0);
563  int size = task_group_db_.size();
564  if (size <= task_id) {
566  }
567 
568  TaskGroup *group = task_group_db_[task_id];
569  if (group == NULL) {
570  group = new TaskGroup(task_id);
571  task_group_db_[task_id] = group;
572  }
573 
574  return group;
575 }
576 
578  return task_group_db_[task_id];
579 }
580 
581 bool TaskScheduler::IsTaskGroupEmpty(int task_id) const {
582  CHECK_CONCURRENCY("bgp::Config");
583  tbb::mutex::scoped_lock lock(mutex_);
584  TaskGroup *group = task_group_db_[task_id];
585  assert(group);
586  assert(group->TaskRunCount() == 0);
587  return group->IsWaitQEmpty();
588 }
589 
590 TaskEntry *TaskScheduler::GetTaskEntry(int task_id, int task_instance) {
591  TaskGroup *group = GetTaskGroup(task_id);
592  return group->GetTaskEntry(task_instance);
593 }
594 
595 TaskEntry *TaskScheduler::QueryTaskEntry(int task_id, int task_instance) {
596  TaskGroup *group = QueryTaskGroup(task_id);
597  if (group == NULL)
598  return NULL;
599  return group->QueryTaskEntry(task_instance);
600 }
601 
603  uint32_t schedule) {
604  execute_delay_ = execute;
605  schedule_delay_ = schedule;
607 }
608 
609 void TaskScheduler::SetLatencyThreshold(const std::string &name,
610  uint32_t execute, uint32_t schedule) {
611  int task_id = GetTaskId(name);
612  TaskGroup *group = GetTaskGroup(task_id);
613  group->execute_delay_ = execute;
614  group->schedule_delay_ = schedule;
615 }
616 
617 void TaskScheduler::SetPolicy(int task_id, TaskPolicy &policy) {
618  tbb::mutex::scoped_lock lock(mutex_);
619 
620  TaskGroup *group = GetTaskGroup(task_id);
621  TaskEntry *group_entry = group->GetTaskEntry(-1);
622  group->PolicySet();
623 
624  for (const auto& pol_item: policy) {
625  if (pol_item.match_data_id == -1) {
626  TaskGroup *policy_group = GetTaskGroup(pol_item.match_code_id);
627  group->AddPolicy(policy_group);
628  policy_group->AddPolicy(group);
629  } else {
630  TaskEntry *entry = GetTaskEntry(task_id, pol_item.match_data_id);
631  TaskEntry *policy_entry = GetTaskEntry(pol_item.match_code_id,
632  pol_item.match_data_id);
633  entry->AddPolicy(policy_entry);
634  policy_entry->AddPolicy(entry);
635 
636  group_entry->AddPolicy(policy_entry);
637  policy_entry->AddPolicy(group_entry);
638  }
639  }
640 }
641 
643  tbb::mutex::scoped_lock lock(mutex_);
644 
645  EnqueueUnLocked(t);
646 }
647 
649  if (measure_delay_) {
651  }
652  // Ensure that task is enqueued only once.
653  assert(t->seqno() == 0);
654  enqueue_count_++;
655  t->seqno(++seqno_);
656  TaskGroup *group = GetTaskGroup(t->task_code_id());
657  t->schedule_delay_ = group->schedule_delay_;
658  t->execute_delay_ = group->execute_delay_;
659  group->stats_.enqueue_count_++;
660 
661  TaskEntry *entry = GetTaskEntry(t->task_code_id(), t->task_data_id());
662  entry->stats_.enqueue_count_++;
663  // If either TaskGroup or TaskEntry is disabled for Unit-Test purposes,
664  // enqueue new task in waitq and update TaskGroup if needed.
665  if (group->IsDisabled() || entry->IsDisabled()) {
666  entry->AddToWaitQ(t);
667  if (group->IsDisabled()) {
668  group->AddToDisableQ(entry);
669  }
670  return;
671  }
672 
673  // Add task to waitq_ if it is already populated
674  if (entry->WaitQSize() != 0) {
675  entry->AddToWaitQ(t);
676  return;
677  }
678 
679  // Is scheduler stopped? Dont add task to deferq_ if scheduler is stopped.
680  // TaskScheduler::Start() will run tasks from waitq_
681  if (running_ == false) {
682  entry->AddToWaitQ(t);
683  stop_entry_->AddToDeferQ(entry);
684  return;
685  }
686 
687  // Check Task Group policy. On policy violation, DeferOnPolicyFail()
688  // adds the Task to the TaskEntry's waitq_ and the TaskEntry will be
689  // added to deferq_ of the matching TaskGroup.
690  if (group->DeferOnPolicyFail(entry, t)) {
691  return;
692  }
693 
694  // Check Task Entry policy. On policy violation, DeferOnPolicyFail()
695  // adds the Task to the TaskEntry's waitq_ and the TaskEntry will be
696  // added to deferq_ of the matching TaskEntry.
697  if (entry->DeferOnPolicyFail(t)) {
698  return;
699  }
700 
701  entry->RunTask(t);
702  return;
703 }
704 
706  tbb::mutex::scoped_lock lock(mutex_);
707 
708  // If the task is in RUN state, mark the task for cancellation and return.
709  if (t->state_ == Task::RUN) {
710  t->task_cancel_ = true;
711  } else if (t->state_ == Task::WAIT) {
712  TaskEntry *entry = QueryTaskEntry(t->task_code_id(), t->task_data_id());
713  TaskGroup *group = QueryTaskGroup(t->task_code_id());
714  assert(entry->WaitQSize());
715  // Get the first entry in the waitq_
716  Task *first_wait_task = &(*entry->waitq_.begin());
717  TaskEntry *disable_entry = group->GetDisableEntry();
718  assert(entry->DeleteFromWaitQ(t) == true);
719  // If the waitq_ is empty, then remove the TaskEntry from the deferq.
720  if (!entry->WaitQSize()) {
721  if (entry->deferq_task_group_) {
722  assert(entry->deferq_task_entry_ == NULL);
723  entry->deferq_task_group_->DeleteFromDeferQ(*entry);
724  } else if (entry->deferq_task_entry_) {
725  entry->deferq_task_entry_->DeleteFromDeferQ(*entry);
726  } else if (group->IsDisabled()) {
727  // Remove TaskEntry from deferq of disable_entry
728  disable_entry->DeleteFromDeferQ(*entry);
729  } else {
730  if (!entry->IsDisabled()) {
731  assert(0);
732  }
733  }
734  } else if (t == first_wait_task) {
735  // TaskEntry is inserted in the deferq_ based on the Task seqno.
736  // deferq_ comparison function uses the seqno of the first entry in
737  // the waitq_. Therefore, if the task to be cancelled is the first
738  // entry in the waitq_, then delete the entry from the deferq_ and
739  // add it again.
740  TaskGroup *deferq_tgroup = entry->deferq_task_group_;
741  TaskEntry *deferq_tentry = entry->deferq_task_entry_;
742  if (deferq_tgroup) {
743  assert(deferq_tentry == NULL);
744  deferq_tgroup->DeleteFromDeferQ(*entry);
745  deferq_tgroup->AddToDeferQ(entry);
746  } else if (deferq_tentry) {
747  deferq_tentry->DeleteFromDeferQ(*entry);
748  deferq_tentry->AddToDeferQ(entry);
749  } else if (group->IsDisabled()) {
750  // Remove TaskEntry from deferq of disable_entry and add back
751  disable_entry->DeleteFromDeferQ(*entry);
752  disable_entry->AddToDeferQ(entry);
753  } else {
754  if (!entry->IsDisabled()) {
755  assert(0);
756  }
757  }
758  }
759  delete t;
760  cancel_count_++;
761  return CANCELLED;
762  } else {
763  return FAILED;
764  }
765  return QUEUED;
766 }
767 
769  tbb::mutex::scoped_lock lock(mutex_);
770  done_count_++;
771 
773  TaskEntry *entry = QueryTaskEntry(t->task_code_id(), t->task_data_id());
774  entry->TaskExited(t, GetTaskGroup(t->task_code_id()));
775 
776  //
777  // Delete the task if it is not marked for recycling or if it has been cancelled.
778  //
779  if ((t->task_recycle_ == false) || (t->task_cancel_ == true)) {
780  // Delete the container Task object, if the
781  // task is not marked to be recycled (or)
782  // if the task is marked for cancellation
783  if (t->task_cancel_ == true) {
784  t->OnTaskCancel();
785  }
786  delete t;
787  return;
788  }
789 
790  // Task is being recycled, reset the state, seq_no and TBB task handle
791  t->task_impl_ = NULL;
792  t->seqno(0);
793  t->state(Task::INIT);
795  EnqueueUnLocked(t);
796 }
797 
799  tbb::mutex::scoped_lock lock(mutex_);
800 
801  running_ = false;
802 }
803 
805  tbb::mutex::scoped_lock lock(mutex_);
806 
807  running_ = true;
808 
809  // Run all tasks that may be suspended
811  return;
812 }
813 
815  for (TaskGroupDb::iterator iter = task_group_db_.begin();
816  iter != task_group_db_.end(); ++iter) {
817  TaskGroup *group = *iter;
818  if (group == NULL) {
819  continue;
820  }
821 
822  cout << "id: " << group->task_id() <<
823  " run: " << group->TaskRunCount() << endl;
824  cout << "deferq: " << group->deferq_size() <<
825  " task count: " << group->num_tasks() << endl;
826  }
827 }
828 
829 bool TaskScheduler::IsEmpty(bool running_only) {
830  TaskGroup *group;
831 
832  tbb::mutex::scoped_lock lock(mutex_);
833 
834  for (TaskGroupDb::iterator it = task_group_db_.begin();
835  it != task_group_db_.end(); ++it) {
836  if ((group = *it) == NULL) {
837  continue;
838  }
839  if (group->TaskRunCount()) {
840  return false;
841  }
842  if (group->IsDisabled()) {
843  continue;
844  }
845  if ((false == running_only) && (false == group->IsWaitQEmpty())) {
846  return false;
847  }
848  }
849 
850  return true;
851 }
852 std::string TaskScheduler::GetTaskName(int task_id) const {
853  for (TaskIdMap::const_iterator it = id_map_.begin(); it != id_map_.end();
854  it++) {
855  if (task_id == it->second)
856  return it->first;
857  }
858 
859  return "ERROR";
860 }
861 
862 int TaskScheduler::GetTaskId(const string &name) {
863  {
864  // Grab read-only lock first. Most of the time, task-id already exists
865  // in the id_map_. Hence there should not be any contention for lock
866  // aquisition.
867  tbb::reader_writer_lock::scoped_lock_read lock(id_map_mutex_);
868  TaskIdMap::iterator loc = id_map_.find(name);
869  if (loc != id_map_.end()) {
870  return loc->second;
871  }
872  }
873 
874  // Grab read-write lock to allocate a new task id and insert into the map.
875  tbb::reader_writer_lock::scoped_lock lock(id_map_mutex_);
876  int tid = ++id_max_;
877  id_map_.insert(make_pair(name, tid));
878  return tid;
879 }
880 
882  TaskGroup *group = GetTaskGroup(task_id);
883  if (group == NULL)
884  return;
885 
886  group->ClearTaskGroupStats();
887 }
888 
889 void TaskScheduler::ClearTaskStats(int task_id) {
890  TaskGroup *group = GetTaskGroup(task_id);
891  if (group == NULL)
892  return;
893 
894  group->ClearTaskStats();
895 }
896 
897 void TaskScheduler::ClearTaskStats(int task_id, int instance_id) {
898  TaskGroup *group = GetTaskGroup(task_id);
899  if (group == NULL)
900  return;
901 
902  group->ClearTaskStats(instance_id);
903 }
904 
906  TaskGroup *group = GetTaskGroup(task_id);
907  if (group == NULL)
908  return NULL;
909 
910  return group->GetTaskGroupStats();
911 }
912 
914  TaskGroup *group = GetTaskGroup(task_id);
915  if (group == NULL)
916  return NULL;
917 
918  return group->GetTaskStats();
919 }
920 
921 TaskStats *TaskScheduler::GetTaskStats(int task_id, int instance_id) {
922  TaskGroup *group = GetTaskGroup(task_id);
923  if (group == NULL)
924  return NULL;
925 
926  return group->GetTaskStats(instance_id);
927 }
928 
930  int threads;
931  threads = 0;
932 
933 #if defined(__linux__)
934  std::ostringstream file_name;
935  std::string line;
936 
937  file_name << "/proc/" << pid << "/status";
938 
939  std::ifstream file(file_name.str().c_str());
940 
941  if(!file) {
942  LOG(ERROR, "opening /proc failed");
943  return -1;
944  }
945 
946  while (threads == 0 && file.good()) {
947  getline(file, line);
948  if (line == "Threads:\t1") threads = 1;
949  }
950  file.close();
951 #else
952 #error "TaskScheduler::CountThreadsPerPid() - unsupported platform."
953 #endif
954 
955  return threads;
956 }
957 
959  //
960  // Wait for a bit to give a chance for all the threads to exit
961  //
962  usleep(1000);
963 
964  int count = 0;
965  int threadsRunning;
966  pid_t pid = getpid();
967 
968  while (count++ < 12000) {
969  threadsRunning = CountThreadsPerPid(pid);
970 
971  if (threadsRunning == 1)
972  break;
973 
974  if (threadsRunning == -1) {
975  LOG(ERROR, "could not check if any thread is running");
976  usleep(10000);
977  break;
978  }
979 
980  usleep(10000);
981  }
982 }
983 
985  if (task_monitor_) {
987  delete task_monitor_;
988  task_monitor_ = NULL;
989  }
990 
991  for (int i = 0; i < 10000; i++) {
992  if (IsEmpty()) break;
993  usleep(1000);
994  }
995  assert(IsEmpty());
996  if (tbb_awake_task_) {
998  delete tbb_awake_task_;
999  tbb_awake_task_ = NULL;
1000  }
1001  evm_ = NULL;
1002  singleton_->task_scheduler_.terminate();
1004  singleton_.reset(NULL);
1005 }
1006 
1008  TaskInfo::reference running = task_running.local();
1009  running = unit_test;
1010 }
1011 
1013  TaskInfo::reference running = task_running.local();
1014  running = NULL;
1015 }
1016 
1018  ThreadAmpFactor_ = n;
1019 }
1020 
1022  TaskGroup *group = GetTaskGroup(task_id);
1023  if (!group->IsDisabled()) {
1024  // Add TaskEntries(that contain enqueued tasks) which are already
1025  // disabled to disable_ entry maintained at TaskGroup.
1026  group->SetDisable(true);
1027  group->AddEntriesToDisableQ();
1028  }
1029 }
1030 
1032  TaskGroup *group = GetTaskGroup(task_id);
1033  group->SetDisable(false);
1034  // Run tasks that maybe suspended
1035  group->RunDisableEntries();
1036 }
1037 
1038 void TaskScheduler::DisableTaskEntry(int task_id, int instance_id) {
1039  TaskEntry *entry = GetTaskEntry(task_id, instance_id);
1040  entry->SetDisable(true);
1041 }
1042 
1043 void TaskScheduler::EnableTaskEntry(int task_id, int instance_id) {
1044  TaskEntry *entry = GetTaskEntry(task_id, instance_id);
1045  entry->SetDisable(false);
1046  TaskGroup *group = GetTaskGroup(task_id);
1047  // If group is still disabled, do not schedule the task. Task will be
1048  // scheduled for run when TaskGroup is enabled.
1049  if (group->IsDisabled()) {
1050  return;
1051  }
1052  // Run task instances that maybe suspended
1053  if (entry->WaitQSize() != 0) {
1054  entry->RunDeferEntry();
1055  }
1056 }
1057 
1059 // Implementation for class TaskGroup
1061 
1062 TaskGroup::TaskGroup(int task_id) : task_code_id_(task_id), policy_set_(false),
1063  run_count_(0), execute_delay_(0), schedule_delay_(0), disable_(false) {
1064  total_run_time_ = 0;
1066  task_entry_ = new TaskEntry(task_id);
1067  memset(&stats_, 0, sizeof(stats_));
1069 }
1070 
1072  policy_.clear();
1073  deferq_.clear();
1074 
1075  delete task_entry_;
1076  task_entry_ = NULL;
1077 
1078  for (size_t i = 0; i < task_entry_db_.size(); i++) {
1079  if (task_entry_db_[i] != NULL) {
1080  delete task_entry_db_[i];
1081  task_entry_db_[i] = NULL;
1082  }
1083  }
1084 
1085  delete disable_entry_;
1086  disable_entry_ = NULL;
1087  task_entry_db_.clear();
1088 }
1089 
1090 TaskEntry *TaskGroup::GetTaskEntry(int task_instance) {
1091  if (task_instance == -1)
1092  return task_entry_;
1093 
1094  int size = task_entry_db_.size();
1095  if (size <= task_instance) {
1096  task_entry_db_.resize(task_instance + TaskGroup::kVectorGrowSize);
1097  }
1098 
1099  TaskEntry *entry = task_entry_db_.at(task_instance);
1100  if (entry == NULL) {
1101  entry = new TaskEntry(task_code_id_, task_instance);
1102  task_entry_db_[task_instance] = entry;
1103  }
1104 
1105  return entry;
1106 }
1107 
1108 TaskEntry *TaskGroup::QueryTaskEntry(int task_instance) const {
1109  if (task_instance == -1) {
1110  return task_entry_;
1111  }
1112 
1113  if (task_instance >= (int)task_entry_db_.size())
1114  return NULL;
1115 
1116  return task_entry_db_[task_instance];
1117 }
1118 
1120  policy_.push_back(group);
1121 }
1122 
1124  for (TaskGroupPolicyList::iterator it = policy_.begin();
1125  it != policy_.end(); ++it) {
1126  if ((*it)->run_count_ != 0) {
1127  return (*it);
1128  }
1129  }
1130  return NULL;
1131 }
1132 
1134  TaskGroup *group;
1135  if ((group = ActiveGroupInPolicy()) != NULL) {
1136  // TaskEntry is inserted in the deferq_ based on the Task seqno.
1137  // deferq_ comparison function uses the seqno of the first Task queued
1138  // in the waitq_. Therefore, add the Task to waitq_ before adding
1139  // TaskEntry in the deferq_.
1140  if (0 == entry->WaitQSize()) {
1141  entry->AddToWaitQ(task);
1142  }
1143  group->AddToDeferQ(entry);
1144  return true;
1145  }
1146  return false;
1147 }
1148 
1150  stats_.defer_count_++;
1151  deferq_.insert(*entry);
1152  assert(entry->deferq_task_group_ == NULL);
1153  entry->deferq_task_group_ = this;
1154 }
1155 
1157  assert(this == entry.deferq_task_group_);
1158  deferq_.erase(deferq_.iterator_to(entry));
1159  entry.deferq_task_group_ = NULL;
1160 }
1161 
1163  disable_entry_->AddToDeferQ(entry);
1164 }
1165 
1167  assert(policy_set_ == false);
1168  policy_set_ = true;
1169 }
1170 
1172  TaskDeferList::iterator it;
1173 
1174  it = deferq_.begin();
1175  while (it != deferq_.end()) {
1176  TaskEntry &entry = *it;
1177  TaskDeferList::iterator it_work = it++;
1178  DeleteFromDeferQ(*it_work);
1179  entry.RunDeferEntry();
1180  }
1181 
1182  return;
1183 }
1184 
1185 inline void TaskGroup::TaskExited(Task *t) {
1186  run_count_--;
1188 }
1189 
1192 }
1193 
1195  TaskEntry *entry;
1196  if (task_entry_->WaitQSize()) {
1198  }
1199 
1200  // Walk thru the task_entry_db_ and add if waitq is non-empty
1201  for (TaskEntryList::iterator it = task_entry_db_.begin();
1202  it != task_entry_db_.end(); ++it) {
1203  if ((entry = *it) == NULL) {
1204  continue;
1205  }
1206  if (entry->WaitQSize()) {
1207  AddToDisableQ(entry);
1208  }
1209  }
1210 }
1211 
1213  TaskEntry *entry;
1214 
1215  // Check the waitq_ of the instance -1
1216  if (task_entry_->WaitQSize()) {
1217  return false;
1218  }
1219 
1220  // Walk thru the task_entry_db_ until waitq_ of any of the task is non-zero
1221  for (TaskEntryList::iterator it = task_entry_db_.begin();
1222  it != task_entry_db_.end(); ++it) {
1223  if ((entry = *it) == NULL) {
1224  continue;
1225  }
1226  if (entry->IsDisabled()) {
1227  continue;
1228  }
1229  if (entry->WaitQSize()) {
1230  return false;
1231  }
1232  }
1233 
1234  // Well, no task has been enqueued in this task group
1235  return true;
1236 }
1237 
1239  memset(&stats_, 0, sizeof(stats_));
1240 }
1241 
1244 }
1245 
1246 void TaskGroup::ClearTaskStats(int task_instance) {
1247  TaskEntry *entry = QueryTaskEntry(task_instance);
1248  if (entry != NULL)
1249  entry->ClearTaskStats();
1250 }
1251 
1253  return &stats_;
1254 }
1255 
1257  return task_entry_->GetTaskStats();
1258 }
1259 
1260 TaskStats *TaskGroup::GetTaskStats(int task_instance) {
1261  TaskEntry *entry = QueryTaskEntry(task_instance);
1262  return entry->GetTaskStats();
1263 }
1264 
1266 // Implementation for class TaskEntry
1268 
1269 TaskEntry::TaskEntry(int task_id, int task_instance) : task_code_id_(task_id),
1270  task_data_id_(task_instance), run_count_(0), run_task_(NULL),
1271  waitq_(), deferq_task_entry_(NULL), deferq_task_group_(NULL),
1272  disable_(false) {
1273  // When a new TaskEntry is created, adds an implicit rule into policyq_ to
1274  // ensure that only one Task of an instance is run at a time
1275  if (task_instance != -1) {
1276  policyq_.push_back(this);
1277  }
1278  memset(&stats_, 0, sizeof(stats_));
1279  // allocate memory for deferq
1280  deferq_ = new TaskDeferList;
1281 }
1282 
1283 TaskEntry::TaskEntry(int task_id) : task_code_id_(task_id),
1284  task_data_id_(-1), run_count_(0), run_task_(NULL),
1285  deferq_task_entry_(NULL), deferq_task_group_(NULL), disable_(false) {
1286  memset(&stats_, 0, sizeof(stats_));
1287  // allocate memory for deferq
1288  deferq_ = new TaskDeferList;
1289 }
1290 
1292  policyq_.clear();
1293 
1294  assert(0 == deferq_->size());
1295  delete deferq_;
1296 }
1297 
1299  policyq_.push_back(entry);
1300 }
1301 
1303  for (TaskEntryList::iterator it = policyq_.begin(); it != policyq_.end();
1304  ++it) {
1305  if ((*it)->run_count_ != 0) {
1306  return (*it);
1307  }
1308  }
1309 
1310  return NULL;
1311 }
1312 
1314  TaskEntry *policy_entry;
1315 
1316  if ((policy_entry = ActiveEntryInPolicy()) != NULL) {
1317  // TaskEntry is inserted in the deferq_ based on the Task seqno.
1318  // deferq_ comparison function uses the seqno of the first Task queued
1319  // in the waitq_. Therefore, add the Task to waitq_ before adding
1320  // TaskEntry in the deferq_.
1321  if (0 == WaitQSize()) {
1322  AddToWaitQ(task);
1323  }
1324  policy_entry->AddToDeferQ(this);
1325  return true;
1326  }
1327  return false;
1328 }
1329 
1331  t->state(Task::WAIT);
1332  stats_.wait_count_++;
1333  waitq_.push_back(*t);
1334 
1336  TaskGroup *group = scheduler->GetTaskGroup(task_code_id_);
1337  group->stats_.wait_count_++;
1338 }
1339 
1341  TaskWaitQ::iterator it = waitq_.iterator_to(*t);
1342  waitq_.erase(it);
1343  return true;
1344 }
1345 
1347  stats_.defer_count_++;
1348  deferq_->insert(*entry);
1349  assert(entry->deferq_task_entry_ == NULL);
1350  entry->deferq_task_entry_ = this;
1351 }
1352 
1354  assert(this == entry.deferq_task_entry_);
1355  deferq_->erase(deferq_->iterator_to(entry));
1356  entry.deferq_task_entry_ = NULL;
1357 }
1358 
1360  stats_.run_count_++;
1361  if (t->task_data_id() != -1) {
1362  assert(run_task_ == NULL);
1363  assert (run_count_ == 0);
1364  run_task_ = t;
1365  }
1366 
1367  run_count_++;
1369  TaskGroup *group = scheduler->QueryTaskGroup(t->task_code_id());
1370  group->TaskStarted();
1371 
1372  t->StartTask(scheduler);
1373 }
1374 
1376  if (waitq_.size() == 0)
1377  return;
1378 
1379  TaskWaitQ::iterator it = waitq_.begin();
1380 
1381  if (task_data_id_ != -1) {
1382  Task *t = &(*it);
1383  DeleteFromWaitQ(t);
1384  RunTask(t);
1385  // If there are more tasks in waitq_, put them in deferq_
1386  if (waitq_.size() != 0) {
1387  AddToDeferQ(this);
1388  }
1389  } else {
1390  // Run all instances in waitq_
1391  while (it != waitq_.end()) {
1392  Task *t = &(*it);
1393  DeleteFromWaitQ(t);
1394  RunTask(t);
1395  if (waitq_.size() == 0)
1396  break;
1397  it = waitq_.begin();
1398  }
1399  }
1400 }
1401 
1404  TaskGroup *group = scheduler->GetTaskGroup(task_code_id_);
1405 
1406  // Sanity check
1407  assert(waitq_.size());
1408  Task *task = &(*waitq_.begin());
1409 
1410  // Check Task group policies
1411  if (group->DeferOnPolicyFail(this, task)) {
1412  return;
1413  }
1414 
1415  // Check Task entry policies
1416  if (DeferOnPolicyFail(task)) {
1417  return;
1418  }
1419 
1420  RunWaitQ();
1421  return;
1422 }
1423 
1425  TaskDeferList::iterator it;
1426 
1427  it = deferq_->begin();
1428  while (it != deferq_->end()) {
1429  TaskEntry &entry = *it;
1430  TaskDeferList::iterator it_work = it++;
1431  DeleteFromDeferQ(*it_work);
1432  entry.RunDeferEntry();
1433  }
1434 
1435  return;
1436 }
1437 
1439  TaskDeferList::iterator it;
1440 
1441  it = deferq_->begin();
1442  while (it != deferq_->end()) {
1443  TaskEntry &entry = *it;
1444  TaskDeferList::iterator it_work = it++;
1445  DeleteFromDeferQ(*it_work);
1446  if (!entry.IsDisabled()) {
1447  entry.RunDeferEntry();
1448  }
1449  }
1450 
1451  return;
1452 }
1453 
1456  TaskGroup *group = scheduler->QueryTaskGroup(task_code_id_);
1457  TaskDeferEntryCmp defer_entry_compare;
1458 
1459  TaskDeferList::iterator group_it = group->deferq_.begin();
1460  TaskDeferList::iterator entry_it = deferq_->begin();
1461 
1462  // Loop thru the deferq_ of TaskEntry and TaskGroup in the temporal order.
1463  // Exit the loop when any of the queues become empty.
1464  while ((group_it != group->deferq_.end()) &&
1465  (entry_it != deferq_->end())) {
1466  TaskEntry &g_entry = *group_it;
1467  TaskEntry &t_entry = *entry_it;
1468 
1469  if (defer_entry_compare(g_entry, t_entry)) {
1470  TaskDeferList::iterator group_it_work = group_it++;
1471  group->DeleteFromDeferQ(*group_it_work);
1472  g_entry.RunDeferEntry();
1473  } else {
1474  TaskDeferList::iterator entry_it_work = entry_it++;
1475  DeleteFromDeferQ(*entry_it_work);
1476  t_entry.RunDeferEntry();
1477  }
1478  }
1479 
1480  // Now, walk thru the non-empty deferq_
1481  if (group_it != group->deferq_.end()) {
1482  group->RunDeferQ();
1483  } else if (entry_it != deferq_->end()) {
1484  RunDeferQ();
1485  }
1486 }
1487 
1489  if (task_data_id_ != -1) {
1490  assert(run_task_ == t);
1491  run_task_ = NULL;
1492  assert(run_count_ == 1);
1493  }
1494 
1495  run_count_--;
1498  group->TaskExited(t);
1499 
1500  if (!group->run_count_ && !run_count_) {
1502  } else if (!group->run_count_) {
1503  group->RunDeferQ();
1504  } else if (!run_count_) {
1505  RunDeferQ();
1506  }
1507 }
1508 
1510  deferq_->clear();
1511  policyq_.clear();
1512  waitq_.clear();
1513 }
1514 
1516  memset(&stats_, 0, sizeof(stats_));
1517 }
1518 
1520  return &stats_;
1521 }
1522 
1523 boost::optional<uint64_t> TaskEntry::GetTaskDeferEntrySeqno() const {
1524  if(waitq_.size()) {
1525  const Task *task = &(*waitq_.begin());
1526  return task->seqno();
1527  }
1528 
1529  return boost::none;
1530 }
1531 
1533 // Implementation for class Task
1535 Task::Task(int task_id, int task_instance) : task_code_id_(task_id),
1536  task_data_id_(task_instance), task_impl_(NULL), state_(INIT),
1537  tbb_state_(TBB_INIT), seqno_(0), task_recycle_(false), task_cancel_(false),
1538  enqueue_time_(0), schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
1539 }
1540 
1541 Task::Task(int task_id) : task_code_id_(task_id),
1542  task_data_id_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
1543  seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
1544  schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
1545 }
1546 
1547 
1548 void Task::StartTask(TaskScheduler *scheduler) {
1549  if (enqueue_time_ != 0) {
1551  if ((schedule_time_ - enqueue_time_) >
1552  scheduler->schedule_delay(this)) {
1553  TASK_TRACE(scheduler, this, "Schedule delay(in usec) ",
1555  }
1556  }
1557  assert(task_impl_ == NULL);
1558  state(RUN);
1560  task_impl_ = new (task::allocate_root())TaskImpl(this);
1561  if (scheduler->use_spawn()) {
1562  task::spawn(*task_impl_);
1563  } else {
1564  task::enqueue(*task_impl_);
1565  }
1566 }
1567 
1569  TaskInfo::reference running = task_running.local();
1570  return running;
1571 }
1572 
1573 ostream& operator<<(ostream& out, const Task &t) {
1574  out << "Task <" << t.task_code_id_ << "," << t.task_data_id_ << ":"
1575  << t.seqno_ << "> ";
1576  return out;
1577 }
1578 
1580 // Implementation for sandesh APIs for Task
1582 void TaskEntry::GetSandeshData(SandeshTaskEntry *resp) const {
1583  resp->set_instance_id(task_data_id_);
1584  resp->set_tasks_created(stats_.enqueue_count_);
1585  resp->set_total_tasks_completed(stats_.total_tasks_completed_);
1586  resp->set_tasks_running(run_count_);
1587  resp->set_waitq_size(waitq_.size());
1588  resp->set_deferq_size(deferq_->size());
1589  resp->set_last_exit_time(stats_.last_exit_time_);
1590 }
1591 void TaskGroup::GetSandeshData(SandeshTaskGroup *resp, bool summary) const {
1592  if (total_run_time_)
1593  resp->set_total_run_time(duration_usecs_to_string(total_run_time_));
1594 
1595  std::vector<SandeshTaskEntry> list;
1596  TaskEntry *task_entry = QueryTaskEntry(-1);
1597  if (task_entry) {
1598  SandeshTaskEntry entry_resp;
1599  task_entry->GetSandeshData(&entry_resp);
1600  list.push_back(entry_resp);
1601  }
1602  for (TaskEntryList::const_iterator it = task_entry_db_.begin();
1603  it != task_entry_db_.end(); ++it) {
1604  task_entry = *it;
1605  if (task_entry) {
1606  SandeshTaskEntry entry_resp;
1607  task_entry->GetSandeshData(&entry_resp);
1608  list.push_back(entry_resp);
1609  }
1610  }
1611  resp->set_task_entry_list(list);
1612 
1613  if (summary)
1614  return;
1615 
1617  std::vector<SandeshTaskPolicyEntry> policy_list;
1618  for (TaskGroupPolicyList::const_iterator it = policy_.begin();
1619  it != policy_.end(); ++it) {
1620  SandeshTaskPolicyEntry policy_entry;
1621  policy_entry.set_task_name(scheduler->GetTaskName((*it)->task_code_id_));
1622  policy_entry.set_tasks_running((*it)->run_count_);
1623  policy_list.push_back(policy_entry);
1624  }
1625  resp->set_task_policy_list(policy_list);
1626 }
1627 
1628 void TaskScheduler::GetSandeshData(SandeshTaskScheduler *resp, bool summary) {
1629  tbb::mutex::scoped_lock lock(mutex_);
1630 
1631  resp->set_running(running_);
1632  resp->set_use_spawn(use_spawn_);
1633  resp->set_total_count(seqno_);
1634  resp->set_thread_count(hw_thread_count_);
1635 
1636  std::vector<SandeshTaskGroup> list;
1637  for (TaskIdMap::const_iterator it = id_map_.begin(); it != id_map_.end();
1638  it++) {
1639  SandeshTaskGroup resp_group;
1640  TaskGroup *group = QueryTaskGroup(it->second);
1641  resp_group.set_task_id(it->second);
1642  resp_group.set_name(it->first);
1643  if (group)
1644  group->GetSandeshData(&resp_group, summary);
1645  list.push_back(resp_group);
1646  }
1647  resp->set_task_group_list(list);
1648 }
static void GetTaskStats(TaskProfileStats *stats, int index, ProfileData *data)
A class maintaining information for every <task, instance>
Definition: task.cc:95
boost::optional< uint64_t > GetTaskDeferEntrySeqno() const
Addition/deletion of TaskEntry in the deferq_ is based on the seqno. seqno of the first Task in the w...
Definition: task.cc:1523
bool DeleteFromWaitQ(Task *t)
Definition: task.cc:1340
TaskEntryList policyq_
Policy rules for a task.
Definition: task.cc:192
void SetDisable(bool disable)
Disables this task entry.
Definition: task.cc:155
bool IsDisabled()
Definition: task.cc:156
void RunDeferQForGroupEnable()
Starts executing tasks from deferq_ of TaskEntries which are enabled.
Definition: task.cc:1438
int task_code_id() const
Returns the code ID of this task entry.
Definition: task.cc:146
int task_data_id_
Definition: task.cc:180
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskEntry.
Definition: task.cc:1424
void AddPolicy(TaskEntry *entry)
Definition: task.cc:1298
TaskWaitQ waitq_
Tasks waiting to run on some condition.
Definition: task.cc:189
TaskEntry * ActiveEntryInPolicy()
Definition: task.cc:1302
TaskDeferList * deferq_
Tasks deferred for this to exit.
Definition: task.cc:195
int task_data_id() const
Returns the data ID of this task entry.
Definition: task.cc:149
TaskStats * GetTaskStats()
Definition: task.cc:1519
DISALLOW_COPY_AND_ASSIGN(TaskEntry)
void AddToWaitQ(Task *t)
Definition: task.cc:1330
boost::intrusive::list< Task, WaitQHook > TaskWaitQ
Definition: task.cc:166
void GetSandeshData(SandeshTaskEntry *resp) const
Definition: task.cc:1582
void ClearQueues()
Definition: task.cc:1509
Task * run_task_
Task currently running.
Definition: task.cc:186
void TaskExited(Task *t, TaskGroup *group)
Definition: task.cc:1488
TaskGroup * deferq_task_group_
Definition: task.cc:197
int run_count_
No. of tasks running.
Definition: task.cc:183
bool DeferOnPolicyFail(Task *t)
Definition: task.cc:1313
TaskStats stats_
Cumulative maintenance stats.
Definition: task.cc:201
void RunDeferEntry()
Definition: task.cc:1402
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
Definition: task.cc:171
void ClearTaskStats()
Definition: task.cc:1515
bool disable_
Definition: task.cc:198
TaskEntry(int task_id)
Definition: task.cc:1283
boost::intrusive::member_hook< Task, boost::intrusive::list_member_hook<>, &Task::waitq_hook_ > WaitQHook
List of Task's in waitq_.
Definition: task.cc:165
void DeleteFromDeferQ(TaskEntry &entry)
Deletes a task from deferq_.
Definition: task.cc:1353
void RunWaitQ()
Definition: task.cc:1375
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
Definition: task.cc:177
TaskEntry * deferq_task_entry_
Definition: task.cc:196
void RunCombinedDeferQ()
Starts executing tasks from deferq_ of TaskEntry and TaskGroup in the temporal order.
Definition: task.cc:1454
void AddToDeferQ(TaskEntry *entry)
Adds a task to deferq_. Only one task of a given instance goes into deferq_ for its policies.
Definition: task.cc:1346
void RunTask(Task *t)
Starts a task. If there are more entries in waitq_ add them to deferq_.
Definition: task.cc:1359
~TaskEntry()
Definition: task.cc:1291
size_t WaitQSize() const
Definition: task.cc:102
boost::intrusive::set_member_hook task_defer_node
Definition: task.cc:168
int task_code_id_
Definition: task.cc:179
int GetRunCount() const
Returns the count of runs for this task entry.
Definition: task.cc:152
TaskGroup maintains per <task-id> information including,.
Definition: task.cc:229
int task_code_id_
Definition: task.cc:317
uint32_t execute_delay_
Definition: task.cc:340
DISALLOW_COPY_AND_ASSIGN(TaskGroup)
void SetDisable(bool disable)
Definition: task.cc:283
void IncrementTotalRunTime(int64_t rtime)
Definition: task.cc:276
void ClearTaskStats()
Definition: task.cc:1242
bool disable_
Definition: task.cc:342
friend class TaskEntry
Definition: task.cc:301
void ClearTaskGroupStats()
Definition: task.cc:1238
int task_id() const
Definition: task.cc:287
bool DeferOnPolicyFail(TaskEntry *entry, Task *t)
Definition: task.cc:1133
void RunDisableEntries()
Run tasks that maybe suspended. Schedule tasks only for TaskEntries which are enabled.
Definition: task.cc:1190
void AddPolicy(TaskGroup *group)
Definition: task.cc:1119
TaskEntry * disable_entry_
Task entry for disabled group.
Definition: task.cc:336
size_t num_tasks() const
Definition: task.cc:289
int TaskRunCount() const
Definition: task.cc:265
tbb::atomic< uint64_t > total_run_time_
Definition: task.cc:324
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskGroup.
Definition: task.cc:1171
void TaskExited(Task *t)
Definition: task.cc:1185
void AddToDisableQ(TaskEntry *entry)
Enqueue TaskEntry in disable_entry's deferQ.
Definition: task.cc:1162
void DeleteFromDeferQ(TaskEntry &entry)
Delete task from deferq_.
Definition: task.cc:1156
static const int kVectorGrowSize
Definition: task.cc:316
uint32_t schedule_delay_
Definition: task.cc:341
bool IsWaitQEmpty()
Returns true if the waitq_ of every task in the group is empty.
Definition: task.cc:1212
void AddEntriesToDisableQ()
Add TaskEntries to disable_entry_ which have tasks enqueued and are already disabled.
Definition: task.cc:1194
bool IsDisabled()
Definition: task.cc:284
void GetSandeshData(SandeshTaskGroup *resp, bool summary) const
Definition: task.cc:1591
void AddToDeferQ(TaskEntry *entry)
Add task to deferq_ Only one task of a given instance goes into deferq_ for its policies.
Definition: task.cc:1149
std::vector< TaskGroup * > TaskGroupPolicyList
Vector of Task Group policies.
Definition: task.cc:305
TaskEntry * GetDisableEntry()
Definition: task.cc:249
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>, &TaskEntry::task_defer_node > TaskDeferListOption
Definition: task.cc:308
~TaskGroup()
Definition: task.cc:1071
TaskStats * GetTaskStats()
Definition: task.cc:1256
TaskDeferList deferq_
Tasks deferred till run_count_ is 0.
Definition: task.cc:330
bool policy_set_
Specifies if policy is already set.
Definition: task.cc:320
TaskEntry * GetTaskEntry(int task_instance)
Definition: task.cc:1090
TaskEntryList task_entry_db_
task-entries in this group
Definition: task.cc:339
TaskEntry * task_entry_
Tasks deferred till run_count_ is 0.
Definition: task.cc:333
TaskStats stats_
Definition: task.cc:344
TaskGroup(int task_id)
Definition: task.cc:1062
int run_count_
No. of tasks running in the group.
Definition: task.cc:323
TaskEntry * QueryTaskEntry(int task_instance) const
Definition: task.cc:1108
size_t deferq_size() const
Definition: task.cc:288
void TaskStarted()
Definition: task.cc:275
TaskGroupPolicyList policy_
Policy rules for the group.
Definition: task.cc:327
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
Definition: task.cc:314
void PolicySet()
Definition: task.cc:1166
TaskGroup * ActiveGroupInPolicy()
Definition: task.cc:1123
TaskStats * GetTaskGroupStats()
Definition: task.cc:1252
A private class used to implement tbb::task An object is created when task is ready for execution and...
Definition: task.cc:47
DISALLOW_COPY_AND_ASSIGN(TaskImpl)
virtual ~TaskImpl()
Destructor is called when a task execution is completed. Invoked implicitly by tbb::task....
Definition: task.cc:407
TaskImpl(Task *t)
Definition: task.cc:49
tbb::task * execute()
Method called from tbb::task to execute. Invoke Run() method of client. Supports task continuation wh...
Definition: task.cc:352
Task * parent_
Definition: task.cc:63
void Terminate()
Definition: task_monitor.cc:59
void Start(EventManager *evm)
Definition: task_monitor.cc:49
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:302
void EnqueueUnLocked(Task *task)
Definition: task.cc:648
uint64_t enqueue_count_
Definition: task.h:511
uint64_t done_count_
Definition: task.h:512
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
Definition: task.cc:581
void Stop()
Stops scheduling of all tasks.
Definition: task.cc:798
bool measure_delay_
Definition: task.h:503
TaskTbbKeepAwake * tbb_awake_task_
Definition: task.h:520
TaskEntry * stop_entry_
Definition: task.h:487
tbb::reader_writer_lock id_map_mutex_
Definition: task.h:495
uint64_t cancel_count_
Definition: task.h:513
int hw_thread_count_
Definition: task.h:500
static boost::scoped_ptr< TaskScheduler > singleton_
Definition: task.h:460
TaskIdMap id_map_
Definition: task.h:496
void Terminate()
Definition: task.cc:984
void EnableTaskGroup(int task_id)
Definition: task.cc:1031
int GetTaskId(const std::string &name)
Definition: task.cc:862
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads.
Definition: task.cc:414
CancelReturnCode
Definition: task.h:329
@ CANCELLED
Definition: task.h:330
tbb::task_scheduler_init task_scheduler_
Definition: task.h:489
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
Definition: task.cc:530
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id. Assumes a valid entry is present for task_id.
Definition: task.cc:577
LogFn log_fn_
Definition: task.h:499
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
Definition: task.cc:1017
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
Definition: task.h:306
TaskMonitor * task_monitor_
Definition: task.h:521
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
Definition: task.cc:1628
int CountThreadsPerPid(pid_t pid)
Platform-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
Definition: task.cc:929
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
Definition: task.cc:451
void WaitForTerminateCompletion()
Definition: task.cc:958
void DisableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1038
void DisableTaskGroup(int task_id)
Definition: task.cc:1021
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
Definition: task.h:509
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Definition: task.cc:512
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static int GetDefaultThreadCount()
Definition: task.cc:436
TaskStats * GetTaskGroupStats(int task_id)
Definition: task.cc:905
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
Definition: task.cc:518
TaskGroupDb task_group_db_
Definition: task.h:493
EventManager * evm_
Definition: task.h:514
void ClearRunningTask()
Definition: task.cc:1012
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
Definition: task.cc:462
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
Definition: task.cc:485
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:617
std::string GetTaskName(int task_id) const
Definition: task.cc:852
tbb::mutex mutex_
Definition: task.h:490
void EnableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1043
int id_max_
Definition: task.h:497
uint64_t seqno_
Definition: task.h:492
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:590
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
Definition: task.cc:609
static TaskScheduler * GetInstance()
Definition: task.cc:554
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
Definition: task.h:518
void Start()
Starts scheduling of all tasks.
Definition: task.cc:804
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:561
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
Definition: task.cc:595
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
Definition: task.h:506
void ClearTaskStats(int task_id)
Definition: task.cc:889
TaskStats * GetTaskStats(int task_id)
Definition: task.cc:913
bool running_
Definition: task.h:491
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
Definition: task.cc:768
bool track_run_time_
Definition: task.h:502
bool track_run_time() const
Definition: task.h:421
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
Definition: task.h:486
void RegisterLog(LogFn fn)
Definition: task.cc:538
static bool ShouldUseSpawn()
Definition: task.cc:440
void set_event_manager(EventManager *evm)
Definition: task.cc:500
uint32_t schedule_delay() const
Definition: task.h:425
void Print()
Debug print routine.
Definition: task.cc:814
uint32_t execute_delay() const
Definition: task.h:426
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Definition: task.cc:705
static const int kVectorGrowSize
Definition: task.h:459
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true,...
Definition: task.cc:829
bool use_spawn() const
Definition: task.h:448
void ClearTaskGroupStats(int task_id)
Definition: task.cc:881
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
Definition: task.cc:602
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
Definition: task.cc:1007
bool StartTbbKeepAwakeTask(TaskScheduler *ts, EventManager *event_mgr, const std::string task_name, uint32_t tbbKeepawakeTimeout=1000)
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:77
static Task * Running()
Returns a pointer to the current task the code is executing under.
Definition: task.cc:1568
uint64_t seqno() const
Returns the sequence number of this task.
Definition: task.h:137
bool task_recycle_
Determines if the task must be rescheduled (reused) after its completion.
Definition: task.h:216
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
Definition: task.cc:1548
void tbb_state(TbbState s)
Sets a TBB state for the task.
Definition: task.h:181
uint64_t seqno_
Stores the sequence number.
Definition: task.h:212
State state_
Stores a state of the task.
Definition: task.h:206
friend class TaskImpl
Gives access to private members for TaskImpl class.
Definition: task.h:175
int task_data_id_
The dataset id within a code path.
Definition: task.h:199
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
Definition: task.h:123
int task_code_id() const
Returns the code ID of this task.
Definition: task.h:131
tbb::task * task_impl_
A pointer to an Intel TBB object storing low-level information to manage the task.
Definition: task.h:203
int task_data_id() const
Returns the data ID of this task.
Definition: task.h:134
Task(int task_id, int task_data_id)
Creates a new task with the given values of task code ID and task data ID.
Definition: task.cc:1535
uint64_t schedule_time_
Contains the time when the task was started.
Definition: task.h:226
@ TBB_DONE
Definition: task.h:98
@ TBB_INIT
Definition: task.h:95
@ TBB_EXEC
Definition: task.h:97
@ TBB_ENQUEUED
Definition: task.h:96
bool task_cancel_
Determines if the task's execution was canceled.
Definition: task.h:219
uint32_t execute_delay_
Sets threshold for the task's execution time. If the threshold is exceeded, the event is logged.
Definition: task.h:230
State state() const
Returns a state value of a task.
Definition: task.h:128
uint32_t schedule_delay_
Sets threshold for delay between enqueueing and execution. If the threshold is exceeded,...
Definition: task.h:234
int task_code_id_
The code path executed by the task.
Definition: task.h:196
uint64_t enqueue_time_
Contains the time when the task was enqueued for execution.
Definition: task.h:223
@ WAIT
A task is waiting in a queue.
Definition: task.h:87
@ RUN
A task is being run.
Definition: task.h:90
@ INIT
A task was initialized.
Definition: task.h:84
boost::intrusive::list_member_hook waitq_hook_
Definition: task.h:237
static EventManager evm
#define LOG(_Level, _Msg)
Definition: logging.h:33
bool unit_test()
Definition: bgp_log.cc:53
Comparison routine for the TaskDeferList.
Definition: task.cc:207
uint64_t total_tasks_completed_
Total number of tasks run.
Definition: task.h:287
uint64_t last_exit_time_
Timestamp of the latest exit.
Definition: task.h:290
int run_count_
Number of entries currently running.
Definition: task.h:278
int defer_count_
Number of entries in deferq.
Definition: task.h:281
int wait_count_
Number of entries in waitq.
Definition: task.h:275
uint64_t enqueue_count_
Number of tasks enqueued.
Definition: task.h:284
tbb::enumerable_thread_specific< Task * > TaskInfo
Definition: task.cc:28
#define TASK_TRACE(scheduler, task, msg, delay)
Definition: task.cc:39
std::vector< TaskEntry * > TaskEntryList
Definition: task.cc:35
ostream & operator<<(ostream &out, const Task &t)
Definition: task.cc:1573
static TaskInfo task_running
Definition: task.cc:32
std::vector< TaskExclusion > TaskPolicy
Defines a type to store an execution policy (a list of task exclusions).
Definition: task.h:268
#define CHECK_CONCURRENCY(...)
struct task_ task
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
static const std::string duration_usecs_to_string(const uint64_t usecs)
Definition: time_util.h:62
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13