OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_etcd_client.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 
7 #include <sandesh/request_pipeline.h>
8 
9 #include "rapidjson/writer.h"
10 #include "rapidjson/stringbuffer.h"
11 
12 #include "config_client_log.h"
13 #include "config_client_log_types.h"
14 #include "config_client_show_types.h"
15 
16 #include <boost/algorithm/string/join.hpp>
17 #include <boost/algorithm/string/predicate.hpp>
18 #include <boost/foreach.hpp>
19 #include <boost/functional/hash.hpp>
20 #include <boost/ptr_container/ptr_map.hpp>
21 #include <boost/uuid/uuid.hpp>
22 #include <map>
23 #include <set>
24 #include <string>
25 #include <utility>
26 
27 #include "base/connection_info.h"
28 #include "base/logging.h"
29 #include "base/regex.h"
30 #include "base/task.h"
31 #include "base/task_annotations.h"
32 #include "base/task_trigger.h"
34 #include "io/event_manager.h"
35 #include "config_factory.h"
36 #include "config_client_log.h"
37 #include "config_client_log_types.h"
38 #include "config_client_show_types.h"
39 #include "sandesh/common/vns_constants.h"
40 
41 using contrail::regex;
44 using namespace std;
47 using contrail_rapidjson::Value;
48 using contrail_rapidjson::Document;
49 using contrail_rapidjson::SizeType;
50 using contrail_rapidjson::StringBuffer;
51 using contrail_rapidjson::Writer;
52 
54 
63 public:
64  EtcdWatcher(ConfigEtcdClient *etcd_client) :
65  Task(TaskScheduler::GetInstance()->GetTaskId("etcd::EtcdWatcher")),
66  etcd_client_(etcd_client) {
67  }
68 
69  virtual bool Run();
70 
72  return etcd_client_;
73  }
74  string Description() const {
75  return "ConfigEtcdClient::EtcdWatcher";
76  }
77 
78 private:
80  void ProcessResponse(EtcdResponse resp);
81 };
82 
86  int num_workers)
87  : ConfigDbClient(mgr, evm, options),
88  num_workers_(num_workers)
89 {
90  eqlif_.reset(ConfigStaticObjectFactory::Create<EtcdIf>(config_db_ips(),
92  false));
93 
96 
97  for (int i = 0; i < num_workers_; i++) {
98  partitions_.push_back(
99  ConfigStaticObjectFactory::Create<ConfigEtcdPartition>(this, i));
100  }
101 
102  uuid_reader_.reset(new
103  TaskTrigger(boost::bind(&ConfigEtcdClient::UUIDReader, this),
104  TaskScheduler::GetInstance()->GetTaskId("config_client::DBReader"),
105  0));
106 }
107 
110 }
111 
113  if (disable_watch_) {
115  ConfigClientMgrDebug,
116  "ETCD Watcher SM: StartWatcher: ETCD watch disabled");
117  return;
118  }
119 
123  if (mgr()->is_reinit_triggered()) {
125  ConfigClientMgrDebug,
126  "ETCD Watcher SM: StartWatcher: re init triggered,"
127  " don't enqueue ETCD Watcher Task.");
128  return;
129  }
130 
132  Task *task = new EtcdWatcher(this);
133  scheduler->Enqueue(task);
134 }
135 
137  EtcdResponse resp) {
138  client()->ProcessResponse(resp);
139 }
140 
146  if (etcd_client_->mgr()->is_reinit_triggered()) {
148  ConfigClientMgrDebug,
149  "ETCD Watcher SM: Run: re init triggered,"
150  " don't wait for end of config");
151  return true;
152  }
153 
157  client()->eqlif_->Watch("/contrail/",
159  this, _1));
160 
161  return true;
162 }
163 
169  if (mgr()->is_reinit_triggered()) {
171  ConfigClientMgrDebug,
172  "ETCD Watcher SM: ProcessResponse: re init triggered,"
173  " stop watching");
174  eqlif_->StopWatch();
175  return;
176  }
177 
182  mgr()->WaitForEndOfConfig();
183 
188  assert(resp.err_code() == 0);
189 
190  if (resp.action() == 0) {
191  EnqueueUUIDRequest("CREATE", resp.key(), resp.value());
192  } else if (resp.action() == 1) {
193  EnqueueUUIDRequest("UPDATE", resp.key(), resp.value());
194  } else if (resp.action() == 2) {
195  EnqueueUUIDRequest("DELETE", resp.key(), resp.value());
196  }
197 }
198 
200  HandleEtcdConnectionStatus(false, true);
201  while (true) {
202  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, "ETCD SM: Db Init");
203  if (!eqlif_->Connect()) {
204  CONFIG_CLIENT_DEBUG(ConfigEtcdInitErrorMessage,
205  "Database initialization failed");
206  if (!InitRetry()) return;
207  continue;
208  }
209  break;
210  }
212  BulkDataSync();
213 }
214 
216  bool force_update) {
217  UpdateConnectionInfo(success, force_update);
218 
219  if (success) {
220  // Update connection info
222  process::ConnectionType::DATABASE, "Etcd",
223  process::ConnectionStatus::UP,
224  eqlif_->endpoints(), "Established ETCD connection");
225  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
226  "ETCD SM: Established ETCD connection");
227  } else {
229  process::ConnectionType::DATABASE, "Etcd",
230  process::ConnectionStatus::DOWN,
231  eqlif_->endpoints(), "Lost ETCD connection");
232  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
233  "ETCD SM: Lost ETCD connection");
234  }
235 }
236 
238  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, "ETCD SM: DB Init Retry");
239  // If reinit is triggered, return false to abort connection attempt
240  if (mgr()->is_reinit_triggered()) return false;
241  usleep(GetInitRetryTimeUSec());
242  return true;
243 }
244 
246  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
247  "ETCD SM: BulkDataSync Started");
249  uuid_reader_->Set();
250  return true;
251 }
252 
254  long num_config_readers_still_processing =
255  bulk_sync_status_.fetch_and_decrement();
256  if (num_config_readers_still_processing == 1) {
257  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
258  "Etcd SM: BulkSyncDone by all readers");
259  mgr()->EndOfConfig();
260  } else {
261  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
262  "Etcd SM: One reader finished BulkSync");
263  }
264 }
265 
267  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
268  "ETCD SM: Post shutdown during re-init");
271 }
272 
275  int worker_id = HashUUID(uuid);
276  return partitions_[worker_id];
277 }
278 
279 const ConfigEtcdPartition *
280 ConfigEtcdClient::GetPartition(const string &uuid) const {
281  int worker_id = HashUUID(uuid);
282  return partitions_[worker_id];
283 }
284 
285 const ConfigEtcdPartition *
286 ConfigEtcdClient::GetPartition(int worker_id) const {
287  assert(worker_id < num_workers_);
288  return partitions_[worker_id];
289 }
290 
291 int ConfigEtcdClient::HashUUID(const string &uuid_str) const {
292  boost::hash<string> string_hash;
293  return string_hash(uuid_str) % num_workers_;
294 }
295 
297  string uuid,
298  string value) {
313  // Get the trimmed uuid
314  size_t front_pos = uuid.rfind('/');
315  string uuid_key = uuid.substr(front_pos + 1);
316 
317  // Cache uses the trimmed uuid
318  if (oper == "CREATE" || oper == "UPDATE") {
319 
320  Document d;
321  d.Parse<0>(value.c_str());
322  Document::AllocatorType &a = d.GetAllocator();
323 
324  // If non-object JSON is received, log a warning and return.
325  if (!d.IsObject()) {
326  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, "ETCD SM: Received "
327  "non-object json. uuid: "
328  + uuid_key + " value: "
329  + value + " .Skipping");
330  return;
331  }
332 
333  // ETCD does not provide obj-type since it is encoded in the
334  // UUID key. Since config_json_parser and IFMap need type to be
335  // present in the document, fix up by adding obj-type.
336  if (!d.HasMember("type")) {
337  string type = uuid.substr(10, front_pos - 10);
338  Value v;
339  Value va;
340  d.AddMember(v.SetString("type", a),
341  va.SetString(type.c_str(), a), a);
342  StringBuffer sb;
343  Writer<StringBuffer> writer(sb);
344  d.Accept(writer);
345  value = sb.GetString();
346  }
347 
348  // Add to FQName cache if not present
349  if (d.HasMember("type") &&
350  d.HasMember("fq_name")) {
351  string obj_type = d["type"].GetString();
352  string fq_name;
353  const Value &name = d["fq_name"];
354  for (Value::ConstValueIterator name_itr = name.Begin();
355  name_itr != name.End(); ++name_itr) {
356  fq_name += name_itr->GetString();
357  fq_name += ":";
358  }
359  fq_name.erase(fq_name.end()-1);
360  if (FindFQName(uuid_key) == "ERROR") {
361  AddFQNameCache(uuid_key, obj_type, fq_name);
362  }
363  }
364  } else if (oper == "DELETE") {
365  // Invalidate cache
366  InvalidateFQNameCache(uuid_key);
367  }
368 
369  // Request has the uuid with entire path
370  ObjectProcessReq *req = new ObjectProcessReq(oper, uuid, value);
371 
372  // GetPartition uses the trimmed uuid so that the same
373  // partition is returned for different requests on the
374  // same UUID
375  GetPartition(uuid_key)->Enqueue(req);
376 }
377 
379  const UUIDValueList &uuid_list) {
380  for (UUIDValueList::const_iterator it = uuid_list.begin();
381  it != uuid_list.end(); it++) {
382  EnqueueUUIDRequest("CREATE", it->first, it->second);
383  }
384 }
385 
387 
388  string next_key;
389  string prefix = "/contrail/";
390  bool read_done = false;
391  ostringstream os;
392 
393  for (ConfigClientManager::ObjectTypeList::const_iterator it =
394  mgr()->config_json_parser()->ObjectTypeListToRead().begin();
395  it != mgr()->config_json_parser()->ObjectTypeListToRead().end();
396  it++) {
397 
398  /* Form the key for the object type to lookup */
399  next_key = prefix + it->c_str();
400  os.str("");
401  os << next_key << 1;
402 
403  while (true) {
404  unsigned int num_entries;
405 
409  if (mgr()->is_reinit_triggered()) {
410  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
411  "ETCD SM: Abort UUID reader on reinit trigger");
412  return true;
413  }
414 
418  num_entries = GetNumReadRequestToBunch();
419 
423  EtcdResponse resp = eqlif_->Get(next_key,
424  os.str(),
425  num_entries);
426  EtcdResponse::kv_map kvs = resp.kvmap();
427 
431  if (resp.err_code() == 0) {
435  UUIDValueList uuid_list;
436 
437  for (multimap<string, string>::const_iterator iter = kvs.begin();
438  iter != kvs.end();
439  ++iter) {
443  next_key = iter->first;
444  if (!boost::starts_with(next_key, "/contrail/")) {
445  CONFIG_CLIENT_WARN(ConfigClientMgrWarning,
446  "ETCD SM: Non-contrail uuid: "
447  + next_key + " received");
448  } else {
449  uuid_list.push_back(make_pair(iter->first, iter->second));
450  }
451  }
452 
457  EnqueueDBSyncRequest(uuid_list);
458 
462  next_key += "00";
463 
469  if (kvs.size() < num_entries) {
470  break;
471  }
472  } else if (resp.err_code() == 100) {
476  break;
477  } else if (resp.err_code() == -1) {
478  /* Test ONLY */
479  read_done = true;
480  break;
481  } else {
487  usleep(GetInitRetryTimeUSec());
488  }
489  } //while
490  if (read_done) {
491  break;
492  }
493  } //for
494 
495  // At the end of task trigger
496  BOOST_FOREACH(ConfigEtcdPartition *partition, partitions_) {
497  ObjectProcessReq *req = new ObjectProcessReq("EndOfConfig", "", "");
498  partition->Enqueue(req);
499  }
500 
501  return true;
502 }
503 
505  // If UUIDReader task has been triggered return true.
506  if (uuid_reader_->IsSet()) {
507  return true;
508  }
509 
514  BOOST_FOREACH(ConfigEtcdPartition *partition, partitions_) {
515  if (partition->IsTaskTriggered()) {
516  return true;
517  }
518  }
519  return false;
520 }
521 
523  const string &search_string,
524  int inst_num,
525  const string &last_uuid,
526  uint32_t num_entries,
527  vector<ConfigDBUUIDCacheEntry> *entries) const {
528  return GetPartition(inst_num)->UUIDToObjCacheShow(search_string, last_uuid,
529  num_entries, entries);
530 }
531 
532 bool ConfigEtcdClient::IsListOrMapPropEmpty(const string &uuid_key,
533  const string &lookup_key) {
534  return GetPartition(uuid_key)->IsListOrMapPropEmpty(uuid_key, lookup_key);
535 }
536 
538  ConfigEtcdClient *client, size_t idx)
539  : config_client_(client), worker_id_(idx) {
540  int task_id = TaskScheduler::GetInstance()->GetTaskId("config_client::Reader");
541  config_reader_.reset(new
543  task_id, idx));
544  task_id =
545  TaskScheduler::GetInstance()->GetTaskId("config_client::ObjectProcessor");
547  task_id, idx, bind(&ConfigEtcdPartition::RequestHandler, this, _1),
549 }
550 
552  obj_process_queue_->Shutdown();
553 }
554 
556  obj_process_queue_->Enqueue(req);
557 }
558 
560  AddUUIDToProcessList(req->oper_, req->uuid_str_, req->value_);
561  delete req;
562  return true;
563 }
564 
569  const string &uuid_key,
570  const string &value_str) {
571  pair<UUIDProcessSet::iterator, bool> ret;
572  bool trigger = uuid_process_set_.empty();
573 
578  size_t front_pos = uuid_key.rfind('/');
579  string uuid = uuid_key.substr(front_pos + 1);
581  new UUIDProcessRequestType(oper, uuid, value_str);
582  ret = uuid_process_set_.insert(make_pair(client()->GetUUID(uuid), req));
583  if (ret.second) {
589  if (trigger) {
590  config_reader_->Set();
591  }
592  } else {
599  if ((oper == "DELETE") &&
600  (ret.first->second->oper == "CREATE")) {
601  uuid_process_set_.erase(ret.first);
602  client()->PurgeFQNameCache(uuid);
603  } else {
604  delete req;
605  ret.first->second->oper = oper;
606  ret.first->second->uuid = uuid;
607  ret.first->second->value = value_str;
608  }
609  }
610 }
611 
613 }
614 
616  UUIDCacheMap::const_iterator uuid_iter,
617  ConfigDBUUIDCacheEntry *entry) const {
618  entry->set_uuid(uuid);
619  entry->set_timestamp(
620  UTCUsecToString(uuid_iter->second->GetLastReadTimeStamp()));
621  entry->set_fq_name(uuid_iter->second->GetFQName());
622  entry->set_obj_type(uuid_iter->second->GetObjType());
623  entry->set_json_str(uuid_iter->second->GetJsonString());
624 }
625 
627  const string &search_string,
628  const string &last_uuid,
629  uint32_t num_entries,
630  vector<ConfigDBUUIDCacheEntry> *entries) const {
631  uint32_t count = 0;
632  regex search_expr(search_string);
633  for (UUIDCacheMap::const_iterator it =
634  uuid_cache_map_.upper_bound(last_uuid);
635  count < num_entries && it != uuid_cache_map_.end(); it++) {
636  if (regex_search(it->first, search_expr) ||
637  regex_search(it->second->GetObjType(), search_expr) ||
638  regex_search(it->second->GetFQName(), search_expr)) {
639  count++;
640  ConfigDBUUIDCacheEntry entry;
641  FillUUIDToObjCacheInfo(it->first, it, &entry);
642  entries->push_back(entry);
643  }
644  }
645  return true;
646 }
647 
650  UUIDCacheMap::iterator uuid_iter = uuid_cache_map_.find(uuid);
651  if (uuid_iter == uuid_cache_map_.end()) {
652  return NULL;
653  }
654  return uuid_iter->second;
655 }
656 
659  const string &value,
660  bool &is_new) {
661  UUIDCacheMap::iterator uuid_iter = uuid_cache_map_.find(uuid);
662  if (uuid_iter == uuid_cache_map_.end()) {
666  UUIDCacheEntry *obj;
667  obj = new UUIDCacheEntry(this,
668  value,
669  UTCTimestampUsec());
673  string tmp_uuid = uuid;
674  pair<UUIDCacheMap::iterator, bool> ret_uuid =
675  uuid_cache_map_.insert(tmp_uuid, obj);
676  assert(ret_uuid.second);
677  uuid_iter = ret_uuid.first;
678 
683  is_new = true;
684  } else {
688  uuid_iter->second->SetLastReadTimeStamp(UTCTimestampUsec());
689  }
690 
694  return uuid_iter->second;
695 }
696 
698  const UUIDCacheEntry *obj) const {
699  uint32_t retry_time_pow_of_two =
702  return ((1 << retry_time_pow_of_two) * kMinUUIDRetryTimeMSec);
703 }
704 
706  const string uuid,
707  const string value) {
708  if (!retry_timer_) {
709  retry_timer_ = TimerManager::CreateTimer(
710  *parent_->client()->event_manager()->io_service(),
711  "UUID retry timer for " + uuid,
713  "config_client::Reader"),
714  parent_->worker_id_);
715  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
716  "Created UUID read retry timer ", uuid);
717  }
718  retry_timer_->Cancel();
719  retry_timer_->Start(parent_->UUIDRetryTimeInMSec(this),
720  boost::bind(
722  this, uuid, value),
723  boost::bind(
725  this));
726  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
727  "Start/restart UUID Read Retry timer due to configuration", uuid);
728 }
729 
731  const string uuid) {
732  CHECK_CONCURRENCY("config_client::Reader");
733  if (retry_timer_) {
734  retry_timer_->Cancel();
735  TimerManager::DeleteTimer(retry_timer_);
736  retry_timer_ = NULL;
737  retry_count_ = 0;
738  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
739  "UUID Read retry timer - deleted timer due to configuration",
740  uuid);
741  }
742 }
743 
745  if (retry_timer_)
746  return (retry_timer_->running());
747  return false;
748 }
749 
751  const string uuid,
752  const string value) {
753  CHECK_CONCURRENCY("config_client::Reader");
754  parent_->client()->EnqueueUUIDRequest(
755  "UPDATE", parent_->client()->uuid_str(uuid), value);
756  retry_count_++;
757  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry, "timer expired ", uuid);
758  return false;
759 }
760 
761 void
763  std::string message = "Timer";
764  CONFIG_CLIENT_WARN(ConfigClientGetRowError,
765  "UUID Read Retry Timer error ", message, message);
766 }
767 
769  const string &prop) const {
770  ListMapSet::const_iterator it = list_map_set_.find(prop);
771  if (it == list_map_set_.end()) {
772  return true;
773  }
774  return (it->second == false);
775 }
776 
778  Document &doc,
779  bool add_change,
780  UUIDCacheEntry *cache) {
781 
782  // Get obj_type from cache.
783  const string &obj_type = cache->GetObjType();
784 
785  // string to get the type field.
786  string type_str;
787 
788  // bool to indicate if an update to ifmap_server is
789  // necessary.
790  bool notify_update = false;
791 
792  // Walk the document, remove unwanted properties and do
793  // needed fixup for the others.
794  Value::ConstMemberIterator itr = doc.MemberBegin();
795  Document::AllocatorType &a = doc.GetAllocator();
796 
797  while (itr != doc.MemberEnd()) {
798 
799  string key = itr->name.GetString();
800 
801  /*
802  * Indicate need for update since document has
803  * at least one field other than fq_name or
804  * obj_type to be updated.
805  */
806  if (!notify_update &&
807  key.compare("type") != 0 &&
808  key.compare("fq_name") != 0) {
809  notify_update = true;
810  }
811 
817  if (ConfigClientManager::skip_properties.find(key) !=
819  itr = doc.EraseMember(itr);
820  continue;
821  }
822 
827  if (key.compare("type") == 0) {
828  type_str = itr->value.GetString();
829  itr = doc.EraseMember(itr);
830  continue;
831  }
832 
833  string wrapper = client()->mgr()->config_json_parser()-> \
834  GetWrapperFieldName(obj_type, key.c_str());
835  if (!wrapper.empty()) {
836 
847  // Get the propl/propm json Value
848  Value &map_value = doc[key.c_str()];
849 
850  // Indicate in cache if propm/propl is empty
851  cache->SetListOrMapPropEmpty(key,
852  map_value.IsNull());
853 
854  } else if (key.compare("parent_type") == 0) {
855 
860  string parent_type = doc[key.c_str()].GetString();
861  replace(parent_type.begin(), parent_type.end(),
862  '-', '_');
863  doc[key.c_str()].SetString(parent_type.c_str(), a);
864 
865  } else if (key.compare("parent_uuid") == 0) {
866 
872  if (add_change) {
873  string parent_uuid = doc[key.c_str()].GetString();
874  string parent_fq_name = client()->FindFQName(parent_uuid);
875  if (parent_fq_name == "ERROR") {
876  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
877  "Parent fq_name not available for ", uuid);
878  return false;
879  }
880  }
881 
882  } else if (key.compare("bgpaas_session_attributes") == 0) {
883 
889  doc[key.c_str()].SetString("", a);
890 
891  } else if (key.find("_refs") != string::npos && add_change) {
892 
901  // Determine if NULL attr needs to be processed
902  string ref_type = key.substr(0, key.length() - 5);
903  bool link_with_attr =
904  client()->mgr()->config_json_parser()-> \
905  IsLinkWithAttr(obj_type, ref_type);
906 
907  // Get a pointer to the _refs json Value
908  Value *v = &doc[key.c_str()];
909 
910  assert(v->IsArray());
911  for (SizeType i = 0; i < v->Size(); i++) {
912 
913  // Process NULL attr
914  Value &va = (*v)[i];
915  if (link_with_attr) {
916  if (va["attr"].IsNull()) {
917  (*v)[i].RemoveMember("attr");
918  Value vm;
919  (*v)[i].AddMember("attr", vm.SetObject(), a);
920  }
921  }
922 
931  Value &uuidVal = va["uuid"];
932  const string ref_uuid = uuidVal.GetString();
933  string ref_fq_name = client()->FindFQName(ref_uuid);
934 
935  if (ref_fq_name == "ERROR") {
936  // ref_fq_name not in FQNameCache
937  // If we cannot find ref_fq_name in the doc
938  // as well, return false to enable retry.
939  if (!va.HasMember("to")) {
940  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
941  "Ref fq_name not available for ", uuid);
942  return false;
943  }
944 
945  ref_fq_name.clear();
946  const Value &name = va["to"];
947  for (Value::ConstValueIterator itr = name.Begin();
948  itr != name.End(); ++itr) {
949  ref_fq_name += itr->GetString();
950  ref_fq_name += ":";
951  }
952  ref_fq_name.erase(ref_fq_name.end()-1);
953  }
954 
955  // Remove ref_fq_name from doc and re-add the
956  // string formatted fq_name.
957  (*v)[i].RemoveMember("to");
958  Value vs1(ref_fq_name.c_str(), a);
959  (*v)[i].AddMember("to", vs1, a);
960  }
961 
962  // For creates/updates, need to update cache json_str
963  // with the new fixed ref_fq_names for _refs.
964  // Remove existing reference in cache and create a new
965  // ref with updated ref_fq_names.
966  Document cacheDoc;
967  string cache_json_str = cache->GetJsonString();
968  cacheDoc.Parse<0>(cache_json_str.c_str());
969  cacheDoc.RemoveMember(key.c_str());
970  Value vr;
971  Value vra;
972  Value refVal;
973  refVal.CopyFrom(*v, a);
974  cacheDoc.AddMember(vr.SetString(key.c_str(), a),
975  refVal, a);
976  StringBuffer sb;
977  Writer<StringBuffer> writer(sb);
978  cacheDoc.Accept(writer);
979  string cache_str = sb.GetString();
980  cache->SetJsonString(cache_str);
981  }
982 
983  if (itr != doc.MemberEnd()) itr++;
984  }
985 
986  if (!notify_update) {
987  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
988  "ETCD SM: Nothing to update");
989  return true;
990  }
991 
992  StringBuffer sb1;
993  Writer<StringBuffer> writer1(sb1);
994  Document refDoc;
995  refDoc.CopyFrom(doc, refDoc.GetAllocator());
996  refDoc.Accept(writer1);
997  string refString = sb1.GetString();
998  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
999  "ETCD SM: JSON Doc fed to CJP: " + refString);
1000 
1001  ConfigCass2JsonAdapter ccja(uuid, type_str, doc);
1002  client()->mgr()->config_json_parser()->Receive(ccja, add_change);
1003 
1004  return true;
1005 }
1006 
1008  const string &uuid_key) {
1009 
1015  if (client()->FindFQName(uuid_key) == "ERROR") {
1016  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
1017  "ETCD SM: Nothing to delete");
1018  return;
1019  }
1020 
1025  UUIDCacheMap::iterator uuid_iter = uuid_cache_map_.find(uuid_key);
1026  if (uuid_iter == uuid_cache_map_.end()) {
1027  return;
1028  }
1029  UUIDCacheEntry *cache = uuid_iter->second;
1030 
1036  if (cache->IsRetryTimerRunning()) {
1037  cache->DisableEtcdReadRetry(uuid_key);
1038  client()->PurgeFQNameCache(uuid_key);
1039  return;
1040  }
1041 
1050  const string cache_json_str = cache->GetJsonString();
1051  Document delDoc;
1052  delDoc.Parse<0>(cache_json_str.c_str());
1053 
1058  GenerateAndPushJson(uuid_key,
1059  delDoc,
1060  false,
1061  cache);
1062 
1066  uuid_cache_map_.erase(uuid_iter);
1067 
1071  client()->PurgeFQNameCache(uuid_key);
1072 }
1073 
1074 void ConfigEtcdPartition::ProcessUUIDUpdate(const string &uuid_key,
1075  const string &value_str) {
1082  bool is_new = false;
1083  UUIDCacheEntry *cache = GetUUIDCacheEntry(uuid_key,
1084  value_str,
1085  is_new);
1086 
1096  string cache_json_str = cache->GetJsonString();
1097  if (cache_json_str.compare("retry") == 0) {
1098  // If we are retrying due to ref or parent
1099  // fq_name not available previously, cache
1100  // json_str would have been cleared and set
1101  // to retry. Process now like a new create.
1102  cache_json_str = value_str;
1103  is_new = true;
1104  }
1105  Document cacheDoc;
1106  cacheDoc.Parse<0>(cache_json_str.c_str());
1107 
1114  Document updDoc;
1115  updDoc.Parse<0>(value_str.c_str());
1116  string key;
1117 
1122  if (!updDoc.HasMember("fq_name") ||
1123  !updDoc.HasMember("type")) {
1124  CONFIG_CLIENT_WARN(ConfigClientGetRowError,
1125  "fq_name or type not present for ",
1126  "obj_uuid_table with uuid: ", uuid_key);
1127  cache->DisableEtcdReadRetry(uuid_key);
1128  ProcessUUIDDelete(uuid_key);
1129  return;
1130  }
1131 
1142  Value::ConstMemberIterator itr = updDoc.MemberBegin();
1143  while (itr != updDoc.MemberEnd()) {
1144 
1145  key = itr->name.GetString();
1146 
1152  if (key.compare("draft_mode_state") == 0) {
1153  string mode = itr->value.GetString();
1154  if (!mode.empty()) {
1155  client()->PurgeFQNameCache(uuid_key);
1156  DeleteCacheMap(uuid_key);
1157  return;
1158  }
1159  itr = updDoc.EraseMember(itr);
1160  continue;
1161  }
1162 
1169  if (is_new) {
1170  if (key.compare("type") == 0) {
1171  string type = itr->value.GetString();
1172  cache->SetObjType(type);
1173  } else if (key.compare("fq_name") == 0) {
1174  string fq_name;
1175  const Value &name = updDoc[key.c_str()];
1176  for (Value::ConstValueIterator name_itr = name.Begin();
1177  name_itr != name.End(); ++name_itr) {
1178  fq_name += name_itr->GetString();
1179  fq_name += ":";
1180  }
1181  fq_name.erase(fq_name.end()-1);
1182  cache->SetFQName(fq_name);
1183  }
1184  }
1185 
1193  if (!is_new && cacheDoc.HasMember(key.c_str()) &&
1194  key.compare("type") != 0 &&
1195  key.compare("fq_name") != 0) {
1196  if (cacheDoc[key.c_str()] == updDoc[key.c_str()]) {
1197  itr = updDoc.EraseMember(itr);
1198  }
1199  assert(cacheDoc.RemoveMember(key.c_str()));
1200  } else {
1201  if (itr != updDoc.MemberEnd()) itr++;
1202  }
1203  }
1204 
1209  cache->SetJsonString(value_str);
1210 
1224  if (!GenerateAndPushJson(uuid_key,
1225  updDoc,
1226  true,
1227  cache)) {
1228  cache->EnableEtcdReadRetry(uuid_key, value_str);
1229  cache->SetJsonString("retry");
1230  } else {
1231  cache->DisableEtcdReadRetry(uuid_key);
1232  }
1233  if (!is_new) {
1234  GenerateAndPushJson(uuid_key,
1235  cacheDoc,
1236  false,
1237  cache);
1238  }
1239 }
1240 
1241 bool ConfigEtcdPartition::IsListOrMapPropEmpty(const string &uuid_key,
1242  const string &lookup_key) {
1243  UUIDCacheMap::iterator uuid_iter = uuid_cache_map_.find(uuid_key);
1244  if (uuid_iter == uuid_cache_map_.end()) {
1245  return true;
1246  }
1247  UUIDCacheEntry *cache = uuid_iter->second;
1248 
1249  return cache->ListOrMapPropEmpty(lookup_key);
1250 }
1251 
1253  return (config_reader_->IsSet());
1254 }
1255 
1257  CHECK_CONCURRENCY("config_client::Reader");
1258 
1259  int num_req_handled = 0;
1260 
1267  for (UUIDProcessSet::iterator it = uuid_process_set_.begin(), itnext;
1268  it != uuid_process_set_.end() &&
1269  !client()->mgr()->is_reinit_triggered();
1270  it = itnext) {
1271 
1272  itnext = it;
1273  ++itnext;
1274 
1275  UUIDProcessRequestType *obj_req = it->second;
1276 
1277  if (obj_req->oper == "CREATE" || obj_req->oper == "UPDATE") {
1278  ProcessUUIDUpdate(obj_req->uuid, obj_req->value);
1279  } else if (obj_req->oper == "DELETE") {
1280  ProcessUUIDDelete(obj_req->uuid);
1281  } else if (obj_req->oper == "EndOfConfig") {
1282  client()->BulkSyncDone();
1283  }
1284  RemoveObjReqEntry(obj_req->uuid);
1285 
1290  if (++num_req_handled == client()->GetMaxRequestsToYield()) {
1291  return false;
1292  }
1293  }
1294 
1295  // Clear the UUID read set if we are currently processing reinit request
1296  if (client()->mgr()->is_reinit_triggered()) {
1297  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
1298  "ETCD SM: Clear UUID process set due to reinit");
1299  uuid_process_set_.clear();
1300  }
1301  assert(uuid_process_set_.empty());
1302  return true;
1303 }
1304 
1306  UUIDProcessSet::iterator req_it =
1307  uuid_process_set_.find(client()->GetUUID(uuid));
1308  delete req_it->second;
1309  uuid_process_set_.erase(req_it);
1310 }
ConfigJsonParserBase * config_json_parser()
virtual bool BulkDataSync()
std::string uuid_str_
bool EtcdReadRetryTimerExpired(const string uuid, const string value)
std::string value_
void STLDeleteValues(Container *container)
Definition: util.h:101
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
virtual void InvalidateFQNameCache(const std::string &uuid)
UUIDProcessWorkQType obj_process_queue_
ConfigClientManager * mgr()
void ProcessResponse(EtcdResponse resp)
WatchAction action() const
Definition: eql_if.h:208
boost::scoped_ptr< TaskTrigger > uuid_reader_
bool UUIDToObjCacheShow(const string &search_string, const string &last_uuid, uint32_t num_entries, vector< ConfigDBUUIDCacheEntry > *entries) const
virtual void InitDatabase()
virtual void SetObjType(std::string obj_type)
bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
virtual const uint64_t GetInitRetryTimeUSec() const
const string & GetJsonString() const
UUIDCacheMap uuid_cache_map_
boost::uuids::uuid uuid
void EnableEtcdReadRetry(const string uuid, const string value)
const std::string & value() const
Definition: eql_if.h:218
virtual void SetFQName(std::string fq_name)
virtual bool IsTaskTriggered() const
EtcdWatcher(ConfigEtcdClient *etcd_client)
virtual void InitConnectionInfo()
virtual void UpdateConnectionInfo(bool success, bool force)
virtual void ClearFQNameCache()
#define CONFIG_CLIENT_DEBUG(obj,...)
int GetTaskId(const std::string &name)
Definition: task.cc:856
virtual std::string uuid_str(const std::string &uuid)
void EnqueueUUIDRequest(string oper, string obj_type, string uuid_str)
static bool regex_match(const std::string &input, const regex &regex)
Definition: regex.h:34
tbb::atomic< long > bulk_sync_status_
static const std::set< std::string > skip_properties
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
virtual bool UUIDToObjCacheShow(const string &search_string, int inst_num, const string &last_uuid, uint32_t num_entries, vector< ConfigDBUUIDCacheEntry > *entries) const
virtual void PurgeFQNameCache(const std::string &uuid)
uint8_t type
Definition: load_balance.h:109
void ProcessUUIDUpdate(const string &uuid_key, const string &value_str)
const std::string & key() const
Definition: eql_if.h:213
void SetJsonString(const string &value_str)
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
static Options options
ConfigEtcdClient(ConfigClientManager *mgr, EventManager *evm, const ConfigClientOptions &options, int num_workers)
boost::scoped_ptr< EtcdIf > eqlif_
UUIDCacheEntry * GetUUIDCacheEntry(const string &uuid)
bool RequestHandler(ObjectProcessReq *req)
#define CHECK_CONCURRENCY(...)
virtual int HashUUID(const std::string &uuid_str) const
UUIDProcessSet uuid_process_set_
static bool disable_watch_
virtual const std::string & GetObjType() const
void ProcessUUIDDelete(const string &uuid_key)
ConfigEtcdClient * client() const
PartitionList partitions_
static bool regex_search(const std::string &input, const regex &regex)
Definition: regex.h:25
static Timer * CreateTimer(boost::asio::io_context &service, const std::string &name, int task_id=Timer::GetTimerTaskId(), int task_instance=Timer::GetTimerInstanceId(), bool delete_on_completion=false)
Definition: timer.cc:201
#define CONFIG_CLIENT_WARN(obj,...)
virtual void AddFQNameCache(const std::string &uuid, const std::string &obj_type, const std::string &fq_name)
void EnqueueDBSyncRequest(const UUIDValueList &uuid_list)
const kv_map & kvmap() const
Definition: eql_if.h:233
int err_code() const
Definition: eql_if.h:193
bool IsTaskTriggered() const
boost::shared_ptr< TaskTrigger > config_reader_
const std::vector< std::string > & config_db_ips() const
void RemoveObjReqEntry(string &uuid)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
ConfigEtcdClient * client()
virtual void ProcessResponse(EtcdResponse resp)
list< UUIDValueType > UUIDValueList
ConfigEtcdPartition * GetPartition(const string &uuid)
virtual std::string FindFQName(const std::string &uuid) const
static const uint32_t kMaxUUIDRetryTimePowOfTwo
const ObjectTypeList & ObjectTypeListToRead() const
virtual void PostShutdown()
static ConnectionState * GetInstance()
void FillUUIDToObjCacheInfo(const string &uuid, UUIDCacheMap::const_iterator uuid_iter, ConfigDBUUIDCacheEntry *entry) const
virtual bool Receive(const ConfigCass2JsonAdapter &adapter, bool add_change)=0
void HandleEtcdConnectionStatus(bool success, bool force_update=false)
void AddUUIDToProcessList(const string &oper, const string &uuid_key, const string &value_str)
ConfigEtcdPartition(ConfigEtcdClient *client, size_t idx)
void Enqueue(ObjectProcessReq *req)
void DisableEtcdReadRetry(const string uuid)
bool ListOrMapPropEmpty(const string &prop) const
void DeleteCacheMap(const string &uuid)
int GetFirstConfigDbPort() const
virtual uint32_t GetNumReadRequestToBunch() const
void SetListOrMapPropEmpty(const string &prop, bool empty)
virtual bool GenerateAndPushJson(const string &uuid_key, Document &doc, bool add_change, UUIDCacheEntry *cache)
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
virtual bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
static const uint32_t kMinUUIDRetryTimeMSec
virtual int UUIDRetryTimeInMSec(const UUIDCacheEntry *obj) const
struct task_ task
static EventManager evm
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54