OpenSDN source code
cql_if.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #include <assert.h>
6 #include <fstream>
7 
8 #include <tbb/atomic.h>
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string.hpp>
11 #include <boost/algorithm/string/join.hpp>
12 #include <boost/unordered_map.hpp>
13 #include <boost/system/error_code.hpp>
14 
15 #include <linux/version.h>
16 #if defined(RHEL_MAJOR) && (RHEL_MAJOR >= 9)
17 #include <cassandra/cassandra.h>
18 #else
19 #include <cassandra.h>
20 #endif
21 
22 #include <base/logging.h>
23 #include <base/misc_utils.h>
24 #include <base/task.h>
25 #include <base/timer.h>
26 #include <base/string_util.h>
27 #include <base/address_util.h>
28 #include <io/event_manager.h>
29 #include <database/gendb_if.h>
30 #include <database/gendb_constants.h>
34 
35 using namespace boost::system;
36 
37 #define CQLIF_DEBUG "CqlTraceBufDebug"
38 #define CQLIF_INFO "CqlTraceBufInfo"
39 #define CQLIF_ERR "CqlTraceBufErr"
40 
42  CQLIF_DEBUG, 10000));
44  CQLIF_INFO, 10000));
46  CQLIF_ERR, 20000));
47 
// Builds "<function>:<file>:<line>: <_Msg>" in a local stringstream and
// passes the resulting string to CQL_TRACE_TRACE for CqlTraceDebugBuf.
// _Msg is spliced after an operator<<, so it may itself be a chain of
// stream insertions. The macro ends on the `while (false)` line (no dangling
// continuation), so it cannot swallow the line that follows it.
#define CQLIF_DEBUG_TRACE(_Msg)                                   \
    do {                                                          \
        std::stringstream _ss;                                    \
        _ss << __func__ << ":" << __FILE__ << ":" <<              \
            __LINE__ << ": " << _Msg;                             \
        CQL_TRACE_TRACE(CqlTraceDebugBuf, _ss.str());             \
    } while (false)
55 
// Builds "<function>:<file>:<line>: <_Msg>" in a local stringstream and
// passes the resulting string to CQL_TRACE_TRACE for CqlTraceInfoBuf.
// _Msg is spliced after an operator<<, so it may itself be a chain of
// stream insertions. The macro ends on the `while (false)` line (no dangling
// continuation), so it cannot swallow the line that follows it.
#define CQLIF_INFO_TRACE(_Msg)                                    \
    do {                                                          \
        std::stringstream _ss;                                    \
        _ss << __func__ << ":" << __FILE__ << ":" <<              \
            __LINE__ << ": " << _Msg;                             \
        CQL_TRACE_TRACE(CqlTraceInfoBuf, _ss.str());              \
    } while (false)
63 
// Builds "<function>:<file>:<line>: <_Msg>" in a local stringstream and
// passes the resulting string to CQL_TRACE_TRACE for CqlTraceErrBuf.
// _Msg is spliced after an operator<<, so it may itself be a chain of
// stream insertions. The macro ends on the `while (false)` line (no dangling
// continuation), so it cannot swallow the line that follows it.
#define CQLIF_ERR_TRACE(_Msg)                                     \
    do {                                                          \
        std::stringstream _ss;                                    \
        _ss << __func__ << ":" << __FILE__ << ":" <<              \
            __LINE__ << ": " << _Msg;                             \
        CQL_TRACE_TRACE(CqlTraceErrBuf, _ss.str());               \
    } while (false)
71 
// Routes _Msg to one of the three trace buffers according to _Level:
// log4cplus::ERROR_LOG_LEVEL -> CqlTraceErrBuf,
// log4cplus::DEBUG_LOG_LEVEL -> CqlTraceDebugBuf,
// anything else              -> CqlTraceInfoBuf.
// _Level is parenthesized so that an argument containing lower-precedence
// operators (e.g. a ?: expression) is compared as a whole. Note that _Level
// may still be evaluated twice. The macro ends on the `while (false)` line
// so it cannot absorb the following source line.
#define CASS_LIB_TRACE(_Level, _Msg)                              \
    do {                                                          \
        if ((_Level) == log4cplus::ERROR_LOG_LEVEL) {             \
            CQL_TRACE_TRACE(CqlTraceErrBuf, _Msg);                \
        } else if ((_Level) == log4cplus::DEBUG_LOG_LEVEL) {      \
            CQL_TRACE_TRACE(CqlTraceDebugBuf, _Msg);              \
        } else {                                                  \
            CQL_TRACE_TRACE(CqlTraceInfoBuf, _Msg);               \
        }                                                         \
    } while (false)
82 
// Emits "<function>:<file>:<line>: <_Msg>" on the log4cplus root logger via
// the LOG4CPLUS_<_Level> macro, unless LoggingDisabled() reports true.
#define CQLIF_LOG(_Level, _Msg)                                           \
    do {                                                                  \
        if (!LoggingDisabled()) {                                         \
            log4cplus::Logger logger = log4cplus::Logger::getRoot();      \
            LOG4CPLUS_##_Level(logger,                                    \
                __func__ << ":" << __FILE__ << ":" << __LINE__            \
                         << ": " << _Msg);                                \
        }                                                                 \
    } while (false)
90 
// Logs "<function>:<file>:<line>: <_Msg>" through LOG at ERROR severity.
#define CQLIF_LOG_ERR(_Msg)                                               \
    do {                                                                  \
        LOG(ERROR,                                                        \
            __func__ << ":" << __FILE__ << ":" << __LINE__ << ": "        \
                     << _Msg);                                            \
    } while (false)
96 
97 namespace cass {
98 namespace cql {
99 namespace impl {
100 
101 // CassString convenience structure
struct CassString {
    // Empty string: null pointer, zero length.
    CassString() :
        data(NULL),
        length(0) {
    }

    // Wraps a NUL-terminated C string without copying it. A null pointer
    // is accepted and produces a zero-length string.
    CassString(const char *data) :
        data(data),
        length(data ? strlen(data) : 0) {
    }

    // Wraps an explicit (pointer, length) pair without copying it.
    CassString(const char* data, size_t length) :
        data(data),
        length(length) {
    }

    const char* data;   // not owned; may be NULL
    size_t length;      // number of bytes addressed by data
};
121 
122 // CassUuid encode and decode
123 static inline void encode_uuid(char* output, const CassUuid &uuid) {
124  uint64_t time_and_version = uuid.time_and_version;
125  output[3] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
126  time_and_version >>= 8;
127  output[2] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
128  time_and_version >>= 8;
129  output[1] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
130  time_and_version >>= 8;
131  output[0] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
132  time_and_version >>= 8;
133 
134  output[5] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
135  time_and_version >>= 8;
136  output[4] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
137  time_and_version >>= 8;
138 
139  output[7] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
140  time_and_version >>= 8;
141  output[6] = static_cast<char>(time_and_version & 0x000000000000000FFLL);
142 
143  uint64_t clock_seq_and_node = uuid.clock_seq_and_node;
144  for (size_t i = 0; i < 8; ++i) {
145  output[15 - i] = static_cast<char>(clock_seq_and_node & 0x00000000000000FFL);
146  clock_seq_and_node >>= 8;
147  }
148 }
149 
150 static inline char* decode_uuid(char* input, CassUuid* output) {
151  output->time_and_version = static_cast<uint64_t>(static_cast<uint8_t>(input[3]));
152  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[2])) << 8;
153  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[1])) << 16;
154  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[0])) << 24;
155 
156  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[5])) << 32;
157  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[4])) << 40;
158 
159  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[7])) << 48;
160  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[6])) << 56;
161 
162  output->clock_seq_and_node = 0;
163  for (size_t i = 0; i < 8; ++i) {
164  output->clock_seq_and_node |= static_cast<uint64_t>(static_cast<uint8_t>(input[15 - i])) << (8 * i);
165  }
166  return input + 16;
167 }
168 
169 static const char * DbDataType2CassType(
170  const GenDb::DbDataType::type &db_type) {
171  switch (db_type) {
172  case GenDb::DbDataType::AsciiType:
173  return "ascii";
174  case GenDb::DbDataType::LexicalUUIDType:
175  return "uuid";
176  case GenDb::DbDataType::TimeUUIDType:
177  return "timeuuid";
178  case GenDb::DbDataType::Unsigned8Type:
179  case GenDb::DbDataType::Unsigned16Type:
180  case GenDb::DbDataType::Unsigned32Type:
181  return "int";
182  case GenDb::DbDataType::Unsigned64Type:
183  return "bigint";
184  case GenDb::DbDataType::DoubleType:
185  return "double";
186  case GenDb::DbDataType::UTF8Type:
187  return "text";
188  case GenDb::DbDataType::InetType:
189  return "inet";
190  case GenDb::DbDataType::IntegerType:
191  return "varint";
192  case GenDb::DbDataType::BlobType:
193  return "blob";
194  default:
195  assert(false && "Invalid data type");
196  return "";
197  }
198 }
199 
200 static std::string DbDataTypes2CassTypes(
201  const GenDb::DbDataTypeVec &v_db_types) {
202  assert(!v_db_types.empty());
203  return std::string(DbDataType2CassType(v_db_types[0]));
204 }
205 
// Translates a GenDb consistency level into a CassConsistency value; the
// default branch yields CASS_CONSISTENCY_UNKNOWN.
// NOTE(review): in this listing the `case` label that should precede each
// return below is absent (the embedded line numbers skip 209, 211, ...,
// 231), presumably lost when the listing was extracted - confirm against
// the repository copy before relying on this text.
206 static CassConsistency Db2CassConsistency(
207  GenDb::DbConsistency::type dconsistency) {
208  switch (dconsistency) {
210  return CASS_CONSISTENCY_ANY;
212  return CASS_CONSISTENCY_ONE;
214  return CASS_CONSISTENCY_TWO;
216  return CASS_CONSISTENCY_THREE;
218  return CASS_CONSISTENCY_QUORUM;
220  return CASS_CONSISTENCY_ALL;
222  return CASS_CONSISTENCY_LOCAL_QUORUM;
224  return CASS_CONSISTENCY_EACH_QUORUM;
226  return CASS_CONSISTENCY_SERIAL;
228  return CASS_CONSISTENCY_LOCAL_SERIAL;
230  return CASS_CONSISTENCY_LOCAL_ONE;
232  default:
233  return CASS_CONSISTENCY_UNKNOWN;
234  }
235 }
236 
237 // Cass Query Printer
238 class CassQueryPrinter : public boost::static_visitor<> {
239  public:
240  CassQueryPrinter(std::ostream &os, bool quote_strings) :
241  os_(os),
242  quote_strings_(quote_strings) {
243  }
244  CassQueryPrinter(std::ostream &os) :
245  os_(os),
246  quote_strings_(true) {
247  }
248  template<typename T>
249  void operator()(const T &t) const {
250  os_ << t;
251  }
252  void operator()(const boost::uuids::uuid &tuuid) const {
253  os_ << to_string(tuuid);
254  }
255  // uint8_t must be handled specially because ostream sees
256  // uint8_t as a text type instead of an integer type
257  void operator()(const uint8_t &tu8) const {
258  os_ << (uint16_t)tu8;
259  }
260  void operator()(const std::string &tstring) const {
261  if (quote_strings_) {
262  os_ << "'" << tstring << "'";
263  } else {
264  os_ << tstring;
265  }
266  }
267  // CQL int is 32 bit signed integer
268  void operator()(const uint32_t &tu32) const {
269  os_ << (int32_t)tu32;
270  }
271  // CQL bigint is 64 bit signed long
272  void operator()(const uint64_t &tu64) const {
273  os_ << (int64_t)tu64;
274  }
275  void operator()(const IpAddress &tipaddr) const {
276  os_ << "'" << tipaddr << "'";
277  }
278  std::ostream &os_;
280 };
281 
282 //
283 // CassStatement bind
284 //
285 class CassStatementIndexBinder : public boost::static_visitor<> {
286  public:
288  CassStatement *statement) :
289  cci_(cci),
290  statement_(statement) {
291  }
292  void operator()(const boost::blank &tblank, size_t index) const {
293  assert(false && "CassStatement bind to boost::blank not supported");
294  }
295  void operator()(const std::string &tstring, size_t index) const {
296  CassError rc(cci_->CassStatementBindStringN(statement_, index,
297  tstring.c_str(), tstring.length()));
298  assert(rc == CASS_OK);
299  }
300  void operator()(const boost::uuids::uuid &tuuid, size_t index) const {
301  CassUuid cuuid;
302  decode_uuid((char *)&tuuid, &cuuid);
303  CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid));
304  assert(rc == CASS_OK);
305  }
306  void operator()(const uint8_t &tu8, size_t index) const {
307  CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8));
308  assert(rc == CASS_OK);
309  }
310  void operator()(const uint16_t &tu16, size_t index) const {
311  CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16));
312  assert(rc == CASS_OK);
313  }
314  void operator()(const uint32_t &tu32, size_t index) const {
315  assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
316  CassError rc(cci_->CassStatementBindInt32(statement_, index,
317  (cass_int32_t)tu32));
318  assert(rc == CASS_OK);
319  }
320  void operator()(const uint64_t &tu64, size_t index) const {
321  assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
322  CassError rc(cci_->CassStatementBindInt64(statement_, index,
323  (cass_int64_t)tu64));
324  assert(rc == CASS_OK);
325  }
326  void operator()(const double &tdouble, size_t index) const {
327  CassError rc(cci_->CassStatementBindDouble(statement_, index,
328  (cass_double_t)tdouble));
329  assert(rc == CASS_OK);
330  }
331  void operator()(const IpAddress &tipaddr, size_t index) const {
332  CassInet cinet;
333  if (tipaddr.is_v4()) {
334  boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
335  cinet = cci_->CassInetInitV4(GENERIC_RAW_ARRAY(tv4.to_bytes()));
336  } else {
337  boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
338  cinet = cci_->CassInetInitV6(GENERIC_RAW_ARRAY(tv6.to_bytes()));
339  }
340  CassError rc(cci_->CassStatementBindInet(statement_, index,
341  cinet));
342  assert(rc == CASS_OK);
343  }
344  void operator()(const GenDb::Blob &tblob, size_t index) const {
345  CassError rc(cci_->CassStatementBindBytes(statement_, index,
346  tblob.data(), tblob.size()));
347  assert(rc == CASS_OK);
348  }
350  CassStatement *statement_;
351 };
352 
353 class CassStatementNameBinder : public boost::static_visitor<> {
354  public:
356  CassStatement *statement) :
357  cci_(cci),
358  statement_(statement) {
359  }
360  void operator()(const boost::blank &tblank, const char *name) const {
361  assert(false && "CassStatement bind to boost::blank not supported");
362  }
363  void operator()(const std::string &tstring, const char *name) const {
364  CassError rc(cci_->CassStatementBindStringByNameN(statement_, name,
365  strlen(name), tstring.c_str(), tstring.length()));
366  assert(rc == CASS_OK);
367  }
368  void operator()(const boost::uuids::uuid &tuuid, const char *name) const {
369  CassUuid cuuid;
370  decode_uuid((char *)&tuuid, &cuuid);
371  CassError rc(cci_->CassStatementBindUuidByName(statement_, name,
372  cuuid));
373  assert(rc == CASS_OK);
374  }
375  void operator()(const uint8_t &tu8, const char *name) const {
376  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
377  tu8));
378  assert(rc == CASS_OK);
379  }
380  void operator()(const uint16_t &tu16, const char *name) const {
381  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
382  tu16));
383  assert(rc == CASS_OK);
384  }
385  void operator()(const uint32_t &tu32, const char *name) const {
386  assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
387  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
388  (cass_int32_t)tu32));
389  assert(rc == CASS_OK);
390  }
391  void operator()(const uint64_t &tu64, const char *name) const {
392  assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
393  CassError rc(cci_->CassStatementBindInt64ByName(statement_, name,
394  (cass_int64_t)tu64));
395  assert(rc == CASS_OK);
396  }
397  void operator()(const double &tdouble, const char *name) const {
398  CassError rc(cci_->CassStatementBindDoubleByName(statement_, name,
399  (cass_double_t)tdouble));
400  assert(rc == CASS_OK);
401  }
402  void operator()(const IpAddress &tipaddr, const char *name) const {
403  CassInet cinet;
404  if (tipaddr.is_v4()) {
405  boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
406  cinet = cci_->CassInetInitV4(GENERIC_RAW_ARRAY(tv4.to_bytes()));
407  } else {
408  boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
409  cinet = cci_->CassInetInitV6(GENERIC_RAW_ARRAY(tv6.to_bytes()));
410  }
411  CassError rc(cci_->CassStatementBindInetByName(statement_, name,
412  cinet));
413  assert(rc == CASS_OK);
414  }
415  void operator()(const GenDb::Blob &tblob, const char *name) const {
416  CassError rc(cci_->CassStatementBindBytesByNameN(statement_, name,
417  strlen(name), tblob.data(), tblob.size()));
418  assert(rc == CASS_OK);
419  }
421  CassStatement *statement_;
422 };
423 
// printf-style template for the table "compaction" option; the single %s is
// replaced with the compaction strategy class name. The pointer itself is
// const so the template cannot be re-pointed at run time.
static const char * const kQCompactionStrategy(
    "compaction = {'class': "
    "'org.apache.cassandra.db.compaction.%s'}");
// Table option clause setting gc_grace_seconds to 0.
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0");
// Table option clause setting read_repair_chance to 0.0.
static const std::string kQReadRepairChanceDTCS(
    "read_repair_chance = 0.0");
430 
431 //
432 // Cf2CassCreateTableIfNotExists
433 //
434 
436  const std::string &compaction_strategy) {
437  std::ostringstream query;
438  // Table name
439  query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " ";
440  // Row key
441  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
442  assert(rkeys.size() == 1);
443  query << "(key " << DbDataType2CassType(rkeys[0]) <<
444  " PRIMARY KEY";
445  // Columns
446  const GenDb::NewCf::ColumnMap &cfcolumns(cf.cfcolumns_);
447  assert(!cfcolumns.empty());
448  BOOST_FOREACH(const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
449  cfcolumns) {
450  query << ", \"" << cfcolumn.first << "\" " <<
451  DbDataType2CassType(cfcolumn.second);
452  }
453  char cbuf[512];
454  int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy,
455  compaction_strategy.c_str()));
456  assert(!(n < 0 || n >= (int)sizeof(cbuf)));
457 
458  // The compaction strategy DateTieredCompactionStrategy precludes
459  // using read repair, because of the way timestamps are checked for
460  // DTCS compaction. In this case, you must set read_repair_chance to
461  // zero. For other compaction strategies, read repair should be
462  // enabled with a read_repair_chance value of 0.2 being typical
463  if (compaction_strategy ==
464  GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
465  query << ") WITH " << std::string(cbuf) << " AND " <<
467  } else {
468  query << ") WITH " << std::string(cbuf) << " AND " <<
470  }
471 
472  return query.str();
473 }
474 
476  const std::string &compaction_strategy,
477  boost::system::error_code *ec) {
478  std::ostringstream query;
479 
480  *ec = errc::make_error_code(errc::success);
481  // sanity check - # of clustering_columns cannot be 0
482  // for dynamic tables
483  if (cf.clustering_columns_.size() == 0) {
484  *ec = errc::make_error_code(errc::invalid_argument);
485  return query.str();
486  }
487 
488  // Table name
489  query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " (";
490  // Row key
491  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
492  int rk_size(rkeys.size());
493  for (int i = 0; i < rk_size; i++) {
494  if (i) {
495  int key_num(i + 1);
496  query << "key" << key_num;
497  } else {
498  query << "key";
499  }
500  query << " " << DbDataType2CassType(rkeys[i]) << ", ";
501  }
502  // clustering columns
503  const GenDb::DbDataTypeVec &clustering_columns(cf.clustering_columns_);
504  int ccn_size(clustering_columns.size());
505  for (int i = 0; i < ccn_size; i++) {
506  int cnum(i + 1);
507  query << "column" << cnum << " " <<
508  DbDataType2CassType(clustering_columns[i]) << ", ";
509  }
510  // columns
511  const GenDb::DbDataTypeVec &columns(cf.columns_);
512  int cn_size(columns.size());
513  for (int i = 0; i < cn_size; i++) {
514  int cnum(i + 1 + ccn_size);
515  query << "column" << cnum << " " <<
516  DbDataType2CassType(columns[i]) << ", ";
517  }
518  // Value
519  const GenDb::DbDataTypeVec &values(cf.value_);
520  if (values.size() > 0) {
521  query << "value" << " " << DbDataTypes2CassTypes(values) << ", ";
522  }
523  // Primary Key
524  query << "PRIMARY KEY (";
525  std::ostringstream rkey_ss;
526  for (int i = 0; i < rk_size; i++) {
527  if (i) {
528  int key_num(i + 1);
529  rkey_ss << ", key" << key_num;
530  } else {
531  rkey_ss << "key";
532  }
533  }
534  if (rk_size >= 2) {
535  query << "(" << rkey_ss.str() << "), ";
536  } else {
537  query << rkey_ss.str() << ", ";
538  }
539  for (int i = 0; i < ccn_size; i++) {
540  int cnum(i + 1);
541  if (i) {
542  query << ", ";
543  }
544  query << "column" << cnum;
545  }
546  char cbuf[512];
547  int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy,
548  compaction_strategy.c_str()));
549  assert(!(n < 0 || n >= (int)sizeof(cbuf)));
550 
551  // The compaction strategy DateTieredCompactionStrategy precludes
552  // using read repair, because of the way timestamps are checked for
553  // DTCS compaction. In this case, you must set read_repair_chance to
554  // zero. For other compaction strategies, read repair should be
555  // enabled with a read_repair_chance value of 0.2 being typical
556  if (compaction_strategy ==
557  GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
558  query << ")) WITH " << std::string(cbuf) << " AND " <<
560  } else {
561  query << ")) WITH " << std::string(cbuf) << " AND " <<
563  }
564  return query.str();
565 }
566 
567 static std::string DbColIndexMode2String(
568  const GenDb::ColIndexMode::type index_mode) {
569  switch (index_mode) {
571  return "";
572  case GenDb::ColIndexMode::PREFIX:
573  return "PREFIX";
574  case GenDb::ColIndexMode::CONTAINS:
575  return "CONTAINS";
576  default:
577  assert(false && "INVALID");
578  }
579 }
580 
581 //
582 // CassCreateIndexIfNotExists
583 //
584 
585 std::string CassCreateIndexIfNotExists(const std::string &cfname,
586  const std::string &column, const std::string &indexname,
587  const GenDb::ColIndexMode::type index_mode) {
588  std::ostringstream query;
589 
590  query << "CREATE ";
591  if (index_mode != GenDb::ColIndexMode::NONE) {
592  query << "CUSTOM "; // SASI
593  }
594  // Indexname
595  query << "INDEX IF NOT EXISTS " << indexname << " ";
596  // Index Column
597  query << "ON " << cfname << "(\""<< column <<"\")";
598  // Mode if SASI
599  if (index_mode != GenDb::ColIndexMode::NONE) {
600  query << " USING \'org.apache.cassandra.index.sasi.SASIIndex\' " <<
601  "WITH OPTIONS = {\'mode\': \'" << DbColIndexMode2String(index_mode) << "\'}";
602  }
603  query << ";";
604  return query.str();
605 }
606 
607 //
608 // Cf2CassInsertIntoTable
609 //
610 
611 std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
612  std::ostringstream query;
613  // Table
614  const std::string &table(v_columns->cfname_);
615  query << "INSERT INTO " << table << " (";
616  std::ostringstream values_ss;
617  values_ss << "VALUES (";
618  CassQueryPrinter values_printer(values_ss);
619  // Row keys
620  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
621  int rk_size(rkeys.size());
622  for (int i = 0; i < rk_size; i++) {
623  if (i) {
624  int key_num(i + 1);
625  query << ", key" << key_num;
626  } else {
627  query << "key";
628  }
629  if (i) {
630  values_ss << ", ";
631  }
632  boost::apply_visitor(values_printer, rkeys[i]);
633  }
634  // Columns
635  int cttl(-1);
636  CassQueryPrinter cnames_printer(query, false);
637  BOOST_FOREACH(const GenDb::NewCol &column, v_columns->columns_) {
638  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_SQL);
639  // Column Name
640  query << ", ";
641  const GenDb::DbDataValueVec &cnames(*column.name.get());
642  assert(cnames.size() == 1);
643  // Double quote column name strings
644  query << "\"";
645  boost::apply_visitor(cnames_printer, cnames[0]);
646  query << "\"";
647  // Column Values
648  values_ss << ", ";
649  const GenDb::DbDataValueVec &cvalues(*column.value.get());
650  assert(cvalues.size() == 1);
651  boost::apply_visitor(values_printer, cvalues[0]);
652  // Column TTL
653  cttl = column.ttl;
654  }
655  query << ") ";
656  values_ss << ")";
657  query << values_ss.str();
658  if (cttl > 0) {
659  query << " USING TTL " << cttl;
660  }
661  return query.str();
662 }
663 
664 std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
665  std::ostringstream query;
666  // Table
667  const std::string &table(v_columns->cfname_);
668  query << "INSERT INTO " << table << " (";
669  std::ostringstream values_ss;
670  // Row keys
671  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
672  int rk_size(rkeys.size());
673  CassQueryPrinter values_printer(values_ss);
674  for (int i = 0; i < rk_size; i++) {
675  if (i) {
676  int key_num(i + 1);
677  query << ", key" << key_num;
678  } else {
679  query << "key";
680  }
681  boost::apply_visitor(values_printer, rkeys[i]);
682  values_ss << ", ";
683  }
684  // Columns
685  const GenDb::NewColVec &columns(v_columns->columns_);
686  assert(columns.size() == 1);
687  const GenDb::NewCol &column(columns[0]);
688  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_NOSQL);
689  // Column Names
690  const GenDb::DbDataValueVec &cnames(*column.name.get());
691  int cn_size(cnames.size());
692  for (int i = 0; i < cn_size; i++) {
693  int cnum(i + 1);
694  if (cnames.at(i).which() != GenDb::DB_VALUE_BLANK) {
695  query << ", column" << cnum;
696  boost::apply_visitor(values_printer, cnames[i]);
697  if (i != cn_size - 1) {
698  values_ss << ", ";
699  }
700  }
701  }
702  // Column Values
703  const GenDb::DbDataValueVec &cvalues(*column.value.get());
704  if (cvalues.size() > 0) {
705  query << ", value) VALUES (";
706  values_ss << ", ";
707  boost::apply_visitor(values_printer, cvalues[0]);
708  } else {
709  query << ") VALUES (";
710  }
711  values_ss << ")";
712  query << values_ss.str();
713  if (column.ttl > 0) {
714  query << " USING TTL " << column.ttl;
715  }
716  return query.str();
717 }
718 
719 //
720 // Cf2CassPrepareInsertIntoTable
721 //
722 
724  std::ostringstream query;
725  // Table name
726  query << "INSERT INTO " << cf.cfname_ << " ";
727  // Row key
728  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
729  assert(rkeys.size() == 1);
730  std::ostringstream values_ss;
731  query << "(key";
732  values_ss << ") VALUES (?";
733  // Columns
734  const GenDb::NewCf::ColumnMap &cfcolumns(cf.cfcolumns_);
735  assert(!cfcolumns.empty());
736  BOOST_FOREACH(const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
737  cfcolumns) {
738  query << ", \"" << cfcolumn.first << "\"";
739  values_ss << ", ?";
740  }
741  query << values_ss.str();
742  query << ") USING TTL ?";
743  return query.str();
744 }
745 
747  boost::system::error_code *ec) {
748  std::ostringstream query;
749 
750  *ec = errc::make_error_code(errc::success);
751  // sanity check - # of clustering_columns cannot be 0
752  // for dynamic tables
753  if (cf.clustering_columns_.size() == 0) {
754  *ec = errc::make_error_code(errc::invalid_argument);
755  return query.str();
756  }
757 
758  // Table name
759  query << "INSERT INTO " << cf.cfname_ << " (";
760  // Row key
761  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
762  int rk_size(rkeys.size());
763  std::ostringstream values_ss;
764  for (int i = 0; i < rk_size; i++) {
765  if (i) {
766  int key_num(i + 1);
767  query << "key" << key_num;
768  } else {
769  query << "key";
770  }
771  query << ", ";
772  values_ss << "?, ";
773  }
774  // Clustering Column name
775  const GenDb::DbDataTypeVec &clustering_columns(cf.clustering_columns_);
776  int ccn_size(clustering_columns.size());
777  for (int i = 0; i < ccn_size; i++) {
778  int cnum(i + 1);
779  query << "column" << cnum;
780  values_ss << "?";
781  if (i != ccn_size - 1) {
782  query << ", ";
783  values_ss << ", ";
784  }
785  }
786  // Column name
787  const GenDb::DbDataTypeVec &columns(cf.columns_);
788  int cn_size(columns.size());
789  if (cn_size > 0) {
790  query << ", ";
791  values_ss << ", ";
792  }
793  for (int i = 0; i < cn_size; i++) {
794  int cnum(i + 1 + ccn_size);
795  query << "column" << cnum;
796  values_ss << "?";
797  if (i != cn_size - 1) {
798  query << ", ";
799  values_ss << ", ";
800  }
801  }
802  // Value
803  const GenDb::DbDataTypeVec &values(cf.value_);
804  if (values.size() > 0) {
805  query << ", value";
806  values_ss << ", ?";
807  }
808  query << ") VALUES (";
809  values_ss << ")";
810  query << values_ss.str();
811  query << " USING TTL ?";
812  return query.str();
813 }
814 
815 //
816 // Cf2CassPrepareBind
817 //
818 
820  CassStatement *statement,
821  const GenDb::ColList *v_columns) {
822  CassStatementNameBinder values_binder(cci, statement);
823  // Row keys
824  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
825  int rk_size(rkeys.size());
826  size_t idx(0);
827  for (; (int) idx < rk_size; idx++) {
828  std::string rk_name;
829  if (idx) {
830  int key_num(idx + 1);
831  rk_name = "key" + integerToString(key_num);
832  } else {
833  rk_name = "key";
834  }
835  boost::apply_visitor(boost::bind(values_binder, _1, rk_name.c_str()),
836  rkeys[idx]);
837  }
838  // Columns
839  int cttl(-1);
840  BOOST_FOREACH(const GenDb::NewCol &column, v_columns->columns_) {
841  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_SQL);
842  const GenDb::DbDataValueVec &cnames(*column.name.get());
843  assert(cnames.size() == 1);
844  assert(cnames[0].which() == GenDb::DB_VALUE_STRING);
845  std::string cname(boost::get<std::string>(cnames[0]));
846  const GenDb::DbDataValueVec &cvalues(*column.value.get());
847  assert(cvalues.size() == 1);
848  boost::apply_visitor(boost::bind(values_binder, _1, cname.c_str()),
849  cvalues[0]);
850  // Column TTL
851  cttl = column.ttl;
852  idx++;
853  }
854  CassError rc(cci->CassStatementBindInt32(statement, idx++,
855  (cass_int32_t)cttl));
856  assert(rc == CASS_OK);
857  return true;
858 }
859 
861  CassStatement *statement,
862  const GenDb::ColList *v_columns) {
863  CassStatementIndexBinder values_binder(cci, statement);
864  // Row keys
865  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
866  int rk_size(rkeys.size());
867  size_t idx(0);
868  for (; (int) idx < rk_size; idx++) {
869  boost::apply_visitor(boost::bind(values_binder, _1, idx), rkeys[idx]);
870  }
871  // Columns
872  const GenDb::NewColVec &columns(v_columns->columns_);
873  assert(columns.size() == 1);
874  const GenDb::NewCol &column(columns[0]);
875  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_NOSQL);
876  // Column Names
877  const GenDb::DbDataValueVec &cnames(*column.name.get());
878  int cn_size(cnames.size());
879  for (int i = 0; i < cn_size; i++, idx++) {
880  boost::apply_visitor(boost::bind(values_binder, _1, idx), cnames[i]);
881  }
882  // Column Values
883  const GenDb::DbDataValueVec &cvalues(*column.value.get());
884  if (cvalues.size() > 0) {
885  boost::apply_visitor(boost::bind(values_binder, _1, idx++),
886  cvalues[0]);
887  }
888  CassError rc(cci->CassStatementBindInt32(statement, idx++,
889  (cass_int32_t)column.ttl));
890  assert(rc == CASS_OK);
891  return true;
892 }
893 
894 static std::string CassSelectFromTableInternal(const std::string &table,
895  const std::vector<GenDb::DbDataValueVec> &rkeys,
896  const GenDb::ColumnNameRange &ck_range,
897  const GenDb::FieldNamesToReadVec &read_vec,
898  const GenDb::WhereIndexInfoVec &where_vec) {
899  std::ostringstream query;
900  // Table
901  if (read_vec.empty()) {
902  query << "SELECT * FROM " << table;
903  } else {
904  query << "SELECT ";
905  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
906  it != read_vec.end(); it++) {
907  query << it->get<0>() << ",";
908  bool read_timestamp = it->get<3>();
909  if (read_timestamp) {
910  query << "WRITETIME(" << it->get<0>() << "),";
911  }
912  }
913  query.seekp(-1, query.cur);
914  query << " FROM " << table;
915  }
916  if (rkeys.size() == 1) {
917  GenDb::DbDataValueVec rkey = rkeys[0];
918  int rk_size(rkey.size());
919  CassQueryPrinter cprinter(query);
920  for (int i = 0; i < rk_size; i++) {
921  if (i) {
922  int key_num(i + 1);
923  query << " AND key" << key_num << "=";
924  } else {
925  query << " WHERE key=";
926  }
927  boost::apply_visitor(cprinter, rkey[i]);
928  }
929 
930  } else if (rkeys.size() > 1) {
931  query << " WHERE key IN (";
932  BOOST_FOREACH(GenDb::DbDataValueVec rkey, rkeys) {
933  int rk_size(rkey.size());
934  assert(rk_size == 1);
935  CassQueryPrinter cprinter(query);
936  boost::apply_visitor(cprinter, rkey[0]);
937  query << ",";
938  }
939  query.seekp(-1, query.cur);
940  query << ")";
941  }
942  if (!where_vec.empty()) {
943  for (GenDb::WhereIndexInfoVec::const_iterator it = where_vec.begin();
944  it != where_vec.end(); ++it) {
945  std::ostringstream value_ss;
946  CassQueryPrinter value_vprinter(value_ss);
947  boost::apply_visitor(value_vprinter, it->get<2>());
948  query << " AND";
949  query << " " << it->get<0>();
950  query << " " << GenDb::Op::ToString(it->get<1>());
951  query << " " << value_ss.str();
952  }
953  }
954  if (!ck_range.IsEmpty()) {
955  if (!ck_range.start_.empty()) {
956  int ck_start_size(ck_range.start_.size());
957  std::ostringstream start_ss;
958  start_ss << " " << GenDb::Op::ToString(ck_range.start_op_) << " (";
959  CassQueryPrinter start_vprinter(start_ss);
960  query << " AND (";
961  for (int i = 0; i < ck_start_size; i++) {
962  if (i) {
963  query << ", ";
964  start_ss << ", ";
965  }
966  int cnum(i + 1);
967  query << "column" << cnum;
968  boost::apply_visitor(start_vprinter, ck_range.start_[i]);
969  }
970  query << ")";
971  start_ss << ")";
972  query << start_ss.str();
973  }
974  if (!ck_range.finish_.empty()) {
975  int ck_finish_size(ck_range.finish_.size());
976  std::ostringstream finish_ss;
977  finish_ss << " " << GenDb::Op::ToString(ck_range.finish_op_) <<
978  " (";
979  CassQueryPrinter finish_vprinter(finish_ss);
980  query << " AND (";
981  for (int i = 0; i < ck_finish_size; i++) {
982  if (i) {
983  query << ", ";
984  finish_ss << ", ";
985  }
986  int cnum(i + 1);
987  query << "column" << cnum;
988  boost::apply_visitor(finish_vprinter, ck_range.finish_[i]);
989  }
990  query << ")";
991  finish_ss << ")";
992  query << finish_ss.str();
993  }
994  if (ck_range.count_) {
995  query << " LIMIT " << ck_range.count_;
996  }
997  }
998  if (where_vec.size() > 1) {
999  query << " ALLOW FILTERING";
1000  }
1001  return query.str();
1002 }
1003 
1005  const std::string &table, const GenDb::DbDataValueVec &rkeys,
1006  const GenDb::ColumnNameRange &ck_range,
1007  const GenDb::WhereIndexInfoVec &where_vec,
1008  const GenDb::FieldNamesToReadVec &read_vec) {
1009  std::vector<GenDb::DbDataValueVec> rkey_vec;
1010  rkey_vec.push_back(rkeys);
1011  return CassSelectFromTableInternal(table, rkey_vec, ck_range,
1012  read_vec, where_vec);
1013 }
1014 
// Returns the query string for selecting by a single partition key only: the
// key is wrapped in a one-element vector and passed to
// CassSelectFromTableInternal with a default-constructed (empty) column range.
// NOTE(review): the remaining call arguments (doxygen lines 1020-1021) are
// absent from this listing.
1015 std::string PartitionKey2CassSelectFromTable(const std::string &table,
1016  const GenDb::DbDataValueVec &rkeys) {
1017  std::vector<GenDb::DbDataValueVec> rkey_vec;
1018  rkey_vec.push_back(rkeys);
1019  return CassSelectFromTableInternal(table, rkey_vec, GenDb::ColumnNameRange(),
1022 }
1023 
// Returns the query string for a single partition key restricted by a
// clustering-key range, reading the fields listed in read_vec. Delegates to
// CassSelectFromTableInternal with the key wrapped in a one-element vector.
// NOTE(review): the line naming this function (doxygen line 1024) and the
// final call argument (line 1031) are absent from this listing.
1025  const std::string &table, const GenDb::DbDataValueVec &rkeys,
1026  const GenDb::ColumnNameRange &ck_range,
1027  const GenDb::FieldNamesToReadVec &read_vec) {
1028  std::vector<GenDb::DbDataValueVec> rkey_vec;
1029  rkey_vec.push_back(rkeys);
1030  return CassSelectFromTableInternal(table, rkey_vec, ck_range, read_vec,
1032 }
1033 
// Multi-key variant: forwards a whole vector of partition keys, the
// clustering-key range and the fields to read to CassSelectFromTableInternal.
// NOTE(review): the line naming this function (doxygen line 1034) and the
// final call argument (line 1039) are absent from this listing.
1035  const std::string &table, const std::vector<GenDb::DbDataValueVec> &rkeys,
1036  const GenDb::ColumnNameRange &ck_range,
1037  const GenDb::FieldNamesToReadVec &read_vec) {
1038  return CassSelectFromTableInternal(table, rkeys, ck_range, read_vec,
1040 }
1041 
// Returns the query string for a table with no partition-key restriction: an
// empty row-key vector is handed to CassSelectFromTableInternal.
// NOTE(review): the remaining call arguments (doxygen lines 1045-1046) are
// absent from this listing.
1042 std::string CassSelectFromTable(const std::string &table) {
1043  std::vector<GenDb::DbDataValueVec> rkey_vec;
1044  return CassSelectFromTableInternal(table, rkey_vec,
1047 }
1048 
// Converts a driver CassValue into a GenDb::DbDataValue.
//   cci    - Cassandra library wrapper used for every accessor call
//   cvalue - value to convert
// A null value and CASS_VALUE_TYPE_UNKNOWN both yield a default-constructed
// DbDataValue. Every accessor result is checked with assert(rc == CASS_OK),
// and an unhandled value type is traced and then asserted on.
// NOTE(review): the signature line (doxygen line 1049) is absent from this
// listing.
1050  interface::CassLibrary *cci, const CassValue *cvalue) {
1051  if (cci->CassValueIsNull(cvalue)) {
1052  return GenDb::DbDataValue();
1053  }
1054  CassValueType cvtype(cci->GetCassValueType(cvalue));
1055  switch (cvtype) {
1056  case CASS_VALUE_TYPE_ASCII:
1057  case CASS_VALUE_TYPE_VARCHAR:
1058  case CASS_VALUE_TYPE_TEXT: {
1059  CassString ctstring;
1060  CassError rc(cci->CassValueGetString(cvalue, &ctstring.data,
1061  &ctstring.length));
1062  assert(rc == CASS_OK);
// Copy the bytes out using the explicit length.
1063  return std::string(ctstring.data, ctstring.length);
1064  }
1065  case CASS_VALUE_TYPE_UUID: {
1066  CassUuid ctuuid;
1067  CassError rc(cci->CassValueGetUuid(cvalue, &ctuuid));
1068  assert(rc == CASS_OK);
// NOTE(review): the declaration of `u` (doxygen line 1069) is absent from
// this listing.
1070  encode_uuid((char *)&u, ctuuid);
1071  return u;
1072  }
1073  case CASS_VALUE_TYPE_DOUBLE: {
1074  cass_double_t ctdouble;
1075  CassError rc(cci->CassValueGetDouble(cvalue, &ctdouble));
1076  assert(rc == CASS_OK);
1077  return (double)ctdouble;
1078  }
// The integer cases below read a signed driver value and return it cast to
// the unsigned type of the same width.
1079  case CASS_VALUE_TYPE_TINY_INT: {
1080  cass_int8_t ct8;
1081  CassError rc(cci->CassValueGetInt8(cvalue, &ct8));
1082  assert(rc == CASS_OK);
1083  return (uint8_t)ct8;
1084  }
1085  case CASS_VALUE_TYPE_SMALL_INT: {
1086  cass_int16_t ct16;
1087  CassError rc(cci->CassValueGetInt16(cvalue, &ct16));
1088  assert(rc == CASS_OK);
1089  return (uint16_t)ct16;
1090  }
1091  case CASS_VALUE_TYPE_INT: {
1092  cass_int32_t ct32;
1093  CassError rc(cci->CassValueGetInt32(cvalue, &ct32));
1094  assert(rc == CASS_OK);
1095  return (uint32_t)ct32;
1096  }
1097  case CASS_VALUE_TYPE_BIGINT: {
1098  cass_int64_t ct64;
1099  CassError rc(cci->CassValueGetInt64(cvalue, &ct64));
1100  assert(rc == CASS_OK);
1101  return (uint64_t)ct64;
1102  }
1103  case CASS_VALUE_TYPE_INET: {
1104  CassInet ctinet;
1105  CassError rc(cci->CassValueGetInet(cvalue, &ctinet));
1106  assert(rc == CASS_OK);
1107  IpAddress ipaddr;
// The address length selects between the IPv4 and IPv6 representation; any
// other length trips the assert below.
1108  if (ctinet.address_length == CASS_INET_V4_LENGTH) {
1109  Ip4Address::bytes_type ipv4;
1110  memcpy(GENERIC_RAW_ARRAY(ipv4), ctinet.address, CASS_INET_V4_LENGTH);
1111  ipaddr = Ip4Address(ipv4);
1112  } else if (ctinet.address_length == CASS_INET_V6_LENGTH) {
1113  Ip6Address::bytes_type ipv6;
1114  memcpy(GENERIC_RAW_ARRAY(ipv6), ctinet.address, CASS_INET_V6_LENGTH);
1115  ipaddr = Ip6Address(ipv6);
1116  } else {
1117  assert(0);
1118  }
1119  return ipaddr;
1120  }
1121  case CASS_VALUE_TYPE_BLOB: {
1122  const cass_byte_t *bytes(NULL);
1123  size_t size(0);
1124  CassError rc(cci->CassValueGetBytes(cvalue, &bytes, &size));
1125  assert(rc == CASS_OK);
1126  return GenDb::Blob(bytes, size);
1127  }
1128  case CASS_VALUE_TYPE_UNKNOWN: {
1129  // null type
1130  return GenDb::DbDataValue();
1131  }
1132  default: {
1133  CQLIF_ERR_TRACE("Unhandled CassValueType: " << cvtype);
1134  assert(false && "Unhandled value type");
// Reached only when assert() is compiled out.
1135  return GenDb::DbDataValue();
1136  }
1137  }
1138 }
1139 
// Prepares `query` on `session` and blocks until the driver future completes.
// On success *prepared is replaced with the prepared statement and true is
// returned; on failure the driver error message is traced, *prepared is left
// untouched and false is returned.
// NOTE(review): the signature line (doxygen line 1140) is absent from this
// listing.
1141  CassSession *session, const char* query,
1142  CassPreparedPtr *prepared) {
1143  CQLIF_DEBUG_TRACE( "PrepareSync: " << query);
1144  CassFuturePtr future(cci->CassSessionPrepare(session, query), cci);
// Block until the prepare request has completed.
1145  cci->CassFutureWait(future.get());
1146 
1147  CassError rc(cci->CassFutureErrorCode(future.get()));
1148  if (rc != CASS_OK) {
1149  CassString err;
1150  cci->CassFutureErrorMessage(future.get(), &err.data, &err.length);
1151  CQLIF_ERR_TRACE("PrepareSync: " << query << " FAILED: " <<
1152  std::string(err.data, err.length));
1153  } else {
1154  *prepared = CassPreparedPtr(cci->CassFutureGetPrepared(future.get()),
1155  cci);
1156  }
1157  return rc == CASS_OK;
1158 }
1159 
// Executes `qstatement` on `session` with the given consistency and blocks
// until the driver future completes.
//   result - optional out-parameter; when non-NULL and the query succeeds it
//            receives the result set
// Returns true when the driver reports CASS_OK; otherwise the error message
// is traced and false is returned.
// NOTE(review): the signature line (doxygen line 1160) is absent from this
// listing.
1161  CassSession *session,
1162  CassStatement *qstatement, CassResultPtr *result,
1163  CassConsistency consistency) {
1164  cci->CassStatementSetConsistency(qstatement, consistency);
1165  CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci);
1166  cci->CassFutureWait(future.get());
1167 
1168  CassError rc(cci->CassFutureErrorCode(future.get()));
1169  if (rc != CASS_OK) {
1170  CassString err;
1171  cci->CassFutureErrorMessage(future.get(), &err.data, &err.length);
1172  CQLIF_ERR_TRACE("SyncQuery: FAILED: " <<
1173  std::string(err.data, err.length));
1174  } else {
// Callers that do not need the rows pass NULL for result.
1175  if (result) {
1176  *result = CassResultPtr(cci->CassFutureGetResult(future.get()),
1177  cci);
1178  }
1179  }
1180  return rc == CASS_OK;
1181 }
1182 
// Runs a query given as text and discards any result set: a statement with
// zero bind parameters is created from `query` and executed through
// ExecuteQuerySyncInternal with a NULL result pointer. Returns its status.
// NOTE(review): the signature line (doxygen line 1183) is absent from this
// listing.
1184  CassSession *session, const char *query, CassConsistency consistency) {
1185  CQLIF_DEBUG_TRACE( "SyncQuery: " << query);
1186  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1187  return ExecuteQuerySyncInternal(cci, session, statement.get(), NULL,
1188  consistency);
1189 }
1190 
// Runs a query given as text and hands the result set back through `result`:
// a statement with zero bind parameters is created from `query` and executed
// through ExecuteQuerySyncInternal. Returns its status.
// NOTE(review): the signature line (doxygen line 1191) is absent from this
// listing.
1192  CassSession *session, const char *query,
1193  CassResultPtr *result, CassConsistency consistency) {
1194  CQLIF_DEBUG_TRACE( "SyncQuery: " << query);
1195  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1196  return ExecuteQuerySyncInternal(cci, session, statement.get(), result,
1197  consistency);
1198 }
1199 
// Executes an already-built statement synchronously and discards any result
// set (NULL result pointer). Returns the status of ExecuteQuerySyncInternal.
// NOTE(review): the signature line (doxygen line 1200) is absent from this
// listing.
1201  CassSession *session, CassStatement *statement,
1202  CassConsistency consistency) {
1203  return ExecuteQuerySyncInternal(cci, session, statement, NULL,
1204  consistency);
1205 }
1206 
// Maps a driver error code `rc` onto a GenDb::DbOpResult value: CASS_OK
// becomes OK and unlisted codes become ERROR.
// NOTE(review): the signature line (doxygen line 1207) and the statement
// following the three CASS_ERROR_LIB_* labels (line 1214) are absent from
// this listing; as listed those labels fall through to the default branch,
// but the missing line presumably returns a distinct value - confirm against
// the real source.
1208  switch (rc) {
1209  case CASS_OK:
1210  return GenDb::DbOpResult::OK;
1211  case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1212  case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1213  case CASS_ERROR_LIB_NO_AVAILABLE_IO_THREAD:
1215  default:
1216  return GenDb::DbOpResult::ERROR;
1217  }
1218 }
1219 
// Converts every row of `result` into one GenDb::NewCol appended to
// v_columns, using read_vec to interpret the result columns positionally.
// For each read_vec entry: get<1>() marks a partition-key field (skipped
// here), get<2>() marks a column-name field (stored in cnames), anything else
// is a value; get<3>() says the value is followed by an extra result column
// holding its timestamp.
// NOTE(review): the signature line (doxygen line 1220) and the declarations
// of cnames, values and timestamps (lines 1227-1229) are absent from this
// listing.
1221  CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec,
1222  GenDb::NewColVec *v_columns) {
1223  // Row iterator
1224  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1225  while (cci->CassIteratorNext(riterator.get())) {
1226  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
// i is the index of the next result column to consume.
1230  int i = 0;
1231  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1232  it != read_vec.end(); it++) {
1233  bool row_key = it->get<1>();
1234  bool row_column = it->get<2>();
1235  bool read_timestamp = it->get<3>();
1236  if (row_key) {
// Partition-key columns are present in the row but not returned by this
// overload; just step over them.
1237  i++;
1238  continue;
1239  }
1240  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1241  assert(cvalue);
1242  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1243  if (row_column) {
1244  cnames->push_back(db_value);
1245  } else {
1246  values->push_back(db_value);
1247  if (read_timestamp) {
// The timestamp occupies the result column right after the value.
1248  i++;
1249  const CassValue *ctimestamp(cci->CassRowGetColumn(row, i));
1250  assert(ctimestamp);
1251  GenDb::DbDataValue time_value(CassValue2DbDataValue(cci, ctimestamp));
1252  timestamps->push_back(time_value);
1253  }
1254  }
1255  i++;
1256  }
1257  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0, timestamps));
1258  v_columns->push_back(column);
1259  }
1260 }
1261 
// Converts the rows of `result` into GenDb::ColList objects appended to
// v_col_list, starting a new ColList whenever the partition key of a row
// differs from that of the ColList being filled. read_vec interprets the
// result columns positionally: get<1>() marks a partition-key field,
// get<2>() a column-name field, otherwise a value; get<3>() says the value is
// followed by an extra result column holding its timestamp.
// Rows are grouped only with adjacent rows: a key that reappears later
// produces a second ColList.
// NOTE(review): the signature line (doxygen line 1262) and the declarations
// of cnames, values and timestamps (lines 1271-1273) are absent from this
// listing.
1263  CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec,
1264  GenDb::ColListVec *v_col_list) {
1265  std::auto_ptr<GenDb::ColList> col_list;
1266  // Row iterator
1267  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1268  while (cci->CassIteratorNext(riterator.get())) {
1269  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1270  GenDb::DbDataValueVec rkey;
// i is the index of the next result column to consume.
1274  int i = 0;
1275  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1276  it != read_vec.end(); it++) {
1277  bool row_key = it->get<1>();
1278  bool row_column = it->get<2>();
1279  bool read_timestamp = it->get<3>();
1280  if (row_key) {
1281  // Partition key
1282  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1283  assert(cvalue);
1284  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1285  rkey.push_back(db_value);
1286  i++;
1287  continue;
1288  }
1289  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1290  assert(cvalue);
1291  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1292  if (row_column) {
1293  cnames->push_back(db_value);
1294  } else {
1295  values->push_back(db_value);
1296  if (read_timestamp) {
// The timestamp occupies the result column right after the value.
1297  i++;
1298  const CassValue *ctimestamp(cci->CassRowGetColumn(row, i));
1299  assert(ctimestamp);
1300  GenDb::DbDataValue time_value(CassValue2DbDataValue(cci, ctimestamp));
1301  timestamps->push_back(time_value);
1302  }
1303  }
1304  i++;
1305  }
1306  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0, timestamps));
1307  // Do we need a new ColList?
1308  if (!col_list.get()) {
1309  col_list.reset(new GenDb::ColList);
1310  col_list->rowkey_ = rkey;
1311  }
// Partition key changed: hand the finished ColList to the output vector and
// start a fresh one for this key.
1312  if (rkey != col_list->rowkey_) {
1313  v_col_list->push_back(col_list.release());
1314  col_list.reset(new GenDb::ColList);
1315  col_list->rowkey_ = rkey;
1316  }
1317  GenDb::NewColVec *v_columns(&col_list->columns_);
1318  v_columns->push_back(column);
1319  }
// Flush the ColList that was still being filled when the rows ran out.
1320  if (col_list.get()) {
1321  v_col_list->push_back(col_list.release());
1322  }
1323 }
1324 
// Converts every row of `result` into one GenDb::NewCol appended to
// v_columns, interpreting the result columns by position: the first rk_count
// columns are skipped, the next ck_count columns become the column names, and
// all remaining columns become the values.
// NOTE(review): the signature line (doxygen line 1325) and the declarations
// of cnames and values (lines 1335 and 1343) are absent from this listing.
1326  CassResultPtr *result, size_t rk_count,
1327  size_t ck_count, GenDb::NewColVec *v_columns) {
1328  // Row iterator
1329  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1330  while (cci->CassIteratorNext(riterator.get())) {
1331  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1332  // Iterate over columns
1333  size_t ccount(cci->CassResultColumnCount(result->get()));
1334  // Clustering key
1336  for (size_t i = rk_count; i < rk_count + ck_count; i++) {
1337  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1338  assert(cvalue);
1339  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1340  cnames->push_back(db_value);
1341  }
1342  // Values
1344  for (size_t i = rk_count + ck_count; i < ccount; i++) {
1345  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1346  assert(cvalue);
1347  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1348  values->push_back(db_value);
1349  }
1350  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
1351  v_columns->push_back(column);
1352  }
1353 }
1354 
// Converts the rows of `result` into GenDb::ColList objects appended to
// v_col_list, interpreting the result columns by position: the first rk_count
// columns form the partition key, the next ck_count columns the column names,
// and all remaining columns the values. A new ColList is started whenever the
// partition key differs from that of the ColList being filled, so only
// adjacent rows with equal keys share a ColList.
// NOTE(review): the signature line (doxygen line 1355) and the declarations
// of cnames and values (lines 1374 and 1382) are absent from this listing.
1356  CassResultPtr *result, size_t rk_count,
1357  size_t ck_count, GenDb::ColListVec *v_col_list) {
1358  std::auto_ptr<GenDb::ColList> col_list;
1359  // Row iterator
1360  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1361  while (cci->CassIteratorNext(riterator.get())) {
1362  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1363  // Iterate over columns
1364  size_t ccount(cci->CassResultColumnCount(result->get()));
1365  // Partition key
1366  GenDb::DbDataValueVec rkey;
1367  for (size_t i = 0; i < rk_count; i++) {
1368  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1369  assert(cvalue);
1370  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1371  rkey.push_back(db_value);
1372  }
1373  // Clustering key
1375  for (size_t i = rk_count; i < rk_count + ck_count; i++) {
1376  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1377  assert(cvalue);
1378  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1379  cnames->push_back(db_value);
1380  }
1381  // Values
1383  for (size_t i = rk_count + ck_count; i < ccount; i++) {
1384  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1385  assert(cvalue);
1386  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1387  values->push_back(db_value);
1388  }
1389  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
1390  // Do we need a new ColList?
1391  if (!col_list.get()) {
1392  col_list.reset(new GenDb::ColList);
1393  col_list->rowkey_ = rkey;
1394  }
// Partition key changed: hand the finished ColList to the output vector and
// start a fresh one for this key.
1395  if (rkey != col_list->rowkey_) {
1396  v_col_list->push_back(col_list.release());
1397  col_list.reset(new GenDb::ColList);
1398  col_list->rowkey_ = rkey;
1399  }
1400  GenDb::NewColVec *v_columns(&col_list->columns_);
1401  v_columns->push_back(column);
1402  }
// Flush the ColList that was still being filled when the rows ran out.
1403  if (col_list.get()) {
1404  v_col_list->push_back(col_list.release());
1405  }
1406 }
1407 
// Flattens `result` into v_columns: for every row and every result column a
// GenDb::NewCol is created from the column's name and its converted value.
// Columns whose value converts to DB_VALUE_BLANK are omitted.
// NOTE(review): the signature line (doxygen line 1408) is absent from this
// listing.
1409  CassResultPtr *result, GenDb::NewColVec *v_columns) {
1410  // Row iterator
1411  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1412  while (cci->CassIteratorNext(riterator.get())) {
1413  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1414  // Iterate over columns
1415  size_t ccount(cci->CassResultColumnCount(result->get()));
1416  for (size_t i = 0; i < ccount; i++) {
1417  CassString cname;
1418  CassError rc(cci->CassResultColumnName(result->get(), i,
1419  &cname.data, &cname.length));
1420  assert(rc == CASS_OK);
1421  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1422  assert(cvalue);
1423  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
// Blank values are not reported to the caller.
1424  if (db_value.which() == GenDb::DB_VALUE_BLANK) {
1425  continue;
1426  }
1427  GenDb::NewCol *column(new GenDb::NewCol(
1428  std::string(cname.data, cname.length), db_value, 0));
1429  v_columns->push_back(column);
1430  }
1431  }
1432 }
1433 
// Converts the rows of `result` into GenDb::ColList objects appended to
// v_col_list. The first rk_count result columns form the partition key; a new
// ColList is started whenever that key differs from the ColList being filled.
// Every result column (the loop starts at index 0, so the key columns are
// included) with a non-blank value is added as a name/value NewCol.
// NOTE(review): the signature line (doxygen line 1434) is absent from this
// listing.
1435  CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list) {
1436  std::auto_ptr<GenDb::ColList> col_list;
1437  // Row iterator
1438  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1439  while (cci->CassIteratorNext(riterator.get())) {
1440  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1441  // Iterate over columns
1442  size_t ccount(cci->CassResultColumnCount(result->get()));
1443  // Partition key
1444  GenDb::DbDataValueVec rkey;
1445  for (size_t i = 0; i < rk_count; i++) {
1446  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1447  assert(cvalue);
1448  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1449  rkey.push_back(db_value);
1450  }
1451  // Do we need a new ColList?
1452  if (!col_list.get()) {
1453  col_list.reset(new GenDb::ColList);
1454  col_list->rowkey_ = rkey;
1455  }
// Partition key changed: hand the finished ColList to the output vector and
// start a fresh one for this key.
1456  if (rkey != col_list->rowkey_) {
1457  v_col_list->push_back(col_list.release());
1458  col_list.reset(new GenDb::ColList);
1459  col_list->rowkey_ = rkey;
1460  }
1461  GenDb::NewColVec *v_columns(&col_list->columns_);
1462  for (size_t i = 0; i < ccount; i++) {
1463  CassString cname;
1464  CassError rc(cci->CassResultColumnName(result->get(), i,
1465  &cname.data, &cname.length));
1466  assert(rc == CASS_OK);
1467  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1468  assert(cvalue);
1469  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
// Blank values are not reported to the caller.
1470  if (db_value.which() == GenDb::DB_VALUE_BLANK) {
1471  continue;
1472  }
1473  GenDb::NewCol *column(new GenDb::NewCol(
1474  std::string(cname.data, cname.length), db_value, 0));
1475  v_columns->push_back(column);
1476  }
1477  }
// Flush the ColList that was still being filled when the rows ran out.
1478  if (col_list.get()) {
1479  v_col_list->push_back(col_list.release());
1480  }
1481 }
1482 
// Completion callback registered with the driver for asynchronous queries.
//   future - completed driver future
//   data   - the CassAsyncQueryContext passed at registration; it is placed in
//            an auto_ptr here, so it is destroyed when this function returns
// The user callback ctx->cb_ is invoked exactly once on every path: with an
// empty ColList on failure or when there is nothing to parse, otherwise with
// a ColList filled from the result set.
// NOTE(review): the declaration of `db_rc` (doxygen line 1489) is absent from
// this listing; presumably it is derived from `rc` - confirm against the real
// source.
1483 static void OnExecuteQueryAsync(CassFuture *future, void *data) {
1484  assert(data);
1485  std::auto_ptr<CassAsyncQueryContext> ctx(
1486  boost::reinterpret_pointer_cast<CassAsyncQueryContext>(data));
1487  interface::CassLibrary *cci(ctx->cci_);
1488  CassError rc(cci->CassFutureErrorCode(future));
1490  if (rc != CASS_OK) {
1491  CassString err;
1492  cci->CassFutureErrorMessage(future, &err.data, &err.length);
1493  CQLIF_ERR_TRACE("AsyncQuery: " << ctx->query_id_ << " FAILED: "
1494  << std::string(err.data, err.length));
1495  ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1496  return;
1497  }
// A result context is attached only by callers that want the rows parsed.
1498  if (ctx->result_ctx_) {
1499  CassQueryResultContext *rctx(ctx->result_ctx_.get());
1500  CassResultPtr result(cci->CassFutureGetResult(future), cci);
1501  // In case of select parse the results
1502  if (cci->CassResultColumnCount(result.get())) {
1503  std::auto_ptr<GenDb::ColList> col_list(new GenDb::ColList);
1504  col_list->cfname_ = rctx->cf_name_;
1505  col_list->rowkey_ = rctx->row_key_;
1506  if (rctx->is_dynamic_cf_) {
1507  DynamicCfGetResult(cci, &result, rctx->rk_count_,
1508  rctx->ck_count_, &col_list->columns_);
1509  } else {
1510  StaticCfGetResult(cci, &result, &col_list->columns_);
1511  }
1512  ctx->cb_(db_rc, col_list);
1513  return;
1514  }
1515  }
1516  ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1517 }
1518 
// Starts `qstatement` on `session` without waiting for completion.
//   qid  - identifier used in trace messages and stored in the context
//   cb   - user callback, stored in the context
//   rctx - optional result-parsing context (defaults to NULL), passed to the
//          CassAsyncQueryContext constructor
// A heap-allocated CassAsyncQueryContext is released to the driver as the
// callback argument; OnExecuteQueryAsync takes it back into an auto_ptr.
// NOTE(review): the signature line (doxygen line 1519) is absent from this
// listing.
1520  CassSession *session, const char *qid, CassStatement *qstatement,
1521  CassConsistency consistency, CassAsyncQueryCallback cb,
1522  CassQueryResultContext *rctx = NULL) {
1523  CQLIF_DEBUG_TRACE( "AsyncQuery: " << qid);
1524  cci->CassStatementSetConsistency(qstatement, consistency);
1525  CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci);
1526  std::auto_ptr<CassAsyncQueryContext> ctx(
1527  new CassAsyncQueryContext(qid, cb, cci, rctx));
1528  cci->CassFutureSetCallback(future.get(), OnExecuteQueryAsync,
1529  ctx.release());
1530 }
1531 
// Starts a query given as text asynchronously, without result parsing: a
// statement with zero bind parameters is created from `query`, and the query
// text doubles as the query id passed to ExecuteQueryAsyncInternal.
// NOTE(review): the signature line (doxygen line 1532) is absent from this
// listing.
1533  CassSession *session, const char *query,
1534  CassConsistency consistency, CassAsyncQueryCallback cb) {
1535  CQLIF_DEBUG_TRACE( "AsyncQuery: " << query);
1536  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1537  ExecuteQueryAsyncInternal(cci, session, query, statement.get(),
1538  consistency, cb);
1539 }
1540 
// Starts an already-built statement asynchronously, without result parsing;
// a thin forwarder to ExecuteQueryAsyncInternal.
// NOTE(review): the signature line (doxygen line 1541) is absent from this
// listing.
1542  CassSession *session, const char *query_id, CassStatement *qstatement,
1543  CassConsistency consistency, CassAsyncQueryCallback cb) {
1544  ExecuteQueryAsyncInternal(cci, session, query_id, qstatement, consistency,
1545  cb);
1546 }
1547 
// Starts a query given as text asynchronously and forwards `rctx` to
// ExecuteQueryAsyncInternal together with the callback `cb`.
// NOTE(review): the signature line (doxygen line 1548) and the parameter line
// declaring `cb` and `rctx` (line 1550) are absent from this listing.
1549  CassSession *session, const char *query, CassConsistency consistency,
1551  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1552  ExecuteQueryAsyncInternal(cci, session, query, statement.get(),
1553  consistency, cb, rctx);
1554 }
1555 
// Starts an asynchronous read of a dynamic column family. A
// CassQueryResultContext is built with the `true` flag plus the table name,
// row key and key counts, then released to ExecuteQueryResultAsync.
// Always returns true; the outcome is reported through `cb`.
// NOTE(review): the signature line (doxygen line 1556) is absent from this
// listing.
1557  CassSession *session, const char *query, CassConsistency consistency,
1558  impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count,
1559  const std::string &cfname, const GenDb::DbDataValueVec &row_key) {
1560  std::auto_ptr<CassQueryResultContext> rctx(
1561  new CassQueryResultContext(cfname, true, row_key,
1562  rk_count, ck_count));
1563  ExecuteQueryResultAsync(cci, session, query, consistency, cb,
1564  rctx.release());
1565  return true;
1566 }
1567 
// Runs `query` synchronously and, on success, parses the rows into v_columns
// using read_vec to interpret the result columns. Returns false, leaving
// v_columns untouched, when the query fails.
// NOTE(review): the signature line (doxygen line 1568) is absent from this
// listing.
1569  CassSession *session, const char *query,
1570  const GenDb::FieldNamesToReadVec &read_vec,
1571  CassConsistency consistency, GenDb::NewColVec *v_columns) {
1572  CassResultPtr result(NULL, cci);
1573  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1574  consistency));
1575  if (!success) {
1576  return success;
1577  }
1578  DynamicCfGetResult(cci, &result, read_vec, v_columns);
1579  return success;
1580 }
1581 
// Runs `query` synchronously and, on success, parses the rows into the
// ColListVec `v_columns` using read_vec to interpret the result columns.
// Returns false, leaving the output untouched, when the query fails.
// NOTE(review): the signature line (doxygen line 1582) is absent from this
// listing.
1583  CassSession *session, const char *query,
1584  const GenDb::FieldNamesToReadVec &read_vec,
1585  CassConsistency consistency, GenDb::ColListVec *v_columns) {
1586  CassResultPtr result(NULL, cci);
1587  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1588  consistency));
1589  if (!success) {
1590  return success;
1591  }
1592  DynamicCfGetResult(cci, &result, read_vec, v_columns);
1593  return success;
1594 }
1595 
// Runs `query` synchronously and, on success, parses the rows into v_columns
// using the positional layout given by rk_count and ck_count. Returns false,
// leaving v_columns untouched, when the query fails.
// NOTE(review): the signature line (doxygen line 1596) is absent from this
// listing.
1597  CassSession *session, const char *query,
1598  size_t rk_count, size_t ck_count, CassConsistency consistency,
1599  GenDb::NewColVec *v_columns) {
1600  CassResultPtr result(NULL, cci);
1601  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1602  consistency));
1603  if (!success) {
1604  return success;
1605  }
1606  DynamicCfGetResult(cci, &result, rk_count, ck_count, v_columns);
1607  return success;
1608 }
1609 
// Runs `query` synchronously and, on success, parses the rows into
// v_col_list using the positional layout given by rk_count and ck_count.
// Returns false, leaving v_col_list untouched, when the query fails.
// NOTE(review): the signature line (doxygen line 1610) is absent from this
// listing.
1611  CassSession *session, const char *query,
1612  size_t rk_count, size_t ck_count, CassConsistency consistency,
1613  GenDb::ColListVec *v_col_list) {
1614  CassResultPtr result(NULL, cci);
1615  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1616  consistency));
1617  if (!success) {
1618  return success;
1619  }
1620  DynamicCfGetResult(cci, &result, rk_count, ck_count, v_col_list);
1621  return success;
1622 }
1623 
// Starts an asynchronous read of a static column family. A
// CassQueryResultContext is built with the `false` flag plus the table name
// and row key, then released to ExecuteQueryResultAsync.
// Always returns true; the outcome is reported through `cb`.
// NOTE(review): the signature line (doxygen line 1624) is absent from this
// listing.
1625  CassSession *session, const char *query,
1626  CassConsistency consistency, impl::CassAsyncQueryCallback cb,
1627  const std::string& cfname, const GenDb::DbDataValueVec &row_key) {
1628  std::auto_ptr<CassQueryResultContext> rctx(
1629  new CassQueryResultContext(cfname, false, row_key));
1630  ExecuteQueryResultAsync(cci, session, query, consistency, cb,
1631  rctx.release());
1632  return true;
1633 }
1634 
// Runs `query` synchronously and, on success, flattens the rows into
// v_columns via StaticCfGetResult. Returns false, leaving v_columns
// untouched, when the query fails.
// NOTE(review): the signature line (doxygen line 1635) is absent from this
// listing.
1636  CassSession *session, const char *query,
1637  CassConsistency consistency, GenDb::NewColVec *v_columns) {
1638  CassResultPtr result(NULL, cci);
1639  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1640  consistency));
1641  if (!success) {
1642  return success;
1643  }
1644  StaticCfGetResult(cci, &result, v_columns);
1645  return success;
1646 }
1647 
// Runs `query` synchronously and, on success, groups the rows by their first
// rk_count columns into v_col_list via StaticCfGetResult. Returns false,
// leaving v_col_list untouched, when the query fails.
// NOTE(review): the signature line (doxygen line 1648) is absent from this
// listing.
1649  CassSession *session, const char *query, size_t rk_count,
1650  CassConsistency consistency, GenDb::ColListVec *v_col_list) {
1651  CassResultPtr result(NULL, cci);
1652  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1653  consistency));
1654  if (!success) {
1655  return success;
1656  }
1657  StaticCfGetResult(cci, &result, rk_count, v_col_list);
1658  return success;
1659 }
1660 
// Blocks until `future` completes and reports whether it finished with
// CASS_OK; on failure the driver error message is traced.
// NOTE(review): the signature line (doxygen line 1661) is absent from this
// listing.
1662  CassFuture *future) {
1663  cci->CassFutureWait(future);
1664  CassError rc(cci->CassFutureErrorCode(future));
1665  if (rc != CASS_OK) {
1666  CassString err;
1667  cci->CassFutureErrorMessage(future, &err.data, &err.length);
1668  CQLIF_ERR_TRACE("SyncWait: FAILED: " <<
1669  std::string(err.data, err.length));
1670  }
1671  return rc == CASS_OK;
1672 }
1673 
1674 static const CassTableMeta * GetCassTableMeta(
1675  interface::CassLibrary *cci, const CassSchemaMeta *schema_meta,
1676  const std::string &keyspace, const std::string &table, bool log_error) {
1677  const CassKeyspaceMeta *keyspace_meta(
1678  cci->CassSchemaMetaKeyspaceByName(schema_meta, keyspace.c_str()));
1679  if (keyspace_meta == NULL) {
1680  if (log_error) {
1681  CQLIF_ERR_TRACE("No keyspace schema: Keyspace: " << keyspace <<
1682  ", Table: " << table);
1683  }
1684  return NULL;
1685  }
1686  std::string table_lower(table);
1687  boost::algorithm::to_lower(table_lower);
1688  const CassTableMeta *table_meta(
1689  cci->CassKeyspaceMetaTableByName(keyspace_meta, table_lower.c_str()));
1690  if (table_meta == NULL) {
1691  if (log_error) {
1692  CQLIF_ERR_TRACE("No table schema: Keyspace: " << keyspace <<
1693  ", Table: " << table_lower);
1694  }
1695  return NULL;
1696  }
1697  return table_meta;
1698 }
1699 
// Reports whether `table` exists in `keyspace` according to the schema
// metadata obtained from `session`. A missing table is not traced as an error
// (log_error is false) and a missing schema snapshot is traced at debug level
// only.
// NOTE(review): the signature line (doxygen line 1700) and the start of the
// schema_meta declaration (line 1703) are absent from this listing.
1701  CassSession *session,
1702  const std::string &keyspace, const std::string &table) {
1704  session), cci);
1705  if (schema_meta.get() == NULL) {
1706  CQLIF_DEBUG_TRACE( "No schema meta: Keyspace: " << keyspace <<
1707  ", Table: " << table);
1708  return false;
1709  }
1710  bool log_error(false);
1711  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1712  schema_meta.get(), keyspace, table, log_error));
1713  if (table_meta == NULL) {
1714  return false;
1715  }
1716  return true;
1717 }
1718 
// Stores the number of clustering-key columns of `table` in *ck_count.
// Returns false, leaving *ck_count untouched, when the schema snapshot or the
// table metadata cannot be obtained; both failures are traced as errors.
// NOTE(review): the signature lines (doxygen lines 1719-1720) and the start of
// the schema_meta declaration (line 1723) are absent from this listing.
1721  CassSession *session, const std::string &keyspace,
1722  const std::string &table, size_t *ck_count) {
1724  session), cci);
1725  if (schema_meta.get() == NULL) {
1726  CQLIF_ERR_TRACE("No schema meta: Keyspace: " << keyspace <<
1727  ", Table: " << table);
1728  return false;
1729  }
1730  bool log_error(true);
1731  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1732  schema_meta.get(), keyspace, table, log_error));
1733  if (table_meta == NULL) {
1734  return false;
1735  }
1736  *ck_count = cci->CassTableMetaClusteringKeyCount(table_meta);
1737  return true;
1738 }
1739 
// Stores the number of partition-key columns of `table` in *rk_count.
// Returns false, leaving *rk_count untouched, when the schema snapshot or the
// table metadata cannot be obtained; both failures are traced as errors.
// NOTE(review): the signature line (doxygen line 1740) and the start of the
// schema_meta declaration (line 1743) are absent from this listing.
1741  interface::CassLibrary *cci, CassSession *session,
1742  const std::string &keyspace, const std::string &table, size_t *rk_count) {
1744  session), cci);
1745  if (schema_meta.get() == NULL) {
1746  CQLIF_ERR_TRACE("No schema meta: Keyspace: " << keyspace <<
1747  ", Table: " << table);
1748  return false;
1749  }
1750  bool log_error(true);
1751  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1752  schema_meta.get(), keyspace, table, log_error));
1753  if (table_meta == NULL) {
1754  return false;
1755  }
1756  *rk_count = cci->CassTableMetaPartitionKeyCount(table_meta);
1757  return true;
1758 }
1759 
1760 static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel) {
1761  switch (clevel) {
1762  case CASS_LOG_DISABLED:
1763  return log4cplus::OFF_LOG_LEVEL;
1764  case CASS_LOG_CRITICAL:
1765  return log4cplus::FATAL_LOG_LEVEL;
1766  case CASS_LOG_ERROR:
1767  return log4cplus::ERROR_LOG_LEVEL;
1768  case CASS_LOG_WARN:
1769  return log4cplus::WARN_LOG_LEVEL;
1770  case CASS_LOG_INFO:
1771  return log4cplus::INFO_LOG_LEVEL;
1772  case CASS_LOG_DEBUG:
1773  return log4cplus::DEBUG_LOG_LEVEL;
1774  case CASS_LOG_TRACE:
1775  return log4cplus::TRACE_LOG_LEVEL;
1776  default:
1777  return log4cplus::ALL_LOG_LEVEL;
1778  }
1779 }
1780 
1781 static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level) {
1782  switch (level) {
1783  case log4cplus::OFF_LOG_LEVEL:
1784  return CASS_LOG_DISABLED;
1785  case log4cplus::FATAL_LOG_LEVEL:
1786  return CASS_LOG_CRITICAL;
1787  case log4cplus::ERROR_LOG_LEVEL:
1788  return CASS_LOG_ERROR;
1789  case log4cplus::WARN_LOG_LEVEL:
1790  return CASS_LOG_WARN;
1791  case log4cplus::INFO_LOG_LEVEL:
1792  return CASS_LOG_INFO;
1793  case log4cplus::DEBUG_LOG_LEVEL:
1794  return CASS_LOG_DEBUG;
1795  case log4cplus::TRACE_LOG_LEVEL:
1796  return CASS_LOG_TRACE;
1797  default:
1798  assert(false && "Invalid Log4Level");
1799  return CASS_LOG_DISABLED;
1800  }
1801 }
1802 
1803 static void CassLibraryLog(const CassLogMessage* message, void *data) {
1804  if (LoggingDisabled()) {
1805  return;
1806  }
1807  log4cplus::LogLevel log4level(Cass2log4Level(message->severity));
1808  std::stringstream buf;
1809  buf << "CassLibrary: " << message->file << ":" << message->line <<
1810  " " << message->function << "] " << message->message;
1811  CASS_LIB_TRACE(log4level, buf.str());
1812 }
1813 
// Reads the whole file at `ca_certs_path` and returns its content.
// Returns an empty string when the path is empty or the file cannot be
// opened, so callers cannot tell a missing file from an empty one.
static std::string LoadCertFile(const std::string &ca_certs_path) {
    std::string pem;
    if (ca_certs_path.empty()) {
        return pem;
    }
    std::ifstream in(ca_certs_path.c_str());
    if (in) {
        // Read everything up to end-of-file in one go.
        pem.assign(std::istreambuf_iterator<char>(in),
                   std::istreambuf_iterator<char>());
    }
    return pem;
}
1826 
1827 
// Task that runs one stored nullary function when executed.
1828 class WorkerTask : public Task {
1829  public:
1830  typedef boost::function<void(void)> FunctionPtr;
// func is the work to run; task_id and task_instance are passed to Task.
1831  WorkerTask(FunctionPtr func, int task_id, int task_instance) :
1832  Task(task_id, task_instance),
1833  func_(func) {
1834  }
// Invokes the stored function and always returns true.
1835  bool Run() {
1836  func_();
1837  return true;
1838  }
1839  std::string Description() const {
1840  return "cass::cql::impl::WorkerTask";
1841  }
1842  private:
// NOTE(review): the declaration of func_ (doxygen line 1843) is absent from
// this listing.
1844 };
1845 
1846 } // namespace impl
1847 
1848 //
1849 // CqlIfImpl
1850 //
// Builds the driver cluster and session objects and configures the cluster
// (optional SSL, contact points, port, optional credentials).
//   evm                - event manager; its io_service() is used when the
//                        first contact point is not an IP literal
//   cassandra_ips      - contact points, joined with "," for the driver
//   cassandra_port     - port passed to the driver
//   cassandra_user / cassandra_password - applied only when both are non-empty
//   use_ssl            - enables SSL on the cluster
//   ca_certs_path      - CA certificate file loaded when use_ssl is set
//   cci                - Cassandra library wrapper used for all driver calls
// NOTE(review): several lines (doxygen 1868-1869, 1876, 1902-1906) are absent
// from this listing.
1851 CqlIfImpl::CqlIfImpl(EventManager *evm,
1852  const std::vector<std::string> &cassandra_ips,
1853  int cassandra_port,
1854  const std::string &cassandra_user,
1855  const std::string &cassandra_password,
1856  bool use_ssl,
1857  const std::string &ca_certs_path,
1858  interface::CassLibrary *cci) :
1859  evm_(evm),
1860  cci_(cci),
1861  cluster_(cci_->CassClusterNew(), cci_),
1862  ssl_(0, cci_),
1863  session_(cci_->CassSessionNew(), cci_),
1864  schema_session_(cci_->CassSessionNew(), cci_),
1865  keyspace_(),
1866  io_thread_count_(2) {
1867  // Set session state to INIT
1870 
1871  if (cassandra_ips.size() > 0) {
1872  schema_contact_point_ = cassandra_ips[0];
1873  boost::system::error_code ec;
// Parsing is used only as a check: an error means the first contact point is
// not an IP literal.
1874  boost::asio::ip::address::from_string(cassandra_ips[0], ec);
1875  if(ec.value() != 0){
// NOTE(review): the start of this statement (doxygen line 1876) is absent
// from this listing; presumably the name is resolved to an address - confirm.
1877  evm->io_service(), cassandra_ips[0]);
1878  }
1879  }
1880 
1881  if (use_ssl) {
1882  ssl_ = impl::CassSslPtr(cci->CassSslNew(), cci_);
1883  /* Only verify the certification and not the identity */
1884  cci_->CassSslSetVerifyFlags(ssl_.get(), CASS_SSL_VERIFY_PEER_CERT);
1885  std::string content = impl::LoadCertFile(ca_certs_path);
// NOTE(review): when the CA file is missing, unreadable or empty, peer
// verification is switched off without any trace message.
1886  if (content.length() == 0) {
1887  cci_->CassSslSetVerifyFlags(ssl_.get(), CASS_SSL_VERIFY_NONE);
1888  } else {
1889  cci_->CassSslAddTrustedCert(ssl_.get(), content);
1890  }
1891  cci_->CassClusterSetSsl(cluster_.get(), ssl_.get());
1892  }
1893  std::string contact_points(boost::algorithm::join(cassandra_ips, ","));
1894  cci_->CassClusterSetContactPoints(cluster_.get(), contact_points.c_str());
1895  cci_->CassClusterSetPort(cluster_.get(), cassandra_port);
1896  // Set credentials for plain text authentication
1897  if (!cassandra_user.empty() && !cassandra_password.empty()) {
1898  cci_->CassClusterSetCredentials(cluster_.get(), cassandra_user.c_str(),
1899  cassandra_password.c_str());
1900  }
1901  // Set number of IO threads to half the number of cores
1907 }
1908 
// Asserts that the session is in an expected state (INIT or an alternative
// not visible here).
// NOTE(review): the signature line (doxygen line 1909) and the rest of the
// body (lines 1911-1913) are absent from this listing; presumably this is the
// destructor - confirm against the real source.
1910  assert(session_state_ == SessionState::INIT ||
1914 }
1915 
// Formats kQCreateKeyspaceIfNotExists with `keyspace` and
// `replication_factor` and executes it synchronously on the schema session.
// Returns false when the guard at the top fails, when the formatted query
// does not fit the 512-byte buffer, or when the query itself fails.
// NOTE(review): the guard condition (doxygen line 1918) is absent from this
// listing.
1916 bool CqlIfImpl::CreateKeyspaceIfNotExistsSync(const std::string &keyspace,
1917  const std::string &replication_factor, CassConsistency consistency) {
1919  return false;
1920  }
1921  char buf[512];
1922  int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists,
1923  keyspace.c_str(), replication_factor.c_str()));
// n >= sizeof(buf) means snprintf truncated the query; refuse to run it.
1924  if (n < 0 || n >= (int)sizeof(buf)) {
1925  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1926  keyspace << ", RF: " << replication_factor);
1927  return false;
1928  }
1929  return impl::ExecuteQuerySync(cci_, schema_session_.get(), buf,
1930  consistency);
1931 }
1932 
// Formats kQUseKeyspace with `keyspace` and executes it synchronously on the
// schema session; on success keyspace_ is updated to the new name.
// Returns false when the guard at the top fails, when the formatted query
// does not fit the 512-byte buffer, or when the query itself fails.
// NOTE(review): the guard condition (doxygen line 1935) is absent from this
// listing.
1933 bool CqlIfImpl::UseKeyspaceSyncOnSchemaSession(const std::string &keyspace,
1934  CassConsistency consistency) {
1936  return false;
1937  }
1938  char buf[512];
1939  int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str()));
// n >= sizeof(buf) means snprintf truncated the query; refuse to run it.
1940  if (n < 0 || n >= (int)sizeof(buf)) {
1941  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1942  keyspace);
1943  return false;
1944  }
1945  bool success(impl::ExecuteQuerySync(cci_, schema_session_.get(), buf,
1946  consistency));
1947  if (!success) {
1948  return false;
1949  }
1950  // Update keyspace
1951  keyspace_ = keyspace;
1952  return success;
1953 }
1954 
// Formats kQUseKeyspace with `keyspace` and executes it synchronously on
// session_ (not the schema session); on success keyspace_ is updated to the
// new name.
// Returns false when the guard at the top fails, when the formatted query
// does not fit the 512-byte buffer, or when the query itself fails.
// NOTE(review): the guard condition (doxygen line 1957) is absent from this
// listing.
1955 bool CqlIfImpl::UseKeyspaceSync(const std::string &keyspace,
1956  CassConsistency consistency) {
1958  return false;
1959  }
1960  char buf[512];
1961  int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str()));
// n >= sizeof(buf) means snprintf truncated the query; refuse to run it.
1962  if (n < 0 || n >= (int)sizeof(buf)) {
1963  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1964  keyspace);
1965  return false;
1966  }
1967  bool success(impl::ExecuteQuerySync(cci_, session_.get(), buf,
1968  consistency));
1969  if (!success) {
1970  return false;
1971  }
1972  // Update keyspace
1973  keyspace_ = keyspace;
1974  return success;
1975 }
1976 
// Builds a query for the column family `cf` according to cf.cftype_, using
// `compaction_strategy`, and executes it synchronously on the schema session.
// Returns false when the guard at the top fails, when cf.cftype_ matches no
// case, when building the query for the second case sets an error code, or
// when the query itself fails.
// NOTE(review): the signature line (doxygen line 1977), the guard condition
// (line 1979), both case labels and the starts of both query-building
// statements (lines 1987, 1989, 1993, 1996) are absent from this listing.
1978  const std::string &compaction_strategy, CassConsistency consistency) {
1980  return false;
1981  }
1982  // There are two types of tables - Static (SQL) and Dynamic (NOSQL)
1983  // column family. Static column family has more or less fixed rows,
1984  // and dynamic column family has wide rows
1985  std::string query;
1986  switch (cf.cftype_) {
1988  {
1990  compaction_strategy);
1991  break;
1992  }
1994  {
1995  boost::system::error_code ec;
1997  compaction_strategy, &ec);
1998  if (ec) {
1999  return false;
2000  }
2001  break;
2002  }
2003  default:
2004  {
2005  return false;
2006  }
2007  }
2008  return impl::ExecuteQuerySync(cci_, schema_session_.get(), query.c_str(),
2009  consistency);
2010 }
2011 
// Builds the index-creation query via impl::CassCreateIndexIfNotExists() for
// (cfname, column, indexname, index_mode) and executes it synchronously on
// schema_session_ with the given consistency. Returns the execution result.
// NOTE(review): the condition guarding the early `return false` is not
// visible in this listing - presumably a schema-session state check; confirm.
2012 bool CqlIfImpl::CreateIndexIfNotExistsSync(const std::string &cfname,
2013  const std::string &column, const std::string &indexname,
2014  CassConsistency consistency, const GenDb::ColIndexMode::type index_mode) {
2016  return false;
2017  }
2018  std::string query(impl::CassCreateIndexIfNotExists(cfname, column,
2019  indexname, index_mode));
2020  return impl::ExecuteQuerySync(cci_, schema_session_.get(), query.c_str(),
2021  consistency);
2022 }
2023 
2025  const std::string &table_name(cf.cfname_);
2026  impl::CassPreparedPtr prepared(NULL, cci_);
2027  // Check if the prepared statement exists
2028  if (GetPrepareInsertIntoTable(table_name, &prepared)) {
2029  return true;
2030  }
2031  bool success(PrepareInsertIntoTableSync(cf, &prepared));
2032  if (!success) {
2033  return success;
2034  }
2035  // Store the prepared statement into the map
2036  tbb::mutex::scoped_lock lock(map_mutex_);
2037  success = (insert_prepared_map_.insert(
2038  std::make_pair(table_name, prepared))).second;
2039  assert(success);
2040  return success;
2041 }
2042 
2043 bool CqlIfImpl::GetPrepareInsertIntoTable(const std::string &table_name,
2044  impl::CassPreparedPtr *prepared) const {
2045  tbb::mutex::scoped_lock lock(map_mutex_);
2046  CassPreparedMapType::const_iterator it(
2047  insert_prepared_map_.find(table_name));
2048  if (it == insert_prepared_map_.end()) {
2049  return false;
2050  }
2051  *prepared = it->second;
2052  return true;
2053 }
2054 
// Reports whether `table` is present.
// NOTE(review): both the guard condition before `return false` and the head
// of the final call (which receives `table` as its last argument) are not
// visible in this listing; confirm the exact check against the full file.
2055 bool CqlIfImpl::IsTablePresent(const std::string &table) {
2057  return false;
2058  }
2060  table);
2061 }
2062 
// Classifies `table` by its clustering-key count (ck_count):
//   1  -> ck_count == 0 (static table)
//   0  -> ck_count != 0 (dynamic table)
//  -1  -> the lookup that fills ck_count failed
// NOTE(review): the guard condition before the first return and the head of
// the lookup call are not visible in this listing.
// NOTE(review): the early `return false` converts to 0 in this int-returning
// function, i.e. the same value as "dynamic" - confirm that is intended
// rather than -1.
2063 int CqlIfImpl::IsTableStatic(const std::string &table) {
2065  return false;
2066  }
2067  size_t ck_count;
2069  keyspace_, table, &ck_count)) {
2070  return -1;
2071  }
2072  if (ck_count == 0) {
2073  return 1;
2074  } else {
2075  return 0;
2076  }
2077 }
2078 
// Builds a partition-key SELECT for (cfname, rkey) and dispatches it
// asynchronously, choosing the static or dynamic result path from
// IsTableStatic(cfname). Returns false when IsTableStatic() yields neither
// 1 nor 0.
// NOTE(review): several lines are not visible in this listing - the rest of
// the signature, the guard before the first `return false`, the heads of the
// calls that fill rk_count/ck_count, and the heads of the two dispatch calls.
// The visible `));` endings suggest the count lookups may be wrapped in
// another call (possibly assert) - confirm they still run in NDEBUG builds.
// IsTableStatic() is evaluated up to twice per call.
2079 bool CqlIfImpl::SelectFromTableAsync(const std::string &cfname,
2080  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
2083  return false;
2084  }
2085  std::string query(impl::PartitionKey2CassSelectFromTable(cfname,rkey));
2086  if (IsTableStatic(cfname) == 1) {
2088  query.c_str(), consistency, cb, cfname.c_str(), rkey);
2089  } else if (IsTableStatic(cfname) == 0) {
2090  size_t rk_count;
2092  keyspace_, cfname, &rk_count));
2093  size_t ck_count;
2095  keyspace_, cfname, &ck_count));
2097  query.c_str(), consistency, cb, rk_count, ck_count,
2098  cfname, rkey);
2099  } else {
2100  return false;
2101  }
2102 }
2103 
2105  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
2106  const GenDb::ColumnNameRange &ck_range,
2107  const GenDb::WhereIndexInfoVec &where_vec,
2108  const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency,
2111  return false;
2112  }
2113  std::string query(
2115  rkey, ck_range, where_vec, read_vec));
2116  assert(IsTableDynamic(cfname));
2117  size_t rk_count;
2119  keyspace_, cfname, &rk_count));
2120  size_t ck_count;
2122  keyspace_, cfname, &ck_count));
2124  query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2125  rkey);
2126 }
2127 
2129  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
2130  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2133  return false;
2134  }
2135  std::string query(
2137  rkey, ck_range));
2138  assert(IsTableDynamic(cfname));
2139  size_t rk_count;
2141  keyspace_, cfname, &rk_count));
2142  size_t ck_count;
2144  keyspace_, cfname, &ck_count));
2146  query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2147  rkey);
2148 }
2149 
2150 bool CqlIfImpl::IsTableDynamic(const std::string &table) {
2151  return !IsTableStatic(table);
2152 }
2153 
2154 bool CqlIfImpl::InsertIntoTableSync(std::auto_ptr<GenDb::ColList> v_columns,
2155  CassConsistency consistency) {
2156  return InsertIntoTableInternal(v_columns, consistency, true, NULL);
2157 }
2158 
2159 bool CqlIfImpl::InsertIntoTableAsync(std::auto_ptr<GenDb::ColList> v_columns,
2160  CassConsistency consistency, impl::CassAsyncQueryCallback cb) {
2161  return InsertIntoTableInternal(v_columns, consistency, false, cb);
2162 }
2163 
2164 bool CqlIfImpl::InsertIntoTablePrepareAsync(std::auto_ptr<GenDb::ColList> v_columns,
2165  CassConsistency consistency, impl::CassAsyncQueryCallback cb) {
2166  return InsertIntoTablePrepareInternal(v_columns, consistency, false,
2167  cb);
2168 }
2169 
2170 // COLLECTOR_GLOBAL_TABLE is dynamic table with 6 indexed columns
2171 // for object-id. Some of these might be blank. This creates tombstones
2172 // so we dont want to prepare for inserts.
2173 bool CqlIfImpl::IsInsertIntoTablePrepareSupported(const std::string &table) {
2174  return (IsTableDynamic(table)) &&
2175  (table != "MessageTablev2");
2176 }
2177 
// Builds a partition-key SELECT for (cfname, rkey) and fetches the result
// into `out`, choosing the static or dynamic result path from
// IsTableStatic(cfname). Returns false when IsTableStatic() yields neither
// 1 nor 0.
// NOTE(review): several lines are not visible in this listing - the guard
// before the first `return false`, the heads of the calls that fill
// rk_count/ck_count, and the heads of the two fetch calls. The visible `));`
// endings suggest the count lookups may be wrapped in another call (possibly
// assert) - confirm they still run in NDEBUG builds.
2178 bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
2179  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
2180  GenDb::NewColVec *out) {
2182  return false;
2183  }
2184  std::string query(impl::PartitionKey2CassSelectFromTable(cfname,
2185  rkey));
2186  if (IsTableStatic(cfname) == 1) {
2188  query.c_str(), consistency, out);
2189  } else if (IsTableStatic(cfname) == 0){
2190  size_t rk_count;
2192  keyspace_, cfname, &rk_count));
2193  size_t ck_count;
2195  keyspace_, cfname, &ck_count));
2197  query.c_str(), rk_count, ck_count, consistency, out);
2198  } else {
2199  return false;
2200  }
2201 }
2202 
// Whole-table variant: builds the query with impl::CassSelectFromTable(cfname)
// and fetches all rows into `out`, choosing the static or dynamic result
// path from IsTableStatic(cfname). Returns false when IsTableStatic() yields
// neither 1 nor 0.
// NOTE(review): several lines are not visible in this listing - the guard
// before the first `return false`, the heads of the calls that fill
// rk_count/ck_count, and the heads of the two fetch calls. The visible `));`
// endings suggest the count lookups may be wrapped in another call (possibly
// assert) - confirm they still run in NDEBUG builds.
2203 bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
2204  CassConsistency consistency, GenDb::ColListVec *out) {
2206  return false;
2207  }
2208  std::string query(impl::CassSelectFromTable(cfname));
2209  size_t rk_count;
2211  keyspace_, cfname, &rk_count));
2212  if (IsTableStatic(cfname) == 1) {
2214  query.c_str(), rk_count, consistency, out);
2215  } else if (IsTableStatic(cfname) == 0){
2216  size_t ck_count;
2218  keyspace_, cfname, &ck_count));
2220  query.c_str(), rk_count, ck_count, consistency, out);
2221  } else {
2222  return false;
2223  }
2224 }
2225 
2226 
2228  const GenDb::DbDataValueVec &rkey,
2229  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2230  const GenDb::FieldNamesToReadVec &read_vec,
2231  GenDb::NewColVec *out) {
2233  return false;
2234  }
2235  std::string query(
2237  rkey, ck_range, read_vec));
2238  assert(IsTableDynamic(cfname));
2240  query.c_str(), read_vec, consistency, out);
2241 }
2242 
2244  const std::vector<GenDb::DbDataValueVec> &rkeys,
2245  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2246  const GenDb::FieldNamesToReadVec &read_vec,
2247  GenDb::ColListVec *out) {
2249  return false;
2250  }
2251  std::string query(
2253  rkeys, ck_range, read_vec));
2254  assert(IsTableDynamic(cfname));
2256  query.c_str(), read_vec, consistency, out);
2257 }
2258 
2259 
2261  const GenDb::DbDataValueVec &rkey,
2262  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2263  GenDb::NewColVec *out) {
2265  return false;
2266  }
2267  std::string query(
2269  rkey, ck_range));
2270  assert(IsTableDynamic(cfname));
2271  size_t rk_count;
2273  keyspace_, cfname, &rk_count));
2274  size_t ck_count;
2276  keyspace_, cfname, &ck_count));
2278  query.c_str(), rk_count, ck_count, consistency, out);
2279 }
2280 
2281 void CqlIfImpl::SetRequestTimeout(uint32_t timeout_ms) {
2282  CQLIF_DEBUG_TRACE("request timeout set to " << timeout_ms);
2283  cci_->CassClusterSetRequestTimeout(cluster_.get(), timeout_ms);
2284 }
2285 
2287  /* If Connect is called multiple times due to DB failure,
2288  * then it is better to delete previous session and use
2289  * a new one to avoid gradual leak.
2290  */
2291  schema_session_.reset();
2292  impl::CassSessionPtr schema_session(cci_->CassSessionNew(), cci_);
2293  schema_session_.swap(schema_session);
2294 
2295  // First set the cluster whitelist filtering to just one node
2297  schema_contact_point_.c_str());
2298 
2300  cluster_.get()), cci_);
2301  bool success(impl::SyncFutureWait(cci_, future.get()));
2302  if (success) {
2304  CQLIF_INFO_TRACE("ConnectSchemaSync Done");
2305  } else {
2306  CQLIF_ERR_TRACE("ConnectSchemaSync FAILED");
2307  }
2308 
2310  return success;
2311 }
2312 
2314  /* If Connect is called multiple times due to DB failure,
2315  * then it is better to delete previous session and use
2316  * a new one to avoid gradual leak.
2317  */
2318  session_.reset();
2320  session_.swap(session);
2321 
2323  cluster_.get()), cci_);
2324  bool success(impl::SyncFutureWait(cci_, future.get()));
2325  if (success) {
2327  CQLIF_INFO_TRACE("ConnectSync Done");
2328  } else {
2329  CQLIF_ERR_TRACE("ConnectSync FAILED");
2330  }
2331  return success;
2332 }
2333 
2335  // Close all session and pending queries
2337  bool success(impl::SyncFutureWait(cci_, future.get()));
2338  if (success) {
2340  CQLIF_INFO_TRACE("DisconnectSync Done");
2341  } else {
2342  CQLIF_ERR_TRACE("DisconnectSync FAILED");
2343  }
2344  return success;
2345 }
2346 
2348  // Close the schema session
2350  cci_);
2351  bool success(impl::SyncFutureWait(cci_, future.get()));
2352  if (success) {
2354  CQLIF_INFO_TRACE("DisconnectSchemaSync Done");
2355  } else {
2356  CQLIF_ERR_TRACE("DisconnectSchemaSync FAILED");
2357  }
2358  return success;
2359 }
2360 
// Reads the driver metrics for session_ via cci_->CassSessionGetMetrics()
// and copies them field by field into *metrics (requests, stats, errors).
// Returns true once the copy is done.
// NOTE(review): the condition guarding the early `return false` is not
// visible in this listing - presumably a session state check; confirm.
2361 bool CqlIfImpl::GetMetrics(Metrics *metrics) const {
2363  return false;
2364  }
2365  CassMetrics cass_metrics;
2366  cci_->CassSessionGetMetrics(session_.get(), &cass_metrics);
2367  // Requests
2368  metrics->requests.min = cass_metrics.requests.min;
2369  metrics->requests.max = cass_metrics.requests.max;
2370  metrics->requests.mean = cass_metrics.requests.mean;
2371  metrics->requests.stddev = cass_metrics.requests.stddev;
2372  metrics->requests.median = cass_metrics.requests.median;
2373  metrics->requests.percentile_75th =
2374  cass_metrics.requests.percentile_75th;
2375  metrics->requests.percentile_95th =
2376  cass_metrics.requests.percentile_95th;
2377  metrics->requests.percentile_98th =
2378  cass_metrics.requests.percentile_98th;
2379  metrics->requests.percentile_99th =
2380  cass_metrics.requests.percentile_99th;
2381  metrics->requests.percentile_999th =
2382  cass_metrics.requests.percentile_999th;
2383  metrics->requests.mean_rate = cass_metrics.requests.mean_rate;
2384  metrics->requests.one_minute_rate =
2385  cass_metrics.requests.one_minute_rate;
2386  metrics->requests.five_minute_rate =
2387  cass_metrics.requests.five_minute_rate;
2388  metrics->requests.fifteen_minute_rate =
2389  cass_metrics.requests.fifteen_minute_rate;
2390  // Stats
2391  metrics->stats.total_connections =
2392  cass_metrics.stats.total_connections;
2393  metrics->stats.available_connections =
2394  cass_metrics.stats.available_connections;
2395  metrics->stats.exceeded_pending_requests_water_mark =
2396  cass_metrics.stats.exceeded_pending_requests_water_mark;
2397  metrics->stats.exceeded_write_bytes_water_mark =
2398  cass_metrics.stats.exceeded_write_bytes_water_mark;
2399  // Errors
2400  metrics->errors.connection_timeouts =
2401  cass_metrics.errors.connection_timeouts;
2402  metrics->errors.pending_request_timeouts =
2403  cass_metrics.errors.pending_request_timeouts;
2404  metrics->errors.request_timeouts =
2405  cass_metrics.errors.request_timeouts;
2406  return true;
2407 }
2408 
// Builds an INSERT query string for v_columns - static or dynamic form,
// chosen by IsTableStatic(v_columns->cfname_) - and executes it on session_.
// When `sync` is true the result of the synchronous execution is returned;
// otherwise the query is handed to ExecuteQueryAsync() with `cb` and true is
// returned immediately. Returns false when IsTableStatic() yields neither 1
// nor 0.
// NOTE(review): the remainder of the signature (the callback parameter) and
// the guard before the first `return false` are not visible in this listing.
2409 bool CqlIfImpl::InsertIntoTableInternal(std::auto_ptr<GenDb::ColList> v_columns,
2410  CassConsistency consistency, bool sync,
2413  return false;
2414  }
2415  std::string query;
2416  if (IsTableStatic(v_columns->cfname_) == 1) {
2417  query = impl::StaticCf2CassInsertIntoTable(v_columns.get());
2418  } else if (IsTableStatic(v_columns->cfname_) == 0){
2419  query = impl::DynamicCf2CassInsertIntoTable(v_columns.get());
2420  } else {
2421  return false;
2422  }
2423  if (sync) {
2424  return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(),
2425  consistency);
2426  } else {
2427  impl::ExecuteQueryAsync(cci_, session_.get(), query.c_str(),
2428  consistency, cb);
2429  return true;
2430  }
2431 }
2432 
2434  impl::CassPreparedPtr *prepared) {
2436  return false;
2437  }
2438  std::string query;
2439  switch (cf.cftype_) {
2441  {
2443  break;
2444  }
2446  {
2447  boost::system::error_code ec;
2449  if (ec.value() != boost::system::errc::success) {
2450  return false;
2451  }
2452  break;
2453  }
2454  default:
2455  {
2456  return false;
2457  }
2458  }
2459  return impl::PrepareSync(cci_, schema_session_.get(), query.c_str(),
2460  prepared);
2461 }
2462 
2464  std::auto_ptr<GenDb::ColList> v_columns,
2465  CassConsistency consistency, bool sync,
2468  return false;
2469  }
2470  impl::CassPreparedPtr prepared(NULL, cci_);
2471  bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared));
2472  if (!success) {
2473  CQLIF_ERR_TRACE("CassPrepared statement NOT found: " <<
2474  v_columns->cfname_);
2475  return false;
2476  }
2477  impl::CassStatementPtr qstatement(cci_->CassPreparedBind(prepared.get()),
2478  cci_);
2479  if (IsTableStatic(v_columns->cfname_) == 1) {
2480  success = impl::StaticCf2CassPrepareBind(cci_, qstatement.get(),
2481  v_columns.get());
2482  } else if (IsTableStatic(v_columns->cfname_) == 0){
2483  success = impl::DynamicCf2CassPrepareBind(cci_, qstatement.get(),
2484  v_columns.get());
2485  } else {
2486  return false;
2487  }
2488  if (!success) {
2489  return false;
2490  }
2491  if (sync) {
2493  qstatement.get(), consistency);
2494  } else {
2495  std::string qid("Prepare: " + v_columns->cfname_);
2496  impl::ExecuteQueryStatementAsync(cci_, session_.get(), qid.c_str(),
2497  qstatement.get(), consistency, cb);
2498  return true;
2499  }
2500 }
2501 
2503  "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH "
2504  "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }");
2505 const char * CqlIfImpl::kQUseKeyspace("USE \"%s\"");
2506 const char * CqlIfImpl::kTaskName("CqlIfImpl::Task");
2507 
2508 //
2509 // CqlIf
2510 //
2512  const std::vector<std::string> &cassandra_ips,
2513  int cassandra_port,
2514  const std::string &cassandra_user,
2515  const std::string &cassandra_password,
2516  bool use_ssl,
2517  const std::string &ca_certs_path,
2518  bool create_schema) :
2519  cci_(new interface::CassDatastaxLibrary),
2520  impl_(new CqlIfImpl(evm, cassandra_ips, cassandra_port,
2521  cassandra_user, cassandra_password, use_ssl,
2522  ca_certs_path, cci_.get())),
2523  use_prepared_for_insert_(true),
2524  create_schema_(create_schema) {
2525  // Setup library logging
2526  cci_->CassLogSetLevel(impl::Log4Level2CassLogLevel(
2527  log4cplus::Logger::getRoot().getLogLevel()));
2528  cci_->CassLogSetCallback(impl::CassLibraryLog, NULL);
2529  initialized_ = false;
2530  BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) {
2531  boost::system::error_code ec;
2532  boost::asio::ip::address cassandra_addr(
2533  AddressFromString(cassandra_ip, &ec));
2534  GenDb::Endpoint endpoint(cassandra_addr, cassandra_port);
2535  endpoints_.push_back(endpoint);
2536  }
2537 }
2538 
2539 CqlIf::CqlIf() : impl_(NULL) {
2540 }
2541 
2543 }
2544 
2545 // Init/Uninit
2547  if (create_schema_) {
2548  impl_->SetRequestTimeout(GenDb::g_gendb_constants.SCHEMA_REQUEST_TIMEOUT);
2549  bool success(impl_->ConnectSchemaSync());
2550  if (!success) {
2551  return success;
2552  }
2553  }
2554  return impl_->ConnectSync();
2555 }
2556 
2558  if (create_schema_) {
2559  impl_->DisconnectSchemaSync();
2560  }
2561  impl_->DisconnectSync();
2562 }
2563 
2564 void CqlIf::Db_SetInitDone(bool init_done) {
2565  initialized_ = init_done;
2566  // No need for schema session if initialization is done
2567  if (create_schema_) {
2568  if (initialized_) {
2569  impl_->SetRequestTimeout(GenDb::g_gendb_constants.DEFAULT_REQUEST_TIMEOUT);
2570  impl_->DisconnectSchemaSync();
2571  }
2572  }
2573 }
2574 
2575 // Tablespace
// Creates the keyspace `tablespace` if it does not exist (QUORUM
// consistency) and then selects it on the schema session (ONE consistency).
// Returns false as soon as either step fails, true otherwise.
// NOTE(review): one line inside each failure branch is not visible in this
// listing - presumably error accounting/tracing; confirm.
2576 bool CqlIf::Db_AddSetTablespace(const std::string &tablespace,
2577  const std::string &replication_factor) {
2578  bool success(impl_->CreateKeyspaceIfNotExistsSync(tablespace,
2579  replication_factor, CASS_CONSISTENCY_QUORUM));
2580  if (!success) {
2582  return success;
2583  }
2584  success = impl_->UseKeyspaceSyncOnSchemaSession(tablespace,
2585  CASS_CONSISTENCY_ONE);
2586  if (!success) {
2588  return success;
2589  }
2590  return success;
2591 }
2592 
// Selects the keyspace `tablespace` on the regular session with ONE
// consistency and returns the result of that call.
// NOTE(review): one line inside the failure branch is not visible in this
// listing - presumably error accounting/tracing; confirm.
2593 bool CqlIf::Db_SetTablespace(const std::string &tablespace) {
2594  bool success(impl_->UseKeyspaceSync(tablespace, CASS_CONSISTENCY_ONE));
2595  if (!success) {
2597  return success;
2598  }
2599  return success;
2600 }
2601 
2602 // Column family
2604  const std::string &compaction_strategy) {
2605  bool success(
2606  impl_->CreateTableIfNotExistsSync(cf, compaction_strategy,
2607  CASS_CONSISTENCY_QUORUM));
2608  if (!success) {
2611  return success;
2612  }
2613  // Locate (add if not exists) INSERT INTO prepare statement
2614  success = impl_->LocatePrepareInsertIntoTable(cf);
2615  if (!success) {
2618  return success;
2619  }
2621  return success;
2622 }
2623 
2625  // Check existence of table
2626  return Db_UseColumnfamily(cf.cfname_);
2627 }
2628 
// Verifies that table `cfname` exists via impl_->IsTablePresent(). On
// success the table's read counter is incremented; the presence result is
// returned either way.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2629 bool CqlIf::Db_UseColumnfamily(const std::string &cfname) {
2630  // Check existence of table
2631  bool success(impl_->IsTablePresent(cfname));
2632  if (!success) {
2635  return success;
2636  }
2637  IncrementTableReadStats(cfname);
2638  return success;
2639 }
2640 
2641 // Index
// Creates index `indexname` on `column` of table `cfname` (if it does not
// already exist) with QUORUM consistency and returns the result of
// impl_->CreateIndexIfNotExistsSync().
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2642 bool CqlIf::Db_CreateIndex(const std::string &cfname,
2643  const std::string &column, const std::string &indexname,
2644  const GenDb::ColIndexMode::type index_mode) {
2645  bool success(impl_->CreateIndexIfNotExistsSync(cfname, column, indexname,
2646  CASS_CONSISTENCY_QUORUM, index_mode));
2647  if (!success) {
2650  return success;
2651  }
2652  return success;
2653 }
2654 
2655 // Column
2657  std::auto_ptr<GenDb::ColList> row,
2658  std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb) {
2659  if (drc == GenDb::DbOpResult::OK) {
2660  IncrementTableWriteStats(cfname);
2661  } else if (drc == GenDb::DbOpResult::BACK_PRESSURE) {
2664  } else {
2667  }
2668  if (!cb.empty()) {
2669  cb(drc);
2670  }
2671 }
2672 
2675  GenDb::DbOpResult::type drc, std::auto_ptr<GenDb::ColList> row) :
2676  cb_(cb),
2677  drc_(drc),
2678  row_(row) {
2679  }
2682  std::auto_ptr<GenDb::ColList> row_;
2683 };
2684 
2686  boost::shared_ptr<AsyncRowGetCallbackContext> cb_ctx) {
2687  cb_ctx->cb_(cb_ctx->drc_, cb_ctx->row_);
2688 }
2689 
2691  std::auto_ptr<GenDb::ColList> row, std::string cfname,
2692  GenDb::GenDbIf::DbGetRowCb cb, bool use_worker, int task_id,
2693  int task_instance) {
2694  if (drc == GenDb::DbOpResult::OK) {
2695  IncrementTableReadStats(cfname);
2696  } else if (drc == GenDb::DbOpResult::BACK_PRESSURE) {
2699  } else {
2702  }
2703  if (use_worker) {
2704  if (!cb.empty()) {
2705  boost::shared_ptr<AsyncRowGetCallbackContext> ctx(
2706  new AsyncRowGetCallbackContext(cb, drc, row));
2707  impl::WorkerTask *worker(new impl::WorkerTask(
2708  boost::bind(&AsyncRowGetCompletionCallback, ctx),
2709  task_id, task_instance));
2711  scheduler->Enqueue(worker);
2712  }
2713  } else {
2714  if (!cb.empty()) {
2715  cb(drc, row);
2716  }
2717  }
2718 }
2719 
2721  std::auto_ptr<GenDb::ColList> row, std::string cfname,
2723  OnAsyncRowGetCompletion(drc, row, cfname, cb, false, -1, -2);
2724 }
// Queues an asynchronous insert of `cl`. Returns false immediately when
// initialized_ is not set. Otherwise the insert goes through the
// prepared-statement path or the plain query path, and
// OnAsyncColumnAddCompletion is bound (with the table name and `cb`) as the
// completion handler. Returns the result of the chosen insert call.
// The table name is copied up front because `cl` is handed over to the
// insert call.
// NOTE(review): several lines are not visible in this listing - the last
// signature line (the callback parameter), the contents of the
// not-initialized and failure branches, and the first half of the condition
// selecting the prepared path (presumably use_prepared_for_insert_; confirm).
2725 bool CqlIf::Db_AddColumn(std::auto_ptr<GenDb::ColList> cl,
2726  GenDb::DbConsistency::type dconsistency,
2728  std::string cfname(cl->cfname_);
2729  if (!initialized_) {
2732  return false;
2733  }
2734  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2735  bool success;
2737  impl_->IsInsertIntoTablePrepareSupported(cfname)) {
2738  success = impl_->InsertIntoTablePrepareAsync(cl, consistency,
2739  boost::bind(&CqlIf::OnAsyncColumnAddCompletion, this, _1, _2, cfname,
2740  cb));
2741  } else {
2742  success = impl_->InsertIntoTableAsync(cl, consistency,
2743  boost::bind(&CqlIf::OnAsyncColumnAddCompletion, this, _1, _2, cfname,
2744  cb));
2745  }
2746  if (!success) {
2749  return success;
2750  }
2751  return success;
2752 }
2753 
// Synchronously inserts `cl` with the given consistency. On success the
// table's write counter is incremented; the insert result is returned.
// The table name is copied up front because `cl` is handed over to
// InsertIntoTableSync().
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2754 bool CqlIf::Db_AddColumnSync(std::auto_ptr<GenDb::ColList> cl,
2755  GenDb::DbConsistency::type dconsistency) {
2756  std::string cfname(cl->cfname_);
2757  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2758  bool success(impl_->InsertIntoTableSync(cl, consistency));
2759  if (!success) {
2762  return success;
2763  }
2764  IncrementTableWriteStats(cfname);
2765  return success;
2766 }
2767 
2768 // Read
// Asynchronous row read restricted to the clustering-key range `crange`.
// Binds OnAsyncRowGetCompletion (with cfname and cb) as the completion
// handler and returns the result of
// impl_->SelectFromTableClusteringKeyRangeAsync().
// NOTE(review): the last signature line (which declares dconsistency and cb)
// and the body of the failure branch are not visible in this listing.
2769 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2770  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2772  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2773  bool success(impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2774  crange, consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this,
2775  _1, _2, cfname, cb)));
2776  if (!success) {
2779  }
2780  return success;
2781 }
2782 
// Asynchronous row read restricted to the clustering-key range `crange`,
// with the completion bound as
// OnAsyncRowGetCompletion(..., cfname, cb, true, task_id, task_instance).
// Returns the result of impl_->SelectFromTableClusteringKeyRangeAsync().
// NOTE(review): the last signature line (which declares cb) and the body of
// the failure branch are not visible in this listing.
2783 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2784  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2785  GenDb::DbConsistency::type dconsistency, int task_id, int task_instance,
2787  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2788  bool success(impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2789  crange, consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this,
2790  _1, _2, cfname, cb, true, task_id, task_instance)));
2791  if (!success) {
2794  }
2795  return success;
2796 }
2797 
// Asynchronous read of the whole partition identified by `rowkey`.
// Binds OnAsyncRowGetCompletion (with cfname and cb) as the completion
// handler and returns the result of impl_->SelectFromTableAsync().
// NOTE(review): the last signature line (which declares cb) and the body of
// the failure branch are not visible in this listing.
2798 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2799  const GenDb::DbDataValueVec &rowkey,
2800  GenDb::DbConsistency::type dconsistency,
2802  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2803  bool success(impl_->SelectFromTableAsync(cfname, rowkey,
2804  consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2,
2805  cfname, cb)));
2806  if (!success) {
2809  }
2810  return success;
2811 }
2812 
// Asynchronous read of the whole partition identified by `rowkey`, with the
// completion bound as
// OnAsyncRowGetCompletion(..., cfname, cb, true, task_id, task_instance).
// Returns the result of impl_->SelectFromTableAsync().
// NOTE(review): the last signature line (which declares cb) and the body of
// the failure branch are not visible in this listing.
2813 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2814  const GenDb::DbDataValueVec &rowkey,
2815  GenDb::DbConsistency::type dconsistency, int task_id, int task_instance,
2817  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2818  bool success(impl_->SelectFromTableAsync(cfname, rowkey,
2819  consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2,
2820  cfname, cb, true, task_id, task_instance)));
2821  if (!success) {
2824  }
2825  return success;
2826 }
2827 
// Asynchronous row read restricted by clustering-key range `crange` and the
// index conditions in `where_vec`. An empty FieldNamesToReadVec is passed as
// the field selection. Binds OnAsyncRowGetCompletion (with cfname and cb) as
// the completion handler and returns the result of
// impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync().
// NOTE(review): the last signature line (which declares dconsistency and cb)
// and the body of the failure branch are not visible in this listing.
2828 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2829  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2830  const GenDb::WhereIndexInfoVec &where_vec,
2832  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2833  bool success(impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync(cfname,
2834  rowkey, crange, where_vec, GenDb::FieldNamesToReadVec(), consistency,
2835  boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2, cfname, cb)));
2836  if (!success) {
2839  }
2840  return success;
2841 }
2842 
// Synchronously reads the partition `rowkey` of table `cfname` into
// out->columns_. On success the table's read counter is incremented; the
// read result is returned.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2843 bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
2844  const GenDb::DbDataValueVec &rowkey,
2845  GenDb::DbConsistency::type dconsistency) {
2846  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2847  bool success(impl_->SelectFromTableSync(cfname, rowkey,
2848  consistency, &out->columns_));
2849  if (!success) {
2852  return success;
2853  }
2854  IncrementTableReadStats(cfname);
2855  return success;
2856 }
2857 
// Synchronously reads the partition `rowkey` of table `cfname`, restricted
// to the clustering-key range `crange` and the fields in `read_vec`, into
// out->columns_. On success the table's read counter is incremented; the
// read result is returned.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2858 bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
2859  const GenDb::DbDataValueVec &rowkey,
2860  GenDb::DbConsistency::type dconsistency,
2861  const GenDb::ColumnNameRange &crange,
2862  const GenDb::FieldNamesToReadVec &read_vec) {
2863  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2864  bool success(impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2865  rowkey, crange, consistency, read_vec, &out->columns_));
2866  if (!success) {
2869  return success;
2870  }
2871  IncrementTableReadStats(cfname);
2872  return success;
2873 }
2874 
// Reads each partition key in v_rowkey synchronously (consistency ONE), one
// query per key, and appends a newly allocated ColList per key to *out.
// Stops and returns false at the first failing key; rows appended before the
// failure remain in *out. On full success the read counter for `cfname` is
// incremented by v_rowkey.size() and true is returned.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting; confirm.
2875 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2876  const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
2877  BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
2878  std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
2879  // Partition Key
2880  v_columns->rowkey_ = rkey;
2881  bool success(impl_->SelectFromTableSync(cfname, rkey,
2882  CASS_CONSISTENCY_ONE, &v_columns->columns_));
2883  if (!success) {
2884  CQLIF_ERR_TRACE("SELECT FROM Table: " << cfname << " Partition Key: "
2885  << GenDb::DbDataValueVecToString(rkey) << " FAILED");
2888  return false;
2889  }
// Ownership of the row passes from the auto_ptr to *out.
2890  out->push_back(v_columns.release());
2891  }
2892  IncrementTableReadStats(cfname, v_rowkey.size());
2893  return true;
2894 }
2895 
// Reads each partition key in v_rowkey synchronously (consistency ONE),
// restricted to the clustering-key range `crange`, one query per key, and
// appends a newly allocated ColList per key to *out.
// Stops and returns false at the first failing key; rows appended before the
// failure remain in *out. On full success the read counter for `cfname` is
// incremented by v_rowkey.size() and true is returned.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting; confirm.
2896 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2897  const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2898  const GenDb::ColumnNameRange &crange) {
2899  BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
2900  std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
2901  // Partition Key
2902  v_columns->rowkey_ = rkey;
2903  bool success(impl_->SelectFromTableClusteringKeyRangeSync(cfname,
2904  rkey, crange, CASS_CONSISTENCY_ONE, &v_columns->columns_));
2905  if (!success) {
2906  CQLIF_ERR_TRACE("SELECT FROM Table: " << cfname << " Partition Key: "
2907  << GenDb::DbDataValueVecToString(rkey) <<
2908  " Clustering Key Range: " << crange.ToString() << " FAILED");
2911  return false;
2912  }
// Ownership of the row passes from the auto_ptr to *out.
2913  out->push_back(v_columns.release());
2914  }
2915  IncrementTableReadStats(cfname, v_rowkey.size());
2916  return true;
2917 }
2918 
// Reads all partition keys in v_rowkey with a single call to
// impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(), restricted to
// `crange` and the fields in `read_vec`, writing the rows to *out.
// On success the read counter for `cfname` is incremented by
// v_rowkey.size() and true is returned; otherwise false.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2919 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2920  const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2921  const GenDb::ColumnNameRange &crange,
2922  const GenDb::FieldNamesToReadVec &read_vec,
2923  GenDb::DbConsistency::type dconsistency) {
2924  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2925  bool success(impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2926  v_rowkey, crange, consistency, read_vec, out));
2927  if (!success) {
2930  return false;
2931  }
2932  IncrementTableReadStats(cfname, v_rowkey.size());
2933  return true;
2934 }
2935 
// Synchronously reads every row of table `cfname` into *out with the given
// consistency. On success the table's read counter is incremented (by one,
// regardless of the number of rows); the read result is returned.
// NOTE(review): two lines inside the failure branch are not visible in this
// listing - presumably failure accounting/tracing; confirm.
2936 bool CqlIf::Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname,
2937  GenDb::DbConsistency::type dconsistency) {
2938  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2939  bool success(impl_->SelectFromTableSync(cfname, consistency, out));
2940  if (!success) {
2943  return success;
2944  }
2945  IncrementTableReadStats(cfname);
2946  return success;
2947 }
2948 
2949 // Queue
2950 bool CqlIf::Db_GetQueueStats(uint64_t *queue_count,
2951  uint64_t *enqueues) const {
2952  //return impl_->Db_GetQueueStats(queue_count, enqueues);
2953  return true;
2954 }
2955 
// Currently a no-op: the delegation to impl_ is commented out, so the
// arguments are ignored.
// NOTE(review): the last signature line (the callback parameter) is not
// visible in this listing.
2956 void CqlIf::Db_SetQueueWaterMark(bool high, size_t queue_count,
2958  //impl_->Db_SetQueueWaterMark(high, queue_count, cb);
2959 }
2960 
2962  //impl_->Db_ResetQueueWaterMarks();
2963 }
2964 
2965 // Stats
2966 bool CqlIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
2967  GenDb::DbErrors *dbe) {
2968  tbb::mutex::scoped_lock lock(stats_mutex_);
2969  stats_.GetDiffs(vdbti, dbe);
2970  return true;
2971 }
2972 
2973 bool CqlIf::Db_GetCumulativeStats(std::vector<GenDb::DbTableInfo> *vdbti,
2974  GenDb::DbErrors *dbe) const {
2975  tbb::mutex::scoped_lock lock(stats_mutex_);
2976  stats_.GetCumulative(vdbti, dbe);
2977  return true;
2978 }
2979 
2980 bool CqlIf::Db_GetCqlMetrics(Metrics *metrics) const {
2981  return impl_->GetMetrics(metrics);
2982 }
2983 
2984 bool CqlIf::Db_GetCqlStats(DbStats *db_stats) const {
2985  Metrics metrics;
2986  bool success(impl_->GetMetrics(&metrics));
2987  if (!success) {
2988  return success;
2989  }
2990  db_stats->requests_one_minute_rate = metrics.requests.one_minute_rate;
2991  db_stats->stats = metrics.stats;
2992  db_stats->errors = metrics.errors;
2993  return success;
2994 }
2995 
2996 void CqlIf::IncrementTableWriteStats(const std::string &table_name) {
2997  tbb::mutex::scoped_lock lock(stats_mutex_);
2998  stats_.IncrementTableWrite(table_name);
2999 }
3000 
3001 void CqlIf::IncrementTableWriteStats(const std::string &table_name,
3002  uint64_t num_writes) {
3003  tbb::mutex::scoped_lock lock(stats_mutex_);
3004  stats_.IncrementTableWrite(table_name, num_writes);
3005 }
3006 
3007 void CqlIf::IncrementTableWriteFailStats(const std::string &table_name) {
3008  tbb::mutex::scoped_lock lock(stats_mutex_);
3009  stats_.IncrementTableWriteFail(table_name);
3010 }
3011 
3012 void CqlIf::IncrementTableWriteFailStats(const std::string &table_name,
3013  uint64_t num_writes) {
3014  tbb::mutex::scoped_lock lock(stats_mutex_);
3015  stats_.IncrementTableWriteFail(table_name, num_writes);
3016 }
3017 
3019  const std::string &table_name) {
3020  tbb::mutex::scoped_lock lock(stats_mutex_);
3022 }
3023 
3025  const std::string &table_name) {
3026  tbb::mutex::scoped_lock lock(stats_mutex_);
3028 }
3029 
3030 void CqlIf::IncrementTableReadStats(const std::string &table_name) {
3031  tbb::mutex::scoped_lock lock(stats_mutex_);
3032  stats_.IncrementTableRead(table_name);
3033 }
3034 
3035 void CqlIf::IncrementTableReadStats(const std::string &table_name,
3036  uint64_t num_reads) {
3037  tbb::mutex::scoped_lock lock(stats_mutex_);
3038  stats_.IncrementTableRead(table_name, num_reads);
3039 }
3040 
3041 void CqlIf::IncrementTableReadFailStats(const std::string &table_name) {
3042  tbb::mutex::scoped_lock lock(stats_mutex_);
3043  stats_.IncrementTableReadFail(table_name);
3044 }
3045 
3046 void CqlIf::IncrementTableReadFailStats(const std::string &table_name,
3047  uint64_t num_reads) {
3048  tbb::mutex::scoped_lock lock(stats_mutex_);
3049  stats_.IncrementTableReadFail(table_name, num_reads);
3050 }
3051 
3053  tbb::mutex::scoped_lock lock(stats_mutex_);
3054  stats_.IncrementErrors(err_type);
3055 }
3056 
3057 // Connection
3058 std::vector<GenDb::Endpoint> CqlIf::Db_GetEndpoints() const {
3059  return endpoints_;
3060 }
3061 
3062 namespace interface {
3063 
3064 //
3065 // CassDatastaxLibrary
3066 //
3068 }
3069 
3071 }
3072 
3073 // CassCluster
3075  return cass_cluster_new();
3076 }
3077 
3078 void CassDatastaxLibrary::CassClusterFree(CassCluster* cluster) {
3079  cass_cluster_free(cluster);
3080 }
3081 
3083  CassCluster* cluster, const char* contact_points) {
3084  return cass_cluster_set_contact_points(cluster, contact_points);
3085 }
3086 
3087 CassError CassDatastaxLibrary::CassClusterSetPort(CassCluster* cluster,
3088  int port) {
3089  return cass_cluster_set_port(cluster, port);
3090 }
3091 
3092 void CassDatastaxLibrary::CassClusterSetSsl(CassCluster* cluster, CassSsl* ssl) {
3093  cass_cluster_set_ssl(cluster, ssl);
3094 }
3095 
3097  const char* username, const char* password) {
3098  cass_cluster_set_credentials(cluster, username, password);
3099 }
3100 
3102  unsigned num_threads) {
3103  return cass_cluster_set_num_threads_io(cluster, num_threads);
3104 }
3105 
3107  CassCluster* cluster, unsigned num_requests) {
3108  return cass_cluster_set_pending_requests_high_water_mark(cluster,
3109  num_requests);
3110 }
3111 
3113  CassCluster* cluster, unsigned num_requests) {
3114  return cass_cluster_set_pending_requests_low_water_mark(cluster,
3115  num_requests);
3116 }
3117 
3119  CassCluster* cluster, unsigned num_bytes) {
3120  return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes);
3121 }
3122 
3124  CassCluster* cluster, unsigned num_bytes) {
3125  return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes);
3126 }
3127 
3129  CassCluster* cluster, const char* hosts) {
3130  cass_cluster_set_whitelist_filtering(cluster, hosts);
3131 }
3132 
3133 // CassSsl
3135  return cass_ssl_new();
3136 }
3137 
3139  return cass_ssl_free(ssl);
3140 }
3141 
3143  const std::string &cert) {
3144  return cass_ssl_add_trusted_cert_n(ssl, cert.c_str(), cert.length());
3145 }
3146 
// Pass-through wrapper: forwards `ssl` and `flags` to
// cass_ssl_set_verify_flags(). `flags` is passed as a plain int.
// NOTE(review): presumably a bitmask of the driver's CassSslVerifyFlags
// values; confirm against cassandra.h.
3147 void CassDatastaxLibrary::CassSslSetVerifyFlags(CassSsl* ssl, int flags) {
3148  cass_ssl_set_verify_flags(ssl, flags);
3149 }
3150 
3151 // CassSession
3153  return cass_session_new();
3154 }
3155 
// Pass-through wrapper: hands `session` to cass_session_free() and returns
// nothing. No null check is performed here.
3156 void CassDatastaxLibrary::CassSessionFree(CassSession* session) {
3157  cass_session_free(session);
3158 }
3159 
3161  unsigned timeout_ms) {
3162  return cass_cluster_set_request_timeout(cluster, timeout_ms);
3163 }
3164 
// Pass-through wrapper: forwards `session` and `cluster` to
// cass_session_connect() and returns the resulting CassFuture* unchanged.
// NOTE(review): ownership of the returned future presumably rests with the
// caller (see CassFutureFree); confirm against the driver documentation.
3165 CassFuture* CassDatastaxLibrary::CassSessionConnect(CassSession* session,
3166  const CassCluster* cluster) {
3167  return cass_session_connect(session, cluster);
3168 }
3169 
// Pass-through wrapper: forwards `session` to cass_session_close() and
// returns the resulting CassFuture* unchanged.
3170 CassFuture* CassDatastaxLibrary::CassSessionClose(CassSession* session) {
3171  return cass_session_close(session);
3172 }
3173 
// Pass-through wrapper: forwards `session` and `statement` to
// cass_session_execute() and returns the resulting CassFuture* unchanged.
3174 CassFuture* CassDatastaxLibrary::CassSessionExecute(CassSession* session,
3175  const CassStatement* statement) {
3176  return cass_session_execute(session, statement);
3177 }
3178 
3180  const CassSession* session) {
3181  return cass_session_get_schema_meta(session);
3182 }
3183 
// Pass-through wrapper: forwards `session` and the C-string `query` to
// cass_session_prepare() and returns the resulting CassFuture* unchanged.
3184 CassFuture* CassDatastaxLibrary::CassSessionPrepare(CassSession* session,
3185  const char* query) {
3186  return cass_session_prepare(session, query);
3187 }
3188 
// Pass-through wrapper: forwards `session` and the out-parameter `output`
// to cass_session_get_metrics(). Nothing is returned; any result is
// delivered through `output`.
3189 void CassDatastaxLibrary::CassSessionGetMetrics(const CassSession* session,
3190  CassMetrics* output) {
3191  cass_session_get_metrics(session, output);
3192 }
3193 
3194 // CassSchema
3196  const CassSchemaMeta* schema_meta) {
3197  cass_schema_meta_free(schema_meta);
3198 }
3199 
3201  const CassSchemaMeta* schema_meta, const char* keyspace) {
3202  return cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
3203 }
3204 
3206  const CassKeyspaceMeta* keyspace_meta, const char* table) {
3207  return cass_keyspace_meta_table_by_name(keyspace_meta, table);
3208 }
3209 
3211  const CassTableMeta* table_meta) {
3212  return cass_table_meta_partition_key_count(table_meta);
3213 }
3214 
3216  const CassTableMeta* table_meta) {
3217  return cass_table_meta_clustering_key_count(table_meta);
3218 }
3219 
3220 // CassFuture
// Pass-through wrapper: hands `future` to cass_future_free() and returns
// nothing. No null check is performed here.
3221 void CassDatastaxLibrary::CassFutureFree(CassFuture* future) {
3222  cass_future_free(future);
3223 }
3224 
// Pass-through wrapper: forwards `future`, `callback` and the opaque `data`
// pointer to cass_future_set_callback() and returns its CassError unchanged.
// NOTE(review): the thread on which `callback` is invoked is decided by the
// driver, not by this wrapper; confirm before touching shared state in it.
3225 CassError CassDatastaxLibrary::CassFutureSetCallback(CassFuture* future,
3226  CassFutureCallback callback, void* data) {
3227  return cass_future_set_callback(future, callback, data);
3228 }
3229 
// Pass-through wrapper: forwards `future` to cass_future_wait().
// NOTE(review): presumably blocks the calling thread until the future is
// set; confirm against the driver documentation.
3230 void CassDatastaxLibrary::CassFutureWait(CassFuture* future) {
3231  cass_future_wait(future);
3232 }
3233 
3235  CassFuture* future) {
3236  return cass_future_get_result(future);
3237 }
3238 
3240  const char** message, size_t* message_length) {
3241  cass_future_error_message(future, message, message_length);
3242 }
3243 
// Pass-through wrapper: returns whatever cass_future_error_code() reports
// for `future`, unchanged.
3244 CassError CassDatastaxLibrary::CassFutureErrorCode(CassFuture* future) {
3245  return cass_future_error_code(future);
3246 }
3247 
3249  CassFuture* future) {
3250  return cass_future_get_prepared(future);
3251 }
3252 
3253 // CassResult
// Pass-through wrapper: hands `result` to cass_result_free() and returns
// nothing. No null check is performed here.
3254 void CassDatastaxLibrary::CassResultFree(const CassResult* result) {
3255  cass_result_free(result);
3256 }
3257 
// Pass-through wrapper: returns the size_t reported by
// cass_result_column_count() for `result`.
3258 size_t CassDatastaxLibrary::CassResultColumnCount(const CassResult* result) {
3259  return cass_result_column_count(result);
3260 }
3261 
// Pass-through wrapper: forwards `result`, `index` and the two
// out-parameters (`name`, `name_length`) to cass_result_column_name() and
// returns its CassError unchanged.
// NOTE(review): the pointer written to `*name` is presumably owned by
// `result` and not NUL-terminated; confirm and use `*name_length`.
3262 CassError CassDatastaxLibrary::CassResultColumnName(const CassResult *result,
3263  size_t index, const char** name, size_t* name_length) {
3264  return cass_result_column_name(result, index, name, name_length);
3265 }
3266 
3267 // CassIterator
// Pass-through wrapper: hands `iterator` to cass_iterator_free() and
// returns nothing. No null check is performed here.
3268 void CassDatastaxLibrary::CassIteratorFree(CassIterator* iterator) {
3269  cass_iterator_free(iterator);
3270 }
3271 
3273  const CassResult* result) {
3274  return cass_iterator_from_result(result);
3275 }
3276 
// Pass-through wrapper: calls cass_iterator_next() on `iterator` and
// returns its cass_bool_t result unchanged.
3277 cass_bool_t CassDatastaxLibrary::CassIteratorNext(CassIterator* iterator) {
3278  return cass_iterator_next(iterator);
3279 }
3280 
3282  const CassIterator* iterator) {
3283  return cass_iterator_get_row(iterator);
3284 }
3285 
3286 // CassStatement
// Pass-through wrapper: forwards the C-string `query` and
// `parameter_count` to cass_statement_new() and returns the resulting
// CassStatement* unchanged.
3287 CassStatement* CassDatastaxLibrary::CassStatementNew(const char* query,
3288  size_t parameter_count) {
3289  return cass_statement_new(query, parameter_count);
3290 }
3291 
// Pass-through wrapper: hands `statement` to cass_statement_free() and
// returns nothing. No null check is performed here.
3292 void CassDatastaxLibrary::CassStatementFree(CassStatement* statement) {
3293  cass_statement_free(statement);
3294 }
3295 
3297  CassStatement* statement, CassConsistency consistency) {
3298  return cass_statement_set_consistency(statement, consistency);
3299 }
3300 
3302  CassStatement* statement,
3303  size_t index, const char* value, size_t value_length) {
3304  return cass_statement_bind_string_n(statement, index, value, value_length);
3305 }
3306 
// Pass-through wrapper: binds the cass_int32_t `value` at positional
// `index` of `statement` via cass_statement_bind_int32() and returns its
// CassError unchanged.
3307 CassError CassDatastaxLibrary::CassStatementBindInt32(CassStatement* statement,
3308  size_t index, cass_int32_t value) {
3309  return cass_statement_bind_int32(statement, index, value);
3310 }
3311 
// Pass-through wrapper: binds the cass_int64_t `value` at positional
// `index` of `statement` via cass_statement_bind_int64() and returns its
// CassError unchanged.
3312 CassError CassDatastaxLibrary::CassStatementBindInt64(CassStatement* statement,
3313  size_t index, cass_int64_t value) {
3314  return cass_statement_bind_int64(statement, index, value);
3315 }
3316 
// Pass-through wrapper: binds the CassUuid `value` (taken by value) at
// positional `index` of `statement` via cass_statement_bind_uuid() and
// returns its CassError unchanged.
3317 CassError CassDatastaxLibrary::CassStatementBindUuid(CassStatement* statement,
3318  size_t index, CassUuid value) {
3319  return cass_statement_bind_uuid(statement, index, value);
3320 }
3321 
3323  CassStatement* statement, size_t index, cass_double_t value) {
3324  return cass_statement_bind_double(statement, index, value);
3325 }
3326 
// Pass-through wrapper: binds the CassInet `value` (taken by value) at
// positional `index` of `statement` via cass_statement_bind_inet() and
// returns its CassError unchanged.
3327 CassError CassDatastaxLibrary::CassStatementBindInet(CassStatement* statement,
3328  size_t index, CassInet value) {
3329  return cass_statement_bind_inet(statement, index, value);
3330 }
3331 
3333  CassStatement* statement,
3334  size_t index, const cass_byte_t* value, size_t value_length) {
3335  return cass_statement_bind_bytes(statement, index, value, value_length);
3336 }
3337 
3339  CassStatement* statement,
3340  const char* name, size_t name_length, const char* value,
3341  size_t value_length) {
3342  return cass_statement_bind_string_by_name_n(statement, name, name_length,
3343  value, value_length);
3344 }
3345 
3347  CassStatement* statement, const char* name, cass_int32_t value) {
3348  return cass_statement_bind_int32_by_name(statement, name, value);
3349 }
3350 
3352  CassStatement* statement, const char* name, cass_int64_t value) {
3353  return cass_statement_bind_int64_by_name(statement, name, value);
3354 }
3355 
3357  CassStatement* statement, const char* name, CassUuid value) {
3358  return cass_statement_bind_uuid_by_name(statement, name, value);
3359 }
3360 
3362  CassStatement* statement, const char* name, cass_double_t value) {
3363  return cass_statement_bind_double_by_name(statement, name, value);
3364 }
3365 
3367  CassStatement* statement, const char* name, CassInet value) {
3368  return cass_statement_bind_inet_by_name(statement, name, value);
3369 }
3370 
3372  CassStatement* statement,
3373  const char* name, size_t name_length, const cass_byte_t* value,
3374  size_t value_length) {
3375  return cass_statement_bind_bytes_by_name_n(statement, name, name_length,
3376  value, value_length);
3377 }
3378 
3379 // CassPrepare
// Pass-through wrapper: hands `prepared` to cass_prepared_free() and
// returns nothing. No null check is performed here.
3380 void CassDatastaxLibrary::CassPreparedFree(const CassPrepared* prepared) {
3381  cass_prepared_free(prepared);
3382 }
3383 
3385  const CassPrepared* prepared) {
3386  return cass_prepared_bind(prepared);
3387 }
3388 
3389 // CassValue
// Pass-through wrapper: returns the CassValueType that cass_value_type()
// reports for `value`. Unlike its siblings, this method is named
// GetCassValueType rather than after the driver call it wraps.
3390 CassValueType CassDatastaxLibrary::GetCassValueType(const CassValue* value) {
3391  return cass_value_type(value);
3392 }
3393 
// Pass-through wrapper: forwards `value` and the out-parameters `output`
// and `output_size` to cass_value_get_string() and returns its CassError
// unchanged.
// NOTE(review): the pointer written to `*output` is presumably owned by the
// driver and not NUL-terminated; confirm and use `*output_size`.
3394 CassError CassDatastaxLibrary::CassValueGetString(const CassValue* value,
3395  const char** output, size_t* output_size) {
3396  return cass_value_get_string(value, output, output_size);
3397 }
3398 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_int8() and returns its CassError unchanged.
3399 CassError CassDatastaxLibrary::CassValueGetInt8(const CassValue* value,
3400  cass_int8_t* output) {
3401  return cass_value_get_int8(value, output);
3402 }
3403 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_int16() and returns its CassError unchanged.
3404 CassError CassDatastaxLibrary::CassValueGetInt16(const CassValue* value,
3405  cass_int16_t* output) {
3406  return cass_value_get_int16(value, output);
3407 }
3408 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_int32() and returns its CassError unchanged.
3409 CassError CassDatastaxLibrary::CassValueGetInt32(const CassValue* value,
3410  cass_int32_t* output) {
3411  return cass_value_get_int32(value, output);
3412 }
3413 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_int64() and returns its CassError unchanged.
3414 CassError CassDatastaxLibrary::CassValueGetInt64(const CassValue* value,
3415  cass_int64_t* output) {
3416  return cass_value_get_int64(value, output);
3417 }
3418 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_uuid() and returns its CassError unchanged.
3419 CassError CassDatastaxLibrary::CassValueGetUuid(const CassValue* value,
3420  CassUuid* output) {
3421  return cass_value_get_uuid(value, output);
3422 }
3423 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_double() and returns its CassError unchanged.
3424 CassError CassDatastaxLibrary::CassValueGetDouble(const CassValue* value,
3425  cass_double_t* output) {
3426  return cass_value_get_double(value, output);
3427 }
3428 
// Pass-through wrapper: forwards `value` and the out-parameter `output` to
// cass_value_get_inet() and returns its CassError unchanged.
3429 CassError CassDatastaxLibrary::CassValueGetInet(const CassValue* value,
3430  CassInet* output) {
3431  return cass_value_get_inet(value, output);
3432 }
3433 
// Pass-through wrapper: forwards `value` and the out-parameters `output`
// and `output_size` to cass_value_get_bytes() and returns its CassError
// unchanged.
// NOTE(review): the buffer written to `*output` is presumably owned by the
// driver rather than the caller; confirm its lifetime before retaining it.
3434 CassError CassDatastaxLibrary::CassValueGetBytes(const CassValue* value,
3435  const cass_byte_t** output, size_t* output_size) {
3436  return cass_value_get_bytes(value, output, output_size);
3437 }
3438 
// Pass-through wrapper: returns the cass_bool_t that cass_value_is_null()
// reports for `value`.
3439 cass_bool_t CassDatastaxLibrary::CassValueIsNull(const CassValue* value) {
3440  return cass_value_is_null(value);
3441 }
3442 
3443 // CassInet
3445  const cass_uint8_t* address) {
3446  return cass_inet_init_v4(address);
3447 }
3448 
3450  const cass_uint8_t* address) {
3451  return cass_inet_init_v6(address);
3452 }
3453 
3454 // CassRow
// Pass-through wrapper: forwards `row` and `index` to cass_row_get_column()
// and returns the resulting const CassValue* unchanged. No bounds check on
// `index` is done here.
3455 const CassValue* CassDatastaxLibrary::CassRowGetColumn(const CassRow* row,
3456  size_t index) {
3457  return cass_row_get_column(row, index);
3458 }
3459 
3460 // CassLog
// Pass-through wrapper: forwards `log_level` to cass_log_set_level().
// The call takes no session or cluster argument.
// NOTE(review): the setting is presumably process-wide; confirm against
// the driver documentation.
3461 void CassDatastaxLibrary::CassLogSetLevel(CassLogLevel log_level) {
3462  cass_log_set_level(log_level);
3463 }
3464 
// Pass-through wrapper: forwards `callback` and the opaque `data` pointer
// to cass_log_set_callback(). Nothing is returned.
// NOTE(review): `data` is presumably handed back to `callback` on each log
// message, so it must outlive the registration; confirm.
3465 void CassDatastaxLibrary::CassLogSetCallback(CassLogCallback callback,
3466  void* data) {
3467  cass_log_set_callback(callback, data);
3468 }
3469 
3470 } // namespace interface
3471 } // namespace cql
3472 } // namespace cass
boost::asio::ip::address_v6 Ip6Address
Definition: address.h:15
boost::asio::ip::address IpAddress
Definition: address.h:13
boost::asio::ip::address_v4 Ip4Address
Definition: address.h:14
std::string GetHostIp(boost::asio::io_context *io_service, const std::string &hostname)
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
boost::asio::io_context * io_service()
Definition: event_manager.h:42
void GetDiffs(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void IncrementTableReadFail(const std::string &table_name)
void IncrementTableRead(const std::string &table_name)
void IncrementErrors(IfErrors::Type type)
void IncrementTableWriteBackPressureFail(const std::string &table_name)
void IncrementTableReadBackPressureFail(const std::string &table_name)
void IncrementTableWriteFail(const std::string &table_name)
void GetCumulative(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
void IncrementTableWrite(const std::string &table_name)
boost::function< void(size_t)> DbQueueWaterMarkCb
Definition: gendb_if.h:253
boost::function< void(DbOpResult::type, std::auto_ptr< ColList >)> DbGetRowCb
Definition: gendb_if.h:256
boost::function< void(DbOpResult::type)> DbAddColumnCb
Definition: gendb_if.h:254
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
Definition: task.cc:636
static TaskScheduler * GetInstance()
Definition: task.cc:547
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
impl::CassSessionPtr session_
Definition: cql_if_impl.h:346
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2260
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2159
bool DisconnectSchemaSync()
Definition: cql_if.cc:2347
impl::CassClusterPtr cluster_
Definition: cql_if_impl.h:344
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
Definition: cql_if.cc:1916
bool IsTableDynamic(const std::string &table)
Definition: cql_if.cc:2150
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
Definition: cql_if.cc:2433
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:2012
impl::CassSslPtr ssl_
Definition: cql_if_impl.h:345
impl::CassSessionPtr schema_session_
Definition: cql_if_impl.h:347
tbb::mutex map_mutex_
Definition: cql_if_impl.h:356
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2409
static const char * kTaskName
Definition: cql_if_impl.h:331
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2164
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
Definition: cql_if.cc:2043
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2079
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
Definition: cql_if.cc:2227
virtual ~CqlIfImpl()
Definition: cql_if.cc:1909
bool GetMetrics(Metrics *metrics) const
Definition: cql_if.cc:2361
tbb::atomic< SessionState::type > schema_session_state_
Definition: cql_if_impl.h:349
void SetRequestTimeout(uint32_t timeout_ms)
Definition: cql_if.cc:2281
tbb::atomic< SessionState::type > session_state_
Definition: cql_if_impl.h:348
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2463
CassPreparedMapType insert_prepared_map_
Definition: cql_if_impl.h:355
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
Definition: cql_if.cc:1977
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1933
bool ConnectSchemaSync()
Definition: cql_if.cc:2286
static const char * kQCreateKeyspaceIfNotExists
Definition: cql_if_impl.h:329
bool IsTablePresent(const std::string &table)
Definition: cql_if.cc:2055
std::string schema_contact_point_
Definition: cql_if_impl.h:350
interface::CassLibrary * cci_
Definition: cql_if_impl.h:343
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1955
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:2024
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2104
bool IsInsertIntoTablePrepareSupported(const std::string &table)
Definition: cql_if.cc:2173
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2178
std::string keyspace_
Definition: cql_if_impl.h:351
static const char * kQUseKeyspace
Definition: cql_if_impl.h:330
int IsTableStatic(const std::string &table)
Definition: cql_if.cc:2063
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
Definition: cql_if.cc:2154
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2128
boost::scoped_ptr< CqlIfImpl > impl_
Definition: cql_if.h:151
boost::scoped_ptr< interface::CassLibrary > cci_
Definition: cql_if.h:150
bool create_schema_
Definition: cql_if.h:157
void IncrementTableWriteStats(const std::string &table_name)
Definition: cql_if.cc:2996
tbb::mutex stats_mutex_
Definition: cql_if.h:154
tbb::atomic< bool > initialized_
Definition: cql_if.h:152
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count, DbQueueWaterMarkCb cb)
Definition: cql_if.cc:2956
virtual bool Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname, const std::vector< GenDb::DbDataValueVec > &v_rowkey)
Definition: cql_if.cc:2875
virtual bool Db_GetCumulativeStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
Definition: cql_if.cc:2973
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2843
virtual bool Db_CreateIndex(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode=GenDb::ColIndexMode::NONE)
Definition: cql_if.cc:2642
virtual void Db_SetInitDone(bool)
Definition: cql_if.cc:2564
virtual bool Db_GetCqlMetrics(Metrics *metrics) const
Definition: cql_if.cc:2980
GenDb::GenDbIfStats stats_
Definition: cql_if.h:155
virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor="1")
Definition: cql_if.cc:2576
virtual void Db_ResetQueueWaterMarks()
Definition: cql_if.cc:2961
virtual bool Db_GetCqlStats(DbStats *db_stats) const
Definition: cql_if.cc:2984
void IncrementErrors(GenDb::IfErrors::Type err_type)
Definition: cql_if.cc:3052
virtual bool Db_GetRowAsync(const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbGetRowCb cb)
Definition: cql_if.cc:2798
void IncrementTableWriteFailStats(const std::string &table_name)
Definition: cql_if.cc:3007
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:2603
virtual bool Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2936
virtual std::vector< GenDb::Endpoint > Db_GetEndpoints() const
Definition: cql_if.cc:3058
virtual bool Db_AddColumn(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbAddColumnCb cb)
Definition: cql_if.cc:2725
virtual bool Db_AddColumnSync(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2754
void IncrementTableReadFailStats(const std::string &table_name)
Definition: cql_if.cc:3041
virtual void Db_Uninit()
Definition: cql_if.cc:2557
void IncrementTableReadBackPressureFailStats(const std::string &table_name)
Definition: cql_if.cc:3024
virtual ~CqlIf()
Definition: cql_if.cc:2542
virtual bool Db_GetQueueStats(uint64_t *queue_count, uint64_t *enqueues) const
Definition: cql_if.cc:2950
virtual bool Db_SetTablespace(const std::string &tablespace)
Definition: cql_if.cc:2593
void IncrementTableWriteBackPressureFailStats(const std::string &table_name)
Definition: cql_if.cc:3018
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb)
Definition: cql_if.cc:2656
virtual bool Db_GetStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
Definition: cql_if.cc:2966
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbGetRowCb cb)
Definition: cql_if.cc:2720
std::vector< GenDb::Endpoint > endpoints_
Definition: cql_if.h:153
virtual bool Db_Init()
Definition: cql_if.cc:2546
void IncrementTableReadStats(const std::string &table_name)
Definition: cql_if.cc:3030
bool use_prepared_for_insert_
Definition: cql_if.h:156
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf)
Definition: cql_if.cc:2624
void operator()(const boost::uuids::uuid &tuuid) const
Definition: cql_if.cc:252
void operator()(const uint64_t &tu64) const
Definition: cql_if.cc:272
CassQueryPrinter(std::ostream &os, bool quote_strings)
Definition: cql_if.cc:240
void operator()(const IpAddress &tipaddr) const
Definition: cql_if.cc:275
void operator()(const std::string &tstring) const
Definition: cql_if.cc:260
void operator()(const uint32_t &tu32) const
Definition: cql_if.cc:268
void operator()(const T &t) const
Definition: cql_if.cc:249
void operator()(const uint8_t &tu8) const
Definition: cql_if.cc:257
CassQueryPrinter(std::ostream &os)
Definition: cql_if.cc:244
void operator()(const double &tdouble, size_t index) const
Definition: cql_if.cc:326
void operator()(const IpAddress &tipaddr, size_t index) const
Definition: cql_if.cc:331
void operator()(const boost::uuids::uuid &tuuid, size_t index) const
Definition: cql_if.cc:300
void operator()(const uint16_t &tu16, size_t index) const
Definition: cql_if.cc:310
void operator()(const std::string &tstring, size_t index) const
Definition: cql_if.cc:295
void operator()(const uint32_t &tu32, size_t index) const
Definition: cql_if.cc:314
CassStatementIndexBinder(interface::CassLibrary *cci, CassStatement *statement)
Definition: cql_if.cc:287
void operator()(const uint64_t &tu64, size_t index) const
Definition: cql_if.cc:320
void operator()(const GenDb::Blob &tblob, size_t index) const
Definition: cql_if.cc:344
void operator()(const uint8_t &tu8, size_t index) const
Definition: cql_if.cc:306
void operator()(const boost::blank &tblank, size_t index) const
Definition: cql_if.cc:292
interface::CassLibrary * cci_
Definition: cql_if.cc:349
void operator()(const uint32_t &tu32, const char *name) const
Definition: cql_if.cc:385
CassStatementNameBinder(interface::CassLibrary *cci, CassStatement *statement)
Definition: cql_if.cc:355
void operator()(const std::string &tstring, const char *name) const
Definition: cql_if.cc:363
void operator()(const boost::blank &tblank, const char *name) const
Definition: cql_if.cc:360
void operator()(const boost::uuids::uuid &tuuid, const char *name) const
Definition: cql_if.cc:368
void operator()(const GenDb::Blob &tblob, const char *name) const
Definition: cql_if.cc:415
void operator()(const double &tdouble, const char *name) const
Definition: cql_if.cc:397
void operator()(const uint64_t &tu64, const char *name) const
Definition: cql_if.cc:391
void operator()(const uint8_t &tu8, const char *name) const
Definition: cql_if.cc:375
interface::CassLibrary * cci_
Definition: cql_if.cc:420
void operator()(const IpAddress &tipaddr, const char *name) const
Definition: cql_if.cc:402
void operator()(const uint16_t &tu16, const char *name) const
Definition: cql_if.cc:380
std::string Description() const
Definition: cql_if.cc:1839
WorkerTask(FunctionPtr func, int task_id, int task_instance)
Definition: cql_if.cc:1831
boost::function< void(void)> FunctionPtr
Definition: cql_if.cc:1830
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
Definition: cql_if.cc:1835
virtual CassError CassStatementBindDouble(CassStatement *statement, size_t index, cass_double_t value)
Definition: cql_if.cc:3322
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)
Definition: cql_if.cc:3424
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)
Definition: cql_if.cc:3394
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)
Definition: cql_if.cc:3384
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)
Definition: cql_if.cc:3118
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)
Definition: cql_if.cc:3281
virtual void CassFutureFree(CassFuture *future)
Definition: cql_if.cc:3221
virtual CassInet CassInetInitV6(const cass_uint8_t *address)
Definition: cql_if.cc:3449
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)
Definition: cql_if.cc:3419
virtual void CassResultFree(const CassResult *result)
Definition: cql_if.cc:3254
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)
Definition: cql_if.cc:3087
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)
Definition: cql_if.cc:3160
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)
Definition: cql_if.cc:3189
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)
Definition: cql_if.cc:3262
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)
Definition: cql_if.cc:3287
virtual CassInet CassInetInitV4(const cass_uint8_t *address)
Definition: cql_if.cc:3444
virtual CassError CassStatementBindDoubleByName(CassStatement *statement, const char *name, cass_double_t value)
Definition: cql_if.cc:3361
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)
Definition: cql_if.cc:3123
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)
Definition: cql_if.cc:3455
virtual CassError CassStatementBindStringByNameN(CassStatement *statement, const char *name, size_t name_length, const char *value, size_t value_length)
Definition: cql_if.cc:3338
virtual void CassSchemaMetaFree(const CassSchemaMeta *schema_meta)
Definition: cql_if.cc:3195
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)
Definition: cql_if.cc:3112
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)
Definition: cql_if.cc:3399
virtual void CassLogSetLevel(CassLogLevel log_level)
Definition: cql_if.cc:3461
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)
Definition: cql_if.cc:3307
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)
Definition: cql_if.cc:3184
virtual CassError CassStatementBindUuid(CassStatement *statement, size_t index, CassUuid value)
Definition: cql_if.cc:3317
virtual void CassIteratorFree(CassIterator *iterator)
Definition: cql_if.cc:3268
virtual CassIterator * CassIteratorFromResult(const CassResult *result)
Definition: cql_if.cc:3272
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)
Definition: cql_if.cc:3096
virtual CassError CassStatementBindBytes(CassStatement *statement, size_t index, const cass_byte_t *value, size_t value_length)
Definition: cql_if.cc:3332
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)
Definition: cql_if.cc:3142
virtual CassFuture * CassSessionClose(CassSession *session)
Definition: cql_if.cc:3170
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)
Definition: cql_if.cc:3248
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)
Definition: cql_if.cc:3200
virtual void CassPreparedFree(const CassPrepared *prepared)
Definition: cql_if.cc:3380
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)
Definition: cql_if.cc:3277
virtual size_t CassResultColumnCount(const CassResult *result)
Definition: cql_if.cc:3258
virtual void CassClusterFree(CassCluster *cluster)
Definition: cql_if.cc:3078
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)
Definition: cql_if.cc:3414
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)
Definition: cql_if.cc:3106
virtual CassError CassStatementBindStringN(CassStatement *statement, size_t index, const char *value, size_t value_length)
Definition: cql_if.cc:3301
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)
Definition: cql_if.cc:3225
virtual void CassSslFree(CassSsl *ssl)
Definition: cql_if.cc:3138
virtual const CassResult * CassFutureGetResult(CassFuture *future)
Definition: cql_if.cc:3234
virtual CassValueType GetCassValueType(const CassValue *value)
Definition: cql_if.cc:3390
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)
Definition: cql_if.cc:3179
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)
Definition: cql_if.cc:3082
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)
Definition: cql_if.cc:3215
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)
Definition: cql_if.cc:3296
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)
Definition: cql_if.cc:3174
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)
Definition: cql_if.cc:3092
virtual CassError CassStatementBindInet(CassStatement *statement, size_t index, CassInet value)
Definition: cql_if.cc:3327
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)
Definition: cql_if.cc:3165
virtual cass_bool_t CassValueIsNull(const CassValue *value)
Definition: cql_if.cc:3439
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)
Definition: cql_if.cc:3210
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)
Definition: cql_if.cc:3404
virtual void CassFutureWait(CassFuture *future)
Definition: cql_if.cc:3230
virtual CassError CassStatementBindBytesByNameN(CassStatement *statement, const char *name, size_t name_length, const cass_byte_t *value, size_t value_length)
Definition: cql_if.cc:3371
virtual CassError CassStatementBindUuidByName(CassStatement *statement, const char *name, CassUuid value)
Definition: cql_if.cc:3356
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)
Definition: cql_if.cc:3434
virtual CassSession * CassSessionNew()
Definition: cql_if.cc:3152
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)
Definition: cql_if.cc:3239
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)
Definition: cql_if.cc:3147
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)
Definition: cql_if.cc:3101
virtual CassError CassStatementBindInt64(CassStatement *statement, size_t index, cass_int64_t value)
Definition: cql_if.cc:3312
virtual void CassLogSetCallback(CassLogCallback callback, void *data)
Definition: cql_if.cc:3465
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)
Definition: cql_if.cc:3205
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)
Definition: cql_if.cc:3409
virtual CassError CassStatementBindInt64ByName(CassStatement *statement, const char *name, cass_int64_t value)
Definition: cql_if.cc:3351
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)
Definition: cql_if.cc:3429
virtual void CassSessionFree(CassSession *session)
Definition: cql_if.cc:3156
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)
Definition: cql_if.cc:3128
virtual void CassStatementFree(CassStatement *statement)
Definition: cql_if.cc:3292
virtual CassError CassStatementBindInt32ByName(CassStatement *statement, const char *name, cass_int32_t value)
Definition: cql_if.cc:3346
virtual CassError CassStatementBindInetByName(CassStatement *statement, const char *name, CassInet value)
Definition: cql_if.cc:3366
virtual CassCluster * CassClusterNew()
Definition: cql_if.cc:3074
virtual CassError CassFutureErrorCode(CassFuture *future)
Definition: cql_if.cc:3244
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)=0
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)=0
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)=0
virtual size_t CassResultColumnCount(const CassResult *result)=0
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)=0
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)=0
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)=0
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSsl * CassSslNew()=0
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)=0
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)=0
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)=0
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSession * CassSessionNew()=0
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)=0
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)=0
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)=0
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)=0
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)=0
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)=0
virtual CassValueType GetCassValueType(const CassValue *value)=0
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)=0
virtual const CassResult * CassFutureGetResult(CassFuture *future)=0
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)=0
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)=0
virtual CassIterator * CassIteratorFromResult(const CassResult *result)=0
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)=0
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)=0
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)=0
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)=0
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)=0
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)=0
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)=0
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)=0
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)=0
virtual void CassFutureWait(CassFuture *future)=0
virtual CassError CassFutureErrorCode(CassFuture *future)=0
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)=0
virtual cass_bool_t CassValueIsNull(const CassValue *value)=0
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)=0
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)=0
virtual CassFuture * CassSessionClose(CassSession *session)=0
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)=0
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)=0
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)=0
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)=0
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)=0
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)=0
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)=0
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)=0
static EventManager evm
#define CQLIF_DEBUG
Definition: cql_if.cc:37
SandeshTraceBufferPtr CqlTraceDebugBuf(SandeshTraceBufferCreate(CQLIF_DEBUG, 10000))
#define CQLIF_INFO_TRACE(_Msg)
Definition: cql_if.cc:56
#define CASS_LIB_TRACE(_Level, _Msg)
Definition: cql_if.cc:72
SandeshTraceBufferPtr CqlTraceErrBuf(SandeshTraceBufferCreate(CQLIF_ERR, 20000))
#define CQLIF_DEBUG_TRACE(_Msg)
Definition: cql_if.cc:48
#define CQLIF_INFO
Definition: cql_if.cc:38
#define CQLIF_ERR_TRACE(_Msg)
Definition: cql_if.cc:64
SandeshTraceBufferPtr CqlTraceInfoBuf(SandeshTraceBufferCreate(CQLIF_INFO, 10000))
#define CQLIF_ERR
Definition: cql_if.cc:39
uint8_t type
Definition: load_balance.h:2
bool LoggingDisabled()
Definition: logging.cc:24
#define GENERIC_RAW_ARRAY(obj)
Definition: misc_utils.h:32
std::string DbDataValueVecToString(const GenDb::DbDataValueVec &v_db_value)
Definition: gendb_if.cc:102
@ DB_VALUE_STRING
Definition: gendb_if.h:89
@ DB_VALUE_BLANK
Definition: gendb_if.h:88
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
Definition: gendb_if.h:85
std::vector< GenDb::DbDataType::type > DbDataTypeVec
Definition: gendb_if.h:101
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
boost::asio::ip::tcp::endpoint Endpoint
Definition: gendb_if.h:240
std::vector< WhereIndexInfo > WhereIndexInfoVec
Definition: gendb_if.h:233
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
boost::ptr_vector< NewCol > NewColVec
Definition: gendb_if.h:186
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:611
static std::string LoadCertFile(const std::string &ca_certs_path)
Definition: cql_if.cc:1814
static void OnExecuteQueryAsync(CassFuture *future, void *data)
Definition: cql_if.cc:1483
static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx=NULL)
Definition: cql_if.cc:1519
static bool DynamicCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
Definition: cql_if.cc:1556
static void CassLibraryLog(const CassLogMessage *message, void *data)
Definition: cql_if.cc:1803
void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1434
static void ExecuteQueryResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx)
Definition: cql_if.cc:1548
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:723
static const char * kQCompactionStrategy("compaction = {'class': " "'org.apache.cassandra.db.compaction.%s'}")
static GenDb::DbDataValue CassValue2DbDataValue(interface::CassLibrary *cci, const CassValue *cvalue)
Definition: cql_if.cc:1049
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1024
static bool GetCassTablePartitionKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count)
Definition: cql_if.cc:1740
bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
Definition: cql_if.cc:860
static std::string DbDataTypes2CassTypes(const GenDb::DbDataTypeVec &v_db_types)
Definition: cql_if.cc:200
static std::string CassSelectFromTableInternal(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec, const GenDb::WhereIndexInfoVec &where_vec)
Definition: cql_if.cc:894
CassSharedPtr< CassSsl > CassSslPtr
Definition: cql_if_impl.h:179
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, size_t ck_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1610
static bool GetCassTableClusteringKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *ck_count)
Definition: cql_if.cc:1719
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
Definition: cql_if.cc:746
CassSharedPtr< const CassPrepared > CassPreparedPtr
Definition: cql_if_impl.h:185
static bool PrepareSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassPreparedPtr *prepared)
Definition: cql_if.cc:1140
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1034
static bool StaticCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
Definition: cql_if.cc:1624
static bool IsCassTableMetaPresent(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table)
Definition: cql_if.cc:1700
static std::string DbColIndexMode2String(const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:567
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
Definition: cql_if.cc:475
static CassConsistency Db2CassConsistency(GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:206
static const std::string kQReadRepairChanceDTCS("read_repair_chance = 0.0")
CassSharedPtr< const CassResult > CassResultPtr
Definition: cql_if_impl.h:183
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1648
static bool ExecuteQueryResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency)
Definition: cql_if.cc:1191
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:585
std::string CassSelectFromTable(const std::string &table)
Definition: cql_if.cc:1042
static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb)
Definition: cql_if.cc:1541
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1004
static const char * DbDataType2CassType(const GenDb::DbDataType::type &db_type)
Definition: cql_if.cc:169
static void ExecuteQueryAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb)
Definition: cql_if.cc:1532
static void encode_uuid(char *output, const CassUuid &uuid)
Definition: cql_if.cc:123
bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
Definition: cql_if.cc:819
static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc)
Definition: cql_if.cc:1207
static char * decode_uuid(char *input, CassUuid *output)
Definition: cql_if.cc:150
void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, size_t ck_count, GenDb::ColListVec *v_col_list)
Definition: cql_if.cc:1355
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:664
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1635
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
Definition: cql_if_impl.h:189
static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel)
Definition: cql_if.cc:1760
static bool SyncFutureWait(interface::CassLibrary *cci, CassFuture *future)
Definition: cql_if.cc:1661
static const CassTableMeta * GetCassTableMeta(interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error)
Definition: cql_if.cc:1674
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:435
static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, CassSession *session, CassStatement *statement, CassConsistency consistency)
Definition: cql_if.cc:1200
static bool ExecuteQuerySync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency)
Definition: cql_if.cc:1183
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1568
static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level)
Definition: cql_if.cc:1781
static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency)
Definition: cql_if.cc:1160
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
Definition: cql_if.cc:1015
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0")
static void AsyncRowGetCompletionCallback(boost::shared_ptr< AsyncRowGetCallbackContext > cb_ctx)
Definition: cql_if.cc:2685
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
size_t size() const
Definition: gendb_if.h:63
const uint8_t * data() const
Definition: gendb_if.h:60
std::string cfname_
Definition: gendb_if.h:197
NewColVec columns_
Definition: gendb_if.h:199
DbDataValueVec rowkey_
Definition: gendb_if.h:198
DbDataValueVec finish_
Definition: gendb_if.h:226
bool IsEmpty() const
Definition: gendb_if.h:217
DbDataValueVec start_
Definition: gendb_if.h:225
std::string ToString() const
Definition: gendb_if.cc:60
ColumnMap cfcolumns_
Definition: gendb_if.h:140
DbDataTypeVec columns_
Definition: gendb_if.h:142
DbDataTypeVec value_
Definition: gendb_if.h:143
std::map< std::string, GenDb::DbDataType::type > ColumnMap
Definition: gendb_if.h:113
ColumnFamilyType cftype_
Definition: gendb_if.h:138
@ COLUMN_FAMILY_NOSQL
Definition: gendb_if.h:111
@ COLUMN_FAMILY_SQL
Definition: gendb_if.h:110
std::string cfname_
Definition: gendb_if.h:137
DbDataTypeVec clustering_columns_
Definition: gendb_if.h:141
DbDataTypeVec partition_keys_
Definition: gendb_if.h:139
boost::scoped_ptr< DbDataValueVec > value
Definition: gendb_if.h:181
boost::scoped_ptr< DbDataValueVec > name
Definition: gendb_if.h:180
NewCf::ColumnFamilyType cftype_
Definition: gendb_if.h:179
static std::string ToString(Op::type op)
Definition: gendb_if.cc:81
std::auto_ptr< GenDb::ColList > row_
Definition: cql_if.cc:2682
GenDb::GenDbIf::DbGetRowCb cb_
Definition: cql_if.cc:2680
AsyncRowGetCallbackContext(GenDb::GenDbIf::DbGetRowCb cb, GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row)
Definition: cql_if.cc:2674
GenDb::DbOpResult::type drc_
Definition: cql_if.cc:2681
CassString(const char *data)
Definition: cql_if.cc:108
CassString(const char *data, size_t length)
Definition: cql_if.cc:113
boost::uuids::uuid uuid