OpenSDN source code
cql_if.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #include <assert.h>
6 #include <fstream>
7 
8 #include <boost/foreach.hpp>
9 #include <boost/algorithm/string.hpp>
10 #include <boost/algorithm/string/join.hpp>
11 #include <boost/unordered_map.hpp>
12 #include <boost/system/error_code.hpp>
13 
14 #include <linux/version.h>
15 #if defined(RHEL_MAJOR) && (RHEL_MAJOR >= 9)
16 #include <cassandra/cassandra.h>
17 #else
18 #include <cassandra.h>
19 #endif
20 
21 #include <base/logging.h>
22 #include <base/misc_utils.h>
23 #include <base/task.h>
24 #include <base/timer.h>
25 #include <base/string_util.h>
26 #include <base/address_util.h>
27 #include <io/event_manager.h>
28 #include <database/gendb_if.h>
29 #include <database/gendb_constants.h>
33 
34 using namespace boost::system;
35 
// Names of the three CQL trace buffers (debug, info and error).
36 #define CQLIF_DEBUG "CqlTraceBufDebug"
37 #define CQLIF_INFO "CqlTraceBufInfo"
38 #define CQLIF_ERR "CqlTraceBufErr"
39 
// NOTE(review): each of the next three lines is only the tail of a statement;
// the line that opens it is absent from this listing. The trace macros below
// reference CqlTraceDebugBuf, CqlTraceInfoBuf and CqlTraceErrBuf, so these are
// presumably the definitions of those buffers, each given its name and a
// numeric argument (10000, 10000, 20000) - restore the opening lines from the
// original file.
41  CQLIF_DEBUG, 10000));
43  CQLIF_INFO, 10000));
45  CQLIF_ERR, 20000));
46 
// Formats _Msg (a chain of stream insertions) prefixed with
// "<function>:<file>:<line>: " and passes the resulting string to
// CQL_TRACE_TRACE together with the CqlTraceDebugBuf buffer.
// The macro body ends on the `while (false)` line: no trailing line
// continuation, so the macro can never absorb whatever line follows it.
#define CQLIF_DEBUG_TRACE(_Msg)                                           \
    do {                                                                  \
        std::stringstream _ss;                                            \
        _ss << __func__ << ":" << __FILE__ << ":" <<                      \
            __LINE__ << ": " << _Msg;                                     \
        CQL_TRACE_TRACE(CqlTraceDebugBuf, _ss.str());                     \
    } while (false)
54 
// Formats _Msg (a chain of stream insertions) prefixed with
// "<function>:<file>:<line>: " and passes the resulting string to
// CQL_TRACE_TRACE together with the CqlTraceInfoBuf buffer.
// The macro body ends on the `while (false)` line: no trailing line
// continuation, so the macro can never absorb whatever line follows it.
#define CQLIF_INFO_TRACE(_Msg)                                            \
    do {                                                                  \
        std::stringstream _ss;                                            \
        _ss << __func__ << ":" << __FILE__ << ":" <<                      \
            __LINE__ << ": " << _Msg;                                     \
        CQL_TRACE_TRACE(CqlTraceInfoBuf, _ss.str());                      \
    } while (false)
62 
// Formats _Msg (a chain of stream insertions) prefixed with
// "<function>:<file>:<line>: " and passes the resulting string to
// CQL_TRACE_TRACE together with the CqlTraceErrBuf buffer.
// The macro body ends on the `while (false)` line: no trailing line
// continuation, so the macro can never absorb whatever line follows it.
#define CQLIF_ERR_TRACE(_Msg)                                             \
    do {                                                                  \
        std::stringstream _ss;                                            \
        _ss << __func__ << ":" << __FILE__ << ":" <<                      \
            __LINE__ << ": " << _Msg;                                     \
        CQL_TRACE_TRACE(CqlTraceErrBuf, _ss.str());                       \
    } while (false)
70 
// Routes _Msg to one of the three CQL trace buffers according to _Level:
// log4cplus::ERROR_LOG_LEVEL goes to CqlTraceErrBuf,
// log4cplus::DEBUG_LOG_LEVEL goes to CqlTraceDebugBuf and every other level
// goes to CqlTraceInfoBuf.
// _Level is parenthesised so that an expression argument compares as a
// whole, and the body ends without a trailing line continuation.
#define CASS_LIB_TRACE(_Level, _Msg)                                      \
    do {                                                                  \
        if ((_Level) == log4cplus::ERROR_LOG_LEVEL) {                     \
            CQL_TRACE_TRACE(CqlTraceErrBuf, _Msg);                        \
        } else if ((_Level) == log4cplus::DEBUG_LOG_LEVEL) {              \
            CQL_TRACE_TRACE(CqlTraceDebugBuf, _Msg);                      \
        } else {                                                          \
            CQL_TRACE_TRACE(CqlTraceInfoBuf, _Msg);                       \
        }                                                                 \
    } while (false)
81 
// Emits _Msg through the log4cplus root logger using the
// LOG4CPLUS_<_Level> macro, prefixed with "<function>:<file>:<line>: ".
// Nothing is logged when LoggingDisabled() returns true.
#define CQLIF_LOG(_Level, _Msg)                                           \
    do {                                                                  \
        if (!LoggingDisabled()) {                                         \
            log4cplus::Logger logger = log4cplus::Logger::getRoot();      \
            LOG4CPLUS_##_Level(logger,                                    \
                __func__ << ":" << __FILE__ << ":" << __LINE__ << ": "    \
                << _Msg);                                                 \
        }                                                                 \
    } while (false)
89 
// Logs _Msg at ERROR level through the LOG macro, prefixed with
// "<function>:<file>:<line>: ".
#define CQLIF_LOG_ERR(_Msg)                                               \
    do {                                                                  \
        LOG(ERROR,                                                        \
            __func__ << ":" << __FILE__ << ":" << __LINE__ << ": "        \
            << _Msg);                                                     \
    } while (false)
95 
96 namespace cass {
97 namespace cql {
98 namespace impl {
99 
// CassString convenience structure: a plain (pointer, length) pair
// describing character data. The structure stores the pointer as given and
// never copies or frees the characters.
struct CassString {
    // Empty string: NULL data and zero length.
    CassString() :
        data(NULL),
        length(0) {
    }

    // Wraps a NUL-terminated C string. A NULL pointer is accepted and gives
    // length 0 instead of passing NULL to strlen().
    CassString(const char *data) :
        data(data),
        length(data ? strlen(data) : 0) {
    }

    // Wraps exactly `length` characters starting at `data`.
    CassString(const char* data, size_t length) :
        data(data),
        length(length) {
    }

    const char* data;   // first character, or NULL
    size_t length;      // number of characters at `data`
};
120 
121 // CassUuid encode and decode
122 static inline void encode_uuid(char* output, const CassUuid &uuid) {
123  uint64_t time_and_version = uuid.time_and_version;
124  output[3] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
125  time_and_version >>= 8;
126  output[2] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
127  time_and_version >>= 8;
128  output[1] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
129  time_and_version >>= 8;
130  output[0] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
131  time_and_version >>= 8;
132 
133  output[5] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
134  time_and_version >>= 8;
135  output[4] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
136  time_and_version >>= 8;
137 
138  output[7] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
139  time_and_version >>= 8;
140  output[6] = static_cast<char>(time_and_version & 0x000000000000000FFLL);
141 
142  uint64_t clock_seq_and_node = uuid.clock_seq_and_node;
143  for (size_t i = 0; i < 8; ++i) {
144  output[15 - i] = static_cast<char>(clock_seq_and_node & 0x00000000000000FFL);
145  clock_seq_and_node >>= 8;
146  }
147 }
148 
149 static inline char* decode_uuid(char* input, CassUuid* output) {
150  output->time_and_version = static_cast<uint64_t>(static_cast<uint8_t>(input[3]));
151  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[2])) << 8;
152  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[1])) << 16;
153  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[0])) << 24;
154 
155  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[5])) << 32;
156  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[4])) << 40;
157 
158  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[7])) << 48;
159  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[6])) << 56;
160 
161  output->clock_seq_and_node = 0;
162  for (size_t i = 0; i < 8; ++i) {
163  output->clock_seq_and_node |= static_cast<uint64_t>(static_cast<uint8_t>(input[15 - i])) << (8 * i);
164  }
165  return input + 16;
166 }
167 
168 static const char * DbDataType2CassType(
169  const GenDb::DbDataType::type &db_type) {
170  switch (db_type) {
171  case GenDb::DbDataType::AsciiType:
172  return "ascii";
173  case GenDb::DbDataType::LexicalUUIDType:
174  return "uuid";
175  case GenDb::DbDataType::TimeUUIDType:
176  return "timeuuid";
177  case GenDb::DbDataType::Unsigned8Type:
178  case GenDb::DbDataType::Unsigned16Type:
179  case GenDb::DbDataType::Unsigned32Type:
180  return "int";
181  case GenDb::DbDataType::Unsigned64Type:
182  return "bigint";
183  case GenDb::DbDataType::DoubleType:
184  return "double";
185  case GenDb::DbDataType::UTF8Type:
186  return "text";
187  case GenDb::DbDataType::InetType:
188  return "inet";
189  case GenDb::DbDataType::IntegerType:
190  return "varint";
191  case GenDb::DbDataType::BlobType:
192  return "blob";
193  default:
194  assert(false && "Invalid data type");
195  return "";
196  }
197 }
198 
199 static std::string DbDataTypes2CassTypes(
200  const GenDb::DbDataTypeVec &v_db_types) {
201  assert(!v_db_types.empty());
202  return std::string(DbDataType2CassType(v_db_types[0]));
203 }
204 
// Translates a GenDb consistency level into the matching CassConsistency
// value; the default branch returns CASS_CONSISTENCY_UNKNOWN.
// NOTE(review): the `case` label that should precede each `return` inside the
// switch is absent from this listing, so as shown every return after the
// first is unreachable. Presumably there is one GenDb::DbConsistency
// enumerator per return - restore the labels from the original file.
205 static CassConsistency Db2CassConsistency(
206  GenDb::DbConsistency::type dconsistency) {
207  switch (dconsistency) {
209  return CASS_CONSISTENCY_ANY;
211  return CASS_CONSISTENCY_ONE;
213  return CASS_CONSISTENCY_TWO;
215  return CASS_CONSISTENCY_THREE;
217  return CASS_CONSISTENCY_QUORUM;
219  return CASS_CONSISTENCY_ALL;
221  return CASS_CONSISTENCY_LOCAL_QUORUM;
223  return CASS_CONSISTENCY_EACH_QUORUM;
225  return CASS_CONSISTENCY_SERIAL;
227  return CASS_CONSISTENCY_LOCAL_SERIAL;
229  return CASS_CONSISTENCY_LOCAL_ONE;
231  default:
232  return CASS_CONSISTENCY_UNKNOWN;
233  }
234 }
235 
236 // Cass Query Printer
237 class CassQueryPrinter : public boost::static_visitor<> {
238  public:
239  CassQueryPrinter(std::ostream &os, bool quote_strings) :
240  os_(os),
241  quote_strings_(quote_strings) {
242  }
243  CassQueryPrinter(std::ostream &os) :
244  os_(os),
245  quote_strings_(true) {
246  }
247  template<typename T>
248  void operator()(const T &t) const {
249  os_ << t;
250  }
251  void operator()(const boost::uuids::uuid &tuuid) const {
252  os_ << to_string(tuuid);
253  }
254  // uint8_t must be handled specially because ostream sees
255  // uint8_t as a text type instead of an integer type
256  void operator()(const uint8_t &tu8) const {
257  os_ << (uint16_t)tu8;
258  }
259  void operator()(const std::string &tstring) const {
260  if (quote_strings_) {
261  os_ << "'" << tstring << "'";
262  } else {
263  os_ << tstring;
264  }
265  }
266  // CQL int is 32 bit signed integer
267  void operator()(const uint32_t &tu32) const {
268  os_ << (int32_t)tu32;
269  }
270  // CQL bigint is 64 bit signed long
271  void operator()(const uint64_t &tu64) const {
272  os_ << (int64_t)tu64;
273  }
274  void operator()(const IpAddress &tipaddr) const {
275  os_ << "'" << tipaddr << "'";
276  }
277  std::ostream &os_;
279 };
280 
281 //
282 // CassStatement bind
283 //
284 class CassStatementIndexBinder : public boost::static_visitor<> {
285  public:
287  CassStatement *statement) :
288  cci_(cci),
289  statement_(statement) {
290  }
291  void operator()(const boost::blank &tblank, size_t index) const {
292  assert(false && "CassStatement bind to boost::blank not supported");
293  }
294  void operator()(const std::string &tstring, size_t index) const {
295  CassError rc(cci_->CassStatementBindStringN(statement_, index,
296  tstring.c_str(), tstring.length()));
297  assert(rc == CASS_OK);
298  }
299  void operator()(const boost::uuids::uuid &tuuid, size_t index) const {
300  CassUuid cuuid;
301  decode_uuid((char *)&tuuid, &cuuid);
302  CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid));
303  assert(rc == CASS_OK);
304  }
305  void operator()(const uint8_t &tu8, size_t index) const {
306  CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8));
307  assert(rc == CASS_OK);
308  }
309  void operator()(const uint16_t &tu16, size_t index) const {
310  CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16));
311  assert(rc == CASS_OK);
312  }
313  void operator()(const uint32_t &tu32, size_t index) const {
314  assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
315  CassError rc(cci_->CassStatementBindInt32(statement_, index,
316  (cass_int32_t)tu32));
317  assert(rc == CASS_OK);
318  }
319  void operator()(const uint64_t &tu64, size_t index) const {
320  assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
321  CassError rc(cci_->CassStatementBindInt64(statement_, index,
322  (cass_int64_t)tu64));
323  assert(rc == CASS_OK);
324  }
325  void operator()(const double &tdouble, size_t index) const {
326  CassError rc(cci_->CassStatementBindDouble(statement_, index,
327  (cass_double_t)tdouble));
328  assert(rc == CASS_OK);
329  }
330  void operator()(const IpAddress &tipaddr, size_t index) const {
331  CassInet cinet;
332  if (tipaddr.is_v4()) {
333  boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
334  cinet = cci_->CassInetInitV4(GENERIC_RAW_ARRAY(tv4.to_bytes()));
335  } else {
336  boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
337  cinet = cci_->CassInetInitV6(GENERIC_RAW_ARRAY(tv6.to_bytes()));
338  }
339  CassError rc(cci_->CassStatementBindInet(statement_, index,
340  cinet));
341  assert(rc == CASS_OK);
342  }
343  void operator()(const GenDb::Blob &tblob, size_t index) const {
344  CassError rc(cci_->CassStatementBindBytes(statement_, index,
345  tblob.data(), tblob.size()));
346  assert(rc == CASS_OK);
347  }
349  CassStatement *statement_;
350 };
351 
352 class CassStatementNameBinder : public boost::static_visitor<> {
353  public:
355  CassStatement *statement) :
356  cci_(cci),
357  statement_(statement) {
358  }
359  void operator()(const boost::blank &tblank, const char *name) const {
360  assert(false && "CassStatement bind to boost::blank not supported");
361  }
362  void operator()(const std::string &tstring, const char *name) const {
363  CassError rc(cci_->CassStatementBindStringByNameN(statement_, name,
364  strlen(name), tstring.c_str(), tstring.length()));
365  assert(rc == CASS_OK);
366  }
367  void operator()(const boost::uuids::uuid &tuuid, const char *name) const {
368  CassUuid cuuid;
369  decode_uuid((char *)&tuuid, &cuuid);
370  CassError rc(cci_->CassStatementBindUuidByName(statement_, name,
371  cuuid));
372  assert(rc == CASS_OK);
373  }
374  void operator()(const uint8_t &tu8, const char *name) const {
375  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
376  tu8));
377  assert(rc == CASS_OK);
378  }
379  void operator()(const uint16_t &tu16, const char *name) const {
380  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
381  tu16));
382  assert(rc == CASS_OK);
383  }
384  void operator()(const uint32_t &tu32, const char *name) const {
385  assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
386  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
387  (cass_int32_t)tu32));
388  assert(rc == CASS_OK);
389  }
390  void operator()(const uint64_t &tu64, const char *name) const {
391  assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
392  CassError rc(cci_->CassStatementBindInt64ByName(statement_, name,
393  (cass_int64_t)tu64));
394  assert(rc == CASS_OK);
395  }
396  void operator()(const double &tdouble, const char *name) const {
397  CassError rc(cci_->CassStatementBindDoubleByName(statement_, name,
398  (cass_double_t)tdouble));
399  assert(rc == CASS_OK);
400  }
401  void operator()(const IpAddress &tipaddr, const char *name) const {
402  CassInet cinet;
403  if (tipaddr.is_v4()) {
404  boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
405  cinet = cci_->CassInetInitV4(GENERIC_RAW_ARRAY(tv4.to_bytes()));
406  } else {
407  boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
408  cinet = cci_->CassInetInitV6(GENERIC_RAW_ARRAY(tv6.to_bytes()));
409  }
410  CassError rc(cci_->CassStatementBindInetByName(statement_, name,
411  cinet));
412  assert(rc == CASS_OK);
413  }
414  void operator()(const GenDb::Blob &tblob, const char *name) const {
415  CassError rc(cci_->CassStatementBindBytesByNameN(statement_, name,
416  strlen(name), tblob.data(), tblob.size()));
417  assert(rc == CASS_OK);
418  }
420  CassStatement *statement_;
421 };
422 
// printf-style template for the compaction clause of CREATE TABLE; the %s
// receives the compaction strategy class name.
static const char * kQCompactionStrategy =
    "compaction = {'class': 'org.apache.cassandra.db.compaction.%s'}";
// Table option fragment that sets gc_grace_seconds to 0.
static const std::string kQGCGraceSeconds = "gc_grace_seconds = 0";
// Table option fragment that sets read_repair_chance to 0.0.
static const std::string kQReadRepairChanceDTCS =
    "read_repair_chance = 0.0";
429 
430 //
431 // Cf2CassCreateTableIfNotExists
432 //
433 
// Builds the "CREATE TABLE IF NOT EXISTS" statement for a table described by
// `cf`: a single partition key named "key" declared as PRIMARY KEY, followed
// by one double-quoted column per entry of cf.cfcolumns_, then a WITH clause
// carrying the compaction class made from `compaction_strategy`.
// Asserts that there is exactly one partition key, that the column map is not
// empty and that the compaction clause fits in the 512 byte buffer.
// NOTE(review): the line carrying the return type, function name and the `cf`
// parameter is absent from this listing, and so is the continuation line of
// each of the two `query << ") WITH "` statements near the end (the option
// fragments appended after " AND "). Restore them from the original file.
435  const std::string &compaction_strategy) {
436  std::ostringstream query;
437  // Table name
438  query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " ";
439  // Row key
440  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
441  assert(rkeys.size() == 1);
442  query << "(key " << DbDataType2CassType(rkeys[0]) <<
443  " PRIMARY KEY";
444  // Columns
445  const GenDb::NewCf::ColumnMap &cfcolumns(cf.cfcolumns_);
446  assert(!cfcolumns.empty());
447  BOOST_FOREACH(const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
448  cfcolumns) {
449  query << ", \"" << cfcolumn.first << "\" " <<
450  DbDataType2CassType(cfcolumn.second);
451  }
// Expand the compaction clause template with the strategy class name.
452  char cbuf[512];
453  int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy,
454  compaction_strategy.c_str()));
455  assert(!(n < 0 || n >= (int)sizeof(cbuf)));
456 
457  // The compaction strategy DateTieredCompactionStrategy precludes
458  // using read repair, because of the way timestamps are checked for
459  // DTCS compaction.In this case, you must set read_repair_chance to
460  // zero. For other compaction strategies, read repair should be
461  // enabled with a read_repair_chance value of 0.2 being typical
462  if (compaction_strategy ==
463  GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
464  query << ") WITH " << std::string(cbuf) << " AND " <<
466  } else {
467  query << ") WITH " << std::string(cbuf) << " AND " <<
469  }
470 
471  return query.str();
472 }
473 
// Builds the "CREATE TABLE IF NOT EXISTS" statement for a table whose
// columns are positional. Partition keys are named key, key2, key3, ...;
// clustering columns are named column1..columnN; the entries of cf.columns_
// continue that numbering; an optional "value" column is added when
// cf.value_ is not empty. The PRIMARY KEY lists the partition keys (wrapped
// in their own parentheses when there are two or more) followed by the
// clustering columns.
// *ec is set to success on entry. When cf has no clustering columns, *ec is
// set to invalid_argument and an empty string is returned.
// NOTE(review): the line carrying the return type, function name and the `cf`
// parameter is absent from this listing, and so is the continuation line of
// each of the two `query << ")) WITH "` statements near the end. Restore them
// from the original file.
475  const std::string &compaction_strategy,
476  boost::system::error_code *ec) {
477  std::ostringstream query;
478 
479  *ec = errc::make_error_code(errc::success);
480  // sanity check - # of clustering_columns cannot be 0
481  // for dynamic tables
482  if (cf.clustering_columns_.size() == 0) {
483  *ec = errc::make_error_code(errc::invalid_argument);
484  return query.str();
485  }
486 
487  // Table name
488  query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " (";
489  // Row key
490  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
491  int rk_size(rkeys.size());
492  for (int i = 0; i < rk_size; i++) {
493  if (i) {
494  int key_num(i + 1);
495  query << "key" << key_num;
496  } else {
497  query << "key";
498  }
499  query << " " << DbDataType2CassType(rkeys[i]) << ", ";
500  }
501  // clustering columns
502  const GenDb::DbDataTypeVec &clustering_columns(cf.clustering_columns_);
503  int ccn_size(clustering_columns.size());
504  for (int i = 0; i < ccn_size; i++) {
505  int cnum(i + 1);
506  query << "column" << cnum << " " <<
507  DbDataType2CassType(clustering_columns[i]) << ", ";
508  }
509  // columns
510  const GenDb::DbDataTypeVec &columns(cf.columns_);
511  int cn_size(columns.size());
512  for (int i = 0; i < cn_size; i++) {
// Numbering continues after the clustering columns.
513  int cnum(i + 1 + ccn_size);
514  query << "column" << cnum << " " <<
515  DbDataType2CassType(columns[i]) << ", ";
516  }
517  // Value
518  const GenDb::DbDataTypeVec &values(cf.value_);
519  if (values.size() > 0) {
520  query << "value" << " " << DbDataTypes2CassTypes(values) << ", ";
521  }
522  // Primary Key
523  query << "PRIMARY KEY (";
524  std::ostringstream rkey_ss;
525  for (int i = 0; i < rk_size; i++) {
526  if (i) {
527  int key_num(i + 1);
528  rkey_ss << ", key" << key_num;
529  } else {
530  rkey_ss << "key";
531  }
532  }
// Two or more partition keys are wrapped in their own parentheses.
533  if (rk_size >= 2) {
534  query << "(" << rkey_ss.str() << "), ";
535  } else {
536  query << rkey_ss.str() << ", ";
537  }
538  for (int i = 0; i < ccn_size; i++) {
539  int cnum(i + 1);
540  if (i) {
541  query << ", ";
542  }
543  query << "column" << cnum;
544  }
// Expand the compaction clause template with the strategy class name.
545  char cbuf[512];
546  int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy,
547  compaction_strategy.c_str()));
548  assert(!(n < 0 || n >= (int)sizeof(cbuf)));
549 
550  // The compaction strategy DateTieredCompactionStrategy precludes
551  // using read repair, because of the way timestamps are checked for
552  // DTCS compaction.In this case, you must set read_repair_chance to
553  // zero. For other compaction strategies, read repair should be
554  // enabled with a read_repair_chance value of 0.2 being typical
555  if (compaction_strategy ==
556  GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
557  query << ")) WITH " << std::string(cbuf) << " AND " <<
559  } else {
560  query << ")) WITH " << std::string(cbuf) << " AND " <<
562  }
563  return query.str();
564 }
565 
566 static std::string DbColIndexMode2String(
567  const GenDb::ColIndexMode::type index_mode) {
568  switch (index_mode) {
570  return "";
571  case GenDb::ColIndexMode::PREFIX:
572  return "PREFIX";
573  case GenDb::ColIndexMode::CONTAINS:
574  return "CONTAINS";
575  default:
576  assert(false && "INVALID");
577  }
578 }
579 
580 //
581 // CassCreateIndexIfNotExists
582 //
583 
584 std::string CassCreateIndexIfNotExists(const std::string &cfname,
585  const std::string &column, const std::string &indexname,
586  const GenDb::ColIndexMode::type index_mode) {
587  std::ostringstream query;
588 
589  query << "CREATE ";
590  if (index_mode != GenDb::ColIndexMode::NONE) {
591  query << "CUSTOM "; // SASI
592  }
593  // Indexname
594  query << "INDEX IF NOT EXISTS " << indexname << " ";
595  // Index Column
596  query << "ON " << cfname << "(\""<< column <<"\")";
597  // Mode if SASI
598  if (index_mode != GenDb::ColIndexMode::NONE) {
599  query << " USING \'org.apache.cassandra.index.sasi.SASIIndex\' " <<
600  "WITH OPTIONS = {\'mode\': \'" << DbColIndexMode2String(index_mode) << "\'}";
601  }
602  query << ";";
603  return query.str();
604 }
605 
606 //
607 // Cf2CassInsertIntoTable
608 //
609 
610 std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
611  std::ostringstream query;
612  // Table
613  const std::string &table(v_columns->cfname_);
614  query << "INSERT INTO " << table << " (";
615  std::ostringstream values_ss;
616  values_ss << "VALUES (";
617  CassQueryPrinter values_printer(values_ss);
618  // Row keys
619  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
620  int rk_size(rkeys.size());
621  for (int i = 0; i < rk_size; i++) {
622  if (i) {
623  int key_num(i + 1);
624  query << ", key" << key_num;
625  } else {
626  query << "key";
627  }
628  if (i) {
629  values_ss << ", ";
630  }
631  boost::apply_visitor(values_printer, rkeys[i]);
632  }
633  // Columns
634  int cttl(-1);
635  CassQueryPrinter cnames_printer(query, false);
636  BOOST_FOREACH(const GenDb::NewCol &column, v_columns->columns_) {
637  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_SQL);
638  // Column Name
639  query << ", ";
640  const GenDb::DbDataValueVec &cnames(*column.name.get());
641  assert(cnames.size() == 1);
642  // Double quote column name strings
643  query << "\"";
644  boost::apply_visitor(cnames_printer, cnames[0]);
645  query << "\"";
646  // Column Values
647  values_ss << ", ";
648  const GenDb::DbDataValueVec &cvalues(*column.value.get());
649  assert(cvalues.size() == 1);
650  boost::apply_visitor(values_printer, cvalues[0]);
651  // Column TTL
652  cttl = column.ttl;
653  }
654  query << ") ";
655  values_ss << ")";
656  query << values_ss.str();
657  if (cttl > 0) {
658  query << " USING TTL " << cttl;
659  }
660  return query.str();
661 }
662 
663 std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
664  std::ostringstream query;
665  // Table
666  const std::string &table(v_columns->cfname_);
667  query << "INSERT INTO " << table << " (";
668  std::ostringstream values_ss;
669  // Row keys
670  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
671  int rk_size(rkeys.size());
672  CassQueryPrinter values_printer(values_ss);
673  for (int i = 0; i < rk_size; i++) {
674  if (i) {
675  int key_num(i + 1);
676  query << ", key" << key_num;
677  } else {
678  query << "key";
679  }
680  boost::apply_visitor(values_printer, rkeys[i]);
681  values_ss << ", ";
682  }
683  // Columns
684  const GenDb::NewColVec &columns(v_columns->columns_);
685  assert(columns.size() == 1);
686  const GenDb::NewCol &column(columns[0]);
687  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_NOSQL);
688  // Column Names
689  const GenDb::DbDataValueVec &cnames(*column.name.get());
690  int cn_size(cnames.size());
691  for (int i = 0; i < cn_size; i++) {
692  int cnum(i + 1);
693  if (cnames.at(i).which() != GenDb::DB_VALUE_BLANK) {
694  query << ", column" << cnum;
695  boost::apply_visitor(values_printer, cnames[i]);
696  if (i != cn_size - 1) {
697  values_ss << ", ";
698  }
699  }
700  }
701  // Column Values
702  const GenDb::DbDataValueVec &cvalues(*column.value.get());
703  if (cvalues.size() > 0) {
704  query << ", value) VALUES (";
705  values_ss << ", ";
706  boost::apply_visitor(values_printer, cvalues[0]);
707  } else {
708  query << ") VALUES (";
709  }
710  values_ss << ")";
711  query << values_ss.str();
712  if (column.ttl > 0) {
713  query << " USING TTL " << column.ttl;
714  }
715  return query.str();
716 }
717 
718 //
719 // Cf2CassPrepareInsertIntoTable
720 //
721 
// Builds the text of a parameterised insert for the table described by `cf`:
//   INSERT INTO <cfname_> (key, "<col>", ...) VALUES (?, ?, ...) USING TTL ?
// with one "?" for the key, one per entry of cf.cfcolumns_ and a final one
// for the TTL. Asserts exactly one partition key and a non-empty column map.
// NOTE(review): the line carrying the return type, function name and the `cf`
// parameter is absent from this listing - restore it from the original file.
723  std::ostringstream query;
724  // Table name
725  query << "INSERT INTO " << cf.cfname_ << " ";
726  // Row key
727  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
728  assert(rkeys.size() == 1);
729  std::ostringstream values_ss;
730  query << "(key";
// values_ss starts with the text that closes the name list.
731  values_ss << ") VALUES (?";
732  // Columns
733  const GenDb::NewCf::ColumnMap &cfcolumns(cf.cfcolumns_);
734  assert(!cfcolumns.empty());
735  BOOST_FOREACH(const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
736  cfcolumns) {
737  query << ", \"" << cfcolumn.first << "\"";
738  values_ss << ", ?";
739  }
740  query << values_ss.str();
741  query << ") USING TTL ?";
742  return query.str();
743 }
744 
// Builds the text of a parameterised insert for a table whose columns are
// positional:
//   INSERT INTO <cfname_> (key, key2, ..., column1, ..., value)
//       VALUES (?, ?, ...) USING TTL ?
// Partition keys come first, then the clustering columns (column1..columnN),
// then the entries of cf.columns_ continuing that numbering, then "value"
// when cf.value_ is not empty. One "?" is written per name, plus one for the
// TTL.
// *ec is set to success on entry. When cf has no clustering columns, *ec is
// set to invalid_argument and an empty string is returned.
// NOTE(review): the line carrying the return type, function name and the `cf`
// parameter is absent from this listing - restore it from the original file.
746  boost::system::error_code *ec) {
747  std::ostringstream query;
748 
749  *ec = errc::make_error_code(errc::success);
750  // sanity check - # of clustering_columns cannot be 0
751  // for dynamic tables
752  if (cf.clustering_columns_.size() == 0) {
753  *ec = errc::make_error_code(errc::invalid_argument);
754  return query.str();
755  }
756 
757  // Table name
758  query << "INSERT INTO " << cf.cfname_ << " (";
759  // Row key
760  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
761  int rk_size(rkeys.size());
762  std::ostringstream values_ss;
763  for (int i = 0; i < rk_size; i++) {
764  if (i) {
765  int key_num(i + 1);
766  query << "key" << key_num;
767  } else {
768  query << "key";
769  }
// Every key is followed by a separator; at least one clustering column
// always follows because of the check above.
770  query << ", ";
771  values_ss << "?, ";
772  }
773  // Clustering Column name
774  const GenDb::DbDataTypeVec &clustering_columns(cf.clustering_columns_);
775  int ccn_size(clustering_columns.size());
776  for (int i = 0; i < ccn_size; i++) {
777  int cnum(i + 1);
778  query << "column" << cnum;
779  values_ss << "?";
780  if (i != ccn_size - 1) {
781  query << ", ";
782  values_ss << ", ";
783  }
784  }
785  // Column name
786  const GenDb::DbDataTypeVec &columns(cf.columns_);
787  int cn_size(columns.size());
788  if (cn_size > 0) {
789  query << ", ";
790  values_ss << ", ";
791  }
792  for (int i = 0; i < cn_size; i++) {
// Numbering continues after the clustering columns.
793  int cnum(i + 1 + ccn_size);
794  query << "column" << cnum;
795  values_ss << "?";
796  if (i != cn_size - 1) {
797  query << ", ";
798  values_ss << ", ";
799  }
800  }
801  // Value
802  const GenDb::DbDataTypeVec &values(cf.value_);
803  if (values.size() > 0) {
804  query << ", value";
805  values_ss << ", ?";
806  }
807  query << ") VALUES (";
808  values_ss << ")";
809  query << values_ss.str();
810  query << " USING TTL ?";
811  return query.str();
812 }
813 
814 //
815 // Cf2CassPrepareBind
816 //
817 
// Binds the row key, the column values and the TTL of `v_columns` to
// `statement`. Row keys are bound by name ("key", "key2", ...), each column
// value is bound by the column's own name (which must be a string), and the
// TTL of the last column is bound by position. Always returns true; a failed
// bind trips an assert.
// NOTE(review): the line carrying the return type, function name and the
// `cci` parameter is absent from this listing - restore it from the original
// file.
819  CassStatement *statement,
820  const GenDb::ColList *v_columns) {
821  CassStatementNameBinder values_binder(cci, statement);
822  // Row keys
823  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
824  int rk_size(rkeys.size());
// idx counts the values bound so far; it is only used as the position of
// the TTL parameter at the end.
825  size_t idx(0);
826  for (; (int) idx < rk_size; idx++) {
827  std::string rk_name;
828  if (idx) {
829  int key_num(idx + 1);
830  rk_name = "key" + integerToString(key_num);
831  } else {
832  rk_name = "key";
833  }
834  boost::apply_visitor(boost::bind(values_binder, _1, rk_name.c_str()),
835  rkeys[idx]);
836  }
837  // Columns
838  int cttl(-1);
839  BOOST_FOREACH(const GenDb::NewCol &column, v_columns->columns_) {
840  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_SQL);
841  const GenDb::DbDataValueVec &cnames(*column.name.get());
842  assert(cnames.size() == 1);
843  assert(cnames[0].which() == GenDb::DB_VALUE_STRING);
844  std::string cname(boost::get<std::string>(cnames[0]));
845  const GenDb::DbDataValueVec &cvalues(*column.value.get());
846  assert(cvalues.size() == 1);
847  boost::apply_visitor(boost::bind(values_binder, _1, cname.c_str()),
848  cvalues[0]);
849  // Column TTL
850  cttl = column.ttl;
851  idx++;
852  }
// NOTE(review): the TTL position equals the number of row keys plus the
// number of columns supplied here; presumably that matches the last "?" of
// the prepared statement - confirm against the statement text.
853  CassError rc(cci->CassStatementBindInt32(statement, idx++,
854  (cass_int32_t)cttl));
855  assert(rc == CASS_OK);
856  return true;
857 }
858 
// Binds the row key, the column name parts, the optional column value and
// the TTL of `v_columns` to `statement`, all by position and in that order.
// Exactly one column is expected (asserted). Always returns true; a failed
// bind trips an assert.
// NOTE(review): the line carrying the return type, function name and the
// `cci` parameter is absent from this listing - restore it from the original
// file.
860  CassStatement *statement,
861  const GenDb::ColList *v_columns) {
862  CassStatementIndexBinder values_binder(cci, statement);
863  // Row keys
864  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
865  int rk_size(rkeys.size());
// idx is the position of the next parameter to bind.
866  size_t idx(0);
867  for (; (int) idx < rk_size; idx++) {
868  boost::apply_visitor(boost::bind(values_binder, _1, idx), rkeys[idx]);
869  }
870  // Columns
871  const GenDb::NewColVec &columns(v_columns->columns_);
872  assert(columns.size() == 1);
873  const GenDb::NewCol &column(columns[0]);
874  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_NOSQL);
875  // Column Names
876  const GenDb::DbDataValueVec &cnames(*column.name.get());
877  int cn_size(cnames.size());
878  for (int i = 0; i < cn_size; i++, idx++) {
879  boost::apply_visitor(boost::bind(values_binder, _1, idx), cnames[i]);
880  }
881  // Column Values
882  const GenDb::DbDataValueVec &cvalues(*column.value.get());
883  if (cvalues.size() > 0) {
884  boost::apply_visitor(boost::bind(values_binder, _1, idx++),
885  cvalues[0]);
886  }
887  CassError rc(cci->CassStatementBindInt32(statement, idx++,
888  (cass_int32_t)column.ttl));
889  assert(rc == CASS_OK);
890  return true;
891 }
892 
893 static std::string CassSelectFromTableInternal(const std::string &table,
894  const std::vector<GenDb::DbDataValueVec> &rkeys,
895  const GenDb::ColumnNameRange &ck_range,
896  const GenDb::FieldNamesToReadVec &read_vec,
897  const GenDb::WhereIndexInfoVec &where_vec) {
898  std::ostringstream query;
899  // Table
900  if (read_vec.empty()) {
901  query << "SELECT * FROM " << table;
902  } else {
903  query << "SELECT ";
904  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
905  it != read_vec.end(); it++) {
906  query << it->get<0>() << ",";
907  bool read_timestamp = it->get<3>();
908  if (read_timestamp) {
909  query << "WRITETIME(" << it->get<0>() << "),";
910  }
911  }
912  query.seekp(-1, query.cur);
913  query << " FROM " << table;
914  }
915  if (rkeys.size() == 1) {
916  GenDb::DbDataValueVec rkey = rkeys[0];
917  int rk_size(rkey.size());
918  CassQueryPrinter cprinter(query);
919  for (int i = 0; i < rk_size; i++) {
920  if (i) {
921  int key_num(i + 1);
922  query << " AND key" << key_num << "=";
923  } else {
924  query << " WHERE key=";
925  }
926  boost::apply_visitor(cprinter, rkey[i]);
927  }
928 
929  } else if (rkeys.size() > 1) {
930  query << " WHERE key IN (";
931  BOOST_FOREACH(GenDb::DbDataValueVec rkey, rkeys) {
932  int rk_size(rkey.size());
933  assert(rk_size == 1);
934  CassQueryPrinter cprinter(query);
935  boost::apply_visitor(cprinter, rkey[0]);
936  query << ",";
937  }
938  query.seekp(-1, query.cur);
939  query << ")";
940  }
941  if (!where_vec.empty()) {
942  for (GenDb::WhereIndexInfoVec::const_iterator it = where_vec.begin();
943  it != where_vec.end(); ++it) {
944  std::ostringstream value_ss;
945  CassQueryPrinter value_vprinter(value_ss);
946  boost::apply_visitor(value_vprinter, it->get<2>());
947  query << " AND";
948  query << " " << it->get<0>();
949  query << " " << GenDb::Op::ToString(it->get<1>());
950  query << " " << value_ss.str();
951  }
952  }
953  if (!ck_range.IsEmpty()) {
954  if (!ck_range.start_.empty()) {
955  int ck_start_size(ck_range.start_.size());
956  std::ostringstream start_ss;
957  start_ss << " " << GenDb::Op::ToString(ck_range.start_op_) << " (";
958  CassQueryPrinter start_vprinter(start_ss);
959  query << " AND (";
960  for (int i = 0; i < ck_start_size; i++) {
961  if (i) {
962  query << ", ";
963  start_ss << ", ";
964  }
965  int cnum(i + 1);
966  query << "column" << cnum;
967  boost::apply_visitor(start_vprinter, ck_range.start_[i]);
968  }
969  query << ")";
970  start_ss << ")";
971  query << start_ss.str();
972  }
973  if (!ck_range.finish_.empty()) {
974  int ck_finish_size(ck_range.finish_.size());
975  std::ostringstream finish_ss;
976  finish_ss << " " << GenDb::Op::ToString(ck_range.finish_op_) <<
977  " (";
978  CassQueryPrinter finish_vprinter(finish_ss);
979  query << " AND (";
980  for (int i = 0; i < ck_finish_size; i++) {
981  if (i) {
982  query << ", ";
983  finish_ss << ", ";
984  }
985  int cnum(i + 1);
986  query << "column" << cnum;
987  boost::apply_visitor(finish_vprinter, ck_range.finish_[i]);
988  }
989  query << ")";
990  finish_ss << ")";
991  query << finish_ss.str();
992  }
993  if (ck_range.count_) {
994  query << " LIMIT " << ck_range.count_;
995  }
996  }
997  if (where_vec.size() > 1) {
998  query << " ALLOW FILTERING";
999  }
1000  return query.str();
1001 }
1002 
// Builds a SELECT for a single row key `rkeys`, restricted by the clustering
// range `ck_range` and the conditions in `where_vec`, reading the fields in
// `read_vec`. The row key is wrapped in a one-element vector and the work is
// delegated to CassSelectFromTableInternal (note that read_vec and where_vec
// are passed there in the opposite order to this parameter list).
// NOTE(review): the line carrying the return type and function name is absent
// from this listing - restore it from the original file.
1004  const std::string &table, const GenDb::DbDataValueVec &rkeys,
1005  const GenDb::ColumnNameRange &ck_range,
1006  const GenDb::WhereIndexInfoVec &where_vec,
1007  const GenDb::FieldNamesToReadVec &read_vec) {
1008  std::vector<GenDb::DbDataValueVec> rkey_vec;
1009  rkey_vec.push_back(rkeys);
1010  return CassSelectFromTableInternal(table, rkey_vec, ck_range,
1011  read_vec, where_vec);
1012 }
1013 
// Builds a SELECT query string for one partition key: `rkeys` is wrapped in a
// single-element vector and handed to CassSelectFromTableInternal together
// with a default-constructed GenDb::ColumnNameRange.
1014 std::string PartitionKey2CassSelectFromTable(const std::string &table,
1015  const GenDb::DbDataValueVec &rkeys) {
1016  std::vector<GenDb::DbDataValueVec> rkey_vec;
1017  rkey_vec.push_back(rkeys);
1018  return CassSelectFromTableInternal(table, rkey_vec, GenDb::ColumnNameRange(),
     // NOTE(review): the remaining call arguments are not visible in this
     // listing (embedded numbering jumps from 1018 to 1021) - confirm against
     // the original file.
1021 }
1022 
1024  const std::string &table, const GenDb::DbDataValueVec &rkeys,
1025  const GenDb::ColumnNameRange &ck_range,
1026  const GenDb::FieldNamesToReadVec &read_vec) {
1027  std::vector<GenDb::DbDataValueVec> rkey_vec;
1028  rkey_vec.push_back(rkeys);
1029  return CassSelectFromTableInternal(table, rkey_vec, ck_range, read_vec,
1031 }
1032 
1034  const std::string &table, const std::vector<GenDb::DbDataValueVec> &rkeys,
1035  const GenDb::ColumnNameRange &ck_range,
1036  const GenDb::FieldNamesToReadVec &read_vec) {
1037  return CassSelectFromTableInternal(table, rkeys, ck_range, read_vec,
1039 }
1040 
// Builds a SELECT query string for `table` with an empty partition-key vector
// by delegating to CassSelectFromTableInternal.
1041 std::string CassSelectFromTable(const std::string &table) {
1042  std::vector<GenDb::DbDataValueVec> rkey_vec;
1043  return CassSelectFromTableInternal(table, rkey_vec,
     // NOTE(review): the remaining call arguments are not visible in this
     // listing (embedded numbering jumps from 1043 to 1046) - confirm against
     // the original file.
1046 }
1047 
1049  interface::CassLibrary *cci, const CassValue *cvalue) {
1050  if (cci->CassValueIsNull(cvalue)) {
1051  return GenDb::DbDataValue();
1052  }
1053  CassValueType cvtype(cci->GetCassValueType(cvalue));
1054  switch (cvtype) {
1055  case CASS_VALUE_TYPE_ASCII:
1056  case CASS_VALUE_TYPE_VARCHAR:
1057  case CASS_VALUE_TYPE_TEXT: {
1058  CassString ctstring;
1059  CassError rc(cci->CassValueGetString(cvalue, &ctstring.data,
1060  &ctstring.length));
1061  assert(rc == CASS_OK);
1062  return std::string(ctstring.data, ctstring.length);
1063  }
1064  case CASS_VALUE_TYPE_UUID: {
1065  CassUuid ctuuid;
1066  CassError rc(cci->CassValueGetUuid(cvalue, &ctuuid));
1067  assert(rc == CASS_OK);
1069  encode_uuid((char *)&u, ctuuid);
1070  return u;
1071  }
1072  case CASS_VALUE_TYPE_DOUBLE: {
1073  cass_double_t ctdouble;
1074  CassError rc(cci->CassValueGetDouble(cvalue, &ctdouble));
1075  assert(rc == CASS_OK);
1076  return (double)ctdouble;
1077  }
1078  case CASS_VALUE_TYPE_TINY_INT: {
1079  cass_int8_t ct8;
1080  CassError rc(cci->CassValueGetInt8(cvalue, &ct8));
1081  assert(rc == CASS_OK);
1082  return (uint8_t)ct8;
1083  }
1084  case CASS_VALUE_TYPE_SMALL_INT: {
1085  cass_int16_t ct16;
1086  CassError rc(cci->CassValueGetInt16(cvalue, &ct16));
1087  assert(rc == CASS_OK);
1088  return (uint16_t)ct16;
1089  }
1090  case CASS_VALUE_TYPE_INT: {
1091  cass_int32_t ct32;
1092  CassError rc(cci->CassValueGetInt32(cvalue, &ct32));
1093  assert(rc == CASS_OK);
1094  return (uint32_t)ct32;
1095  }
1096  case CASS_VALUE_TYPE_BIGINT: {
1097  cass_int64_t ct64;
1098  CassError rc(cci->CassValueGetInt64(cvalue, &ct64));
1099  assert(rc == CASS_OK);
1100  return (uint64_t)ct64;
1101  }
1102  case CASS_VALUE_TYPE_INET: {
1103  CassInet ctinet;
1104  CassError rc(cci->CassValueGetInet(cvalue, &ctinet));
1105  assert(rc == CASS_OK);
1106  IpAddress ipaddr;
1107  if (ctinet.address_length == CASS_INET_V4_LENGTH) {
1108  Ip4Address::bytes_type ipv4;
1109  memcpy(GENERIC_RAW_ARRAY(ipv4), ctinet.address, CASS_INET_V4_LENGTH);
1110  ipaddr = Ip4Address(ipv4);
1111  } else if (ctinet.address_length == CASS_INET_V6_LENGTH) {
1112  Ip6Address::bytes_type ipv6;
1113  memcpy(GENERIC_RAW_ARRAY(ipv6), ctinet.address, CASS_INET_V6_LENGTH);
1114  ipaddr = Ip6Address(ipv6);
1115  } else {
1116  assert(0);
1117  }
1118  return ipaddr;
1119  }
1120  case CASS_VALUE_TYPE_BLOB: {
1121  const cass_byte_t *bytes(NULL);
1122  size_t size(0);
1123  CassError rc(cci->CassValueGetBytes(cvalue, &bytes, &size));
1124  assert(rc == CASS_OK);
1125  return GenDb::Blob(bytes, size);
1126  }
1127  case CASS_VALUE_TYPE_UNKNOWN: {
1128  // null type
1129  return GenDb::DbDataValue();
1130  }
1131  default: {
1132  CQLIF_ERR_TRACE("Unhandled CassValueType: " << cvtype);
1133  assert(false && "Unhandled value type");
1134  return GenDb::DbDataValue();
1135  }
1136  }
1137 }
1138 
1140  CassSession *session, const char* query,
1141  CassPreparedPtr *prepared) {
1142  CQLIF_DEBUG_TRACE( "PrepareSync: " << query);
1143  CassFuturePtr future(cci->CassSessionPrepare(session, query), cci);
1144  cci->CassFutureWait(future.get());
1145 
1146  CassError rc(cci->CassFutureErrorCode(future.get()));
1147  if (rc != CASS_OK) {
1148  CassString err;
1149  cci->CassFutureErrorMessage(future.get(), &err.data, &err.length);
1150  CQLIF_ERR_TRACE("PrepareSync: " << query << " FAILED: " <<
1151  std::string(err.data, err.length));
1152  } else {
1153  *prepared = CassPreparedPtr(cci->CassFutureGetPrepared(future.get()),
1154  cci);
1155  }
1156  return rc == CASS_OK;
1157 }
1158 
1160  CassSession *session,
1161  CassStatement *qstatement, CassResultPtr *result,
1162  CassConsistency consistency) {
1163  cci->CassStatementSetConsistency(qstatement, consistency);
1164  CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci);
1165  cci->CassFutureWait(future.get());
1166 
1167  CassError rc(cci->CassFutureErrorCode(future.get()));
1168  if (rc != CASS_OK) {
1169  CassString err;
1170  cci->CassFutureErrorMessage(future.get(), &err.data, &err.length);
1171  CQLIF_ERR_TRACE("SyncQuery: FAILED: " <<
1172  std::string(err.data, err.length));
1173  } else {
1174  if (result) {
1175  *result = CassResultPtr(cci->CassFutureGetResult(future.get()),
1176  cci);
1177  }
1178  }
1179  return rc == CASS_OK;
1180 }
1181 
1183  CassSession *session, const char *query, CassConsistency consistency) {
1184  CQLIF_DEBUG_TRACE( "SyncQuery: " << query);
1185  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1186  return ExecuteQuerySyncInternal(cci, session, statement.get(), NULL,
1187  consistency);
1188 }
1189 
1191  CassSession *session, const char *query,
1192  CassResultPtr *result, CassConsistency consistency) {
1193  CQLIF_DEBUG_TRACE( "SyncQuery: " << query);
1194  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1195  return ExecuteQuerySyncInternal(cci, session, statement.get(), result,
1196  consistency);
1197 }
1198 
1200  CassSession *session, CassStatement *statement,
1201  CassConsistency consistency) {
1202  return ExecuteQuerySyncInternal(cci, session, statement, NULL,
1203  consistency);
1204 }
1205 
1207  switch (rc) {
1208  case CASS_OK:
1209  return GenDb::DbOpResult::OK;
1210  case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1211  case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1212  case CASS_ERROR_LIB_NO_AVAILABLE_IO_THREAD:
1214  default:
1215  return GenDb::DbOpResult::ERROR;
1216  }
1217 }
1218 
1220  CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec,
1221  GenDb::NewColVec *v_columns) {
1222  // Row iterator
1223  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1224  while (cci->CassIteratorNext(riterator.get())) {
1225  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1229  int i = 0;
1230  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1231  it != read_vec.end(); it++) {
1232  bool row_key = it->get<1>();
1233  bool row_column = it->get<2>();
1234  bool read_timestamp = it->get<3>();
1235  if (row_key) {
1236  i++;
1237  continue;
1238  }
1239  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1240  assert(cvalue);
1241  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1242  if (row_column) {
1243  cnames->push_back(db_value);
1244  } else {
1245  values->push_back(db_value);
1246  if (read_timestamp) {
1247  i++;
1248  const CassValue *ctimestamp(cci->CassRowGetColumn(row, i));
1249  assert(ctimestamp);
1250  GenDb::DbDataValue time_value(CassValue2DbDataValue(cci, ctimestamp));
1251  timestamps->push_back(time_value);
1252  }
1253  }
1254  i++;
1255  }
1256  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0, timestamps));
1257  v_columns->push_back(column);
1258  }
1259 }
1260 
1262  CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec,
1263  GenDb::ColListVec *v_col_list) {
1264  std::auto_ptr<GenDb::ColList> col_list;
1265  // Row iterator
1266  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1267  while (cci->CassIteratorNext(riterator.get())) {
1268  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1269  GenDb::DbDataValueVec rkey;
1273  int i = 0;
1274  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1275  it != read_vec.end(); it++) {
1276  bool row_key = it->get<1>();
1277  bool row_column = it->get<2>();
1278  bool read_timestamp = it->get<3>();
1279  if (row_key) {
1280  // Partiiton key
1281  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1282  assert(cvalue);
1283  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1284  rkey.push_back(db_value);
1285  i++;
1286  continue;
1287  }
1288  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1289  assert(cvalue);
1290  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1291  if (row_column) {
1292  cnames->push_back(db_value);
1293  } else {
1294  values->push_back(db_value);
1295  if (read_timestamp) {
1296  i++;
1297  const CassValue *ctimestamp(cci->CassRowGetColumn(row, i));
1298  assert(ctimestamp);
1299  GenDb::DbDataValue time_value(CassValue2DbDataValue(cci, ctimestamp));
1300  timestamps->push_back(time_value);
1301  }
1302  }
1303  i++;
1304  }
1305  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0, timestamps));
1306  // Do we need a new ColList?
1307  if (!col_list.get()) {
1308  col_list.reset(new GenDb::ColList);
1309  col_list->rowkey_ = rkey;
1310  }
1311  if (rkey != col_list->rowkey_) {
1312  v_col_list->push_back(col_list.release());
1313  col_list.reset(new GenDb::ColList);
1314  col_list->rowkey_ = rkey;
1315  }
1316  GenDb::NewColVec *v_columns(&col_list->columns_);
1317  v_columns->push_back(column);
1318  }
1319  if (col_list.get()) {
1320  v_col_list->push_back(col_list.release());
1321  }
1322 }
1323 
1325  CassResultPtr *result, size_t rk_count,
1326  size_t ck_count, GenDb::NewColVec *v_columns) {
1327  // Row iterator
1328  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1329  while (cci->CassIteratorNext(riterator.get())) {
1330  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1331  // Iterate over columns
1332  size_t ccount(cci->CassResultColumnCount(result->get()));
1333  // Clustering key
1335  for (size_t i = rk_count; i < rk_count + ck_count; i++) {
1336  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1337  assert(cvalue);
1338  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1339  cnames->push_back(db_value);
1340  }
1341  // Values
1343  for (size_t i = rk_count + ck_count; i < ccount; i++) {
1344  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1345  assert(cvalue);
1346  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1347  values->push_back(db_value);
1348  }
1349  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
1350  v_columns->push_back(column);
1351  }
1352 }
1353 
1355  CassResultPtr *result, size_t rk_count,
1356  size_t ck_count, GenDb::ColListVec *v_col_list) {
1357  std::auto_ptr<GenDb::ColList> col_list;
1358  // Row iterator
1359  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1360  while (cci->CassIteratorNext(riterator.get())) {
1361  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1362  // Iterate over columns
1363  size_t ccount(cci->CassResultColumnCount(result->get()));
1364  // Partiiton key
1365  GenDb::DbDataValueVec rkey;
1366  for (size_t i = 0; i < rk_count; i++) {
1367  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1368  assert(cvalue);
1369  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1370  rkey.push_back(db_value);
1371  }
1372  // Clustering key
1374  for (size_t i = rk_count; i < rk_count + ck_count; i++) {
1375  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1376  assert(cvalue);
1377  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1378  cnames->push_back(db_value);
1379  }
1380  // Values
1382  for (size_t i = rk_count + ck_count; i < ccount; i++) {
1383  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1384  assert(cvalue);
1385  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1386  values->push_back(db_value);
1387  }
1388  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
1389  // Do we need a new ColList?
1390  if (!col_list.get()) {
1391  col_list.reset(new GenDb::ColList);
1392  col_list->rowkey_ = rkey;
1393  }
1394  if (rkey != col_list->rowkey_) {
1395  v_col_list->push_back(col_list.release());
1396  col_list.reset(new GenDb::ColList);
1397  col_list->rowkey_ = rkey;
1398  }
1399  GenDb::NewColVec *v_columns(&col_list->columns_);
1400  v_columns->push_back(column);
1401  }
1402  if (col_list.get()) {
1403  v_col_list->push_back(col_list.release());
1404  }
1405 }
1406 
1408  CassResultPtr *result, GenDb::NewColVec *v_columns) {
1409  // Row iterator
1410  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1411  while (cci->CassIteratorNext(riterator.get())) {
1412  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1413  // Iterate over columns
1414  size_t ccount(cci->CassResultColumnCount(result->get()));
1415  for (size_t i = 0; i < ccount; i++) {
1416  CassString cname;
1417  CassError rc(cci->CassResultColumnName(result->get(), i,
1418  &cname.data, &cname.length));
1419  assert(rc == CASS_OK);
1420  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1421  assert(cvalue);
1422  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1423  if (db_value.which() == GenDb::DB_VALUE_BLANK) {
1424  continue;
1425  }
1426  GenDb::NewCol *column(new GenDb::NewCol(
1427  std::string(cname.data, cname.length), db_value, 0));
1428  v_columns->push_back(column);
1429  }
1430  }
1431 }
1432 
1434  CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list) {
1435  std::auto_ptr<GenDb::ColList> col_list;
1436  // Row iterator
1437  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1438  while (cci->CassIteratorNext(riterator.get())) {
1439  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1440  // Iterate over columns
1441  size_t ccount(cci->CassResultColumnCount(result->get()));
1442  // Partiiton key
1443  GenDb::DbDataValueVec rkey;
1444  for (size_t i = 0; i < rk_count; i++) {
1445  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1446  assert(cvalue);
1447  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1448  rkey.push_back(db_value);
1449  }
1450  // Do we need a new ColList?
1451  if (!col_list.get()) {
1452  col_list.reset(new GenDb::ColList);
1453  col_list->rowkey_ = rkey;
1454  }
1455  if (rkey != col_list->rowkey_) {
1456  v_col_list->push_back(col_list.release());
1457  col_list.reset(new GenDb::ColList);
1458  col_list->rowkey_ = rkey;
1459  }
1460  GenDb::NewColVec *v_columns(&col_list->columns_);
1461  for (size_t i = 0; i < ccount; i++) {
1462  CassString cname;
1463  CassError rc(cci->CassResultColumnName(result->get(), i,
1464  &cname.data, &cname.length));
1465  assert(rc == CASS_OK);
1466  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1467  assert(cvalue);
1468  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1469  if (db_value.which() == GenDb::DB_VALUE_BLANK) {
1470  continue;
1471  }
1472  GenDb::NewCol *column(new GenDb::NewCol(
1473  std::string(cname.data, cname.length), db_value, 0));
1474  v_columns->push_back(column);
1475  }
1476  }
1477  if (col_list.get()) {
1478  v_col_list->push_back(col_list.release());
1479  }
1480 }
1481 
// Completion callback for an asynchronous query.
//
// `data` must be non-NULL; it is wrapped in an auto_ptr as a
// CassAsyncQueryContext, so the context is destroyed when this function
// returns. The user callback ctx->cb_ is invoked exactly once on every path:
//  - on a driver error: after tracing the error message, with an empty ColList;
//  - when a result context is attached and the result has columns: with a
//    ColList populated from the result;
//  - otherwise: with an empty ColList.
1482 static void OnExecuteQueryAsync(CassFuture *future, void *data) {
1483  assert(data);
1484  std::auto_ptr<CassAsyncQueryContext> ctx(
1485  boost::reinterpret_pointer_cast<CassAsyncQueryContext>(data));
1486  interface::CassLibrary *cci(ctx->cci_);
1487  CassError rc(cci->CassFutureErrorCode(future));
     // NOTE(review): `db_rc` is used below but its declaration is not visible
     // in this listing (embedded line 1488 is absent); presumably it is derived
     // from `rc` - confirm against the original file.
1489  if (rc != CASS_OK) {
1490  CassString err;
1491  cci->CassFutureErrorMessage(future, &err.data, &err.length);
1492  CQLIF_ERR_TRACE("AsyncQuery: " << ctx->query_id_ << " FAILED: "
1493  << std::string(err.data, err.length));
1494  ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1495  return;
1496  }
1497  if (ctx->result_ctx_) {
1498  CassQueryResultContext *rctx(ctx->result_ctx_.get());
1499  CassResultPtr result(cci->CassFutureGetResult(future), cci);
1500  // In case of select parse the results
1501  if (cci->CassResultColumnCount(result.get())) {
1502  std::auto_ptr<GenDb::ColList> col_list(new GenDb::ColList);
     // Column family name and row key come from the request-side context,
     // not from the result set.
1503  col_list->cfname_ = rctx->cf_name_;
1504  col_list->rowkey_ = rctx->row_key_;
1505  if (rctx->is_dynamic_cf_) {
1506  DynamicCfGetResult(cci, &result, rctx->rk_count_,
1507  rctx->ck_count_, &col_list->columns_);
1508  } else {
1509  StaticCfGetResult(cci, &result, &col_list->columns_);
1510  }
1511  ctx->cb_(db_rc, col_list);
1512  return;
1513  }
1514  }
     // No result context, or a result without columns: report status only.
1515  ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1516 }
1517 
1519  CassSession *session, const char *qid, CassStatement *qstatement,
1520  CassConsistency consistency, CassAsyncQueryCallback cb,
1521  CassQueryResultContext *rctx = NULL) {
1522  CQLIF_DEBUG_TRACE( "AsyncQuery: " << qid);
1523  cci->CassStatementSetConsistency(qstatement, consistency);
1524  CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci);
1525  std::auto_ptr<CassAsyncQueryContext> ctx(
1526  new CassAsyncQueryContext(qid, cb, cci, rctx));
1527  cci->CassFutureSetCallback(future.get(), OnExecuteQueryAsync,
1528  ctx.release());
1529 }
1530 
1532  CassSession *session, const char *query,
1533  CassConsistency consistency, CassAsyncQueryCallback cb) {
1534  CQLIF_DEBUG_TRACE( "AsyncQuery: " << query);
1535  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1536  ExecuteQueryAsyncInternal(cci, session, query, statement.get(),
1537  consistency, cb);
1538 }
1539 
1541  CassSession *session, const char *query_id, CassStatement *qstatement,
1542  CassConsistency consistency, CassAsyncQueryCallback cb) {
1543  ExecuteQueryAsyncInternal(cci, session, query_id, qstatement, consistency,
1544  cb);
1545 }
1546 
1548  CassSession *session, const char *query, CassConsistency consistency,
1550  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1551  ExecuteQueryAsyncInternal(cci, session, query, statement.get(),
1552  consistency, cb, rctx);
1553 }
1554 
1556  CassSession *session, const char *query, CassConsistency consistency,
1557  impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count,
1558  const std::string &cfname, const GenDb::DbDataValueVec &row_key) {
1559  std::auto_ptr<CassQueryResultContext> rctx(
1560  new CassQueryResultContext(cfname, true, row_key,
1561  rk_count, ck_count));
1562  ExecuteQueryResultAsync(cci, session, query, consistency, cb,
1563  rctx.release());
1564  return true;
1565 }
1566 
1568  CassSession *session, const char *query,
1569  const GenDb::FieldNamesToReadVec &read_vec,
1570  CassConsistency consistency, GenDb::NewColVec *v_columns) {
1571  CassResultPtr result(NULL, cci);
1572  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1573  consistency));
1574  if (!success) {
1575  return success;
1576  }
1577  DynamicCfGetResult(cci, &result, read_vec, v_columns);
1578  return success;
1579 }
1580 
1582  CassSession *session, const char *query,
1583  const GenDb::FieldNamesToReadVec &read_vec,
1584  CassConsistency consistency, GenDb::ColListVec *v_columns) {
1585  CassResultPtr result(NULL, cci);
1586  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1587  consistency));
1588  if (!success) {
1589  return success;
1590  }
1591  DynamicCfGetResult(cci, &result, read_vec, v_columns);
1592  return success;
1593 }
1594 
1596  CassSession *session, const char *query,
1597  size_t rk_count, size_t ck_count, CassConsistency consistency,
1598  GenDb::NewColVec *v_columns) {
1599  CassResultPtr result(NULL, cci);
1600  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1601  consistency));
1602  if (!success) {
1603  return success;
1604  }
1605  DynamicCfGetResult(cci, &result, rk_count, ck_count, v_columns);
1606  return success;
1607 }
1608 
1610  CassSession *session, const char *query,
1611  size_t rk_count, size_t ck_count, CassConsistency consistency,
1612  GenDb::ColListVec *v_col_list) {
1613  CassResultPtr result(NULL, cci);
1614  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1615  consistency));
1616  if (!success) {
1617  return success;
1618  }
1619  DynamicCfGetResult(cci, &result, rk_count, ck_count, v_col_list);
1620  return success;
1621 }
1622 
1624  CassSession *session, const char *query,
1625  CassConsistency consistency, impl::CassAsyncQueryCallback cb,
1626  const std::string& cfname, const GenDb::DbDataValueVec &row_key) {
1627  std::auto_ptr<CassQueryResultContext> rctx(
1628  new CassQueryResultContext(cfname, false, row_key));
1629  ExecuteQueryResultAsync(cci, session, query, consistency, cb,
1630  rctx.release());
1631  return true;
1632 }
1633 
1635  CassSession *session, const char *query,
1636  CassConsistency consistency, GenDb::NewColVec *v_columns) {
1637  CassResultPtr result(NULL, cci);
1638  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1639  consistency));
1640  if (!success) {
1641  return success;
1642  }
1643  StaticCfGetResult(cci, &result, v_columns);
1644  return success;
1645 }
1646 
1648  CassSession *session, const char *query, size_t rk_count,
1649  CassConsistency consistency, GenDb::ColListVec *v_col_list) {
1650  CassResultPtr result(NULL, cci);
1651  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1652  consistency));
1653  if (!success) {
1654  return success;
1655  }
1656  StaticCfGetResult(cci, &result, rk_count, v_col_list);
1657  return success;
1658 }
1659 
1661  CassFuture *future) {
1662  cci->CassFutureWait(future);
1663  CassError rc(cci->CassFutureErrorCode(future));
1664  if (rc != CASS_OK) {
1665  CassString err;
1666  cci->CassFutureErrorMessage(future, &err.data, &err.length);
1667  CQLIF_ERR_TRACE("SyncWait: FAILED: " <<
1668  std::string(err.data, err.length));
1669  }
1670  return rc == CASS_OK;
1671 }
1672 
1673 static const CassTableMeta * GetCassTableMeta(
1674  interface::CassLibrary *cci, const CassSchemaMeta *schema_meta,
1675  const std::string &keyspace, const std::string &table, bool log_error) {
1676  const CassKeyspaceMeta *keyspace_meta(
1677  cci->CassSchemaMetaKeyspaceByName(schema_meta, keyspace.c_str()));
1678  if (keyspace_meta == NULL) {
1679  if (log_error) {
1680  CQLIF_ERR_TRACE("No keyspace schema: Keyspace: " << keyspace <<
1681  ", Table: " << table);
1682  }
1683  return NULL;
1684  }
1685  std::string table_lower(table);
1686  boost::algorithm::to_lower(table_lower);
1687  const CassTableMeta *table_meta(
1688  cci->CassKeyspaceMetaTableByName(keyspace_meta, table_lower.c_str()));
1689  if (table_meta == NULL) {
1690  if (log_error) {
1691  CQLIF_ERR_TRACE("No table schema: Keyspace: " << keyspace <<
1692  ", Table: " << table_lower);
1693  }
1694  return NULL;
1695  }
1696  return table_meta;
1697 }
1698 
1700  CassSession *session,
1701  const std::string &keyspace, const std::string &table) {
1703  session), cci);
1704  if (schema_meta.get() == NULL) {
1705  CQLIF_DEBUG_TRACE( "No schema meta: Keyspace: " << keyspace <<
1706  ", Table: " << table);
1707  return false;
1708  }
1709  bool log_error(false);
1710  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1711  schema_meta.get(), keyspace, table, log_error));
1712  if (table_meta == NULL) {
1713  return false;
1714  }
1715  return true;
1716 }
1717 
1720  CassSession *session, const std::string &keyspace,
1721  const std::string &table, size_t *ck_count) {
1723  session), cci);
1724  if (schema_meta.get() == NULL) {
1725  CQLIF_ERR_TRACE("No schema meta: Keyspace: " << keyspace <<
1726  ", Table: " << table);
1727  return false;
1728  }
1729  bool log_error(true);
1730  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1731  schema_meta.get(), keyspace, table, log_error));
1732  if (table_meta == NULL) {
1733  return false;
1734  }
1735  *ck_count = cci->CassTableMetaClusteringKeyCount(table_meta);
1736  return true;
1737 }
1738 
1740  interface::CassLibrary *cci, CassSession *session,
1741  const std::string &keyspace, const std::string &table, size_t *rk_count) {
1743  session), cci);
1744  if (schema_meta.get() == NULL) {
1745  CQLIF_ERR_TRACE("No schema meta: Keyspace: " << keyspace <<
1746  ", Table: " << table);
1747  return false;
1748  }
1749  bool log_error(true);
1750  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1751  schema_meta.get(), keyspace, table, log_error));
1752  if (table_meta == NULL) {
1753  return false;
1754  }
1755  *rk_count = cci->CassTableMetaPartitionKeyCount(table_meta);
1756  return true;
1757 }
1758 
1759 static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel) {
1760  switch (clevel) {
1761  case CASS_LOG_DISABLED:
1762  return log4cplus::OFF_LOG_LEVEL;
1763  case CASS_LOG_CRITICAL:
1764  return log4cplus::FATAL_LOG_LEVEL;
1765  case CASS_LOG_ERROR:
1766  return log4cplus::ERROR_LOG_LEVEL;
1767  case CASS_LOG_WARN:
1768  return log4cplus::WARN_LOG_LEVEL;
1769  case CASS_LOG_INFO:
1770  return log4cplus::INFO_LOG_LEVEL;
1771  case CASS_LOG_DEBUG:
1772  return log4cplus::DEBUG_LOG_LEVEL;
1773  case CASS_LOG_TRACE:
1774  return log4cplus::TRACE_LOG_LEVEL;
1775  default:
1776  return log4cplus::ALL_LOG_LEVEL;
1777  }
1778 }
1779 
1780 static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level) {
1781  switch (level) {
1782  case log4cplus::OFF_LOG_LEVEL:
1783  return CASS_LOG_DISABLED;
1784  case log4cplus::FATAL_LOG_LEVEL:
1785  return CASS_LOG_CRITICAL;
1786  case log4cplus::ERROR_LOG_LEVEL:
1787  return CASS_LOG_ERROR;
1788  case log4cplus::WARN_LOG_LEVEL:
1789  return CASS_LOG_WARN;
1790  case log4cplus::INFO_LOG_LEVEL:
1791  return CASS_LOG_INFO;
1792  case log4cplus::DEBUG_LOG_LEVEL:
1793  return CASS_LOG_DEBUG;
1794  case log4cplus::TRACE_LOG_LEVEL:
1795  return CASS_LOG_TRACE;
1796  default:
1797  assert(false && "Invalid Log4Level");
1798  return CASS_LOG_DISABLED;
1799  }
1800 }
1801 
1802 static void CassLibraryLog(const CassLogMessage* message, void *data) {
1803  if (LoggingDisabled()) {
1804  return;
1805  }
1806  log4cplus::LogLevel log4level(Cass2log4Level(message->severity));
1807  std::stringstream buf;
1808  buf << "CassLibrary: " << message->file << ":" << message->line <<
1809  " " << message->function << "] " << message->message;
1810  CASS_LIB_TRACE(log4level, buf.str());
1811 }
1812 
// Reads the whole file at `ca_certs_path` and returns its contents.
// Returns an empty string when the path is empty or the file cannot be
// opened, so callers cannot tell an unreadable file from an empty one.
static std::string LoadCertFile(const std::string &ca_certs_path) {
    std::string pem;
    if (ca_certs_path.empty()) {
        return pem;
    }
    std::ifstream cert_stream(ca_certs_path.c_str());
    if (cert_stream) {
        pem.assign(std::istreambuf_iterator<char>(cert_stream),
                   std::istreambuf_iterator<char>());
    }
    return pem;
}
1825 
1826 
// Task that runs a stored nullary function. The task id and instance are
// passed straight through to the Task base class.
1827 class WorkerTask : public Task {
1828  public:
1829  typedef boost::function<void(void)> FunctionPtr;
1830  WorkerTask(FunctionPtr func, int task_id, int task_instance) :
1831  Task(task_id, task_instance),
1832  func_(func) {
1833  }
     // Invokes the stored function once and always returns true.
1834  bool Run() {
1835  func_();
1836  return true;
1837  }
1838  std::string Description() const {
1839  return "cass::cql::impl::WorkerTask";
1840  }
1841  private:
     // NOTE(review): the declaration of `func_` is not visible in this listing
     // (embedded line 1842 is absent) - confirm against the original file.
1843 };
1844 
1845 } // namespace impl
1846 
1847 //
1848 // CqlIfImpl
1849 //
// Constructs the CQL interface: creates the cluster object and two sessions
// (session_ and schema_session_) through the CassLibrary interface, then
// configures optional SSL, the contact points, the port and optional
// plain-text credentials on the cluster.
// NOTE(review): several statements of this constructor are not visible in
// this listing (embedded lines 1867-1868, 1875 and 1901-1905 are absent).
1850 CqlIfImpl::CqlIfImpl(EventManager *evm,
1851  const std::vector<std::string> &cassandra_ips,
1852  int cassandra_port,
1853  const std::string &cassandra_user,
1854  const std::string &cassandra_password,
1855  bool use_ssl,
1856  const std::string &ca_certs_path,
1857  interface::CassLibrary *cci) :
1858  evm_(evm),
1859  cci_(cci),
1860  cluster_(cci_->CassClusterNew(), cci_),
1861  ssl_(0, cci_),
1862  session_(cci_->CassSessionNew(), cci_),
1863  schema_session_(cci_->CassSessionNew(), cci_),
1864  keyspace_(),
1865  io_thread_count_(2) {
1866  // Set session state to INIT
1869 
     // The first configured address is remembered as the schema contact point.
1870  if (cassandra_ips.size() > 0) {
1871  schema_contact_point_ = cassandra_ips[0];
1872  boost::system::error_code ec;
1873  boost::asio::ip::address::from_string(cassandra_ips[0], ec);
     // A non-zero error code means the entry did not parse as an IP address.
     // NOTE(review): the start of the statement below is not visible here;
     // presumably it resolves the host name - confirm.
1874  if(ec.value() != 0){
1876  evm->io_service(), cassandra_ips[0]);
1877  }
1878  }
1879 
1880  if (use_ssl) {
1881  ssl_ = impl::CassSslPtr(cci->CassSslNew(), cci_);
1882  /* Only verify the certification and not the identity */
1883  cci_->CassSslSetVerifyFlags(ssl_.get(), CASS_SSL_VERIFY_PEER_CERT);
1884  std::string content = impl::LoadCertFile(ca_certs_path);
     // LoadCertFile returns an empty string for an empty path or a file that
     // cannot be opened; in that case peer verification is turned off.
1885  if (content.length() == 0) {
1886  cci_->CassSslSetVerifyFlags(ssl_.get(), CASS_SSL_VERIFY_NONE);
1887  } else {
1888  cci_->CassSslAddTrustedCert(ssl_.get(), content);
1889  }
1890  cci_->CassClusterSetSsl(cluster_.get(), ssl_.get());
1891  }
     // All addresses are passed to the driver as one comma-separated string.
1892  std::string contact_points(boost::algorithm::join(cassandra_ips, ","));
1893  cci_->CassClusterSetContactPoints(cluster_.get(), contact_points.c_str());
1894  cci_->CassClusterSetPort(cluster_.get(), cassandra_port);
1895  // Set credentials for plain text authentication
     // Credentials are only applied when both user and password are non-empty.
1896  if (!cassandra_user.empty() && !cassandra_password.empty()) {
1897  cci_->CassClusterSetCredentials(cluster_.get(), cassandra_user.c_str(),
1898  cassandra_password.c_str());
1899  }
1900  // Set number of IO threads to half the number of cores
1906 }
1907 
1909  assert(session_state_ == SessionState::INIT ||
1913 }
1914 
// Formats the kQCreateKeyspaceIfNotExists query with `keyspace` and
// `replication_factor` and executes it synchronously on the schema session.
// Returns false if the formatted query does not fit the local buffer or
// snprintf fails; otherwise returns the result of ExecuteQuerySync.
1915 bool CqlIfImpl::CreateKeyspaceIfNotExistsSync(const std::string &keyspace,
1916  const std::string &replication_factor, CassConsistency consistency) {
     // NOTE(review): the guard condition for this early return is not visible
     // in this listing (embedded line 1917 is absent).
1918  return false;
1919  }
1920  char buf[512];
1921  int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists,
1922  keyspace.c_str(), replication_factor.c_str()));
     // n < 0: formatting error; n >= sizeof(buf): output was truncated.
1923  if (n < 0 || n >= (int)sizeof(buf)) {
1924  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1925  keyspace << ", RF: " << replication_factor);
1926  return false;
1927  }
1928  return impl::ExecuteQuerySync(cci_, schema_session_.get(), buf,
1929  consistency);
1930 }
1931 
// Formats the kQUseKeyspace query for `keyspace` and executes it
// synchronously on the schema session. On success keyspace_ is updated to
// `keyspace`. Returns false if the query does not fit the local buffer,
// snprintf fails, or the query execution fails.
1932 bool CqlIfImpl::UseKeyspaceSyncOnSchemaSession(const std::string &keyspace,
1933  CassConsistency consistency) {
     // NOTE(review): the guard condition for this early return is not visible
     // in this listing (embedded line 1934 is absent).
1935  return false;
1936  }
1937  char buf[512];
1938  int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str()));
     // n < 0: formatting error; n >= sizeof(buf): output was truncated.
1939  if (n < 0 || n >= (int)sizeof(buf)) {
1940  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1941  keyspace);
1942  return false;
1943  }
1944  bool success(impl::ExecuteQuerySync(cci_, schema_session_.get(), buf,
1945  consistency));
1946  if (!success) {
1947  return false;
1948  }
1949  // Update keyspace
1950  keyspace_ = keyspace;
1951  return success;
1952 }
1953 
// Formats the kQUseKeyspace query for `keyspace` and executes it
// synchronously on session_ (the schema-session variant above uses
// schema_session_ instead). On success keyspace_ is updated to `keyspace`.
// Returns false if the query does not fit the local buffer, snprintf fails,
// or the query execution fails.
1954 bool CqlIfImpl::UseKeyspaceSync(const std::string &keyspace,
1955  CassConsistency consistency) {
     // NOTE(review): the guard condition for this early return is not visible
     // in this listing (embedded line 1956 is absent).
1957  return false;
1958  }
1959  char buf[512];
1960  int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str()));
     // n < 0: formatting error; n >= sizeof(buf): output was truncated.
1961  if (n < 0 || n >= (int)sizeof(buf)) {
1962  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1963  keyspace);
1964  return false;
1965  }
1966  bool success(impl::ExecuteQuerySync(cci_, session_.get(), buf,
1967  consistency));
1968  if (!success) {
1969  return false;
1970  }
1971  // Update keyspace
1972  keyspace_ = keyspace;
1973  return success;
1974 }
1975 
1977  const std::string &compaction_strategy, CassConsistency consistency) {
1979  return false;
1980  }
1981  // There are two types of tables - Static (SQL) and Dynamic (NOSQL)
1982  // column family. Static column family has more or less fixed rows,
1983  // and dynamic column family has wide rows
1984  std::string query;
1985  switch (cf.cftype_) {
1987  {
1989  compaction_strategy);
1990  break;
1991  }
1993  {
1994  boost::system::error_code ec;
1996  compaction_strategy, &ec);
1997  if (ec) {
1998  return false;
1999  }
2000  break;
2001  }
2002  default:
2003  {
2004  return false;
2005  }
2006  }
2007  return impl::ExecuteQuerySync(cci_, schema_session_.get(), query.c_str(),
2008  consistency);
2009 }
2010 
2011 bool CqlIfImpl::CreateIndexIfNotExistsSync(const std::string &cfname,
2012  const std::string &column, const std::string &indexname,
2013  CassConsistency consistency, const GenDb::ColIndexMode::type index_mode) {
2015  return false;
2016  }
2017  std::string query(impl::CassCreateIndexIfNotExists(cfname, column,
2018  indexname, index_mode));
2019  return impl::ExecuteQuerySync(cci_, schema_session_.get(), query.c_str(),
2020  consistency);
2021 }
2022 
// Ensures a prepared INSERT statement is cached for the table named by
// cf.cfname_: returns true immediately on a cache hit, otherwise prepares
// the statement via PrepareInsertIntoTableSync() and stores it in
// insert_prepared_map_.
// NOTE(review): the cache lookup and the later insert take map_mutex_
// separately, and preparation runs in between with no lock held. If two
// callers can reach this for the same table at once, both may miss, and the
// second insert() then reports .second == false and trips the assert below
// (or returns false when asserts are compiled out) - confirm whether
// concurrent callers are possible.
2024  const std::string &table_name(cf.cfname_);
2025  impl::CassPreparedPtr prepared(NULL, cci_);
2026  // Check if the prepared statement exists
2027  if (GetPrepareInsertIntoTable(table_name, &prepared)) {
2028  return true;
2029  }
2030  bool success(PrepareInsertIntoTableSync(cf, &prepared));
2031  if (!success) {
2032  return success;
2033  }
2034  // Store the prepared statement into the map
2035  std::scoped_lock lock(map_mutex_);
2036  success = (insert_prepared_map_.insert(
2037  std::make_pair(table_name, prepared))).second;
2038  assert(success);
2039  return success;
2040 }
2041 
2042 bool CqlIfImpl::GetPrepareInsertIntoTable(const std::string &table_name,
2043  impl::CassPreparedPtr *prepared) const {
2044  std::scoped_lock lock(map_mutex_);
2045  CassPreparedMapType::const_iterator it(
2046  insert_prepared_map_.find(table_name));
2047  if (it == insert_prepared_map_.end()) {
2048  return false;
2049  }
2050  *prepared = it->second;
2051  return true;
2052 }
2053 
2054 bool CqlIfImpl::IsTablePresent(const std::string &table) {
2056  return false;
2057  }
2059  table);
2060 }
2061 
// Classifies `table` from its clustering-key count (written to ck_count by
// a lookup whose call line is not visible in this listing):
//   1  - ck_count is 0
//   0  - ck_count is non-zero
//  -1  - the clustering-key lookup failed
// NOTE(review): the early-exit guard below (its condition is not visible in
// this listing) returns `false`, which converts to 0. Callers that compare
// the result against 0 cannot tell that exit apart from the
// "non-zero clustering keys" result; -1 may have been intended - confirm.
2062 int CqlIfImpl::IsTableStatic(const std::string &table) {
2064  return false;
2065  }
2066  size_t ck_count;
2068  keyspace_, table, &ck_count)) {
2069  return -1;
2070  }
2071  if (ck_count == 0) {
2072  return 1;
2073  } else {
2074  return 0;
2075  }
2076 }
2077 
// Starts an asynchronous SELECT on table `cfname` for partition key `rkey`.
// The query text is built by impl::PartitionKey2CassSelectFromTable(); the
// execution path is then chosen from IsTableStatic(cfname): one branch for a
// result of 1, another for 0 (which first fetches rk_count and ck_count),
// and `false` is returned for any other result.
2078 bool CqlIfImpl::SelectFromTableAsync(const std::string &cfname,
2079  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
2082  return false;
2083  }
2084  std::string query(impl::PartitionKey2CassSelectFromTable(cfname,rkey));
// NOTE(review): IsTableStatic(cfname) may be evaluated twice in the chain
// below; storing the result in a local would avoid the repeated lookup and
// rule out the two calls disagreeing.
2085  if (IsTableStatic(cfname) == 1) {
2087  query.c_str(), consistency, cb, cfname.c_str(), rkey);
2088  } else if (IsTableStatic(cfname) == 0) {
2089  size_t rk_count;
2091  keyspace_, cfname, &rk_count));
2092  size_t ck_count;
2094  keyspace_, cfname, &ck_count));
2096  query.c_str(), consistency, cb, rk_count, ck_count,
2097  cfname, rkey);
2098  } else {
2099  return false;
2100  }
2101 }
2102 
2104  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
2105  const GenDb::ColumnNameRange &ck_range,
2106  const GenDb::WhereIndexInfoVec &where_vec,
2107  const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency,
2110  return false;
2111  }
2112  std::string query(
2114  rkey, ck_range, where_vec, read_vec));
2115  assert(IsTableDynamic(cfname));
2116  size_t rk_count;
2118  keyspace_, cfname, &rk_count));
2119  size_t ck_count;
2121  keyspace_, cfname, &ck_count));
2123  query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2124  rkey);
2125 }
2126 
2128  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
2129  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2132  return false;
2133  }
2134  std::string query(
2136  rkey, ck_range));
2137  assert(IsTableDynamic(cfname));
2138  size_t rk_count;
2140  keyspace_, cfname, &rk_count));
2141  size_t ck_count;
2143  keyspace_, cfname, &ck_count));
2145  query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2146  rkey);
2147 }
2148 
2149 bool CqlIfImpl::IsTableDynamic(const std::string &table) {
2150  return !IsTableStatic(table);
2151 }
2152 
2153 bool CqlIfImpl::InsertIntoTableSync(std::auto_ptr<GenDb::ColList> v_columns,
2154  CassConsistency consistency) {
2155  return InsertIntoTableInternal(v_columns, consistency, true, NULL);
2156 }
2157 
2158 bool CqlIfImpl::InsertIntoTableAsync(std::auto_ptr<GenDb::ColList> v_columns,
2159  CassConsistency consistency, impl::CassAsyncQueryCallback cb) {
2160  return InsertIntoTableInternal(v_columns, consistency, false, cb);
2161 }
2162 
2163 bool CqlIfImpl::InsertIntoTablePrepareAsync(std::auto_ptr<GenDb::ColList> v_columns,
2164  CassConsistency consistency, impl::CassAsyncQueryCallback cb) {
2165  return InsertIntoTablePrepareInternal(v_columns, consistency, false,
2166  cb);
2167 }
2168 
2169 // COLLECTOR_GLOBAL_TABLE is dynamic table with 6 indexed columns
2170 // for object-id. Some of these might be blank. This creates tombstones
2171 // so we dont want to prepare for inserts.
2172 bool CqlIfImpl::IsInsertIntoTablePrepareSupported(const std::string &table) {
2173  return (IsTableDynamic(table)) &&
2174  (table != "MessageTablev2");
2175 }
2176 
2177 bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
2178  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
2179  GenDb::NewColVec *out) {
2181  return false;
2182  }
2183  std::string query(impl::PartitionKey2CassSelectFromTable(cfname,
2184  rkey));
2185  if (IsTableStatic(cfname) == 1) {
2187  query.c_str(), consistency, out);
2188  } else if (IsTableStatic(cfname) == 0){
2189  size_t rk_count;
2191  keyspace_, cfname, &rk_count));
2192  size_t ck_count;
2194  keyspace_, cfname, &ck_count));
2196  query.c_str(), rk_count, ck_count, consistency, out);
2197  } else {
2198  return false;
2199  }
2200 }
2201 
2202 bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
2203  CassConsistency consistency, GenDb::ColListVec *out) {
2205  return false;
2206  }
2207  std::string query(impl::CassSelectFromTable(cfname));
2208  size_t rk_count;
2210  keyspace_, cfname, &rk_count));
2211  if (IsTableStatic(cfname) == 1) {
2213  query.c_str(), rk_count, consistency, out);
2214  } else if (IsTableStatic(cfname) == 0){
2215  size_t ck_count;
2217  keyspace_, cfname, &ck_count));
2219  query.c_str(), rk_count, ck_count, consistency, out);
2220  } else {
2221  return false;
2222  }
2223 }
2224 
2225 
2227  const GenDb::DbDataValueVec &rkey,
2228  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2229  const GenDb::FieldNamesToReadVec &read_vec,
2230  GenDb::NewColVec *out) {
2232  return false;
2233  }
2234  std::string query(
2236  rkey, ck_range, read_vec));
2237  assert(IsTableDynamic(cfname));
2239  query.c_str(), read_vec, consistency, out);
2240 }
2241 
2243  const std::vector<GenDb::DbDataValueVec> &rkeys,
2244  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2245  const GenDb::FieldNamesToReadVec &read_vec,
2246  GenDb::ColListVec *out) {
2248  return false;
2249  }
2250  std::string query(
2252  rkeys, ck_range, read_vec));
2253  assert(IsTableDynamic(cfname));
2255  query.c_str(), read_vec, consistency, out);
2256 }
2257 
2258 
2260  const GenDb::DbDataValueVec &rkey,
2261  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2262  GenDb::NewColVec *out) {
2264  return false;
2265  }
2266  std::string query(
2268  rkey, ck_range));
2269  assert(IsTableDynamic(cfname));
2270  size_t rk_count;
2272  keyspace_, cfname, &rk_count));
2273  size_t ck_count;
2275  keyspace_, cfname, &ck_count));
2277  query.c_str(), rk_count, ck_count, consistency, out);
2278 }
2279 
2280 void CqlIfImpl::SetRequestTimeout(uint32_t timeout_ms) {
2281  CQLIF_DEBUG_TRACE("request timeout set to " << timeout_ms);
2282  cci_->CassClusterSetRequestTimeout(cluster_.get(), timeout_ms);
2283 }
2284 
2286  /* If Connect is called multiple times due to DB failure,
2287  * then it is better to delete previous session and use
2288  * a new one to avoid gradual leak.
2289  */
2290  schema_session_.reset();
2291  impl::CassSessionPtr schema_session(cci_->CassSessionNew(), cci_);
2292  schema_session_.swap(schema_session);
2293 
2294  // First set the cluster whitelist filtering to just one node
2296  schema_contact_point_.c_str());
2297 
2299  cluster_.get()), cci_);
2300  bool success(impl::SyncFutureWait(cci_, future.get()));
2301  if (success) {
2303  CQLIF_INFO_TRACE("ConnectSchemaSync Done");
2304  } else {
2305  CQLIF_ERR_TRACE("ConnectSchemaSync FAILED");
2306  }
2307 
2309  return success;
2310 }
2311 
2313  /* If Connect is called multiple times due to DB failure,
2314  * then it is better to delete previous session and use
2315  * a new one to avoid gradual leak.
2316  */
2317  session_.reset();
2319  session_.swap(session);
2320 
2322  cluster_.get()), cci_);
2323  bool success(impl::SyncFutureWait(cci_, future.get()));
2324  if (success) {
2326  CQLIF_INFO_TRACE("ConnectSync Done");
2327  } else {
2328  CQLIF_ERR_TRACE("ConnectSync FAILED");
2329  }
2330  return success;
2331 }
2332 
2334  // Close all session and pending queries
2336  bool success(impl::SyncFutureWait(cci_, future.get()));
2337  if (success) {
2339  CQLIF_INFO_TRACE("DisconnectSync Done");
2340  } else {
2341  CQLIF_ERR_TRACE("DisconnectSync FAILED");
2342  }
2343  return success;
2344 }
2345 
2347  // Close the schema session
2349  cci_);
2350  bool success(impl::SyncFutureWait(cci_, future.get()));
2351  if (success) {
2353  CQLIF_INFO_TRACE("DisconnectSchemaSync Done");
2354  } else {
2355  CQLIF_ERR_TRACE("DisconnectSchemaSync FAILED");
2356  }
2357  return success;
2358 }
2359 
2360 bool CqlIfImpl::GetMetrics(Metrics *metrics) const {
2362  return false;
2363  }
2364  CassMetrics cass_metrics;
2365  cci_->CassSessionGetMetrics(session_.get(), &cass_metrics);
2366  // Requests
2367  metrics->requests.min = cass_metrics.requests.min;
2368  metrics->requests.max = cass_metrics.requests.max;
2369  metrics->requests.mean = cass_metrics.requests.mean;
2370  metrics->requests.stddev = cass_metrics.requests.stddev;
2371  metrics->requests.median = cass_metrics.requests.median;
2372  metrics->requests.percentile_75th =
2373  cass_metrics.requests.percentile_75th;
2374  metrics->requests.percentile_95th =
2375  cass_metrics.requests.percentile_95th;
2376  metrics->requests.percentile_98th =
2377  cass_metrics.requests.percentile_98th;
2378  metrics->requests.percentile_99th =
2379  cass_metrics.requests.percentile_99th;
2380  metrics->requests.percentile_999th =
2381  cass_metrics.requests.percentile_999th;
2382  metrics->requests.mean_rate = cass_metrics.requests.mean_rate;
2383  metrics->requests.one_minute_rate =
2384  cass_metrics.requests.one_minute_rate;
2385  metrics->requests.five_minute_rate =
2386  cass_metrics.requests.five_minute_rate;
2387  metrics->requests.fifteen_minute_rate =
2388  cass_metrics.requests.fifteen_minute_rate;
2389  // Stats
2390  metrics->stats.total_connections =
2391  cass_metrics.stats.total_connections;
2392  metrics->stats.available_connections =
2393  cass_metrics.stats.available_connections;
2394  metrics->stats.exceeded_pending_requests_water_mark =
2395  cass_metrics.stats.exceeded_pending_requests_water_mark;
2396  metrics->stats.exceeded_write_bytes_water_mark =
2397  cass_metrics.stats.exceeded_write_bytes_water_mark;
2398  // Errors
2399  metrics->errors.connection_timeouts =
2400  cass_metrics.errors.connection_timeouts;
2401  metrics->errors.pending_request_timeouts =
2402  cass_metrics.errors.pending_request_timeouts;
2403  metrics->errors.request_timeouts =
2404  cass_metrics.errors.request_timeouts;
2405  return true;
2406 }
2407 
2408 bool CqlIfImpl::InsertIntoTableInternal(std::auto_ptr<GenDb::ColList> v_columns,
2409  CassConsistency consistency, bool sync,
2412  return false;
2413  }
2414  std::string query;
2415  if (IsTableStatic(v_columns->cfname_) == 1) {
2416  query = impl::StaticCf2CassInsertIntoTable(v_columns.get());
2417  } else if (IsTableStatic(v_columns->cfname_) == 0){
2418  query = impl::DynamicCf2CassInsertIntoTable(v_columns.get());
2419  } else {
2420  return false;
2421  }
2422  if (sync) {
2423  return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(),
2424  consistency);
2425  } else {
2426  impl::ExecuteQueryAsync(cci_, session_.get(), query.c_str(),
2427  consistency, cb);
2428  return true;
2429  }
2430 }
2431 
2433  impl::CassPreparedPtr *prepared) {
2435  return false;
2436  }
2437  std::string query;
2438  switch (cf.cftype_) {
2440  {
2442  break;
2443  }
2445  {
2446  boost::system::error_code ec;
2448  if (ec.value() != boost::system::errc::success) {
2449  return false;
2450  }
2451  break;
2452  }
2453  default:
2454  {
2455  return false;
2456  }
2457  }
2458  return impl::PrepareSync(cci_, schema_session_.get(), query.c_str(),
2459  prepared);
2460 }
2461 
2463  std::auto_ptr<GenDb::ColList> v_columns,
2464  CassConsistency consistency, bool sync,
2467  return false;
2468  }
2469  impl::CassPreparedPtr prepared(NULL, cci_);
2470  bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared));
2471  if (!success) {
2472  CQLIF_ERR_TRACE("CassPrepared statement NOT found: " <<
2473  v_columns->cfname_);
2474  return false;
2475  }
2476  impl::CassStatementPtr qstatement(cci_->CassPreparedBind(prepared.get()),
2477  cci_);
2478  if (IsTableStatic(v_columns->cfname_) == 1) {
2479  success = impl::StaticCf2CassPrepareBind(cci_, qstatement.get(),
2480  v_columns.get());
2481  } else if (IsTableStatic(v_columns->cfname_) == 0){
2482  success = impl::DynamicCf2CassPrepareBind(cci_, qstatement.get(),
2483  v_columns.get());
2484  } else {
2485  return false;
2486  }
2487  if (!success) {
2488  return false;
2489  }
2490  if (sync) {
2492  qstatement.get(), consistency);
2493  } else {
2494  std::string qid("Prepare: " + v_columns->cfname_);
2495  impl::ExecuteQueryStatementAsync(cci_, session_.get(), qid.c_str(),
2496  qstatement.get(), consistency, cb);
2497  return true;
2498  }
2499 }
2500 
2502  "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH "
2503  "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }");
// printf-style template for the CQL USE statement. The single %s receives
// the keyspace name, which is placed inside double quotes as-is; the
// snprintf() call sites in this file pass keyspace.c_str() without escaping.
2504 const char * CqlIfImpl::kQUseKeyspace("USE \"%s\"");
// Name string for the CqlIfImpl task (its use is not visible in this part
// of the file).
2505 const char * CqlIfImpl::kTaskName("CqlIfImpl::Task");
2506 
2507 //
2508 // CqlIf
2509 //
2511  const std::vector<std::string> &cassandra_ips,
2512  int cassandra_port,
2513  const std::string &cassandra_user,
2514  const std::string &cassandra_password,
2515  bool use_ssl,
2516  const std::string &ca_certs_path,
2517  bool create_schema) :
2518  cci_(new interface::CassDatastaxLibrary),
2519  impl_(new CqlIfImpl(evm, cassandra_ips, cassandra_port,
2520  cassandra_user, cassandra_password, use_ssl,
2521  ca_certs_path, cci_.get())),
2522  use_prepared_for_insert_(true),
2523  create_schema_(create_schema) {
// impl_ is handed a raw pointer to the library wrapper owned by cci_.
// The driver log level is derived from the log4cplus root logger level and
// driver log output is routed to impl::CassLibraryLog.
2524  // Setup library logging
2525  cci_->CassLogSetLevel(impl::Log4Level2CassLogLevel(
2526  log4cplus::Logger::getRoot().getLogLevel()));
2527  cci_->CassLogSetCallback(impl::CassLibraryLog, NULL);
2528  initialized_ = false;
// Record one endpoint (address, port) per configured Cassandra IP.
// NOTE(review): `ec` is never inspected, so an unparsable entry in
// cassandra_ips is still pushed into endpoints_ with whatever address
// AddressFromString() yields on failure - confirm this is intended.
2529  BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) {
2530  boost::system::error_code ec;
2531  boost::asio::ip::address cassandra_addr(
2532  AddressFromString(cassandra_ip, &ec));
2533  GenDb::Endpoint endpoint(cassandra_addr, cassandra_port);
2534  endpoints_.push_back(endpoint);
2535  }
2536 }
2537 
2538 CqlIf::CqlIf() : impl_(NULL) {
2539 }
2540 
2542 }
2543 
2544 // Init/Uninit
2546  if (create_schema_) {
2547  impl_->SetRequestTimeout(GenDb::g_gendb_constants.SCHEMA_REQUEST_TIMEOUT);
2548  bool success(impl_->ConnectSchemaSync());
2549  if (!success) {
2550  return success;
2551  }
2552  }
2553  return impl_->ConnectSync();
2554 }
2555 
2557  if (create_schema_) {
2558  impl_->DisconnectSchemaSync();
2559  }
2560  impl_->DisconnectSync();
2561 }
2562 
2563 void CqlIf::Db_SetInitDone(bool init_done) {
2564  initialized_ = init_done;
2565  // No need for schema session if initialization is done
2566  if (create_schema_) {
2567  if (initialized_) {
2568  impl_->SetRequestTimeout(GenDb::g_gendb_constants.DEFAULT_REQUEST_TIMEOUT);
2569  impl_->DisconnectSchemaSync();
2570  }
2571  }
2572 }
2573 
2574 // Tablespace
2575 bool CqlIf::Db_AddSetTablespace(const std::string &tablespace,
2576  const std::string &replication_factor) {
2577  bool success(impl_->CreateKeyspaceIfNotExistsSync(tablespace,
2578  replication_factor, CASS_CONSISTENCY_QUORUM));
2579  if (!success) {
2581  return success;
2582  }
2583  success = impl_->UseKeyspaceSyncOnSchemaSession(tablespace,
2584  CASS_CONSISTENCY_ONE);
2585  if (!success) {
2587  return success;
2588  }
2589  return success;
2590 }
2591 
2592 bool CqlIf::Db_SetTablespace(const std::string &tablespace) {
2593  bool success(impl_->UseKeyspaceSync(tablespace, CASS_CONSISTENCY_ONE));
2594  if (!success) {
2596  return success;
2597  }
2598  return success;
2599 }
2600 
2601 // Column family
2603  const std::string &compaction_strategy) {
2604  bool success(
2605  impl_->CreateTableIfNotExistsSync(cf, compaction_strategy,
2606  CASS_CONSISTENCY_QUORUM));
2607  if (!success) {
2610  return success;
2611  }
2612  // Locate (add if not exists) INSERT INTO prepare statement
2613  success = impl_->LocatePrepareInsertIntoTable(cf);
2614  if (!success) {
2617  return success;
2618  }
2620  return success;
2621 }
2622 
2624  // Check existence of table
2625  return Db_UseColumnfamily(cf.cfname_);
2626 }
2627 
2628 bool CqlIf::Db_UseColumnfamily(const std::string &cfname) {
2629  // Check existence of table
2630  bool success(impl_->IsTablePresent(cfname));
2631  if (!success) {
2634  return success;
2635  }
2636  IncrementTableReadStats(cfname);
2637  return success;
2638 }
2639 
2640 // Index
2641 bool CqlIf::Db_CreateIndex(const std::string &cfname,
2642  const std::string &column, const std::string &indexname,
2643  const GenDb::ColIndexMode::type index_mode) {
2644  bool success(impl_->CreateIndexIfNotExistsSync(cfname, column, indexname,
2645  CASS_CONSISTENCY_QUORUM, index_mode));
2646  if (!success) {
2649  return success;
2650  }
2651  return success;
2652 }
2653 
2654 // Column
2656  std::auto_ptr<GenDb::ColList> row,
2657  std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb) {
2658  if (drc == GenDb::DbOpResult::OK) {
2659  IncrementTableWriteStats(cfname);
2660  } else if (drc == GenDb::DbOpResult::BACK_PRESSURE) {
2663  } else {
2666  }
2667  if (!cb.empty()) {
2668  cb(drc);
2669  }
2670 }
2671 
2674  GenDb::DbOpResult::type drc, std::auto_ptr<GenDb::ColList> row) :
2675  cb_(cb),
2676  drc_(drc),
2677  row_(row) {
2678  }
2681  std::auto_ptr<GenDb::ColList> row_;
2682 };
2683 
2685  boost::shared_ptr<AsyncRowGetCallbackContext> cb_ctx) {
2686  cb_ctx->cb_(cb_ctx->drc_, cb_ctx->row_);
2687 }
2688 
2690  std::auto_ptr<GenDb::ColList> row, std::string cfname,
2691  GenDb::GenDbIf::DbGetRowCb cb, bool use_worker, int task_id,
2692  int task_instance) {
2693  if (drc == GenDb::DbOpResult::OK) {
2694  IncrementTableReadStats(cfname);
2695  } else if (drc == GenDb::DbOpResult::BACK_PRESSURE) {
2698  } else {
2701  }
2702  if (use_worker) {
2703  if (!cb.empty()) {
2704  boost::shared_ptr<AsyncRowGetCallbackContext> ctx(
2705  new AsyncRowGetCallbackContext(cb, drc, row));
2706  impl::WorkerTask *worker(new impl::WorkerTask(
2707  boost::bind(&AsyncRowGetCompletionCallback, ctx),
2708  task_id, task_instance));
2710  scheduler->Enqueue(worker);
2711  }
2712  } else {
2713  if (!cb.empty()) {
2714  cb(drc, row);
2715  }
2716  }
2717 }
2718 
2720  std::auto_ptr<GenDb::ColList> row, std::string cfname,
2722  OnAsyncRowGetCompletion(drc, row, cfname, cb, false, -1, -2);
2723 }
2724 bool CqlIf::Db_AddColumn(std::auto_ptr<GenDb::ColList> cl,
2725  GenDb::DbConsistency::type dconsistency,
2727  std::string cfname(cl->cfname_);
2728  if (!initialized_) {
2731  return false;
2732  }
2733  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2734  bool success;
2736  impl_->IsInsertIntoTablePrepareSupported(cfname)) {
2737  success = impl_->InsertIntoTablePrepareAsync(cl, consistency,
2738  boost::bind(&CqlIf::OnAsyncColumnAddCompletion, this, _1, _2, cfname,
2739  cb));
2740  } else {
2741  success = impl_->InsertIntoTableAsync(cl, consistency,
2742  boost::bind(&CqlIf::OnAsyncColumnAddCompletion, this, _1, _2, cfname,
2743  cb));
2744  }
2745  if (!success) {
2748  return success;
2749  }
2750  return success;
2751 }
2752 
2753 bool CqlIf::Db_AddColumnSync(std::auto_ptr<GenDb::ColList> cl,
2754  GenDb::DbConsistency::type dconsistency) {
2755  std::string cfname(cl->cfname_);
2756  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2757  bool success(impl_->InsertIntoTableSync(cl, consistency));
2758  if (!success) {
2761  return success;
2762  }
2763  IncrementTableWriteStats(cfname);
2764  return success;
2765 }
2766 
2767 // Read
2768 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2769  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2771  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2772  bool success(impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2773  crange, consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this,
2774  _1, _2, cfname, cb)));
2775  if (!success) {
2778  }
2779  return success;
2780 }
2781 
2782 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2783  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2784  GenDb::DbConsistency::type dconsistency, int task_id, int task_instance,
2786  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2787  bool success(impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2788  crange, consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this,
2789  _1, _2, cfname, cb, true, task_id, task_instance)));
2790  if (!success) {
2793  }
2794  return success;
2795 }
2796 
2797 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2798  const GenDb::DbDataValueVec &rowkey,
2799  GenDb::DbConsistency::type dconsistency,
2801  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2802  bool success(impl_->SelectFromTableAsync(cfname, rowkey,
2803  consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2,
2804  cfname, cb)));
2805  if (!success) {
2808  }
2809  return success;
2810 }
2811 
2812 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2813  const GenDb::DbDataValueVec &rowkey,
2814  GenDb::DbConsistency::type dconsistency, int task_id, int task_instance,
2816  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2817  bool success(impl_->SelectFromTableAsync(cfname, rowkey,
2818  consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2,
2819  cfname, cb, true, task_id, task_instance)));
2820  if (!success) {
2823  }
2824  return success;
2825 }
2826 
2827 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2828  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2829  const GenDb::WhereIndexInfoVec &where_vec,
2831  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2832  bool success(impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync(cfname,
2833  rowkey, crange, where_vec, GenDb::FieldNamesToReadVec(), consistency,
2834  boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2, cfname, cb)));
2835  if (!success) {
2838  }
2839  return success;
2840 }
2841 
2842 bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
2843  const GenDb::DbDataValueVec &rowkey,
2844  GenDb::DbConsistency::type dconsistency) {
2845  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2846  bool success(impl_->SelectFromTableSync(cfname, rowkey,
2847  consistency, &out->columns_));
2848  if (!success) {
2851  return success;
2852  }
2853  IncrementTableReadStats(cfname);
2854  return success;
2855 }
2856 
2857 bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
2858  const GenDb::DbDataValueVec &rowkey,
2859  GenDb::DbConsistency::type dconsistency,
2860  const GenDb::ColumnNameRange &crange,
2861  const GenDb::FieldNamesToReadVec &read_vec) {
2862  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2863  bool success(impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2864  rowkey, crange, consistency, read_vec, &out->columns_));
2865  if (!success) {
2868  return success;
2869  }
2870  IncrementTableReadStats(cfname);
2871  return success;
2872 }
2873 
2874 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2875  const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
2876  BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
2877  std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
2878  // Partition Key
2879  v_columns->rowkey_ = rkey;
2880  bool success(impl_->SelectFromTableSync(cfname, rkey,
2881  CASS_CONSISTENCY_ONE, &v_columns->columns_));
2882  if (!success) {
2883  CQLIF_ERR_TRACE("SELECT FROM Table: " << cfname << " Partition Key: "
2884  << GenDb::DbDataValueVecToString(rkey) << " FAILED");
2887  return false;
2888  }
2889  out->push_back(v_columns.release());
2890  }
2891  IncrementTableReadStats(cfname, v_rowkey.size());
2892  return true;
2893 }
2894 
2895 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2896  const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2897  const GenDb::ColumnNameRange &crange) {
2898  BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
2899  std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
2900  // Partition Key
2901  v_columns->rowkey_ = rkey;
2902  bool success(impl_->SelectFromTableClusteringKeyRangeSync(cfname,
2903  rkey, crange, CASS_CONSISTENCY_ONE, &v_columns->columns_));
2904  if (!success) {
2905  CQLIF_ERR_TRACE("SELECT FROM Table: " << cfname << " Partition Key: "
2906  << GenDb::DbDataValueVecToString(rkey) <<
2907  " Clustering Key Range: " << crange.ToString() << " FAILED");
2910  return false;
2911  }
2912  out->push_back(v_columns.release());
2913  }
2914  IncrementTableReadStats(cfname, v_rowkey.size());
2915  return true;
2916 }
2917 
2918 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2919  const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2920  const GenDb::ColumnNameRange &crange,
2921  const GenDb::FieldNamesToReadVec &read_vec,
2922  GenDb::DbConsistency::type dconsistency) {
2923  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2924  bool success(impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2925  v_rowkey, crange, consistency, read_vec, out));
2926  if (!success) {
2929  return false;
2930  }
2931  IncrementTableReadStats(cfname, v_rowkey.size());
2932  return true;
2933 }
2934 
2935 bool CqlIf::Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname,
2936  GenDb::DbConsistency::type dconsistency) {
2937  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2938  bool success(impl_->SelectFromTableSync(cfname, consistency, out));
2939  if (!success) {
2942  return success;
2943  }
2944  IncrementTableReadStats(cfname);
2945  return success;
2946 }
2947 
2948 // Queue
2949 bool CqlIf::Db_GetQueueStats(uint64_t *queue_count,
2950  uint64_t *enqueues) const {
2951  //return impl_->Db_GetQueueStats(queue_count, enqueues);
2952  return true;
2953 }
2954 
2955 void CqlIf::Db_SetQueueWaterMark(bool high, size_t queue_count,
2957  //impl_->Db_SetQueueWaterMark(high, queue_count, cb);
2958 }
2959 
2961  //impl_->Db_ResetQueueWaterMarks();
2962 }
2963 
2964 // Stats
2965 bool CqlIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
2966  GenDb::DbErrors *dbe) {
2967  std::scoped_lock lock(stats_mutex_);
2968  stats_.GetDiffs(vdbti, dbe);
2969  return true;
2970 }
2971 
2972 bool CqlIf::Db_GetCumulativeStats(std::vector<GenDb::DbTableInfo> *vdbti,
2973  GenDb::DbErrors *dbe) const {
2974  std::scoped_lock lock(stats_mutex_);
2975  stats_.GetCumulative(vdbti, dbe);
2976  return true;
2977 }
2978 
2979 bool CqlIf::Db_GetCqlMetrics(Metrics *metrics) const {
2980  return impl_->GetMetrics(metrics);
2981 }
2982 
2983 bool CqlIf::Db_GetCqlStats(DbStats *db_stats) const {
2984  Metrics metrics;
2985  bool success(impl_->GetMetrics(&metrics));
2986  if (!success) {
2987  return success;
2988  }
2989  db_stats->requests_one_minute_rate = metrics.requests.one_minute_rate;
2990  db_stats->stats = metrics.stats;
2991  db_stats->errors = metrics.errors;
2992  return success;
2993 }
2994 
2995 void CqlIf::IncrementTableWriteStats(const std::string &table_name) {
2996  std::scoped_lock lock(stats_mutex_);
2997  stats_.IncrementTableWrite(table_name);
2998 }
2999 
3000 void CqlIf::IncrementTableWriteStats(const std::string &table_name,
3001  uint64_t num_writes) {
3002  std::scoped_lock lock(stats_mutex_);
3003  stats_.IncrementTableWrite(table_name, num_writes);
3004 }
3005 
3006 void CqlIf::IncrementTableWriteFailStats(const std::string &table_name) {
3007  std::scoped_lock lock(stats_mutex_);
3008  stats_.IncrementTableWriteFail(table_name);
3009 }
3010 
3011 void CqlIf::IncrementTableWriteFailStats(const std::string &table_name,
3012  uint64_t num_writes) {
3013  std::scoped_lock lock(stats_mutex_);
3014  stats_.IncrementTableWriteFail(table_name, num_writes);
3015 }
3016 
3018  const std::string &table_name) {
3019  std::scoped_lock lock(stats_mutex_);
3021 }
3022 
3024  const std::string &table_name) {
3025  std::scoped_lock lock(stats_mutex_);
3027 }
3028 
3029 void CqlIf::IncrementTableReadStats(const std::string &table_name) {
3030  std::scoped_lock lock(stats_mutex_);
3031  stats_.IncrementTableRead(table_name);
3032 }
3033 
3034 void CqlIf::IncrementTableReadStats(const std::string &table_name,
3035  uint64_t num_reads) {
3036  std::scoped_lock lock(stats_mutex_);
3037  stats_.IncrementTableRead(table_name, num_reads);
3038 }
3039 
3040 void CqlIf::IncrementTableReadFailStats(const std::string &table_name) {
3041  std::scoped_lock lock(stats_mutex_);
3042  stats_.IncrementTableReadFail(table_name);
3043 }
3044 
3045 void CqlIf::IncrementTableReadFailStats(const std::string &table_name,
3046  uint64_t num_reads) {
3047  std::scoped_lock lock(stats_mutex_);
3048  stats_.IncrementTableReadFail(table_name, num_reads);
3049 }
3050 
3052  std::scoped_lock lock(stats_mutex_);
3053  stats_.IncrementErrors(err_type);
3054 }
3055 
3056 // Connection
3057 std::vector<GenDb::Endpoint> CqlIf::Db_GetEndpoints() const {
3058  return endpoints_;
3059 }
3060 
3061 namespace interface {
3062 
3063 //
3064 // CassDatastaxLibrary
3065 //
3067 }
3068 
3070 }
3071 
3072 // CassCluster
3074  return cass_cluster_new();
3075 }
3076 
3077 void CassDatastaxLibrary::CassClusterFree(CassCluster* cluster) {
3078  cass_cluster_free(cluster);
3079 }
3080 
3082  CassCluster* cluster, const char* contact_points) {
3083  return cass_cluster_set_contact_points(cluster, contact_points);
3084 }
3085 
3086 CassError CassDatastaxLibrary::CassClusterSetPort(CassCluster* cluster,
3087  int port) {
3088  return cass_cluster_set_port(cluster, port);
3089 }
3090 
3091 void CassDatastaxLibrary::CassClusterSetSsl(CassCluster* cluster, CassSsl* ssl) {
3092  cass_cluster_set_ssl(cluster, ssl);
3093 }
3094 
3096  const char* username, const char* password) {
3097  cass_cluster_set_credentials(cluster, username, password);
3098 }
3099 
3101  unsigned num_threads) {
3102  return cass_cluster_set_num_threads_io(cluster, num_threads);
3103 }
3104 
3106  CassCluster* cluster, unsigned num_requests) {
3107  return cass_cluster_set_pending_requests_high_water_mark(cluster,
3108  num_requests);
3109 }
3110 
3112  CassCluster* cluster, unsigned num_requests) {
3113  return cass_cluster_set_pending_requests_low_water_mark(cluster,
3114  num_requests);
3115 }
3116 
3118  CassCluster* cluster, unsigned num_bytes) {
3119  return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes);
3120 }
3121 
3123  CassCluster* cluster, unsigned num_bytes) {
3124  return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes);
3125 }
3126 
3128  CassCluster* cluster, const char* hosts) {
3129  cass_cluster_set_whitelist_filtering(cluster, hosts);
3130 }
3131 
3132 // CassSsl
3134  return cass_ssl_new();
3135 }
3136 
3138  return cass_ssl_free(ssl);
3139 }
3140 
3142  const std::string &cert) {
3143  return cass_ssl_add_trusted_cert_n(ssl, cert.c_str(), cert.length());
3144 }
3145 
3146 void CassDatastaxLibrary::CassSslSetVerifyFlags(CassSsl* ssl, int flags) {
3147  cass_ssl_set_verify_flags(ssl, flags);
3148 }
3149 
3150 // CassSession
3152  return cass_session_new();
3153 }
3154 
3155 void CassDatastaxLibrary::CassSessionFree(CassSession* session) {
3156  cass_session_free(session);
3157 }
3158 
3160  unsigned timeout_ms) {
3161  return cass_cluster_set_request_timeout(cluster, timeout_ms);
3162 }
3163 
3164 CassFuture* CassDatastaxLibrary::CassSessionConnect(CassSession* session,
3165  const CassCluster* cluster) {
3166  return cass_session_connect(session, cluster);
3167 }
3168 
3169 CassFuture* CassDatastaxLibrary::CassSessionClose(CassSession* session) {
3170  return cass_session_close(session);
3171 }
3172 
3173 CassFuture* CassDatastaxLibrary::CassSessionExecute(CassSession* session,
3174  const CassStatement* statement) {
3175  return cass_session_execute(session, statement);
3176 }
3177 
3179  const CassSession* session) {
3180  return cass_session_get_schema_meta(session);
3181 }
3182 
3183 CassFuture* CassDatastaxLibrary::CassSessionPrepare(CassSession* session,
3184  const char* query) {
3185  return cass_session_prepare(session, query);
3186 }
3187 
3188 void CassDatastaxLibrary::CassSessionGetMetrics(const CassSession* session,
3189  CassMetrics* output) {
3190  cass_session_get_metrics(session, output);
3191 }
3192 
3193 // CassSchema
3195  const CassSchemaMeta* schema_meta) {
3196  cass_schema_meta_free(schema_meta);
3197 }
3198 
3200  const CassSchemaMeta* schema_meta, const char* keyspace) {
3201  return cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
3202 }
3203 
3205  const CassKeyspaceMeta* keyspace_meta, const char* table) {
3206  return cass_keyspace_meta_table_by_name(keyspace_meta, table);
3207 }
3208 
3210  const CassTableMeta* table_meta) {
3211  return cass_table_meta_partition_key_count(table_meta);
3212 }
3213 
3215  const CassTableMeta* table_meta) {
3216  return cass_table_meta_clustering_key_count(table_meta);
3217 }
3218 
3219 // CassFuture
3220 void CassDatastaxLibrary::CassFutureFree(CassFuture* future) {
3221  cass_future_free(future);
3222 }
3223 
3224 CassError CassDatastaxLibrary::CassFutureSetCallback(CassFuture* future,
3225  CassFutureCallback callback, void* data) {
3226  return cass_future_set_callback(future, callback, data);
3227 }
3228 
3229 void CassDatastaxLibrary::CassFutureWait(CassFuture* future) {
3230  cass_future_wait(future);
3231 }
3232 
3234  CassFuture* future) {
3235  return cass_future_get_result(future);
3236 }
3237 
3239  const char** message, size_t* message_length) {
3240  cass_future_error_message(future, message, message_length);
3241 }
3242 
3243 CassError CassDatastaxLibrary::CassFutureErrorCode(CassFuture* future) {
3244  return cass_future_error_code(future);
3245 }
3246 
3248  CassFuture* future) {
3249  return cass_future_get_prepared(future);
3250 }
3251 
3252 // CassResult
3253 void CassDatastaxLibrary::CassResultFree(const CassResult* result) {
3254  cass_result_free(result);
3255 }
3256 
3257 size_t CassDatastaxLibrary::CassResultColumnCount(const CassResult* result) {
3258  return cass_result_column_count(result);
3259 }
3260 
3261 CassError CassDatastaxLibrary::CassResultColumnName(const CassResult *result,
3262  size_t index, const char** name, size_t* name_length) {
3263  return cass_result_column_name(result, index, name, name_length);
3264 }
3265 
3266 // CassIterator
3267 void CassDatastaxLibrary::CassIteratorFree(CassIterator* iterator) {
3268  cass_iterator_free(iterator);
3269 }
3270 
3272  const CassResult* result) {
3273  return cass_iterator_from_result(result);
3274 }
3275 
3276 cass_bool_t CassDatastaxLibrary::CassIteratorNext(CassIterator* iterator) {
3277  return cass_iterator_next(iterator);
3278 }
3279 
3281  const CassIterator* iterator) {
3282  return cass_iterator_get_row(iterator);
3283 }
3284 
3285 // CassStatement
3286 CassStatement* CassDatastaxLibrary::CassStatementNew(const char* query,
3287  size_t parameter_count) {
3288  return cass_statement_new(query, parameter_count);
3289 }
3290 
3291 void CassDatastaxLibrary::CassStatementFree(CassStatement* statement) {
3292  cass_statement_free(statement);
3293 }
3294 
3296  CassStatement* statement, CassConsistency consistency) {
3297  return cass_statement_set_consistency(statement, consistency);
3298 }
3299 
3301  CassStatement* statement,
3302  size_t index, const char* value, size_t value_length) {
3303  return cass_statement_bind_string_n(statement, index, value, value_length);
3304 }
3305 
3306 CassError CassDatastaxLibrary::CassStatementBindInt32(CassStatement* statement,
3307  size_t index, cass_int32_t value) {
3308  return cass_statement_bind_int32(statement, index, value);
3309 }
3310 
3311 CassError CassDatastaxLibrary::CassStatementBindInt64(CassStatement* statement,
3312  size_t index, cass_int64_t value) {
3313  return cass_statement_bind_int64(statement, index, value);
3314 }
3315 
3316 CassError CassDatastaxLibrary::CassStatementBindUuid(CassStatement* statement,
3317  size_t index, CassUuid value) {
3318  return cass_statement_bind_uuid(statement, index, value);
3319 }
3320 
3322  CassStatement* statement, size_t index, cass_double_t value) {
3323  return cass_statement_bind_double(statement, index, value);
3324 }
3325 
3326 CassError CassDatastaxLibrary::CassStatementBindInet(CassStatement* statement,
3327  size_t index, CassInet value) {
3328  return cass_statement_bind_inet(statement, index, value);
3329 }
3330 
3332  CassStatement* statement,
3333  size_t index, const cass_byte_t* value, size_t value_length) {
3334  return cass_statement_bind_bytes(statement, index, value, value_length);
3335 }
3336 
3338  CassStatement* statement,
3339  const char* name, size_t name_length, const char* value,
3340  size_t value_length) {
3341  return cass_statement_bind_string_by_name_n(statement, name, name_length,
3342  value, value_length);
3343 }
3344 
3346  CassStatement* statement, const char* name, cass_int32_t value) {
3347  return cass_statement_bind_int32_by_name(statement, name, value);
3348 }
3349 
3351  CassStatement* statement, const char* name, cass_int64_t value) {
3352  return cass_statement_bind_int64_by_name(statement, name, value);
3353 }
3354 
3356  CassStatement* statement, const char* name, CassUuid value) {
3357  return cass_statement_bind_uuid_by_name(statement, name, value);
3358 }
3359 
3361  CassStatement* statement, const char* name, cass_double_t value) {
3362  return cass_statement_bind_double_by_name(statement, name, value);
3363 }
3364 
3366  CassStatement* statement, const char* name, CassInet value) {
3367  return cass_statement_bind_inet_by_name(statement, name, value);
3368 }
3369 
3371  CassStatement* statement,
3372  const char* name, size_t name_length, const cass_byte_t* value,
3373  size_t value_length) {
3374  return cass_statement_bind_bytes_by_name_n(statement, name, name_length,
3375  value, value_length);
3376 }
3377 
3378 // CassPrepare
3379 void CassDatastaxLibrary::CassPreparedFree(const CassPrepared* prepared) {
3380  cass_prepared_free(prepared);
3381 }
3382 
3384  const CassPrepared* prepared) {
3385  return cass_prepared_bind(prepared);
3386 }
3387 
3388 // CassValue
3389 CassValueType CassDatastaxLibrary::GetCassValueType(const CassValue* value) {
3390  return cass_value_type(value);
3391 }
3392 
3393 CassError CassDatastaxLibrary::CassValueGetString(const CassValue* value,
3394  const char** output, size_t* output_size) {
3395  return cass_value_get_string(value, output, output_size);
3396 }
3397 
3398 CassError CassDatastaxLibrary::CassValueGetInt8(const CassValue* value,
3399  cass_int8_t* output) {
3400  return cass_value_get_int8(value, output);
3401 }
3402 
3403 CassError CassDatastaxLibrary::CassValueGetInt16(const CassValue* value,
3404  cass_int16_t* output) {
3405  return cass_value_get_int16(value, output);
3406 }
3407 
3408 CassError CassDatastaxLibrary::CassValueGetInt32(const CassValue* value,
3409  cass_int32_t* output) {
3410  return cass_value_get_int32(value, output);
3411 }
3412 
3413 CassError CassDatastaxLibrary::CassValueGetInt64(const CassValue* value,
3414  cass_int64_t* output) {
3415  return cass_value_get_int64(value, output);
3416 }
3417 
3418 CassError CassDatastaxLibrary::CassValueGetUuid(const CassValue* value,
3419  CassUuid* output) {
3420  return cass_value_get_uuid(value, output);
3421 }
3422 
3423 CassError CassDatastaxLibrary::CassValueGetDouble(const CassValue* value,
3424  cass_double_t* output) {
3425  return cass_value_get_double(value, output);
3426 }
3427 
3428 CassError CassDatastaxLibrary::CassValueGetInet(const CassValue* value,
3429  CassInet* output) {
3430  return cass_value_get_inet(value, output);
3431 }
3432 
3433 CassError CassDatastaxLibrary::CassValueGetBytes(const CassValue* value,
3434  const cass_byte_t** output, size_t* output_size) {
3435  return cass_value_get_bytes(value, output, output_size);
3436 }
3437 
3438 cass_bool_t CassDatastaxLibrary::CassValueIsNull(const CassValue* value) {
3439  return cass_value_is_null(value);
3440 }
3441 
3442 // CassInet
3444  const cass_uint8_t* address) {
3445  return cass_inet_init_v4(address);
3446 }
3447 
3449  const cass_uint8_t* address) {
3450  return cass_inet_init_v6(address);
3451 }
3452 
3453 // CassRow
3454 const CassValue* CassDatastaxLibrary::CassRowGetColumn(const CassRow* row,
3455  size_t index) {
3456  return cass_row_get_column(row, index);
3457 }
3458 
3459 // CassLog
3460 void CassDatastaxLibrary::CassLogSetLevel(CassLogLevel log_level) {
3461  cass_log_set_level(log_level);
3462 }
3463 
3464 void CassDatastaxLibrary::CassLogSetCallback(CassLogCallback callback,
3465  void* data) {
3466  cass_log_set_callback(callback, data);
3467 }
3468 
3469 } // namespace interface
3470 } // namespace cql
3471 } // namespace cass
boost::asio::ip::address_v6 Ip6Address
Definition: address.h:15
boost::asio::ip::address IpAddress
Definition: address.h:13
boost::asio::ip::address_v4 Ip4Address
Definition: address.h:14
std::string GetHostIp(boost::asio::io_context *io_service, const std::string &hostname)
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
boost::asio::io_context * io_service()
Definition: event_manager.h:42
void GetDiffs(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void IncrementTableReadFail(const std::string &table_name)
void IncrementTableRead(const std::string &table_name)
void IncrementErrors(IfErrors::Type type)
void IncrementTableWriteBackPressureFail(const std::string &table_name)
void IncrementTableReadBackPressureFail(const std::string &table_name)
void IncrementTableWriteFail(const std::string &table_name)
void GetCumulative(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
void IncrementTableWrite(const std::string &table_name)
boost::function< void(size_t)> DbQueueWaterMarkCb
Definition: gendb_if.h:253
boost::function< void(DbOpResult::type, std::auto_ptr< ColList >)> DbGetRowCb
Definition: gendb_if.h:256
boost::function< void(DbOpResult::type)> DbAddColumnCb
Definition: gendb_if.h:254
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:304
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:642
static TaskScheduler * GetInstance()
Definition: task.cc:554
Task is a class to describe a computational task within OpenSDN control plane applications....
Definition: task.h:79
impl::CassSessionPtr session_
Definition: cql_if_impl.h:348
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2259
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2158
bool DisconnectSchemaSync()
Definition: cql_if.cc:2346
impl::CassClusterPtr cluster_
Definition: cql_if_impl.h:346
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
Definition: cql_if.cc:1915
bool IsTableDynamic(const std::string &table)
Definition: cql_if.cc:2149
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
Definition: cql_if.cc:2432
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:2011
impl::CassSslPtr ssl_
Definition: cql_if_impl.h:347
impl::CassSessionPtr schema_session_
Definition: cql_if_impl.h:349
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2408
static const char * kTaskName
Definition: cql_if_impl.h:333
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2163
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
Definition: cql_if.cc:2042
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2078
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
Definition: cql_if.cc:2226
virtual ~CqlIfImpl()
Definition: cql_if.cc:1908
bool GetMetrics(Metrics *metrics) const
Definition: cql_if.cc:2360
void SetRequestTimeout(uint32_t timeout_ms)
Definition: cql_if.cc:2280
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2462
CassPreparedMapType insert_prepared_map_
Definition: cql_if_impl.h:357
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
Definition: cql_if.cc:1976
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1932
bool ConnectSchemaSync()
Definition: cql_if.cc:2285
static const char * kQCreateKeyspaceIfNotExists
Definition: cql_if_impl.h:331
bool IsTablePresent(const std::string &table)
Definition: cql_if.cc:2054
std::string schema_contact_point_
Definition: cql_if_impl.h:352
interface::CassLibrary * cci_
Definition: cql_if_impl.h:345
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1954
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:2023
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2103
bool IsInsertIntoTablePrepareSupported(const std::string &table)
Definition: cql_if.cc:2172
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2177
std::atomic< SessionState::type > session_state_
Definition: cql_if_impl.h:350
std::string keyspace_
Definition: cql_if_impl.h:353
std::atomic< SessionState::type > schema_session_state_
Definition: cql_if_impl.h:351
std::mutex map_mutex_
Definition: cql_if_impl.h:358
static const char * kQUseKeyspace
Definition: cql_if_impl.h:332
int IsTableStatic(const std::string &table)
Definition: cql_if.cc:2062
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
Definition: cql_if.cc:2153
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2127
boost::scoped_ptr< CqlIfImpl > impl_
Definition: cql_if.h:152
boost::scoped_ptr< interface::CassLibrary > cci_
Definition: cql_if.h:151
bool create_schema_
Definition: cql_if.h:158
void IncrementTableWriteStats(const std::string &table_name)
Definition: cql_if.cc:2995
std::mutex stats_mutex_
Definition: cql_if.h:155
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count, DbQueueWaterMarkCb cb)
Definition: cql_if.cc:2955
virtual bool Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname, const std::vector< GenDb::DbDataValueVec > &v_rowkey)
Definition: cql_if.cc:2874
virtual bool Db_GetCumulativeStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
Definition: cql_if.cc:2972
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2842
virtual bool Db_CreateIndex(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode=GenDb::ColIndexMode::NONE)
Definition: cql_if.cc:2641
virtual void Db_SetInitDone(bool)
Definition: cql_if.cc:2563
virtual bool Db_GetCqlMetrics(Metrics *metrics) const
Definition: cql_if.cc:2979
GenDb::GenDbIfStats stats_
Definition: cql_if.h:156
virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor="1")
Definition: cql_if.cc:2575
virtual void Db_ResetQueueWaterMarks()
Definition: cql_if.cc:2960
virtual bool Db_GetCqlStats(DbStats *db_stats) const
Definition: cql_if.cc:2983
void IncrementErrors(GenDb::IfErrors::Type err_type)
Definition: cql_if.cc:3051
virtual bool Db_GetRowAsync(const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbGetRowCb cb)
Definition: cql_if.cc:2797
void IncrementTableWriteFailStats(const std::string &table_name)
Definition: cql_if.cc:3006
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:2602
virtual bool Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2935
virtual std::vector< GenDb::Endpoint > Db_GetEndpoints() const
Definition: cql_if.cc:3057
virtual bool Db_AddColumn(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbAddColumnCb cb)
Definition: cql_if.cc:2724
virtual bool Db_AddColumnSync(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2753
void IncrementTableReadFailStats(const std::string &table_name)
Definition: cql_if.cc:3040
virtual void Db_Uninit()
Definition: cql_if.cc:2556
void IncrementTableReadBackPressureFailStats(const std::string &table_name)
Definition: cql_if.cc:3023
virtual ~CqlIf()
Definition: cql_if.cc:2541
virtual bool Db_GetQueueStats(uint64_t *queue_count, uint64_t *enqueues) const
Definition: cql_if.cc:2949
virtual bool Db_SetTablespace(const std::string &tablespace)
Definition: cql_if.cc:2592
void IncrementTableWriteBackPressureFailStats(const std::string &table_name)
Definition: cql_if.cc:3017
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb)
Definition: cql_if.cc:2655
virtual bool Db_GetStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
Definition: cql_if.cc:2965
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbGetRowCb cb)
Definition: cql_if.cc:2719
std::vector< GenDb::Endpoint > endpoints_
Definition: cql_if.h:154
std::atomic< bool > initialized_
Definition: cql_if.h:153
virtual bool Db_Init()
Definition: cql_if.cc:2545
void IncrementTableReadStats(const std::string &table_name)
Definition: cql_if.cc:3029
bool use_prepared_for_insert_
Definition: cql_if.h:157
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf)
Definition: cql_if.cc:2623
void operator()(const boost::uuids::uuid &tuuid) const
Definition: cql_if.cc:251
void operator()(const uint64_t &tu64) const
Definition: cql_if.cc:271
CassQueryPrinter(std::ostream &os, bool quote_strings)
Definition: cql_if.cc:239
void operator()(const IpAddress &tipaddr) const
Definition: cql_if.cc:274
void operator()(const std::string &tstring) const
Definition: cql_if.cc:259
void operator()(const uint32_t &tu32) const
Definition: cql_if.cc:267
void operator()(const T &t) const
Definition: cql_if.cc:248
void operator()(const uint8_t &tu8) const
Definition: cql_if.cc:256
CassQueryPrinter(std::ostream &os)
Definition: cql_if.cc:243
void operator()(const double &tdouble, size_t index) const
Definition: cql_if.cc:325
void operator()(const IpAddress &tipaddr, size_t index) const
Definition: cql_if.cc:330
void operator()(const boost::uuids::uuid &tuuid, size_t index) const
Definition: cql_if.cc:299
void operator()(const uint16_t &tu16, size_t index) const
Definition: cql_if.cc:309
void operator()(const std::string &tstring, size_t index) const
Definition: cql_if.cc:294
void operator()(const uint32_t &tu32, size_t index) const
Definition: cql_if.cc:313
CassStatementIndexBinder(interface::CassLibrary *cci, CassStatement *statement)
Definition: cql_if.cc:286
void operator()(const uint64_t &tu64, size_t index) const
Definition: cql_if.cc:319
void operator()(const GenDb::Blob &tblob, size_t index) const
Definition: cql_if.cc:343
void operator()(const uint8_t &tu8, size_t index) const
Definition: cql_if.cc:305
void operator()(const boost::blank &tblank, size_t index) const
Definition: cql_if.cc:291
interface::CassLibrary * cci_
Definition: cql_if.cc:348
void operator()(const uint32_t &tu32, const char *name) const
Definition: cql_if.cc:384
CassStatementNameBinder(interface::CassLibrary *cci, CassStatement *statement)
Definition: cql_if.cc:354
void operator()(const std::string &tstring, const char *name) const
Definition: cql_if.cc:362
void operator()(const boost::blank &tblank, const char *name) const
Definition: cql_if.cc:359
void operator()(const boost::uuids::uuid &tuuid, const char *name) const
Definition: cql_if.cc:367
void operator()(const GenDb::Blob &tblob, const char *name) const
Definition: cql_if.cc:414
void operator()(const double &tdouble, const char *name) const
Definition: cql_if.cc:396
void operator()(const uint64_t &tu64, const char *name) const
Definition: cql_if.cc:390
void operator()(const uint8_t &tu8, const char *name) const
Definition: cql_if.cc:374
interface::CassLibrary * cci_
Definition: cql_if.cc:419
void operator()(const IpAddress &tipaddr, const char *name) const
Definition: cql_if.cc:401
void operator()(const uint16_t &tu16, const char *name) const
Definition: cql_if.cc:379
std::string Description() const
Gives a description of the task.
Definition: cql_if.cc:1838
WorkerTask(FunctionPtr func, int task_id, int task_instance)
Definition: cql_if.cc:1830
boost::function< void(void)> FunctionPtr
Definition: cql_if.cc:1829
bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
Definition: cql_if.cc:1834
virtual CassError CassStatementBindDouble(CassStatement *statement, size_t index, cass_double_t value)
Definition: cql_if.cc:3321
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)
Definition: cql_if.cc:3423
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)
Definition: cql_if.cc:3393
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)
Definition: cql_if.cc:3383
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)
Definition: cql_if.cc:3117
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)
Definition: cql_if.cc:3280
virtual void CassFutureFree(CassFuture *future)
Definition: cql_if.cc:3220
virtual CassInet CassInetInitV6(const cass_uint8_t *address)
Definition: cql_if.cc:3448
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)
Definition: cql_if.cc:3418
virtual void CassResultFree(const CassResult *result)
Definition: cql_if.cc:3253
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)
Definition: cql_if.cc:3086
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)
Definition: cql_if.cc:3159
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)
Definition: cql_if.cc:3188
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)
Definition: cql_if.cc:3261
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)
Definition: cql_if.cc:3286
virtual CassInet CassInetInitV4(const cass_uint8_t *address)
Definition: cql_if.cc:3443
virtual CassError CassStatementBindDoubleByName(CassStatement *statement, const char *name, cass_double_t value)
Definition: cql_if.cc:3360
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)
Definition: cql_if.cc:3122
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)
Definition: cql_if.cc:3454
virtual CassError CassStatementBindStringByNameN(CassStatement *statement, const char *name, size_t name_length, const char *value, size_t value_length)
Definition: cql_if.cc:3337
virtual void CassSchemaMetaFree(const CassSchemaMeta *schema_meta)
Definition: cql_if.cc:3194
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)
Definition: cql_if.cc:3111
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)
Definition: cql_if.cc:3398
virtual void CassLogSetLevel(CassLogLevel log_level)
Definition: cql_if.cc:3460
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)
Definition: cql_if.cc:3306
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)
Definition: cql_if.cc:3183
virtual CassError CassStatementBindUuid(CassStatement *statement, size_t index, CassUuid value)
Definition: cql_if.cc:3316
virtual void CassIteratorFree(CassIterator *iterator)
Definition: cql_if.cc:3267
virtual CassIterator * CassIteratorFromResult(const CassResult *result)
Definition: cql_if.cc:3271
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)
Definition: cql_if.cc:3095
virtual CassError CassStatementBindBytes(CassStatement *statement, size_t index, const cass_byte_t *value, size_t value_length)
Definition: cql_if.cc:3331
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)
Definition: cql_if.cc:3141
virtual CassFuture * CassSessionClose(CassSession *session)
Definition: cql_if.cc:3169
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)
Definition: cql_if.cc:3247
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)
Definition: cql_if.cc:3199
virtual void CassPreparedFree(const CassPrepared *prepared)
Definition: cql_if.cc:3379
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)
Definition: cql_if.cc:3276
virtual size_t CassResultColumnCount(const CassResult *result)
Definition: cql_if.cc:3257
virtual void CassClusterFree(CassCluster *cluster)
Definition: cql_if.cc:3077
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)
Definition: cql_if.cc:3413
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)
Definition: cql_if.cc:3105
virtual CassError CassStatementBindStringN(CassStatement *statement, size_t index, const char *value, size_t value_length)
Definition: cql_if.cc:3300
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)
Definition: cql_if.cc:3224
virtual void CassSslFree(CassSsl *ssl)
Definition: cql_if.cc:3137
virtual const CassResult * CassFutureGetResult(CassFuture *future)
Definition: cql_if.cc:3233
virtual CassValueType GetCassValueType(const CassValue *value)
Definition: cql_if.cc:3389
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)
Definition: cql_if.cc:3178
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)
Definition: cql_if.cc:3081
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)
Definition: cql_if.cc:3214
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)
Definition: cql_if.cc:3295
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)
Definition: cql_if.cc:3173
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)
Definition: cql_if.cc:3091
virtual CassError CassStatementBindInet(CassStatement *statement, size_t index, CassInet value)
Definition: cql_if.cc:3326
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)
Definition: cql_if.cc:3164
virtual cass_bool_t CassValueIsNull(const CassValue *value)
Definition: cql_if.cc:3438
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)
Definition: cql_if.cc:3209
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)
Definition: cql_if.cc:3403
virtual void CassFutureWait(CassFuture *future)
Definition: cql_if.cc:3229
virtual CassError CassStatementBindBytesByNameN(CassStatement *statement, const char *name, size_t name_length, const cass_byte_t *value, size_t value_length)
Definition: cql_if.cc:3370
virtual CassError CassStatementBindUuidByName(CassStatement *statement, const char *name, CassUuid value)
Definition: cql_if.cc:3355
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)
Definition: cql_if.cc:3433
virtual CassSession * CassSessionNew()
Definition: cql_if.cc:3151
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)
Definition: cql_if.cc:3238
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)
Definition: cql_if.cc:3146
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)
Definition: cql_if.cc:3100
virtual CassError CassStatementBindInt64(CassStatement *statement, size_t index, cass_int64_t value)
Definition: cql_if.cc:3311
virtual void CassLogSetCallback(CassLogCallback callback, void *data)
Definition: cql_if.cc:3464
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)
Definition: cql_if.cc:3204
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)
Definition: cql_if.cc:3408
virtual CassError CassStatementBindInt64ByName(CassStatement *statement, const char *name, cass_int64_t value)
Definition: cql_if.cc:3350
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)
Definition: cql_if.cc:3428
virtual void CassSessionFree(CassSession *session)
Definition: cql_if.cc:3155
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)
Definition: cql_if.cc:3127
virtual void CassStatementFree(CassStatement *statement)
Definition: cql_if.cc:3291
virtual CassError CassStatementBindInt32ByName(CassStatement *statement, const char *name, cass_int32_t value)
Definition: cql_if.cc:3345
virtual CassError CassStatementBindInetByName(CassStatement *statement, const char *name, CassInet value)
Definition: cql_if.cc:3365
virtual CassCluster * CassClusterNew()
Definition: cql_if.cc:3073
virtual CassError CassFutureErrorCode(CassFuture *future)
Definition: cql_if.cc:3243
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)=0
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)=0
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)=0
virtual size_t CassResultColumnCount(const CassResult *result)=0
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)=0
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)=0
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)=0
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSsl * CassSslNew()=0
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)=0
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)=0
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)=0
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSession * CassSessionNew()=0
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)=0
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)=0
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)=0
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)=0
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)=0
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)=0
virtual CassValueType GetCassValueType(const CassValue *value)=0
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)=0
virtual const CassResult * CassFutureGetResult(CassFuture *future)=0
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)=0
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)=0
virtual CassIterator * CassIteratorFromResult(const CassResult *result)=0
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)=0
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)=0
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)=0
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)=0
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)=0
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)=0
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)=0
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)=0
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)=0
virtual void CassFutureWait(CassFuture *future)=0
virtual CassError CassFutureErrorCode(CassFuture *future)=0
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)=0
virtual cass_bool_t CassValueIsNull(const CassValue *value)=0
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)=0
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)=0
virtual CassFuture * CassSessionClose(CassSession *session)=0
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)=0
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)=0
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)=0
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)=0
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)=0
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)=0
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)=0
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)=0
static EventManager evm
#define CQLIF_DEBUG
Definition: cql_if.cc:36
SandeshTraceBufferPtr CqlTraceDebugBuf(SandeshTraceBufferCreate(CQLIF_DEBUG, 10000))
#define CQLIF_INFO_TRACE(_Msg)
Definition: cql_if.cc:55
#define CASS_LIB_TRACE(_Level, _Msg)
Definition: cql_if.cc:71
SandeshTraceBufferPtr CqlTraceErrBuf(SandeshTraceBufferCreate(CQLIF_ERR, 20000))
#define CQLIF_DEBUG_TRACE(_Msg)
Definition: cql_if.cc:47
#define CQLIF_INFO
Definition: cql_if.cc:37
#define CQLIF_ERR_TRACE(_Msg)
Definition: cql_if.cc:63
SandeshTraceBufferPtr CqlTraceInfoBuf(SandeshTraceBufferCreate(CQLIF_INFO, 10000))
#define CQLIF_ERR
Definition: cql_if.cc:38
uint8_t type
Definition: load_balance.h:2
bool LoggingDisabled()
Definition: logging.cc:55
#define GENERIC_RAW_ARRAY(obj)
Definition: misc_utils.h:32
std::string DbDataValueVecToString(const GenDb::DbDataValueVec &v_db_value)
Definition: gendb_if.cc:102
@ DB_VALUE_STRING
Definition: gendb_if.h:89
@ DB_VALUE_BLANK
Definition: gendb_if.h:88
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
Definition: gendb_if.h:85
std::vector< GenDb::DbDataType::type > DbDataTypeVec
Definition: gendb_if.h:101
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
boost::asio::ip::tcp::endpoint Endpoint
Definition: gendb_if.h:240
std::vector< WhereIndexInfo > WhereIndexInfoVec
Definition: gendb_if.h:233
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
boost::ptr_vector< NewCol > NewColVec
Definition: gendb_if.h:186
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:610
static std::string LoadCertFile(const std::string &ca_certs_path)
Definition: cql_if.cc:1813
static void OnExecuteQueryAsync(CassFuture *future, void *data)
Definition: cql_if.cc:1482
static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx=NULL)
Definition: cql_if.cc:1518
static bool DynamicCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
Definition: cql_if.cc:1555
static void CassLibraryLog(const CassLogMessage *message, void *data)
Definition: cql_if.cc:1802
void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1433
static void ExecuteQueryResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx)
Definition: cql_if.cc:1547
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:722
static const char * kQCompactionStrategy("compaction = {'class': " "'org.apache.cassandra.db.compaction.%s'}")
static GenDb::DbDataValue CassValue2DbDataValue(interface::CassLibrary *cci, const CassValue *cvalue)
Definition: cql_if.cc:1048
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1023
static bool GetCassTablePartitionKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count)
Definition: cql_if.cc:1739
bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
Definition: cql_if.cc:859
static std::string DbDataTypes2CassTypes(const GenDb::DbDataTypeVec &v_db_types)
Definition: cql_if.cc:199
static std::string CassSelectFromTableInternal(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec, const GenDb::WhereIndexInfoVec &where_vec)
Definition: cql_if.cc:893
CassSharedPtr< CassSsl > CassSslPtr
Definition: cql_if_impl.h:181
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, size_t ck_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1609
static bool GetCassTableClusteringKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *ck_count)
Definition: cql_if.cc:1718
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
Definition: cql_if.cc:745
CassSharedPtr< const CassPrepared > CassPreparedPtr
Definition: cql_if_impl.h:187
static bool PrepareSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassPreparedPtr *prepared)
Definition: cql_if.cc:1139
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1033
static bool StaticCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
Definition: cql_if.cc:1623
static bool IsCassTableMetaPresent(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table)
Definition: cql_if.cc:1699
static std::string DbColIndexMode2String(const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:566
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
Definition: cql_if.cc:474
static CassConsistency Db2CassConsistency(GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:205
static const std::string kQReadRepairChanceDTCS("read_repair_chance = 0.0")
CassSharedPtr< const CassResult > CassResultPtr
Definition: cql_if_impl.h:185
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1647
static bool ExecuteQueryResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency)
Definition: cql_if.cc:1190
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:584
std::string CassSelectFromTable(const std::string &table)
Definition: cql_if.cc:1041
static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb)
Definition: cql_if.cc:1540
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1003
static const char * DbDataType2CassType(const GenDb::DbDataType::type &db_type)
Definition: cql_if.cc:168
static void ExecuteQueryAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb)
Definition: cql_if.cc:1531
static void encode_uuid(char *output, const CassUuid &uuid)
Definition: cql_if.cc:122
bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
Definition: cql_if.cc:818
static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc)
Definition: cql_if.cc:1206
static char * decode_uuid(char *input, CassUuid *output)
Definition: cql_if.cc:149
void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, size_t ck_count, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1354
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:663
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1634
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
Definition: cql_if_impl.h:191
static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel)
Definition: cql_if.cc:1759
static bool SyncFutureWait(interface::CassLibrary *cci, CassFuture *future)
Definition: cql_if.cc:1660
static const CassTableMeta * GetCassTableMeta(interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error)
Definition: cql_if.cc:1673
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:434
static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, CassSession *session, CassStatement *statement, CassConsistency consistency)
Definition: cql_if.cc:1199
static bool ExecuteQuerySync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency)
Definition: cql_if.cc:1182
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1567
static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level)
Definition: cql_if.cc:1780
static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency)
Definition: cql_if.cc:1159
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
Definition: cql_if.cc:1014
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0")
static void AsyncRowGetCompletionCallback(boost::shared_ptr< AsyncRowGetCallbackContext > cb_ctx)
Definition: cql_if.cc:2684
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
size_t size() const
Definition: gendb_if.h:63
const uint8_t * data() const
Definition: gendb_if.h:60
std::string cfname_
Definition: gendb_if.h:197
NewColVec columns_
Definition: gendb_if.h:199
DbDataValueVec rowkey_
Definition: gendb_if.h:198
DbDataValueVec finish_
Definition: gendb_if.h:226
bool IsEmpty() const
Definition: gendb_if.h:217
DbDataValueVec start_
Definition: gendb_if.h:225
std::string ToString() const
Definition: gendb_if.cc:60
ColumnMap cfcolumns_
Definition: gendb_if.h:140
DbDataTypeVec columns_
Definition: gendb_if.h:142
DbDataTypeVec value_
Definition: gendb_if.h:143
std::map< std::string, GenDb::DbDataType::type > ColumnMap
Definition: gendb_if.h:113
ColumnFamilyType cftype_
Definition: gendb_if.h:138
@ COLUMN_FAMILY_NOSQL
Definition: gendb_if.h:111
@ COLUMN_FAMILY_SQL
Definition: gendb_if.h:110
std::string cfname_
Definition: gendb_if.h:137
DbDataTypeVec clustering_columns_
Definition: gendb_if.h:141
DbDataTypeVec partition_keys_
Definition: gendb_if.h:139
boost::scoped_ptr< DbDataValueVec > value
Definition: gendb_if.h:181
boost::scoped_ptr< DbDataValueVec > name
Definition: gendb_if.h:180
NewCf::ColumnFamilyType cftype_
Definition: gendb_if.h:179
static std::string ToString(Op::type op)
Definition: gendb_if.cc:81
std::auto_ptr< GenDb::ColList > row_
Definition: cql_if.cc:2681
GenDb::GenDbIf::DbGetRowCb cb_
Definition: cql_if.cc:2679
AsyncRowGetCallbackContext(GenDb::GenDbIf::DbGetRowCb cb, GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row)
Definition: cql_if.cc:2673
GenDb::DbOpResult::type drc_
Definition: cql_if.cc:2680
CassString(const char *data)
Definition: cql_if.cc:107
CassString(const char *data, size_t length)
Definition: cql_if.cc:112
boost::uuids::uuid uuid