OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_amqp_client.cc
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #include "config_amqp_client.h"
6 
7 #include <boost/algorithm/string/find.hpp>
8 #include <boost/lexical_cast.hpp>
9 #include <stdio.h>
10 #include <string>
11 
12 #include <SimpleAmqpClient/SimpleAmqpClient.h>
13 #include "rapidjson/document.h"
14 
15 #include "base/connection_info.h"
16 #include "base/task.h"
17 #include "base/address_util.h"
18 #include "base/string_util.h"
19 #include "config_factory.h"
21 #include "config_client_log.h"
22 #include "config_client_log_types.h"
23 #include "config_client_manager.h"
24 #include "config_db_client.h"
25 #include "config_client_show_types.h"
26 
27 using namespace boost;
28 using namespace std;
29 using namespace contrail_rapidjson;
30 
32 
34 public:
36  Task(amqpclient->reader_task_id()), amqpclient_(amqpclient) {
37  channel_.reset(ConfigStaticObjectFactory::Create<ConfigAmqpChannel>());
38 
39  // Connect to rabbit-mq asap so that notification messages over
40  // rabbit mq are never missed (during bulk db sync which happens
41  // soon afterwards).
42  ConnectToRabbitMQ();
43  }
44 
45  virtual bool Run();
46  string Description() const { return "ConfigAmqpClient::RabbitMQReader"; }
47 
48 private:
50  boost::scoped_ptr<ConfigAmqpChannel> channel_;
51  string consumer_tag_;
52  void ConnectToRabbitMQ(bool queue_delete = true);
53  bool AckRabbitMessages(AmqpClient::Envelope::ptr_t &envelop);
54  bool ReceiveRabbitMessages(AmqpClient::Envelope::ptr_t &envelop);
55 };
56 
58  string module_name, const ConfigClientOptions &options) :
59  mgr_(mgr), hostname_(hostname), module_name_(module_name),
60  current_server_index_(0), terminate_(false),
61  rabbitmq_user_(options.rabbitmq_user),
62  rabbitmq_password_(options.rabbitmq_password),
63  rabbitmq_vhost_(options.rabbitmq_vhost),
64  rabbitmq_use_ssl_(options.rabbitmq_use_ssl),
65  rabbitmq_ssl_version_(options.rabbitmq_ssl_version),
66  rabbitmq_ssl_keyfile_(options.rabbitmq_ssl_keyfile),
67  rabbitmq_ssl_certfile_(options.rabbitmq_ssl_certfile),
68  rabbitmq_ssl_ca_certs_(options.rabbitmq_ssl_ca_certs) {
69 
70  connection_status_ = false;
72 
74  reader_task_id_ = scheduler->GetTaskId("amqp::RabbitMQReader");
75 
76  if (options.rabbitmq_server_list.empty())
77  return;
78 
79  for (vector<string>::const_iterator iter =
80  options.rabbitmq_server_list.begin();
81  iter != options.rabbitmq_server_list.end(); iter++) {
82  string server_info(*iter);
83  typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
84  boost::char_separator<char> sep(":");
85  tokenizer tokens(server_info, sep);
86  tokenizer::iterator tit = tokens.begin();
87  string ip(*tit);
88  rabbitmq_ips_.push_back(ip);
89  ++tit;
90  string port_str(*tit);
91  rabbitmq_ports_.push_back(port_str);
92  Endpoint curr_ep;
93  int port = 0;
94  stringToInteger(port_str, port);
95  boost::system::error_code ec;
96  curr_ep.address(AddressFromString(ip, &ec));
97  curr_ep.port(port);
98  endpoints_.push_back(curr_ep);
99  }
100 }
101 
103  if (disable_) {
105  ConfigClientMgrDebug,
106  "RabbitMQ SM: StartRabbitMQReader: RabbitMQ disabled");
107  return;
108  }
109 
110  // If reinit is triggered, don't start the RabbitMQ reader
111  if (config_manager()->is_reinit_triggered()) {
113  ConfigClientMgrDebug,
114  "RabbitMQ SM: StartRabbitMQReader: re init triggered,"
115  " dont start RabbitMQ");
116  return;
117  }
119  Task *task = new RabbitMQReader(this);
120  scheduler->Enqueue(task);
121 }
122 
123 void ConfigAmqpClient::EnqueueUUIDRequest(string oper, string obj_type,
124  string uuid_str) {
125  if (mgr_->config_json_parser()->IsReadObjectType(obj_type)) {
126  mgr_->EnqueueUUIDRequest(oper, obj_type, uuid_str);
127  }
128 }
129 
130 string ConfigAmqpClient::FormAmqpUri(bool hide_auth_info) const {
131  const string user = hide_auth_info ? "********" : rabbitmq_user();
132  const string password = hide_auth_info ? "********" : rabbitmq_password();
133  string uri = string("amqp://" + user + ":" +
134  password + "@" + rabbitmq_ip() + ":" + rabbitmq_port());
135  if (!rabbitmq_vhost().empty()) {
136  if (rabbitmq_vhost().compare("/") != 0) {
137  uri += "/" + rabbitmq_vhost();
138  }
139  }
140  return uri;
141 }
142 
144  if (connected) {
145  // Update connection info
147  process::ConnectionType::DATABASE, "RabbitMQ",
148  process::ConnectionStatus::UP,
149  endpoints(), "RabbitMQ connection established");
150  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
151  "RabbitMQ SM: RabbitMQ connection established");
152  } else {
154  process::ConnectionType::DATABASE, "RabbitMQ",
155  process::ConnectionStatus::DOWN,
156  endpoints(), "RabbitMQ connection down");
157  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
158  "RabbitMQ SM: RabbitMQ connection down");
159  }
160 }
161 
164  amqpclient_->set_connected(false);
165  string message = "RabbitMQ SM: Connect to Rabbit MQ with queue_delete ";
166  message += queue_delete ? "TRUE" : "FALSE";
167  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug, message);
168  size_t count = 0;
169  while (true) {
170  // If we are signalled to stop, break now.
172  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
173  "RabbitMQ SM: Skipped connect due to reinit");
174  return;
175  }
176  string uri = amqpclient_->FormAmqpUri(false);
177  try {
178  if (amqpclient_->rabbitmq_use_ssl()) {
179  int port = boost::lexical_cast<int>(
181 
182  channel_->CreateSecure(
187  port,
191  } else {
192  channel_->CreateFromUri(uri);
193  }
194  // passive = false, durable = false, auto_delete = false
195  channel_->DeclareExchange("vnc_config.object-update",
196  AmqpClient::Channel::EXCHANGE_TYPE_FANOUT, false, false, false);
197  string queue_name =
199 
200  if (queue_delete) {
201  channel_->DeleteQueue(queue_name, false, false);
202  }
203 
204  // passive = false, durable = false,
205  // exclusive = false, auto_delete = false
206  string queue = channel_->DeclareQueue(queue_name, false, false,
207  false, false);
208  channel_->BindQueue(queue, "vnc_config.object-update");
209  // no_local = true, no_ack = false,
210  // exclusive = false, message_prefetch_count = 0
211  consumer_tag_ = channel_->BasicConsume(queue, queue_name,
212  true, false, false, 0);
213  } catch (std::exception &e) {
214  static string what = e.what();
215  string message =
216  "RabbitMQ SM: Caught exception while connecting to RabbitMQ: "
217  + amqpclient_->rabbitmq_ip() + ":"
218  + amqpclient_->rabbitmq_port() + " : " + what;
219  cout << message << endl;
220  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, message);
221  if (++count == amqpclient_->rabbitmq_server_list_len()) {
222  count = 0;
223  // Tried connecting to all given servers.. Now wait to reconnect
224  sleep(5);
225  }
227  continue;
228  } catch (...) {
229  string message =
230  "RabbitMQ SM: Caught fatal exception while "
231  "connecting to RabbitMQ: "
232  + amqpclient_->rabbitmq_ip() + ":"
234  cout << message << endl;
235  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, message);
236  assert(0);
237  }
238 
240  amqpclient_->set_connected(true);
241  break;
242  }
243 }
244 
// Stores the new RabbitMQ connection state in connection_status_.
//
// NOTE(review): listing line 247 is absent from this extract. Presumably it
// also refreshes the status-change timestamp (connection_status_change_at_)
// — confirm against the original file.
245 void ConfigAmqpClient::set_connected(bool connected) {
246  connection_status_ = connected;
248 }
249 
// Fills conn_info with a snapshot of the RabbitMQ connection:
//   - connection_status: copied from connection_status_
//   - connection_status_change_at: assigned below
//   - url: the AMQP URI with user and password masked (FormAmqpUri(true))
//
// NOTE(review): listing line 253, which holds the right-hand side of the
// connection_status_change_at assignment, is absent from this extract.
250 void ConfigAmqpClient::GetConnectionInfo(ConfigAmqpConnInfo &conn_info) const {
251  conn_info.connection_status = connection_status_;
252  conn_info.connection_status_change_at =
254  conn_info.url = FormAmqpUri(true);
255 }
256 
// Handles one JSON notification received from RabbitMQ.
//
// Parses json_message, picks the top-level "oper", "type", "fq_name" and
// "uuid" members out of it, updates the FQName cache for CREATE / UPDATE /
// DELETE and finally passes (oper, type, uuid) to EnqueueUUIDRequest().
//
// Returns false when the JSON does not parse, when oper, type or uuid is
// empty, or when a CREATE/UPDATE carries no usable fq_name. Returns true in
// every other case, including an oper value other than CREATE/UPDATE/DELETE
// (such a message is still traced and enqueued).
//
// NOTE(review): listing lines 326, 335 and 339 are absent from this extract;
// they hold the receiver of the FQName cache lookup / add / invalidate calls.
257 bool ConfigAmqpClient::ProcessMessage(const string &json_message) {
258  Document document;
259  document.Parse<0>(json_message.c_str());
260 
261  if (document.HasParseError()) {
262  size_t pos = document.GetErrorOffset();
263  // GetParseError returns const char *
    // NOTE(review): "with error description" has no surrounding spaces, so
    // the offset and the description run straight into it in the output.
264  cout << "Error in parsing JSON message from rabbitMQ at "
265  << pos << "with error description"
266  << document.GetParseError() << endl;
267  return false;
268  } else {
269  string oper = "";
270  string uuid_str = "";
271  string obj_type = "";
272  string obj_name = "";
    // Walk the top-level members; keys other than the four handled below
    // are ignored.
    // NOTE(review): GetString() is called without an IsString() check, and
    // the document is iterated without an IsObject() check — presumably the
    // publisher always sends a JSON object with string values; confirm.
273  for (Value::ConstMemberIterator itr = document.MemberBegin();
274  itr != document.MemberEnd(); ++itr) {
275  string key(itr->name.GetString());
276  if (key == "oper") {
277  oper = itr->value.GetString();
278  } else if (key == "type") {
279  obj_type = itr->value.GetString();
280  } else if (key == "fq_name") {
    // fq_name is expected to be an array; a non-array or empty array
    // leaves obj_name empty.
281  if (!itr->value.IsArray())
282  continue;
283  ostringstream os;
284  SizeType sz = itr->value.GetArray().Size();
285  if (sz == 0)
286  continue;
    // Join the array elements with ':' (no trailing separator).
287  for (SizeType i = 0; i < sz-1; i++) {
288  os << itr->value[i].GetString() << ":";
289  }
290  os << itr->value[sz-1].GetString();
291  obj_name = os.str();
292  } else if (key == "uuid") {
293  uuid_str = itr->value.GetString();
294  }
295  }
296 
    // NOTE(review): the condition tests oper, uuid and type, while the
    // warning text speaks of an empty object name.
297  if ((oper == "") || (uuid_str == "") || (obj_type == "")) {
298  CONFIG_CLIENT_WARN(ConfigClientFQNameCache,
299  "Empty object name or empty type or empty uuid", obj_type,
300  obj_name, uuid_str);
301  return false;
302  }
303 
304  if ((oper == "CREATE") || (oper == "UPDATE")) {
305  if (obj_name.empty()) {
306  CONFIG_CLIENT_WARN(ConfigClientFQNameCache,
307  "Empty object name during CREATE/UPDATE",
308  obj_type, obj_name, uuid_str);
309  return false;
310  }
311 
312  // It is possible in some cases that RabbitMQ might club the
313  // CREATE and UPDATE for an object if they happen in quick
314  // succession (for instance control node failover). In such
315  // cases, we could only get an UPDATE of the object without
316  // a preceding CREATE. Handle the UPDATE as a CREATE and add
317  // object to FQNameCache.
318  // In the unlikely event that we do receive a CREATE for the
319  // same object after the UPDATE, we check if the FQNameCache
320  // is already present before adding to it.
321  // Also, it is ok to process the CREATE/UPDATE irrespective
322  // of the order in which they are received since we always
323  // read the uuid table from Cassandra.
324 
325  string stored_fq_name =
    // The lookup result "ERROR" is treated as "no cache entry".
327  if (stored_fq_name == "ERROR") {
328  // FQName Cache entry not present. Create one.
329  // Log the event if the operation is an UPDATE.
330  if (oper == "UPDATE") {
331  CONFIG_CLIENT_WARN(ConfigClientFQNameCache,
332  "FQ Name Cache entry not found on UPDATE:",
333  obj_type, obj_name, uuid_str);
334  }
336  AddFQNameCache(uuid_str, obj_type, obj_name);
337  }
338  } else if (oper == "DELETE") {
340  InvalidateFQNameCache(uuid_str);
341  }
342 
    // Trace the message, then queue the UUID for processing.
343  CONFIG_CLIENT_RABBIT_MSG_TRACE(ConfigClientRabbitMQMsgTrace, oper,
344  obj_type, obj_name, uuid_str);
345  EnqueueUUIDRequest(oper, obj_type, uuid_str);
346  }
347  return true;
348 }
349 
351  AmqpClient::Envelope::ptr_t &envelope) {
352  try {
353  // timeout = 10ms.. To handle SIGHUP on config changes
354  // On reinit, config client manager will trigger the amqp client
355  // to shutdown. Blocking wait without timeout will not allow this.
356  channel_->BasicConsumeMessage(consumer_tag_, envelope, 10);
357  return true;
358  } catch (std::exception &e) {
359  static string what = e.what();
360  string message =
361  "RabbitMQ SM: Caught exception while receiving "
362  "messages from RabbitMQ: "
363  + amqpclient_->rabbitmq_ip() + ":"
364  + amqpclient_->rabbitmq_port() + " : " + what;
365  cout << message << endl;
366  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, message);
367  return false;
368  } catch (...) {
369  string message =
370  "RabbitMQ SM: Caught fatal unknown exception while receiving "
371  "messages from RabbitMQ "
372  + amqpclient_->rabbitmq_ip() + ':'
373  + amqpclient_->rabbitmq_port();
374  cout << message << endl;
375  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, message);
376  assert(0);
377  }
378  return true;
379 }
380 
382  AmqpClient::Envelope::ptr_t &envelope) {
383  try {
384  channel_->BasicAck(envelope);
385  } catch (std::exception &e) {
386  static string what = e.what();
387  string message =
388  "RabbitMQ SM: Caught exception while acking "
389  "messages from RabbitMQ: "
390  + amqpclient_->rabbitmq_ip() + ':'
391  + amqpclient_->rabbitmq_port() + ':' + what;
392  cout << message << endl;
393  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, message);
394  return false;
395  } catch (...) {
396  string message =
397  "RabbitMQ SM: Caught fatal unknown exception while acking messages "
398  "from RabbitMQ " + amqpclient_->rabbitmq_ip() + ':'
399  + amqpclient_->rabbitmq_port();
400  cout << message << endl;
401  CONFIG_CLIENT_WARN(ConfigClientMgrWarning, message);
402  assert(0);
403  }
404  return true;
405 }
406 
407 
409  // If reinit is triggered, don't wait for the end-of-config trigger;
410  // return from here so that reinit can be processed.
411  if (amqpclient_->config_manager()->is_reinit_triggered()) {
413  ConfigClientMgrDebug,
414  "RabbitMQ SM: Reinit triggered, don't wait for end of config");
415  return true;
416  }
417 
418  // To start consuming messages, we should have finished bulk sync
419  amqpclient_->config_manager()->WaitForEndOfConfig();
420 
421  while (true) {
422  // Test only
423  if (amqpclient_->terminate())
424  break;
425  // If reinit is triggered, break from the message receiving loop
426  if (amqpclient_->config_manager()->is_reinit_triggered()) {
427  CONFIG_CLIENT_DEBUG(ConfigClientMgrDebug,
428  "RabbitMQ SM: Reinit triggered, break from message receiving loop");
429  break;
430  }
431  AmqpClient::Envelope::ptr_t envelope;
432  if (ReceiveRabbitMessages(envelope) == false) {
433  ConnectToRabbitMQ(false);
434  continue;
435  }
436 
437  if (!envelope)
438  continue;
439 
440  amqpclient_->ProcessMessage(envelope->Message()->Body());
441  if (AckRabbitMessages(envelope) == false) {
442  ConnectToRabbitMQ(false);
443  continue;
444  }
445  }
446  return true;
447 }
std::string FormAmqpUri(bool hide_auth_info) const
ConfigJsonParserBase * config_json_parser()
tbb::atomic< uint64_t > connection_status_change_at_
void set_connected(bool connected)
bool ProcessMessage(const std::string &json_message)
bool AckRabbitMessages(AmqpClient::Envelope::ptr_t &envelop)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
std::string module_name() const
ConfigDbClient * config_db_client() const
bool stringToInteger(const std::string &str, NumberType &num)
Definition: string_util.h:71
void ConnectToRabbitMQ(bool queue_delete=true)
std::string rabbitmq_vhost() const
std::string rabbitmq_port() const
std::vector< std::string > rabbitmq_ips_
std::string rabbitmq_ssl_certfile() const
std::string rabbitmq_ssl_keyfile() const
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
#define CONFIG_CLIENT_DEBUG(obj,...)
int GetTaskId(const std::string &name)
Definition: task.cc:856
std::vector< Endpoint > endpoints_
tbb::atomic< bool > connection_status_
std::string rabbitmq_ip() const
void ReportRabbitMQConnectionStatus(bool connected) const
void GetConnectionInfo(ConfigAmqpConnInfo &info) const
const bool IsReadObjectType(std::string objectType)
ConfigClientManager * config_manager() const
std::vector< std::string > rabbitmq_ports_
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
boost::asio::ip::tcp::endpoint Endpoint
static Options options
std::string rabbitmq_ssl_ca_certs() const
bool ReceiveRabbitMessages(AmqpClient::Envelope::ptr_t &envelop)
#define CONFIG_CLIENT_RABBIT_MSG_TRACE(obj,...)
boost::tokenizer< boost::char_separator< char > > tokenizer
#define CONFIG_CLIENT_WARN(obj,...)
ConfigAmqpClient(ConfigClientManager *mgr, std::string hostname, std::string module_name, const ConfigClientOptions &options)
std::string hostname() const
std::string rabbitmq_user() const
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void increment_rabbitmq_server_index()
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
static uint64_t UTCTimestampUsec()
Definition: time_util.h:13
static int compare(const Type &lhs, const Type &rhs)
virtual std::string FindFQName(const std::string &uuid) const
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
RabbitMQReader(ConfigAmqpClient *amqpclient)
static ConnectionState * GetInstance()
ConfigClientManager * mgr_
std::vector< std::string > rabbitmq_server_list
std::string rabbitmq_password() const
bool rabbitmq_use_ssl() const
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
size_t rabbitmq_server_list_len() const
std::vector< Endpoint > endpoints() const
struct task_ task
boost::scoped_ptr< ConfigAmqpChannel > channel_
static std::string UTCUsecToString(uint64_t tstamp)
Definition: time_util.h:54