OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
config_amqp_client.h
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2017 Juniper Networks, Inc. All rights reserved.
3  */
4 
5 #ifndef config_amqp_client_h
6 #define config_amqp_client_h
7 
8 #include <string>
9 #include <vector>
10 
11 #include <boost/asio/ip/tcp.hpp>
12 
13 #include <SimpleAmqpClient/SimpleAmqpClient.h>
14 #include <tbb/atomic.h>
15 
16 struct ConfigClientOptions;
18 struct ConfigAmqpConnInfo;
19 
20 // Interface to AmqpClient
22 public:
// Virtual destructor; the body is empty.
24  virtual ~ConfigAmqpChannel() { }
25 
// Calls AmqpClient::Channel::CreateFromUri(uri), stores the returned pointer
// in channel_, and returns that same pointer to the caller.
// The uri string is taken by value and passed through unchanged.
// NOTE(review): the other wrappers here dereference channel_ without a
// check, so presumably one of the Create* calls must succeed first -- confirm
// against the callers.
26  virtual AmqpClient::Channel::ptr_t CreateFromUri(std::string uri) {
27  return (channel_ = AmqpClient::Channel::CreateFromUri(uri));
28  }
29 
// Calls AmqpClient::Channel::CreateSecure, stores the returned pointer in
// channel_, and returns that same pointer.
// Arguments are forwarded unchanged in this order: ca_cert, host, client_key,
// client_cert, port, username, password, vhost, frame_max, verify_hostname.
// Defaults declared here: frame_max = 131072, verify_hostname = false.
// NOTE(review): these defaults are on a virtual function; C++ selects default
// arguments from the static type at the call site, so an override that
// declares different defaults would not be picked up through a base pointer.
30  virtual AmqpClient::Channel::ptr_t CreateSecure(
31  std::string ca_cert, std::string host, std::string client_key,
32  std::string client_cert, int port, std::string username,
33  std::string password, std::string vhost, int frame_max = 131072,
34  bool verify_hostname = false) {
35  return (channel_ = AmqpClient::Channel::CreateSecure(ca_cert, host,
36  client_key, client_cert, port, username, password, vhost,
37  frame_max , verify_hostname));
38  }
39 
// Forwards all five arguments, unchanged and in the same order, to
// channel_->DeclareExchange. Returns nothing.
// The meaning of passive / durable / auto_delete is defined by the underlying
// AmqpClient channel and is not visible in this file.
// channel_ is dereferenced without a null check.
40  virtual void DeclareExchange(const std::string &exchange_name,
41  const std::string &exchange_type, bool passive, bool durable,
42  bool auto_delete) {
43  channel_->DeclareExchange(exchange_name, exchange_type, passive,
44  durable, auto_delete);
45  }
46 
// Forwards queue_name, if_unused and if_empty unchanged to
// channel_->DeleteQueue. Returns nothing.
// channel_ is dereferenced without a null check.
47  virtual void DeleteQueue(const std::string &queue_name, bool if_unused,
48  bool if_empty) {
49  channel_->DeleteQueue(queue_name, if_unused, if_empty);
50  }
51 
// Forwards all five arguments unchanged to channel_->DeclareQueue and returns
// the std::string that call produces.
// NOTE(review): the returned string is presumably the declared queue's name
// -- confirm against the SimpleAmqpClient documentation.
// channel_ is dereferenced without a null check.
52  virtual std::string DeclareQueue(const std::string &queue_name,
53  bool passive, bool durable, bool exclusive, bool auto_delete) {
54  return channel_->DeclareQueue(queue_name, passive, durable, exclusive,
55  auto_delete);
56  }
57 
// Forwards queue_name, exchange_name and routing_key unchanged to
// channel_->BindQueue. Returns nothing.
// routing_key defaults to the empty string when the caller omits it.
// channel_ is dereferenced without a null check.
58  virtual void BindQueue(const std::string &queue_name,
59  const std::string &exchange_name,
60  const std::string &routing_key = "") {
61  channel_->BindQueue(queue_name, exchange_name, routing_key);
62  }
63 
// Forwards all six arguments unchanged to channel_->BasicConsume and returns
// the std::string that call produces.
// NOTE(review): the returned string is presumably the consumer tag to pass to
// BasicConsumeMessage -- confirm against the SimpleAmqpClient documentation.
// channel_ is dereferenced without a null check.
64  virtual std::string BasicConsume(const std::string &queue,
65  const std::string &consumer_tag, bool no_local, bool no_ack,
66  bool exclusive, boost::uint16_t message_prefetch_count) {
67  return channel_->BasicConsume(queue, consumer_tag, no_local, no_ack,
68  exclusive, message_prefetch_count);
69  }
70 
// Forwards consumer_tag, envelope and timeout to
// channel_->BasicConsumeMessage and returns its bool result.
// envelope is a non-const reference, so the underlying call is able to
// assign to the caller's pointer.
// NOTE(review): the unit of timeout and the meaning of a false return are
// defined by the underlying library and are not visible here -- confirm.
// channel_ is dereferenced without a null check.
71  virtual bool BasicConsumeMessage(const std::string &consumer_tag,
72  AmqpClient::Envelope::ptr_t &envelope, int timeout) {
73  return channel_->BasicConsumeMessage(consumer_tag, envelope, timeout);
74  }
75 
// Forwards message unchanged to channel_->BasicAck. Returns nothing.
// channel_ is dereferenced without a null check.
76  virtual void BasicAck(const AmqpClient::Envelope::ptr_t &message) {
77  channel_->BasicAck(message);
78  }
79 
80 private:
81  AmqpClient::Channel::ptr_t channel_;
82 };
83 
84 /*
85  * This class interacts with RabbitMQ.
86  */
88 public:
89  typedef boost::asio::ip::tcp::endpoint Endpoint;
91  std::string module_name, const ConfigClientOptions &options);
// Virtual destructor; the body is empty.
92  virtual ~ConfigAmqpClient() { }
93 
94  void StartRabbitMQReader();
95 
96  std::string rabbitmq_ip() const {
98  return "";
100  }
101 
102  std::string rabbitmq_port() const {
103  if (current_server_index_ >= rabbitmq_ips_.size())
104  return "";
106  }
107 
// Returns the number of entries currently held in rabbitmq_ips_.
108  size_t rabbitmq_server_list_len() const {
109  return rabbitmq_ips_.size();
110  }
111 
113  if (rabbitmq_ips_.size()) {
115  ((current_server_index_ + 1) % rabbitmq_ips_.size());
116  }
117  }
118 
// Read-only accessors. Each one returns its backing member by value, so the
// caller gets a copy (for endpoints() that is a copy of the whole vector).
// None of them modifies the object.
119  std::string rabbitmq_user() const { return rabbitmq_user_; }
120  std::string rabbitmq_password() const { return rabbitmq_password_; }
121  std::string rabbitmq_vhost() const { return rabbitmq_vhost_; }
122  bool rabbitmq_use_ssl() const { return rabbitmq_use_ssl_; }
123  std::string rabbitmq_ssl_version() const { return rabbitmq_ssl_version_; }
124  std::string rabbitmq_ssl_keyfile() const { return rabbitmq_ssl_keyfile_; }
125  std::string rabbitmq_ssl_certfile() const { return rabbitmq_ssl_certfile_; }
126  std::string rabbitmq_ssl_ca_certs() const { return rabbitmq_ssl_ca_certs_; }
129  std::vector<Endpoint> endpoints() const { return endpoints_; }
130  int reader_task_id() const { return reader_task_id_; }
131  std::string hostname() const { return hostname_; }
132  std::string module_name() const { return module_name_; }
133 
// Static setter: assigns the argument to the static member disable_, so the
// value is shared by every instance of the class.
// What the flag disables is decided by code outside this listing.
134  static void set_disable(bool disable) { disable_ = disable; }
135 
136  std::string FormAmqpUri(bool hide_auth_info) const;
137  void EnqueueUUIDRequest(std::string oper, std::string obj_type,
138  std::string uuid_str);
139  bool ProcessMessage(const std::string &json_message);
140  void set_connected(bool connected);
141  void GetConnectionInfo(ConfigAmqpConnInfo &info) const;
142  // Test only
// Returns the current value of terminate_ without modifying the object.
143  bool terminate() const { return terminate_; }
145 
146 private:
147  // A Job for reading the rabbitmq
148  class RabbitMQReader;
149 
150  void ReportRabbitMQConnectionStatus(bool connected) const;
151 
153  std::string hostname_;
154  std::string module_name_;
155 
159  std::vector<std::string> rabbitmq_ips_;
160  std::vector<std::string> rabbitmq_ports_;
161  std::string rabbitmq_user_;
162  std::string rabbitmq_password_;
163  std::string rabbitmq_vhost_;
169  static bool disable_;
170  std::vector<Endpoint> endpoints_;
171  tbb::atomic<bool> connection_status_;
172  tbb::atomic<uint64_t> connection_status_change_at_;
173 };
174 
175 #endif // config_amqp_client_h
std::string FormAmqpUri(bool hide_auth_info) const
tbb::atomic< uint64_t > connection_status_change_at_
void set_connected(bool connected)
bool ProcessMessage(const std::string &json_message)
virtual void BindQueue(const std::string &queue_name, const std::string &exchange_name, const std::string &routing_key="")
std::string module_name() const
std::string rabbitmq_ssl_keyfile_
std::string rabbitmq_ssl_certfile_
std::string rabbitmq_ssl_ca_certs_
AmqpClient::Channel::ptr_t channel_
std::string rabbitmq_vhost() const
std::string rabbitmq_port() const
virtual AmqpClient::Channel::ptr_t CreateSecure(std::string ca_cert, std::string host, std::string client_key, std::string client_cert, int port, std::string username, std::string password, std::string vhost, int frame_max=131072, bool verify_hostname=false)
std::vector< std::string > rabbitmq_ips_
std::string rabbitmq_ssl_version() const
std::string rabbitmq_ssl_certfile() const
std::string rabbitmq_ssl_keyfile() const
virtual std::string DeclareQueue(const std::string &queue_name, bool passive, bool durable, bool exclusive, bool auto_delete)
static void set_disable(bool disable)
std::vector< Endpoint > endpoints_
std::string rabbitmq_password_
tbb::atomic< bool > connection_status_
std::string rabbitmq_ip() const
void ReportRabbitMQConnectionStatus(bool connected) const
void GetConnectionInfo(ConfigAmqpConnInfo &info) const
ConfigClientManager * config_manager() const
std::vector< std::string > rabbitmq_ports_
std::string rabbitmq_user_
boost::asio::ip::tcp::endpoint Endpoint
static Options options
std::string rabbitmq_ssl_ca_certs() const
ConfigClientManager * config_manager()
virtual bool BasicConsumeMessage(const std::string &consumer_tag, AmqpClient::Envelope::ptr_t &envelope, int timeout)
virtual ~ConfigAmqpChannel()
virtual void DeclareExchange(const std::string &exchange_name, const std::string &exchange_type, bool passive, bool durable, bool auto_delete)
virtual void DeleteQueue(const std::string &queue_name, bool if_unused, bool if_empty)
ConfigAmqpClient(ConfigClientManager *mgr, std::string hostname, std::string module_name, const ConfigClientOptions &options)
std::string hostname() const
virtual std::string BasicConsume(const std::string &queue, const std::string &consumer_tag, bool no_local, bool no_ack, bool exclusive, boost::uint16_t message_prefetch_count)
std::string rabbitmq_user() const
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void increment_rabbitmq_server_index()
int reader_task_id() const
std::string rabbitmq_vhost_
void set_terminate(bool terminate)
ConfigClientManager * mgr_
std::string rabbitmq_password() const
std::string module_name_
bool rabbitmq_use_ssl() const
virtual void BasicAck(const AmqpClient::Envelope::ptr_t &message)
std::string rabbitmq_ssl_version_
virtual AmqpClient::Channel::ptr_t CreateFromUri(std::string uri)
size_t rabbitmq_server_list_len() const
std::vector< Endpoint > endpoints() const
virtual ~ConfigAmqpClient()
bool terminate() const