OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_cassandra_client.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 
6 
7 #include <sandesh/request_pipeline.h>
8 
9 #include <boost/algorithm/string/join.hpp>
10 #include <boost/algorithm/string/predicate.hpp>
11 #include <boost/foreach.hpp>
12 #include <boost/functional/hash.hpp>
13 #include <boost/ptr_container/ptr_map.hpp>
14 #include <boost/uuid/uuid.hpp>
15 #include <map>
16 #include <set>
17 #include <string>
18 #include <utility>
19 #include <vector>
20 
21 #include "base/connection_info.h"
22 #include "base/logging.h"
23 #include "base/regex.h"
24 #include "base/task.h"
25 #include "base/task_annotations.h"
26 #include "base/task_trigger.h"
28 #include "io/event_manager.h"
30 #include "config_factory.h"
31 #include "config_client_log.h"
32 #include "config_client_log_types.h"
33 #include "config_client_show_types.h"
34 #include "sandesh/common/vns_constants.h"
35 
36 using contrail::regex;
39 using std::unique_ptr;
40 using std::multimap;
41 using std::set;
42 using std::string;
43 
44 const string ConfigCassandraClient::kUuidTableName = "obj_uuid_table";
45 const string ConfigCassandraClient::kFqnTableName = "obj_fq_name_table";
46 const string ConfigCassandraClient::kCassClientTaskId = "config_client::Reader";
48  "config_client::ObjectProcessor";
49 
52  int num_workers)
53  : ConfigDbClient(mgr, evm, options), num_workers_(num_workers) {
54  dbif_.reset(ConfigStaticObjectFactory::CreateRef<cass::cql::CqlIf>(
55  evm, config_db_ips(),
58  static_cast<bool>(options.config_db_use_ssl), options.config_db_ca_certs));
59 
60  // Initialize the Cassandra connection status.
63 
64  for (int i = 0; i < num_workers_; i++) {
65  partitions_.push_back(
66  ConfigStaticObjectFactory::Create<ConfigCassandraPartition>
67  (this, static_cast<size_t>(i)));
68  }
69 
70  fq_name_reader_.reset(new
72  TaskScheduler::GetInstance()->GetTaskId("config_client::DBReader"),
73  0));
74 }
75 
77  if (dbif_) {
78  // dbif_->Db_Uninit(....);
79  }
80 
82 }
83 
86  while (true) {
87  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, "Cassandra SM: Db Init");
88  if (!dbif_->Db_Init()) {
89  CONFIG_CLIENT_DEBUG(ConfigCassInitErrorMessage,
90  "Database initialization failed");
91  if (!InitRetry()) return;
92  continue;
93  }
94  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
95  "Cassandra SM: Db SetTableSpace");
96  if (!dbif_->Db_SetTablespace(
97  g_vns_constants.API_SERVER_KEYSPACE_NAME)) {
98  CONFIG_CLIENT_DEBUG(ConfigCassInitErrorMessage,
99  "Setting database keyspace failed");
100  if (!InitRetry()) return;
101  continue;
102  }
103  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
104  "Cassandra SM: Db UseColumnFamily uuidTableName");
105  if (!dbif_->Db_UseColumnfamily(kUuidTableName)) {
106  if (!InitRetry()) return;
107  continue;
108  }
109  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
110  "Cassandra SM: Db UseColumnFamily fqnTableName");
111  if (!dbif_->Db_UseColumnfamily(kFqnTableName)) {
112  if (!InitRetry()) return;
113  continue;
114  }
115  break;
116  }
118  BulkDataSync();
119 }
120 
122  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, "Cassandra SM: DB uninit");
123  dbif_->Db_Uninit();
124  // If reinit is triggered, return false to abort connection attempt
125  if (mgr()->is_reinit_triggered()) return false;
126  usleep(GetInitRetryTimeUSec());
127  return true;
128 }
129 
132  int worker_id = HashUUID(uuid);
133  return partitions_[worker_id];
134 }
135 
137 ConfigCassandraClient::GetPartition(const string &uuid) const {
138  int worker_id = HashUUID(uuid);
139  return partitions_[worker_id];
140 }
141 
144  assert(worker_id < num_workers_);
145  return partitions_[worker_id];
146 }
147 
148 int ConfigCassandraClient::HashUUID(const string &uuid_str) const {
149  boost::hash<string> string_hash;
150  return string_hash(uuid_str) % num_workers_;
151 }
152 
153 bool ConfigCassandraPartition::ReadObjUUIDTable(const set<string> &req_list) {
154  GenDb::ColListVec col_list_vec;
155 
156  set<string> uuid_list = req_list;
157  vector<GenDb::DbDataValueVec> keys;
158  for (set<string>::const_iterator it = uuid_list.begin();
159  it != uuid_list.end(); it++) {
161  key.push_back(GenDb::Blob(reinterpret_cast<const uint8_t *>
162  (it->c_str()), it->size()));
163  keys.push_back(key);
164  }
165 
166  GenDb::Blob col_filter(reinterpret_cast<const uint8_t *>("d"), 1);
167  GenDb::ColumnNameRange crange;
168  crange.start_ =
169  boost::assign::list_of(GenDb::DbDataValue(col_filter)).convert_to_container<GenDb::DbDataValueVec>();
170 
171  GenDb::FieldNamesToReadVec field_vec;
172  field_vec.push_back(boost::make_tuple("key", true, false, false));
173  field_vec.push_back(boost::make_tuple("column1", false, true, false));
174  field_vec.push_back(boost::make_tuple("value", false, false, true));
175 
176  if (client()->dbif_->Db_GetMultiRow(&col_list_vec,
178  crange, field_vec,
181  BOOST_FOREACH(const GenDb::ColList &col_list, col_list_vec) {
182  assert(col_list.rowkey_.size() == 1);
183  assert(col_list.rowkey_[0].which() == GenDb::DB_VALUE_BLOB);
184  if (col_list.columns_.size()) {
185  GenDb::Blob uuid(boost::get<GenDb::Blob>(col_list.rowkey_[0]));
186  string uuid_str(reinterpret_cast<const char *>(uuid.data()),
187  uuid.size());
188  ProcessObjUUIDTableEntry(uuid_str, col_list);
189  uuid_list.erase(uuid_str);
190  }
191  }
192  } else {
193  // Failure is returned due to connectivity issue or consistency
194  // issues in reading from cassandra
196  CONFIG_CLIENT_WARN(ConfigClientGetRowError,
197  "GetMultiRow failed for table",
199  //
200  // Task is rescheduled to read the request queue
201  // Due to a bug in the CQL driver from DataStax, connection status is
202  // not notified asynchronously. Because of this, polling is the only
203  // choice to determine the cql connection status.
204  // Since there are dedicated threads to read config,
205  // and it is ok to retry by rescheduling the reader task
206  // TODO: Sleep or No Sleep?
207  //
208  return false;
209  }
210 
211  // Delete all stale entries from the data base.
212  BOOST_FOREACH(string uuid_key, uuid_list) {
213  CONFIG_CLIENT_WARN(ConfigClientGetRowError, "Missing row in the table",
215  HandleObjectDelete(uuid_key, false);
216  }
217 
218  // Clear the uuid list.
219  uuid_list.clear();
220  return true;
221 }
222 
223 // Notes on list map property processing:
224 // Separate entries per list/map keys are stored in the ObjUuidCache partition
225 // based on the uuid.
226 // The cache map entries contain a refreshed bit and timestamp in addition to
227 // the values etc.
228 // A set containing list/map property names (updated_list_map_properties) that
229 // have key/value pairs with a new timestamp, a second set
230 // (candidate_list_map_properties) also containing list/map property names that
231 // may require an update given some key/value pairs have been deleted, and a
232 // multimap (list_map_properties) for all the list/map key value pairs in the
233 // new configuration, are built and held in the context (temporary).
234 // These lists are used to determine which list/map properties need to be pushed
235 // to the backend, they are built as columns are parsed. Once all columns are
236 // parsed,
237 // in ListMapPropReviseUpdateList, for each property name in
238 // candidate_list_map_properties we check it is already in the
239 // updated_list_map_property list, if not we proceed to find at least one stale
240 // list/map key value pair with the property name in the ObjUuidCache, if one is
241 // found that requires an update, the property name is added to
242 // updated_list_map_properties.
243 // Once updated_list_map_properties is revised, we iterate through each property
244 // name in it and push all matching key/value pairs in list_map_properties.
245 // ConfigCass2JsonAdapter groups the key value pairs belonging to the same
246 // property so that a single DB request is sent to the
247 // backend.
248 // Deletes are handled by FormDeleteRequestList. Note that deletes are sent
249 // only when all key/value pairs for a given list/map property are removed.
250 // Additionally, the resulting DB request only resets the property_set bit, it
251 // does not clear the entries in the backend.
252 //
253 // parent_or_ref_fq_name_unknown indicates that at least one parent or
254 // ref cannot be found in the FQNameCache, this can happen if the parent or
255 // referred object is not yet read.
259  }
260  std::multimap<string, JsonAdapterDataType> list_map_properties;
263  string obj_type;
264  string fq_name;
268 
269 private:
271 };
272 
274  const GenDb::ColList &col_list) {
275  CassColumnKVVec cass_data_vec;
276 
278 
280 
281  ParseObjUUIDTableEntry(uuid_key, col_list, &cass_data_vec, context);
282  // Ignore draft objects.
283  if (context.ignore_object) {
284  client()->PurgeFQNameCache(uuid_key);
285  DeleteCacheMap(uuid_key);
286  return false;
287  }
288  // If type or fq-name is not present in the db object, ignore the object
289  // and trigger delete of the object.
290  if (context.obj_type.empty() || !context.fq_name_present) {
291  // Handle as delete
292  CONFIG_CLIENT_WARN(ConfigClientGetRowError,
293  "Parsing row response for type/fq_name failed for table",
295  obj->DisableCassandraReadRetry(uuid_key);
296  HandleObjectDelete(uuid_key, false);
297  return false;
298  }
299 
300  obj->SetFQName(context.fq_name);
301  obj->SetObjType(context.obj_type);
302 
303  if (context.parent_or_ref_fq_name_unknown) {
304  obj->EnableCassandraReadRetry(uuid_key);
305  } else {
306  obj->DisableCassandraReadRetry(uuid_key);
307  }
308 
309  ListMapPropReviseUpdateList(uuid_key, context);
310 
311  // Read the context for map and list properties
312  if (context.updated_list_map_properties.size()) {
313  for (set<string>::iterator it =
314  context.updated_list_map_properties.begin();
315  it != context.updated_list_map_properties.end(); it++) {
316  pair<multimap<string, JsonAdapterDataType>::iterator,
317  multimap<string, JsonAdapterDataType>::iterator> ret =
318  context.list_map_properties.equal_range(*it);
319  for (multimap<string, JsonAdapterDataType>::iterator mit =
320  ret.first; mit != ret.second; mit++) {
321  cass_data_vec.push_back(mit->second);
322  }
323  }
324  }
325  GenerateAndPushJson(uuid_key, context.obj_type, cass_data_vec, true);
326  HandleObjectDelete(uuid_key, true);
327  return true;
328 }
329 
331  const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec,
332  ConfigCassandraParseContext &context) {
333  BOOST_FOREACH(const GenDb::NewCol &ncol, col_list.columns_) {
334  assert(ncol.name->size() == 1);
335  assert(ncol.value->size() == 1);
336  assert(ncol.timestamp->size() == 1);
337 
338  const GenDb::DbDataValue &dname(ncol.name->at(0));
339  assert(dname.which() == GenDb::DB_VALUE_BLOB);
340  GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
341  string key(reinterpret_cast<const char *>(dname_blob.data()),
342  dname_blob.size());
343 
344  const GenDb::DbDataValue &dvalue(ncol.value->at(0));
345  assert(dvalue.which() == GenDb::DB_VALUE_STRING);
346  string value(boost::get<string>(dvalue));
347 
348  const GenDb::DbDataValue &dtimestamp(ncol.timestamp->at(0));
349  assert(dtimestamp.which() == GenDb::DB_VALUE_UINT64);
350  uint64_t timestamp = boost::get<uint64_t>(dtimestamp);
351  ParseObjUUIDTableEachColumnBuildContext(uuid, key, value, timestamp,
352  cass_data_vec, context);
353  }
354 }
355 
357  const string &uuid, const string &key, const string &value,
358  uint64_t timestamp, CassColumnKVVec *cass_data_vec,
359  ConfigCassandraParseContext &context) {
360  // Check whether there was an update to property of ref
361  JsonAdapterDataType adapter(key, value);
362  if (StoreKeyIfUpdated(uuid, &adapter, timestamp, context)) {
363  // Field is updated.. enqueue to parsing
364  cass_data_vec->push_back(adapter);
365  }
366 }
367 
369  const string &uuid_key, const string &obj_type,
370  const CassColumnKVVec &cass_data_vec, bool add_change) {
371 
372  ConfigCass2JsonAdapter ccja(uuid_key, client(), obj_type,
373  cass_data_vec);
374  client()->mgr()->config_json_parser()->Receive(ccja, add_change);
375 }
376 
377 // Post shutdown during reinit, cleanup all previous states and connections
378 // 1. Disconnect from cassandra cluster
379 // 2. Clean FQ Name cache
380 // 3. Delete partitions which in turn will clear up the object cache and
381 // previously enqueued uuid read requests
383  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
384  "Cassandra SM: Post shutdown during re init");
385  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, "Cassandra SM: Db Uninit");
386  dbif_->Db_Uninit();
389 }
390 
393  ConfigClientMgrDebug, "Cassandra SM: BulkDataSync Started");
395  fq_name_reader_->Set();
396  return true;
397 }
398 
400  // If FQNameReader task has been triggered return true.
401  if (fq_name_reader_->IsSet()) {
402  return true;
403  }
404 
409  BOOST_FOREACH(ConfigCassandraPartition *partition, partitions_) {
410  if (partition->IsTaskTriggered()) {
411  return true;
412  }
413  }
414  return false;
415 }
416 
418  for (ConfigClientManager::ObjectTypeList::const_iterator it =
419  mgr()->config_json_parser()->ObjectTypeListToRead().begin();
420  it != mgr()->config_json_parser()->ObjectTypeListToRead().end();
421  it++) {
422  string column_name;
423  while (true) {
424  // Ensure that FQName reader task aborts on reinit trigger.
425  if (mgr()->is_reinit_triggered()) {
426  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
427  "Cassandra SM: Abort FQName reader on reinit trigger");
428  return true;
429  }
430 
431  // Rowkey is obj-type
433  key.push_back(GenDb::Blob(reinterpret_cast<const uint8_t *>
434  (it->c_str()), it->size()));
435  GenDb::ColumnNameRange crange;
436  if (!column_name.empty()) {
437  GenDb::Blob col_filter(reinterpret_cast<const uint8_t *>
438  (column_name.c_str()), column_name.size());
439  // Start reading the next set of entries from where we ended in
440  // last read
441  crange.start_ =
442  boost::assign::list_of(
443  GenDb::DbDataValue(col_filter)).convert_to_container
445  crange.start_op_ = GenDb::Op::GT;
446  }
447 
448  // In large scale scenarios, each object type may have a large
449  // number of uuid entries. Read a fixed number of entries at a time
450  // to avoid cpu hogging by this thread.
451  crange.count_ = GetFQNameEntriesToRead();
452 
453  GenDb::FieldNamesToReadVec field_vec;
454  field_vec.push_back(boost::make_tuple("key", true, false, false));
455  field_vec.push_back(boost::make_tuple("column1", false, true,
456  false));
457 
458  GenDb::ColList col_list;
459  if (dbif_->Db_GetRow(&col_list, kFqnTableName, key,
460  GenDb::DbConsistency::QUORUM, crange, field_vec)) {
462 
463  // No entries for this obj-type
464  if (!col_list.columns_.size())
465  break;
466 
467  ObjTypeUUIDList uuid_list;
468  ParseFQNameRowGetUUIDList(*it, col_list, uuid_list,
469  &column_name);
470  EnqueueDBSyncRequest(uuid_list);
471 
472  // If we read less than what we sought, it means there are
473  // no more entries for current obj-type. We move to next
474  // obj-type.
475  if (col_list.columns_.size() < GetFQNameEntriesToRead())
476  break;
477  } else {
479  CONFIG_CLIENT_WARN(ConfigClientGetRowError,
480  "GetRow failed for table", kFqnTableName, *it);
481  usleep(GetInitRetryTimeUSec());
482  }
483  }
484  }
485  // At the end of task trigger
486  BOOST_FOREACH(ConfigCassandraPartition *partition, partitions_) {
487  ObjectProcessReq *req = new ObjectProcessReq("EndOfConfig", "", "");
488  partition->Enqueue(req);
489  }
490 
491  return true;
492 }
493 
495  const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list,
496  string *last_column) {
497  string column_name;
498  BOOST_FOREACH(const GenDb::NewCol &ncol, col_list.columns_) {
499  assert(ncol.name->size() == 1);
500  const GenDb::DbDataValue &dname(ncol.name->at(0));
501  assert(dname.which() == GenDb::DB_VALUE_BLOB);
502  GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
503  column_name = string(reinterpret_cast<const char *>(dname_blob.data()),
504  dname_blob.size());
505  UpdateFQNameCache(column_name, obj_type, uuid_list);
506  }
507 
508  *last_column = column_name;
509  return true;
510 }
511 
513  const string &obj_type, ObjTypeUUIDList &uuid_list) {
514  string uuid_str = FetchUUIDFromFQNameEntry(key);
515  if (uuid_str.empty())
516  return;
517  uuid_list.push_back(make_pair(obj_type, uuid_str));
518  AddFQNameCache(uuid_str, obj_type, key.substr(0, key.rfind(':')));
519 }
520 
522  const string &key) const {
523  size_t temp = key.rfind(':');
524  return (temp == string::npos) ? "" : key.substr(temp+1);
525 }
526 
528  const ObjTypeUUIDList &uuid_list) {
529  for (ObjTypeUUIDList::const_iterator it = uuid_list.begin();
530  it != uuid_list.end(); it++) {
531  EnqueueUUIDRequest("CREATE", it->first, it->second);
532  }
533  return true;
534 }
535 
537  const string &search_string, int inst_num, const string &last_uuid,
538  uint32_t num_entries, vector<ConfigDBUUIDCacheEntry> *entries) const {
539  return GetPartition(inst_num)->UUIDToObjCacheShow(search_string, last_uuid,
540  num_entries, entries);
541 }
542 
543 void ConfigCassandraClient::EnqueueUUIDRequest(string oper, string obj_type,
544  string uuid_str) {
545  ObjectProcessReq *req = new ObjectProcessReq(oper, uuid_str, obj_type);
546  GetPartition(uuid_str)->Enqueue(req);
547 }
548 
550  long num_config_readers_still_processing =
551  bulk_sync_status_.fetch_and_decrement();
552  if (num_config_readers_still_processing == 1) {
553  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
554  "Cassandra SM: BulkSyncDone by all readers");
555  mgr()->EndOfConfig();
556  } else {
557  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
558  "Cassandra SM: One reader finished BulkSync");
559  }
560 }
561 
563  bool force_update) {
564  UpdateConnectionInfo(success, force_update);
565 
566  if (success) {
567  // Update connection info
569  process::ConnectionType::DATABASE, "Cassandra",
570  process::ConnectionStatus::UP,
571  dbif_->Db_GetEndpoints(), "Established Cassandra connection");
572  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
573  "Cassandra SM: Established Cassandra connection");
574  } else {
576  process::ConnectionType::DATABASE, "Cassandra",
577  process::ConnectionStatus::DOWN,
578  dbif_->Db_GetEndpoints(), "Lost Cassandra connection");
579  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
580  "Cassandra SM: Lost Cassandra connection");
581  }
582 }
583 
585  const string &lookup_key) {
586  return GetPartition(uuid_key)->IsListOrMapPropEmpty(uuid_key, lookup_key);
587 }
588 
590  ConfigCassandraClient *client, size_t idx)
591  : config_client_(client), worker_id_(idx) {
592  int task_id = TaskScheduler::GetInstance()->GetTaskId("config_client::Reader");
593  config_reader_.reset(new
595  task_id, idx));
596  task_id =
597  TaskScheduler::GetInstance()->GetTaskId("config_client::ObjectProcessor");
599  task_id, idx, bind(&ConfigCassandraPartition::RequestHandler, this, _1),
601 }
602 
604  obj_process_queue_->Shutdown();
605 }
606 
608  obj_process_queue_->Enqueue(req);
609 }
610 
612  AddUUIDToRequestList(req->oper_, req->value_, req->uuid_str_);
613  delete req;
614  return true;
615 }
616 
618  const string &obj_type,
619  const string &uuid_str) {
620  pair<UUIDProcessSet::iterator, bool> ret;
621  bool trigger = uuid_read_set_.empty();
623  new ObjectProcessRequestType(oper, obj_type, uuid_str);
624  ret = uuid_read_set_.insert(make_pair(client()->GetUUID(uuid_str), req));
625  if (ret.second) {
626  if (trigger) {
627  config_reader_->Set();
628  }
629  } else {
630  delete req;
631  ret.first->second->oper = oper;
632  ret.first->second->uuid = uuid_str;
633  }
634 }
635 
637  const string &uuid, bool add_change) {
638  if (!add_change) {
639  ConfigCassandraClient::ObjTypeFQNPair obj_type_fq_name_pair =
640  client()->UUIDToFQName(uuid, true);
641  if (obj_type_fq_name_pair.second == "ERROR") {
642  return;
643  }
644  }
645 
646  bool needNotify = false;
647  std::string obj_type("");
648  ObjectCacheMap::iterator uuid_iter = object_cache_map_.find(uuid);
649  if (uuid_iter == object_cache_map_.end()) {
650  assert(!add_change);
651  return;
652  }
653 
654  CassColumnKVVec cass_data_vec;
655  for (FieldDetailMap::iterator it =
656  uuid_iter->second->GetFieldDetailMap().begin(), itnext;
657  it != uuid_iter->second->GetFieldDetailMap().end();
658  it = itnext) {
659  itnext = it;
660  ++itnext;
661  if (it->first.key == "type") {
662  obj_type = it->first.value;
663  obj_type.erase(remove(obj_type.begin(),
664  obj_type.end(), '\"'), obj_type.end());
665  cass_data_vec.push_back(it->first);
666  }
667 
668  if (it->first.key == "fq_name") {
669  cass_data_vec.push_back(it->first);
670  }
671 
672  if (!add_change || !it->second.refreshed) {
673  if (it->first.key == "type" || it->first.key == "fq_name") {
674  continue;
675  }
676 
677  needNotify = true;
678  cass_data_vec.push_back(it->first);
679  if (add_change) {
680  uuid_iter->second->GetFieldDetailMap().erase(it);
681  }
682  }
683  }
684 
685  if (add_change != true) {
686  object_cache_map_.erase(uuid_iter);
687  }
688  if (needNotify) {
689  GenerateAndPushJson(uuid, obj_type, cass_data_vec, false);
690  }
691  if (!add_change) {
692  client()->PurgeFQNameCache(uuid);
693  }
694 }
695 
697  const string &lookup_key) {
698  string key;
699  ObjectCacheMap::iterator uuid_iter = object_cache_map_.find(uuid_key);
700  if (uuid_iter == object_cache_map_.end()) {
701  return true;
702  }
703 
704  key = "propm:" + lookup_key;
705  FieldDetailMap::iterator lower_bound_it =
706  uuid_iter->second->GetFieldDetailMap().lower_bound(
707  JsonAdapterDataType(key, ""));
708  if (lower_bound_it != uuid_iter->second->GetFieldDetailMap().end() &&
709  boost::starts_with(lower_bound_it->first.key, key)) {
710  return false;
711  }
712  key = "propl:" + lookup_key;
713  lower_bound_it =
714  uuid_iter->second->GetFieldDetailMap().lower_bound(
715  JsonAdapterDataType(key, ""));
716  if (lower_bound_it != uuid_iter->second->GetFieldDetailMap().end() &&
717  boost::starts_with(lower_bound_it->first.key, key)) {
718  return false;
719  }
720  return true;
721 }
722 
724  return (config_reader_->IsSet());
725 }
726 
728  CHECK_CONCURRENCY("config_client::Reader");
729 
730  set<string> bunch_req_list;
731  int num_req_handled = 0;
732  // Config reader task should stop on reinit trigger
733  for (UUIDProcessSet::iterator it = uuid_read_set_.begin(), itnext;
734  it != uuid_read_set_.end() && !client()->mgr()->is_reinit_triggered();
735  it = itnext) {
736  itnext = it;
737  ++itnext;
738  ObjectProcessRequestType *obj_req = it->second;
739 
740  if (obj_req->oper == "CREATE" || obj_req->oper == "UPDATE" ||
741  obj_req->oper == "UPDATE-IMPLICIT") {
742  bunch_req_list.insert(obj_req->uuid);
743  bool is_last = (itnext == uuid_read_set_.end());
744  if (is_last ||
745  bunch_req_list.size() == client()->GetNumReadRequestToBunch()) {
746  if (!ReadObjUUIDTable(bunch_req_list)) {
747  return false;
748  }
749  num_req_handled += bunch_req_list.size();
750  RemoveObjReqEntries(bunch_req_list);
751  if (num_req_handled >= client()->GetMaxRequestsToYield()) {
752  return false;
753  }
754  }
755  continue;
756  } else if (obj_req->oper == "DELETE") {
757  HandleObjectDelete(obj_req->uuid, false);
758  } else if (obj_req->oper == "EndOfConfig") {
759  client()->BulkSyncDone();
760  }
761  RemoveObjReqEntry(obj_req->uuid);
762  if (++num_req_handled == client()->GetMaxRequestsToYield()) {
763  return false;
764  }
765  }
766 
767  // No need to read the object uuid table if reinit is triggered
768  if (!bunch_req_list.empty() && !client()->mgr()->is_reinit_triggered()) {
769  if (!ReadObjUUIDTable(bunch_req_list))
770  return false;
771  RemoveObjReqEntries(bunch_req_list);
772  }
773  // Clear the UUID read set if we are currently processing reinit request
774  if (client()->mgr()->is_reinit_triggered()) {
775  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
776  "Cassandra SM: Clear UUID read set due to reinit");
777  uuid_read_set_.clear();
778  }
779  assert(uuid_read_set_.empty());
780  return true;
781 }
782 
784  BOOST_FOREACH(string uuid, req_list) {
785  RemoveObjReqEntry(uuid);
786  }
787  req_list.clear();
788 }
789 
791  UUIDProcessSet::iterator req_it =
792  uuid_read_set_.find(client()->GetUUID(uuid));
793  delete req_it->second;
794  uuid_read_set_.erase(req_it);
795 }
796 
797 
798 boost::asio::io_context *ConfigCassandraPartition::ioservice() {
799  return client()->event_manager()->io_service();
800 }
801 
804  ObjectCacheMap::iterator uuid_iter = object_cache_map_.find(uuid);
805  if (uuid_iter == object_cache_map_.end())
806  return NULL;
807  return uuid_iter->second;
808 }
809 
811 ConfigCassandraPartition::GetObjCacheEntry(const string &uuid) const {
812  ObjectCacheMap::const_iterator uuid_iter = object_cache_map_.find(uuid);
813  if (uuid_iter == object_cache_map_.end())
814  return NULL;
815  return uuid_iter->second;
816 }
817 
819  const ObjCacheEntry *obj) const {
820  uint32_t retry_time_pow_of_two =
823  return ((1 << retry_time_pow_of_two) * kMinUUIDRetryTimeMSec);
824 }
825 
827  if (retry_timer_) {
829  }
830 }
831 
833  const string uuid) {
834  if (!retry_timer_) {
835  retry_timer_ = TimerManager::CreateTimer(
836  *parent_->client()->event_manager()->io_service(),
837  "UUID retry timer for " + uuid,
839  "config_client::Reader"),
840  parent_->worker_id_);
841  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
842  "Created UUID read retry timer ", uuid);
843  }
844  retry_timer_->Cancel();
845  retry_timer_->Start(parent_->UUIDRetryTimeInMSec(this),
846  boost::bind(
848  this, uuid),
849  boost::bind(
851  this));
852  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
853  "Start/restart UUID Read Retry timer due to configuration", uuid);
854 }
855 
857  const string uuid) {
858  if (retry_timer_) {
859  retry_timer_->Cancel();
860  TimerManager::DeleteTimer(retry_timer_);
861  retry_timer_ = NULL;
862  retry_count_ = 0;
863  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
864  "UUID Read retry timer - deleted timer due to configuration",
865  uuid);
866  }
867 }
868 
870  if (retry_timer_)
871  return (retry_timer_->running());
872  return false;
873 }
874 
876  const string uuid) {
877  parent_->client()->mgr()->EnqueueUUIDRequest(
878  "UPDATE", GetObjType(), parent_->client()->uuid_str(uuid));
879  retry_count_++;
880  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry, "timer expired ", uuid);
881  return false;
882 }
883 
884 void
886  std::string message = "Timer";
887  CONFIG_CLIENT_WARN(ConfigClientGetRowError,
888  "UUID Read Retry Timer error ", message, message);
889 }
890 
892  const string &uuid, ConfigCassandraParseContext &context) {
893  for (set<string>::iterator it =
894  context.candidate_list_map_properties.begin();
895  it != context.candidate_list_map_properties.end(); it++) {
896  if (context.updated_list_map_properties.find(*it) !=
897  context.updated_list_map_properties.end()) {
898  continue;
899  }
900  ObjectCacheMap::iterator uuid_iter = object_cache_map_.find(uuid);
901  assert(uuid_iter != object_cache_map_.end());
902  FieldDetailMap::iterator field_iter =
903  uuid_iter->second->GetFieldDetailMap().lower_bound(
904  JsonAdapterDataType(*it, ""));
905  assert(field_iter != uuid_iter->second->GetFieldDetailMap().end());
906  assert(it->compare(0, it->size() - 1, field_iter->first.key,
907  0, it->size() - 1) == 0);
908  while (it->compare(0, it->size() - 1, field_iter->first.key,
909  0, it->size() - 1) == 0) {
910  if (field_iter->second.refreshed == false) {
911  context.updated_list_map_properties.insert(*it);
912  break;
913  }
914  field_iter++;
915  }
916  }
917 }
918 
920  JsonAdapterDataType *adapter, uint64_t timestamp,
921  ConfigCassandraParseContext &context) {
922  ObjectCacheMap::iterator uuid_iter = object_cache_map_.find(uuid);
923  assert(uuid_iter != object_cache_map_.end());
924  size_t from_front_pos = adapter->key.find(':');
925  size_t from_back_pos = adapter->key.rfind(':');
926  string type_field = adapter->key.substr(0, from_front_pos+1);
927  bool is_ref = (type_field == ConfigCass2JsonAdapter::ref_prefix);
928  bool is_parent = (type_field == ConfigCass2JsonAdapter::parent_prefix);
929  bool is_propl = (type_field == ConfigCass2JsonAdapter::list_prop_prefix);
930  bool is_propm = (type_field == ConfigCass2JsonAdapter::map_prop_prefix);
931  bool is_prop = (type_field == ConfigCass2JsonAdapter::prop_prefix);
932  if (is_prop) {
933  string prop_name = adapter->key.substr(from_front_pos+1);
934  //
935  // properties like perms2 has no importance to control-node/dns
936  // This property is present on each config object. Hence skipping such
937  // properties gives performance improvement
938  //
939  if (ConfigClientManager::skip_properties.find(prop_name) !=
941  if ((prop_name.compare("draft_mode_state") == 0) &&
942  !adapter->value.empty()) {
943  context.ignore_object = true;
944  }
945  return false;
946  }
947  }
948 
949  string prop_name = "";
950  if (is_ref || is_parent) {
951  string ref_uuid = adapter->key.substr(from_back_pos+1);
952 
953  string ref_name = client()->UUIDToFQName(ref_uuid).second;
954  if (ref_name == "ERROR") {
955  context.parent_or_ref_fq_name_unknown = true;
956  CONFIG_CLIENT_DEBUG(ConfigClientReadRetry,
957  "Out of order parent or ref", uuid + ":" + adapter->key);
958  return false;
959  }
960  if (is_ref) {
961  adapter->ref_fq_name = ref_name;
962  }
963  } else if (is_propl || is_propm) {
964  prop_name = adapter->key.substr(0, from_back_pos);
965 
966  context.list_map_properties.insert(make_pair(prop_name, *adapter));
967  }
968 
969  if (adapter->key.compare("type") == 0) {
970  if (context.obj_type.empty()) {
971  context.obj_type = adapter->value;
972  context.obj_type.erase(remove(context.obj_type.begin(),
973  context.obj_type.end(), '\"'), context.obj_type.end());
974  }
975  } else if (adapter->key.compare("fq_name") == 0) {
976  context.fq_name_present = true;
977  if (context.fq_name.empty()) {
978  context.fq_name = adapter->value.substr(1, adapter->value.size()-2);
979  context.fq_name.erase(remove(context.fq_name.begin(),
980  context.fq_name.end(), '\"'), context.fq_name.end());
981  context.fq_name.erase(remove(context.fq_name.begin(),
982  context.fq_name.end(), ' '), context.fq_name.end());
983  replace(context.fq_name.begin(), context.fq_name.end(), ',', ':');
984  }
985  }
986  FieldDetailMap::iterator field_iter =
987  uuid_iter->second->GetFieldDetailMap().find(*adapter);
988  if (field_iter == uuid_iter->second->GetFieldDetailMap().end()) {
989  // seeing field for first time
990  FieldTimeStampInfo field_ts_info;
991  field_ts_info.refreshed = true;
992  field_ts_info.time_stamp = timestamp;
993  uuid_iter->second->GetFieldDetailMap().insert(make_pair
994  (*adapter, field_ts_info));
995  } else {
996  field_iter->second.refreshed = true;
997  if (client()->SkipTimeStampCheckForTypeAndFQName() &&
998  ((adapter->key.compare("type") == 0) ||
999  (adapter->key.compare("fq_name") == 0))) {
1000  return true;
1001  }
1002  if (timestamp && field_iter->second.time_stamp == timestamp) {
1003  if (is_propl || is_propm) {
1004  context.candidate_list_map_properties.insert(prop_name);
1005  }
1006  return false;
1007  }
1008  field_iter->second.time_stamp = timestamp;
1009  }
1010  if (is_propl || is_propm) {
1011  context.updated_list_map_properties.insert(prop_name);
1012  return false;
1013  } else {
1014  return true;
1015  }
1016 }
1017 
1020  ObjectCacheMap::iterator uuid_iter = object_cache_map_.find(uuid);
1021  if (uuid_iter == object_cache_map_.end()) {
1022  ObjCacheEntry *obj;
1023  string tmp_uuid = uuid;
1024  obj = new ObjCacheEntry(this, UTCTimestampUsec());
1025  pair<ObjectCacheMap::iterator, bool> ret_uuid =
1026  object_cache_map_.insert(tmp_uuid, obj);
1027  assert(ret_uuid.second);
1028  uuid_iter = ret_uuid.first;
1029  } else {
1030  uuid_iter->second->SetLastReadTimeStamp(UTCTimestampUsec());
1031  }
1032  for (FieldDetailMap::iterator it =
1033  uuid_iter->second->GetFieldDetailMap().begin();
1034  it != uuid_iter->second->GetFieldDetailMap().end(); it++) {
1035  it->second.refreshed = false;
1036  }
1037  return uuid_iter->second;
1038 }
1039 
1041  ObjectCacheMap::const_iterator uuid_iter,
1042  ConfigDBUUIDCacheEntry *entry) const {
1043  entry->set_uuid(uuid);
1044  entry->set_timestamp(
1045  UTCUsecToString(uuid_iter->second->GetLastReadTimeStamp()));
1046  entry->set_retry_count(uuid_iter->second->GetRetryCount());
1047  entry->set_fq_name(uuid_iter->second->GetFQName());
1048  entry->set_obj_type(uuid_iter->second->GetObjType());
1049  entry->set_timer_running(uuid_iter->second->IsRetryTimerRunning());
1050  entry->set_timer_created(uuid_iter->second->IsRetryTimerCreated());
1051  vector<ConfigDBUUIDCacheData> fields;
1052  for (FieldDetailMap::const_iterator it =
1053  uuid_iter->second->GetFieldDetailMap().begin();
1054  it != uuid_iter->second->GetFieldDetailMap().end(); it++) {
1055  ConfigDBUUIDCacheData each_field;
1056  each_field.set_refresh(it->second.refreshed);
1057  each_field.set_field_name(it->first.key);
1058  each_field.set_timestamp(UTCUsecToString(it->second.time_stamp));
1059  fields.push_back(each_field);
1060  }
1061  entry->set_field_list(fields);
1062 }
1063 
1065  const string &search_string, const string &last_uuid, uint32_t num_entries,
1066  vector<ConfigDBUUIDCacheEntry> *entries) const {
1067  uint32_t count = 0;
1068  regex search_expr(search_string);
1069  for (ObjectCacheMap::const_iterator it =
1070  object_cache_map_.upper_bound(last_uuid);
1071  count < num_entries && it != object_cache_map_.end(); it++) {
1072  if (regex_search(it->first, search_expr) ||
1073  regex_search(it->second->GetObjType(), search_expr) ||
1074  regex_search(it->second->GetFQName(), search_expr)) {
1075  count++;
1076  ConfigDBUUIDCacheEntry entry;
1077  FillUUIDToObjCacheInfo(it->first, it, &entry);
1078  entries->push_back(entry);
1079  }
1080  }
1081  return true;
1082 }
void DeleteCacheMap(const std::string &uuid)
static const std::string ref_prefix
ConfigJsonParserBase * config_json_parser()
std::string uuid_str_
static const std::string list_prop_prefix
std::string value_
void STLDeleteValues(Container *container)
Definition: util.h:101
bool UUIDToObjCacheShow(const std::string &search_string, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
ConfigClientManager * mgr()
EventManager * event_manager()
bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list)
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void RemoveObjReqEntry(std::string &uuid)
DbDataValueVec rowkey_
Definition: gendb_if.h:198
const std::string & config_db_user() const
virtual void GenerateAndPushJson(const string &uuid_key, const string &obj_type, const CassColumnKVVec &cass_data_vec, bool add_change)
virtual void SetObjType(std::string obj_type)
void UpdateFQNameCache(const std::string &key, const std::string &obj_type, ObjTypeUUIDList &uuid_list)
ConfigCassandraPartition(ConfigCassandraClient *client, size_t idx)
const uint8_t * data() const
Definition: gendb_if.h:60
void Enqueue(ObjectProcessReq *req)
boost::asio::io_context * io_service()
Definition: event_manager.h:42
virtual int UUIDRetryTimeInMSec(const ObjCacheEntry *obj) const
virtual const uint64_t GetInitRetryTimeUSec() const
ObjTypeFQNPair UUIDToFQName(const std::string &uuid_str, bool deleted_ok=true) const
boost::uuids::uuid uuid
void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid, const std::string &key, const std::string &value, uint64_t timestamp, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
virtual void SetFQName(std::string fq_name)
static const std::string prop_prefix
static const uint32_t kMaxUUIDRetryTimePowOfTwo
virtual void InitConnectionInfo()
virtual void UpdateConnectionInfo(bool success, bool force)
virtual void ClearFQNameCache()
#define CONFIG_CLIENT_DEBUG(obj,...)
std::pair< std::string, std::string > ObjTypeFQNPair
void RemoveObjReqEntries(std::set< std::string > &req_list)
int GetTaskId(const std::string &name)
Definition: task.cc:856
virtual std::string uuid_str(const std::string &uuid)
boost::scoped_ptr< TaskTrigger > fq_name_reader_
static bool regex_match(const std::string &input, const regex &regex)
Definition: regex.h:34
bool CassReadRetryTimerExpired(const std::string uuid)
DbDataValueVec start_
Definition: gendb_if.h:225
virtual void ParseObjUUIDTableEntry(const std::string &uuid, const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
bool StoreKeyIfUpdated(const std::string &uuid, JsonAdapterDataType *adapter, uint64_t timestamp, ConfigCassandraParseContext &context)
static const std::string kObjectProcessTaskId
static const std::set< std::string > skip_properties
virtual uint32_t GetFQNameEntriesToRead() const
virtual void PurgeFQNameCache(const std::string &uuid)
void HandleCassandraConnectionStatus(bool success, bool force_update=false)
ObjCacheEntry * GetObjCacheEntry(const std::string &uuid)
void EnableCassandraReadRetry(const std::string uuid)
virtual bool IsTaskTriggered() const
static TaskScheduler * GetInstance()
Definition: task.cc:547
static const std::string kCassClientTaskId
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
static Options options
boost::scoped_ptr< DbDataValueVec > timestamp
Definition: gendb_if.h:183
#define CHECK_CONCURRENCY(...)
std::list< ObjTypeUUIDType > ObjTypeUUIDList
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
Definition: gendb_if.h:85
boost::scoped_ptr< DbDataValueVec > value
Definition: gendb_if.h:181
std::vector< JsonAdapterDataType > CassColumnKVVec
void DisableCassandraReadRetry(const std::string uuid)
virtual void HandleObjectDelete(const string &uuid, bool add_change)
const std::string & config_db_password() const
static bool regex_search(const std::string &input, const regex &regex)
Definition: regex.h:25
static Timer * CreateTimer(boost::asio::io_context &service, const std::string &name, int task_id=Timer::GetTimerTaskId(), int task_instance=Timer::GetTimerInstanceId(), bool delete_on_completion=false)
Definition: timer.cc:201
#define CONFIG_CLIENT_WARN(obj,...)
virtual void AddFQNameCache(const std::string &uuid, const std::string &obj_type, const std::string &fq_name)
NewColVec columns_
Definition: gendb_if.h:199
ObjProcessWorkQType obj_process_queue_
const std::vector< std::string > & config_db_ips() const
static const std::string kUuidTableName
bool ProcessObjUUIDTableEntry(const std::string &uuid_key, const GenDb::ColList &col_list)
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
ConfigCassandraClient(ConfigClientManager *mgr, EventManager *evm, const ConfigClientOptions &options, int num_workers)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
size_t size() const
Definition: gendb_if.h:63
boost::shared_ptr< TaskTrigger > config_reader_
void AddUUIDToRequestList(const std::string &oper, const std::string &obj_type, const std::string &uuid_str)
static const std::string kFqnTableName
virtual bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
const ObjectTypeList & ObjectTypeListToRead() const
virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const
static ConnectionState * GetInstance()
virtual bool Receive(const ConfigCass2JsonAdapter &adapter, bool add_change)=0
ConfigCassandraPartition * GetPartition(const std::string &uuid)
virtual bool UUIDToObjCacheShow(const std::string &search_string, int inst_num, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
static const std::string parent_prefix
virtual int HashUUID(const std::string &uuid_str) const
boost::scoped_ptr< DbDataValueVec > name
Definition: gendb_if.h:180
int GetFirstConfigDbPort() const
virtual uint32_t GetNumReadRequestToBunch() const
void ListMapPropReviseUpdateList(const std::string &uuid, ConfigCassandraParseContext &context)
static const std::string map_prop_prefix
void FillUUIDToObjCacheInfo(const std::string &uuid, ObjectCacheMap::const_iterator uuid_iter, ConfigDBUUIDCacheEntry *entry) const
boost::asio::io_context * ioservice()
static const uint32_t kMinUUIDRetryTimeMSec
tbb::atomic< long > bulk_sync_status_
DISALLOW_COPY_AND_ASSIGN(ConfigCassandraParseContext)
bool RequestHandler(ObjectProcessReq *req)
virtual bool ReadObjUUIDTable(const std::set< std::string > &uuid_list)
bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
std::multimap< string, JsonAdapterDataType > list_map_properties
static EventManager evm
ObjCacheEntry * MarkCacheDirty(const std::string &uuid)
static bool DeleteTimer(Timer *Timer)
Definition: timer.cc:222
bool ParseFQNameRowGetUUIDList(const std::string &obj_type, const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list, std::string *last_column)
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54
ConfigCassandraClient * client()