7 #include <sandesh/request_pipeline.h>
9 #include <boost/algorithm/string/join.hpp>
10 #include <boost/algorithm/string/predicate.hpp>
11 #include <boost/foreach.hpp>
12 #include <boost/functional/hash.hpp>
13 #include <boost/ptr_container/ptr_map.hpp>
14 #include <boost/uuid/uuid.hpp>
32 #include "config_client_log_types.h"
33 #include "config_client_show_types.h"
34 #include "sandesh/common/vns_constants.h"
39 using std::unique_ptr;
48 "config_client::ObjectProcessor";
54 dbif_.reset(ConfigStaticObjectFactory::CreateRef<cass::cql::CqlIf>(
66 ConfigStaticObjectFactory::Create<ConfigCassandraPartition>
67 (
this, static_cast<size_t>(i)));
88 if (!
dbif_->Db_Init()) {
90 "Database initialization failed");
95 "Cassandra SM: Db SetTableSpace");
96 if (!
dbif_->Db_SetTablespace(
97 g_vns_constants.API_SERVER_KEYSPACE_NAME)) {
99 "Setting database keyspace failed");
104 "Cassandra SM: Db UseColumnFamily uuidTableName");
110 "Cassandra SM: Db UseColumnFamily fqnTableName");
125 if (
mgr()->is_reinit_triggered())
return false;
149 boost::hash<string> string_hash;
156 set<string> uuid_list = req_list;
157 vector<GenDb::DbDataValueVec> keys;
158 for (set<string>::const_iterator it = uuid_list.begin();
159 it != uuid_list.end(); it++) {
161 key.push_back(
GenDb::Blob(reinterpret_cast<const uint8_t *>
162 (it->c_str()), it->size()));
166 GenDb::Blob col_filter(reinterpret_cast<const uint8_t *>(
"d"), 1);
172 field_vec.push_back(boost::make_tuple(
"key",
true,
false,
false));
173 field_vec.push_back(boost::make_tuple(
"column1",
false,
true,
false));
174 field_vec.push_back(boost::make_tuple(
"value",
false,
false,
true));
176 if (
client()->dbif_->Db_GetMultiRow(&col_list_vec,
182 assert(col_list.
rowkey_.size() == 1);
186 string uuid_str(reinterpret_cast<const char *>(uuid.
data()),
189 uuid_list.erase(uuid_str);
197 "GetMultiRow failed for table",
212 BOOST_FOREACH(
string uuid_key, uuid_list) {
293 "Parsing row response for type/fq_name failed for table",
313 for (set<string>::iterator it =
316 pair<multimap<string, JsonAdapterDataType>::iterator,
317 multimap<string, JsonAdapterDataType>::iterator> ret =
319 for (multimap<string, JsonAdapterDataType>::iterator mit =
320 ret.first; mit != ret.second; mit++) {
321 cass_data_vec.push_back(mit->second);
334 assert(ncol.
name->size() == 1);
335 assert(ncol.
value->size() == 1);
340 GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
341 string key(reinterpret_cast<const char *>(dname_blob.data()),
346 string value(boost::get<string>(dvalue));
350 uint64_t timestamp = boost::get<uint64_t>(dtimestamp);
352 cass_data_vec, context);
357 const string &
uuid,
const string &key,
const string &value,
364 cass_data_vec->push_back(adapter);
369 const string &uuid_key,
const string &obj_type,
384 "Cassandra SM: Post shutdown during re init");
393 ConfigClientMgrDebug,
"Cassandra SM: BulkDataSync Started");
418 for (ConfigClientManager::ObjectTypeList::const_iterator it =
419 mgr()->config_json_parser()->ObjectTypeListToRead().begin();
427 "Cassandra SM: Abort FQName reader on reinit trigger");
433 key.push_back(
GenDb::Blob(reinterpret_cast<const uint8_t *>
434 (it->c_str()), it->size()));
436 if (!column_name.empty()) {
437 GenDb::Blob col_filter(reinterpret_cast<const uint8_t *>
438 (column_name.c_str()), column_name.size());
442 boost::assign::list_of(
454 field_vec.push_back(boost::make_tuple(
"key",
true,
false,
false));
455 field_vec.push_back(boost::make_tuple(
"column1",
false,
true,
496 string *last_column) {
499 assert(ncol.
name->size() == 1);
502 GenDb::Blob dname_blob(boost::get<GenDb::Blob>(dname));
503 column_name = string(reinterpret_cast<const char *>(dname_blob.data()),
508 *last_column = column_name;
515 if (uuid_str.empty())
517 uuid_list.push_back(make_pair(obj_type, uuid_str));
518 AddFQNameCache(uuid_str, obj_type, key.substr(0, key.rfind(
':')));
522 const string &key)
const {
523 size_t temp = key.rfind(
':');
524 return (temp == string::npos) ?
"" : key.substr(temp+1);
529 for (ObjTypeUUIDList::const_iterator it = uuid_list.begin();
530 it != uuid_list.end(); it++) {
537 const string &search_string,
int inst_num,
const string &last_uuid,
538 uint32_t num_entries, vector<ConfigDBUUIDCacheEntry> *entries)
const {
540 num_entries, entries);
550 long num_config_readers_still_processing =
552 if (num_config_readers_still_processing == 1) {
554 "Cassandra SM: BulkSyncDone by all readers");
558 "Cassandra SM: One reader finished BulkSync");
569 process::ConnectionType::DATABASE,
"Cassandra",
570 process::ConnectionStatus::UP,
571 dbif_->Db_GetEndpoints(),
"Established Cassandra connection");
573 "Cassandra SM: Established Cassandra connection");
576 process::ConnectionType::DATABASE,
"Cassandra",
577 process::ConnectionStatus::DOWN,
578 dbif_->Db_GetEndpoints(),
"Lost Cassandra connection");
580 "Cassandra SM: Lost Cassandra connection");
585 const string &lookup_key) {
591 : config_client_(client), worker_id_(idx) {
618 const string &obj_type,
619 const string &uuid_str) {
620 pair<UUIDProcessSet::iterator, bool> ret;
631 ret.first->second->
oper = oper;
632 ret.first->second->uuid = uuid_str;
637 const string &
uuid,
bool add_change) {
641 if (obj_type_fq_name_pair.second ==
"ERROR") {
646 bool needNotify =
false;
647 std::string obj_type(
"");
655 for (FieldDetailMap::iterator it =
656 uuid_iter->second->GetFieldDetailMap().begin(), itnext;
657 it != uuid_iter->second->GetFieldDetailMap().end();
661 if (it->first.key ==
"type") {
662 obj_type = it->first.value;
663 obj_type.erase(
remove(obj_type.begin(),
664 obj_type.end(),
'\"'), obj_type.end());
665 cass_data_vec.push_back(it->first);
668 if (it->first.key ==
"fq_name") {
669 cass_data_vec.push_back(it->first);
672 if (!add_change || !it->second.refreshed) {
673 if (it->first.key ==
"type" || it->first.key ==
"fq_name") {
678 cass_data_vec.push_back(it->first);
680 uuid_iter->second->GetFieldDetailMap().erase(it);
685 if (add_change !=
true) {
697 const string &lookup_key) {
704 key =
"propm:" + lookup_key;
705 FieldDetailMap::iterator lower_bound_it =
706 uuid_iter->second->GetFieldDetailMap().lower_bound(
708 if (lower_bound_it != uuid_iter->second->GetFieldDetailMap().end() &&
709 boost::starts_with(lower_bound_it->first.key, key)) {
712 key =
"propl:" + lookup_key;
714 uuid_iter->second->GetFieldDetailMap().lower_bound(
716 if (lower_bound_it != uuid_iter->second->GetFieldDetailMap().end() &&
717 boost::starts_with(lower_bound_it->first.key, key)) {
730 set<string> bunch_req_list;
731 int num_req_handled = 0;
733 for (UUIDProcessSet::iterator it =
uuid_read_set_.begin(), itnext;
740 if (obj_req->
oper ==
"CREATE" || obj_req->
oper ==
"UPDATE" ||
741 obj_req->
oper ==
"UPDATE-IMPLICIT") {
742 bunch_req_list.insert(obj_req->
uuid);
749 num_req_handled += bunch_req_list.size();
751 if (num_req_handled >=
client()->GetMaxRequestsToYield()) {
756 }
else if (obj_req->
oper ==
"DELETE") {
758 }
else if (obj_req->
oper ==
"EndOfConfig") {
762 if (++num_req_handled ==
client()->GetMaxRequestsToYield()) {
774 if (
client()->mgr()->is_reinit_triggered()) {
776 "Cassandra SM: Clear UUID read set due to reinit");
784 BOOST_FOREACH(
string uuid, req_list) {
791 UUIDProcessSet::iterator req_it =
793 delete req_it->second;
807 return uuid_iter->second;
815 return uuid_iter->second;
820 uint32_t retry_time_pow_of_two =
836 *parent_->client()->event_manager()->io_service(),
837 "UUID retry timer for " +
uuid,
839 "config_client::Reader"),
840 parent_->worker_id_);
842 "Created UUID read retry timer ", uuid);
844 retry_timer_->Cancel();
845 retry_timer_->Start(parent_->UUIDRetryTimeInMSec(
this),
853 "Start/restart UUID Read Retry timer due to configuration", uuid);
859 retry_timer_->Cancel();
864 "UUID Read retry timer - deleted timer due to configuration",
871 return (retry_timer_->running());
877 parent_->client()->mgr()->EnqueueUUIDRequest(
878 "UPDATE", GetObjType(), parent_->client()->uuid_str(uuid));
886 std::string message =
"Timer";
888 "UUID Read Retry Timer error ", message, message);
893 for (set<string>::iterator it =
902 FieldDetailMap::iterator field_iter =
903 uuid_iter->second->GetFieldDetailMap().lower_bound(
905 assert(field_iter != uuid_iter->second->GetFieldDetailMap().end());
906 assert(it->compare(0, it->size() - 1, field_iter->first.key,
907 0, it->size() - 1) == 0);
908 while (it->compare(0, it->size() - 1, field_iter->first.key,
909 0, it->size() - 1) == 0) {
910 if (field_iter->second.refreshed ==
false) {
924 size_t from_front_pos = adapter->
key.find(
':');
925 size_t from_back_pos = adapter->
key.rfind(
':');
926 string type_field = adapter->
key.substr(0, from_front_pos+1);
933 string prop_name = adapter->
key.substr(from_front_pos+1);
941 if ((prop_name.compare(
"draft_mode_state") == 0) &&
942 !adapter->
value.empty()) {
949 string prop_name =
"";
950 if (is_ref || is_parent) {
951 string ref_uuid = adapter->
key.substr(from_back_pos+1);
954 if (ref_name ==
"ERROR") {
957 "Out of order parent or ref", uuid +
":" + adapter->
key);
963 }
else if (is_propl || is_propm) {
964 prop_name = adapter->
key.substr(0, from_back_pos);
969 if (adapter->
key.compare(
"type") == 0) {
975 }
else if (adapter->
key.compare(
"fq_name") == 0) {
983 replace(context.
fq_name.begin(), context.
fq_name.end(),
',',
':');
986 FieldDetailMap::iterator field_iter =
987 uuid_iter->second->GetFieldDetailMap().find(*adapter);
988 if (field_iter == uuid_iter->second->GetFieldDetailMap().end()) {
993 uuid_iter->second->GetFieldDetailMap().insert(make_pair
994 (*adapter, field_ts_info));
996 field_iter->second.refreshed =
true;
997 if (
client()->SkipTimeStampCheckForTypeAndFQName() &&
998 ((adapter->
key.compare(
"type") == 0) ||
999 (adapter->
key.compare(
"fq_name") == 0))) {
1002 if (timestamp && field_iter->second.time_stamp == timestamp) {
1003 if (is_propl || is_propm) {
1008 field_iter->second.time_stamp = timestamp;
1010 if (is_propl || is_propm) {
1023 string tmp_uuid =
uuid;
1025 pair<ObjectCacheMap::iterator, bool> ret_uuid =
1027 assert(ret_uuid.second);
1028 uuid_iter = ret_uuid.first;
1032 for (FieldDetailMap::iterator it =
1033 uuid_iter->second->GetFieldDetailMap().begin();
1034 it != uuid_iter->second->GetFieldDetailMap().end(); it++) {
1035 it->second.refreshed =
false;
1037 return uuid_iter->second;
1041 ObjectCacheMap::const_iterator uuid_iter,
1042 ConfigDBUUIDCacheEntry *entry)
const {
1043 entry->set_uuid(uuid);
1044 entry->set_timestamp(
1046 entry->set_retry_count(uuid_iter->second->GetRetryCount());
1047 entry->set_fq_name(uuid_iter->second->GetFQName());
1048 entry->set_obj_type(uuid_iter->second->GetObjType());
1049 entry->set_timer_running(uuid_iter->second->IsRetryTimerRunning());
1050 entry->set_timer_created(uuid_iter->second->IsRetryTimerCreated());
1051 vector<ConfigDBUUIDCacheData> fields;
1052 for (FieldDetailMap::const_iterator it =
1053 uuid_iter->second->GetFieldDetailMap().begin();
1054 it != uuid_iter->second->GetFieldDetailMap().end(); it++) {
1055 ConfigDBUUIDCacheData each_field;
1056 each_field.set_refresh(it->second.refreshed);
1057 each_field.set_field_name(it->first.key);
1059 fields.push_back(each_field);
1061 entry->set_field_list(fields);
1065 const string &search_string,
const string &last_uuid, uint32_t num_entries,
1066 vector<ConfigDBUUIDCacheEntry> *entries)
const {
1068 regex search_expr(search_string);
1069 for (ObjectCacheMap::const_iterator it =
1076 ConfigDBUUIDCacheEntry entry;
1078 entries->push_back(entry);
void DeleteCacheMap(const std::string &uuid)
static const std::string ref_prefix
ConfigJsonParserBase * config_json_parser()
static const std::string list_prop_prefix
void STLDeleteValues(Container *container)
bool UUIDToObjCacheShow(const std::string &search_string, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
ConfigClientManager * mgr()
EventManager * event_manager()
ConfigCassandraParseContext()
bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list)
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void RemoveObjReqEntry(std::string &uuid)
const std::string & config_db_user() const
virtual ~ConfigCassandraClient()
virtual void GenerateAndPushJson(const string &uuid_key, const string &obj_type, const CassColumnKVVec &cass_data_vec, bool add_change)
virtual void SetObjType(std::string obj_type)
void UpdateFQNameCache(const std::string &key, const std::string &obj_type, ObjTypeUUIDList &uuid_list)
ConfigCassandraPartition(ConfigCassandraClient *client, size_t idx)
const uint8_t * data() const
void Enqueue(ObjectProcessReq *req)
boost::asio::io_context * io_service()
virtual int UUIDRetryTimeInMSec(const ObjCacheEntry *obj) const
virtual const uint64_t GetInitRetryTimeUSec() const
ObjTypeFQNPair UUIDToFQName(const std::string &uuid_str, bool deleted_ok=true) const
void CassReadRetryTimerErrorHandler()
UUIDProcessSet uuid_read_set_
void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid, const std::string &key, const std::string &value, uint64_t timestamp, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
virtual void SetFQName(std::string fq_name)
std::string config_db_ca_certs
virtual void InitDatabase()
static const std::string prop_prefix
static const uint32_t kMaxUUIDRetryTimePowOfTwo
virtual void PostShutdown()
virtual void InitConnectionInfo()
virtual void UpdateConnectionInfo(bool success, bool force)
virtual void ClearFQNameCache()
#define CONFIG_CLIENT_DEBUG(obj,...)
std::pair< std::string, std::string > ObjTypeFQNPair
void RemoveObjReqEntries(std::set< std::string > &req_list)
int GetTaskId(const std::string &name)
virtual std::string uuid_str(const std::string &uuid)
boost::scoped_ptr< TaskTrigger > fq_name_reader_
static bool regex_match(const std::string &input, const regex ®ex)
bool CassReadRetryTimerExpired(const std::string uuid)
virtual void ParseObjUUIDTableEntry(const std::string &uuid, const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
bool StoreKeyIfUpdated(const std::string &uuid, JsonAdapterDataType *adapter, uint64_t timestamp, ConfigCassandraParseContext &context)
static const std::string kObjectProcessTaskId
static const std::set< std::string > skip_properties
virtual uint32_t GetFQNameEntriesToRead() const
virtual ~ConfigCassandraPartition()
virtual void PurgeFQNameCache(const std::string &uuid)
uint32_t GetRetryCount() const
void HandleCassandraConnectionStatus(bool success, bool force_update=false)
ObjCacheEntry * GetObjCacheEntry(const std::string &uuid)
void EnableCassandraReadRetry(const std::string uuid)
virtual bool IsTaskTriggered() const
static TaskScheduler * GetInstance()
static const std::string kCassClientTaskId
PartitionList partitions_
std::vector< DbDataValue > DbDataValueVec
virtual bool BulkDataSync()
bool IsTaskTriggered() const
boost::scoped_ptr< DbDataValueVec > timestamp
#define CHECK_CONCURRENCY(...)
std::list< ObjTypeUUIDType > ObjTypeUUIDList
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
boost::scoped_ptr< DbDataValueVec > value
std::vector< JsonAdapterDataType > CassColumnKVVec
bool parent_or_ref_fq_name_unknown
void DisableCassandraReadRetry(const std::string uuid)
virtual void HandleObjectDelete(const string &uuid, bool add_change)
const std::string & config_db_password() const
static bool regex_search(const std::string &input, const regex ®ex)
static Timer * CreateTimer(boost::asio::io_context &service, const std::string &name, int task_id=Timer::GetTimerTaskId(), int task_instance=Timer::GetTimerInstanceId(), bool delete_on_completion=false)
#define CONFIG_CLIENT_WARN(obj,...)
virtual void AddFQNameCache(const std::string &uuid, const std::string &obj_type, const std::string &fq_name)
bool IsRetryTimerRunning() const
ObjProcessWorkQType obj_process_queue_
const std::vector< std::string > & config_db_ips() const
static const std::string kUuidTableName
bool ProcessObjUUIDTableEntry(const std::string &uuid_key, const GenDb::ColList &col_list)
boost::ptr_vector< ColList > ColListVec
ConfigCassandraClient(ConfigClientManager *mgr, EventManager *evm, const ConfigClientOptions &options, int num_workers)
static uint64_t UTCTimestampUsec()
boost::shared_ptr< TaskTrigger > config_reader_
void AddUUIDToRequestList(const std::string &oper, const std::string &obj_type, const std::string &uuid_str)
static const std::string kFqnTableName
virtual bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
bool is_reinit_triggered()
const ObjectTypeList & ObjectTypeListToRead() const
virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const
static ConnectionState * GetInstance()
virtual bool Receive(const ConfigCass2JsonAdapter &adapter, bool add_change)=0
ObjectCacheMap object_cache_map_
ConfigCassandraPartition * GetPartition(const std::string &uuid)
virtual bool UUIDToObjCacheShow(const std::string &search_string, int inst_num, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
static const std::string parent_prefix
virtual int HashUUID(const std::string &uuid_str) const
boost::scoped_ptr< DbDataValueVec > name
int GetFirstConfigDbPort() const
virtual uint32_t GetNumReadRequestToBunch() const
void ListMapPropReviseUpdateList(const std::string &uuid, ConfigCassandraParseContext &context)
static const std::string map_prop_prefix
void FillUUIDToObjCacheInfo(const std::string &uuid, ObjectCacheMap::const_iterator uuid_iter, ConfigDBUUIDCacheEntry *entry) const
set< string > candidate_list_map_properties
boost::asio::io_context * ioservice()
static const uint32_t kMinUUIDRetryTimeMSec
set< string > updated_list_map_properties
tbb::atomic< long > bulk_sync_status_
DISALLOW_COPY_AND_ASSIGN(ConfigCassandraParseContext)
bool RequestHandler(ObjectProcessReq *req)
virtual bool ReadObjUUIDTable(const std::set< std::string > &uuid_list)
bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
std::multimap< string, JsonAdapterDataType > list_map_properties
ObjCacheEntry * MarkCacheDirty(const std::string &uuid)
static bool DeleteTimer(Timer *Timer)
bool ParseFQNameRowGetUUIDList(const std::string &obj_type, const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list, std::string *last_column)
static std::string UTCUsecToString(uint64_t tstamp)
ConfigCassandraClient * client()