7 #include <sandesh/request_pipeline.h>
9 #include "rapidjson/writer.h"
10 #include "rapidjson/stringbuffer.h"
13 #include "config_client_log_types.h"
14 #include "config_client_show_types.h"
16 #include <boost/algorithm/string/join.hpp>
17 #include <boost/algorithm/string/predicate.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/functional/hash.hpp>
20 #include <boost/ptr_container/ptr_map.hpp>
21 #include <boost/uuid/uuid.hpp>
37 #include "config_client_log_types.h"
38 #include "config_client_show_types.h"
39 #include "sandesh/common/vns_constants.h"
47 using contrail_rapidjson::Value;
48 using contrail_rapidjson::Document;
49 using contrail_rapidjson::SizeType;
50 using contrail_rapidjson::StringBuffer;
51 using contrail_rapidjson::Writer;
66 etcd_client_(etcd_client) {
75 return "ConfigEtcdClient::EtcdWatcher";
88 num_workers_(num_workers)
99 ConfigStaticObjectFactory::Create<ConfigEtcdPartition>(
this, i));
115 ConfigClientMgrDebug,
116 "ETCD Watcher SM: StartWatcher: ETCD watch disabled");
123 if (
mgr()->is_reinit_triggered()) {
125 ConfigClientMgrDebug,
126 "ETCD Watcher SM: StartWatcher: re init triggered,"
127 " don't enqueue ETCD Watcher Task.");
146 if (etcd_client_->mgr()->is_reinit_triggered()) {
148 ConfigClientMgrDebug,
149 "ETCD Watcher SM: Run: re init triggered,"
150 " don't wait for end of config");
157 client()->eqlif_->Watch(
"/contrail/",
169 if (
mgr()->is_reinit_triggered()) {
171 ConfigClientMgrDebug,
172 "ETCD Watcher SM: ProcessResponse: re init triggered,"
192 }
else if (resp.
action() == 1) {
194 }
else if (resp.
action() == 2) {
205 "Database initialization failed");
222 process::ConnectionType::DATABASE,
"Etcd",
223 process::ConnectionStatus::UP,
224 eqlif_->endpoints(),
"Established ETCD connection");
226 "ETCD SM: Established ETCD connection");
229 process::ConnectionType::DATABASE,
"Etcd",
230 process::ConnectionStatus::DOWN,
231 eqlif_->endpoints(),
"Lost ETCD connection");
233 "ETCD SM: Lost ETCD connection");
240 if (
mgr()->is_reinit_triggered())
return false;
247 "ETCD SM: BulkDataSync Started");
254 long num_config_readers_still_processing =
256 if (num_config_readers_still_processing == 1) {
258 "Etcd SM: BulkSyncDone by all readers");
262 "Etcd SM: One reader finished BulkSync");
268 "ETCD SM: Post shutdown during re-init");
292 boost::hash<string> string_hash;
314 size_t front_pos = uuid.rfind(
'/');
315 string uuid_key = uuid.substr(front_pos + 1);
318 if (oper ==
"CREATE" || oper ==
"UPDATE") {
321 d.Parse<0>(value.c_str());
322 Document::AllocatorType &a = d.GetAllocator();
327 "non-object json. uuid: "
328 + uuid_key +
" value: "
329 + value +
" .Skipping");
336 if (!d.HasMember(
"type")) {
337 string type = uuid.substr(10, front_pos - 10);
340 d.AddMember(v.SetString(
"type", a),
341 va.SetString(type.c_str(), a), a);
343 Writer<StringBuffer> writer(sb);
345 value = sb.GetString();
349 if (d.HasMember(
"type") &&
350 d.HasMember(
"fq_name")) {
351 string obj_type = d[
"type"].GetString();
353 const Value &name = d[
"fq_name"];
354 for (Value::ConstValueIterator name_itr = name.Begin();
355 name_itr != name.End(); ++name_itr) {
356 fq_name += name_itr->GetString();
359 fq_name.erase(fq_name.end()-1);
364 }
else if (oper ==
"DELETE") {
380 for (UUIDValueList::const_iterator it = uuid_list.begin();
381 it != uuid_list.end(); it++) {
389 string prefix =
"/contrail/";
390 bool read_done =
false;
393 for (ConfigClientManager::ObjectTypeList::const_iterator it =
394 mgr()->config_json_parser()->ObjectTypeListToRead().begin();
399 next_key = prefix + it->c_str();
404 unsigned int num_entries;
409 if (
mgr()->is_reinit_triggered()) {
411 "ETCD SM: Abort UUID reader on reinit trigger");
426 EtcdResponse::kv_map kvs = resp.
kvmap();
437 for (multimap<string, string>::const_iterator iter = kvs.begin();
443 next_key = iter->first;
444 if (!boost::starts_with(next_key,
"/contrail/")) {
446 "ETCD SM: Non-contrail uuid: "
447 + next_key +
" received");
449 uuid_list.push_back(make_pair(iter->first, iter->second));
469 if (kvs.size() < num_entries) {
472 }
else if (resp.
err_code() == 100) {
523 const string &search_string,
525 const string &last_uuid,
526 uint32_t num_entries,
527 vector<ConfigDBUUIDCacheEntry> *entries)
const {
529 num_entries, entries);
533 const string &lookup_key) {
539 : config_client_(client), worker_id_(idx) {
569 const string &uuid_key,
570 const string &value_str) {
571 pair<UUIDProcessSet::iterator, bool> ret;
578 size_t front_pos = uuid_key.rfind(
'/');
579 string uuid = uuid_key.substr(front_pos + 1);
599 if ((oper ==
"DELETE") &&
600 (ret.first->second->oper ==
"CREATE")) {
605 ret.first->second->
oper = oper;
606 ret.first->second->uuid =
uuid;
607 ret.first->second->value = value_str;
616 UUIDCacheMap::const_iterator uuid_iter,
617 ConfigDBUUIDCacheEntry *entry)
const {
618 entry->set_uuid(uuid);
619 entry->set_timestamp(
621 entry->set_fq_name(uuid_iter->second->GetFQName());
622 entry->set_obj_type(uuid_iter->second->GetObjType());
623 entry->set_json_str(uuid_iter->second->GetJsonString());
627 const string &search_string,
628 const string &last_uuid,
629 uint32_t num_entries,
630 vector<ConfigDBUUIDCacheEntry> *entries)
const {
632 regex search_expr(search_string);
633 for (UUIDCacheMap::const_iterator it =
640 ConfigDBUUIDCacheEntry entry;
642 entries->push_back(entry);
654 return uuid_iter->second;
673 string tmp_uuid =
uuid;
674 pair<UUIDCacheMap::iterator, bool> ret_uuid =
676 assert(ret_uuid.second);
677 uuid_iter = ret_uuid.first;
694 return uuid_iter->second;
699 uint32_t retry_time_pow_of_two =
707 const string value) {
710 *parent_->client()->event_manager()->io_service(),
711 "UUID retry timer for " +
uuid,
713 "config_client::Reader"),
714 parent_->worker_id_);
716 "Created UUID read retry timer ", uuid);
718 retry_timer_->Cancel();
719 retry_timer_->Start(parent_->UUIDRetryTimeInMSec(
this),
727 "Start/restart UUID Read Retry timer due to configuration", uuid);
734 retry_timer_->Cancel();
739 "UUID Read retry timer - deleted timer due to configuration",
746 return (retry_timer_->running());
752 const string value) {
754 parent_->client()->EnqueueUUIDRequest(
755 "UPDATE", parent_->client()->uuid_str(uuid), value);
763 std::string message =
"Timer";
765 "UUID Read Retry Timer error ", message, message);
769 const string &prop)
const {
770 ListMapSet::const_iterator it = list_map_set_.find(prop);
771 if (it == list_map_set_.end()) {
774 return (it->second ==
false);
790 bool notify_update =
false;
794 Value::ConstMemberIterator itr = doc.MemberBegin();
795 Document::AllocatorType &a = doc.GetAllocator();
797 while (itr != doc.MemberEnd()) {
799 string key = itr->name.GetString();
806 if (!notify_update &&
807 key.compare(
"type") != 0 &&
808 key.compare(
"fq_name") != 0) {
809 notify_update =
true;
819 itr = doc.EraseMember(itr);
827 if (key.compare(
"type") == 0) {
828 type_str = itr->value.GetString();
829 itr = doc.EraseMember(itr);
834 GetWrapperFieldName(obj_type, key.c_str());
835 if (!wrapper.empty()) {
848 Value &map_value = doc[key.c_str()];
854 }
else if (key.compare(
"parent_type") == 0) {
860 string parent_type = doc[key.c_str()].GetString();
861 replace(parent_type.begin(), parent_type.end(),
863 doc[key.c_str()].SetString(parent_type.c_str(), a);
865 }
else if (key.compare(
"parent_uuid") == 0) {
873 string parent_uuid = doc[key.c_str()].GetString();
875 if (parent_fq_name ==
"ERROR") {
877 "Parent fq_name not available for ", uuid);
882 }
else if (key.compare(
"bgpaas_session_attributes") == 0) {
889 doc[key.c_str()].SetString(
"", a);
891 }
else if (key.find(
"_refs") != string::npos && add_change) {
902 string ref_type = key.substr(0, key.length() - 5);
903 bool link_with_attr =
905 IsLinkWithAttr(obj_type, ref_type);
908 Value *v = &doc[key.c_str()];
910 assert(v->IsArray());
911 for (SizeType i = 0; i < v->Size(); i++) {
915 if (link_with_attr) {
916 if (va[
"attr"].IsNull()) {
917 (*v)[i].RemoveMember(
"attr");
919 (*v)[i].AddMember(
"attr", vm.SetObject(), a);
931 Value &uuidVal = va[
"uuid"];
932 const string ref_uuid = uuidVal.GetString();
935 if (ref_fq_name ==
"ERROR") {
939 if (!va.HasMember(
"to")) {
941 "Ref fq_name not available for ", uuid);
946 const Value &name = va[
"to"];
947 for (Value::ConstValueIterator itr = name.Begin();
948 itr != name.End(); ++itr) {
949 ref_fq_name += itr->GetString();
952 ref_fq_name.erase(ref_fq_name.end()-1);
957 (*v)[i].RemoveMember(
"to");
958 Value vs1(ref_fq_name.c_str(), a);
959 (*v)[i].AddMember(
"to", vs1, a);
968 cacheDoc.Parse<0>(cache_json_str.c_str());
969 cacheDoc.RemoveMember(key.c_str());
973 refVal.CopyFrom(*v, a);
974 cacheDoc.AddMember(vr.SetString(key.c_str(), a),
977 Writer<StringBuffer> writer(sb);
978 cacheDoc.Accept(writer);
979 string cache_str = sb.GetString();
983 if (itr != doc.MemberEnd()) itr++;
986 if (!notify_update) {
988 "ETCD SM: Nothing to update");
993 Writer<StringBuffer> writer1(sb1);
995 refDoc.CopyFrom(doc, refDoc.GetAllocator());
996 refDoc.Accept(writer1);
997 string refString = sb1.GetString();
999 "ETCD SM: JSON Doc fed to CJP: " + refString);
1008 const string &uuid_key) {
1015 if (
client()->FindFQName(uuid_key) ==
"ERROR") {
1017 "ETCD SM: Nothing to delete");
1052 delDoc.Parse<0>(cache_json_str.c_str());
1075 const string &value_str) {
1082 bool is_new =
false;
1097 if (cache_json_str.compare(
"retry") == 0) {
1102 cache_json_str = value_str;
1106 cacheDoc.Parse<0>(cache_json_str.c_str());
1115 updDoc.Parse<0>(value_str.c_str());
1122 if (!updDoc.HasMember(
"fq_name") ||
1123 !updDoc.HasMember(
"type")) {
1125 "fq_name or type not present for ",
1126 "obj_uuid_table with uuid: ", uuid_key);
1142 Value::ConstMemberIterator itr = updDoc.MemberBegin();
1143 while (itr != updDoc.MemberEnd()) {
1145 key = itr->name.GetString();
1152 if (key.compare(
"draft_mode_state") == 0) {
1153 string mode = itr->value.GetString();
1154 if (!mode.empty()) {
1159 itr = updDoc.EraseMember(itr);
1170 if (key.compare(
"type") == 0) {
1171 string type = itr->value.GetString();
1173 }
else if (key.compare(
"fq_name") == 0) {
1175 const Value &name = updDoc[key.c_str()];
1176 for (Value::ConstValueIterator name_itr = name.Begin();
1177 name_itr != name.End(); ++name_itr) {
1178 fq_name += name_itr->GetString();
1181 fq_name.erase(fq_name.end()-1);
1193 if (!is_new && cacheDoc.HasMember(key.c_str()) &&
1194 key.compare(
"type") != 0 &&
1195 key.compare(
"fq_name") != 0) {
1196 if (cacheDoc[key.c_str()] == updDoc[key.c_str()]) {
1197 itr = updDoc.EraseMember(itr);
1199 assert(cacheDoc.RemoveMember(key.c_str()));
1201 if (itr != updDoc.MemberEnd()) itr++;
1242 const string &lookup_key) {
1259 int num_req_handled = 0;
1277 if (obj_req->
oper ==
"CREATE" || obj_req->
oper ==
"UPDATE") {
1279 }
else if (obj_req->
oper ==
"DELETE") {
1281 }
else if (obj_req->
oper ==
"EndOfConfig") {
1290 if (++num_req_handled ==
client()->GetMaxRequestsToYield()) {
1296 if (
client()->mgr()->is_reinit_triggered()) {
1298 "ETCD SM: Clear UUID process set due to reinit");
1306 UUIDProcessSet::iterator req_it =
1308 delete req_it->second;
ConfigJsonParserBase * config_json_parser()
virtual bool BulkDataSync()
bool EtcdReadRetryTimerExpired(const string uuid, const string value)
void STLDeleteValues(Container *container)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
virtual void InvalidateFQNameCache(const std::string &uuid)
UUIDProcessWorkQType obj_process_queue_
ConfigClientManager * mgr()
void ProcessResponse(EtcdResponse resp)
WatchAction action() const
boost::scoped_ptr< TaskTrigger > uuid_reader_
bool UUIDToObjCacheShow(const string &search_string, const string &last_uuid, uint32_t num_entries, vector< ConfigDBUUIDCacheEntry > *entries) const
virtual void InitDatabase()
virtual void SetObjType(std::string obj_type)
bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
virtual const uint64_t GetInitRetryTimeUSec() const
ConfigEtcdClient * etcd_client_
const string & GetJsonString() const
UUIDCacheMap uuid_cache_map_
void EnableEtcdReadRetry(const string uuid, const string value)
const std::string & value() const
virtual void SetFQName(std::string fq_name)
virtual bool IsTaskTriggered() const
EtcdWatcher(ConfigEtcdClient *etcd_client)
virtual void InitConnectionInfo()
virtual void UpdateConnectionInfo(bool success, bool force)
virtual void ClearFQNameCache()
#define CONFIG_CLIENT_DEBUG(obj,...)
int GetTaskId(const std::string &name)
virtual std::string uuid_str(const std::string &uuid)
void EnqueueUUIDRequest(string oper, string obj_type, string uuid_str)
static bool regex_match(const std::string &input, const regex ®ex)
tbb::atomic< long > bulk_sync_status_
static const std::set< std::string > skip_properties
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
void WaitForEndOfConfig()
virtual bool UUIDToObjCacheShow(const string &search_string, int inst_num, const string &last_uuid, uint32_t num_entries, vector< ConfigDBUUIDCacheEntry > *entries) const
virtual void PurgeFQNameCache(const std::string &uuid)
void ProcessUUIDUpdate(const string &uuid_key, const string &value_str)
const std::string & key() const
void SetJsonString(const string &value_str)
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
ConfigEtcdClient(ConfigClientManager *mgr, EventManager *evm, const ConfigClientOptions &options, int num_workers)
boost::scoped_ptr< EtcdIf > eqlif_
UUIDCacheEntry * GetUUIDCacheEntry(const string &uuid)
bool RequestHandler(ObjectProcessReq *req)
#define CHECK_CONCURRENCY(...)
virtual ~ConfigEtcdPartition()
virtual int HashUUID(const std::string &uuid_str) const
UUIDProcessSet uuid_process_set_
void EtcdReadRetryTimerErrorHandler()
static bool disable_watch_
virtual const std::string & GetObjType() const
void ProcessUUIDDelete(const string &uuid_key)
ConfigEtcdClient * client() const
PartitionList partitions_
static bool regex_search(const std::string &input, const regex ®ex)
static Timer * CreateTimer(boost::asio::io_context &service, const std::string &name, int task_id=Timer::GetTimerTaskId(), int task_instance=Timer::GetTimerInstanceId(), bool delete_on_completion=false)
#define CONFIG_CLIENT_WARN(obj,...)
bool IsRetryTimerRunning() const
virtual void AddFQNameCache(const std::string &uuid, const std::string &obj_type, const std::string &fq_name)
void EnqueueDBSyncRequest(const UUIDValueList &uuid_list)
const kv_map & kvmap() const
bool IsTaskTriggered() const
boost::shared_ptr< TaskTrigger > config_reader_
const std::vector< std::string > & config_db_ips() const
uint32_t GetRetryCount() const
void RemoveObjReqEntry(string &uuid)
static uint64_t UTCTimestampUsec()
ConfigEtcdClient * client()
virtual void ProcessResponse(EtcdResponse resp)
virtual ~ConfigEtcdClient()
bool is_reinit_triggered()
list< UUIDValueType > UUIDValueList
ConfigEtcdPartition * GetPartition(const string &uuid)
virtual std::string FindFQName(const std::string &uuid) const
static const uint32_t kMaxUUIDRetryTimePowOfTwo
const ObjectTypeList & ObjectTypeListToRead() const
virtual void PostShutdown()
static ConnectionState * GetInstance()
void FillUUIDToObjCacheInfo(const string &uuid, UUIDCacheMap::const_iterator uuid_iter, ConfigDBUUIDCacheEntry *entry) const
virtual bool Receive(const ConfigCass2JsonAdapter &adapter, bool add_change)=0
void HandleEtcdConnectionStatus(bool success, bool force_update=false)
void AddUUIDToProcessList(const string &oper, const string &uuid_key, const string &value_str)
ConfigEtcdPartition(ConfigEtcdClient *client, size_t idx)
void Enqueue(ObjectProcessReq *req)
void DisableEtcdReadRetry(const string uuid)
bool ListOrMapPropEmpty(const string &prop) const
void DeleteCacheMap(const string &uuid)
int GetFirstConfigDbPort() const
virtual uint32_t GetNumReadRequestToBunch() const
void SetListOrMapPropEmpty(const string &prop, bool empty)
virtual bool GenerateAndPushJson(const string &uuid_key, Document &doc, bool add_change, UUIDCacheEntry *cache)
Task is a wrapper over tbb::task to support policies.
virtual bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
static const uint32_t kMinUUIDRetryTimeMSec
string Description() const
virtual int UUIDRetryTimeInMSec(const UUIDCacheEntry *obj) const
static bool DeleteTimer(Timer *Timer)
static std::string UTCUsecToString(uint64_t tstamp)