OpenSDN source code
config_etcd_client.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef config_etcd_client_h
6 #define config_etcd_client_h
7 
8 #include <boost/ptr_container/ptr_map.hpp>
9 #include <boost/shared_ptr.hpp>
10 
11 #include "database/etcd/eql_if.h"
12 
13 #include <atomic>
14 #include <list>
15 #include <map>
16 #include <set>
17 #include <string>
18 #include <utility>
19 #include <vector>
20 
21 #include "base/queue_task.h"
22 #include "base/timer.h"
23 
24 #include "config_db_client.h"
26 #include "json_adapter_data.h"
27 
28 using namespace std;
31 using contrail_rapidjson::Document;
32 using contrail_rapidjson::Value;
33 
34 class EventManager;
36 struct ConfigDBConnInfo;
37 class TaskTrigger;
38 class ConfigEtcdClient;
39 class ConfigDBUUIDCacheEntry;
40 
42  public:
43  ConfigEtcdPartition(ConfigEtcdClient *client, size_t idx);
44  virtual ~ConfigEtcdPartition();
45 
46  typedef boost::shared_ptr<WorkQueue<ObjectProcessReq *> >
48 
50  public:
52  const string &value_str,
53  uint64_t last_read_tstamp)
54  : ObjectCacheEntry(last_read_tstamp),
55  retry_count_(0),
56  retry_timer_(NULL),
57  json_str_(value_str),
58  parent_(parent) {
59  }
60 
61  ~UUIDCacheEntry();
62 
63  void EnableEtcdReadRetry(const string uuid,
64  const string value);
65  void DisableEtcdReadRetry(const string uuid);
66 
67  const string &GetJsonString() const { return json_str_; }
68  void SetJsonString(const string &value_str) {
69  json_str_ = value_str;
70  }
71 
72  void SetListOrMapPropEmpty(const string &prop, bool empty) {
73  list_map_set_.insert(make_pair(prop.c_str(), empty));
74  }
75  bool ListOrMapPropEmpty(const string &prop) const;
76 
77  uint32_t GetRetryCount() const {
78  return retry_count_;
79  }
80  bool IsRetryTimerCreated() const {
81  return (retry_timer_ != NULL);
82  }
83  bool IsRetryTimerRunning() const;
84  Timer *GetRetryTimer() { return retry_timer_; }
85 
86  private:
87  friend class ConfigEtcdPartitionTest;
88  bool EtcdReadRetryTimerExpired(const string uuid,
89  const string value);
90  void EtcdReadRetryTimerErrorHandler();
91  typedef map<string, bool> ListMapSet;
93  uint32_t retry_count_;
95  string json_str_;
97  };
98 
99  static const uint32_t kMaxUUIDRetryTimePowOfTwo = 20;
100  static const uint32_t kMinUUIDRetryTimeMSec = 100;
101 
102  typedef boost::ptr_map<string, UUIDCacheEntry> UUIDCacheMap;
103 
104  UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid);
105  const UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid) const;
106  UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid,
107  const string &value_str,
108  bool &is_new);
109  const UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid,
110  const string &value_str,
111  bool &is_new) const;
112  void DeleteCacheMap(const string &uuid) {
113  uuid_cache_map_.erase(uuid);
114  }
115  virtual int UUIDRetryTimeInMSec(const UUIDCacheEntry *obj) const;
116 
117  void FillUUIDToObjCacheInfo(const string &uuid,
118  UUIDCacheMap::const_iterator uuid_iter,
119  ConfigDBUUIDCacheEntry *entry) const;
120  bool UUIDToObjCacheShow(
121  const string &search_string, const string &last_uuid,
122  uint32_t num_entries,
123  vector<ConfigDBUUIDCacheEntry> *entries) const;
124 
125  int GetInstanceId() const { return worker_id_; }
126 
128  return obj_process_queue_;
129  }
130 
131  void Enqueue(ObjectProcessReq *req);
132  bool IsListOrMapPropEmpty(const string &uuid_key,
133  const string &lookup_key);
134  virtual bool IsTaskTriggered() const;
135 
136 protected:
138  return config_client_;
139  }
140 
141 private:
142  friend class ConfigEtcdClient;
143 
145  UUIDProcessRequestType(const string &in_oper,
146  const string &in_uuid,
147  const string &in_value)
148  : oper(in_oper), uuid(in_uuid), value(in_value) {
149  }
150  string oper;
151  string uuid;
152  string value;
153  };
154 
155  typedef map<string, UUIDProcessRequestType *> UUIDProcessSet;
156 
157  bool RequestHandler(ObjectProcessReq *req);
158  void AddUUIDToProcessList(const string &oper,
159  const string &uuid_key,
160  const string &value_str);
161  bool ConfigReader();
162  void ProcessUUIDUpdate(const string &uuid_key,
163  const string &value_str);
164  void ProcessUUIDDelete(const string &uuid_key);
165  virtual bool GenerateAndPushJson(
166  const string &uuid_key,
167  Document &doc,
168  bool add_change,
169  UUIDCacheEntry *cache);
170  void RemoveObjReqEntry(string &uuid);
171 
175  boost::shared_ptr<TaskTrigger> config_reader_;
178 };
179 
180 /*
181  * This class has the functionality to interact with the cassandra servers that
182  * store the user configuration.
183  */
185  public:
186  typedef vector<ConfigEtcdPartition *> PartitionList;
187 
190  int num_workers);
191  virtual ~ConfigEtcdClient();
192 
193  virtual void InitDatabase();
194  void BulkSyncDone();
195  void EnqueueUUIDRequest(string oper, string obj_type,
196  string uuid_str);
197 
198  ConfigEtcdPartition *GetPartition(const string &uuid);
199  const ConfigEtcdPartition *GetPartition(const string &uuid) const;
200  const ConfigEtcdPartition *GetPartition(int worker_id) const;
201 
202  // Start ETCD watch for config updates
203  void StartWatcher();
204 
205  // UUID Cache
206  virtual bool UUIDToObjCacheShow(
207  const string &search_string, int inst_num,
208  const string &last_uuid, uint32_t num_entries,
209  vector<ConfigDBUUIDCacheEntry> *entries) const;
210 
211  virtual bool IsListOrMapPropEmpty(const string &uuid_key,
212  const string &lookup_key);
213 
214  bool IsTaskTriggered() const;
215  virtual void ProcessResponse(EtcdResponse resp);
216 
217  // For testing
218  static void set_watch_disable(bool disable) {
219  disable_watch_ = disable;
220  }
221 protected:
222  typedef pair<string, string> UUIDValueType;
223  typedef list<UUIDValueType> UUIDValueList;
224 
225  virtual bool BulkDataSync();
226  void EnqueueDBSyncRequest(const UUIDValueList &uuid_list);
227 
228  virtual int HashUUID(const std::string &uuid_str) const;
229  int num_workers() const { return num_workers_; }
230  PartitionList &partitions() { return partitions_; }
231  virtual void PostShutdown();
232 
233 private:
234  friend class ConfigEtcdPartition;
235 
236  // A Job for watching changes to config stored in etcd
237  class EtcdWatcher;
238 
239  bool InitRetry();
240  bool UUIDReader();
241  void HandleEtcdConnectionStatus(bool success,
242  bool force_update = false);
243 
244  // For testing
245  static bool disable_watch_;
246 
247  boost::scoped_ptr<EtcdIf> eqlif_;
250  boost::scoped_ptr<TaskTrigger> uuid_reader_;
251  std::atomic<long> bulk_sync_status_;
252 };
253 
254 #endif // config_etcd_client_h
PartitionList partitions_
pair< string, string > UUIDValueType
vector< ConfigEtcdPartition * > PartitionList
std::atomic< long > bulk_sync_status_
static void set_watch_disable(bool disable)
boost::scoped_ptr< EtcdIf > eqlif_
static bool disable_watch_
PartitionList & partitions()
list< UUIDValueType > UUIDValueList
boost::scoped_ptr< TaskTrigger > uuid_reader_
UUIDCacheEntry(ConfigEtcdPartition *parent, const string &value_str, uint64_t last_read_tstamp)
void SetJsonString(const string &value_str)
void SetListOrMapPropEmpty(const string &prop, bool empty)
boost::ptr_map< string, UUIDCacheEntry > UUIDCacheMap
void DeleteCacheMap(const string &uuid)
boost::shared_ptr< TaskTrigger > config_reader_
ConfigEtcdClient * config_client_
boost::shared_ptr< WorkQueue< ObjectProcessReq * > > UUIDProcessWorkQType
const UUIDCacheEntry * GetUUIDCacheEntry(const string &uuid, const string &value_str, bool &is_new) const
UUIDProcessWorkQType obj_process_queue_
ConfigEtcdClient * client()
UUIDProcessWorkQType obj_process_queue()
UUIDCacheMap uuid_cache_map_
map< string, UUIDProcessRequestType * > UUIDProcessSet
const UUIDCacheEntry * GetUUIDCacheEntry(const string &uuid) const
UUIDProcessSet uuid_process_set_
Definition: timer.h:57
static EventManager evm
UUIDProcessRequestType(const string &in_oper, const string &in_uuid, const string &in_value)
boost::uuids::uuid uuid