OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
zookeeper_client.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #include <cerrno>
6 #include <cstring>
7 
8 #include <base/logging.h>
9 
13 
14 #define ZOO_LOG(_Level, _Msg) \
15  do { \
16  if (LoggingDisabled()) break; \
17  log4cplus::Logger logger = log4cplus::Logger::getRoot(); \
18  LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \
19  __LINE__ << ": " << _Msg); \
20  } while (false)
21 
22 #define ZOO_LOG_ERR(_Msg) \
23  do { \
24  LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \
25  << _Msg); \
26  } while (false)
27 
28 namespace zookeeper {
29 namespace interface {
30 
32  public:
34  }
35  virtual ~ZookeeperCBindings() {
36  }
37  virtual void ZooSetDebugLevel(ZooLogLevel logLevel) {
38  zoo_set_debug_level(logLevel);
39  }
40  virtual zhandle_t* ZookeeperInit(const char *host, watcher_fn fn,
41  int recv_timeout, const clientid_t *clientid, void *context,
42  int flags) {
43  return zookeeper_init(host, fn, recv_timeout, clientid, context,
44  flags);
45  }
46  virtual int ZookeeperClose(zhandle_t *zh) {
47  return zookeeper_close(zh);
48  }
49  virtual int ZooState(zhandle_t *zh) {
50  return zoo_state(zh);
51  }
52  virtual int ZooCreate(zhandle_t *zh, const char *path, const char *value,
53  int valuelen, const struct ACL_vector *acl, int flags,
54  char *path_buffer, int path_buffer_len) {
55  return zoo_create(zh, path, value, valuelen, acl, flags, path_buffer,
56  path_buffer_len);
57  }
58  virtual int ZooDelete(zhandle_t *zh, const char *path, int version) {
59  return zoo_delete(zh, path, version);
60  }
61  virtual int ZooGet(zhandle_t *zh, const char *path, int watch,
62  char *buffer, int* buffer_len, struct Stat *stat) {
63  return zoo_get(zh, path, watch, buffer, buffer_len, stat);
64  }
65  virtual int ZooExists(zhandle_t *zh, const char *path, int watch,
66  struct Stat *stat) {
67  return zoo_exists(zh, path, watch, stat);
68  }
69  virtual void ZooSetContext(zhandle_t * zh, void *context) {
70  return zoo_set_context(zh, context);
71  }
72  virtual int ZooIsUnrecoverable(zhandle_t * zh) {
73  return is_unrecoverable(zh);
74  }
75 };
76 
77 } // namespace interface
78 
79 namespace client {
80 namespace impl {
81 
82 void ZookeeperWatcher(zhandle_t* zh, int type, int state,
83  const char* path, void* watcherCtx) {
84  if (ZINVALIDSTATE == is_unrecoverable(zh)) {
85  ZOO_LOG(DEBUG, "Zookeeper callback called with state " << state);
86  ZookeeperClientImpl *zooImpl = (ZookeeperClientImpl *)watcherCtx;
87  if (zooImpl && zooImpl->GetClient()) {
88  if (((ZookeeperClient *)zooImpl->GetClient())->cb) {
89  ((ZookeeperClient *)zooImpl->GetClient())->cb();
90  }
91  }
92  }
93 }
94 
95 // ZookeeperClientImpl
97  const char *servers, zookeeper::interface::ZookeeperInterface *zki) :
98  hostname_(hostname),
99  servers_(servers),
100  zk_handle_(NULL),
101  connected_(false),
102  zki_(zki) {
103  // Set loglevel
104  zki_->ZooSetDebugLevel(ZOO_LOG_LEVEL_DEBUG);
105 }
106 
108 }
109 
111  while (true) {
112  zk_handle_ = zki_->ZookeeperInit(servers_.c_str(),
115  NULL,
116  NULL,
117  0);
118  if (zk_handle_ == NULL) {
119  int zerrno(errno);
120  ZOO_LOG_ERR("zookeeper_init FAILED: (" << zerrno <<
121  "): servers: " << servers_ << " retrying in 1 second");
122  sleep(1);
123  continue;
124  }
125  zki_->ZooSetContext(zk_handle_, this);
126  // Block till session is connected
127  while (!connected_) {
128  int zstate(zki_->ZooState(zk_handle_));
129  if (zstate == ZOO_CONNECTED_STATE) {
130  connected_ = true;
131  ZOO_LOG(DEBUG, "Session CONNECTED");
132  break;
133  } else {
134  ZOO_LOG(DEBUG, "Session NOT CONNECTED: retrying in 1 second");
135  sleep(1);
136  continue;
137  }
138  }
139  break;
140  }
141  assert(connected_);
142  return true;
143 }
144 
146  if (zk_handle_) {
147  int rc(zki_->ZookeeperClose(zk_handle_));
148  if (rc != ZOK) {
149  int zerrno(errno);
150  ZOO_LOG(WARN, "zookeeper_close FAILED (" << rc <<
151  "): errno: " << zerrno);
152  }
153  zk_handle_ = NULL;
154  }
155  connected_ = false;
156 }
157 
159  Shutdown();
160  return Connect();
161 }
162 
164  return connected_;
165 }
166 
167 static inline bool IsZooErrorRecoverable(int zerror) {
168  return zerror == ZCONNECTIONLOSS ||
169  zerror == ZOPERATIONTIMEOUT;
170 }
171 
172 static inline bool IsZooErrorUnrecoverable(int zerror) {
173  return zerror == ZINVALIDSTATE;
174 }
175 
176 int ZookeeperClientImpl::CreateNodeSync(const char *path, const char *value,
177  int *err, int flag) {
178  int rc;
179  retry:
180  do {
181  rc = zki_->ZooCreate(zk_handle_, path, value, strlen(value),
182  &ZOO_OPEN_ACL_UNSAFE, flag, NULL, -1);
183  } while (IsZooErrorRecoverable(rc));
184  if (IsZooErrorUnrecoverable(rc)) {
185  // Reconnect
186  Reconnect();
187  goto retry;
188  }
189  if (rc != ZOK) {
190  *err = errno;
191  }
192  return rc;
193 }
194 
195 bool ZookeeperClientImpl::CreateNode(const char *path, const char *value,
196  int flag) {
197  int err = 0;
198  int rc;
199  if (!IsConnected()) {
200  bool success(Connect());
201  if (!success) {
202  ZOO_LOG_ERR("Zookeeper Client Connect FAILED");
203  return false;
204  }
205  }
206  rc = CreateNodeSync(path, value, &err, flag);
207  if (rc != ZOK && rc != ZNODEEXISTS) {
208  ZOO_LOG_ERR("Creation of ZNODE(" << path << "): " << value
209  << ": FAILED: (" << rc << ") error: " << err);
210  return false;
211  }
212  return true;
213 }
214 
215 int ZookeeperClientImpl::GetNodeDataSync(const char *path, char *buf,
216  int *buf_len, int *err) {
217  int rc;
218  retry:
219  do {
220  rc = zki_->ZooGet(zk_handle_, path, 0, buf, buf_len, NULL);
221  } while (IsZooErrorRecoverable(rc));
222  if (IsZooErrorUnrecoverable(rc)) {
223  // Reconnect
224  Reconnect();
225  goto retry;
226  }
227  if (rc != ZOK) {
228  *err = errno;
229  }
230  return rc;
231 }
232 
233 bool ZookeeperClientImpl::CheckNodeExist(const char *path) {
234  if (!IsConnected()) {
235  bool success(Connect());
236  if (!success) {
237  ZOO_LOG_ERR("Zookeeper Client Connect FAILED");
238  return false;
239  }
240  }
241 
242  struct Stat stat;
243  int rc = zki_->ZooExists(zk_handle_, path, 0, &stat);
244  return (rc == ZOK);
245 }
246 
247 int ZookeeperClientImpl::DeleteNodeSync(const char *path, int *err) {
248  int rc;
249  retry:
250  do {
251  rc = zki_->ZooDelete(zk_handle_, path, -1);
252  } while (IsZooErrorRecoverable(rc));
253  if (IsZooErrorUnrecoverable(rc)) {
254  // Reconnect
255  Reconnect();
256  goto retry;
257  }
258  if (rc != ZOK) {
259  *err = errno;
260  }
261  return rc;
262 }
263 
264 bool ZookeeperClientImpl::DeleteNode(const char *path) {
265  int err = 0;
266  int rc;
267  if (!IsConnected()) {
268  bool success(Connect());
269  if (!success) {
270  ZOO_LOG_ERR("Zookeeper Client Connect FAILED");
271  return false;
272  }
273  }
274 
275  rc = DeleteNodeSync(path, &err);
276  if (rc != ZOK) {
277  ZOO_LOG_ERR("Deletion of ZNODE(" << path << "): "
278  << ": FAILED: (" << rc << ") error: " << err);
279  return false;
280  }
281 
282  return (rc == ZOK);
283 }
284 
285 std::string ZookeeperClientImpl::Name() const {
286  return hostname_;
287 }
288 
289 } // namespace impl
290 
291 // ZookeeperClient
292 ZookeeperClient::ZookeeperClient(const char *hostname, const char *servers) :
293  impl_(new impl::ZookeeperClientImpl(hostname, servers,
294  new zookeeper::interface::ZookeeperCBindings)) {
295 }
296 
298  impl_(impl) {
299 }
300 
301 bool ZookeeperClient::CreateNode(const char *path, const char *value,
302  int type) {
303  int flag = 0;
304  if (type == Z_NODE_TYPE_EPHEMERAL) {
305  flag |= ZOO_EPHEMERAL;
306  }
307  if (type == Z_NODE_TYPE_SEQUENCE) {
308  flag |= ZOO_SEQUENCE;
309  }
310  return impl_->CreateNode(path, value, flag);
311 }
312 
313 bool ZookeeperClient::CheckNodeExist(const char *path) {
314  return impl_->CheckNodeExist(path);
315 }
316 
317 bool ZookeeperClient::DeleteNode(const char *path) {
318  return impl_->DeleteNode(path);
319 }
320 
322  return impl_->Shutdown();
323 }
324 
326  cb = callback;
327  if (impl_.get()) {
328  impl_->SetClient(this);
329  }
330 }
331 
333 }
334 
335 // ZookeeperLockImpl
337  public:
339  const char *path) :
340  clientImpl_(clientImpl),
341  path_(path),
342  is_acquired_(false) {
343  id_ = clientImpl_->Name();
344  }
345 
346  std::string Id() const {
347  return id_;
348  }
349 
350  bool Lock() {
351  ZOO_LOG(INFO, "Trying (" << path_ << "): " << id_);
352  while (true) {
353  // Connect if not already done
354  if (!clientImpl_->IsConnected()) {
355  bool success(clientImpl_->Connect());
356  if (!success) {
357  ZOO_LOG_ERR("Zookeeper Client Connect FAILED");
358  return success;
359  }
360  }
361  // Try creating the znode
362  int err;
363  int rc(clientImpl_->CreateNodeSync(path_.c_str(), id_.c_str(),
364  &err, 0));
365  switch (rc) {
366  case ZOK: {
367  // We acquired the lock
368  ZOO_LOG(INFO, "ACQUIRED (" << path_ << "): " << id_);
369  is_acquired_ = true;
370  return true;
371  }
372  case ZNODEEXISTS: {
373  // Node exists, get node data and check
374  char buf[256];
375  int buf_len(sizeof(buf));
376  int zerr;
377  int zrc(clientImpl_->GetNodeDataSync(path_.c_str(), buf,
378  &buf_len, &zerr));
379  if (zrc == ZOK) {
380  // Does it match our ID?
381  std::string mid(buf, buf_len);
382  if (id_ == mid) {
383  // We acquired the lock
384  ZOO_LOG(INFO, "ACQUIRED EEXIST (" << path_ << "): "
385  << id_);
386  is_acquired_ = true;
387  return true;
388  }
389  ZOO_LOG_ERR("EEXIST (" << path_ << "): " << mid <<
390  " , ours: " << id_);
391  sleep(1);
392  continue;
393  } else if (zrc == ZNONODE) {
394  ZOO_LOG(WARN, "GetNodeDataSync(" << path_ <<
395  "): Data: " << id_ <<
396  ": No Node EXISTS: retrying in 1 second");
397  sleep(1);
398  continue;
399  } else {
400  ZOO_LOG_ERR("GetNodeDataSync(" << path_ << "): " <<
401  id_ << ": FAILED: (" << zrc << ") error: " << zerr);
403  return false;
404  }
405  break;
406  }
407  default: {
408  ZOO_LOG_ERR("CreateNodeSync(" << path_ << "): " << id_
409  << ": FAILED: (" << rc << ") error: " << err);
411  return false;
412  }
413  }
414  }
415  }
416 
417  bool Release() {
418  bool success;
419  int err, rc;
420  if (!is_acquired_) {
421  ZOO_LOG_ERR("(" << path_ << "): " << id_ <<
422  ": Release WITHOUT Lock");
423  success = false;
424  goto cleanup;
425  }
426  is_acquired_ = false;
427  rc = clientImpl_->DeleteNodeSync(path_.c_str(), &err);
428  if (rc == ZOK) {
429  ZOO_LOG(INFO, "RELEASED (" << path_ << "): " << id_);
430  success = true;
431  goto cleanup;
432  } else if (rc == ZNONODE) {
433  ZOO_LOG_ERR("DeleteNodeSync(" << path_ << "): " << id_ <<
434  ": No Node EXISTS(" << err <<
435  "): Possible concurrent execution");
436  success = false;
437  goto cleanup;
438  } else {
439  ZOO_LOG_ERR("DeleteNodeSync(" << path_ << "): " << id_ <<
440  ": FAILED (" << rc << "): error " << err);
441  success = false;
442  goto cleanup;
443  }
444  cleanup:
446  return success;
447  }
448 
449  private:
451  std::string path_;
453  std::string id_;
454 };
455 
456 // ZookeeperLock
457 ZookeeperLock::ZookeeperLock(ZookeeperClient *client, const char *path) :
458  impl_(new ZookeeperLockImpl(client->impl_.get(), path)) {
459 }
460 
462 }
463 
464 std::string ZookeeperLock::Id() const {
465  return impl_->Id();
466 }
467 
469  return impl_->Lock();
470 }
471 
473  return impl_->Release();
474 }
475 
476 } // namespace client
477 } // namespace zookeeper
ZookeeperLock(ZookeeperClient *client, const char *path)
virtual void ZooSetDebugLevel(ZooLogLevel logLevel)
virtual int ZooIsUnrecoverable(zhandle_t *zh)
#define ZOO_LOG_ERR(_Msg)
int CreateNodeSync(const char *path, const char *value, int *err, int flag)
virtual int ZooGet(zhandle_t *zh, const char *path, int watch, char *buffer, int *buffer_len, struct Stat *stat)
bool CreateNode(const char *path, const char *data, int type=Z_NODE_TYPE_PERSISTENT)
bool CheckNodeExist(const char *path)
std::unique_ptr< zookeeper::interface::ZookeeperInterface > zki_
bool CreateNode(const char *path, const char *value, int flag)
static bool IsZooErrorRecoverable(int zerror)
virtual int ZooExists(zhandle_t *zh, const char *path, int watch, struct Stat *stat)
uint8_t type
Definition: load_balance.h:109
std::unique_ptr< ZookeeperLockImpl > impl_
ZookeeperLockImpl(impl::ZookeeperClientImpl *clientImpl, const char *path)
#define ZOO_LOG(_Level, _Msg)
static bool IsZooErrorUnrecoverable(int zerror)
int DeleteNodeSync(const char *path, int *err)
void AddListener(ZooStateCallback cb)
virtual zhandle_t * ZookeeperInit(const char *host, watcher_fn fn, int recv_timeout, const clientid_t *clientid, void *context, int flags)
boost::function< void(void)> ZooStateCallback
virtual int ZooDelete(zhandle_t *zh, const char *path, int version)
virtual int ZooCreate(zhandle_t *zh, const char *path, const char *value, int valuelen, const struct ACL_vector *acl, int flags, char *path_buffer, int path_buffer_len)
ZookeeperClient(const char *hostname, const char *servers)
int GetNodeDataSync(const char *path, char *buf, int *buf_len, int *err)
virtual int ZookeeperClose(zhandle_t *zh)
ZookeeperClientImpl(const char *hostname, const char *servers, zookeeper::interface::ZookeeperInterface *zki)
std::unique_ptr< impl::ZookeeperClientImpl > impl_
virtual void ZooSetContext(zhandle_t *zh, void *context)
void ZookeeperWatcher(zhandle_t *zh, int type, int state, const char *path, void *watcherCtx)