7 #include <boost/bind/bind.hpp>
8 #include <boost/functional/hash.hpp>
9 #include <boost/filesystem.hpp>
10 #include <boost/tokenizer.hpp>
26 using namespace boost::placeholders;
33 "/var/lib/contrail/loadbalancer/";
41 : agent_(agent), manager_(manager) {
45 namespace fs = boost::filesystem;
48 fs::path ns(manager_->namespace_store_path_);
49 if ( !fs::exists(ns) || !fs::is_directory(ns)) {
53 typedef boost::tokenizer<boost::char_separator<char> >
tokenizer;
54 boost::char_separator<char> slash_sep(
"/");
55 boost::char_separator<char> colon_sep(
":");
56 fs::directory_iterator end_iter;
57 for(fs::directory_iterator iter(ns); iter != end_iter; iter++) {
60 tokenizer tokens(iter->path().string(), slash_sep);
62 for(tokenizer::iterator it=tokens.begin(); it!=tokens.end(); it++){
68 std::size_t vrouter_found;
70 if (vrouter_found == std::string::npos) {
83 if (agent_->vm_table()->Find(&key,
true)) {
90 tokenizer::iterator next_tok = ++(tok.begin());
92 if (next_tok != tok.end()) {
98 manager_->StopStaleNetNS(prop);
103 std::stringstream pathgen;
104 pathgen << manager_->loadbalancer_config_path_
107 boost::system::error_code error;
108 if (fs::exists(pathgen.str())) {
109 fs::remove(pathgen.str(), error);
111 std::stringstream ss;
112 ss <<
"Stale loadbalancer cfg fle delete error ";
113 ss << error.message();
138 stale_timer_interval_(5 * 60 * 1000),
139 stale_timer_(
TimerManager::CreateTimer(*(agent->event_manager()->io_service()),
153 const std::string &docker_cmd,
154 const int netns_workers,
155 const int netns_timeout) {
163 LOG(ERROR,
"NetNS path for network namespace command not specified "
164 "in the config file, the namespaces won't be started");
166 if (docker_cmd.length() == 0) {
167 LOG(ERROR,
"Path for Docker starter command not specified "
168 "in the config file, the Docker instances won't be started");
171 std::stringstream pathgen;
173 boost::filesystem::path dir(pathgen.str());
174 boost::system::error_code error;
175 boost::filesystem::create_directories(dir, error);
177 LOG(ERROR,
"Falied to create Loadbalancer Directory " << pathgen.str());
189 if (netns_timeout >= 1) {
196 if (netns_workers > 0) {
197 workers = netns_workers;
201 for (std::vector<InstanceTaskQueue *>::iterator iter =
task_queues_.begin();
219 ServiceInstance::ServiceInstance::NetworkNamespace;
233 event.task_queue = task_queue;
239 std::stringstream ss;
240 ss <<
"TaskTimeOut for the TaskQ " <<
event.task_queue;
252 std::stringstream ss;
253 ss <<
"Error for the Task " <<
event.task <<
" " <<
event.errors;
268 std::stringstream ss;
269 ss <<
"Exit event for the Task " <<
event.task;
273 for (std::vector<InstanceTaskQueue *>::iterator iter =
277 if (!task_queue->
Empty()) {
320 int error_status =
event.error_val;
322 boost::system::errc::no_such_file_or_directory) {
325 if (!state->
errors().empty()) {
326 if (error_status == 0) {
333 if (error_status != 0) {
343 std::stringstream ss;
344 ss <<
"For the task " <<
event.task <<
" error status " <<
345 error_status <<
" status type " << state->
status_type();
378 if (!svc_instance || !svc_instance->
IsDeleted()) {
403 next = partition->
GetNext(entry);
420 for (std::vector<InstanceTaskQueue *>::iterator iter =
task_queues_.begin();
422 if ((task_queue = *iter) == NULL) {
434 std::stringstream ss;
442 boost::hash<std::string> hash;
465 bool status =
task->Run();
467 std::stringstream ss;
468 ss <<
"Run status for the task " <<
task <<
" " << status;
469 ss <<
" With running status " <<
task->is_running();
472 if (status ||
task->is_running()) {
485 ss.str(std::string());
486 ss <<
"Run failure for the task " <<
task <<
" attempt " <<
task->reattempts();
494 ss.str(std::string());
495 ss <<
"Run failure for the task " <<
task <<
" attempts exceeded";
509 while (!task_queue->
Empty()) {
514 if (!
task->is_running()) {
520 int delay = time(NULL) -
task->start_time();
528 std::stringstream ss;
529 ss <<
"Timeout for the Task " <<
task <<
" delay " << delay;
548 if (state && svc_instance)
553 std::stringstream ss;
554 ss <<
"Delete of the Task " <<
task;
564 TaskSvcMap::const_iterator iter =
574 pair<TaskSvcMap::iterator, bool> result =
576 assert(result.second);
584 for (TaskSvcMap::iterator iter =
587 if (
task == iter->first) {
605 TaskSvcMap::iterator iter =
608 if (svc_instance == iter->second) {
618 for (std::vector<InstanceManagerAdapter *>::iterator iter =
adapters_.begin();
632 std::stringstream ss;
633 if (adapter != NULL) {
636 ss <<
"Starting the Task " <<
task <<
" " <<
task->cmd();
645 std::stringstream info;
648 ss <<
"Error Starting the Task for " << props.
instance_id;
653 ss <<
" for " << svc_instance->
ToString();
663 std::stringstream ss;
664 if (adapter != NULL) {
667 ss <<
"Stopping the Task " <<
task <<
" " <<
task->cmd();
675 std::stringstream info;
678 std::stringstream ss;
679 ss <<
"Error Stopping the Task for " << props.
instance_id;
684 ss <<
" for " << svc_instance->
ToString();
690 const std::string errors) {
695 event.errors = errors;
701 const boost::system::error_code &ec) {
706 event.error_val = ec.value();
712 std::stringstream cmd_str;
721 cmd_str <<
" " <<
UuidToString(boost::uuids::nil_uuid());
722 cmd_str <<
" " <<
UuidToString(boost::uuids::nil_uuid());
725 LOG(ERROR,
"loadbalancer id is missing for service instance: "
732 std::string cmd = cmd_str.str();
733 std::vector<std::string> argv;
734 boost::split(argv, cmd, boost::is_any_of(
" "), boost::token_compress_on);
735 std::vector<const char *> c_argv(argv.size() + 1);
736 for (std::size_t i = 0; i != argv.size(); ++i) {
737 c_argv[i] = argv[i].c_str();
740 std::stringstream ss;
741 ss <<
"StaleNetNS " << cmd;
747 execvp(c_argv[0], (
char **) c_argv.data());
757 std::map<std::string, int>::iterator iter =
760 iter->second = last_cmd_type;
768 std::map<std::string, int>::const_iterator iter =
779 std::map<std::string, int>::iterator iter =
808 bool usable = svc_instance->
IsUsable();
810 std::stringstream ss;
811 ss <<
"NetNS event notification for uuid: " << svc_instance->
ToString();
812 ss << (usable ?
" usable" :
" not usable");
848 pid_(0), status_(0), status_type_(0), tasks_running_(0) {
#define INSTANCE_MANAGER_TASK_NAME
static void CloseTaskFds(void)
EventManager * event_manager() const
std::string AgentGUID() const
ServiceInstanceTable * service_instance_table() const
DBState * GetState(DBTableBase *tbl_base, ListenerId listener) const
void ClearState(DBTableBase *tbl_base, ListenerId listener)
void SetState(DBTableBase *tbl_base, ListenerId listener, DBState *state)
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
void Unregister(ListenerId listener)
virtual DBEntry * GetNext(const DBEntryBase *entry)
virtual DBEntry * GetFirst()
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
virtual InstanceTask * CreateStartTask(const ServiceInstance::Properties &props, bool update)=0
virtual InstanceTask * CreateStopTask(const ServiceInstance::Properties &props)=0
virtual bool isApplicable(const ServiceInstance::Properties &props)=0
NamespaceStaleCleaner(Agent *agent, InstanceManager *manager)
InstanceManager * manager_
std::vector< InstanceManagerAdapter * > adapters_
void SetStaleTimerInterval(int minutes)
void OnExitEventHandler(InstanceManagerChildEvent event)
bool StartTask(InstanceTaskQueue *task_queue, InstanceTask *task)
ServiceInstance * GetSvcInstance(InstanceTask *task) const
bool DequeueEvent(InstanceManagerChildEvent event)
void Enqueue(InstanceTask *task, const boost::uuids::uuid &uuid)
void OnTaskTimeoutEventHandler(InstanceManagerChildEvent event)
void SetNamespaceStorePath(std::string path)
void ClearLastCmdType(ServiceInstance *svc_instance)
void SetState(ServiceInstance *svc_instance, InstanceState *state)
static const int kWorkersDefault
void SetNetNSCmd(const std::string &netns_cmd)
void OnExit(InstanceTask *task, const boost::system::error_code &ec)
static const int kReattemptsDefault
void UpdateStateStatusType(InstanceManagerChildEvent event)
std::vector< InstanceTaskQueue * > task_queues_
InstanceTaskQueue * GetTaskQueue(const std::string &str)
void OnError(InstanceTask *task, const std::string errors)
static const int kTimeoutDefault
InstanceState * GetState(ServiceInstance *) const
void UnregisterSvcInstance(ServiceInstance *svc_instance)
bool DeleteState(ServiceInstance *svc_instance)
InstanceManagerAdapter * FindApplicableAdapter(const ServiceInstance::Properties &props)
DBTableBase::ListenerId si_listener_
void Initialize(DB *database, const std::string &netns_cmd, const std::string &docker_cmd, const int netns_workers, const int netns_timeout)
void ClearState(ServiceInstance *svc_instance)
void OnErrorEventHandler(InstanceManagerChildEvent event)
std::map< std::string, int > last_cmd_types_
void ScheduleNextTask(InstanceTaskQueue *task_queue)
WorkQueue< InstanceManagerChildEvent > work_queue_
std::unique_ptr< NamespaceStaleCleaner > stale_cleaner_
void OnTaskTimeout(InstanceTaskQueue *task_queue)
void StopStaleNetNS(ServiceInstance::Properties &props)
TaskSvcMap task_svc_instances_
void SetLastCmdType(ServiceInstance *svc_instance, int last_cmd_type)
void StopServiceInstance(ServiceInstance *svc_instance, InstanceState *state)
void StartServiceInstance(ServiceInstance *svc_instance, InstanceState *state, bool update)
void RegisterSvcInstance(InstanceTask *task, ServiceInstance *svc_instance)
std::string loadbalancer_config_path_
int stale_timer_interval_
std::string namespace_store_path_
int GetLastCmdType(ServiceInstance *svc_instance) const
void EventObserver(DBTablePartBase *db_part, DBEntryBase *entry)
void set_pid(const pid_t &pid)
std::string errors() const
int tasks_running() const
void set_cmd(const std::string &cmd)
const ServiceInstance::Properties & properties() const
void set_status(const int status)
void set_errors(const std::string &errors)
void set_status_type(const int status)
void set_properties(const ServiceInstance::Properties &properties)
void StartTimer(int time)
void Push(InstanceTask *task)
void set_on_timeout_cb(OnTimeoutCallback cb)
void set_cmd(const std::string &netns_cmd)
const Properties & properties() const
const boost::uuids::uuid & uuid() const
virtual std::string ToString() const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
static bool DeleteTimer(Timer *Timer)
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
SandeshTraceBufferPtr InstanceManagerTraceBuf(SandeshTraceBufferCreate("InstanceManager", 1000))
static const char namespace_store_path_default[]
static const char namespace_prefix[]
static const char loadbalancer_config_path_default[]
#define INSTANCE_MANAGER_TRACE(obj,...)
#define LOG(_Level, _Msg)
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
static boost::uuids::uuid StringToUuid(const std::string &str)
static std::string UuidToString(const boost::uuids::uuid &id)
InstanceTaskQueue * task_queue
boost::uuids::uuid instance_id
int CompareTo(const Properties &rhs) const
std::string loadbalancer_id
const std::string & ServiceTypeString() const
boost::tokenizer< boost::char_separator< char > > tokenizer
void STLDeleteValues(Container *container)