OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_cassandra_client.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef config_cass_client_h
6 #define config_cass_client_h
7 
8 #include <boost/ptr_container/ptr_map.hpp>
9 #include <boost/shared_ptr.hpp>
10 
11 #include <list>
12 #include <map>
13 #include <set>
14 #include <string>
15 #include <utility>
16 #include <vector>
17 
18 #include "base/queue_task.h"
19 #include "base/timer.h"
20 
21 #include "config_db_client.h"
24 #include "database/gendb_if.h"
25 #include "json_adapter_data.h"
26 
27 class EventManager;
29 struct ConfigDBConnInfo;
30 class TaskTrigger;
33 class ConfigDBFQNameCacheEntry;
34 class ConfigDBUUIDCacheEntry;
35 
37  public:
39  virtual ~ConfigCassandraPartition();
40 
41  typedef boost::shared_ptr<WorkQueue<ObjectProcessReq *> >
43 
// Per-field bookkeeping record; used as the mapped value of the
// JsonAdapterDataType-keyed std::map typedef declared further down.
struct FieldTimeStampInfo {
    // NOTE(review): presumably the timestamp read with the field - confirm.
    uint64_t time_stamp;
    // NOTE(review): presumably marks fields seen again on a re-read - confirm.
    bool refreshed;
};
48 
49  struct cmp_json_key {
51  const JsonAdapterDataType &k2) const {
52  return k1.key < k2.key;
53  }
54  };
55  typedef std::map<JsonAdapterDataType, FieldTimeStampInfo, cmp_json_key>
58  public:
60  uint64_t last_read_tstamp)
61  : ObjectCacheEntry(last_read_tstamp),
62  retry_count_(0), retry_timer_(NULL),
63  parent_(parent) {
64  }
65 
66  virtual ~ObjCacheEntry();
67 
68  void EnableCassandraReadRetry(const std::string uuid);
69  void DisableCassandraReadRetry(const std::string uuid);
72  return field_detail_map_;
73  }
74  uint32_t GetRetryCount() const { return retry_count_; }
75  bool IsRetryTimerCreated() const { return (retry_timer_ != NULL); }
76  bool IsRetryTimerRunning() const;
78 
79  private:
83 
84  bool CassReadRetryTimerExpired(const std::string uuid);
86  uint32_t retry_count_;
90  };
91 
92  static const uint32_t kMaxUUIDRetryTimePowOfTwo = 20;
93  static const uint32_t kMinUUIDRetryTimeMSec = 100;
94  typedef boost::ptr_map<std::string, ObjCacheEntry> ObjectCacheMap;
95 
97  return obj_process_queue_;
98  }
99 
100  virtual int UUIDRetryTimeInMSec(const ObjCacheEntry *obj) const;
101  ObjCacheEntry *GetObjCacheEntry(const std::string &uuid);
102  const ObjCacheEntry *GetObjCacheEntry(const std::string &uuid) const;
103  bool StoreKeyIfUpdated(const std::string &uuid,
104  JsonAdapterDataType *adapter,
105  uint64_t timestamp,
106  ConfigCassandraParseContext &context);
107  void ListMapPropReviseUpdateList(const std::string &uuid,
108  ConfigCassandraParseContext &context);
109  ObjCacheEntry *MarkCacheDirty(const std::string &uuid);
110  void DeleteCacheMap(const std::string &uuid) {
111  object_cache_map_.erase(uuid);
112  }
113  void Enqueue(ObjectProcessReq *req);
114 
115 
116  bool UUIDToObjCacheShow(
117  const std::string &search_string, const std::string &last_uuid,
118  uint32_t num_entries,
119  std::vector<ConfigDBUUIDCacheEntry> *entries) const;
120  int GetInstanceId() const { return worker_id_; }
121 
122  boost::asio::io_context *ioservice();
123 
124  bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key);
125  bool IsTaskTriggered() const;
126 protected:
127  virtual bool ReadObjUUIDTable(const std::set<std::string> &uuid_list);
128  bool ProcessObjUUIDTableEntry(const std::string &uuid_key,
129  const GenDb::ColList &col_list);
130  virtual void ParseObjUUIDTableEntry(const std::string &uuid,
131  const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec,
132  ConfigCassandraParseContext &context);
133  void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid,
134  const std::string &key, const std::string &value,
135  uint64_t timestamp, CassColumnKVVec *cass_data_vec,
136  ConfigCassandraParseContext &context);
137  virtual void HandleObjectDelete(const string &uuid, bool add_change);
139 
140 private:
141  friend class ConfigCassandraClient;
142 
// One queued request for a UUID: the operation, the object type and the
// UUID itself. Each argument is copied into the matching member.
struct ObjectProcessRequestType {
    ObjectProcessRequestType(const std::string &in_oper,
                             const std::string &in_obj_type,
                             const std::string &in_uuid)
        : oper(in_oper), obj_type(in_obj_type), uuid(in_uuid) {
    }
    std::string oper;      // operation name as supplied by the caller
    std::string obj_type;  // object type as supplied by the caller
    std::string uuid;      // UUID string as supplied by the caller
};
153 
154  typedef std::map<std::string, ObjectProcessRequestType *> UUIDProcessSet;
155 
156  bool RequestHandler(ObjectProcessReq *req);
157  void AddUUIDToRequestList(const std::string &oper,
158  const std::string &obj_type, const std::string &uuid_str);
159  bool ConfigReader();
160  void RemoveObjReqEntries(std::set<std::string> &req_list);
161  void RemoveObjReqEntry(std::string &uuid);
162  virtual void GenerateAndPushJson(
163  const string &uuid_key, const string &obj_type,
164  const CassColumnKVVec &cass_data_vec, bool add_change);
165 
166  void FillUUIDToObjCacheInfo(const std::string &uuid,
167  ObjectCacheMap::const_iterator uuid_iter,
168  ConfigDBUUIDCacheEntry *entry) const;
169 
173  boost::shared_ptr<TaskTrigger> config_reader_;
176 };
177 
178 /*
179  * This class has the functionality to interact with the cassandra servers that
180  * store the user configuration.
181  */
183  public:
184  // Cassandra table names
185  static const std::string kUuidTableName;
186  static const std::string kFqnTableName;
187 
188  // Task names
189  static const std::string kCassClientTaskId;
190  static const std::string kObjectProcessTaskId;
191 
192  // Number of UUIDs to read in one read request
193  static const int kMaxNumUUIDToRead = 64;
194 
195  // Number of FQName entries to read in one read request
196  static const int kNumFQNameEntriesToRead = 4096;
197 
198  typedef boost::scoped_ptr<GenDb::GenDbIf> GenDbIfPtr;
199  typedef std::vector<ConfigCassandraPartition *> PartitionList;
200 
203  int num_workers);
204  virtual ~ConfigCassandraClient();
205 
206  virtual void InitDatabase();
207  void BulkSyncDone();
208  ConfigCassandraPartition *GetPartition(const std::string &uuid);
209  const ConfigCassandraPartition *GetPartition(const std::string &uuid) const;
210  const ConfigCassandraPartition *GetPartition(int worker_id) const;
211 
212  void EnqueueUUIDRequest(std::string oper, std::string obj_type,
213  std::string uuid_str);
214 
215  virtual bool UUIDToObjCacheShow(
216  const std::string &search_string, int inst_num,
217  const std::string &last_uuid, uint32_t num_entries,
218  std::vector<ConfigDBUUIDCacheEntry> *entries) const;
219  virtual bool IsListOrMapPropEmpty(const string &uuid_key,
220  const string &lookup_key);
221  virtual bool IsTaskTriggered() const;
222 protected:
223  typedef std::pair<std::string, std::string> ObjTypeUUIDType;
224  typedef std::list<ObjTypeUUIDType> ObjTypeUUIDList;
225 
226  void UpdateFQNameCache(const std::string &key, const std::string &obj_type,
227  ObjTypeUUIDList &uuid_list);
228  virtual bool BulkDataSync();
229  bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list);
230  virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const;
231 
232  virtual int HashUUID(const std::string &uuid_str) const;
233  virtual bool SkipTimeStampCheckForTypeAndFQName() const { return true; }
234  virtual uint32_t GetFQNameEntriesToRead() const {
236  }
237  int num_workers() const { return num_workers_; }
239  virtual void PostShutdown();
240 
241  private:
243 
244  bool InitRetry();
245 
246  bool FQNameReader();
247  bool ParseFQNameRowGetUUIDList(const std::string &obj_type,
248  const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list,
249  std::string *last_column);
250 
251  void HandleCassandraConnectionStatus(bool success,
252  bool force_update = false);
253 
257  boost::scoped_ptr<TaskTrigger> fq_name_reader_;
258  tbb::atomic<long> bulk_sync_status_;
259 };
260 
261 #endif // config_cass_client_h
void DeleteCacheMap(const std::string &uuid)
std::pair< std::string, std::string > ObjTypeUUIDType
std::vector< ConfigCassandraPartition * > PartitionList
bool UUIDToObjCacheShow(const std::string &search_string, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
ConfigClientManager * mgr()
bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list)
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void RemoveObjReqEntry(std::string &uuid)
bool operator()(const JsonAdapterDataType &k1, const JsonAdapterDataType &k2) const
virtual bool SkipTimeStampCheckForTypeAndFQName() const
virtual void GenerateAndPushJson(const string &uuid_key, const string &obj_type, const CassColumnKVVec &cass_data_vec, bool add_change)
ObjCacheEntry(ConfigCassandraPartition *parent, uint64_t last_read_tstamp)
const FieldDetailMap & GetFieldDetailMap() const
void UpdateFQNameCache(const std::string &key, const std::string &obj_type, ObjTypeUUIDList &uuid_list)
ConfigCassandraPartition(ConfigCassandraClient *client, size_t idx)
void Enqueue(ObjectProcessReq *req)
virtual int UUIDRetryTimeInMSec(const ObjCacheEntry *obj) const
ObjectProcessRequestType(const std::string &in_oper, const std::string &in_obj_type, const std::string &in_uuid)
boost::uuids::uuid uuid
void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid, const std::string &key, const std::string &value, uint64_t timestamp, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
static const uint32_t kMaxUUIDRetryTimePowOfTwo
void RemoveObjReqEntries(std::set< std::string > &req_list)
virtual std::string uuid_str(const std::string &uuid)
boost::scoped_ptr< TaskTrigger > fq_name_reader_
std::map< JsonAdapterDataType, FieldTimeStampInfo, cmp_json_key > FieldDetailMap
bool CassReadRetryTimerExpired(const std::string uuid)
virtual void ParseObjUUIDTableEntry(const std::string &uuid, const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
bool StoreKeyIfUpdated(const std::string &uuid, JsonAdapterDataType *adapter, uint64_t timestamp, ConfigCassandraParseContext &context)
static const std::string kObjectProcessTaskId
virtual uint32_t GetFQNameEntriesToRead() const
void HandleCassandraConnectionStatus(bool success, bool force_update=false)
ObjCacheEntry * GetObjCacheEntry(const std::string &uuid)
void EnableCassandraReadRetry(const std::string uuid)
virtual bool IsTaskTriggered() const
static const std::string kCassClientTaskId
static Options options
boost::ptr_map< std::string, ObjCacheEntry > ObjectCacheMap
std::list< ObjTypeUUIDType > ObjTypeUUIDList
std::vector< JsonAdapterDataType > CassColumnKVVec
boost::shared_ptr< WorkQueue< ObjectProcessReq * > > ObjProcessWorkQType
void DisableCassandraReadRetry(const std::string uuid)
virtual void HandleObjectDelete(const string &uuid, bool add_change)
ObjProcessWorkQType obj_process_queue_
static const std::string kUuidTableName
bool ProcessObjUUIDTableEntry(const std::string &uuid_key, const GenDb::ColList &col_list)
boost::scoped_ptr< GenDb::GenDbIf > GenDbIfPtr
ConfigCassandraClient(ConfigClientManager *mgr, EventManager *evm, const ConfigClientOptions &options, int num_workers)
boost::shared_ptr< TaskTrigger > config_reader_
void AddUUIDToRequestList(const std::string &oper, const std::string &obj_type, const std::string &uuid_str)
static const std::string kFqnTableName
virtual bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
ObjProcessWorkQType obj_process_queue()
virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const
ConfigCassandraClient * config_client_
ConfigCassandraPartition * GetPartition(const std::string &uuid)
virtual bool UUIDToObjCacheShow(const std::string &search_string, int inst_num, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
static const int kNumFQNameEntriesToRead
virtual int HashUUID(const std::string &uuid_str) const
Definition: timer.h:54
void ListMapPropReviseUpdateList(const std::string &uuid, ConfigCassandraParseContext &context)
void FillUUIDToObjCacheInfo(const std::string &uuid, ObjectCacheMap::const_iterator uuid_iter, ConfigDBUUIDCacheEntry *entry) const
boost::asio::io_context * ioservice()
static const uint32_t kMinUUIDRetryTimeMSec
tbb::atomic< long > bulk_sync_status_
std::map< std::string, ObjectProcessRequestType * > UUIDProcessSet
bool RequestHandler(ObjectProcessReq *req)
virtual bool ReadObjUUIDTable(const std::set< std::string > &uuid_list)
bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
static EventManager evm
ObjCacheEntry * MarkCacheDirty(const std::string &uuid)
static const int kMaxNumUUIDToRead
bool ParseFQNameRowGetUUIDList(const std::string &obj_type, const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list, std::string *last_column)
ConfigCassandraClient * client()