7 #include <boost/bind.hpp>
8 #include <boost/functional/hash.hpp>
9 #include <boost/filesystem.hpp>
10 #include <boost/tokenizer.hpp>
31 "/var/lib/contrail/loadbalancer/";
43 namespace fs = boost::filesystem;
47 if ( !fs::exists(ns) || !fs::is_directory(ns)) {
51 typedef boost::tokenizer<boost::char_separator<char> >
tokenizer;
52 boost::char_separator<char> slash_sep(
"/");
53 boost::char_separator<char> colon_sep(
":");
54 fs::directory_iterator end_iter;
55 for(fs::directory_iterator iter(ns); iter != end_iter; iter++) {
58 tokenizer tokens(iter->path().string(), slash_sep);
60 for(tokenizer::iterator it=tokens.begin(); it!=tokens.end(); it++){
66 std::size_t vrouter_found;
68 if (vrouter_found == std::string::npos) {
88 tokenizer::iterator next_tok = ++(tok.begin());
90 if (next_tok != tok.end()) {
101 std::stringstream pathgen;
105 boost::system::error_code error;
106 if (fs::exists(pathgen.str())) {
107 fs::remove(pathgen.str(), error);
109 std::stringstream ss;
110 ss <<
"Stale loadbalancer cfg fle delete error ";
111 ss << error.message();
136 stale_timer_interval_(5 * 60 * 1000),
137 stale_timer_(
TimerManager::CreateTimer(*(agent->event_manager()->io_service()),
151 const std::string &docker_cmd,
152 const int netns_workers,
153 const int netns_timeout) {
161 LOG(ERROR,
"NetNS path for network namespace command not specified "
162 "in the config file, the namespaces won't be started");
164 if (docker_cmd.length() == 0) {
165 LOG(ERROR,
"Path for Docker starter command not specified "
166 "in the config file, the Docker instances won't be started");
169 std::stringstream pathgen;
171 boost::filesystem::path dir(pathgen.str());
172 boost::system::error_code error;
173 boost::filesystem::create_directories(dir, error);
175 LOG(ERROR,
"Falied to create Loadbalancer Directory " << pathgen.str());
187 if (netns_timeout >= 1) {
194 if (netns_workers > 0) {
195 workers = netns_workers;
199 for (std::vector<InstanceTaskQueue *>::iterator iter =
task_queues_.begin();
217 ServiceInstance::ServiceInstance::NetworkNamespace;
231 event.task_queue = task_queue;
237 std::stringstream ss;
238 ss <<
"TaskTimeOut for the TaskQ " <<
event.task_queue;
250 std::stringstream ss;
251 ss <<
"Error for the Task " <<
event.task <<
" " <<
event.errors;
266 std::stringstream ss;
267 ss <<
"Exit event for the Task " <<
event.task;
271 for (std::vector<InstanceTaskQueue *>::iterator iter =
275 if (!task_queue->
Empty()) {
318 int error_status =
event.error_val;
320 boost::system::errc::no_such_file_or_directory) {
323 if (!state->
errors().empty()) {
324 if (error_status == 0) {
331 if (error_status != 0) {
341 std::stringstream ss;
342 ss <<
"For the task " <<
event.task <<
" error status " <<
343 error_status <<
" status type " << state->
status_type();
376 if (!svc_instance || !svc_instance->
IsDeleted()) {
401 next = partition->
GetNext(entry);
418 for (std::vector<InstanceTaskQueue *>::iterator iter =
task_queues_.begin();
420 if ((task_queue = *iter) == NULL) {
432 std::stringstream ss;
440 boost::hash<std::string> hash;
463 bool status =
task->Run();
465 std::stringstream ss;
466 ss <<
"Run status for the task " <<
task <<
" " << status;
467 ss <<
" With running status " <<
task->is_running();
470 if (status ||
task->is_running()) {
483 ss.str(std::string());
484 ss <<
"Run failure for the task " <<
task <<
" attempt " <<
task->reattempts();
492 ss.str(std::string());
493 ss <<
"Run failure for the task " <<
task <<
" attempts exceeded";
507 while (!task_queue->
Empty()) {
512 if (!
task->is_running()) {
518 int delay = time(NULL) -
task->start_time();
526 std::stringstream ss;
527 ss <<
"Timeout for the Task " <<
task <<
" delay " << delay;
546 if (state && svc_instance)
551 std::stringstream ss;
552 ss <<
"Delete of the Task " <<
task;
562 TaskSvcMap::const_iterator iter =
572 pair<TaskSvcMap::iterator, bool> result =
574 assert(result.second);
582 for (TaskSvcMap::iterator iter =
585 if (
task == iter->first) {
603 TaskSvcMap::iterator iter =
606 if (svc_instance == iter->second) {
616 for (std::vector<InstanceManagerAdapter *>::iterator iter =
adapters_.begin();
630 std::stringstream ss;
631 if (adapter != NULL) {
634 ss <<
"Starting the Task " <<
task <<
" " <<
task->cmd();
643 std::stringstream info;
646 ss <<
"Error Starting the Task for " << props.
instance_id;
651 ss <<
" for " << svc_instance->
ToString();
661 std::stringstream ss;
662 if (adapter != NULL) {
665 ss <<
"Stopping the Task " <<
task <<
" " <<
task->cmd();
673 std::stringstream info;
676 std::stringstream ss;
677 ss <<
"Error Stopping the Task for " << props.
instance_id;
682 ss <<
" for " << svc_instance->
ToString();
688 const std::string errors) {
693 event.errors = errors;
699 const boost::system::error_code &ec) {
704 event.error_val = ec.value();
710 std::stringstream cmd_str;
719 cmd_str <<
" " <<
UuidToString(boost::uuids::nil_uuid());
720 cmd_str <<
" " <<
UuidToString(boost::uuids::nil_uuid());
723 LOG(ERROR,
"loadbalancer id is missing for service instance: "
730 std::string cmd = cmd_str.str();
731 std::vector<std::string> argv;
732 boost::split(argv, cmd, boost::is_any_of(
" "), boost::token_compress_on);
733 std::vector<const char *> c_argv(argv.size() + 1);
734 for (std::size_t i = 0; i != argv.size(); ++i) {
735 c_argv[i] = argv[i].c_str();
738 std::stringstream ss;
739 ss <<
"StaleNetNS " << cmd;
745 execvp(c_argv[0], (
char **) c_argv.data());
755 std::map<std::string, int>::iterator iter =
758 iter->second = last_cmd_type;
766 std::map<std::string, int>::const_iterator iter =
777 std::map<std::string, int>::iterator iter =
806 bool usable = svc_instance->
IsUsable();
808 std::stringstream ss;
809 ss <<
"NetNS event notification for uuid: " << svc_instance->
ToString();
810 ss << (usable ?
" usable" :
" not usable");
846 pid_(0), status_(0), status_type_(0), tasks_running_(0) {
#define INSTANCE_MANAGER_TASK_NAME
static void CloseTaskFds(void)
AgentDBEntry * Find(const DBEntry *key, bool ret_del)
VmTable * vm_table() const
EventManager * event_manager() const
std::string AgentGUID() const
ServiceInstanceTable * service_instance_table() const
DBState * GetState(DBTableBase *tbl_base, ListenerId listener) const
void ClearState(DBTableBase *tbl_base, ListenerId listener)
void SetState(DBTableBase *tbl_base, ListenerId listener, DBState *state)
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
void Unregister(ListenerId listener)
virtual DBEntry * GetNext(const DBEntryBase *entry)
virtual DBEntry * GetFirst()
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
virtual InstanceTask * CreateStartTask(const ServiceInstance::Properties &props, bool update)=0
virtual InstanceTask * CreateStopTask(const ServiceInstance::Properties &props)=0
virtual bool isApplicable(const ServiceInstance::Properties &props)=0
NamespaceStaleCleaner(Agent *agent, InstanceManager *manager)
InstanceManager * manager_
std::vector< InstanceManagerAdapter * > adapters_
void SetStaleTimerInterval(int minutes)
void OnExitEventHandler(InstanceManagerChildEvent event)
bool StartTask(InstanceTaskQueue *task_queue, InstanceTask *task)
ServiceInstance * GetSvcInstance(InstanceTask *task) const
bool DequeueEvent(InstanceManagerChildEvent event)
void Enqueue(InstanceTask *task, const boost::uuids::uuid &uuid)
void OnTaskTimeoutEventHandler(InstanceManagerChildEvent event)
void SetNamespaceStorePath(std::string path)
void ClearLastCmdType(ServiceInstance *svc_instance)
void SetState(ServiceInstance *svc_instance, InstanceState *state)
static const int kWorkersDefault
void SetNetNSCmd(const std::string &netns_cmd)
void OnExit(InstanceTask *task, const boost::system::error_code &ec)
static const int kReattemptsDefault
void UpdateStateStatusType(InstanceManagerChildEvent event)
std::vector< InstanceTaskQueue * > task_queues_
InstanceTaskQueue * GetTaskQueue(const std::string &str)
void OnError(InstanceTask *task, const std::string errors)
static const int kTimeoutDefault
InstanceState * GetState(ServiceInstance *) const
void UnregisterSvcInstance(ServiceInstance *svc_instance)
bool DeleteState(ServiceInstance *svc_instance)
InstanceManagerAdapter * FindApplicableAdapter(const ServiceInstance::Properties &props)
DBTableBase::ListenerId si_listener_
void Initialize(DB *database, const std::string &netns_cmd, const std::string &docker_cmd, const int netns_workers, const int netns_timeout)
void ClearState(ServiceInstance *svc_instance)
void OnErrorEventHandler(InstanceManagerChildEvent event)
std::map< std::string, int > last_cmd_types_
void ScheduleNextTask(InstanceTaskQueue *task_queue)
WorkQueue< InstanceManagerChildEvent > work_queue_
std::unique_ptr< NamespaceStaleCleaner > stale_cleaner_
void OnTaskTimeout(InstanceTaskQueue *task_queue)
void StopStaleNetNS(ServiceInstance::Properties &props)
TaskSvcMap task_svc_instances_
void SetLastCmdType(ServiceInstance *svc_instance, int last_cmd_type)
void StopServiceInstance(ServiceInstance *svc_instance, InstanceState *state)
void StartServiceInstance(ServiceInstance *svc_instance, InstanceState *state, bool update)
void RegisterSvcInstance(InstanceTask *task, ServiceInstance *svc_instance)
std::string loadbalancer_config_path_
int stale_timer_interval_
std::string namespace_store_path_
int GetLastCmdType(ServiceInstance *svc_instance) const
void EventObserver(DBTablePartBase *db_part, DBEntryBase *entry)
void set_pid(const pid_t &pid)
std::string errors() const
int tasks_running() const
void set_cmd(const std::string &cmd)
const ServiceInstance::Properties & properties() const
void set_status(const int status)
void set_errors(const std::string &errors)
void set_status_type(const int status)
void set_properties(const ServiceInstance::Properties &properties)
void StartTimer(int time)
void Push(InstanceTask *task)
void set_on_timeout_cb(OnTimeoutCallback cb)
void set_cmd(const std::string &netns_cmd)
const Properties & properties() const
const boost::uuids::uuid & uuid() const
virtual std::string ToString() const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
static bool DeleteTimer(Timer *Timer)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
SandeshTraceBufferPtr InstanceManagerTraceBuf(SandeshTraceBufferCreate("InstanceManager", 1000))
static const char namespace_store_path_default[]
static const char namespace_prefix[]
static const char loadbalancer_config_path_default[]
#define INSTANCE_MANAGER_TRACE(obj,...)
#define LOG(_Level, _Msg)
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
static boost::uuids::uuid StringToUuid(const std::string &str)
static std::string UuidToString(const boost::uuids::uuid &id)
InstanceTaskQueue * task_queue
boost::uuids::uuid instance_id
int CompareTo(const Properties &rhs) const
std::string loadbalancer_id
const std::string & ServiceTypeString() const
boost::tokenizer< boost::char_separator< char > > tokenizer
void STLDeleteValues(Container *container)