OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
task.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include <assert.h>
6 #include <fstream>
7 #include <map>
8 #include <iostream>
9 #include <boost/intrusive/set.hpp>
10 #include <boost/optional.hpp>
11 
12 #include "tbb/atomic.h"
13 #include "tbb/task.h"
14 #include "tbb/enumerable_thread_specific.h"
15 #include "base/logging.h"
16 #include "base/task.h"
17 #include "base/task_annotations.h"
18 #include "base/task_tbbkeepawake.h"
19 #include "base/task_monitor.h"
20 
21 #include <base/sandesh/task_types.h>
22 
23 using namespace std;
24 using tbb::task;
25 
27 class TaskEntry;
29 
30 typedef tbb::enumerable_thread_specific<Task *> TaskInfo;
31 
33 
34 // Vector of Task entries
35 typedef std::vector<TaskEntry *> TaskEntryList;
36 
37 boost::scoped_ptr<TaskScheduler> TaskScheduler::singleton_;
38 
39 #define TASK_TRACE(scheduler, task, msg, delay)\
40  do {\
41  scheduler->Log(__FILE__, __LINE__, task, msg, delay);\
42  } while (false)
43 
47 class TaskImpl : public tbb::task {
48 public:
49  TaskImpl(Task *t) : parent_(t) {};
50 
54  virtual ~TaskImpl();
55 
56 private:
57 
61  tbb::task *execute();
62 
64 
66 };
67 
95 class TaskEntry {
96 public:
97  TaskEntry(int task_id);
98  TaskEntry(int task_id, int task_instance);
99  ~TaskEntry();
100 
101  void AddPolicy(TaskEntry *entry);
102  size_t WaitQSize() const { return waitq_.size(); };
103  void AddToWaitQ(Task *t);
104  bool DeleteFromWaitQ(Task *t);
105 
108  void AddToDeferQ(TaskEntry *entry);
109 
111  void DeleteFromDeferQ(TaskEntry &entry);
112 
113  TaskEntry *ActiveEntryInPolicy();
114  bool DeferOnPolicyFail(Task *t);
115 
118  void RunTask(Task *t);
119 
121  void RunDeferQ();
122 
125  void RunCombinedDeferQ();
126  void RunWaitQ();
127  void RunDeferEntry();
128 
131  void RunDeferQForGroupEnable();
132 
133  void TaskExited(Task *t, TaskGroup *group);
135  void ClearTaskStats();
136  void ClearQueues();
137 
143  boost::optional<uint64_t> GetTaskDeferEntrySeqno() const;
144 
145  int GetTaskId() const { return task_id_; }
146  int GetTaskInstance() const { return task_instance_; }
147  int GetRunCount() const { return run_count_; }
148  void SetDisable(bool disable) { disable_ = disable; }
149  bool IsDisabled() { return disable_; }
150  void GetSandeshData(SandeshTaskEntry *resp) const;
151 
152 private:
153  friend class TaskGroup;
154  friend class TaskScheduler;
155 
157  typedef boost::intrusive::member_hook<Task,
158  boost::intrusive::list_member_hook<>, &Task::waitq_hook_> WaitQHook;
159  typedef boost::intrusive::list<Task, WaitQHook> TaskWaitQ;
160 
161  boost::intrusive::set_member_hook<> task_defer_node;
162  typedef boost::intrusive::member_hook<TaskEntry,
163  boost::intrusive::set_member_hook<>,
165 
169  typedef boost::intrusive::set<TaskEntry, TaskDeferListOption,
170  boost::intrusive::compare<TaskDeferEntryCmp> > TaskDeferList;
171 
172  int task_id_;
174 
177 
180 
183 
186 
191  bool disable_;
192 
195 
197 };
198 
201  bool operator() (const TaskEntry &lhs, const TaskEntry &rhs) const {
202  return (lhs.GetTaskDeferEntrySeqno() <
203  rhs.GetTaskDeferEntrySeqno());
204  }
205 };
206 
222 class TaskGroup {
223 public:
224  TaskGroup(int task_id);
225  ~TaskGroup();
226 
227  TaskEntry *QueryTaskEntry(int task_instance) const;
228  TaskEntry *GetTaskEntry(int task_instance);
229  void AddPolicy(TaskGroup *group);
230 
233  void AddToDeferQ(TaskEntry *entry);
234 
236  void AddToDisableQ(TaskEntry *entry);
237 
240  void AddEntriesToDisableQ();
241 
242  TaskEntry *GetDisableEntry() { return disable_entry_; }
243 
245  void DeleteFromDeferQ(TaskEntry &entry);
246  TaskGroup *ActiveGroupInPolicy();
247  bool DeferOnPolicyFail(TaskEntry *entry, Task *t);
248 
256  bool IsWaitQEmpty();
257 
258  int TaskRunCount() const {return run_count_;};
259 
261  void RunDeferQ();
262 
265  void RunDisableEntries();
266  void TaskExited(Task *t);
267  void PolicySet();
268  void TaskStarted() {run_count_++;};
269  void IncrementTotalRunTime(int64_t rtime) { total_run_time_ += rtime; }
270  TaskStats *GetTaskGroupStats();
272  TaskStats *GetTaskStats(int task_instance);
273  void ClearTaskGroupStats();
274  void ClearTaskStats();
275  void ClearTaskStats(int instance_id);
276  void SetDisable(bool disable) { disable_ = disable; }
277  bool IsDisabled() { return disable_; }
278  void GetSandeshData(SandeshTaskGroup *resp, bool summary) const;
279 
280  int task_id() const { return task_id_; }
281  size_t deferq_size() const { return deferq_.size(); }
282  size_t num_tasks() const {
283  size_t count = 0;
284  for (TaskEntryList::const_iterator it = task_entry_db_.begin();
285  it != task_entry_db_.end(); ++it) {
286  if (*it != NULL) {
287  count++;
288  }
289  }
290  return count;
291  }
292 
293 private:
294  friend class TaskEntry;
295  friend class TaskScheduler;
296 
298  typedef std::vector<TaskGroup *> TaskGroupPolicyList;
299  typedef boost::intrusive::member_hook<TaskEntry,
300  boost::intrusive::set_member_hook<>,
302 
306  typedef boost::intrusive::set<TaskEntry, TaskDeferListOption,
307  boost::intrusive::compare<TaskDeferEntryCmp> > TaskDeferList;
308 
309  static const int kVectorGrowSize = 16;
310  int task_id_;
311 
314 
317  tbb::atomic<uint64_t> total_run_time_;
318 
321 
324 
327 
330 
333  uint32_t execute_delay_;
334  uint32_t schedule_delay_;
335  bool disable_;
336 
339 };
340 
342 // Implementation for class TaskImpl
344 
346  TaskInfo::reference running = task_running.local();
347  running = parent_;
348  parent_->SetTbbState(Task::TBB_EXEC);
349  try {
350  uint64_t t = 0;
351  if (parent_->enqueue_time() != 0) {
352  t = ClockMonotonicUsec();
354  if ((t - parent_->enqueue_time()) >
355  scheduler->schedule_delay(parent_)) {
356  TASK_TRACE(scheduler, parent_, "TBB schedule time(in usec) ",
357  (t - parent_->enqueue_time()));
358  }
359  } else if (TaskScheduler::GetInstance()->track_run_time()) {
360  t = ClockMonotonicUsec();
361  }
362  bool is_complete = parent_->Run();
363  if (t != 0) {
364  int64_t delay = ClockMonotonicUsec() - t;
366  uint32_t execute_delay = scheduler->execute_delay(parent_);
367  if (execute_delay && delay > execute_delay) {
368  TASK_TRACE(scheduler, parent_, "Run time(in usec) ", delay);
369  }
370  if (scheduler->track_run_time()) {
371  TaskGroup *group =
372  scheduler->QueryTaskGroup(parent_->GetTaskId());
373  group->IncrementTotalRunTime(delay);
374  }
375  }
376  running = NULL;
377  if (is_complete == true) {
378  parent_->SetTaskComplete();
379  } else {
380  parent_->SetTaskRecycle();
381  }
382  } catch (std::exception &e) {
383 
384  // Store exception information statically, to easily read exception
385  // information from the core.
386  static std::string what = e.what();
387 
388  LOG(ERROR, "!!!! ERROR !!!! Task caught fatal exception: " << what
389  << " TaskImpl: " << this);
390  assert(0);
391  } catch (...) {
392  LOG(ERROR, "!!!! ERROR !!!! Task caught fatal unknown exception"
393  << " TaskImpl: " << this);
394  assert(0);
395  }
396 
397  return NULL;
398 }
399 
401  assert(parent_ != NULL);
402 
404  sched->OnTaskExit(parent_);
405 }
406 
407 int TaskScheduler::GetThreadCount(int thread_count) {
408  static bool init_;
409  static int num_cores_;
410 
411  if (init_) {
412  return num_cores_ * ThreadAmpFactor_;
413  }
414 
415  char *num_cores_str = getenv("TBB_THREAD_COUNT");
416  if (!num_cores_str) {
417  if (thread_count == 0)
418  num_cores_ = tbb::task_scheduler_init::default_num_threads();
419  else
420  num_cores_ = thread_count;
421  } else {
422  num_cores_ = strtol(num_cores_str, NULL, 0);
423  }
424 
425  init_ = true;
426  return num_cores_ * ThreadAmpFactor_;
427 }
428 
430  return tbb::task_scheduler_init::default_num_threads();
431 }
432 
434  if (getenv("TBB_USE_SPAWN"))
435  return true;
436 
437  return false;
438 }
439 
441 // Implementation for class TaskScheduler
443 
446  running_(true), seqno_(0), id_max_(0), log_fn_(), track_run_time_(false),
448  enqueue_count_(0), done_count_(0), cancel_count_(0), evm_(NULL),
449  tbb_awake_task_(NULL), task_monitor_(NULL) {
450  hw_thread_count_ = GetThreadCount(task_count);
452  stop_entry_ = new TaskEntry(-1);
453 }
454 
456  TaskGroup *group;
457 
458  for (TaskGroupDb::iterator iter = task_group_db_.begin();
459  iter != task_group_db_.end(); ++iter) {
460  if ((group = *iter) == NULL) {
461  continue;
462  }
463  *iter = NULL;
464  delete group;
465  }
466 
467  for (TaskIdMap::iterator loc = id_map_.begin(); loc != id_map_.end();
468  id_map_.erase(loc++)) {
469  }
470 
471  delete stop_entry_;
472  stop_entry_ = NULL;
473  task_group_db_.clear();
474 
475  return;
476 }
477 
478 void TaskScheduler::Initialize(uint32_t thread_count, EventManager *evm) {
479  assert(singleton_.get() == NULL);
480  singleton_.reset(new TaskScheduler((int)thread_count));
481 
482  if (evm) {
483  singleton_.get()->evm_ = evm;
484  singleton_.get()->tbb_awake_task_ = new TaskTbbKeepAwake();
485  assert(singleton_.get()->tbb_awake_task_);
486 
487  singleton_.get()->tbb_awake_task_->StartTbbKeepAwakeTask(
488  singleton_.get(), evm,
489  "TaskScheduler::TbbKeepAwake");
490  }
491 }
492 
494  assert(evm);
495  evm_ = evm;
496  if (tbb_awake_task_ == NULL) {
498  assert(tbb_awake_task_);
499 
501  "TaskScheduler::TbbKeepAwake");
502  }
503 }
504 
506  if (tbb_awake_task_) {
508  }
509 }
510 
512  uint64_t tbb_keepawake_time_msec,
513  uint64_t inactivity_time_msec,
514  uint64_t poll_interval_msec) {
515  if (task_monitor_ != NULL)
516  return;
517 
518  task_monitor_ = new TaskMonitor(this, tbb_keepawake_time_msec,
519  inactivity_time_msec, poll_interval_msec);
520  task_monitor_->Start(evm);
521 }
522 
523 void TaskScheduler::Log(const char *file_name, uint32_t line_no,
524  const Task *task, const char *description,
525  uint64_t delay) {
526  if (log_fn_.empty() == false) {
527  log_fn_(file_name, line_no, task, description, delay);
528  }
529 }
530 
532  log_fn_ = fn;
533 }
534 
536  if (task->schedule_delay() > schedule_delay_)
537  return task->schedule_delay();
538  return schedule_delay_;
539 }
540 
542  if (task->execute_delay() > execute_delay_)
543  return task->execute_delay();
544  return execute_delay_;
545 }
546 
548  if (singleton_.get() == NULL) {
549  singleton_.reset(new TaskScheduler());
550  }
551  return singleton_.get();
552 }
553 
555  assert(task_id >= 0);
556  int size = task_group_db_.size();
557  if (size <= task_id) {
559  }
560 
561  TaskGroup *group = task_group_db_[task_id];
562  if (group == NULL) {
563  group = new TaskGroup(task_id);
564  task_group_db_[task_id] = group;
565  }
566 
567  return group;
568 }
569 
571  return task_group_db_[task_id];
572 }
573 
574 bool TaskScheduler::IsTaskGroupEmpty(int task_id) const {
575  CHECK_CONCURRENCY("bgp::Config");
576  tbb::mutex::scoped_lock lock(mutex_);
577  TaskGroup *group = task_group_db_[task_id];
578  assert(group);
579  assert(group->TaskRunCount() == 0);
580  return group->IsWaitQEmpty();
581 }
582 
583 TaskEntry *TaskScheduler::GetTaskEntry(int task_id, int task_instance) {
584  TaskGroup *group = GetTaskGroup(task_id);
585  return group->GetTaskEntry(task_instance);
586 }
587 
588 TaskEntry *TaskScheduler::QueryTaskEntry(int task_id, int task_instance) {
589  TaskGroup *group = QueryTaskGroup(task_id);
590  if (group == NULL)
591  return NULL;
592  return group->QueryTaskEntry(task_instance);
593 }
594 
596  uint32_t schedule) {
597  execute_delay_ = execute;
598  schedule_delay_ = schedule;
600 }
601 
602 void TaskScheduler::SetLatencyThreshold(const std::string &name,
603  uint32_t execute, uint32_t schedule) {
604  int task_id = GetTaskId(name);
605  TaskGroup *group = GetTaskGroup(task_id);
606  group->execute_delay_ = execute;
607  group->schedule_delay_ = schedule;
608 }
609 
610 void TaskScheduler::SetPolicy(int task_id, TaskPolicy &policy) {
611  tbb::mutex::scoped_lock lock(mutex_);
612 
613  TaskGroup *group = GetTaskGroup(task_id);
614  TaskEntry *group_entry = group->GetTaskEntry(-1);
615  group->PolicySet();
616 
617  for (TaskPolicy::iterator it = policy.begin(); it != policy.end(); ++it) {
618 
619  if (it->match_instance == -1) {
620  TaskGroup *policy_group = GetTaskGroup(it->match_id);
621  group->AddPolicy(policy_group);
622  policy_group->AddPolicy(group);
623  } else {
624  TaskEntry *entry = GetTaskEntry(task_id, it->match_instance);
625  TaskEntry *policy_entry = GetTaskEntry(it->match_id,
626  it->match_instance);
627  entry->AddPolicy(policy_entry);
628  policy_entry->AddPolicy(entry);
629 
630  group_entry->AddPolicy(policy_entry);
631  policy_entry->AddPolicy(group_entry);
632  }
633  }
634 }
635 
637  tbb::mutex::scoped_lock lock(mutex_);
638 
639  EnqueueUnLocked(t);
640 }
641 
643  if (measure_delay_) {
645  }
646  // Ensure that task is enqueued only once.
647  assert(t->GetSeqno() == 0);
648  enqueue_count_++;
649  t->SetSeqNo(++seqno_);
650  TaskGroup *group = GetTaskGroup(t->GetTaskId());
651  t->schedule_delay_ = group->schedule_delay_;
652  t->execute_delay_ = group->execute_delay_;
653  group->stats_.enqueue_count_++;
654 
655  TaskEntry *entry = GetTaskEntry(t->GetTaskId(), t->GetTaskInstance());
656  entry->stats_.enqueue_count_++;
657  // If either TaskGroup or TaskEntry is disabled for Unit-Test purposes,
658  // enqueue new task in waitq and update TaskGroup if needed.
659  if (group->IsDisabled() || entry->IsDisabled()) {
660  entry->AddToWaitQ(t);
661  if (group->IsDisabled()) {
662  group->AddToDisableQ(entry);
663  }
664  return;
665  }
666 
667  // Add task to waitq_ if its already populated
668  if (entry->WaitQSize() != 0) {
669  entry->AddToWaitQ(t);
670  return;
671  }
672 
673  // Is scheduler stopped? Dont add task to deferq_ if scheduler is stopped.
674  // TaskScheduler::Start() will run tasks from waitq_
675  if (running_ == false) {
676  entry->AddToWaitQ(t);
677  stop_entry_->AddToDeferQ(entry);
678  return;
679  }
680 
681  // Check Task Group policy. On policy violation, DeferOnPolicyFail()
682  // adds the Task to the TaskEntry's waitq_ and the TaskEntry will be
683  // added to deferq_ of the matching TaskGroup.
684  if (group->DeferOnPolicyFail(entry, t)) {
685  return;
686  }
687 
688  // Check Task Entry policy. On policy violation, DeferOnPolicyFail()
689  // adds the Task to the TaskEntry's waitq_ and the TaskEntry will be
690  // added to deferq_ of the matching TaskEntry.
691  if (entry->DeferOnPolicyFail(t)) {
692  return;
693  }
694 
695  entry->RunTask(t);
696  return;
697 }
698 
700  tbb::mutex::scoped_lock lock(mutex_);
701 
702  // If the task is in RUN state, mark the task for cancellation and return.
703  if (t->state_ == Task::RUN) {
704  t->task_cancel_ = true;
705  } else if (t->state_ == Task::WAIT) {
706  TaskEntry *entry = QueryTaskEntry(t->GetTaskId(), t->GetTaskInstance());
707  TaskGroup *group = QueryTaskGroup(t->GetTaskId());
708  assert(entry->WaitQSize());
709  // Get the first entry in the waitq_
710  Task *first_wait_task = &(*entry->waitq_.begin());
711  TaskEntry *disable_entry = group->GetDisableEntry();
712  assert(entry->DeleteFromWaitQ(t) == true);
713  // If the waitq_ is empty, then remove the TaskEntry from the deferq.
714  if (!entry->WaitQSize()) {
715  if (entry->deferq_task_group_) {
716  assert(entry->deferq_task_entry_ == NULL);
717  entry->deferq_task_group_->DeleteFromDeferQ(*entry);
718  } else if (entry->deferq_task_entry_) {
719  entry->deferq_task_entry_->DeleteFromDeferQ(*entry);
720  } else if (group->IsDisabled()) {
721  // Remove TaskEntry from deferq of disable_entry
722  disable_entry->DeleteFromDeferQ(*entry);
723  } else {
724  if (!entry->IsDisabled()) {
725  assert(0);
726  }
727  }
728  } else if (t == first_wait_task) {
729  // TaskEntry is inserted in the deferq_ based on the Task seqno.
730  // deferq_ comparison function uses the seqno of the first entry in
731  // the waitq_. Therefore, if the task to be cancelled is the first
732  // entry in the waitq_, then delete the entry from the deferq_ and
733  // add it again.
734  TaskGroup *deferq_tgroup = entry->deferq_task_group_;
735  TaskEntry *deferq_tentry = entry->deferq_task_entry_;
736  if (deferq_tgroup) {
737  assert(deferq_tentry == NULL);
738  deferq_tgroup->DeleteFromDeferQ(*entry);
739  deferq_tgroup->AddToDeferQ(entry);
740  } else if (deferq_tentry) {
741  deferq_tentry->DeleteFromDeferQ(*entry);
742  deferq_tentry->AddToDeferQ(entry);
743  } else if (group->IsDisabled()) {
744  // Remove TaskEntry from deferq of disable_entry and add back
745  disable_entry->DeleteFromDeferQ(*entry);
746  disable_entry->AddToDeferQ(entry);
747  } else {
748  if (!entry->IsDisabled()) {
749  assert(0);
750  }
751  }
752  }
753  delete t;
754  cancel_count_++;
755  return CANCELLED;
756  } else {
757  return FAILED;
758  }
759  return QUEUED;
760 }
761 
763  tbb::mutex::scoped_lock lock(mutex_);
764  done_count_++;
765 
767  TaskEntry *entry = QueryTaskEntry(t->GetTaskId(), t->GetTaskInstance());
768  entry->TaskExited(t, GetTaskGroup(t->GetTaskId()));
769 
770  //
771  // Delete the task if it is not marked for recycling, or if it has been cancelled.
772  //
773  if ((t->task_recycle_ == false) || (t->task_cancel_ == true)) {
774  // Delete the container Task object, if the
775  // task is not marked to be recycled (or)
776  // if the task is marked for cancellation
777  if (t->task_cancel_ == true) {
778  t->OnTaskCancel();
779  }
780  delete t;
781  return;
782  }
783 
784  // Task is being recycled, reset the state, seq_no and TBB task handle
785  t->task_impl_ = NULL;
786  t->SetSeqNo(0);
787  t->SetState(Task::INIT);
789  EnqueueUnLocked(t);
790 }
791 
793  tbb::mutex::scoped_lock lock(mutex_);
794 
795  running_ = false;
796 }
797 
799  tbb::mutex::scoped_lock lock(mutex_);
800 
801  running_ = true;
802 
803  // Run all tasks that may be suspended
805  return;
806 }
807 
809  for (TaskGroupDb::iterator iter = task_group_db_.begin();
810  iter != task_group_db_.end(); ++iter) {
811  TaskGroup *group = *iter;
812  if (group == NULL) {
813  continue;
814  }
815 
816  cout << "id: " << group->task_id() <<
817  " run: " << group->TaskRunCount() << endl;
818  cout << "deferq: " << group->deferq_size() <<
819  " task count: " << group->num_tasks() << endl;
820  }
821 }
822 
823 bool TaskScheduler::IsEmpty(bool running_only) {
824  TaskGroup *group;
825 
826  tbb::mutex::scoped_lock lock(mutex_);
827 
828  for (TaskGroupDb::iterator it = task_group_db_.begin();
829  it != task_group_db_.end(); ++it) {
830  if ((group = *it) == NULL) {
831  continue;
832  }
833  if (group->TaskRunCount()) {
834  return false;
835  }
836  if (group->IsDisabled()) {
837  continue;
838  }
839  if ((false == running_only) && (false == group->IsWaitQEmpty())) {
840  return false;
841  }
842  }
843 
844  return true;
845 }
846 std::string TaskScheduler::GetTaskName(int task_id) const {
847  for (TaskIdMap::const_iterator it = id_map_.begin(); it != id_map_.end();
848  it++) {
849  if (task_id == it->second)
850  return it->first;
851  }
852 
853  return "ERROR";
854 }
855 
856 int TaskScheduler::GetTaskId(const string &name) {
857  {
858  // Grab read-only lock first. Most of the time, task-id already exists
859  // in the id_map_. Hence there should not be any contention for lock
860  // aquisition.
861  tbb::reader_writer_lock::scoped_lock_read lock(id_map_mutex_);
862  TaskIdMap::iterator loc = id_map_.find(name);
863  if (loc != id_map_.end()) {
864  return loc->second;
865  }
866  }
867 
868  // Grab read-write lock to allocate a new task id and insert into the map.
869  tbb::reader_writer_lock::scoped_lock lock(id_map_mutex_);
870  int tid = ++id_max_;
871  id_map_.insert(make_pair(name, tid));
872  return tid;
873 }
874 
876  TaskGroup *group = GetTaskGroup(task_id);
877  if (group == NULL)
878  return;
879 
880  group->ClearTaskGroupStats();
881 }
882 
883 void TaskScheduler::ClearTaskStats(int task_id) {
884  TaskGroup *group = GetTaskGroup(task_id);
885  if (group == NULL)
886  return;
887 
888  group->ClearTaskStats();
889 }
890 
891 void TaskScheduler::ClearTaskStats(int task_id, int instance_id) {
892  TaskGroup *group = GetTaskGroup(task_id);
893  if (group == NULL)
894  return;
895 
896  group->ClearTaskStats(instance_id);
897 }
898 
900  TaskGroup *group = GetTaskGroup(task_id);
901  if (group == NULL)
902  return NULL;
903 
904  return group->GetTaskGroupStats();
905 }
906 
908  TaskGroup *group = GetTaskGroup(task_id);
909  if (group == NULL)
910  return NULL;
911 
912  return group->GetTaskStats();
913 }
914 
915 TaskStats *TaskScheduler::GetTaskStats(int task_id, int instance_id) {
916  TaskGroup *group = GetTaskGroup(task_id);
917  if (group == NULL)
918  return NULL;
919 
920  return group->GetTaskStats(instance_id);
921 }
922 
924  int threads;
925  threads = 0;
926 
927 #if defined(__linux__)
928  std::ostringstream file_name;
929  std::string line;
930 
931  file_name << "/proc/" << pid << "/status";
932 
933  std::ifstream file(file_name.str().c_str());
934 
935  if(!file) {
936  LOG(ERROR, "opening /proc failed");
937  return -1;
938  }
939 
940  while (threads == 0 && file.good()) {
941  getline(file, line);
942  if (line == "Threads:\t1") threads = 1;
943  }
944  file.close();
945 #else
946 #error "TaskScheduler::CountThreadsPerPid() - unsupported platform."
947 #endif
948 
949  return threads;
950 }
951 
953  //
954  // Wait for a bit to give a chance for all the threads to exit
955  //
956  usleep(1000);
957 
958  int count = 0;
959  int threadsRunning;
960  pid_t pid = getpid();
961 
962  while (count++ < 12000) {
963  threadsRunning = CountThreadsPerPid(pid);
964 
965  if (threadsRunning == 1)
966  break;
967 
968  if (threadsRunning == -1) {
969  LOG(ERROR, "could not check if any thread is running");
970  usleep(10000);
971  break;
972  }
973 
974  usleep(10000);
975  }
976 }
977 
979  if (task_monitor_) {
981  delete task_monitor_;
982  task_monitor_ = NULL;
983  }
984 
985  for (int i = 0; i < 10000; i++) {
986  if (IsEmpty()) break;
987  usleep(1000);
988  }
989  assert(IsEmpty());
990  if (tbb_awake_task_) {
992  delete tbb_awake_task_;
993  tbb_awake_task_ = NULL;
994  }
995  evm_ = NULL;
996  singleton_->task_scheduler_.terminate();
998  singleton_.reset(NULL);
999 }
1000 
1002  TaskInfo::reference running = task_running.local();
1003  running = unit_test;
1004 }
1005 
1007  TaskInfo::reference running = task_running.local();
1008  running = NULL;
1009 }
1010 
1012  ThreadAmpFactor_ = n;
1013 }
1014 
1016  TaskGroup *group = GetTaskGroup(task_id);
1017  if (!group->IsDisabled()) {
1018  // Add TaskEntries(that contain enqueued tasks) which are already
1019  // disabled to disable_ entry maintained at TaskGroup.
1020  group->SetDisable(true);
1021  group->AddEntriesToDisableQ();
1022  }
1023 }
1024 
1026  TaskGroup *group = GetTaskGroup(task_id);
1027  group->SetDisable(false);
1028  // Run tasks that maybe suspended
1029  group->RunDisableEntries();
1030 }
1031 
1032 void TaskScheduler::DisableTaskEntry(int task_id, int instance_id) {
1033  TaskEntry *entry = GetTaskEntry(task_id, instance_id);
1034  entry->SetDisable(true);
1035 }
1036 
1037 void TaskScheduler::EnableTaskEntry(int task_id, int instance_id) {
1038  TaskEntry *entry = GetTaskEntry(task_id, instance_id);
1039  entry->SetDisable(false);
1040  TaskGroup *group = GetTaskGroup(task_id);
1041  // If group is still disabled, do not schedule the task. Task will be
1042  // scheduled for run when TaskGroup is enabled.
1043  if (group->IsDisabled()) {
1044  return;
1045  }
1046  // Run task instances that maybe suspended
1047  if (entry->WaitQSize() != 0) {
1048  entry->RunDeferEntry();
1049  }
1050 }
1051 
1053 // Implementation for class TaskGroup
1055 
1056 TaskGroup::TaskGroup(int task_id) : task_id_(task_id), policy_set_(false),
1057  run_count_(0), execute_delay_(0), schedule_delay_(0), disable_(false) {
1058  total_run_time_ = 0;
1060  task_entry_ = new TaskEntry(task_id);
1061  memset(&stats_, 0, sizeof(stats_));
1062  disable_entry_ = new TaskEntry(task_id);
1063 }
1064 
1066  policy_.clear();
1067  deferq_.clear();
1068 
1069  delete task_entry_;
1070  task_entry_ = NULL;
1071 
1072  for (size_t i = 0; i < task_entry_db_.size(); i++) {
1073  if (task_entry_db_[i] != NULL) {
1074  delete task_entry_db_[i];
1075  task_entry_db_[i] = NULL;
1076  }
1077  }
1078 
1079  delete disable_entry_;
1080  disable_entry_ = NULL;
1081  task_entry_db_.clear();
1082 }
1083 
1084 TaskEntry *TaskGroup::GetTaskEntry(int task_instance) {
1085  if (task_instance == -1)
1086  return task_entry_;
1087 
1088  int size = task_entry_db_.size();
1089  if (size <= task_instance) {
1090  task_entry_db_.resize(task_instance + TaskGroup::kVectorGrowSize);
1091  }
1092 
1093  TaskEntry *entry = task_entry_db_.at(task_instance);
1094  if (entry == NULL) {
1095  entry = new TaskEntry(task_id_, task_instance);
1096  task_entry_db_[task_instance] = entry;
1097  }
1098 
1099  return entry;
1100 }
1101 
1102 TaskEntry *TaskGroup::QueryTaskEntry(int task_instance) const {
1103  if (task_instance == -1) {
1104  return task_entry_;
1105  }
1106 
1107  if (task_instance >= (int)task_entry_db_.size())
1108  return NULL;
1109 
1110  return task_entry_db_[task_instance];
1111 }
1112 
1114  policy_.push_back(group);
1115 }
1116 
1118  for (TaskGroupPolicyList::iterator it = policy_.begin();
1119  it != policy_.end(); ++it) {
1120  if ((*it)->run_count_ != 0) {
1121  return (*it);
1122  }
1123  }
1124  return NULL;
1125 }
1126 
1128  TaskGroup *group;
1129  if ((group = ActiveGroupInPolicy()) != NULL) {
1130  // TaskEntry is inserted in the deferq_ based on the Task seqno.
1131  // deferq_ comparison function uses the seqno of the first Task queued
1132  // in the waitq_. Therefore, add the Task to waitq_ before adding
1133  // TaskEntry in the deferq_.
1134  if (0 == entry->WaitQSize()) {
1135  entry->AddToWaitQ(task);
1136  }
1137  group->AddToDeferQ(entry);
1138  return true;
1139  }
1140  return false;
1141 }
1142 
1144  stats_.defer_count_++;
1145  deferq_.insert(*entry);
1146  assert(entry->deferq_task_group_ == NULL);
1147  entry->deferq_task_group_ = this;
1148 }
1149 
1151  assert(this == entry.deferq_task_group_);
1152  deferq_.erase(deferq_.iterator_to(entry));
1153  entry.deferq_task_group_ = NULL;
1154 }
1155 
1157  disable_entry_->AddToDeferQ(entry);
1158 }
1159 
1161  assert(policy_set_ == false);
1162  policy_set_ = true;
1163 }
1164 
1166  TaskDeferList::iterator it;
1167 
1168  it = deferq_.begin();
1169  while (it != deferq_.end()) {
1170  TaskEntry &entry = *it;
1171  TaskDeferList::iterator it_work = it++;
1172  DeleteFromDeferQ(*it_work);
1173  entry.RunDeferEntry();
1174  }
1175 
1176  return;
1177 }
1178 
1179 inline void TaskGroup::TaskExited(Task *t) {
1180  run_count_--;
1182 }
1183 
1186 }
1187 
1189  TaskEntry *entry;
1190  if (task_entry_->WaitQSize()) {
1192  }
1193 
1194  // Walk thru the task_entry_db_ and add if waitq is non-empty
1195  for (TaskEntryList::iterator it = task_entry_db_.begin();
1196  it != task_entry_db_.end(); ++it) {
1197  if ((entry = *it) == NULL) {
1198  continue;
1199  }
1200  if (entry->WaitQSize()) {
1201  AddToDisableQ(entry);
1202  }
1203  }
1204 }
1205 
1207  TaskEntry *entry;
1208 
1209  // Check the waitq_ of the instance -1
1210  if (task_entry_->WaitQSize()) {
1211  return false;
1212  }
1213 
1214  // Walk thru the task_entry_db_ until waitq_ of any of the task is non-zero
1215  for (TaskEntryList::iterator it = task_entry_db_.begin();
1216  it != task_entry_db_.end(); ++it) {
1217  if ((entry = *it) == NULL) {
1218  continue;
1219  }
1220  if (entry->IsDisabled()) {
1221  continue;
1222  }
1223  if (entry->WaitQSize()) {
1224  return false;
1225  }
1226  }
1227 
1228  // Well, no task has been enqueued in this task group
1229  return true;
1230 }
1231 
1233  memset(&stats_, 0, sizeof(stats_));
1234 }
1235 
1238 }
1239 
1240 void TaskGroup::ClearTaskStats(int task_instance) {
1241  TaskEntry *entry = QueryTaskEntry(task_instance);
1242  if (entry != NULL)
1243  entry->ClearTaskStats();
1244 }
1245 
1247  return &stats_;
1248 }
1249 
1251  return task_entry_->GetTaskStats();
1252 }
1253 
1254 TaskStats *TaskGroup::GetTaskStats(int task_instance) {
1255  TaskEntry *entry = QueryTaskEntry(task_instance);
1256  return entry->GetTaskStats();
1257 }
1258 
1260 // Implementation for class TaskEntry
1262 
1263 TaskEntry::TaskEntry(int task_id, int task_instance) : task_id_(task_id),
1264  task_instance_(task_instance), run_count_(0), run_task_(NULL),
1265  waitq_(), deferq_task_entry_(NULL), deferq_task_group_(NULL),
1266  disable_(false) {
1267  // When a new TaskEntry is created, adds an implicit rule into policyq_ to
1268  // ensure that only one Task of an instance is run at a time
1269  if (task_instance != -1) {
1270  policyq_.push_back(this);
1271  }
1272  memset(&stats_, 0, sizeof(stats_));
1273  // allocate memory for deferq
1274  deferq_ = new TaskDeferList;
1275 }
1276 
1277 TaskEntry::TaskEntry(int task_id) : task_id_(task_id),
1278  task_instance_(-1), run_count_(0), run_task_(NULL),
1279  deferq_task_entry_(NULL), deferq_task_group_(NULL), disable_(false) {
1280  memset(&stats_, 0, sizeof(stats_));
1281  // allocate memory for deferq
1282  deferq_ = new TaskDeferList;
1283 }
1284 
1286  policyq_.clear();
1287 
1288  assert(0 == deferq_->size());
1289  delete deferq_;
1290 }
1291 
1293  policyq_.push_back(entry);
1294 }
1295 
1297  for (TaskEntryList::iterator it = policyq_.begin(); it != policyq_.end();
1298  ++it) {
1299  if ((*it)->run_count_ != 0) {
1300  return (*it);
1301  }
1302  }
1303 
1304  return NULL;
1305 }
1306 
1308  TaskEntry *policy_entry;
1309 
1310  if ((policy_entry = ActiveEntryInPolicy()) != NULL) {
1311  // TaskEntry is inserted in the deferq_ based on the Task seqno.
1312  // deferq_ comparison function uses the seqno of the first Task queued
1313  // in the waitq_. Therefore, add the Task to waitq_ before adding
1314  // TaskEntry in the deferq_.
1315  if (0 == WaitQSize()) {
1316  AddToWaitQ(task);
1317  }
1318  policy_entry->AddToDeferQ(this);
1319  return true;
1320  }
1321  return false;
1322 }
1323 
1325  t->SetState(Task::WAIT);
1326  stats_.wait_count_++;
1327  waitq_.push_back(*t);
1328 
1330  TaskGroup *group = scheduler->GetTaskGroup(task_id_);
1331  group->stats_.wait_count_++;
1332 }
1333 
1335  TaskWaitQ::iterator it = waitq_.iterator_to(*t);
1336  waitq_.erase(it);
1337  return true;
1338 }
1339 
1341  stats_.defer_count_++;
1342  deferq_->insert(*entry);
1343  assert(entry->deferq_task_entry_ == NULL);
1344  entry->deferq_task_entry_ = this;
1345 }
1346 
1348  assert(this == entry.deferq_task_entry_);
1349  deferq_->erase(deferq_->iterator_to(entry));
1350  entry.deferq_task_entry_ = NULL;
1351 }
1352 
1354  stats_.run_count_++;
1355  if (t->GetTaskInstance() != -1) {
1356  assert(run_task_ == NULL);
1357  assert (run_count_ == 0);
1358  run_task_ = t;
1359  }
1360 
1361  run_count_++;
1363  TaskGroup *group = scheduler->QueryTaskGroup(t->GetTaskId());
1364  group->TaskStarted();
1365 
1366  t->StartTask(scheduler);
1367 }
1368 
1370  if (waitq_.size() == 0)
1371  return;
1372 
1373  TaskWaitQ::iterator it = waitq_.begin();
1374 
1375  if (task_instance_ != -1) {
1376  Task *t = &(*it);
1377  DeleteFromWaitQ(t);
1378  RunTask(t);
1379  // If there are more tasks in waitq_, put them in deferq_
1380  if (waitq_.size() != 0) {
1381  AddToDeferQ(this);
1382  }
1383  } else {
1384  // Run all instances in waitq_
1385  while (it != waitq_.end()) {
1386  Task *t = &(*it);
1387  DeleteFromWaitQ(t);
1388  RunTask(t);
1389  if (waitq_.size() == 0)
1390  break;
1391  it = waitq_.begin();
1392  }
1393  }
1394 }
1395 
// Body of RunDeferEntry(); listing lines 1396-1397 (the signature and,
// presumably, the declaration of `scheduler`) are not present in this extract.
// Re-evaluates the policies for the task at the head of waitq_. If a policy
// check reports that the entry was deferred, returns; otherwise runs waitq_.
1398  TaskGroup *group = scheduler->GetTaskGroup(task_id_);
1399 
1400  // Sanity check
1401  assert(waitq_.size());
1402  Task *task = &(*waitq_.begin());
1403 
1404  // Check Task group policies
1405  if (group->DeferOnPolicyFail(this, task)) {
1406  return;
1407  }
1408 
1409  // Check Task entry policies
1410  if (DeferOnPolicyFail(task)) {
1411  return;
1412  }
1413 
1414  RunWaitQ();
1415  return;
1416 }
1417 
// Body of RunDeferQ(); the signature line (listing line 1418) is not present
// in this extract.
// Walks this entry's deferq_ in order, removing each deferred entry and
// re-evaluating it through RunDeferEntry().
1419  TaskDeferList::iterator it;
1420 
1421  it = deferq_->begin();
1422  while (it != deferq_->end()) {
1423  TaskEntry &entry = *it;
// Advance `it` before the erase so the loop iterator never refers to the
// element being removed.
1424  TaskDeferList::iterator it_work = it++;
1425  DeleteFromDeferQ(*it_work);
1426  entry.RunDeferEntry();
1427  }
1428 
1429  return;
1430 }
1431 
// Body of RunDeferQForGroupEnable(); the signature line (listing line 1432)
// is not present in this extract.
// Same walk as RunDeferQ(), but RunDeferEntry() is called only for entries
// that are not disabled. Every entry is removed from deferq_ either way.
1433  TaskDeferList::iterator it;
1434 
1435  it = deferq_->begin();
1436  while (it != deferq_->end()) {
1437  TaskEntry &entry = *it;
// Advance `it` before the erase so the loop iterator never refers to the
// element being removed.
1438  TaskDeferList::iterator it_work = it++;
1439  DeleteFromDeferQ(*it_work);
1440  if (!entry.IsDisabled()) {
1441  entry.RunDeferEntry();
1442  }
1443  }
1444 
1445  return;
1446 }
1447 
// Body of RunCombinedDeferQ(); listing lines 1448-1449 (the signature and,
// presumably, the declaration of `scheduler`) are not present in this extract.
// Merges the group's deferq_ and this entry's deferq_: at each step the entry
// that TaskDeferEntryCmp orders first is removed from its queue and re-run.
1450  TaskGroup *group = scheduler->QueryTaskGroup(task_id_);
1451  TaskDeferEntryCmp defer_entry_compare;
1452 
1453  TaskDeferList::iterator group_it = group->deferq_.begin();
1454  TaskDeferList::iterator entry_it = deferq_->begin();
1455 
1456  // Loop thru the deferq_ of TaskEntry and TaskGroup in the temporal order.
1457  // Exit the loop when any of the queues become empty.
1458  while ((group_it != group->deferq_.end()) &&
1459  (entry_it != deferq_->end())) {
1460  TaskEntry &g_entry = *group_it;
1461  TaskEntry &t_entry = *entry_it;
1462 
// In both branches the iterator is advanced before the erase so it never
// refers to the element being removed.
1463  if (defer_entry_compare(g_entry, t_entry)) {
1464  TaskDeferList::iterator group_it_work = group_it++;
1465  group->DeleteFromDeferQ(*group_it_work);
1466  g_entry.RunDeferEntry();
1467  } else {
1468  TaskDeferList::iterator entry_it_work = entry_it++;
1469  DeleteFromDeferQ(*entry_it_work);
1470  t_entry.RunDeferEntry();
1471  }
1472  }
1473 
1474  // Now, walk thru the non-empty deferq_
1475  if (group_it != group->deferq_.end()) {
1476  group->RunDeferQ();
1477  } else if (entry_it != deferq_->end()) {
1478  RunDeferQ();
1479  }
1480 }
1481 
// Body of TaskExited(Task *t, TaskGroup *group); the signature line (listing
// line 1482) is not present in this extract.
// Updates run accounting for a finished task and kicks whichever deferred
// queues can now make progress.
// For an entry with a specific instance, `t` must be the one running task.
1483  if (task_instance_ != -1) {
1484  assert(run_task_ == t);
1485  run_task_ = NULL;
1486  assert(run_count_ == 1);
1487  }
1488 
1489  run_count_--;
// NOTE(review): listing lines 1490-1491 are missing here.
1492  group->TaskExited(t);
1493 
// Choose the deferred queue(s) to run based on which run counters reached 0.
// NOTE(review): listing line 1495 (the body of the first branch) is missing;
// presumably it runs both queues via RunCombinedDeferQ() -- confirm.
1494  if (!group->run_count_ && !run_count_) {
1496  } else if (!group->run_count_) {
1497  group->RunDeferQ();
1498  } else if (!run_count_) {
1499  RunDeferQ();
1500  }
1501 }
1502 
// Body of ClearQueues(); the signature line (listing line 1503) is not
// present in this extract.
// Empties the defer queue, the policy list and the wait queue of this entry.
1504  deferq_->clear();
1505  policyq_.clear();
1506  waitq_.clear();
1507 }
1508 
// Body of ClearTaskStats(); the signature line (listing line 1509) is not
// present in this extract.
// Zeroes every byte of stats_.
1510  memset(&stats_, 0, sizeof(stats_));
1511 }
1512 
// Body of GetTaskStats(); the signature line (listing line 1513) is not
// present in this extract.
// Returns a pointer to this entry's stats_ member; the caller does not own it.
1514  return &stats_;
1515 }
1516 
1517 boost::optional<uint64_t> TaskEntry::GetTaskDeferEntrySeqno() const {
1518  if(waitq_.size()) {
1519  const Task *task = &(*waitq_.begin());
1520  return task->GetSeqno();
1521  }
1522 
1523  return boost::none;
1524 }
1525 
1527 // Implementation for class Task
1529 Task::Task(int task_id, int task_instance) : task_id_(task_id),
1530  task_instance_(task_instance), task_impl_(NULL), state_(INIT),
1531  tbb_state_(TBB_INIT), seqno_(0), task_recycle_(false), task_cancel_(false),
1532  enqueue_time_(0), schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
1533 }
1534 
1535 Task::Task(int task_id) : task_id_(task_id),
1536  task_instance_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
1537  seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
1538  schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
1539 }
1540 
1541 
// Starts execution of the task: optionally traces the schedule delay, moves
// the task to RUN state and hands a new TaskImpl to TBB.
// NOTE(review): listing lines 1544, 1548 and 1553 are missing from this
// extract, so the block below is incomplete as shown.
1542 void Task::StartTask(TaskScheduler *scheduler) {
// Delay tracking happens only when enqueue_time_ is non-zero.
1543  if (enqueue_time_ != 0) {
// NOTE(review): line 1544 (missing) presumably sets schedule_time_ -- confirm.
// Trace when the time between enqueue and schedule exceeds the threshold
// returned by the scheduler for this task.
1545  if ((schedule_time_ - enqueue_time_) >
1546  scheduler->schedule_delay(this)) {
// NOTE(review): the remainder of this TASK_TRACE call (line 1548) is missing.
1547  TASK_TRACE(scheduler, this, "Schedule delay(in usec) ",
1549  }
1550  }
// A task must not already have a tbb::task attached when it is started.
1551  assert(task_impl_ == NULL);
1552  SetState(RUN);
// Allocate the TaskImpl wrapper as a TBB root task, then either spawn or
// enqueue it depending on the scheduler's use_spawn() setting.
1554  task_impl_ = new (task::allocate_root())TaskImpl(this);
1555  if (scheduler->use_spawn()) {
1556  task::spawn(*task_impl_);
1557  } else {
1558  task::enqueue(*task_impl_);
1559  }
1560 }
1561 
// Body of Running(); the signature line (listing line 1562) is not present
// in this extract.
// Returns the Task pointer stored in the calling thread's slot of the
// thread-specific task_running container.
1563  TaskInfo::reference running = task_running.local();
1564  return running;
1565 }
1566 
1567 ostream& operator<<(ostream& out, const Task &t) {
1568  out << "Task <" << t.task_id_ << "," << t.task_instance_ << ":"
1569  << t.seqno_ << "> ";
1570  return out;
1571 }
1572 
1574 // Implementation for sandesh APIs for Task
1576 void TaskEntry::GetSandeshData(SandeshTaskEntry *resp) const {
1577  resp->set_instance_id(task_instance_);
1578  resp->set_tasks_created(stats_.enqueue_count_);
1579  resp->set_total_tasks_completed(stats_.total_tasks_completed_);
1580  resp->set_tasks_running(run_count_);
1581  resp->set_waitq_size(waitq_.size());
1582  resp->set_deferq_size(deferq_->size());
1583  resp->set_last_exit_time(stats_.last_exit_time_);
1584 }
// Fills the sandesh response for this task group: total run time (when
// non-zero), one record per task entry and, unless `summary` is set, one
// record per group named in this group's policy list.
1585 void TaskGroup::GetSandeshData(SandeshTaskGroup *resp, bool summary) const {
1586  if (total_run_time_)
1587  resp->set_total_run_time(duration_usecs_to_string(total_run_time_));
1588 
// The entry looked up with instance -1 is reported first, followed by every
// non-NULL slot of task_entry_db_.
1589  std::vector<SandeshTaskEntry> list;
1590  TaskEntry *task_entry = QueryTaskEntry(-1);
1591  if (task_entry) {
1592  SandeshTaskEntry entry_resp;
1593  task_entry->GetSandeshData(&entry_resp);
1594  list.push_back(entry_resp);
1595  }
1596  for (TaskEntryList::const_iterator it = task_entry_db_.begin();
1597  it != task_entry_db_.end(); ++it) {
1598  task_entry = *it;
1599  if (task_entry) {
1600  SandeshTaskEntry entry_resp;
1601  task_entry->GetSandeshData(&entry_resp);
1602  list.push_back(entry_resp);
1603  }
1604  }
1605  resp->set_task_entry_list(list);
1606 
// Summary responses stop here and carry no policy information.
1607  if (summary)
1608  return;
1609 
// NOTE(review): listing line 1610 is missing here; `scheduler` is used below
// but its declaration is not visible -- presumably the TaskScheduler
// instance; confirm against the full file.
// Report the name and current run count of each group in policy_.
1611  std::vector<SandeshTaskPolicyEntry> policy_list;
1612  for (TaskGroupPolicyList::const_iterator it = policy_.begin();
1613  it != policy_.end(); ++it) {
1614  SandeshTaskPolicyEntry policy_entry;
1615  policy_entry.set_task_name(scheduler->GetTaskName((*it)->task_id_));
1616  policy_entry.set_tasks_running((*it)->run_count_);
1617  policy_list.push_back(policy_entry);
1618  }
1619  resp->set_task_policy_list(policy_list);
1620 }
1621 
1622 void TaskScheduler::GetSandeshData(SandeshTaskScheduler *resp, bool summary) {
1623  tbb::mutex::scoped_lock lock(mutex_);
1624 
1625  resp->set_running(running_);
1626  resp->set_use_spawn(use_spawn_);
1627  resp->set_total_count(seqno_);
1628  resp->set_thread_count(hw_thread_count_);
1629 
1630  std::vector<SandeshTaskGroup> list;
1631  for (TaskIdMap::const_iterator it = id_map_.begin(); it != id_map_.end();
1632  it++) {
1633  SandeshTaskGroup resp_group;
1634  TaskGroup *group = QueryTaskGroup(it->second);
1635  resp_group.set_task_id(it->second);
1636  resp_group.set_name(it->first);
1637  if (group)
1638  group->GetSandeshData(&resp_group, summary);
1639  list.push_back(resp_group);
1640  }
1641  resp->set_task_group_list(list);
1642 }
size_t num_tasks() const
Definition: task.cc:282
int task_id_
The code path executed by the task.
Definition: task.h:149
void DisableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1032
bool measure_delay_
Definition: task.h:379
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec, uint64_t inactivity_time_msec, uint64_t poll_interval_msec)
Enable Task monitoring.
Definition: task.cc:511
TaskGroupPolicyList policy_
Policy rules for the group.
Definition: task.cc:320
int wait_count_
Number of entries in waitq.
Definition: task.h:29
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
Definition: task.cc:170
bool task_recycle_
Definition: task.h:157
TaskMonitor * task_monitor_
Definition: task.h:397
std::vector< TaskGroup * > TaskGroupPolicyList
Vector of Task Group policies.
Definition: task.cc:298
TaskEntry * QueryTaskEntry(int task_id, int instance_id)
Query TaskEntry for a task-id and task-instance.
Definition: task.cc:588
static const int kVectorGrowSize
Definition: task.h:335
Task * parent_
Definition: task.cc:63
void GetSandeshData(SandeshTaskScheduler *resp, bool summary)
Definition: task.cc:1622
std::vector< TaskEntry * > TaskEntryList
Definition: task.cc:35
Task * run_task_
Task currently running.
Definition: task.cc:179
void EnableTaskEntry(int task_id, int instance_id)
Definition: task.cc:1037
tbb::task * task_impl_
Definition: task.h:153
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
bool task_cancel_
Definition: task.h:158
int hw_thread_count_
Definition: task.h:376
static int GetDefaultThreadCount()
Definition: task.cc:429
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskGroup.
Definition: task.cc:1165
static void Initialize(uint32_t thread_count=0, EventManager *evm=NULL)
Definition: task.cc:478
static Task * Running()
Returns a pointer to the current task the code is executing under.
Definition: task.cc:1562
TaskEntry * GetTaskEntry(int task_id, int instance_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:583
uint64_t total_tasks_completed_
Number of total tasks ran.
Definition: task.h:41
int run_count_
No. of tasks running.
Definition: task.cc:176
void AddToDeferQ(TaskEntry *entry)
Add task to deferq_. Only one task of a given instance goes into deferq_ for its policies.
Definition: task.cc:1143
bool IsWaitQEmpty()
Returns true if the waitq_ of every task in the group is empty.
Definition: task.cc:1206
boost::optional< uint64_t > GetTaskDeferEntrySeqno() const
Addition/deletion of TaskEntry in the deferq_ is based on the seqno. seqno of the first Task in the w...
Definition: task.cc:1517
TaskIdMap id_map_
Definition: task.h:372
void RunDeferQForGroupEnable()
Starts executing tasks from deferq_ of TaskEntries which are enabled.
Definition: task.cc:1432
uint32_t schedule_delay() const
Definition: task.h:133
static boost::scoped_ptr< TaskScheduler > singleton_
Definition: task.h:336
TaskEntry * ActiveEntryInPolicy()
Definition: task.cc:1296
TaskStats * GetTaskStats(int task_id)
Definition: task.cc:907
tbb::reader_writer_lock id_map_mutex_
Definition: task.h:371
CancelReturnCode
Definition: task.h:205
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:610
uint32_t schedule_delay_
Log if time between enqueue and task-execute exceeds the delay.
Definition: task.h:382
void SetTbbState(TbbState s)
Definition: task.h:140
TaskEntry * stop_entry_
Definition: task.h:363
uint32_t execute_delay() const
Definition: task.h:132
std::ostream & operator<<(std::ostream &out, BFDState state)
Definition: bfd_common.cc:23
uint64_t enqueue_count_
Definition: task.h:387
void AddToDisableQ(TaskEntry *entry)
Enqueue TaskEntry in disable_entry's deferQ.
Definition: task.cc:1156
int GetRunCount() const
Definition: task.cc:147
int run_count_
No. of tasks running in the group.
Definition: task.cc:316
TaskGroup(int task_id)
Definition: task.cc:1056
uint32_t execute_delay() const
Definition: task.h:302
uint32_t execute_delay_
Definition: task.cc:333
TaskEntry * deferq_task_entry_
Definition: task.cc:189
virtual void OnTaskCancel()
Called on task exit, if it is marked for cancellation. If the user wants to do any cleanup on task ca...
Definition: task.h:114
TaskGroupDb task_group_db_
Definition: task.h:369
TaskGroup * QueryTaskGroup(int task_id)
Query TaskGroup for a task_id. Assumes valid entry is present for task_id.
Definition: task.cc:570
EventManager * evm_
Definition: task.h:390
void Start()
Starts scheduling of all tasks.
Definition: task.cc:798
uint32_t execute_delay_
Definition: task.h:161
void DeleteFromDeferQ(TaskEntry &entry)
Deletes a task from deferq_.
Definition: task.cc:1347
TaskEntry * QueryTaskEntry(int task_instance) const
Definition: task.cc:1102
TaskStats * GetTaskGroupStats(int task_id)
Definition: task.cc:899
uint64_t seqno_
Definition: task.h:156
void set_event_manager(EventManager *evm)
Definition: task.cc:493
TaskEntryList policyq_
Policy rules for a task.
Definition: task.cc:185
int run_count_
Number of entries currently running.
Definition: task.h:32
void SetDisable(bool disable)
Definition: task.cc:148
void RunCombinedDeferQ()
Starts executing tasks from deferq_ of TaskEntry and TaskGroup in the temporal order.
Definition: task.cc:1448
TaskScheduler(int thread_count=0)
TaskScheduler constructor. TBB assumes it can use the "thread" invoking tbb::scheduler can be used fo...
Definition: task.cc:444
void IncrementTotalRunTime(int64_t rtime)
Definition: task.cc:269
void SetDisable(bool disable)
Definition: task.cc:276
CancelReturnCode Cancel(Task *task)
Cancels a Task that can be in RUN/WAIT state. The caller needs to ensure that the task exists when Ca...
Definition: task.cc:699
void RunDisableEntries()
Run tasks that maybe suspended. Schedule tasks only for TaskEntries which are enabled.
Definition: task.cc:1184
void RunDeferEntry()
Definition: task.cc:1396
bool DeleteFromWaitQ(Task *t)
Definition: task.cc:1334
void TaskStarted()
Definition: task.cc:268
TaskGroup maintains per <task-id> information including,.
Definition: task.cc:222
Comparison routine for the TaskDeferList.
Definition: task.cc:200
int GetTaskId(const std::string &name)
Definition: task.cc:856
A class maintaining information for every <task, instance>
Definition: task.cc:95
void StartTask(TaskScheduler *scheduler)
Starts execution of a task.
Definition: task.cc:1542
int CountThreadsPerPid(pid_t pid)
Platform-dependent subroutine in Linux and FreeBSD implementations, used only in TaskScheduler::WaitF...
Definition: task.cc:923
bool disable_
Definition: task.cc:191
void EnableLatencyThresholds(uint32_t execute, uint32_t schedule)
Enable logging of tasks exceeding configured latency.
Definition: task.cc:595
int GetTaskId() const
Definition: task.cc:145
void Terminate()
Definition: task_monitor.cc:59
TaskEntry * disable_entry_
Task entry for disabled group.
Definition: task.cc:329
bool track_run_time() const
Definition: task.h:297
uint64_t last_exit_time_
Timestamp of the latest exit.
Definition: task.h:44
TaskWaitQ waitq_
Tasks waiting to run on some condition.
Definition: task.cc:182
void Print()
Debug print routine.
Definition: task.cc:808
void WaitForTerminateCompletion()
Definition: task.cc:952
TaskGroup * deferq_task_group_
Definition: task.cc:190
void TaskExited(Task *t)
Definition: task.cc:1179
TaskEntry * GetTaskEntry(int task_instance)
Definition: task.cc:1084
State state_
Definition: task.h:154
static void SetThreadAmpFactor(int n)
following function allows one to increase max num of threads used by TBB
Definition: task.cc:1011
void RegisterLog(LogFn fn)
Definition: task.cc:531
TaskStats * GetTaskGroupStats()
Definition: task.cc:1246
TaskStats stats_
Cumulative maintenance stats.
Definition: task.cc:194
bool policy_set_
Specifies if policy is already set.
Definition: task.cc:313
TaskStats * GetTaskStats()
Definition: task.cc:1513
Definition: task.h:26
tbb::task_scheduler_init task_scheduler_
Definition: task.h:365
bool unit_test()
Definition: bgp_log.cc:53
tbb::task * execute()
Method called from tbb::task to execute. Invoke Run() method of client. Supports task continuation wh...
Definition: task.cc:345
int task_id() const
Definition: task.cc:280
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
A private class used to implement tbb::task An object is created when task is ready for execution and...
Definition: task.cc:47
LogFn log_fn_
Definition: task.h:375
void AddPolicy(TaskEntry *entry)
Definition: task.cc:1292
boost::intrusive::list_member_hook waitq_hook_
Definition: task.h:164
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
~TaskEntry()
Definition: task.cc:1285
boost::intrusive::list< Task, WaitQHook > TaskWaitQ
Definition: task.cc:159
int task_instance_
Definition: task.cc:173
#define CHECK_CONCURRENCY(...)
int GetTaskInstance() const
Definition: task.cc:146
void AddEntriesToDisableQ()
Add TaskEntries to disable_entry_ which have tasks enqueued and are already disabled.
Definition: task.cc:1188
uint32_t schedule_delay_
Definition: task.h:162
static const std::string duration_usecs_to_string(const uint64_t usecs)
Definition: time_util.h:62
TaskTbbKeepAwake * tbb_awake_task_
Definition: task.h:396
void ClearRunningTask()
Definition: task.cc:1006
bool IsEmpty(bool running_only=false)
Returns true if there are no tasks running and/or enqueued If running_only is true, enqueued tasks are ignored i.e. return true if there are no running tasks. Ignore TaskGroup or TaskEntry if it is disabled.
Definition: task.cc:823
#define DISALLOW_COPY_AND_ASSIGN(_Class)
Definition: util.h:15
void SetSeqNo(uint64_t seqno)
Definition: task.h:139
int GetTaskInstance() const
Definition: task.h:119
void ModifyTbbKeepAwakeTimeout(uint32_t timeout)
Definition: task.cc:505
bool StartTbbKeepAwakeTask(TaskScheduler *ts, EventManager *event_mgr, const std::string task_name, uint32_t tbbKeepawakeTimeout=1000)
uint64_t cancel_count_
Definition: task.h:389
boost::intrusive::member_hook< Task, boost::intrusive::list_member_hook<>,&Task::waitq_hook_ > WaitQHook
List of Tasks in waitq_.
Definition: task.cc:158
void AddToDeferQ(TaskEntry *entry)
Adds a task to deferq_. Only one task of a given instance goes into deferq_ for its policies...
Definition: task.cc:1340
static void GetTaskStats(TaskProfileStats *stats, int index, ProfileData *data)
boost::intrusive::set_member_hook task_defer_node
Definition: task.cc:161
int defer_count_
Number of entries in deferq.
Definition: task.h:35
TaskEntry(int task_id)
Definition: task.cc:1277
void ClearTaskStats()
Definition: task.cc:1509
void SetRunningTask(Task *)
This function should not be called in production code. It is only for unit testing to control current...
Definition: task.cc:1001
void ClearTaskGroupStats(int task_id)
Definition: task.cc:875
int task_id_
Definition: task.cc:172
TaskImpl(Task *t)
Definition: task.cc:49
std::vector< TaskExclusion > TaskPolicy
Definition: task.h:59
~TaskScheduler()
Frees up the task_entry_db_ allocated for scheduler.
Definition: task.cc:455
virtual ~TaskImpl()
Destructor is called when a task execution is completed. Invoked implicitly by tbb::task. Invokes OnTaskExit to schedule pending tasks.
Definition: task.cc:400
int task_id_
Definition: task.cc:310
uint64_t done_count_
Definition: task.h:388
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>,&TaskEntry::task_defer_node > TaskDeferListOption
Definition: task.cc:301
bool DeferOnPolicyFail(Task *t)
Definition: task.cc:1307
void Start(EventManager *evm)
Definition: task_monitor.cc:49
uint64_t enqueue_count_
Number of tasks enqueued.
Definition: task.h:38
void RunDeferQ()
Starts executing tasks from deferq_ of a TaskEntry.
Definition: task.cc:1418
uint32_t execute_delay_
Log if time taken to execute exceeds the delay.
Definition: task.h:385
TaskGroup * GetTaskGroup(int task_id)
Get TaskGroup for a task_id. Grows task_entry_db_ if necessary.
Definition: task.cc:554
bool use_spawn() const
Definition: task.h:324
int GetTaskId() const
Definition: task.h:118
TaskEntry * GetDisableEntry()
Definition: task.cc:242
std::string GetTaskName(int task_id) const
Definition: task.cc:846
TaskDeferList * deferq_
Tasks deferred for this to exit.
Definition: task.cc:188
int id_max_
Definition: task.h:373
TaskStats * GetTaskStats()
Definition: task.cc:1250
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
uint64_t GetSeqno() const
Definition: task.h:120
void DisableTaskGroup(int task_id)
Definition: task.cc:1015
bool disable_
Definition: task.cc:335
void Log(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)
Definition: task.cc:523
int TaskRunCount() const
Definition: task.cc:258
uint32_t schedule_delay_
Definition: task.cc:334
bool running_
Definition: task.h:367
#define TASK_TRACE(scheduler, task, msg, delay)
Definition: task.cc:39
static int ThreadAmpFactor_
following variable allows one to increase max num of threads used by TBB
Definition: task.h:394
void RunTask(Task *t)
Starts a task. If there are more entries in waitq_ add them to deferq_.
Definition: task.cc:1353
void RunWaitQ()
Definition: task.cc:1369
static const int kVectorGrowSize
Definition: task.cc:309
Definition: task.h:92
static bool ShouldUseSpawn()
Definition: task.cc:433
void ClearQueues()
Definition: task.cc:1503
#define LOG(_Level, _Msg)
Definition: logging.h:33
size_t WaitQSize() const
Definition: task.cc:102
boost::intrusive::set< TaskEntry, TaskDeferListOption, boost::intrusive::compare< TaskDeferEntryCmp > > TaskDeferList
It is a tree of TaskEntries deferred and waiting on the containing task to exit. The tree is sorted b...
Definition: task.cc:307
void PolicySet()
Definition: task.cc:1160
bool IsDisabled()
Definition: task.cc:149
Task(int task_id, int task_instance)
Definition: task.cc:1529
static uint64_t ClockMonotonicUsec()
Definition: time_util.h:29
void Stop()
Stops scheduling of all tasks.
Definition: task.cc:792
size_t deferq_size() const
Definition: task.cc:281
boost::intrusive::member_hook< TaskEntry, boost::intrusive::set_member_hook<>,&TaskEntry::task_defer_node > TaskDeferListOption
Definition: task.cc:164
int task_instance_
The dataset id within a code path.
Definition: task.h:152
void EnableTaskGroup(int task_id)
Definition: task.cc:1025
tbb::mutex mutex_
Definition: task.h:366
uint64_t schedule_time_
Definition: task.h:160
bool track_run_time_
Definition: task.h:378
bool use_spawn_
Use spawn() to run a tbb::task instead of enqueue()
Definition: task.h:362
TaskDeferList deferq_
Tasks deferred till run_count_ is 0.
Definition: task.cc:323
bool DeferOnPolicyFail(TaskEntry *entry, Task *t)
Definition: task.cc:1127
void SetLatencyThreshold(const std::string &name, uint32_t execute, uint32_t schedule)
Definition: task.cc:602
void DeleteFromDeferQ(TaskEntry &entry)
Delete task from deferq_.
Definition: task.cc:1150
tbb::atomic< uint64_t > total_run_time_
Definition: task.cc:317
friend class TaskEntry
Definition: task.cc:294
void OnTaskExit(Task *task)
Method invoked on exit of a Task. Exit of a task can potentially start tasks in pendingq.
Definition: task.cc:762
void TaskExited(Task *t, TaskGroup *group)
Definition: task.cc:1482
TaskGroup * ActiveGroupInPolicy()
Definition: task.cc:1117
bool IsTaskGroupEmpty(int task_id) const
Check if there are any Tasks in the given TaskGroup. Assumes that all task ids are mutually exclusive...
Definition: task.cc:574
void Terminate()
Definition: task.cc:978
void GetSandeshData(SandeshTaskEntry *resp) const
Definition: task.cc:1576
bool IsDisabled()
Definition: task.cc:277
TaskStats stats_
Definition: task.cc:337
static TaskInfo task_running
Definition: task.cc:32
void AddPolicy(TaskGroup *group)
Definition: task.cc:1113
void ClearTaskStats()
Definition: task.cc:1236
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
uint64_t enqueue_time_
Definition: task.h:159
TaskEntry * task_entry_
Tasks deferred till run_count_ is 0.
Definition: task.cc:326
friend class TaskImpl
Definition: task.h:138
uint32_t schedule_delay() const
Definition: task.h:301
void EnqueueUnLocked(Task *task)
Definition: task.cc:642
void ClearTaskGroupStats()
Definition: task.cc:1232
static int GetThreadCount(int thread_count=0)
Get number of tbb worker threads. For testing purposes only. Limit the number of tbb worker threads...
Definition: task.cc:407
TaskEntryList task_entry_db_
task-entries in this group
Definition: task.cc:332
void AddToWaitQ(Task *t)
Definition: task.cc:1324
~TaskGroup()
Definition: task.cc:1065
void GetSandeshData(SandeshTaskGroup *resp, bool summary) const
Definition: task.cc:1585
uint64_t seqno_
Definition: task.h:368
struct task_ task
static EventManager evm
boost::function< void(const char *file_name, uint32_t line_no, const Task *task, const char *description, uint64_t delay)> LogFn
Definition: task.h:182
void SetState(State s)
Definition: task.h:141
void ClearTaskStats(int task_id)
Definition: task.cc:883
tbb::enumerable_thread_specific< Task * > TaskInfo
Definition: task.cc:28