6 #include <sandesh/sandesh.h>
7 #include <boost/algorithm/string.hpp>
8 #include <boost/algorithm/string/join.hpp>
9 #include <boost/foreach.hpp>
10 #include <boost/unordered_map.hpp>
11 #include <boost/system/error_code.hpp>
12 #include <eql_types.h>
14 #include "schema/vnc_cfg_types.h"
20 using namespace etcd::etcdql;
31 EtcdIf::EtcdIf(
const std::vector<std::string> &etcd_hosts,
32 const int port,
bool useSsl)
36 BOOST_FOREACH(
const std::string &etcd_host, etcd_hosts) {
37 hosts_.push_back(etcd_host);
38 boost::system::error_code ec;
39 boost::asio::ip::address etcd_addr;
42 EQL_DEBUG(EtcdClientDebug,
"Invalid IP address");
57 BOOST_FOREACH(
const std::string &etcd_host,
hosts_) {
58 url << etcd_host <<
":" <<
port_;
60 shared_ptr<Channel> chan;
63 auto channel_creds = grpc::SslCredentials(
64 grpc::SslCredentialsOptions());
65 chan = grpc::CreateChannel(url.str(), channel_creds);
67 chan = grpc::CreateChannel(
69 grpc::InsecureChannelCredentials());
83 string const& range_end,
87 EQL_DEBUG(EtcdClientDebug, os <<
"Get Request - key: " << key
88 <<
" range_end: " << range_end
89 <<
" limit: " << limit);
110 (static_cast<void *>(&
get_call_->gtag_)));
127 while (
cq_.Next(&got_tag, &ok)) {
139 EQL_TRACE(EtcdClientErrorTrace,
"Get Response: Error",
144 if (got_tag == (static_cast<void *>(&
gtag_))) {
153 "Get Response: Prefix Not Found",
158 multimap<string, string> kvs;
160 kvs.insert(pair<string, string>
169 EQL_DEBUG(EtcdClientDebug, os <<
"Get Response: Success"
172 <<
" KEY-VALUE LIST: ");
177 EQL_DEBUG(EtcdClientDebug, os <<
" Index: " << i
195 EQL_DEBUG(EtcdClientDebug, os <<
"Set Request - Key: "
222 while (
set_call_->cq_.Next(&got_tag, &ok)) {
226 "Set Response: Error",
231 if (got_tag == (
void *)
this) {
233 EQL_DEBUG(EtcdClientDebug, os <<
"Set Response: Success"
235 << (
set_call_->set_resp_.prev_kv()).key()
237 << (
set_call_->set_resp_.prev_kv()).value());
248 EQL_DEBUG(EtcdClientDebug, os <<
"Delete Request - Key: "
279 "Delete Response: Error",
283 if (got_tag == (
void *)
this) {
286 EQL_DEBUG(EtcdClientDebug, os <<
"Delete Response: Success"
287 <<
" # Keys Deleted: "
290 for (
int i = 0; i <
delete_call_->delete_resp_.deleted(); i++) {
293 EQL_DEBUG(EtcdClientDebug, os <<
" Index: " << i
311 EQL_DEBUG(EtcdClientDebug, os <<
"Watch Request - Key: " << key);
323 string range_end(key);
324 range_end.back() = ((int)range_end[range_end.length()-1])+1;
338 if (got_tag == (
void *)
this) {
339 watch_call_->watch_reader_->Write(req, (
void *)
"write");
362 while(cq_.Next(&got_tag, &ok)) {
373 if (!ok || !watch_active_) {
378 "Watch Response: Error",
384 if (got_tag == (
void*)&wtag) {
389 if (watch_resp_.events_size()) {
396 for (
int i = 0; i < watch_resp_.events_size(); i++) {
397 auto event = watch_resp_.events(i);
399 switch (event.type()) {
401 if (event.kv().version() == 0) {
415 resp.
set_key(event.kv().key());
416 resp.
set_val(event.kv().value());
418 if (event.has_prev_kv()) {
423 EQL_DEBUG(EtcdClientDebug, os <<
"Watch Response: "
448 watch_reader_->Read(&watch_resp_, (
void*)&wtag);
456 watch_call_->watch_reader_->WritesDone((
void *)
"Stop Watch");
void set_value(const ::std::string &value)
void WaitForWatchResponse(WatchCb cb)
const std::string & prev_value() const
std::unique_ptr< Watch::Stub > watch_stub_
void set_err_msg(std::string msg)
WatchAction action() const
void set_key(const ::std::string &value)
void set_range_end(const ::std::string &value)
void set_key(const ::std::string &value)
const std::string & prev_key() const
virtual EtcdResponse Get(const std::string &key, const std::string &range_end, int limit)
std::vector< std::string > hosts_
const std::string & value() const
SandeshTraceBufferPtr EqlTraceBuf
#define EQL_DEBUG(obj, arg)
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
virtual void Set(const std::string &key, const std::string &value)
const ::std::string & key() const
std::unique_ptr< EtcdAsyncSetCall > set_call_
const ::std::string & value() const
EtcdResponse ParseGetResponse()
void CopyFrom(const ::google::protobuf::Message &from) PROTOBUF_FINAL
#define EQL_TRACE(obj,...)
const std::string & key() const
void set_key(std::string key)
void set_prev_val(std::string prev_val)
const ::etcdserverpb::ResponseHeader & header() const
void set_revision(int revision)
void set_range_end(const ::std::string &value)
const std::string & err_msg() const
void set_val(std::string val)
void set_err_code(int code)
void set_prev_kv(bool value)
void set_key(const ::std::string &value)
void set_sort_target(::etcdserverpb::RangeRequest_SortTarget value)
void set_kv_map(kv_map kvs)
void set_limit(::google::protobuf::int64 value)
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
void set_prev_kv(bool value)
std::unique_ptr< EtcdAsyncWatchCall > watch_call_
virtual void Delete(const std::string &key, const std::string &range_end)
boost::function< void(const EtcdResponse &Resp)> WatchCb
boost::asio::ip::tcp::endpoint Endpoint
std::unique_ptr< EtcdAsyncGetCall > get_call_
std::vector< Endpoint > endpoints_
void set_key(const ::std::string &value)
void set_sort_order(::etcdserverpb::RangeRequest_SortOrder value)
void set_prev_key(std::string prev_key)
std::unique_ptr< KV::Stub > kv_stub_
std::unique_ptr< EtcdAsyncDeleteCall > delete_call_
virtual void Watch(const std::string &key, WatchCb cb)
void set_range_end(const ::std::string &value)
const ::mvccpb::KeyValue & kvs(int index) const
void set_action(WatchAction action)
::etcdserverpb::WatchCreateRequest * mutable_create_request()
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
void set_prev_kv(bool value)