OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
eql_if.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2018 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #include <iostream>
6 #include <sandesh/sandesh.h>
7 #include <boost/algorithm/string.hpp>
8 #include <boost/algorithm/string/join.hpp>
9 #include <boost/foreach.hpp>
10 #include <boost/unordered_map.hpp>
11 #include <boost/system/error_code.hpp>
12 #include <eql_types.h>
13 #include "proto/kv.pb.h"
14 #include "schema/vnc_cfg_types.h"
15 #include "eql_if.h"
16 #include "eql_log.h"
17 #include "base/address_util.h"
18 
19 using namespace std;
20 using namespace etcd::etcdql;
21 
22 using grpc::Channel;
27 
29  EQL_TRACE_BUF, 10000));
30 
// Constructs an EtcdIf from a list of etcd host strings, a port and an SSL
// flag. port and useSsl are stored in port_ / useSsl_; for every host the raw
// string is appended to hosts_ and an Endpoint(address, port) to endpoints_.
//
// NOTE(review): listing line 48 is absent from this extract, so any statement
// that sat between the loop and the closing brace is not shown here.
31 EtcdIf::EtcdIf(const std::vector<std::string> &etcd_hosts,
32  const int port, bool useSsl)
33  : port_(port),
34  useSsl_(useSsl) {
35 
36  BOOST_FOREACH(const std::string &etcd_host, etcd_hosts) {
37  hosts_.push_back(etcd_host);
38  boost::system::error_code ec;
39  boost::asio::ip::address etcd_addr;
40  etcd_addr = AddressFromString(etcd_host, &ec);
  // A parse failure is only logged: there is no continue/return, so the
  // Endpoint below is still built from etcd_addr and stored.
  // NOTE(review): confirm what AddressFromString leaves in etcd_addr on error.
41  if (ec) {
42  EQL_DEBUG(EtcdClientDebug, "Invalid IP address");
43  }
44  Endpoint endpoint(etcd_addr, port);
45  endpoints_.push_back(endpoint);
46  }
47 
49 }
50 
52 }
53 
// Body of EtcdIf::Connect(). The signature line (listing line 54) is missing
// from this extract; the cross-reference index lists it as
// `virtual bool Connect()`.
//
// Formats "<host>:<port_>" for an entry of hosts_, creates a gRPC channel to
// it (SSL credentials when useSsl_ is set, insecure otherwise), installs the
// KV and Watch stubs on that channel and returns true. Because the loop body
// ends in an unconditional `return true`, only the first host is ever used;
// false is returned only when hosts_ is empty.
55  ostringstream url;
56 
57  BOOST_FOREACH(const std::string &etcd_host, hosts_) {
  // NOTE(review): url is declared outside the loop and never cleared. That is
  // harmless while the loop returns on its first pass, but re-enabling the
  // readiness check below with `continue` would concatenate host strings.
58  url << etcd_host << ":" << port_;
59 
60  shared_ptr<Channel> chan;
61 
62  if (useSsl_) {
  // Default-constructed SslCredentialsOptions: no explicit certs/keys are set here.
63  auto channel_creds = grpc::SslCredentials(
64  grpc::SslCredentialsOptions());
65  chan = grpc::CreateChannel(url.str(), channel_creds);
66  } else {
67  chan = grpc::CreateChannel(
68  url.str(),
69  grpc::InsecureChannelCredentials());
70  }
71 
  // Disabled readiness check: the channel state is not verified before the
  // stubs are created.
72  //if (chan->GetState(false) != GRPC_CHANNEL_READY) continue;
73 
74  kv_stub_ = KV::NewStub(chan);
75  watch_stub_ = Watch::NewStub(chan);
76  return true;
77  }
78 
79  return false;
80 }
81 
// Fetches keys from etcd and returns the parsed result.
//
// Builds a RangeRequest from key, range_end and limit, starts it with
// kv_stub_->AsyncRange on a freshly allocated EtcdAsyncGetCall, asks for the
// result via Finish() tagged with the address of the call's gtag_, and then
// returns whatever get_call_->ParseGetResponse() produces.
//
// NOTE(review): listing lines 91-93, 97-98, 101-103 and 112-114 are absent
// from this extract. The cross-reference index mentions set_sort_target and
// set_sort_order, so 97-98 presumably configured sorting -- confirm against
// the real file before editing this function.
82 EtcdResponse EtcdIf::Get(string const& key,
83  string const& range_end,
84  int limit) {
85  ostringstream os;
86 
87  EQL_DEBUG(EtcdClientDebug, os << "Get Request - key: " << key
88  << " range_end: " << range_end
89  << " limit: " << limit);
90 
94  RangeRequest req;
95  req.set_key(key);
96  req.set_range_end(range_end);
99  req.set_limit(limit);
100 
  // A new call object is created per request, replacing any previous one.
104  get_call_.reset(new EtcdAsyncGetCall);
105  get_call_->get_reader_ = kv_stub_->AsyncRange(&get_call_->ctx_,
106  req,
107  &get_call_->cq_);
  // The tag passed here is what ParseGetResponse compares got_tag against.
108  get_call_->get_reader_->Finish(&get_call_->get_resp_,
109  &get_call_->status_,
110  (static_cast<void *>(&get_call_->gtag_)));
111 
115  EtcdResponse resp = get_call_->ParseGetResponse();
116 
117  return resp;
118 }
119 
// Body of ParseGetResponse(), which EtcdIf::Get invokes through get_call_.
// The signature line (listing line 120) and listing lines 132-135 are missing
// from this extract.
//
// Waits on cq_ for a completion and turns it into an EtcdResponse:
//   - status_ not OK              -> err_code / err_msg copied from status_
//   - tag matches, no kvs         -> err_code 100, "Prefix/Key not found"
//   - tag matches, kvs present    -> revision plus a multimap of key/value pairs
// Every path through the loop body ends in `break`, so at most one completion
// is consumed. If cq_.Next() returns false the default-constructed resp is
// returned unchanged.
121  EtcdResponse resp;
122  void* got_tag;
123  bool ok = false;
124  ostringstream os;
125 
126  // Block until the next result is available in the completion queue "cq".
127  while (cq_.Next(&got_tag, &ok)) {
128 
129  /*
130  * The tag is the memory location of the call's tag object
131  */
  // NOTE(review): `ok` is not examined in the visible lines; the dropped
  // lines 132-135 may have done so -- confirm.
136  if (!status_.ok()) {
137  resp.set_err_code(status_.error_code());
138  resp.set_err_msg(status_.error_message());
139  EQL_TRACE(EtcdClientErrorTrace, "Get Response: Error",
140  resp.err_code(), resp.err_msg());
141  break;
142  }
143 
144  if (got_tag == (static_cast<void *>(&gtag_))) {
145  /*
146  * Read the Get response.
147  */
148  resp.set_revision((get_resp_.header()).revision());
149  if(get_resp_.kvs_size() == 0) {
150  resp.set_err_code(100);
151  resp.set_err_msg("Prefix/Key not found");
152  EQL_TRACE(EtcdClientErrorTrace,
153  "Get Response: Prefix Not Found",
154  resp.err_code(),
155  resp.err_msg());
156  break;
157  } else {
  // multimap: duplicate keys in the reply would each be kept.
158  multimap<string, string> kvs;
159  for(int i = 0; i < get_resp_.kvs_size(); i++) {
160  kvs.insert(pair<string, string>
161  (get_resp_.kvs(i).key(),
162  get_resp_.kvs(i).value()));
163  }
164  resp.set_kv_map(kvs);
165  }
166  }
167 
  // NOTE(review): this point is also reached when got_tag did not match
  // gtag_, in which case "Success" is logged for a response that was never
  // copied into resp.
168  os.str("");
169  EQL_DEBUG(EtcdClientDebug, os << "Get Response: Success"
170  << " revision: "
171  << resp.revision()
172  << " KEY-VALUE LIST: ");
173 
  // One debug line per returned key/value pair.
174  for(int i = 0; i < get_resp_.kvs_size(); i++) {
175 
176  os.str("");
177  EQL_DEBUG(EtcdClientDebug, os << " Index: " << i
178  << " Key: "
179  << get_resp_.kvs(i).key()
180  << " Value: "
181  << get_resp_.kvs(i).value());
182  }
183 
184  break;
185  }
186 
187  return (resp);
188 }
189 
190 void EtcdIf::Set (const string& key, const string& value) {
191  void *got_tag;
192  bool ok = false;
193  ostringstream os;
194 
195  EQL_DEBUG(EtcdClientDebug, os << "Set Request - Key: "
196  << key
197  << " Value: "
198  << value);
199 
203  PutRequest req;
204  req.set_key(key);
205  req.set_value(value);
206  req.set_prev_kv(true);
207 
211  set_call_.reset(new EtcdAsyncSetCall);
212  set_call_->set_reader_ = kv_stub_->AsyncPut(&set_call_->ctx_,
213  req,
214  &set_call_->cq_);
215  set_call_->set_reader_->Finish(&set_call_->set_resp_,
216  &set_call_->status_,
217  (void*)this);
218 
222  while (set_call_->cq_.Next(&got_tag, &ok)) {
223 
224  if (!set_call_->status_.ok()) {
225  EQL_TRACE(EtcdClientErrorTrace,
226  "Set Response: Error",
227  set_call_->status_.error_code(),
228  set_call_->status_.error_message());
229  }
230 
231  if (got_tag == (void *)this) {
232  os.str("");
233  EQL_DEBUG(EtcdClientDebug, os << "Set Response: Success"
234  << " PrevKey: "
235  << (set_call_->set_resp_.prev_kv()).key()
236  << " PrevValue: "
237  << (set_call_->set_resp_.prev_kv()).value());
238  }
239  break;
240  }
241 }
242 
// Deletes key (with range_end) from etcd and blocks until the call completes.
//
// Sends a DeleteRangeRequest with prev_kv requested through
// kv_stub_->AsyncDeleteRange, waits for one completion on delete_call_->cq_
// and logs the outcome. Nothing is returned to the caller; failures are only
// traced.
//
// NOTE(review): listing lines 253-255, 261-264 and 273-275 are absent from
// this extract. No delete_call_.reset(...) is visible here, whereas Get() and
// Set() allocate a fresh call object before use -- it may be among the dropped
// lines; confirm against the real file.
243 void EtcdIf::Delete (const string& key, string const& range_end) {
244  void *got_tag;
245  bool ok = false;
246  ostringstream os;
247 
248  EQL_DEBUG(EtcdClientDebug, os << "Delete Request - Key: "
249  << key
250  << " Range End: "
251  << range_end);
252 
256  DeleteRangeRequest req;
257  req.set_key(key);
258  req.set_range_end(range_end);
259  req.set_prev_kv(true);
260 
265  delete_call_->delete_reader_ = kv_stub_->AsyncDeleteRange(
266  &delete_call_->ctx_,
267  req,
268  &delete_call_->cq_);
  // The object's own address is used as the completion tag.
269  delete_call_->delete_reader_->Finish(&delete_call_->delete_resp_,
270  &delete_call_->status_,
271  (void*)this);
272 
  // The loop body ends in an unconditional break, so one completion is consumed.
276  while (delete_call_->cq_.Next(&got_tag, &ok)) {
  // NOTE(review): a non-OK status is traced but does not skip the block
  // below, so "Delete Response: Success" is also logged for failed calls.
277  if (!delete_call_->status_.ok()) {
278  EQL_TRACE(EtcdClientErrorTrace,
279  "Delete Response: Error",
280  delete_call_->status_.error_code(),
281  delete_call_->status_.error_message());
282  }
283  if (got_tag == (void *)this) {
284 
285  os.str("");
286  EQL_DEBUG(EtcdClientDebug, os << "Delete Response: Success"
287  << " # Keys Deleted: "
288  << delete_call_->delete_resp_.deleted());
289 
  // NOTE(review): the bound is deleted() while the index is applied to
  // prev_kvs(i); this assumes one prev_kv entry per deleted key -- confirm,
  // or bound by prev_kvs_size().
290  for (int i = 0; i < delete_call_->delete_resp_.deleted(); i++) {
291 
292  os.str("");
293  EQL_DEBUG(EtcdClientDebug, os << " Index: " << i
294  << " PrevKey: "
295  << (delete_call_->delete_resp_.prev_kvs(i)).key()
296  << " PrevVal: "
297  << (delete_call_->delete_resp_.prev_kvs(i)).value());
298  }
299  }
300  break;
301  }
302 }
303 
// Starts a watch on key and delivers changes to cb.
//
// Opens the Watch stream with watch_stub_->AsyncWatch (tagged with `this`),
// builds a WatchCreateRequest covering key up to a range_end derived from it,
// and once the stream-open completion arrives writes the request and posts the
// first Read (tagged with &watch_call_->wtag). It then marks the watch active
// and calls watch_call_->WaitForWatchResponse(cb); Watch() does not return
// until that call returns.
//
// NOTE(review): listing lines 312-315 and 330-336 are absent from this
// extract, so any statements there (for example allocation of watch_call_)
// are not shown.
304 void EtcdIf::Watch (const string& key, WatchCb cb) {
305  WatchRequest req;
306  WatchCreateRequest create_req;
307  void* got_tag;
308  bool ok = false;
309  ostringstream os;
310 
311  EQL_DEBUG(EtcdClientDebug, os << "Watch Request - Key: " << key);
312 
316  watch_call_->watch_reader_ = watch_stub_->AsyncWatch(&watch_call_->ctx_,
317  &watch_call_->cq_,
318  (void *)this);
319 
320  create_req.set_key(key);
321  create_req.set_prev_kv(true);
322 
  // range_end is key with its last character incremented by one.
  // NOTE(review): back() on an empty key is undefined behaviour, and a last
  // character already at the maximum char value would not increment cleanly.
323  string range_end(key);
324  range_end.back() = ((int)range_end[range_end.length()-1])+1;
325 
326  create_req.set_range_end(range_end);
327 
328  req.mutable_create_request()->CopyFrom(create_req);
329 
  // Drain completions until the one tagged `this` (from AsyncWatch above)
  // shows up, then send the create request and arm the first Read.
  // NOTE(review): `ok` is not checked in the visible lines.
337  while (watch_call_->cq_.Next(&got_tag, &ok)) {
338  if (got_tag == (void *)this) {
339  watch_call_->watch_reader_->Write(req, (void *)"write");
340  watch_call_->watch_reader_->Read(&watch_call_->watch_resp_,
341  (void *)(&watch_call_->wtag));
342  break;
343  }
344  }
345 
346  /* Set bool indicating that watch has started */
347  watch_call_->watch_active_ = true;
348 
349  /* Wait for changes to happen and process them */
350  watch_call_->WaitForWatchResponse(cb);
351 }
352 
// Body of WaitForWatchResponse(cb), which EtcdIf::Watch invokes through
// watch_call_. The signature line (listing line 353) is missing from this
// extract, as are listing lines 359-361, 364-372, 385-388, 391-394, 400, 408,
// 438-440 and 445-447 -- including both `case` labels of the switch below.
//
// Loops on cq_: for each completion tagged with &wtag it converts every event
// in watch_resp_ into an EtcdResponse, passes it to cb, and posts the next
// Read. The loop ends when cq_.Next() returns false, or when a completion
// arrives with ok == false or after watch_active_ has been cleared.
354  void* got_tag;
355  bool ok = false;
356  EtcdResponse resp;
357  ostringstream os;
358 
362  while(cq_.Next(&got_tag, &ok)) {
363 
  // A failed completion and a stopped watch (watch_active_ cleared elsewhere)
  // are reported identically: err_code 10, "Watch RPC failed". cb is not
  // invoked on this path.
373  if (!ok || !watch_active_) {
374  resp.set_err_code(10);
375  resp.set_err_msg("Watch RPC failed");
376 
377  EQL_TRACE(EtcdClientErrorTrace,
378  "Watch Response: Error",
379  resp.err_code(),
380  resp.err_msg());
381  break;
382  }
383 
  // Completions with any other tag (e.g. the "write" tag used by Watch) are
  // ignored.
384  if (got_tag == (void*)&wtag) {
389  if (watch_resp_.events_size()) {
390 
395  resp.set_revision(watch_resp_.header().revision());
396  for (int i = 0; i < watch_resp_.events_size(); i++) {
397  auto event = watch_resp_.events(i);
398 
  // NOTE(review): the case labels (listing lines 400 and 408) were dropped
  // from this extract. The first surviving branch picks CREATE when the kv
  // version is 0 and UPDATE otherwise; the second sets DELETE.
399  switch (event.type()) {
401  if (event.kv().version() == 0) {
402  resp.set_action(CREATE);
403  } else {
404  resp.set_action(UPDATE);
405  }
406  break;
407  }
409  resp.set_action(DELETE);
410  break;
411  }
412  default:
413  break;
414  }
415  resp.set_key(event.kv().key());
416  resp.set_val(event.kv().value());
417 
  // NOTE(review): resp is reused across events and prev_key/prev_val are
  // only overwritten when the event carries a prev_kv, so values from an
  // earlier event can persist into a later one.
418  if (event.has_prev_kv()) {
419  resp.set_prev_key(event.prev_kv().key());
420  resp.set_prev_val(event.prev_kv().value());
421  }
422 
  // NOTE(review): unlike the other functions in this file, os is not reset
  // with os.str("") before this log; whether text accumulates depends on
  // what EQL_DEBUG does with its argument -- confirm.
423  EQL_DEBUG(EtcdClientDebug, os << "Watch Response: "
424  << "Success"
425  << " revision: "
426  << resp.revision()
427  << " action: "
428  << resp.action()
429  << " Key: "
430  << resp.key()
431  << " Value: "
432  << resp.value()
433  << " PrevKey: "
434  << resp.prev_key()
435  << " PrevValue: "
436  << resp.prev_value());
437 
  // Hand the event to the caller-supplied callback, once per event.
441  cb(resp);
442  }
443  }
444 
  // Re-arm the stream: request the next message with the same tag.
448  watch_reader_->Read(&watch_resp_, (void*)&wtag);
449  }
450  } // while
451 }
452 
// Body of EtcdIf::StopWatch(). The signature line (listing line 453) is
// missing from this extract; the cross-reference index lists it as
// `virtual void StopWatch()`.
//
// If a watch is marked active, clears watch_active_ and half-closes the
// stream with WritesDone (tagged "Stop Watch"). Does nothing otherwise.
454  if (watch_call_->watch_active_) {
  // Cleared before WritesDone: the watch loop treats a false watch_active_
  // on its next completion as the signal to stop.
455  watch_call_->watch_active_ = false;
456  watch_call_->watch_reader_->WritesDone((void *)"Stop Watch");
457  }
458 }
void set_value(const ::std::string &value)
Definition: rpc.pb.h:10019
const std::string & prev_value() const
Definition: eql_if.h:228
std::unique_ptr< Watch::Stub > watch_stub_
Definition: eql_if.h:120
void set_err_msg(std::string msg)
Definition: eql_if.h:199
WatchAction action() const
Definition: eql_if.h:208
void set_key(const ::std::string &value)
Definition: rpc.pb.h:11531
void set_range_end(const ::std::string &value)
Definition: rpc.pb.h:10255
void set_key(const ::std::string &value)
Definition: rpc.pb.h:10202
const std::string & prev_key() const
Definition: eql_if.h:223
virtual EtcdResponse Get(const std::string &key, const std::string &range_end, int limit)
Definition: eql_if.cc:82
std::vector< std::string > hosts_
Definition: eql_if.h:116
const std::string & value() const
Definition: eql_if.h:218
SandeshTraceBufferPtr EqlTraceBuf
#define EQL_DEBUG(obj, arg)
Definition: eql_log.h:25
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
virtual ~EtcdIf()
Definition: eql_if.cc:51
virtual void Set(const std::string &key, const std::string &value)
Definition: eql_if.cc:190
const ::std::string & key() const
Definition: kv.pb.h:396
std::unique_ptr< EtcdAsyncSetCall > set_call_
Definition: eql_if.h:150
const ::std::string & value() const
Definition: kv.pb.h:491
#define EQL_TRACE_BUF
Definition: eql_log.h:12
virtual bool Connect()
Definition: eql_if.cc:54
void CopyFrom(const ::google::protobuf::Message &from) PROTOBUF_FINAL
Definition: rpc.pb.cc:10178
#define EQL_TRACE(obj,...)
Definition: eql_log.h:35
const std::string & key() const
Definition: eql_if.h:213
void set_key(std::string key)
Definition: eql_if.h:214
void set_prev_val(std::string prev_val)
Definition: eql_if.h:229
const ::etcdserverpb::ResponseHeader & header() const
Definition: rpc.pb.h:9859
virtual void StopWatch()
Definition: eql_if.cc:453
void set_revision(int revision)
Definition: eql_if.h:204
void set_range_end(const ::std::string &value)
Definition: rpc.pb.h:11584
const std::string & err_msg() const
Definition: eql_if.h:198
void set_val(std::string val)
Definition: eql_if.h:219
void set_err_code(int code)
Definition: eql_if.h:194
void set_prev_kv(bool value)
Definition: rpc.pb.h:10308
int err_code() const
Definition: eql_if.h:193
void set_key(const ::std::string &value)
Definition: rpc.pb.h:9966
void set_sort_target(::etcdserverpb::RangeRequest_SortTarget value)
Definition: rpc.pb.h:9797
void set_kv_map(kv_map kvs)
Definition: eql_if.h:234
void set_limit(::google::protobuf::int64 value)
Definition: rpc.pb.h:9755
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
void set_prev_kv(bool value)
Definition: rpc.pb.h:10086
std::unique_ptr< EtcdAsyncWatchCall > watch_call_
Definition: eql_if.h:172
virtual void Delete(const std::string &key, const std::string &range_end)
Definition: eql_if.cc:243
boost::function< void(const EtcdResponse &Resp)> WatchCb
Definition: eql_if.h:45
boost::asio::ip::tcp::endpoint Endpoint
Definition: eql_if.h:28
std::unique_ptr< EtcdAsyncGetCall > get_call_
Definition: eql_if.h:140
int revision() const
Definition: eql_if.h:203
std::vector< Endpoint > endpoints_
Definition: eql_if.h:115
void set_key(const ::std::string &value)
Definition: rpc.pb.h:9649
void set_sort_order(::etcdserverpb::RangeRequest_SortOrder value)
Definition: rpc.pb.h:9783
void set_prev_key(std::string prev_key)
Definition: eql_if.h:224
std::unique_ptr< KV::Stub > kv_stub_
Definition: eql_if.h:119
std::unique_ptr< EtcdAsyncDeleteCall > delete_call_
Definition: eql_if.h:160
virtual void Watch(const std::string &key, WatchCb cb)
Definition: eql_if.cc:304
void set_range_end(const ::std::string &value)
Definition: rpc.pb.h:9702
const ::mvccpb::KeyValue & kvs(int index) const
Definition: rpc.pb.h:9903
void set_action(WatchAction action)
Definition: eql_if.h:209
::etcdserverpb::WatchCreateRequest * mutable_create_request()
Definition: rpc.pb.h:11460
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
void set_prev_kv(bool value)
Definition: rpc.pb.h:11695