OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_client_manager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
5 
6 #include <boost/assign/list_of.hpp>
7 #include <sandesh/request_pipeline.h>
8 #include <sstream>
9 #include <string>
10 
11 #include "base/connection_info.h"
12 #include "base/task.h"
13 #include "base/task_trigger.h"
14 #include "config_amqp_client.h"
15 #include "config_db_client.h"
17 #ifdef CONTRAIL_ETCD_INCL
18 #include "config_etcd_client.h"
19 #endif
20 #include "config_client_log.h"
21 #include "config_client_log_types.h"
22 #include "config_client_show_types.h"
23 #include "config_factory.h"
24 #include "io/event_manager.h"
25 #include "schema/bgp_schema_types.h"
26 #include "schema/vnc_cfg_types.h"
27 
28 using namespace boost::assign;
29 using namespace std;
30 
31 const set<string> ConfigClientManager::skip_properties =
32  list_of("perms2")("draft_mode_state");
34 
36  static bool init_ = false;
37  static int num_config_readers = 0;
38 
39  if (!init_) {
40  // XXX To be used for testing purposes only.
41  char *count_str = getenv("CONFIG_NUM_WORKERS");
42  if (count_str) {
43  num_config_readers = strtol(count_str, NULL, 0);
44  } else {
45  num_config_readers = kNumConfigReaderTasks;
46  }
47  init_ = true;
48  }
49  return num_config_readers;
50 }
51 
53  static bool config_policy_set;
54  if (config_policy_set)
55  return;
56  config_policy_set = true;
57 
59  // Policy for config_client::Reader Task.
60  TaskPolicy cassadra_reader_policy = boost::assign::list_of
61  (TaskExclusion(scheduler->GetTaskId("config_client::Init")))
62  (TaskExclusion(scheduler->GetTaskId("config_client::DBReader")));
63  for (int idx = 0; idx < ConfigClientManager::GetNumConfigReader(); ++idx) {
64  cassadra_reader_policy.push_back(
65  TaskExclusion(scheduler->GetTaskId("config_client::ObjectProcessor"), idx));
66  }
67  scheduler->SetPolicy(scheduler->GetTaskId("config_client::Reader"),
68  cassadra_reader_policy);
69 
70  // Policy for config_client::ObjectProcessor Task.
71  TaskPolicy cassadra_obj_process_policy = boost::assign::list_of
72  (TaskExclusion(scheduler->GetTaskId("config_client::Init")));
73  for (int idx = 0; idx < ConfigClientManager::GetNumConfigReader(); ++idx) {
74  cassadra_obj_process_policy.push_back(
75  TaskExclusion(scheduler->GetTaskId("config_client::Reader"), idx));
76  }
77  scheduler->SetPolicy(scheduler->GetTaskId("config_client::ObjectProcessor"),
78  cassadra_obj_process_policy);
79 
80  // Policy for config_client::DBReader Task.
81  TaskPolicy fq_name_reader_policy = boost::assign::list_of
82  (TaskExclusion(scheduler->GetTaskId("config_client::Init")))
83  (TaskExclusion(scheduler->GetTaskId("config_client::Reader")));
84  scheduler->SetPolicy(scheduler->GetTaskId("config_client::DBReader"),
85  fq_name_reader_policy);
86 
87  // Policy for config_client::Init process
88  TaskPolicy cassandra_init_policy = boost::assign::list_of
89  (TaskExclusion(scheduler->GetTaskId("amqp::RabbitMQReader")))
90  (TaskExclusion(scheduler->GetTaskId("config_client::ObjectProcessor")))
91  (TaskExclusion(scheduler->GetTaskId("config_client::DBReader")))
92  (TaskExclusion(scheduler->GetTaskId("config_client::Reader")));
93  scheduler->SetPolicy(scheduler->GetTaskId("config_client::Init"),
94  cassandra_init_policy);
95 
96  // Policy for amqp::RabbitMQReader process
97  TaskPolicy rabbitmq_reader_policy = boost::assign::list_of
98  (TaskExclusion(scheduler->GetTaskId("config_client::Init")));
99  scheduler->SetPolicy(scheduler->GetTaskId("amqp::RabbitMQReader"),
100  rabbitmq_reader_policy);
101 
102  // Policy for etcd::EtcdWatcher process
103  TaskPolicy etcd_watcher_policy = boost::assign::list_of
104  (TaskExclusion(scheduler->GetTaskId("config_client::Init")));
105  scheduler->SetPolicy(scheduler->GetTaskId("etcd::EtcdWatcher"),
106  etcd_watcher_policy);
107 
108 }
109 
111  config_json_parser_.reset(cfg_json_base);
112  config_json_parser_->Init(this);
113  thread_count_ = GetNumConfigReader();
114  end_of_rib_computed_at_ = UTCTimestampUsec();
115  if (config_options_.config_db_use_etcd) {
116 #ifdef CONTRAIL_ETCD_INCL
117  config_db_client_.reset(ConfigStaticObjectFactory::Create<ConfigEtcdClient>
118  (this, evm_, config_options_,
119  thread_count_));
120 #endif
121  } else {
122  config_db_client_.reset(
123  ConfigStaticObjectFactory::CreateRef<ConfigCassandraClient>(
124  this,
125  evm_,
126  config_options_,
127  thread_count_));
128  config_amqp_client_.reset(new ConfigAmqpClient(this, hostname_,
129  module_name_, config_options_));
130  }
131  SetDefaultSchedulingPolicy();
132 
133  int task_id;
134  task_id = TaskScheduler::GetInstance()->GetTaskId("config_client::Init");
135 
136  init_trigger_.reset(new
138  task_id, 0));
139 
140  reinit_triggered_ = false;
141 }
142 
144  ConfigJsonParserBase *cfg_json_base,
145  std::string hostname,
146  std::string module_name,
147  const ConfigClientOptions& config_options)
148  : evm_(evm),
149  generation_number_(0),
150  hostname_(hostname), module_name_(module_name),
151  config_options_(config_options) {
152  end_of_rib_computed_ = false;
153  SetUp(cfg_json_base);
154 }
155 
157 }
158 
160  if (init_trigger_.get() != nullptr) {
161  init_trigger_->Set();
162  }
163 }
164 
166  return config_db_client_.get();
167 }
168 
170  return config_amqp_client_.get();
171 }
172 
174  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
175  return end_of_rib_computed_;
176 }
177 
179  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
181 }
182 
183 void ConfigClientManager::EnqueueUUIDRequest(string oper, string obj_type,
184  string uuid_str) {
185  config_db_client_->EnqueueUUIDRequest(oper, obj_type, uuid_str);
186 }
187 
189  {
190  // Notify waiting caller with the result
191  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
192  assert(!end_of_rib_computed_);
193  end_of_rib_computed_ = true;
194  cond_var_.notify_all();
196  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
197  "Config Client Mgr SM: End of RIB computed and notification sent");
198  }
199 
200  // Once we have finished reading the complete cassandra DB, we should verify
201  // whether all DBEntries(node/link) are as per the new generation number.
202  // The stale entry cleanup task ensures this.
203  // There is no need to run stale clean up during first time startup
204  if (GetGenerationNumber())
206 
208 }
209 
210 // This function waits forever for bulk sync of cassandra config to finish
211 // The condition variable is triggered even in case of "reinit". In such a case
212 // wait is terminated and function returns.
213 // AMQP reader task starts consuming messages only after bulk sync.
214 // During reinit, the tight loop is broken by triggering the condition variable
216  tbb::interface5::unique_lock<tbb::mutex> lock(end_of_rib_sync_mutex_);
217  // Wait for End of config
218  while (!end_of_rib_computed_) {
219  cond_var_.wait(lock);
220  if (is_reinit_triggered()) break;
221  }
222  string message;
223  message = "Config Client Mgr SM: End of RIB notification received, "
224  "re init triggered" + is_reinit_triggered()?"TRUE":"FALSE";
225  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, message);
226  return;
227 }
228 
230  ConfigClientManagerInfo &info) const {
231  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
232  info.end_of_rib_computed = end_of_rib_computed_;
233  info.end_of_rib_computed_at = end_of_rib_computed_at_;
234  info.end_of_rib_computed_at = UTCUsecToString(end_of_rib_computed_at_);
235 }
236 
238  config_db_client_->PostShutdown();
239  reinit_triggered_ = false;
240  end_of_rib_computed_ = false;
241 
242  // All set to read next version of the config. Increment the generation
244 
245  // scoped ptr reset deletes the previous config db object
246  // Create new config db client and amqp client
247  // Delete of config db client object guarantees the flushing of
248  // object uuid cache and uuid read request list.
250 #ifdef CONTRAIL_ETCD_INCL
251  config_db_client_.reset(ConfigStaticObjectFactory::Create<ConfigEtcdClient>
252  (this, evm_, config_options_,
253  thread_count_));
254 #endif
255  } else {
257  CreateRef<ConfigCassandraClient>(
258  this,
259  evm_,
261  thread_count_));
264  }
265  stringstream ss;
266  ss << GetGenerationNumber();
267  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
268  "Config Client Mgr SM: Post shutdown, next version of config: "
269  + ss.str());
270 }
271 
273  if (is_reinit_triggered()) {
274  // "config_client::Init" task is mutually exclusive to
275  // 1. FQName reader task
276  // 2. Object UUID Table reader task
277  // 3. AMQP reader task
278  // 4. Object processing Work queue task
279  // Due to this task policy, if the reinit task is running, it is ensured
280  // that above mutually exclusive tasks have finished/aborted
281  // Perform PostShutdown to prepare for new connection
282  // However, it is possible that these tasks have been scheduled
283  // but yet to begin execution. For Task and WorkQueue events, their
283  // destructor takes care of this but for TaskTrigger events, the
285  // destructor will crash (See Bug #1786154) as it expects the task
286  // to not be scheduled. Taking care of that case here by checking
287  // if TaskTrigger events are scheduled (but not executing) and
288  // return if they are so that this will be retried.
289  if (config_db_client_->IsTaskTriggered()) {
290  return false;
291  }
292  PostShutdown();
293  }
294  // Common code path for both init/reinit
296 #ifdef CONTRAIL_ETCD_INCL
297  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
298  "Config Client Mgr SM: Start ETCD Watcher");
299  config_db_client_->StartWatcher();
300 #endif
301  } else {
302  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
303  "Config Client Mgr SM: Start RabbitMqReader and init Database");
304  config_amqp_client_->StartRabbitMQReader();
305  }
306  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
307  "Config Client Mgr SM: Init Database");
308  config_db_client_->InitDatabase();
309  if (is_reinit_triggered()) return false;
310  return true;
311 }
312 
314  const ConfigClientOptions &config) {
315  config_options_ = config;
317 }
318 
320  {
321  // Wake up the amqp task waiting for EOR for config reading
322  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
323  cond_var_.notify_all();
324  }
325  reinit_triggered_ = true;
326  if (init_trigger_.get() != nullptr) {
327  init_trigger_->Set();
328  }
329  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
330  "Config Client Mgr SM: Re init triggered!");
331 }
ConfigJsonParserBase * config_json_parser()
void SetUp(ConfigJsonParserBase *)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
uint64_t GetGenerationNumber() const
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:610
ConfigDbClient * config_db_client() const
uint64_t GetEndOfRibComputedAt() const
ConfigAmqpClient * config_amqp_client() const
ConfigClientOptions config_options_
uint64_t IncrementGenerationNumber()
#define CONFIG_CLIENT_DEBUG(obj,...)
int GetTaskId(const std::string &name)
Definition: task.cc:856
static const std::set< std::string > skip_properties
tbb::interface5::condition_variable cond_var_
boost::scoped_ptr< ConfigDbClient > config_db_client_
boost::scoped_ptr< ConfigAmqpClient > config_amqp_client_
boost::scoped_ptr< TaskTrigger > init_trigger_
static TaskScheduler * GetInstance()
Definition: task.cc:547
std::vector< TaskExclusion > TaskPolicy
Definition: task.h:59
tbb::atomic< bool > reinit_triggered_
ConfigClientManager(EventManager *evm, ConfigJsonParserBase *cfg_json_base, std::string hostname, std::string module_name, const ConfigClientOptions &config_options)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
void GetClientManagerInfo(ConfigClientManagerInfo &info) const
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
static ConnectionState * GetInstance()
static EventManager evm
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54