OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
instance_manager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2014 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 
7 #include <boost/bind.hpp>
8 #include <boost/functional/hash.hpp>
9 #include <boost/filesystem.hpp>
10 #include <boost/tokenizer.hpp>
11 #include <sys/wait.h>
12 #include "cmn/agent.h"
13 #include "db/db.h"
14 #include "io/event_manager.h"
15 #include "oper/instance_task.h"
16 #include "oper/operdb_init.h"
17 #include "oper/service_instance.h"
18 #include "oper/vm.h"
21 #ifdef WITH_LIBVIRT
23 #endif
24 #include "base/util.h"
25 
26 using boost::uuids::uuid;
28  SandeshTraceBufferCreate("InstanceManager", 1000));
29 
30 static const char loadbalancer_config_path_default[] =
31  "/var/lib/contrail/loadbalancer/";
32 static const char namespace_store_path_default[] =
33  "/var/run/netns";
34 static const char namespace_prefix[] = "vrouter-";
35 
37 public:
39  : agent_(agent), manager_(manager) {
40  }
41 
43  namespace fs = boost::filesystem;
44 
45  //Read all the Namespaces in the system
46  fs::path ns(manager_->namespace_store_path_);
47  if ( !fs::exists(ns) || !fs::is_directory(ns)) {
48  return;
49  }
50 
51  typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
52  boost::char_separator<char> slash_sep("/");
53  boost::char_separator<char> colon_sep(":");
54  fs::directory_iterator end_iter;
55  for(fs::directory_iterator iter(ns); iter != end_iter; iter++) {
56 
57  // Get to the name of namespace by removing complete path
58  tokenizer tokens(iter->path().string(), slash_sep);
59  std::string ns_name;
60  for(tokenizer::iterator it=tokens.begin(); it!=tokens.end(); it++){
61  ns_name = *it;
62  }
63 
64  //We are interested only in namespaces starting with a given
65  //prefix
66  std::size_t vrouter_found;
67  vrouter_found = ns_name.find(namespace_prefix);
68  if (vrouter_found == std::string::npos) {
69  continue;
70  }
71 
72  //Remove the standard prefix
73  ns_name.replace(vrouter_found, strlen(namespace_prefix), "");
74 
75  //Namespace might have a ":". Extract both left and right of
76  //":" Left of ":" is the VM uuid. If not found in Agent's VM
77  //DB, it can be deleted
78  tokenizer tok(ns_name, colon_sep);
79  boost::uuids::uuid vm_uuid = StringToUuid(*tok.begin());
80  VmKey key(vm_uuid);
81  if (agent_->vm_table()->Find(&key, true)) {
82  continue;
83  }
84 
86  prop.instance_id = vm_uuid;
88  tokenizer::iterator next_tok = ++(tok.begin());
89  //Loadbalancer namespace
90  if (next_tok != tok.end()) {
91  prop.loadbalancer_id = *next_tok;
93  }
94 
95  //Delete Namespace
96  manager_->StopStaleNetNS(prop);
97 
98  //If Loadbalancer, delete the config files as well
100  //Delete the complete directory
101  std::stringstream pathgen;
103  << prop.loadbalancer_id << ".conf";
104 
105  boost::system::error_code error;
106  if (fs::exists(pathgen.str())) {
107  fs::remove(pathgen.str(), error);
108  if (error) {
109  std::stringstream ss;
110  ss << "Stale loadbalancer cfg fle delete error ";
111  ss << error.message();
112  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
113  }
114  }
115  }
116  }
117  }
118 
119 private:
122 };
123 
127 }
128 
130  : si_listener_(DBTableBase::kInvalidId),
131  netns_timeout_(-1),
132  work_queue_(TaskScheduler::GetInstance()->GetTaskId(INSTANCE_MANAGER_TASK_NAME), 0,
133  boost::bind(&InstanceManager::DequeueEvent, this, _1)),
134  loadbalancer_config_path_(loadbalancer_config_path_default),
135  namespace_store_path_(namespace_store_path_default),
136  stale_timer_interval_(5 * 60 * 1000),
137  stale_timer_(TimerManager::CreateTimer(*(agent->event_manager()->io_service()),
138  "NameSpaceStaleTimer", TaskScheduler::GetInstance()->
139  GetTaskId(INSTANCE_MANAGER_TASK_NAME), 0)), agent_(agent) {
140  if (agent->isMockMode()) {
141  loadbalancer_config_path_ = "/tmp/" + agent->AgentGUID()
143  namespace_store_path_ = "/tmp/" + agent->AgentGUID()
145  }
146  work_queue_.set_name("Instance Manager");
147 
148 }
149 
// Prepares the instance manager for use: registers EventObserver as a
// listener on si_table, records the netns command, creates the loadbalancer
// config directory, installs the instance adapters and fills task_queues_.
//
//   database      - not referenced in the lines visible in this listing.
//   netns_cmd     - copied to netns_cmd_ and passed to the NetNS adapter; an
//                   empty value is logged as an error but not rejected.
//   docker_cmd    - passed to the Docker adapter; an empty value is logged as
//                   an error but not rejected.
//   netns_workers - number of task queues when > 0, otherwise kWorkersDefault.
//   netns_timeout - copied to netns_timeout_ only when >= 1.
//
// NOTE(review): this listing skips original lines 154, 182, 186, 191, 202 and
// 209, so the declaration of si_table, the libvirt adapter construction, the
// task queue allocation and the call that takes the StaleTimeout callback are
// not visible here - confirm against the real source before relying on them.
150 void InstanceManager::Initialize(DB *database, const std::string &netns_cmd,
151  const std::string &docker_cmd,
152  const int netns_workers,
153  const int netns_timeout) {
155  assert(si_table);
// The returned listener id is kept in si_listener_; the state accessors in
// this file pass it to GetState/SetState/ClearState.
156  si_listener_ = si_table->Register(
157  boost::bind(&InstanceManager::EventObserver, this, _1, _2));
158 
159  netns_cmd_ = netns_cmd;
// Missing commands are only reported; initialization continues either way.
160  if (netns_cmd_.length() == 0) {
161  LOG(ERROR, "NetNS path for network namespace command not specified "
162  "in the config file, the namespaces won't be started");
163  }
164  if (docker_cmd.length() == 0) {
165  LOG(ERROR, "Path for Docker starter command not specified "
166  "in the config file, the Docker instances won't be started");
167  }
168 
// Create the loadbalancer config directory. The error_code overload is used,
// and a failure is logged without aborting.
169  std::stringstream pathgen;
170  pathgen << loadbalancer_config_path_;
171  boost::filesystem::path dir(pathgen.str());
172  boost::system::error_code error;
173  boost::filesystem::create_directories(dir, error);
174  if (error) {
// NOTE(review): the message below misspells Failed, and error.message() is
// not included in it.
175  LOG(ERROR, "Falied to create Loadbalancer Directory " << pathgen.str());
176  }
177 
// Adapters are heap-allocated and stored as raw pointers in adapters_.
178  adapters_.push_back(new DockerInstanceAdapter(docker_cmd, agent_));
179  adapters_.push_back(new NetNSInstanceAdapter(netns_cmd,
180  loadbalancer_config_path_, agent_));
181 #ifdef WITH_LIBVIRT
183  "qemu:///system"));
184 #endif
185 
// Only a timeout of at least 1 overrides the value already in netns_timeout_.
187  if (netns_timeout >= 1) {
188  netns_timeout_ = netns_timeout;
189  }
190 
192 
193  int workers = kWorkersDefault;
194  if (netns_workers > 0) {
195  workers = netns_workers;
196  }
197 
// One queue per worker; each queue reports timeouts through OnTaskTimeout.
198  task_queues_.resize(workers);
199  for (std::vector<InstanceTaskQueue *>::iterator iter = task_queues_.begin();
200  iter != task_queues_.end(); ++iter) {
201  InstanceTaskQueue *task_queue =
203  assert(task_queue);
204  task_queue->set_on_timeout_cb(
205  boost::bind(&InstanceManager::OnTaskTimeout,
206  this, _1));
207  *iter = task_queue;
208  }
210  boost::bind(&InstanceManager::StaleTimeout, this));
211 
212 }
213 
// Replaces the command held by the network-namespace adapter.
//
// A property set whose virtualization_type is NetworkNamespace is used to
// look up the adapter through FindApplicableAdapter; if none applies the call
// is a no-op.
//
//   netns_cmd - new command string forwarded to the adapter's set_cmd().
//
// NOTE(review): the declaration of prop (original line 215) is missing from
// this listing. netns_cmd_ is not assigned in the lines visible here, so code
// that reads netns_cmd_ directly would presumably keep the old value - confirm.
214 void InstanceManager::SetNetNSCmd(const std::string &netns_cmd) {
216  prop.virtualization_type =
217  ServiceInstance::ServiceInstance::NetworkNamespace;
// Unchecked downcast: relies on the adapter matching NetworkNamespace
// properties being a NetNSInstanceAdapter.
218  NetNSInstanceAdapter *adapter = static_cast<NetNSInstanceAdapter
219  *>(FindApplicableAdapter(prop));
220  if (adapter)
221  adapter->set_cmd(netns_cmd);
222 }
223 
225  stale_timer_interval_ = minutes * 60 * 1000;
226 }
227 
230  event.type = OnTaskTimeoutEvent;
231  event.task_queue = task_queue;
232 
233  work_queue_.Enqueue(event);
234 }
235 
237  std::stringstream ss;
238  ss << "TaskTimeOut for the TaskQ " << event.task_queue;
239  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
240 
242 }
243 
245  ServiceInstance *svc_instance = GetSvcInstance(event.task);
246  if (!svc_instance) {
247  return;
248  }
249 
250  std::stringstream ss;
251  ss << "Error for the Task " << event.task << " " << event.errors;
252  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
253 
254  InstanceState *state = GetState(svc_instance);
255  if (state != NULL) {
256  state->set_errors(event.errors);
257  }
258 }
259 
261  ServiceInstance *svc_instance = GetSvcInstance(event.task);
262  if (!svc_instance) {
263  return;
264  }
265 
266  std::stringstream ss;
267  ss << "Exit event for the Task " << event.task;
268  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
269 
270  UpdateStateStatusType(event);
271  for (std::vector<InstanceTaskQueue *>::iterator iter =
272  task_queues_.begin();
273  iter != task_queues_.end(); ++iter) {
274  InstanceTaskQueue *task_queue = *iter;
275  if (!task_queue->Empty()) {
276  if (task_queue->Front() == event.task) {
277  task_queue->Pop();
278  delete event.task;
279  task_queue->StopTimer();
280  DeleteState(svc_instance);
281  ScheduleNextTask(task_queue);
282  return;
283  }
284  }
285  }
286 
287 }
288 
290  if (event.type == OnErrorEvent) {
291  OnErrorEventHandler(event);
292  } else if (event.type == OnTaskTimeoutEvent) {
294  } else if (event.type == OnExitEvent) {
295  OnExitEventHandler(event);
296  }
297 
298  return true;
299 }
300 
302  ServiceInstance* svc_instance = UnregisterSvcInstance(event.task);
303  if (svc_instance) {
304  InstanceState *state = GetState(svc_instance);
305 
306  // The below code might not really capture the errors that
307  // occurred in the child process. As we are relying on the pipe
308  // status to identify child exit, even if child process ends up
309  // in error state, pipe might return "success" because pipe is
310  // closed without errors. But what we want to reflect is the
311  // error of child process. If there is any error in the pipe
312  // status, we show that as is. If not, if there is any error string
313  // in error_, we mark the error status as -1.
314  // pipe returning boost::system::errc::no_such_file_or_directory
315  // error is considered as no pipe errors
316 
317  if (state != NULL) {
318  int error_status = event.error_val;
319  if (error_status ==
320  boost::system::errc::no_such_file_or_directory) {
321  error_status = 0;
322  }
323  if (!state->errors().empty()) {
324  if (error_status == 0) {
325  error_status = -1;
326  }
327  }
328 
329  state->set_status(error_status);
330 
331  if (error_status != 0) {
332  if (state->status_type() != InstanceState::Timeout) {
334  }
335  } else if (state->status_type() == InstanceState::Starting) {
337  } else if (state->status_type() == InstanceState::Stopping) {
339  }
340 
341  std::stringstream ss;
342  ss << "For the task " << event.task << " error status " <<
343  error_status << " status type " << state->status_type();
344  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
345  }
346  }
347 }
348 
350  return static_cast<InstanceState *>(
351  svc_instance->GetState(agent_->service_instance_table(),
352  si_listener_));
353 }
354 
356  ServiceInstance* svc_instance = GetSvcInstance(task);
357  if (svc_instance) {
358  InstanceState *state = GetState(svc_instance);
359  return state;
360  }
361  return NULL;
362 }
363 
365  InstanceState *state) {
366  svc_instance->SetState(agent_->service_instance_table(),
367  si_listener_,state);
368 }
369 
372 }
373 
375 
376  if (!svc_instance || !svc_instance->IsDeleted()) {
377  return false;
378  }
379 
380  InstanceState *state = GetState(svc_instance);
381  if (state && !state->tasks_running()) {
382  ClearState(svc_instance);
383  delete state;
384  ClearLastCmdType(svc_instance);
385  return true;
386  }
387 
388  return false;
389 }
390 
391 
393  DBTablePartition *partition = static_cast<DBTablePartition *>(
395 
396  if (!partition)
397  return;
398 
399  DBEntryBase *next = NULL;
400  for (DBEntryBase *entry = partition->GetFirst(); entry; entry = next) {
401  next = partition->GetNext(entry);
402  DBState *state =
403  entry->GetState(agent_->service_instance_table(), si_listener_);
404  if (state != NULL) {
405  entry->ClearState(agent_->service_instance_table(), si_listener_);
406  delete state;
407  ClearLastCmdType(static_cast<ServiceInstance *>(entry));
408  }
409  }
410 }
411 
413  StateClear();
416 
417  InstanceTaskQueue *task_queue;
418  for (std::vector<InstanceTaskQueue *>::iterator iter = task_queues_.begin();
419  iter != task_queues_.end(); ++iter) {
420  if ((task_queue = *iter) == NULL) {
421  continue;
422  }
423  task_queue->Clear();
424 
425  delete task_queue;
426  }
427  work_queue_.Shutdown();
428 }
429 
431  const boost::uuids::uuid &uuid) {
432  std::stringstream ss;
433  ss << uuid;
434  InstanceTaskQueue *task_queue = GetTaskQueue(ss.str());
435  task_queue->Push(task);
436  ScheduleNextTask(task_queue);
437 }
438 
440  boost::hash<std::string> hash;
441  int index = hash(str) % task_queues_.size();
442  return task_queues_[index];
443 }
444 
445 //After Run(), if the child process is running, we keep the task status as
446 //"Starting" or "Stopping" and start a timer to track the TaskTimeout time.
447 //If the process is not running, we check how many times we have already
448 //attempted to run it. If netns_reattempts_ has already been exceeded, we
449 //return false so that the caller deletes the task without running it again.
450 //If the allowed reattempts are not yet used up, we start a timer and return
451 //true so that the same task is run again after the timeout. The task status
452 //is set to "reattempt" to track the reattempt case.
454  InstanceTask *task) {
455 
456 
457  InstanceState *state = GetState(task);
458  if (state) {
459  state->reset_errors();
460  }
461 
462  pid_t pid;
463  bool status = task->Run();
464 
465  std::stringstream ss;
466  ss << "Run status for the task " << task << " " << status;
467  ss << " With running status " << task->is_running();
468  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
469 
470  if (status || task->is_running()) {
471  pid = task->pid();
472  if (state != NULL) {
473  state->set_pid(pid);
474  state->set_cmd(task->cmd());
475  if (task->cmd_type() == Start) {
477  } else {
479  }
480  }
481  } else {
482 
483  ss.str(std::string());
484  ss << "Run failure for the task " << task << " attempt " << task->reattempts();
485  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
486 
487  if (state) {
489  state->set_cmd(task->cmd());
490  }
491  if (task->incr_reattempts() > netns_reattempts_) {
492  ss.str(std::string());
493  ss << "Run failure for the task " << task << " attempts exceeded";
494  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
495  return false;
496  }
497  }
498 
499  task_queue->StartTimer(netns_timeout_ * 1000);
500 
501  return true;
502 }
503 
504 //If starting the task succeeds, we wait for another event on that task.
505 //If not, the task is removed from the front of the queue and is deleted.
507  while (!task_queue->Empty()) {
508 
509  InstanceTask *task = task_queue->Front();
510  InstanceState *state = GetState(task);
511 
512  if (!task->is_running()) {
513  bool status = StartTask(task_queue, task);
514  if (status) {
515  return;
516  }
517  } else {
518  int delay = time(NULL) - task->start_time();
519  if (delay < netns_timeout_) {
520  return;
521  }
522  if (state) {
524  }
525 
526  std::stringstream ss;
527  ss << "Timeout for the Task " << task << " delay " << delay;
528  ss << " netns timeout " << netns_timeout_ << " ";
529  ss << task->cmd();
530  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
531 
532  if (delay >= (netns_timeout_ * 2)) {
533  task->Terminate();
534  if (task->IsSetup())
535  return;
536  } else {
537  task->Stop();
538  return;
539  }
540  }
541 
542  task_queue->StopTimer();
543  task_queue->Pop();
544 
545  ServiceInstance* svc_instance = GetSvcInstance(task);
546  if (state && svc_instance)
547  state->decr_tasks_running();
548 
549  task_svc_instances_.erase(task);
550 
551  std::stringstream ss;
552  ss << "Delete of the Task " << task;
553  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
554 
555  DeleteState(svc_instance);
556 
557  delete task;
558  }
559 }
560 
562  TaskSvcMap::const_iterator iter =
563  task_svc_instances_.find(task);
564  if (iter != task_svc_instances_.end()) {
565  return iter->second;
566  }
567  return NULL;
568 }
569 
571  ServiceInstance *svc_instance) {
572  pair<TaskSvcMap::iterator, bool> result =
573  task_svc_instances_.insert(std::make_pair(task, svc_instance));
574  assert(result.second);
575 
576  InstanceState *state = GetState(svc_instance);
577  assert(state);
578  state->incr_tasks_running();
579 }
580 
582  for (TaskSvcMap::iterator iter =
583  task_svc_instances_.begin();
584  iter != task_svc_instances_.end(); ++iter) {
585  if (task == iter->first) {
586  ServiceInstance *svc_instance = iter->second;
587  InstanceState *state = GetState(svc_instance);
588  assert(state);
589  state->decr_tasks_running();
590  task_svc_instances_.erase(iter);
591  return svc_instance;
592  }
593  }
594 
595  return NULL;
596 }
597 
599 
600  InstanceState *state = GetState(svc_instance);
601  assert(state);
602 
603  TaskSvcMap::iterator iter =
604  task_svc_instances_.begin();
605  while(iter != task_svc_instances_.end()) {
606  if (svc_instance == iter->second) {
607  task_svc_instances_.erase(iter++);
608  state->decr_tasks_running();
609  } else {
610  ++iter;
611  }
612  }
613 }
614 
616  for (std::vector<InstanceManagerAdapter *>::iterator iter = adapters_.begin();
617  iter != adapters_.end(); ++iter) {
618  InstanceManagerAdapter *adapter = *iter;
619  if (adapter != NULL && adapter->isApplicable(props)) {
620  return adapter;
621  }
622  }
623  return NULL;
624 }
625 
627  InstanceState *state, bool update) {
628  const ServiceInstance::Properties &props = svc_instance->properties();
629  InstanceManagerAdapter *adapter = this->FindApplicableAdapter(props);
630  std::stringstream ss;
631  if (adapter != NULL) {
632  InstanceTask *task = adapter->CreateStartTask(props, update);
633  if (task != NULL) {
634  ss << "Starting the Task " << task << " " << task->cmd();
635  ss << " for " << props.instance_id;
636  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
637  task->set_on_data_cb(boost::bind(&InstanceManager::OnError,
638  this, _1, _2));
639  task->set_on_exit_cb(boost::bind(&InstanceManager::OnExit,
640  this, _1, _2));
641  state->set_properties(props);
642  RegisterSvcInstance(task, svc_instance);
643  std::stringstream info;
644  Enqueue(task, props.instance_id);
645  } else {
646  ss << "Error Starting the Task for " << props.instance_id;
647  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
648  }
649  } else {
650  ss << "Unknown virtualization type " << props.virtualization_type;
651  ss << " for " << svc_instance->ToString();
652  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
653  }
654 }
655 
656 
658  InstanceState *state) {
659  const ServiceInstance::Properties &props = state->properties();
660  InstanceManagerAdapter *adapter = this->FindApplicableAdapter(props);
661  std::stringstream ss;
662  if (adapter != NULL) {
663  InstanceTask *task = adapter->CreateStopTask(props);
664  if (task != NULL) {
665  ss << "Stopping the Task " << task << " " << task->cmd();
666  ss << " for " << props.instance_id;
667  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
668  task->set_on_data_cb(boost::bind(&InstanceManager::OnError,
669  this, _1, _2));
670  task->set_on_exit_cb(boost::bind(&InstanceManager::OnExit,
671  this, _1, _2));
672  RegisterSvcInstance(task, svc_instance);
673  std::stringstream info;
674  Enqueue(task, props.instance_id);
675  } else {
676  std::stringstream ss;
677  ss << "Error Stopping the Task for " << props.instance_id;
678  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
679  }
680  } else {
681  ss << "Unknown virtualization type " << props.virtualization_type;
682  ss << " for " << svc_instance->ToString();
683  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
684  }
685 }
686 
688  const std::string errors) {
689 
691  event.type = OnErrorEvent;
692  event.task = task;
693  event.errors = errors;
694 
695  work_queue_.Enqueue(event);
696 }
697 
699  const boost::system::error_code &ec) {
700 
702  event.type = OnExitEvent;
703  event.task = task;
704  event.error_val = ec.value();
705 
706  work_queue_.Enqueue(event);
707 }
708 
710  std::stringstream cmd_str;
711 
712  if (netns_cmd_.length() == 0) {
713  return;
714  }
715  cmd_str << netns_cmd_ << " destroy";
716 
717  cmd_str << " " << props.ServiceTypeString();
718  cmd_str << " " << UuidToString(props.instance_id);
719  cmd_str << " " << UuidToString(boost::uuids::nil_uuid());
720  cmd_str << " " << UuidToString(boost::uuids::nil_uuid());
722  if (props.loadbalancer_id.empty()) {
723  LOG(ERROR, "loadbalancer id is missing for service instance: "
724  << UuidToString(props.instance_id));
725  return;
726  }
727  cmd_str << " --loadbalancer-id " << props.loadbalancer_id;
728  }
729 
730  std::string cmd = cmd_str.str();
731  std::vector<std::string> argv;
732  boost::split(argv, cmd, boost::is_any_of(" "), boost::token_compress_on);
733  std::vector<const char *> c_argv(argv.size() + 1);
734  for (std::size_t i = 0; i != argv.size(); ++i) {
735  c_argv[i] = argv[i].c_str();
736  }
737 
738  std::stringstream ss;
739  ss << "StaleNetNS " << cmd;
740  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
741 
742  pid_t pid = vfork();
743  if (pid == 0) {
744  CloseTaskFds();
745  execvp(c_argv[0], (char **) c_argv.data());
746  perror("execvp");
747 
748  _exit(127);
749  }
750 }
751 
753  int last_cmd_type) {
754  std::string uuid = UuidToString(svc_instance->uuid());
755  std::map<std::string, int>::iterator iter =
756  last_cmd_types_.find(uuid);
757  if (iter != last_cmd_types_.end()) {
758  iter->second = last_cmd_type;
759  } else {
760  last_cmd_types_.insert(std::make_pair(uuid, last_cmd_type));
761  }
762 }
763 
765  std::string uuid = UuidToString(svc_instance->uuid());
766  std::map<std::string, int>::const_iterator iter =
767  last_cmd_types_.find(uuid);
768  if (iter != last_cmd_types_.end()) {
769  return iter->second;
770  }
771 
772  return 0;
773 }
774 
776  std::string uuid = UuidToString(svc_instance->uuid());
777  std::map<std::string, int>::iterator iter =
778  last_cmd_types_.find(uuid);
779  if (iter != last_cmd_types_.end()) {
780  last_cmd_types_.erase(iter);
781  }
782 }
783 
785  DBTablePartBase *db_part, DBEntryBase *entry) {
786  ServiceInstance *svc_instance = static_cast<ServiceInstance *>(entry);
787 
788  InstanceState *state = GetState(svc_instance);
789  if (svc_instance->IsDeleted()) {
790  if (state) {
791  if (GetLastCmdType(svc_instance) == Start) {
792  StopServiceInstance(svc_instance, state);
793  SetLastCmdType(svc_instance, Stop);
794  }
795  if (DeleteState(svc_instance)) {
796  return;
797  }
798  }
799  ClearLastCmdType(svc_instance);
800  } else {
801  if (state == NULL) {
802  state = new InstanceState();
803  SetState(svc_instance, state);
804  }
805 
806  bool usable = svc_instance->IsUsable();
807 
808  std::stringstream ss;
809  ss << "NetNS event notification for uuid: " << svc_instance->ToString();
810  ss << (usable ? " usable" : " not usable");
811  INSTANCE_MANAGER_TRACE(Trace, ss.str().c_str());
812 
813  if (!usable && GetLastCmdType(svc_instance) == Start) {
814  StopServiceInstance(svc_instance, state);
815  SetLastCmdType(svc_instance, Stop);
816  } else if (usable) {
817  if (GetLastCmdType(svc_instance) == Start && state->properties().CompareTo(
818  svc_instance->properties()) != 0) {
819  StartServiceInstance(svc_instance, state, true);
820  } else if (GetLastCmdType(svc_instance) != Start) {
821  StartServiceInstance(svc_instance, state, false);
822  SetLastCmdType(svc_instance, Start);
823  }
824  }
825  }
826 }
827 
829 
830  if (stale_cleaner_.get())
831  return false;
832  stale_cleaner_.reset(new NamespaceStaleCleaner(agent_, this));
833  stale_cleaner_->CleanStaleEntries();
834  stale_cleaner_.reset(NULL);
835  return false;
836 }
837 
839  namespace_store_path_ = path;
840 }
841 
842 /*
843  * InstanceState class
844  */
846  pid_(0), status_(0), status_type_(0), tasks_running_(0) {
847 }
848 
// Resets the recorded process id and status to 0 and discards the stored
// error text and command string.
// clear() is required for the two strings: empty() is a const query whose
// result was being thrown away, so the previous contents were left in place.
// NOTE(review): the signature line of this function is missing from this
// listing; only its body is visible.
850  pid_ = 0;
851  status_ = 0;
852  errors_.clear();
853  cmd_.clear();
854 }
time_t start_time() const
Definition: instance_task.h:43
static void CloseTaskFds(void)
Definition: agent_cmn.h:80
int reattempts()
Definition: instance_task.h:59
void UpdateStateStatusType(InstanceManagerChildEvent event)
SandeshTraceBufferPtr InstanceManagerTraceBuf
std::map< std::string, int > last_cmd_types_
InstanceTask * Front()
void SetNamespaceStorePath(std::string path)
void STLDeleteValues(Container *container)
Definition: util.h:101
static boost::uuids::uuid StringToUuid(const std::string &str)
Definition: string_util.h:145
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
void Enqueue(InstanceTask *task, const boost::uuids::uuid &uuid)
void set_on_data_cb(OnDataCallback cb)
Definition: instance_task.h:47
DBState * GetState(DBTableBase *tbl_base, ListenerId listener) const
Definition: db_entry.cc:37
void Push(InstanceTask *task)
void StopServiceInstance(ServiceInstance *svc_instance, InstanceState *state)
std::vector< InstanceTaskQueue * > task_queues_
Definition: vm.h:14
TaskSvcMap task_svc_instances_
bool IsDeleted() const
Definition: db_entry.h:49
void SetState(DBTableBase *tbl_base, ListenerId listener, DBState *state)
Definition: db_entry.cc:22
DBTableBase::ListenerId si_listener_
virtual DBEntry * GetNext(const DBEntryBase *entry)
bool IsUsable() const
const std::string & ServiceTypeString() const
int incr_reattempts()
Definition: instance_task.h:55
void RegisterSvcInstance(InstanceTask *task, ServiceInstance *svc_instance)
void SetStaleTimerInterval(int minutes)
void StopStaleNetNS(ServiceInstance::Properties &props)
boost::uuids::uuid uuid
void UnregisterSvcInstance(ServiceInstance *svc_instance)
Definition: task_int.h:10
void set_cmd(const std::string &cmd)
static std::string UuidToString(const boost::uuids::uuid &id)
Definition: string_util.h:138
virtual int cmd_type() const =0
void OnErrorEventHandler(InstanceManagerChildEvent event)
std::vector< InstanceManagerAdapter * > adapters_
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
void OnTaskTimeout(InstanceTaskQueue *task_queue)
void set_pid(const pid_t &pid)
static const int kReattemptsDefault
virtual InstanceTask * CreateStartTask(const ServiceInstance::Properties &props, bool update)=0
bool is_running() const
Definition: instance_task.h:39
void Unregister(ListenerId listener)
Definition: db_table.cc:186
void set_on_timeout_cb(OnTimeoutCallback cb)
std::string loadbalancer_config_path_
virtual pid_t pid() const =0
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
Definition: db_table.cc:181
Definition: db.h:24
int CompareTo(const Properties &rhs) const
InstanceTaskQueue * GetTaskQueue(const std::string &str)
ServiceInstanceTable * service_instance_table() const
Definition: agent.h:817
NamespaceStaleCleaner(Agent *agent, InstanceManager *manager)
InstanceState * GetState(ServiceInstance *) const
virtual bool isApplicable(const ServiceInstance::Properties &props)=0
static const char namespace_store_path_default[]
Definition: agent.h:358
WorkQueue< InstanceManagerChildEvent > work_queue_
static const char loadbalancer_config_path_default[]
void StartServiceInstance(ServiceInstance *svc_instance, InstanceState *state, bool update)
boost::uuids::uuid instance_id
EventManager * event_manager() const
Definition: agent.h:1103
Definition: trace.h:220
ServiceInstance * GetSvcInstance(InstanceTask *task) const
void StartTimer(int time)
void set_properties(const ServiceInstance::Properties &properties)
int GetLastCmdType(ServiceInstance *svc_instance) const
std::string cmd_
virtual void Clear()
Definition: agent_db.cc:215
void OnExit(InstanceTask *task, const boost::system::error_code &ec)
std::string errors() const
boost::tokenizer< boost::char_separator< char > > tokenizer
void Initialize(DB *database, const std::string &netns_cmd, const std::string &docker_cmd, const int netns_workers, const int netns_timeout)
AgentDBEntry * Find(const DBEntry *key, bool ret_del)
Definition: agent_db.cc:134
void set_on_exit_cb(OnExitCallback cb)
Definition: instance_task.h:51
void ScheduleNextTask(InstanceTaskQueue *task_queue)
const ServiceInstance::Properties & properties() const
void set_cmd(const std::string &netns_cmd)
void ClearState(DBTableBase *tbl_base, ListenerId listener)
Definition: db_entry.cc:73
const Properties & properties() const
const boost::uuids::uuid & uuid() const
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
Definition: db_table.cc:436
std::string AgentGUID() const
Definition: agent.cc:157
void set_status(const int status)
std::unique_ptr< NamespaceStaleCleaner > stale_cleaner_
bool StartTask(InstanceTaskQueue *task_queue, InstanceTask *task)
virtual void Terminate()=0
void OnTaskTimeoutEventHandler(InstanceManagerChildEvent event)
static const int kWorkersDefault
static const int kTimeoutDefault
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
Definition: timer.cc:108
void OnExitEventHandler(InstanceManagerChildEvent event)
int tasks_running() const
InstanceManagerAdapter * FindApplicableAdapter(const ServiceInstance::Properties &props)
#define INSTANCE_MANAGER_TRACE(obj,...)
#define LOG(_Level, _Msg)
Definition: logging.h:33
virtual InstanceTask * CreateStopTask(const ServiceInstance::Properties &props)=0
virtual bool Run()=0
std::string netns_cmd_
virtual DBEntry * GetFirst()
void OnError(InstanceTask *task, const std::string errors)
void EventObserver(DBTablePartBase *db_part, DBEntryBase *entry)
std::string namespace_store_path_
std::string errors_
void SetState(ServiceInstance *svc_instance, InstanceState *state)
virtual const std::string & cmd() const =0
void ClearState(ServiceInstance *svc_instance)
#define INSTANCE_MANAGER_TASK_NAME
Definition: agent.h:298
bool DequeueEvent(InstanceManagerChildEvent event)
void SetNetNSCmd(const std::string &netns_cmd)
void SetLastCmdType(ServiceInstance *svc_instance, int last_cmd_type)
bool DeleteState(ServiceInstance *svc_instance)
virtual std::string ToString() const
int status_type() const
VmTable * vm_table() const
Definition: agent.h:490
void ClearLastCmdType(ServiceInstance *svc_instance)
virtual bool IsSetup()=0
struct task_ task
void set_status_type(const int status)
static const char namespace_prefix[]
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
bool isMockMode() const
Definition: agent.cc:153
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
virtual void Stop()=0