OpenSDN source code
config_cassandra_client.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef config_cass_client_h
6 #define config_cass_client_h
7 
8 #include <boost/ptr_container/ptr_map.hpp>
9 #include <boost/shared_ptr.hpp>
10 
11 #include <atomic>
12 #include <list>
13 #include <map>
14 #include <set>
15 #include <string>
16 #include <utility>
17 #include <vector>
18 
19 #include "base/queue_task.h"
20 #include "base/timer.h"
21 
22 #include "config_db_client.h"
25 #include "database/gendb_if.h"
26 #include "json_adapter_data.h"
27 
28 class EventManager;
30 struct ConfigDBConnInfo;
31 class TaskTrigger;
34 class ConfigDBFQNameCacheEntry;
35 class ConfigDBUUIDCacheEntry;
36 
38  public:
40  virtual ~ConfigCassandraPartition();
41 
42  typedef boost::shared_ptr<WorkQueue<ObjectProcessReq *> >
44 
46  uint64_t time_stamp;
47  bool refreshed;
48  };
49 
50  struct cmp_json_key {
52  const JsonAdapterDataType &k2) const {
53  return k1.key < k2.key;
54  }
55  };
56  typedef std::map<JsonAdapterDataType, FieldTimeStampInfo, cmp_json_key>
59  public:
61  uint64_t last_read_tstamp)
62  : ObjectCacheEntry(last_read_tstamp),
63  retry_count_(0), retry_timer_(NULL),
64  parent_(parent) {
65  }
66 
67  virtual ~ObjCacheEntry();
68 
69  void EnableCassandraReadRetry(const std::string uuid);
70  void DisableCassandraReadRetry(const std::string uuid);
73  return field_detail_map_;
74  }
75  uint32_t GetRetryCount() const { return retry_count_; }
76  bool IsRetryTimerCreated() const { return (retry_timer_ != NULL); }
77  bool IsRetryTimerRunning() const;
79 
80  private:
84 
85  bool CassReadRetryTimerExpired(const std::string uuid);
87  uint32_t retry_count_;
91  };
92 
93  static const uint32_t kMaxUUIDRetryTimePowOfTwo = 20;
94  static const uint32_t kMinUUIDRetryTimeMSec = 100;
95  typedef boost::ptr_map<std::string, ObjCacheEntry> ObjectCacheMap;
96 
98  return obj_process_queue_;
99  }
100 
101  virtual int UUIDRetryTimeInMSec(const ObjCacheEntry *obj) const;
102  ObjCacheEntry *GetObjCacheEntry(const std::string &uuid);
103  const ObjCacheEntry *GetObjCacheEntry(const std::string &uuid) const;
104  bool StoreKeyIfUpdated(const std::string &uuid,
105  JsonAdapterDataType *adapter,
106  uint64_t timestamp,
107  ConfigCassandraParseContext &context);
108  void ListMapPropReviseUpdateList(const std::string &uuid,
109  ConfigCassandraParseContext &context);
110  ObjCacheEntry *MarkCacheDirty(const std::string &uuid);
111  void DeleteCacheMap(const std::string &uuid) {
112  object_cache_map_.erase(uuid);
113  }
114  void Enqueue(ObjectProcessReq *req);
115 
116 
117  bool UUIDToObjCacheShow(
118  const std::string &search_string, const std::string &last_uuid,
119  uint32_t num_entries,
120  std::vector<ConfigDBUUIDCacheEntry> *entries) const;
121  int GetInstanceId() const { return worker_id_; }
122 
123  boost::asio::io_context *ioservice();
124 
125  bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key);
126  bool IsTaskTriggered() const;
127 protected:
128  virtual bool ReadObjUUIDTable(const std::set<std::string> &uuid_list);
129  bool ProcessObjUUIDTableEntry(const std::string &uuid_key,
130  const GenDb::ColList &col_list);
131  virtual void ParseObjUUIDTableEntry(const std::string &uuid,
132  const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec,
133  ConfigCassandraParseContext &context);
134  void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid,
135  const std::string &key, const std::string &value,
136  uint64_t timestamp, CassColumnKVVec *cass_data_vec,
137  ConfigCassandraParseContext &context);
138  virtual void HandleObjectDelete(const string &uuid, bool add_change);
140 
141 private:
142  friend class ConfigCassandraClient;
143 
145  ObjectProcessRequestType(const std::string &in_oper,
146  const std::string &in_obj_type,
147  const std::string &in_uuid)
148  : oper(in_oper), obj_type(in_obj_type), uuid(in_uuid) {
149  }
150  std::string oper;
151  std::string obj_type;
152  std::string uuid;
153  };
154 
155  typedef std::map<std::string, ObjectProcessRequestType *> UUIDProcessSet;
156 
157  bool RequestHandler(ObjectProcessReq *req);
158  void AddUUIDToRequestList(const std::string &oper,
159  const std::string &obj_type, const std::string &uuid_str);
160  bool ConfigReader();
161  void RemoveObjReqEntries(std::set<std::string> &req_list);
162  void RemoveObjReqEntry(std::string &uuid);
163  virtual void GenerateAndPushJson(
164  const string &uuid_key, const string &obj_type,
165  const CassColumnKVVec &cass_data_vec, bool add_change);
166 
167  void FillUUIDToObjCacheInfo(const std::string &uuid,
168  ObjectCacheMap::const_iterator uuid_iter,
169  ConfigDBUUIDCacheEntry *entry) const;
170 
174  boost::shared_ptr<TaskTrigger> config_reader_;
177 };
178 
179 /*
180  * This class has the functionality to interact with the cassandra servers that
181  * store the user configuration.
182  */
184  public:
185  // Cassandra table names
186  static const std::string kUuidTableName;
187  static const std::string kFqnTableName;
188 
189  // Task names
190  static const std::string kCassClientTaskId;
191  static const std::string kObjectProcessTaskId;
192 
193  // Number of UUIDs to read in one read request
194  static const int kMaxNumUUIDToRead = 64;
195 
196  // Number of FQName entries to read in one read request
197  static const int kNumFQNameEntriesToRead = 4096;
198 
199  typedef boost::scoped_ptr<GenDb::GenDbIf> GenDbIfPtr;
200  typedef std::vector<ConfigCassandraPartition *> PartitionList;
201 
204  int num_workers);
205  virtual ~ConfigCassandraClient();
206 
207  virtual void InitDatabase();
208  void BulkSyncDone();
210  const ConfigCassandraPartition *GetPartition(const std::string &uuid) const;
211  const ConfigCassandraPartition *GetPartition(int worker_id) const;
212 
213  void EnqueueUUIDRequest(std::string oper, std::string obj_type,
214  std::string uuid_str);
215 
216  virtual bool UUIDToObjCacheShow(
217  const std::string &search_string, int inst_num,
218  const std::string &last_uuid, uint32_t num_entries,
219  std::vector<ConfigDBUUIDCacheEntry> *entries) const;
220  virtual bool IsListOrMapPropEmpty(const string &uuid_key,
221  const string &lookup_key);
222  virtual bool IsTaskTriggered() const;
223 protected:
224  typedef std::pair<std::string, std::string> ObjTypeUUIDType;
225  typedef std::list<ObjTypeUUIDType> ObjTypeUUIDList;
226 
227  void UpdateFQNameCache(const std::string &key, const std::string &obj_type,
228  ObjTypeUUIDList &uuid_list);
229  virtual bool BulkDataSync();
230  bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list);
231  virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const;
232 
233  virtual int HashUUID(const std::string &uuid_str) const;
234  virtual bool SkipTimeStampCheckForTypeAndFQName() const { return true; }
235  virtual uint32_t GetFQNameEntriesToRead() const {
237  }
238  int num_workers() const { return num_workers_; }
240  virtual void PostShutdown();
241 
242  private:
244 
245  bool InitRetry();
246 
247  bool FQNameReader();
248  bool ParseFQNameRowGetUUIDList(const std::string &obj_type,
249  const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list,
250  std::string *last_column);
251 
252  void HandleCassandraConnectionStatus(bool success,
253  bool force_update = false);
254 
258  boost::scoped_ptr<TaskTrigger> fq_name_reader_;
259  std::atomic<long> bulk_sync_status_;
260 };
261 
262 #endif // config_cass_client_h
ConfigCassandraPartition * GetPartition(const std::string &uuid)
virtual bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
std::atomic< long > bulk_sync_status_
static const int kNumFQNameEntriesToRead
const ConfigCassandraPartition * GetPartition(const std::string &uuid) const
virtual bool SkipTimeStampCheckForTypeAndFQName() const
std::vector< ConfigCassandraPartition * > PartitionList
bool EnqueueDBSyncRequest(const ObjTypeUUIDList &uuid_list)
static const std::string kUuidTableName
virtual bool UUIDToObjCacheShow(const std::string &search_string, int inst_num, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
virtual int HashUUID(const std::string &uuid_str) const
std::list< ObjTypeUUIDType > ObjTypeUUIDList
boost::scoped_ptr< TaskTrigger > fq_name_reader_
virtual std::string FetchUUIDFromFQNameEntry(const std::string &key) const
static const std::string kFqnTableName
static const std::string kObjectProcessTaskId
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void UpdateFQNameCache(const std::string &key, const std::string &obj_type, ObjTypeUUIDList &uuid_list)
void HandleCassandraConnectionStatus(bool success, bool force_update=false)
virtual bool IsTaskTriggered() const
bool ParseFQNameRowGetUUIDList(const std::string &obj_type, const GenDb::ColList &col_list, ObjTypeUUIDList &uuid_list, std::string *last_column)
std::pair< std::string, std::string > ObjTypeUUIDType
static const std::string kCassClientTaskId
static const int kMaxNumUUIDToRead
virtual uint32_t GetFQNameEntriesToRead() const
ConfigCassandraClient(ConfigClientManager *mgr, EventManager *evm, const ConfigClientOptions &options, int num_workers)
boost::scoped_ptr< GenDb::GenDbIf > GenDbIfPtr
const FieldDetailMap & GetFieldDetailMap() const
void EnableCassandraReadRetry(const std::string uuid)
bool CassReadRetryTimerExpired(const std::string uuid)
ObjCacheEntry(ConfigCassandraPartition *parent, uint64_t last_read_tstamp)
void DisableCassandraReadRetry(const std::string uuid)
void RemoveObjReqEntry(std::string &uuid)
static const uint32_t kMinUUIDRetryTimeMSec
void ParseObjUUIDTableEachColumnBuildContext(const std::string &uuid, const std::string &key, const std::string &value, uint64_t timestamp, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
ObjProcessWorkQType obj_process_queue_
void DeleteCacheMap(const std::string &uuid)
virtual bool ReadObjUUIDTable(const std::set< std::string > &uuid_list)
const ObjCacheEntry * GetObjCacheEntry(const std::string &uuid) const
void AddUUIDToRequestList(const std::string &oper, const std::string &obj_type, const std::string &uuid_str)
bool RequestHandler(ObjectProcessReq *req)
ConfigCassandraClient * config_client_
std::map< JsonAdapterDataType, FieldTimeStampInfo, cmp_json_key > FieldDetailMap
ConfigCassandraClient * client()
bool IsListOrMapPropEmpty(const string &uuid_key, const string &lookup_key)
boost::asio::io_context * ioservice()
virtual void GenerateAndPushJson(const string &uuid_key, const string &obj_type, const CassColumnKVVec &cass_data_vec, bool add_change)
bool UUIDToObjCacheShow(const std::string &search_string, const std::string &last_uuid, uint32_t num_entries, std::vector< ConfigDBUUIDCacheEntry > *entries) const
virtual int UUIDRetryTimeInMSec(const ObjCacheEntry *obj) const
virtual void HandleObjectDelete(const string &uuid, bool add_change)
boost::ptr_map< std::string, ObjCacheEntry > ObjectCacheMap
bool ProcessObjUUIDTableEntry(const std::string &uuid_key, const GenDb::ColList &col_list)
boost::shared_ptr< TaskTrigger > config_reader_
ObjCacheEntry * GetObjCacheEntry(const std::string &uuid)
boost::shared_ptr< WorkQueue< ObjectProcessReq * > > ObjProcessWorkQType
std::map< std::string, ObjectProcessRequestType * > UUIDProcessSet
ConfigCassandraPartition(ConfigCassandraClient *client, size_t idx)
void Enqueue(ObjectProcessReq *req)
ObjCacheEntry * MarkCacheDirty(const std::string &uuid)
static const uint32_t kMaxUUIDRetryTimePowOfTwo
void ListMapPropReviseUpdateList(const std::string &uuid, ConfigCassandraParseContext &context)
void RemoveObjReqEntries(std::set< std::string > &req_list)
bool StoreKeyIfUpdated(const std::string &uuid, JsonAdapterDataType *adapter, uint64_t timestamp, ConfigCassandraParseContext &context)
virtual void ParseObjUUIDTableEntry(const std::string &uuid, const GenDb::ColList &col_list, CassColumnKVVec *cass_data_vec, ConfigCassandraParseContext &context)
void FillUUIDToObjCacheInfo(const std::string &uuid, ObjectCacheMap::const_iterator uuid_iter, ConfigDBUUIDCacheEntry *entry) const
ObjProcessWorkQType obj_process_queue()
virtual std::string uuid_str(const std::string &uuid)
ConfigClientManager * mgr()
Definition: timer.h:57
static EventManager evm
std::vector< JsonAdapterDataType > CassColumnKVVec
ObjectProcessRequestType(const std::string &in_oper, const std::string &in_obj_type, const std::string &in_uuid)
bool operator()(const JsonAdapterDataType &k1, const JsonAdapterDataType &k2) const
boost::uuids::uuid uuid