OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_etcd_client.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef config_etcd_client_h
6 #define config_etcd_client_h
7 
8 #include <boost/ptr_container/ptr_map.hpp>
9 #include <boost/shared_ptr.hpp>
10 
11 #include "database/etcd/eql_if.h"
12 
13 #include <list>
14 #include <map>
15 #include <set>
16 #include <string>
17 #include <utility>
18 #include <vector>
19 
20 #include "base/queue_task.h"
21 #include "base/timer.h"
22 
23 #include "config_db_client.h"
25 #include "json_adapter_data.h"
26 
27 using namespace std;
30 using contrail_rapidjson::Document;
31 using contrail_rapidjson::Value;
32 
33 class EventManager;
35 struct ConfigDBConnInfo;
36 class TaskTrigger;
37 class ConfigEtcdClient;
38 class ConfigDBUUIDCacheEntry;
39 
41  public:
42  ConfigEtcdPartition(ConfigEtcdClient *client, size_t idx);
43  virtual ~ConfigEtcdPartition();
44 
45  typedef boost::shared_ptr<WorkQueue<ObjectProcessReq *> >
47 
49  public:
51  const string &value_str,
52  uint64_t last_read_tstamp)
53  : ObjectCacheEntry(last_read_tstamp),
54  retry_count_(0),
55  retry_timer_(NULL),
56  json_str_(value_str),
57  parent_(parent) {
58  }
59 
60  ~UUIDCacheEntry();
61 
62  void EnableEtcdReadRetry(const string uuid,
63  const string value);
64  void DisableEtcdReadRetry(const string uuid);
65 
// Returns a read-only reference to the JSON text held in json_str_.
66  const string &GetJsonString() const { return json_str_; }
// Replaces the stored JSON text (json_str_) with a copy of value_str.
67  void SetJsonString(const string &value_str) {
68  json_str_ = value_str;
69  }
70 
// Records the 'empty' flag for property 'prop' in list_map_set_.
// The key is built from prop.c_str().
// NOTE(review): list_map_set_ is presumably a ListMapSet (map<string, bool>);
// if so, insert() keeps an already-present entry for 'prop' unchanged rather
// than overwriting it -- confirm this is the intended behaviour.
71  void SetListOrMapPropEmpty(const string &prop, bool empty) {
72  list_map_set_.insert(make_pair(prop.c_str(), empty));
73  }
74  bool ListOrMapPropEmpty(const string &prop) const;
75 
// Returns the current value of retry_count_ (initialised to 0 by the
// constructor).
76  uint32_t GetRetryCount() const {
77  return retry_count_;
78  }
// Returns true when retry_timer_ is non-NULL, i.e. a retry timer object
// has been attached to this entry.
79  bool IsRetryTimerCreated() const {
80  return (retry_timer_ != NULL);
81  }
82  bool IsRetryTimerRunning() const;
// Returns the raw retry_timer_ pointer; it is NULL until a timer is set
// (the constructor initialises it to NULL).
83  Timer *GetRetryTimer() { return retry_timer_; }
84 
85  private:
86  friend class ConfigEtcdPartitionTest;
87  bool EtcdReadRetryTimerExpired(const string uuid,
88  const string value);
89  void EtcdReadRetryTimerErrorHandler();
90  typedef map<string, bool> ListMapSet;
92  uint32_t retry_count_;
94  string json_str_;
96  };
97 
98  static const uint32_t kMaxUUIDRetryTimePowOfTwo = 20;
99  static const uint32_t kMinUUIDRetryTimeMSec = 100;
100 
101  typedef boost::ptr_map<string, UUIDCacheEntry> UUIDCacheMap;
102 
103  UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid);
104  const UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid) const;
105  UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid,
106  const string &value_str,
107  bool &is_new);
108  const UUIDCacheEntry *GetUUIDCacheEntry(const string &uuid,
109  const string &value_str,
110  bool &is_new) const;
// Removes the entry keyed by 'uuid' from uuid_cache_map_.
// NOTE(review): uuid_cache_map_ is presumably the boost::ptr_map
// UUIDCacheMap, in which case the erased UUIDCacheEntry is destroyed as
// well -- confirm against the member declaration.
111  void DeleteCacheMap(const string &uuid) {
112  uuid_cache_map_.erase(uuid);
113  }
114  virtual int UUIDRetryTimeInMSec(const UUIDCacheEntry *obj) const;
115 
116  void FillUUIDToObjCacheInfo(const string &uuid,
117  UUIDCacheMap::const_iterator uuid_iter,
118  ConfigDBUUIDCacheEntry *entry) const;
119  bool UUIDToObjCacheShow(
120  const string &search_string, const string &last_uuid,
121  uint32_t num_entries,
122  vector<ConfigDBUUIDCacheEntry> *entries) const;
123 
// Returns worker_id_ as this partition's instance id.
124  int GetInstanceId() const { return worker_id_; }
125 
127  return obj_process_queue_;
128  }
129 
130  void Enqueue(ObjectProcessReq *req);
131  bool IsListOrMapPropEmpty(const string &uuid_key,
132  const string &lookup_key);
133  virtual bool IsTaskTriggered() const;
134 
135 protected:
137  return config_client_;
138  }
139 
140 private:
141  friend class ConfigEtcdClient;
142 
// Builds a request record by copying the operation name, the UUID key and
// the associated value into the oper, uuid and value members.
144  UUIDProcessRequestType(const string &in_oper,
145  const string &in_uuid,
146  const string &in_value)
147  : oper(in_oper), uuid(in_uuid), value(in_value) {
148  }
149  string oper;
150  string uuid;
151  string value;
152  };
153 
154  typedef map<string, UUIDProcessRequestType *> UUIDProcessSet;
155 
156  bool RequestHandler(ObjectProcessReq *req);
157  void AddUUIDToProcessList(const string &oper,
158  const string &uuid_key,
159  const string &value_str);
160  bool ConfigReader();
161  void ProcessUUIDUpdate(const string &uuid_key,
162  const string &value_str);
163  void ProcessUUIDDelete(const string &uuid_key);
164  virtual bool GenerateAndPushJson(
165  const string &uuid_key,
166  Document &doc,
167  bool add_change,
168  UUIDCacheEntry *cache);
169  void RemoveObjReqEntry(string &uuid);
170 
174  boost::shared_ptr<TaskTrigger> config_reader_;
177 };
178 
179 /*
180  * This class has the functionality to interact with the etcd servers that
181  * store the user configuration.
182  */
184  public:
185  typedef vector<ConfigEtcdPartition *> PartitionList;
186 
189  int num_workers);
190  virtual ~ConfigEtcdClient();
191 
192  virtual void InitDatabase();
193  void BulkSyncDone();
194  void EnqueueUUIDRequest(string oper, string obj_type,
195  string uuid_str);
196 
197  ConfigEtcdPartition *GetPartition(const string &uuid);
198  const ConfigEtcdPartition *GetPartition(const string &uuid) const;
199  const ConfigEtcdPartition *GetPartition(int worker_id) const;
200 
201  // Start ETCD watch for config updates
202  void StartWatcher();
203 
204  // UUID Cache
205  virtual bool UUIDToObjCacheShow(
206  const string &search_string, int inst_num,
207  const string &last_uuid, uint32_t num_entries,
208  vector<ConfigDBUUIDCacheEntry> *entries) const;
209 
210  virtual bool IsListOrMapPropEmpty(const string &uuid_key,
211  const string &lookup_key);
212 
213  bool IsTaskTriggered() const;
214  virtual void ProcessResponse(EtcdResponse resp);
215 
216  // For testing
// Sets the class-wide (static) disable_watch_ flag to 'disable'.
217  static void set_watch_disable(bool disable) {
218  disable_watch_ = disable;
219  }
220 protected:
221  typedef pair<string, string> UUIDValueType;
222  typedef list<UUIDValueType> UUIDValueList;
223 
224  virtual bool BulkDataSync();
225  void EnqueueDBSyncRequest(const UUIDValueList &uuid_list);
226 
227  virtual int HashUUID(const std::string &uuid_str) const;
// Returns the value of num_workers_.
228  int num_workers() const { return num_workers_; }
// Returns a mutable reference to partitions_, so callers can modify the
// partition list in place.
229  PartitionList &partitions() { return partitions_; }
230  virtual void PostShutdown();
231 
232 private:
233  friend class ConfigEtcdPartition;
234 
235  // A Job for watching changes to config stored in etcd
236  class EtcdWatcher;
237 
238  bool InitRetry();
239  bool UUIDReader();
240  void HandleEtcdConnectionStatus(bool success,
241  bool force_update = false);
242 
243  // For testing
244  static bool disable_watch_;
245 
246  boost::scoped_ptr<EtcdIf> eqlif_;
249  boost::scoped_ptr<TaskTrigger> uuid_reader_;
250  tbb::atomic<long> bulk_sync_status_;
251 };
252 
253 #endif // config_etcd_client_h
UUIDProcessWorkQType obj_process_queue()
UUIDProcessWorkQType obj_process_queue_
pair< string, string > UUIDValueType
PartitionList & partitions()
boost::scoped_ptr< TaskTrigger > uuid_reader_
boost::shared_ptr< WorkQueue< ObjectProcessReq * > > UUIDProcessWorkQType
const string & GetJsonString() const
UUIDCacheMap uuid_cache_map_
boost::uuids::uuid uuid
vector< ConfigEtcdPartition * > PartitionList
int num_workers() const
UUIDProcessRequestType(const string &in_oper, const string &in_uuid, const string &in_value)
tbb::atomic< long > bulk_sync_status_
void SetJsonString(const string &value_str)
static Options options
boost::scoped_ptr< EtcdIf > eqlif_
UUIDProcessSet uuid_process_set_
static bool disable_watch_
PartitionList partitions_
boost::shared_ptr< TaskTrigger > config_reader_
map< string, UUIDProcessRequestType * > UUIDProcessSet
ConfigEtcdClient * client()
ConfigEtcdClient * config_client_
UUIDCacheEntry(ConfigEtcdPartition *parent, const string &value_str, uint64_t last_read_tstamp)
list< UUIDValueType > UUIDValueList
static void set_watch_disable(bool disable)
void DeleteCacheMap(const string &uuid)
Definition: timer.h:54
void SetListOrMapPropEmpty(const string &prop, bool empty)
boost::ptr_map< string, UUIDCacheEntry > UUIDCacheMap
static EventManager evm