7 #include <boost/bind.hpp>
8 #include <boost/functional/hash.hpp>
9 #include <boost/filesystem.hpp>
10 #include <boost/tokenizer.hpp>
31 "/var/lib/contrail/loadbalancer/";
43 namespace fs = boost::filesystem;
47 if ( !fs::exists(ns) || !fs::is_directory(ns)) {
51 typedef boost::tokenizer<boost::char_separator<char> >
tokenizer;
52 boost::char_separator<char> slash_sep(
"/");
53 boost::char_separator<char> colon_sep(
":");
54 fs::directory_iterator end_iter;
55 for(fs::directory_iterator iter(ns); iter != end_iter; iter++) {
58 tokenizer tokens(iter->path().string(), slash_sep);
60 for(tokenizer::iterator it=tokens.begin(); it!=tokens.end(); it++){
66 std::size_t vrouter_found;
67 vrouter_found = ns_name.find(namespace_prefix);
68 if (vrouter_found == std::string::npos) {
73 ns_name.replace(vrouter_found, strlen(namespace_prefix),
"");
78 tokenizer tok(ns_name, colon_sep);
88 tokenizer::iterator next_tok = ++(tok.begin());
90 if (next_tok != tok.end()) {
101 std::stringstream pathgen;
105 boost::system::error_code error;
106 if (fs::exists(pathgen.str())) {
107 fs::remove(pathgen.str(), error);
109 std::stringstream ss;
110 ss <<
"Stale loadbalancer cfg fle delete error ";
111 ss << error.message();
134 loadbalancer_config_path_(loadbalancer_config_path_default),
135 namespace_store_path_(namespace_store_path_default),
136 stale_timer_interval_(5 * 60 * 1000),
137 stale_timer_(
TimerManager::CreateTimer(*(agent->event_manager()->io_service()),
151 const std::string &docker_cmd,
152 const int netns_workers,
153 const int netns_timeout) {
161 LOG(ERROR,
"NetNS path for network namespace command not specified "
162 "in the config file, the namespaces won't be started");
164 if (docker_cmd.length() == 0) {
165 LOG(ERROR,
"Path for Docker starter command not specified "
166 "in the config file, the Docker instances won't be started");
169 std::stringstream pathgen;
171 boost::filesystem::path dir(pathgen.str());
172 boost::system::error_code error;
173 boost::filesystem::create_directories(dir, error);
175 LOG(ERROR,
"Falied to create Loadbalancer Directory " << pathgen.str());
180 loadbalancer_config_path_,
agent_));
187 if (netns_timeout >= 1) {
194 if (netns_workers > 0) {
195 workers = netns_workers;
199 for (std::vector<InstanceTaskQueue *>::iterator iter =
task_queues_.begin();
217 ServiceInstance::ServiceInstance::NetworkNamespace;
231 event.task_queue = task_queue;
237 std::stringstream ss;
238 ss <<
"TaskTimeOut for the TaskQ " <<
event.task_queue;
250 std::stringstream ss;
251 ss <<
"Error for the Task " <<
event.task <<
" " <<
event.errors;
256 state->set_errors(event.
errors);
266 std::stringstream ss;
267 ss <<
"Exit event for the Task " <<
event.task;
271 for (std::vector<InstanceTaskQueue *>::iterator iter =
275 if (!task_queue->
Empty()) {
276 if (task_queue->
Front() ==
event.task) {
318 int error_status =
event.error_val;
320 boost::system::errc::no_such_file_or_directory) {
323 if (!state->
errors().empty()) {
324 if (error_status == 0) {
331 if (error_status != 0) {
341 std::stringstream ss;
342 ss <<
"For the task " <<
event.task <<
" error status " <<
343 error_status <<
" status type " << state->
status_type();
376 if (!svc_instance || !svc_instance->
IsDeleted()) {
401 next = partition->
GetNext(entry);
418 for (std::vector<InstanceTaskQueue *>::iterator iter =
task_queues_.begin();
420 if ((task_queue = *iter) == NULL) {
432 std::stringstream ss;
435 task_queue->
Push(task);
440 boost::hash<std::string> hash;
463 bool status = task->
Run();
465 std::stringstream ss;
466 ss <<
"Run status for the task " << task <<
" " << status;
467 ss <<
" With running status " << task->
is_running();
483 ss.str(std::string());
484 ss <<
"Run failure for the task " << task <<
" attempt " << task->
reattempts();
492 ss.str(std::string());
493 ss <<
"Run failure for the task " << task <<
" attempts exceeded";
507 while (!task_queue->
Empty()) {
513 bool status =
StartTask(task_queue, task);
526 std::stringstream ss;
527 ss <<
"Timeout for the Task " << task <<
" delay " << delay;
532 if (delay >= (netns_timeout_ * 2)) {
546 if (state && svc_instance)
551 std::stringstream ss;
552 ss <<
"Delete of the Task " <<
task;
562 TaskSvcMap::const_iterator iter =
572 pair<TaskSvcMap::iterator, bool> result =
574 assert(result.second);
582 for (TaskSvcMap::iterator iter =
585 if (task == iter->first) {
603 TaskSvcMap::iterator iter =
606 if (svc_instance == iter->second) {
616 for (std::vector<InstanceManagerAdapter *>::iterator iter =
adapters_.begin();
630 std::stringstream ss;
631 if (adapter != NULL) {
634 ss <<
"Starting the Task " << task <<
" " << task->
cmd();
643 std::stringstream info;
646 ss <<
"Error Starting the Task for " << props.
instance_id;
651 ss <<
" for " << svc_instance->
ToString();
661 std::stringstream ss;
662 if (adapter != NULL) {
665 ss <<
"Stopping the Task " << task <<
" " << task->
cmd();
673 std::stringstream info;
676 std::stringstream ss;
677 ss <<
"Error Stopping the Task for " << props.
instance_id;
682 ss <<
" for " << svc_instance->
ToString();
688 const std::string errors) {
693 event.errors = errors;
699 const boost::system::error_code &ec) {
704 event.error_val = ec.value();
710 std::stringstream cmd_str;
719 cmd_str <<
" " <<
UuidToString(boost::uuids::nil_uuid());
720 cmd_str <<
" " <<
UuidToString(boost::uuids::nil_uuid());
723 LOG(ERROR,
"loadbalancer id is missing for service instance: "
730 std::string cmd = cmd_str.str();
731 std::vector<std::string> argv;
732 boost::split(argv, cmd, boost::is_any_of(
" "), boost::token_compress_on);
733 std::vector<const char *> c_argv(argv.size() + 1);
734 for (std::size_t i = 0; i != argv.size(); ++i) {
735 c_argv[i] = argv[i].c_str();
738 std::stringstream ss;
739 ss <<
"StaleNetNS " << cmd;
745 execvp(c_argv[0], (
char **) c_argv.data());
755 std::map<std::string, int>::iterator iter =
758 iter->second = last_cmd_type;
766 std::map<std::string, int>::const_iterator iter =
777 std::map<std::string, int>::iterator iter =
806 bool usable = svc_instance->
IsUsable();
808 std::stringstream ss;
809 ss <<
"NetNS event notification for uuid: " << svc_instance->
ToString();
810 ss << (usable ?
" usable" :
" not usable");
846 pid_(0), status_(0), status_type_(0), tasks_running_(0) {
time_t start_time() const
static void CloseTaskFds(void)
void UpdateStateStatusType(InstanceManagerChildEvent event)
SandeshTraceBufferPtr InstanceManagerTraceBuf
std::map< std::string, int > last_cmd_types_
void SetNamespaceStorePath(std::string path)
void STLDeleteValues(Container *container)
static boost::uuids::uuid StringToUuid(const std::string &str)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(InstanceTask *task, const boost::uuids::uuid &uuid)
void set_on_data_cb(OnDataCallback cb)
DBState * GetState(DBTableBase *tbl_base, ListenerId listener) const
void Push(InstanceTask *task)
std::string loadbalancer_id
void StopServiceInstance(ServiceInstance *svc_instance, InstanceState *state)
std::vector< InstanceTaskQueue * > task_queues_
TaskSvcMap task_svc_instances_
void SetState(DBTableBase *tbl_base, ListenerId listener, DBState *state)
DBTableBase::ListenerId si_listener_
virtual DBEntry * GetNext(const DBEntryBase *entry)
const std::string & ServiceTypeString() const
void RegisterSvcInstance(InstanceTask *task, ServiceInstance *svc_instance)
void SetStaleTimerInterval(int minutes)
void StopStaleNetNS(ServiceInstance::Properties &props)
void UnregisterSvcInstance(ServiceInstance *svc_instance)
void set_cmd(const std::string &cmd)
static std::string UuidToString(const boost::uuids::uuid &id)
virtual int cmd_type() const =0
void OnErrorEventHandler(InstanceManagerChildEvent event)
InstanceTaskQueue * task_queue
std::vector< InstanceManagerAdapter * > adapters_
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
void OnTaskTimeout(InstanceTaskQueue *task_queue)
void set_pid(const pid_t &pid)
static const int kReattemptsDefault
virtual InstanceTask * CreateStartTask(const ServiceInstance::Properties &props, bool update)=0
void Unregister(ListenerId listener)
int stale_timer_interval_
void set_on_timeout_cb(OnTimeoutCallback cb)
std::string loadbalancer_config_path_
virtual pid_t pid() const =0
ListenerId Register(ChangeCallback callback, const std::string &name="unspecified")
int CompareTo(const Properties &rhs) const
InstanceTaskQueue * GetTaskQueue(const std::string &str)
ServiceInstanceTable * service_instance_table() const
NamespaceStaleCleaner(Agent *agent, InstanceManager *manager)
InstanceState * GetState(ServiceInstance *) const
virtual bool isApplicable(const ServiceInstance::Properties &props)=0
static const char namespace_store_path_default[]
WorkQueue< InstanceManagerChildEvent > work_queue_
static const char loadbalancer_config_path_default[]
void StartServiceInstance(ServiceInstance *svc_instance, InstanceState *state, bool update)
boost::uuids::uuid instance_id
EventManager * event_manager() const
ServiceInstance * GetSvcInstance(InstanceTask *task) const
void StartTimer(int time)
void set_properties(const ServiceInstance::Properties &properties)
int GetLastCmdType(ServiceInstance *svc_instance) const
void OnExit(InstanceTask *task, const boost::system::error_code &ec)
std::string errors() const
boost::tokenizer< boost::char_separator< char > > tokenizer
void Initialize(DB *database, const std::string &netns_cmd, const std::string &docker_cmd, const int netns_workers, const int netns_timeout)
AgentDBEntry * Find(const DBEntry *key, bool ret_del)
void set_on_exit_cb(OnExitCallback cb)
void ScheduleNextTask(InstanceTaskQueue *task_queue)
const ServiceInstance::Properties & properties() const
void set_cmd(const std::string &netns_cmd)
void ClearState(DBTableBase *tbl_base, ListenerId listener)
const Properties & properties() const
const boost::uuids::uuid & uuid() const
virtual DBTablePartBase * GetTablePartition(const DBRequestKey *key)
std::string AgentGUID() const
void set_status(const int status)
std::unique_ptr< NamespaceStaleCleaner > stale_cleaner_
bool StartTask(InstanceTaskQueue *task_queue, InstanceTask *task)
virtual void Terminate()=0
void OnTaskTimeoutEventHandler(InstanceManagerChildEvent event)
static const int kWorkersDefault
static const int kTimeoutDefault
bool Start(int time, Handler handler, ErrorHandler error_handler=NULL)
void OnExitEventHandler(InstanceManagerChildEvent event)
int tasks_running() const
InstanceManagerAdapter * FindApplicableAdapter(const ServiceInstance::Properties &props)
#define INSTANCE_MANAGER_TRACE(obj,...)
#define LOG(_Level, _Msg)
virtual InstanceTask * CreateStopTask(const ServiceInstance::Properties &props)=0
virtual DBEntry * GetFirst()
InstanceManager * manager_
void OnError(InstanceTask *task, const std::string errors)
void EventObserver(DBTablePartBase *db_part, DBEntryBase *entry)
std::string namespace_store_path_
void SetState(ServiceInstance *svc_instance, InstanceState *state)
virtual const std::string & cmd() const =0
void ClearState(ServiceInstance *svc_instance)
#define INSTANCE_MANAGER_TASK_NAME
bool DequeueEvent(InstanceManagerChildEvent event)
void SetNetNSCmd(const std::string &netns_cmd)
void SetLastCmdType(ServiceInstance *svc_instance, int last_cmd_type)
bool DeleteState(ServiceInstance *svc_instance)
virtual std::string ToString() const
VmTable * vm_table() const
void ClearLastCmdType(ServiceInstance *svc_instance)
void set_status_type(const int status)
static const char namespace_prefix[]
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
static bool DeleteTimer(Timer *Timer)