OpenSDN source code
config_client_manager.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3  */
5 
6 #include <boost/assign/list_of.hpp>
7 #include <sandesh/request_pipeline.h>
8 #include <sstream>
9 #include <string>
10 
11 #include "base/connection_info.h"
12 #include "base/task.h"
13 #include "base/task_trigger.h"
14 #include "config_amqp_client.h"
15 #include "config_db_client.h"
17 #include "config_client_log.h"
18 #include "config_client_log_types.h"
19 #include "config_client_show_types.h"
20 #include "config_factory.h"
21 #include "io/event_manager.h"
22 #include "schema/bgp_schema_types.h"
23 #include "schema/vnc_cfg_types.h"
24 
25 using namespace boost::assign;
26 using namespace std;
27 
28 const set<string> ConfigClientManager::skip_properties =
29  list_of("perms2")("draft_mode_state");
31 
33  static bool init_ = false;
34  static int num_config_readers = 0;
35 
36  if (!init_) {
37  // XXX To be used for testing purposes only.
38  char *count_str = getenv("CONFIG_NUM_WORKERS");
39  if (count_str) {
40  num_config_readers = strtol(count_str, NULL, 0);
41  } else {
42  num_config_readers = kNumConfigReaderTasks;
43  }
44  init_ = true;
45  }
46  return num_config_readers;
47 }
48 
50  static bool config_policy_set;
51  if (config_policy_set)
52  return;
53  config_policy_set = true;
54 
56  // Policy for config_client::Reader Task.
57  TaskPolicy cassadra_reader_policy = boost::assign::list_of
58  (TaskExclusion(scheduler->GetTaskId("config_client::Init")))
59  (TaskExclusion(scheduler->GetTaskId("config_client::DBReader")));
60  for (int idx = 0; idx < ConfigClientManager::GetNumConfigReader(); ++idx) {
61  cassadra_reader_policy.push_back(
62  TaskExclusion(scheduler->GetTaskId("config_client::ObjectProcessor"), idx));
63  }
64  scheduler->SetPolicy(scheduler->GetTaskId("config_client::Reader"),
65  cassadra_reader_policy);
66 
67  // Policy for config_client::ObjectProcessor Task.
68  TaskPolicy cassadra_obj_process_policy = boost::assign::list_of
69  (TaskExclusion(scheduler->GetTaskId("config_client::Init")));
70  for (int idx = 0; idx < ConfigClientManager::GetNumConfigReader(); ++idx) {
71  cassadra_obj_process_policy.push_back(
72  TaskExclusion(scheduler->GetTaskId("config_client::Reader"), idx));
73  }
74  scheduler->SetPolicy(scheduler->GetTaskId("config_client::ObjectProcessor"),
75  cassadra_obj_process_policy);
76 
77  // Policy for config_client::DBReader Task.
78  TaskPolicy fq_name_reader_policy = boost::assign::list_of
79  (TaskExclusion(scheduler->GetTaskId("config_client::Init")))
80  (TaskExclusion(scheduler->GetTaskId("config_client::Reader")));
81  scheduler->SetPolicy(scheduler->GetTaskId("config_client::DBReader"),
82  fq_name_reader_policy);
83 
84  // Policy for config_client::Init process
85  TaskPolicy cassandra_init_policy = boost::assign::list_of
86  (TaskExclusion(scheduler->GetTaskId("amqp::RabbitMQReader")))
87  (TaskExclusion(scheduler->GetTaskId("config_client::ObjectProcessor")))
88  (TaskExclusion(scheduler->GetTaskId("config_client::DBReader")))
89  (TaskExclusion(scheduler->GetTaskId("config_client::Reader")));
90  scheduler->SetPolicy(scheduler->GetTaskId("config_client::Init"),
91  cassandra_init_policy);
92 
93  // Policy for amqp::RabbitMQReader process
94  TaskPolicy rabbitmq_reader_policy = boost::assign::list_of
95  (TaskExclusion(scheduler->GetTaskId("config_client::Init")));
96  scheduler->SetPolicy(scheduler->GetTaskId("amqp::RabbitMQReader"),
97  rabbitmq_reader_policy);
98 
99  // Policy for etcd::EtcdWatcher process
100  TaskPolicy etcd_watcher_policy = boost::assign::list_of
101  (TaskExclusion(scheduler->GetTaskId("config_client::Init")));
102  scheduler->SetPolicy(scheduler->GetTaskId("etcd::EtcdWatcher"),
103  etcd_watcher_policy);
104 
105 }
106 
108  config_json_parser_.reset(cfg_json_base);
109  config_json_parser_->Init(this);
110  thread_count_ = GetNumConfigReader();
111  end_of_rib_computed_at_ = UTCTimestampUsec();
112  if (!config_options_.config_db_use_etcd) {
113  config_db_client_.reset(
114  ConfigStaticObjectFactory::CreateRef<ConfigCassandraClient>(
115  this,
116  evm_,
117  config_options_,
118  thread_count_));
119  config_amqp_client_.reset(new ConfigAmqpClient(this, hostname_,
120  module_name_, config_options_));
121  }
122  SetDefaultSchedulingPolicy();
123 
124  int task_id;
125  task_id = TaskScheduler::GetInstance()->GetTaskId("config_client::Init");
126 
127  init_trigger_.reset(new
129  task_id, 0));
130 
131  reinit_triggered_ = false;
132 }
133 
135  ConfigJsonParserBase *cfg_json_base,
136  std::string hostname,
137  std::string module_name,
138  const ConfigClientOptions& config_options)
139  : evm_(evm),
140  generation_number_(0),
141  hostname_(hostname), module_name_(module_name),
142  config_options_(config_options) {
143  end_of_rib_computed_ = false;
144  SetUp(cfg_json_base);
145 }
146 
148 }
149 
151  if (init_trigger_.get() != nullptr) {
152  init_trigger_->Set();
153  }
154 }
155 
157  return config_db_client_.get();
158 }
159 
161  return config_amqp_client_.get();
162 }
163 
165  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
166  return end_of_rib_computed_;
167 }
168 
170  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
172 }
173 
174 void ConfigClientManager::EnqueueUUIDRequest(string oper, string obj_type,
175  string uuid_str) {
176  config_db_client_->EnqueueUUIDRequest(oper, obj_type, uuid_str);
177 }
178 
180  {
181  // Notify waiting caller with the result
182  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
183  assert(!end_of_rib_computed_);
184  end_of_rib_computed_ = true;
185  cond_var_.notify_all();
187  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
188  "Config Client Mgr SM: End of RIB computed and notification sent");
189  }
190 
191  // Once we have finished reading the complete cassandra DB, we should verify
192  // whether all DBEntries(node/link) are as per the new generation number.
193  // The stale entry cleanup task ensures this.
194  // There is no need to run stale clean up during first time startup
195  if (GetGenerationNumber())
197 
199 }
200 
201 // This function waits forever for bulk sync of cassandra config to finish
202 // The condition variable is triggered even in case of "reinit". In such a case
203 // wait is terminated and function returns.
204 // AMQP reader task starts consuming messages only after bulk sync.
205 // During reinit, the tight loop is broken by triggering the condition variable
207  tbb::interface5::unique_lock<tbb::mutex> lock(end_of_rib_sync_mutex_);
208  // Wait for End of config
209  while (!end_of_rib_computed_) {
210  cond_var_.wait(lock);
211  if (is_reinit_triggered()) break;
212  }
213  string message;
214  message = "Config Client Mgr SM: End of RIB notification received, "
215  "re init triggered" + is_reinit_triggered()?"TRUE":"FALSE";
216  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, message);
217  return;
218 }
219 
221  ConfigClientManagerInfo &info) const {
222  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
223  info.end_of_rib_computed = end_of_rib_computed_;
224  info.end_of_rib_computed_at = end_of_rib_computed_at_;
225  info.end_of_rib_computed_at = UTCUsecToString(end_of_rib_computed_at_);
226 }
227 
229  config_db_client_->PostShutdown();
230  reinit_triggered_ = false;
231  end_of_rib_computed_ = false;
232 
233  // All set to read next version of the config. Increment the generation
235 
236  // scoped ptr reset deletes the previous config db object
237  // Create new config db client and amqp client
238  // Delete of config db client object guarantees the flushing of
239  // object uuid cache and uuid read request list.
242  CreateRef<ConfigCassandraClient>(
243  this,
244  evm_,
246  thread_count_));
249  }
250  stringstream ss;
251  ss << GetGenerationNumber();
252  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
253  "Config Client Mgr SM: Post shutdown, next version of config: "
254  + ss.str());
255 }
256 
258  if (is_reinit_triggered()) {
259  // "config_client::Init" task is mutually exclusive to
260  // 1. FQName reader task
261  // 2. Object UUID Table reader task
262  // 3. AMQP reader task
263  // 4. Object processing Work queue task
264  // Due to this task policy, if the reinit task is running, it is ensured
265  // that above mutually exclusive tasks have finished/aborted
266  // Perform PostShutdown to prepare for new connection
267  // However, it is possible that these tasks have been scheduled
268  // but yet to begin execution. For Task and WorkQueue events, their
269  // destructor takes care of this but for TaskTrigger events, the
270  // destructor will crash (See Bug #1786154) as it expects the task
271  // to not be scheduled. Taking care of that case here by checking
272  // if TaskTrigger events are scheduled (but not executing) and
273  // return if they are so that this will be retried.
274  if (config_db_client_->IsTaskTriggered()) {
275  return false;
276  }
277  PostShutdown();
278  }
279  // Common code path for both init/reinit
281  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
282  "Config Client Mgr SM: Start RabbitMqReader and init Database");
283  config_amqp_client_->StartRabbitMQReader();
284  }
285  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
286  "Config Client Mgr SM: Init Database");
287  config_db_client_->InitDatabase();
288  if (is_reinit_triggered()) return false;
289  return true;
290 }
291 
293  const ConfigClientOptions &config) {
294  config_options_ = config;
296 }
297 
299  {
300  // Wake up the amqp task waiting for EOR for config reading
301  tbb::mutex::scoped_lock lock(end_of_rib_sync_mutex_);
302  cond_var_.notify_all();
303  }
304  reinit_triggered_ = true;
305  if (init_trigger_.get() != nullptr) {
306  init_trigger_->Set();
307  }
308  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
309  "Config Client Mgr SM: Re init triggered!");
310 }
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
static const std::set< std::string > skip_properties
ConfigDbClient * config_db_client() const
ConfigClientOptions config_options_
boost::scoped_ptr< TaskTrigger > init_trigger_
uint64_t GetGenerationNumber() const
tbb::interface5::condition_variable cond_var_
boost::scoped_ptr< ConfigAmqpClient > config_amqp_client_
uint64_t GetEndOfRibComputedAt() const
ConfigJsonParserBase * config_json_parser()
tbb::atomic< bool > reinit_triggered_
uint64_t IncrementGenerationNumber()
ConfigAmqpClient * config_amqp_client() const
void GetClientManagerInfo(ConfigClientManagerInfo &info) const
ConfigClientManager(EventManager *evm, ConfigJsonParserBase *cfg_json_base, std::string hostname, std::string module_name, const ConfigClientOptions &config_options)
boost::scoped_ptr< ConfigDbClient > config_db_client_
void SetUp(ConfigJsonParserBase *)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
int GetTaskId(const std::string &name)
Definition: task.cc:856
void SetPolicy(int task_id, TaskPolicy &policy)
Sets the task exclusion policy. Adds policy entries for the task Examples:
Definition: task.cc:610
static TaskScheduler * GetInstance()
Definition: task.cc:547
static ConnectionState * GetInstance()
#define CONFIG_CLIENT_DEBUG(obj,...)
static EventManager evm
std::vector< TaskExclusion > TaskPolicy
Definition: task.h:59
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13