7 #include <boost/algorithm/string/find.hpp>
8 #include <boost/lexical_cast.hpp>
12 #include <SimpleAmqpClient/SimpleAmqpClient.h>
13 #include "rapidjson/document.h"
22 #include "config_client_log_types.h"
25 #include "config_client_show_types.h"
27 using namespace boost;
29 using namespace contrail_rapidjson;
36 Task(amqpclient->reader_task_id()), amqpclient_(amqpclient) {
37 channel_.reset(ConfigStaticObjectFactory::Create<ConfigAmqpChannel>());
// Returns the fixed label "ConfigAmqpClient::RabbitMQReader" for this reader.
// NOTE(review): presumably overrides Task::Description() -- the Task base
// class is not visible here; confirm.
46 string Description()
const {
return "ConfigAmqpClient::RabbitMQReader"; }
// Connects to RabbitMQ. queue_delete defaults to true; the message loop
// passes false when it reconnects after a failed receive or a failed ack.
// NOTE(review): presumably queue_delete decides whether the existing queue
// is deleted before it is declared again -- that branch is not visible
// here; confirm against the definition.
52 void ConnectToRabbitMQ(
bool queue_delete =
true);
// Acknowledges the given envelope through channel_->BasicAck(); exceptions
// raised by the call are caught inside. The message loop treats a false
// return as a failure and reconnects with ConnectToRabbitMQ(false).
53 bool AckRabbitMessages(AmqpClient::Envelope::ptr_t &envelop);
// Fetches a message into the envelope through
// channel_->BasicConsumeMessage(consumer_tag_, envelope, 10); exceptions
// raised by the call are caught inside. The message loop treats a false
// return as a failure and reconnects with ConnectToRabbitMQ(false).
// NOTE(review): the trailing 10 is presumably a timeout -- confirm its unit
// against ConfigAmqpChannel.
54 bool ReceiveRabbitMessages(AmqpClient::Envelope::ptr_t &envelop);
59 mgr_(mgr), hostname_(hostname), module_name_(module_name),
60 current_server_index_(0), terminate_(false),
61 rabbitmq_user_(options.rabbitmq_user),
62 rabbitmq_password_(options.rabbitmq_password),
63 rabbitmq_vhost_(options.rabbitmq_vhost),
64 rabbitmq_use_ssl_(options.rabbitmq_use_ssl),
65 rabbitmq_ssl_version_(options.rabbitmq_ssl_version),
66 rabbitmq_ssl_keyfile_(options.rabbitmq_ssl_keyfile),
67 rabbitmq_ssl_certfile_(options.rabbitmq_ssl_certfile),
68 rabbitmq_ssl_ca_certs_(options.rabbitmq_ssl_ca_certs) {
79 for (vector<string>::const_iterator iter =
82 string server_info(*iter);
83 typedef boost::tokenizer<boost::char_separator<char> >
tokenizer;
84 boost::char_separator<char> sep(
":");
85 tokenizer tokens(server_info, sep);
86 tokenizer::iterator tit = tokens.begin();
90 string port_str(*tit);
95 boost::system::error_code ec;
105 ConfigClientMgrDebug,
106 "RabbitMQ SM: StartRabbitMQReader: RabbitMQ disabled");
113 ConfigClientMgrDebug,
114 "RabbitMQ SM: StartRabbitMQReader: re init triggered,"
115 " dont start RabbitMQ");
131 const string user = hide_auth_info ?
"********" :
rabbitmq_user();
133 string uri = string(
"amqp://" + user +
":" +
147 process::ConnectionType::DATABASE,
"RabbitMQ",
148 process::ConnectionStatus::UP,
149 endpoints(),
"RabbitMQ connection established");
151 "RabbitMQ SM: RabbitMQ connection established");
154 process::ConnectionType::DATABASE,
"RabbitMQ",
155 process::ConnectionStatus::DOWN,
156 endpoints(),
"RabbitMQ connection down");
158 "RabbitMQ SM: RabbitMQ connection down");
165 string message =
"RabbitMQ SM: Connect to Rabbit MQ with queue_delete ";
166 message += queue_delete ?
"TRUE" :
"FALSE";
173 "RabbitMQ SM: Skipped connect due to reinit");
179 int port = boost::lexical_cast<
int>(
195 channel_->DeclareExchange(
"vnc_config.object-update",
196 AmqpClient::Channel::EXCHANGE_TYPE_FANOUT,
false,
false,
false);
201 channel_->DeleteQueue(queue_name,
false,
false);
206 string queue =
channel_->DeclareQueue(queue_name,
false,
false,
208 channel_->BindQueue(queue,
"vnc_config.object-update");
212 true,
false,
false, 0);
213 }
catch (std::exception &e) {
// Deliberately not static: a function-local static is initialised only the
// first time control passes through it, so every later connect failure would
// be logged with the text of the first exception ever caught here.
214 string what = e.what();
216 "RabbitMQ SM: Caught exception while connecting to RabbitMQ: "
219 cout << message << endl;
230 "RabbitMQ SM: Caught fatal exception while "
231 "connecting to RabbitMQ: "
234 cout << message << endl;
252 conn_info.connection_status_change_at =
259 document.Parse<0>(json_message.c_str());
261 if (document.HasParseError()) {
262 size_t pos = document.GetErrorOffset();
// Report where and why the JSON parse failed. The literals carry their own
// separating spaces so the offset and the error code do not run into the
// surrounding words.
264 cout <<
"Error in parsing JSON message from rabbitMQ at "
265 << pos <<
" with error description "
266 << document.GetParseError() << endl;
270 string uuid_str =
"";
271 string obj_type =
"";
272 string obj_name =
"";
273 for (Value::ConstMemberIterator itr = document.MemberBegin();
274 itr != document.MemberEnd(); ++itr) {
275 string key(itr->name.GetString());
277 oper = itr->value.GetString();
278 }
else if (key ==
"type") {
279 obj_type = itr->value.GetString();
280 }
else if (key ==
"fq_name") {
281 if (!itr->value.IsArray())
284 SizeType sz = itr->value.GetArray().Size();
287 for (SizeType i = 0; i < sz-1; i++) {
288 os << itr->value[i].GetString() <<
":";
290 os << itr->value[sz-1].GetString();
292 }
else if (key ==
"uuid") {
293 uuid_str = itr->value.GetString();
297 if ((oper ==
"") || (uuid_str ==
"") || (obj_type ==
"")) {
299 "Empty object name or empty type or empty uuid", obj_type,
304 if ((oper ==
"CREATE") || (oper ==
"UPDATE")) {
305 if (obj_name.empty()) {
307 "Empty object name during CREATE/UPDATE",
308 obj_type, obj_name, uuid_str);
325 string stored_fq_name =
327 if (stored_fq_name ==
"ERROR") {
330 if (oper ==
"UPDATE") {
332 "FQ Name Cache entry not found on UPDATE:",
333 obj_type, obj_name, uuid_str);
336 AddFQNameCache(uuid_str, obj_type, obj_name);
338 }
else if (oper ==
"DELETE") {
340 InvalidateFQNameCache(uuid_str);
344 obj_type, obj_name, uuid_str);
351 AmqpClient::Envelope::ptr_t &envelope) {
356 channel_->BasicConsumeMessage(consumer_tag_, envelope, 10);
358 }
catch (std::exception &e) {
// Deliberately not static: a function-local static is initialised only the
// first time control passes through it, so every later receive failure would
// be logged with the text of the first exception ever caught here.
359 string what = e.what();
361 "RabbitMQ SM: Caught exception while receiving "
362 "messages from RabbitMQ: "
363 + amqpclient_->rabbitmq_ip() +
":"
364 + amqpclient_->rabbitmq_port() +
" : " + what;
365 cout << message << endl;
370 "RabbitMQ SM: Caught fatal unknown exception while receiving "
371 "messages from RabbitMQ "
372 + amqpclient_->rabbitmq_ip() +
':'
373 + amqpclient_->rabbitmq_port();
374 cout << message << endl;
382 AmqpClient::Envelope::ptr_t &envelope) {
384 channel_->BasicAck(envelope);
385 }
catch (std::exception &e) {
// Deliberately not static: a function-local static is initialised only the
// first time control passes through it, so every later ack failure would be
// logged with the text of the first exception ever caught here.
386 string what = e.what();
388 "RabbitMQ SM: Caught exception while acking "
389 "messages from RabbitMQ: "
390 + amqpclient_->rabbitmq_ip() +
':'
391 + amqpclient_->rabbitmq_port() +
':' + what;
392 cout << message << endl;
397 "RabbitMQ SM: Caught fatal unknown exception while acking messages "
398 "from RabbitMQ " + amqpclient_->rabbitmq_ip() +
':'
399 + amqpclient_->rabbitmq_port();
400 cout << message << endl;
411 if (amqpclient_->config_manager()->is_reinit_triggered()) {
413 ConfigClientMgrDebug,
414 "RabbitMQ SM: Reinit triggered, don't wait for end of config");
419 amqpclient_->config_manager()->WaitForEndOfConfig();
423 if (amqpclient_->terminate())
426 if (amqpclient_->config_manager()->is_reinit_triggered()) {
428 "RabbitMQ SM: Reinit triggered, break from message receiving loop");
431 AmqpClient::Envelope::ptr_t envelope;
432 if (ReceiveRabbitMessages(envelope) ==
false) {
433 ConnectToRabbitMQ(
false);
440 amqpclient_->ProcessMessage(envelope->Message()->Body());
441 if (AckRabbitMessages(envelope) ==
false) {
442 ConnectToRabbitMQ(
false);
std::string FormAmqpUri(bool hide_auth_info) const
ConfigJsonParserBase * config_json_parser()
tbb::atomic< uint64_t > connection_status_change_at_
void set_connected(bool connected)
bool ProcessMessage(const std::string &json_message)
void StartRabbitMQReader()
bool AckRabbitMessages(AmqpClient::Envelope::ptr_t &envelop)
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
std::string module_name() const
string Description() const
ConfigDbClient * config_db_client() const
bool stringToInteger(const std::string &str, NumberType &num)
void ConnectToRabbitMQ(bool queue_delete=true)
std::string rabbitmq_vhost() const
std::string rabbitmq_port() const
std::vector< std::string > rabbitmq_ips_
std::string rabbitmq_ssl_certfile() const
std::string rabbitmq_ssl_keyfile() const
virtual bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
#define CONFIG_CLIENT_DEBUG(obj,...)
int GetTaskId(const std::string &name)
std::vector< Endpoint > endpoints_
tbb::atomic< bool > connection_status_
std::string rabbitmq_ip() const
void ReportRabbitMQConnectionStatus(bool connected) const
void GetConnectionInfo(ConfigAmqpConnInfo &info) const
const bool IsReadObjectType(std::string objectType)
ConfigClientManager * config_manager() const
std::vector< std::string > rabbitmq_ports_
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
boost::asio::ip::tcp::endpoint Endpoint
std::string rabbitmq_ssl_ca_certs() const
bool ReceiveRabbitMessages(AmqpClient::Envelope::ptr_t &envelop)
#define CONFIG_CLIENT_RABBIT_MSG_TRACE(obj,...)
boost::tokenizer< boost::char_separator< char > > tokenizer
ConfigAmqpClient * amqpclient_
#define CONFIG_CLIENT_WARN(obj,...)
ConfigAmqpClient(ConfigClientManager *mgr, std::string hostname, std::string module_name, const ConfigClientOptions &options)
std::string hostname() const
std::string rabbitmq_user() const
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
void increment_rabbitmq_server_index()
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
static uint64_t UTCTimestampUsec()
static int compare(const Type &lhs, const Type &rhs)
bool is_reinit_triggered()
virtual std::string FindFQName(const std::string &uuid) const
void EnqueueUUIDRequest(std::string oper, std::string obj_type, std::string uuid_str)
RabbitMQReader(ConfigAmqpClient *amqpclient)
static ConnectionState * GetInstance()
ConfigClientManager * mgr_
std::vector< std::string > rabbitmq_server_list
std::string rabbitmq_password() const
bool rabbitmq_use_ssl() const
Task is a wrapper over tbb::task to support policies.
size_t rabbitmq_server_list_len() const
std::vector< Endpoint > endpoints() const
boost::scoped_ptr< ConfigAmqpChannel > channel_
static std::string UTCUsecToString(uint64_t tstamp)