OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cql_if.cc
Go to the documentation of this file.
1 //
2 // Copyright (c) 2015 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #include <assert.h>
6 #include <fstream>
7 
8 #include <tbb/atomic.h>
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string.hpp>
11 #include <boost/algorithm/string/join.hpp>
12 #include <boost/unordered_map.hpp>
13 #include <boost/system/error_code.hpp>
14 
15 #include <cassandra.h>
16 
17 #include <base/logging.h>
18 #include <base/misc_utils.h>
19 #include <base/task.h>
20 #include <base/timer.h>
21 #include <base/string_util.h>
22 #include <base/address_util.h>
23 #include <io/event_manager.h>
24 #include <database/gendb_if.h>
25 #include <database/gendb_constants.h>
29 
30 using namespace boost::system;
31 
32 #define CQLIF_DEBUG "CqlTraceBufDebug"
33 #define CQLIF_INFO "CqlTraceBufInfo"
34 #define CQLIF_ERR "CqlTraceBufErr"
35 
37  CQLIF_DEBUG, 10000));
39  CQLIF_INFO, 10000));
41  CQLIF_ERR, 20000));
42 
// Trace helpers. The three CQLIF_*_TRACE macros build a message of the form
// "<function>:<file>:<line>: <_Msg>" in a local std::stringstream and hand
// the resulting string to CQL_TRACE_TRACE with the debug, info or error
// trace buffer respectively. _Msg may be any expression chain that can be
// streamed with operator<<.
// NOTE: each of these definitions ends with a trailing backslash after
// `while (false)`, so the definition continues onto the following line;
// that line has to stay empty.
43 #define CQLIF_DEBUG_TRACE(_Msg) \
44  do { \
45  std::stringstream _ss; \
46  _ss << __func__ << ":" << __FILE__ << ":" << \
47  __LINE__ << ": " << _Msg; \
48  CQL_TRACE_TRACE(CqlTraceDebugBuf, _ss.str()); \
49  } while (false) \
50 
// Same as CQLIF_DEBUG_TRACE, but writes to CqlTraceInfoBuf.
51 #define CQLIF_INFO_TRACE(_Msg) \
52  do { \
53  std::stringstream _ss; \
54  _ss << __func__ << ":" << __FILE__ << ":" << \
55  __LINE__ << ": " << _Msg; \
56  CQL_TRACE_TRACE(CqlTraceInfoBuf, _ss.str()); \
57  } while (false) \
58 
// Same as CQLIF_DEBUG_TRACE, but writes to CqlTraceErrBuf.
59 #define CQLIF_ERR_TRACE(_Msg) \
60  do { \
61  std::stringstream _ss; \
62  _ss << __func__ << ":" << __FILE__ << ":" << \
63  __LINE__ << ": " << _Msg; \
64  CQL_TRACE_TRACE(CqlTraceErrBuf, _ss.str()); \
65  } while (false) \
66 
// Routes an already formatted message to a trace buffer chosen by _Level:
// log4cplus::ERROR_LOG_LEVEL goes to CqlTraceErrBuf,
// log4cplus::DEBUG_LOG_LEVEL goes to CqlTraceDebugBuf and every other level
// goes to CqlTraceInfoBuf. Unlike the CQLIF_*_TRACE macros no
// function/file/line prefix is added here.
67 #define CASS_LIB_TRACE(_Level, _Msg) \
68  do { \
69  if (_Level == log4cplus::ERROR_LOG_LEVEL) { \
70  CQL_TRACE_TRACE(CqlTraceErrBuf, _Msg); \
71  } else if (_Level == log4cplus::DEBUG_LOG_LEVEL) { \
72  CQL_TRACE_TRACE(CqlTraceDebugBuf, _Msg); \
73  } else { \
74  CQL_TRACE_TRACE(CqlTraceInfoBuf, _Msg); \
75  } \
76  } while (false) \
77 
// Logs "<function>:<file>:<line>: <_Msg>" through the log4cplus root logger.
// _Level is pasted onto LOG4CPLUS_ to select the logging macro (so it must
// be a level suffix for which a LOG4CPLUS_<_Level> macro exists). Nothing is
// logged when LoggingDisabled() returns true: the `break` leaves the
// enclosing do/while before the logger is fetched.
78 #define CQLIF_LOG(_Level, _Msg) \
79  do { \
80  if (LoggingDisabled()) break; \
81  log4cplus::Logger logger = log4cplus::Logger::getRoot(); \
82  LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \
83  __LINE__ << ": " << _Msg); \
84  } while (false)
85 
// Logs "<function>:<file>:<line>: <_Msg>" at ERROR level via the LOG macro.
86 #define CQLIF_LOG_ERR(_Msg) \
87  do { \
88  LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \
89  << _Msg); \
90  } while (false)
91 
92 namespace cass {
93 namespace cql {
94 namespace impl {
95 
96 // CassString convenience structure
97 struct CassString {
99  data(NULL),
100  length(0) {
101  }
102 
103  CassString(const char *data) :
104  data(data),
105  length(strlen(data)) {
106  }
107 
108  CassString(const char* data, size_t length) :
109  data(data),
110  length(length) {
111  }
112 
113  const char* data;
114  size_t length;
115 };
116 
117 // CassUuid encode and decode
118 static inline void encode_uuid(char* output, const CassUuid &uuid) {
119  uint64_t time_and_version = uuid.time_and_version;
120  output[3] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
121  time_and_version >>= 8;
122  output[2] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
123  time_and_version >>= 8;
124  output[1] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
125  time_and_version >>= 8;
126  output[0] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
127  time_and_version >>= 8;
128 
129  output[5] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
130  time_and_version >>= 8;
131  output[4] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
132  time_and_version >>= 8;
133 
134  output[7] = static_cast<char>(time_and_version & 0x00000000000000FFLL);
135  time_and_version >>= 8;
136  output[6] = static_cast<char>(time_and_version & 0x000000000000000FFLL);
137 
138  uint64_t clock_seq_and_node = uuid.clock_seq_and_node;
139  for (size_t i = 0; i < 8; ++i) {
140  output[15 - i] = static_cast<char>(clock_seq_and_node & 0x00000000000000FFL);
141  clock_seq_and_node >>= 8;
142  }
143 }
144 
145 static inline char* decode_uuid(char* input, CassUuid* output) {
146  output->time_and_version = static_cast<uint64_t>(static_cast<uint8_t>(input[3]));
147  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[2])) << 8;
148  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[1])) << 16;
149  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[0])) << 24;
150 
151  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[5])) << 32;
152  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[4])) << 40;
153 
154  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[7])) << 48;
155  output->time_and_version |= static_cast<uint64_t>(static_cast<uint8_t>(input[6])) << 56;
156 
157  output->clock_seq_and_node = 0;
158  for (size_t i = 0; i < 8; ++i) {
159  output->clock_seq_and_node |= static_cast<uint64_t>(static_cast<uint8_t>(input[15 - i])) << (8 * i);
160  }
161  return input + 16;
162 }
163 
164 static const char * DbDataType2CassType(
165  const GenDb::DbDataType::type &db_type) {
166  switch (db_type) {
167  case GenDb::DbDataType::AsciiType:
168  return "ascii";
169  case GenDb::DbDataType::LexicalUUIDType:
170  return "uuid";
171  case GenDb::DbDataType::TimeUUIDType:
172  return "timeuuid";
173  case GenDb::DbDataType::Unsigned8Type:
174  case GenDb::DbDataType::Unsigned16Type:
175  case GenDb::DbDataType::Unsigned32Type:
176  return "int";
177  case GenDb::DbDataType::Unsigned64Type:
178  return "bigint";
179  case GenDb::DbDataType::DoubleType:
180  return "double";
181  case GenDb::DbDataType::UTF8Type:
182  return "text";
183  case GenDb::DbDataType::InetType:
184  return "inet";
185  case GenDb::DbDataType::IntegerType:
186  return "varint";
187  case GenDb::DbDataType::BlobType:
188  return "blob";
189  default:
190  assert(false && "Invalid data type");
191  return "";
192  }
193 }
194 
195 static std::string DbDataTypes2CassTypes(
196  const GenDb::DbDataTypeVec &v_db_types) {
197  assert(!v_db_types.empty());
198  return std::string(DbDataType2CassType(v_db_types[0]));
199 }
200 
201 static CassConsistency Db2CassConsistency(
202  GenDb::DbConsistency::type dconsistency) {
203  switch (dconsistency) {
205  return CASS_CONSISTENCY_ANY;
207  return CASS_CONSISTENCY_ONE;
209  return CASS_CONSISTENCY_TWO;
211  return CASS_CONSISTENCY_THREE;
213  return CASS_CONSISTENCY_QUORUM;
215  return CASS_CONSISTENCY_ALL;
217  return CASS_CONSISTENCY_LOCAL_QUORUM;
219  return CASS_CONSISTENCY_EACH_QUORUM;
221  return CASS_CONSISTENCY_SERIAL;
223  return CASS_CONSISTENCY_LOCAL_SERIAL;
225  return CASS_CONSISTENCY_LOCAL_ONE;
227  default:
228  return CASS_CONSISTENCY_UNKNOWN;
229  }
230 }
231 
232 // Cass Query Printer
233 class CassQueryPrinter : public boost::static_visitor<> {
234  public:
235  CassQueryPrinter(std::ostream &os, bool quote_strings) :
236  os_(os),
237  quote_strings_(quote_strings) {
238  }
239  CassQueryPrinter(std::ostream &os) :
240  os_(os),
241  quote_strings_(true) {
242  }
243  template<typename T>
244  void operator()(const T &t) const {
245  os_ << t;
246  }
247  void operator()(const boost::uuids::uuid &tuuid) const {
248  os_ << to_string(tuuid);
249  }
250  // uint8_t must be handled specially because ostream sees
251  // uint8_t as a text type instead of an integer type
252  void operator()(const uint8_t &tu8) const {
253  os_ << (uint16_t)tu8;
254  }
255  void operator()(const std::string &tstring) const {
256  if (quote_strings_) {
257  os_ << "'" << tstring << "'";
258  } else {
259  os_ << tstring;
260  }
261  }
262  // CQL int is 32 bit signed integer
263  void operator()(const uint32_t &tu32) const {
264  os_ << (int32_t)tu32;
265  }
266  // CQL bigint is 64 bit signed long
267  void operator()(const uint64_t &tu64) const {
268  os_ << (int64_t)tu64;
269  }
270  void operator()(const IpAddress &tipaddr) const {
271  os_ << "'" << tipaddr << "'";
272  }
273  std::ostream &os_;
275 };
276 
277 //
278 // CassStatement bind
279 //
280 class CassStatementIndexBinder : public boost::static_visitor<> {
281  public:
283  CassStatement *statement) :
284  cci_(cci),
285  statement_(statement) {
286  }
287  void operator()(const boost::blank &tblank, size_t index) const {
288  assert(false && "CassStatement bind to boost::blank not supported");
289  }
290  void operator()(const std::string &tstring, size_t index) const {
291  CassError rc(cci_->CassStatementBindStringN(statement_, index,
292  tstring.c_str(), tstring.length()));
293  assert(rc == CASS_OK);
294  }
295  void operator()(const boost::uuids::uuid &tuuid, size_t index) const {
296  CassUuid cuuid;
297  decode_uuid((char *)&tuuid, &cuuid);
298  CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid));
299  assert(rc == CASS_OK);
300  }
301  void operator()(const uint8_t &tu8, size_t index) const {
302  CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8));
303  assert(rc == CASS_OK);
304  }
305  void operator()(const uint16_t &tu16, size_t index) const {
306  CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16));
307  assert(rc == CASS_OK);
308  }
309  void operator()(const uint32_t &tu32, size_t index) const {
310  assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
311  CassError rc(cci_->CassStatementBindInt32(statement_, index,
312  (cass_int32_t)tu32));
313  assert(rc == CASS_OK);
314  }
315  void operator()(const uint64_t &tu64, size_t index) const {
316  assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
317  CassError rc(cci_->CassStatementBindInt64(statement_, index,
318  (cass_int64_t)tu64));
319  assert(rc == CASS_OK);
320  }
321  void operator()(const double &tdouble, size_t index) const {
322  CassError rc(cci_->CassStatementBindDouble(statement_, index,
323  (cass_double_t)tdouble));
324  assert(rc == CASS_OK);
325  }
326  void operator()(const IpAddress &tipaddr, size_t index) const {
327  CassInet cinet;
328  if (tipaddr.is_v4()) {
329  boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
330  cinet = cci_->CassInetInitV4(GENERIC_RAW_ARRAY(tv4.to_bytes()));
331  } else {
332  boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
333  cinet = cci_->CassInetInitV6(GENERIC_RAW_ARRAY(tv6.to_bytes()));
334  }
335  CassError rc(cci_->CassStatementBindInet(statement_, index,
336  cinet));
337  assert(rc == CASS_OK);
338  }
339  void operator()(const GenDb::Blob &tblob, size_t index) const {
340  CassError rc(cci_->CassStatementBindBytes(statement_, index,
341  tblob.data(), tblob.size()));
342  assert(rc == CASS_OK);
343  }
345  CassStatement *statement_;
346 };
347 
348 class CassStatementNameBinder : public boost::static_visitor<> {
349  public:
351  CassStatement *statement) :
352  cci_(cci),
353  statement_(statement) {
354  }
355  void operator()(const boost::blank &tblank, const char *name) const {
356  assert(false && "CassStatement bind to boost::blank not supported");
357  }
358  void operator()(const std::string &tstring, const char *name) const {
359  CassError rc(cci_->CassStatementBindStringByNameN(statement_, name,
360  strlen(name), tstring.c_str(), tstring.length()));
361  assert(rc == CASS_OK);
362  }
363  void operator()(const boost::uuids::uuid &tuuid, const char *name) const {
364  CassUuid cuuid;
365  decode_uuid((char *)&tuuid, &cuuid);
366  CassError rc(cci_->CassStatementBindUuidByName(statement_, name,
367  cuuid));
368  assert(rc == CASS_OK);
369  }
370  void operator()(const uint8_t &tu8, const char *name) const {
371  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
372  tu8));
373  assert(rc == CASS_OK);
374  }
375  void operator()(const uint16_t &tu16, const char *name) const {
376  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
377  tu16));
378  assert(rc == CASS_OK);
379  }
380  void operator()(const uint32_t &tu32, const char *name) const {
381  assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
382  CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
383  (cass_int32_t)tu32));
384  assert(rc == CASS_OK);
385  }
386  void operator()(const uint64_t &tu64, const char *name) const {
387  assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
388  CassError rc(cci_->CassStatementBindInt64ByName(statement_, name,
389  (cass_int64_t)tu64));
390  assert(rc == CASS_OK);
391  }
392  void operator()(const double &tdouble, const char *name) const {
393  CassError rc(cci_->CassStatementBindDoubleByName(statement_, name,
394  (cass_double_t)tdouble));
395  assert(rc == CASS_OK);
396  }
397  void operator()(const IpAddress &tipaddr, const char *name) const {
398  CassInet cinet;
399  if (tipaddr.is_v4()) {
400  boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
401  cinet = cci_->CassInetInitV4(GENERIC_RAW_ARRAY(tv4.to_bytes()));
402  } else {
403  boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
404  cinet = cci_->CassInetInitV6(GENERIC_RAW_ARRAY(tv6.to_bytes()));
405  }
406  CassError rc(cci_->CassStatementBindInetByName(statement_, name,
407  cinet));
408  assert(rc == CASS_OK);
409  }
410  void operator()(const GenDb::Blob &tblob, const char *name) const {
411  CassError rc(cci_->CassStatementBindBytesByNameN(statement_, name,
412  strlen(name), tblob.data(), tblob.size()));
413  assert(rc == CASS_OK);
414  }
416  CassStatement *statement_;
417 };
418 
// printf-style template for the table "compaction" option; the single %s is
// replaced with the compaction strategy class name.
static const char * kQCompactionStrategy =
    "compaction = {'class': "
    "'org.apache.cassandra.db.compaction.%s'}";
// Table option that sets gc_grace_seconds to zero.
static const std::string kQGCGraceSeconds = "gc_grace_seconds = 0";
// Table option that sets read_repair_chance to zero.
static const std::string kQReadRepairChanceDTCS =
    "read_repair_chance = 0.0";
425 
426 //
427 // Cf2CassCreateTableIfNotExists
428 //
429 
431  const std::string &compaction_strategy) {
432  std::ostringstream query;
433  // Table name
434  query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " ";
435  // Row key
436  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
437  assert(rkeys.size() == 1);
438  query << "(key " << DbDataType2CassType(rkeys[0]) <<
439  " PRIMARY KEY";
440  // Columns
441  const GenDb::NewCf::ColumnMap &cfcolumns(cf.cfcolumns_);
442  assert(!cfcolumns.empty());
443  BOOST_FOREACH(const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
444  cfcolumns) {
445  query << ", \"" << cfcolumn.first << "\" " <<
446  DbDataType2CassType(cfcolumn.second);
447  }
448  char cbuf[512];
449  int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy,
450  compaction_strategy.c_str()));
451  assert(!(n < 0 || n >= (int)sizeof(cbuf)));
452 
453  // The compaction strategy DateTieredCompactionStrategy precludes
454  // using read repair, because of the way timestamps are checked for
455  // DTCS compaction.In this case, you must set read_repair_chance to
456  // zero. For other compaction strategies, read repair should be
457  // enabled with a read_repair_chance value of 0.2 being typical
458  if (compaction_strategy ==
459  GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
460  query << ") WITH " << std::string(cbuf) << " AND " <<
462  } else {
463  query << ") WITH " << std::string(cbuf) << " AND " <<
465  }
466 
467  return query.str();
468 }
469 
471  const std::string &compaction_strategy,
472  boost::system::error_code *ec) {
473  std::ostringstream query;
474 
475  *ec = errc::make_error_code(errc::success);
476  // sanity check - # of clustering_columns cannot be 0
477  // for dynamic tables
478  if (cf.clustering_columns_.size() == 0) {
479  *ec = errc::make_error_code(errc::invalid_argument);
480  return query.str();
481  }
482 
483  // Table name
484  query << "CREATE TABLE IF NOT EXISTS " << cf.cfname_ << " (";
485  // Row key
486  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
487  int rk_size(rkeys.size());
488  for (int i = 0; i < rk_size; i++) {
489  if (i) {
490  int key_num(i + 1);
491  query << "key" << key_num;
492  } else {
493  query << "key";
494  }
495  query << " " << DbDataType2CassType(rkeys[i]) << ", ";
496  }
497  // clustering columns
498  const GenDb::DbDataTypeVec &clustering_columns(cf.clustering_columns_);
499  int ccn_size(clustering_columns.size());
500  for (int i = 0; i < ccn_size; i++) {
501  int cnum(i + 1);
502  query << "column" << cnum << " " <<
503  DbDataType2CassType(clustering_columns[i]) << ", ";
504  }
505  // columns
506  const GenDb::DbDataTypeVec &columns(cf.columns_);
507  int cn_size(columns.size());
508  for (int i = 0; i < cn_size; i++) {
509  int cnum(i + 1 + ccn_size);
510  query << "column" << cnum << " " <<
511  DbDataType2CassType(columns[i]) << ", ";
512  }
513  // Value
514  const GenDb::DbDataTypeVec &values(cf.value_);
515  if (values.size() > 0) {
516  query << "value" << " " << DbDataTypes2CassTypes(values) << ", ";
517  }
518  // Primary Key
519  query << "PRIMARY KEY (";
520  std::ostringstream rkey_ss;
521  for (int i = 0; i < rk_size; i++) {
522  if (i) {
523  int key_num(i + 1);
524  rkey_ss << ", key" << key_num;
525  } else {
526  rkey_ss << "key";
527  }
528  }
529  if (rk_size >= 2) {
530  query << "(" << rkey_ss.str() << "), ";
531  } else {
532  query << rkey_ss.str() << ", ";
533  }
534  for (int i = 0; i < ccn_size; i++) {
535  int cnum(i + 1);
536  if (i) {
537  query << ", ";
538  }
539  query << "column" << cnum;
540  }
541  char cbuf[512];
542  int n(snprintf(cbuf, sizeof(cbuf), kQCompactionStrategy,
543  compaction_strategy.c_str()));
544  assert(!(n < 0 || n >= (int)sizeof(cbuf)));
545 
546  // The compaction strategy DateTieredCompactionStrategy precludes
547  // using read repair, because of the way timestamps are checked for
548  // DTCS compaction.In this case, you must set read_repair_chance to
549  // zero. For other compaction strategies, read repair should be
550  // enabled with a read_repair_chance value of 0.2 being typical
551  if (compaction_strategy ==
552  GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
553  query << ")) WITH " << std::string(cbuf) << " AND " <<
555  } else {
556  query << ")) WITH " << std::string(cbuf) << " AND " <<
558  }
559  return query.str();
560 }
561 
562 static std::string DbColIndexMode2String(
563  const GenDb::ColIndexMode::type index_mode) {
564  switch (index_mode) {
566  return "";
567  case GenDb::ColIndexMode::PREFIX:
568  return "PREFIX";
569  case GenDb::ColIndexMode::CONTAINS:
570  return "CONTAINS";
571  default:
572  assert(false && "INVALID");
573  }
574 }
575 
576 //
577 // CassCreateIndexIfNotExists
578 //
579 
580 std::string CassCreateIndexIfNotExists(const std::string &cfname,
581  const std::string &column, const std::string &indexname,
582  const GenDb::ColIndexMode::type index_mode) {
583  std::ostringstream query;
584 
585  query << "CREATE ";
586  if (index_mode != GenDb::ColIndexMode::NONE) {
587  query << "CUSTOM "; // SASI
588  }
589  // Indexname
590  query << "INDEX IF NOT EXISTS " << indexname << " ";
591  // Index Column
592  query << "ON " << cfname << "(\""<< column <<"\")";
593  // Mode if SASI
594  if (index_mode != GenDb::ColIndexMode::NONE) {
595  query << " USING \'org.apache.cassandra.index.sasi.SASIIndex\' " <<
596  "WITH OPTIONS = {\'mode\': \'" << DbColIndexMode2String(index_mode) << "\'}";
597  }
598  query << ";";
599  return query.str();
600 }
601 
602 //
603 // Cf2CassInsertIntoTable
604 //
605 
606 std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
607  std::ostringstream query;
608  // Table
609  const std::string &table(v_columns->cfname_);
610  query << "INSERT INTO " << table << " (";
611  std::ostringstream values_ss;
612  values_ss << "VALUES (";
613  CassQueryPrinter values_printer(values_ss);
614  // Row keys
615  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
616  int rk_size(rkeys.size());
617  for (int i = 0; i < rk_size; i++) {
618  if (i) {
619  int key_num(i + 1);
620  query << ", key" << key_num;
621  } else {
622  query << "key";
623  }
624  if (i) {
625  values_ss << ", ";
626  }
627  boost::apply_visitor(values_printer, rkeys[i]);
628  }
629  // Columns
630  int cttl(-1);
631  CassQueryPrinter cnames_printer(query, false);
632  BOOST_FOREACH(const GenDb::NewCol &column, v_columns->columns_) {
633  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_SQL);
634  // Column Name
635  query << ", ";
636  const GenDb::DbDataValueVec &cnames(*column.name.get());
637  assert(cnames.size() == 1);
638  // Double quote column name strings
639  query << "\"";
640  boost::apply_visitor(cnames_printer, cnames[0]);
641  query << "\"";
642  // Column Values
643  values_ss << ", ";
644  const GenDb::DbDataValueVec &cvalues(*column.value.get());
645  assert(cvalues.size() == 1);
646  boost::apply_visitor(values_printer, cvalues[0]);
647  // Column TTL
648  cttl = column.ttl;
649  }
650  query << ") ";
651  values_ss << ")";
652  query << values_ss.str();
653  if (cttl > 0) {
654  query << " USING TTL " << cttl;
655  }
656  return query.str();
657 }
658 
659 std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns) {
660  std::ostringstream query;
661  // Table
662  const std::string &table(v_columns->cfname_);
663  query << "INSERT INTO " << table << " (";
664  std::ostringstream values_ss;
665  // Row keys
666  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
667  int rk_size(rkeys.size());
668  CassQueryPrinter values_printer(values_ss);
669  for (int i = 0; i < rk_size; i++) {
670  if (i) {
671  int key_num(i + 1);
672  query << ", key" << key_num;
673  } else {
674  query << "key";
675  }
676  boost::apply_visitor(values_printer, rkeys[i]);
677  values_ss << ", ";
678  }
679  // Columns
680  const GenDb::NewColVec &columns(v_columns->columns_);
681  assert(columns.size() == 1);
682  const GenDb::NewCol &column(columns[0]);
683  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_NOSQL);
684  // Column Names
685  const GenDb::DbDataValueVec &cnames(*column.name.get());
686  int cn_size(cnames.size());
687  for (int i = 0; i < cn_size; i++) {
688  int cnum(i + 1);
689  if (cnames.at(i).which() != GenDb::DB_VALUE_BLANK) {
690  query << ", column" << cnum;
691  boost::apply_visitor(values_printer, cnames[i]);
692  if (i != cn_size - 1) {
693  values_ss << ", ";
694  }
695  }
696  }
697  // Column Values
698  const GenDb::DbDataValueVec &cvalues(*column.value.get());
699  if (cvalues.size() > 0) {
700  query << ", value) VALUES (";
701  values_ss << ", ";
702  boost::apply_visitor(values_printer, cvalues[0]);
703  } else {
704  query << ") VALUES (";
705  }
706  values_ss << ")";
707  query << values_ss.str();
708  if (column.ttl > 0) {
709  query << " USING TTL " << column.ttl;
710  }
711  return query.str();
712 }
713 
714 //
715 // Cf2CassPrepareInsertIntoTable
716 //
717 
719  std::ostringstream query;
720  // Table name
721  query << "INSERT INTO " << cf.cfname_ << " ";
722  // Row key
723  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
724  assert(rkeys.size() == 1);
725  std::ostringstream values_ss;
726  query << "(key";
727  values_ss << ") VALUES (?";
728  // Columns
729  const GenDb::NewCf::ColumnMap &cfcolumns(cf.cfcolumns_);
730  assert(!cfcolumns.empty());
731  BOOST_FOREACH(const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
732  cfcolumns) {
733  query << ", \"" << cfcolumn.first << "\"";
734  values_ss << ", ?";
735  }
736  query << values_ss.str();
737  query << ") USING TTL ?";
738  return query.str();
739 }
740 
742  boost::system::error_code *ec) {
743  std::ostringstream query;
744 
745  *ec = errc::make_error_code(errc::success);
746  // sanity check - # of clustering_columns cannot be 0
747  // for dynamic tables
748  if (cf.clustering_columns_.size() == 0) {
749  *ec = errc::make_error_code(errc::invalid_argument);
750  return query.str();
751  }
752 
753  // Table name
754  query << "INSERT INTO " << cf.cfname_ << " (";
755  // Row key
756  const GenDb::DbDataTypeVec &rkeys(cf.partition_keys_);
757  int rk_size(rkeys.size());
758  std::ostringstream values_ss;
759  for (int i = 0; i < rk_size; i++) {
760  if (i) {
761  int key_num(i + 1);
762  query << "key" << key_num;
763  } else {
764  query << "key";
765  }
766  query << ", ";
767  values_ss << "?, ";
768  }
769  // Clustering Column name
770  const GenDb::DbDataTypeVec &clustering_columns(cf.clustering_columns_);
771  int ccn_size(clustering_columns.size());
772  for (int i = 0; i < ccn_size; i++) {
773  int cnum(i + 1);
774  query << "column" << cnum;
775  values_ss << "?";
776  if (i != ccn_size - 1) {
777  query << ", ";
778  values_ss << ", ";
779  }
780  }
781  // Column name
782  const GenDb::DbDataTypeVec &columns(cf.columns_);
783  int cn_size(columns.size());
784  if (cn_size > 0) {
785  query << ", ";
786  values_ss << ", ";
787  }
788  for (int i = 0; i < cn_size; i++) {
789  int cnum(i + 1 + ccn_size);
790  query << "column" << cnum;
791  values_ss << "?";
792  if (i != cn_size - 1) {
793  query << ", ";
794  values_ss << ", ";
795  }
796  }
797  // Value
798  const GenDb::DbDataTypeVec &values(cf.value_);
799  if (values.size() > 0) {
800  query << ", value";
801  values_ss << ", ?";
802  }
803  query << ") VALUES (";
804  values_ss << ")";
805  query << values_ss.str();
806  query << " USING TTL ?";
807  return query.str();
808 }
809 
810 //
811 // Cf2CassPrepareBind
812 //
813 
815  CassStatement *statement,
816  const GenDb::ColList *v_columns) {
817  CassStatementNameBinder values_binder(cci, statement);
818  // Row keys
819  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
820  int rk_size(rkeys.size());
821  size_t idx(0);
822  for (; (int) idx < rk_size; idx++) {
823  std::string rk_name;
824  if (idx) {
825  int key_num(idx + 1);
826  rk_name = "key" + integerToString(key_num);
827  } else {
828  rk_name = "key";
829  }
830  boost::apply_visitor(boost::bind(values_binder, _1, rk_name.c_str()),
831  rkeys[idx]);
832  }
833  // Columns
834  int cttl(-1);
835  BOOST_FOREACH(const GenDb::NewCol &column, v_columns->columns_) {
836  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_SQL);
837  const GenDb::DbDataValueVec &cnames(*column.name.get());
838  assert(cnames.size() == 1);
839  assert(cnames[0].which() == GenDb::DB_VALUE_STRING);
840  std::string cname(boost::get<std::string>(cnames[0]));
841  const GenDb::DbDataValueVec &cvalues(*column.value.get());
842  assert(cvalues.size() == 1);
843  boost::apply_visitor(boost::bind(values_binder, _1, cname.c_str()),
844  cvalues[0]);
845  // Column TTL
846  cttl = column.ttl;
847  idx++;
848  }
849  CassError rc(cci->CassStatementBindInt32(statement, idx++,
850  (cass_int32_t)cttl));
851  assert(rc == CASS_OK);
852  return true;
853 }
854 
856  CassStatement *statement,
857  const GenDb::ColList *v_columns) {
858  CassStatementIndexBinder values_binder(cci, statement);
859  // Row keys
860  const GenDb::DbDataValueVec &rkeys(v_columns->rowkey_);
861  int rk_size(rkeys.size());
862  size_t idx(0);
863  for (; (int) idx < rk_size; idx++) {
864  boost::apply_visitor(boost::bind(values_binder, _1, idx), rkeys[idx]);
865  }
866  // Columns
867  const GenDb::NewColVec &columns(v_columns->columns_);
868  assert(columns.size() == 1);
869  const GenDb::NewCol &column(columns[0]);
870  assert(column.cftype_ == GenDb::NewCf::COLUMN_FAMILY_NOSQL);
871  // Column Names
872  const GenDb::DbDataValueVec &cnames(*column.name.get());
873  int cn_size(cnames.size());
874  for (int i = 0; i < cn_size; i++, idx++) {
875  boost::apply_visitor(boost::bind(values_binder, _1, idx), cnames[i]);
876  }
877  // Column Values
878  const GenDb::DbDataValueVec &cvalues(*column.value.get());
879  if (cvalues.size() > 0) {
880  boost::apply_visitor(boost::bind(values_binder, _1, idx++),
881  cvalues[0]);
882  }
883  CassError rc(cci->CassStatementBindInt32(statement, idx++,
884  (cass_int32_t)column.ttl));
885  assert(rc == CASS_OK);
886  return true;
887 }
888 
889 static std::string CassSelectFromTableInternal(const std::string &table,
890  const std::vector<GenDb::DbDataValueVec> &rkeys,
891  const GenDb::ColumnNameRange &ck_range,
892  const GenDb::FieldNamesToReadVec &read_vec,
893  const GenDb::WhereIndexInfoVec &where_vec) {
894  std::ostringstream query;
895  // Table
896  if (read_vec.empty()) {
897  query << "SELECT * FROM " << table;
898  } else {
899  query << "SELECT ";
900  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
901  it != read_vec.end(); it++) {
902  query << it->get<0>() << ",";
903  bool read_timestamp = it->get<3>();
904  if (read_timestamp) {
905  query << "WRITETIME(" << it->get<0>() << "),";
906  }
907  }
908  query.seekp(-1, query.cur);
909  query << " FROM " << table;
910  }
911  if (rkeys.size() == 1) {
912  GenDb::DbDataValueVec rkey = rkeys[0];
913  int rk_size(rkey.size());
914  CassQueryPrinter cprinter(query);
915  for (int i = 0; i < rk_size; i++) {
916  if (i) {
917  int key_num(i + 1);
918  query << " AND key" << key_num << "=";
919  } else {
920  query << " WHERE key=";
921  }
922  boost::apply_visitor(cprinter, rkey[i]);
923  }
924 
925  } else if (rkeys.size() > 1) {
926  query << " WHERE key IN (";
927  BOOST_FOREACH(GenDb::DbDataValueVec rkey, rkeys) {
928  int rk_size(rkey.size());
929  assert(rk_size == 1);
930  CassQueryPrinter cprinter(query);
931  boost::apply_visitor(cprinter, rkey[0]);
932  query << ",";
933  }
934  query.seekp(-1, query.cur);
935  query << ")";
936  }
937  if (!where_vec.empty()) {
938  for (GenDb::WhereIndexInfoVec::const_iterator it = where_vec.begin();
939  it != where_vec.end(); ++it) {
940  std::ostringstream value_ss;
941  CassQueryPrinter value_vprinter(value_ss);
942  boost::apply_visitor(value_vprinter, it->get<2>());
943  query << " AND";
944  query << " " << it->get<0>();
945  query << " " << GenDb::Op::ToString(it->get<1>());
946  query << " " << value_ss.str();
947  }
948  }
949  if (!ck_range.IsEmpty()) {
950  if (!ck_range.start_.empty()) {
951  int ck_start_size(ck_range.start_.size());
952  std::ostringstream start_ss;
953  start_ss << " " << GenDb::Op::ToString(ck_range.start_op_) << " (";
954  CassQueryPrinter start_vprinter(start_ss);
955  query << " AND (";
956  for (int i = 0; i < ck_start_size; i++) {
957  if (i) {
958  query << ", ";
959  start_ss << ", ";
960  }
961  int cnum(i + 1);
962  query << "column" << cnum;
963  boost::apply_visitor(start_vprinter, ck_range.start_[i]);
964  }
965  query << ")";
966  start_ss << ")";
967  query << start_ss.str();
968  }
969  if (!ck_range.finish_.empty()) {
970  int ck_finish_size(ck_range.finish_.size());
971  std::ostringstream finish_ss;
972  finish_ss << " " << GenDb::Op::ToString(ck_range.finish_op_) <<
973  " (";
974  CassQueryPrinter finish_vprinter(finish_ss);
975  query << " AND (";
976  for (int i = 0; i < ck_finish_size; i++) {
977  if (i) {
978  query << ", ";
979  finish_ss << ", ";
980  }
981  int cnum(i + 1);
982  query << "column" << cnum;
983  boost::apply_visitor(finish_vprinter, ck_range.finish_[i]);
984  }
985  query << ")";
986  finish_ss << ")";
987  query << finish_ss.str();
988  }
989  if (ck_range.count_) {
990  query << " LIMIT " << ck_range.count_;
991  }
992  }
993  if (where_vec.size() > 1) {
994  query << " ALLOW FILTERING";
995  }
996  return query.str();
997 }
998 
1000  const std::string &table, const GenDb::DbDataValueVec &rkeys,
1001  const GenDb::ColumnNameRange &ck_range,
1002  const GenDb::WhereIndexInfoVec &where_vec,
1003  const GenDb::FieldNamesToReadVec &read_vec) {
1004  std::vector<GenDb::DbDataValueVec> rkey_vec;
1005  rkey_vec.push_back(rkeys);
1006  return CassSelectFromTableInternal(table, rkey_vec, ck_range,
1007  read_vec, where_vec);
1008 }
1009 
// Builds the SELECT text for one partition key of `table`. The key is
// wrapped in a one-element vector and a default-constructed
// ColumnNameRange is passed to the internal builder.
1010 std::string PartitionKey2CassSelectFromTable(const std::string &table,
1011  const GenDb::DbDataValueVec &rkeys) {
1012  std::vector<GenDb::DbDataValueVec> rkey_vec;
1013  rkey_vec.push_back(rkeys);
1014  return CassSelectFromTableInternal(table, rkey_vec, GenDb::ColumnNameRange(),
1017 }
1018 
1020  const std::string &table, const GenDb::DbDataValueVec &rkeys,
1021  const GenDb::ColumnNameRange &ck_range,
1022  const GenDb::FieldNamesToReadVec &read_vec) {
      // Variant without a where-clause parameter: wrap the single partition
      // key in a vector and forward the range and the fields to read.
1023  std::vector<GenDb::DbDataValueVec> rkey_vec;
1024  rkey_vec.push_back(rkeys);
1025  return CassSelectFromTableInternal(table, rkey_vec, ck_range, read_vec,
1027 }
1028 
1030  const std::string &table, const std::vector<GenDb::DbDataValueVec> &rkeys,
1031  const GenDb::ColumnNameRange &ck_range,
1032  const GenDb::FieldNamesToReadVec &read_vec) {
      // Multi-partition variant: `rkeys` is already a vector of partition
      // keys, so it is forwarded to the internal builder unchanged.
1033  return CassSelectFromTableInternal(table, rkeys, ck_range, read_vec,
1035 }
1036 
// Builds the SELECT text for `table` without any partition key: an empty
// key vector is handed to the internal builder.
1037 std::string CassSelectFromTable(const std::string &table) {
1038  std::vector<GenDb::DbDataValueVec> rkey_vec;
1039  return CassSelectFromTableInternal(table, rkey_vec,
1042 }
1043 
1045  interface::CassLibrary *cci, const CassValue *cvalue) {
      // Converts one driver value into a GenDb::DbDataValue by switching on
      // the driver-reported value type. A null value yields a
      // default-constructed DbDataValue.
1046  if (cci->CassValueIsNull(cvalue)) {
1047  return GenDb::DbDataValue();
1048  }
1049  CassValueType cvtype(cci->GetCassValueType(cvalue));
      // NOTE(review): in every case below `rc` is consumed only by assert();
      // if asserts are compiled out, a failed getter goes unnoticed.
1050  switch (cvtype) {
      // All three textual types are returned as std::string.
1051  case CASS_VALUE_TYPE_ASCII:
1052  case CASS_VALUE_TYPE_VARCHAR:
1053  case CASS_VALUE_TYPE_TEXT: {
1054  CassString ctstring;
1055  CassError rc(cci->CassValueGetString(cvalue, &ctstring.data,
1056  &ctstring.length));
1057  assert(rc == CASS_OK);
      // Copy using the explicit length rather than relying on a terminator.
1058  return std::string(ctstring.data, ctstring.length);
1059  }
1060  case CASS_VALUE_TYPE_UUID: {
1061  CassUuid ctuuid;
1062  CassError rc(cci->CassValueGetUuid(cvalue, &ctuuid));
1063  assert(rc == CASS_OK);
      // encode_uuid writes the driver UUID into the raw bytes of `u`.
1065  encode_uuid((char *)&u, ctuuid);
1066  return u;
1067  }
1068  case CASS_VALUE_TYPE_DOUBLE: {
1069  cass_double_t ctdouble;
1070  CassError rc(cci->CassValueGetDouble(cvalue, &ctdouble));
1071  assert(rc == CASS_OK);
1072  return (double)ctdouble;
1073  }
      // The integer cases read the driver's signed type and return it cast
      // to the unsigned type of the same width.
1074  case CASS_VALUE_TYPE_TINY_INT: {
1075  cass_int8_t ct8;
1076  CassError rc(cci->CassValueGetInt8(cvalue, &ct8));
1077  assert(rc == CASS_OK);
1078  return (uint8_t)ct8;
1079  }
1080  case CASS_VALUE_TYPE_SMALL_INT: {
1081  cass_int16_t ct16;
1082  CassError rc(cci->CassValueGetInt16(cvalue, &ct16));
1083  assert(rc == CASS_OK);
1084  return (uint16_t)ct16;
1085  }
1086  case CASS_VALUE_TYPE_INT: {
1087  cass_int32_t ct32;
1088  CassError rc(cci->CassValueGetInt32(cvalue, &ct32));
1089  assert(rc == CASS_OK);
1090  return (uint32_t)ct32;
1091  }
1092  case CASS_VALUE_TYPE_BIGINT: {
1093  cass_int64_t ct64;
1094  CassError rc(cci->CassValueGetInt64(cvalue, &ct64));
1095  assert(rc == CASS_OK);
1096  return (uint64_t)ct64;
1097  }
1098  case CASS_VALUE_TYPE_INET: {
1099  CassInet ctinet;
1100  CassError rc(cci->CassValueGetInet(cvalue, &ctinet));
1101  assert(rc == CASS_OK);
1102  IpAddress ipaddr;
      // The address length selects IPv4 vs IPv6; any other length asserts.
1103  if (ctinet.address_length == CASS_INET_V4_LENGTH) {
1104  Ip4Address::bytes_type ipv4;
1105  memcpy(GENERIC_RAW_ARRAY(ipv4), ctinet.address, CASS_INET_V4_LENGTH);
1106  ipaddr = Ip4Address(ipv4);
1107  } else if (ctinet.address_length == CASS_INET_V6_LENGTH) {
1108  Ip6Address::bytes_type ipv6;
1109  memcpy(GENERIC_RAW_ARRAY(ipv6), ctinet.address, CASS_INET_V6_LENGTH);
1110  ipaddr = Ip6Address(ipv6);
1111  } else {
1112  assert(0);
1113  }
1114  return ipaddr;
1115  }
1116  case CASS_VALUE_TYPE_BLOB: {
1117  const cass_byte_t *bytes(NULL);
1118  size_t size(0);
1119  CassError rc(cci->CassValueGetBytes(cvalue, &bytes, &size));
1120  assert(rc == CASS_OK);
1121  return GenDb::Blob(bytes, size);
1122  }
1123  case CASS_VALUE_TYPE_UNKNOWN: {
1124  // null type
1125  return GenDb::DbDataValue();
1126  }
      // Any other type is traced and asserted on; the return below is only
      // reached when the assert does not abort.
1127  default: {
1128  CQLIF_ERR_TRACE("Unhandled CassValueType: " << cvtype);
1129  assert(false && "Unhandled value type");
1130  return GenDb::DbDataValue();
1131  }
1132  }
1133 }
1134 
1136  CassSession *session, const char* query,
1137  CassPreparedPtr *prepared) {
      // Prepares `query` on `session` and blocks until the driver future
      // completes. On success *prepared receives the prepared statement;
      // on failure the driver's error message is traced and *prepared is
      // left untouched. Returns true only when the driver reports CASS_OK.
1138  CQLIF_DEBUG_TRACE( "PrepareSync: " << query);
1139  CassFuturePtr future(cci->CassSessionPrepare(session, query), cci);
1140  cci->CassFutureWait(future.get());
1141 
1142  CassError rc(cci->CassFutureErrorCode(future.get()));
1143  if (rc != CASS_OK) {
1144  CassString err;
1145  cci->CassFutureErrorMessage(future.get(), &err.data, &err.length);
1146  CQLIF_ERR_TRACE("PrepareSync: " << query << " FAILED: " <<
1147  std::string(err.data, err.length));
1148  } else {
1149  *prepared = CassPreparedPtr(cci->CassFutureGetPrepared(future.get()),
1150  cci);
1151  }
1152  return rc == CASS_OK;
1153 }
1154 
1156  CassSession *session,
1157  CassStatement *qstatement, CassResultPtr *result,
1158  CassConsistency consistency) {
      // Applies `consistency` to the statement, executes it on `session`
      // and blocks until the future completes. `result` is optional: when
      // non-NULL and the query succeeded it receives the driver result.
      // Returns true only when the driver reports CASS_OK.
1159  cci->CassStatementSetConsistency(qstatement, consistency);
1160  CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci);
1161  cci->CassFutureWait(future.get());
1162 
1163  CassError rc(cci->CassFutureErrorCode(future.get()));
1164  if (rc != CASS_OK) {
1165  CassString err;
1166  cci->CassFutureErrorMessage(future.get(), &err.data, &err.length);
1167  CQLIF_ERR_TRACE("SyncQuery: FAILED: " <<
1168  std::string(err.data, err.length));
1169  } else {
1170  if (result) {
1171  *result = CassResultPtr(cci->CassFutureGetResult(future.get()),
1172  cci);
1173  }
1174  }
1175  return rc == CASS_OK;
1176 }
1177 
1179  CassSession *session, const char *query, CassConsistency consistency) {
      // Runs a textual query synchronously and discards any result set
      // (NULL is passed as the result pointer).
1180  CQLIF_DEBUG_TRACE( "SyncQuery: " << query);
      // Second argument 0: the statement is created with no bind parameters.
1181  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1182  return ExecuteQuerySyncInternal(cci, session, statement.get(), NULL,
1183  consistency);
1184 }
1185 
1187  CassSession *session, const char *query,
1188  CassResultPtr *result, CassConsistency consistency) {
      // Runs a textual query synchronously and hands the result set back
      // through `result`.
1189  CQLIF_DEBUG_TRACE( "SyncQuery: " << query);
1190  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1191  return ExecuteQuerySyncInternal(cci, session, statement.get(), result,
1192  consistency);
1193 }
1194 
1196  CassSession *session, CassStatement *statement,
1197  CassConsistency consistency) {
      // Runs an already-built statement synchronously; no result set is
      // requested (NULL result pointer).
1198  return ExecuteQuerySyncInternal(cci, session, statement, NULL,
1199  consistency);
1200 }
1201 
      // Maps a driver error code onto a GenDb::DbOpResult value: CASS_OK
      // becomes OK and codes not listed here take the default branch (ERROR).
1203  switch (rc) {
1204  case CASS_OK:
1205  return GenDb::DbOpResult::OK;
      // Driver-side resource exhaustion codes are grouped together.
1206  case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1207  case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1208  case CASS_ERROR_LIB_NO_AVAILABLE_IO_THREAD:
1210  default:
1211  return GenDb::DbOpResult::ERROR;
1212  }
1213 }
1214 
1216  CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec,
1217  GenDb::NewColVec *v_columns) {
      // Walks every row of `result` and appends one GenDb::NewCol per row to
      // `v_columns`, using `read_vec` to decide how each result column is
      // classified.
1218  // Row iterator
1219  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1220  while (cci->CassIteratorNext(riterator.get())) {
1221  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
      // `i` is the index of the current column within the result row.
1225  int i = 0;
1226  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1227  it != read_vec.end(); it++) {
1228  bool row_key = it->get<1>();
1229  bool row_column = it->get<2>();
1230  bool read_timestamp = it->get<3>();
      // Row-key fields are not stored here, but they still occupy a result
      // column, so the index must advance.
1231  if (row_key) {
1232  i++;
1233  continue;
1234  }
1235  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1236  assert(cvalue);
1237  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1238  if (row_column) {
1239  cnames->push_back(db_value);
1240  } else {
1241  values->push_back(db_value);
      // When a timestamp was requested it is read from the result column
      // immediately after the value.
1242  if (read_timestamp) {
1243  i++;
1244  const CassValue *ctimestamp(cci->CassRowGetColumn(row, i));
1245  assert(ctimestamp);
1246  GenDb::DbDataValue time_value(CassValue2DbDataValue(cci, ctimestamp));
1247  timestamps->push_back(time_value);
1248  }
1249  }
1250  i++;
1251  }
1252  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0, timestamps));
1253  v_columns->push_back(column);
1254  }
1255 }
1256 
1258  CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec,
1259  GenDb::ColListVec *v_col_list) {
      // Walks every row of `result`, builds one GenDb::NewCol per row and
      // groups consecutive rows that share a partition key into a single
      // GenDb::ColList appended to `v_col_list`.
      // NOTE(review): grouping only compares against the previous row's key,
      // so it presumably relies on rows of one partition being adjacent.
1260  std::auto_ptr<GenDb::ColList> col_list;
1261  // Row iterator
1262  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1263  while (cci->CassIteratorNext(riterator.get())) {
1264  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1265  GenDb::DbDataValueVec rkey;
      // `i` is the index of the current column within the result row.
1269  int i = 0;
1270  for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1271  it != read_vec.end(); it++) {
1272  bool row_key = it->get<1>();
1273  bool row_column = it->get<2>();
1274  bool read_timestamp = it->get<3>();
1275  if (row_key) {
1276  // Partition key
1277  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1278  assert(cvalue);
1279  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1280  rkey.push_back(db_value);
1281  i++;
1282  continue;
1283  }
1284  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1285  assert(cvalue);
1286  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1287  if (row_column) {
1288  cnames->push_back(db_value);
1289  } else {
1290  values->push_back(db_value);
      // A requested timestamp sits in the result column right after its
      // value.
1291  if (read_timestamp) {
1292  i++;
1293  const CassValue *ctimestamp(cci->CassRowGetColumn(row, i));
1294  assert(ctimestamp);
1295  GenDb::DbDataValue time_value(CassValue2DbDataValue(cci, ctimestamp));
1296  timestamps->push_back(time_value);
1297  }
1298  }
1299  i++;
1300  }
1301  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0, timestamps));
1302  // Do we need a new ColList?
1303  if (!col_list.get()) {
1304  col_list.reset(new GenDb::ColList);
1305  col_list->rowkey_ = rkey;
1306  }
      // Partition key changed: hand the finished list to the output and
      // start a new one for this key.
1307  if (rkey != col_list->rowkey_) {
1308  v_col_list->push_back(col_list.release());
1309  col_list.reset(new GenDb::ColList);
1310  col_list->rowkey_ = rkey;
1311  }
1312  GenDb::NewColVec *v_columns(&col_list->columns_);
1313  v_columns->push_back(column);
1314  }
      // Flush the list that was still being filled when the rows ran out.
1315  if (col_list.get()) {
1316  v_col_list->push_back(col_list.release());
1317  }
1318 }
1319 
1321  CassResultPtr *result, size_t rk_count,
1322  size_t ck_count, GenDb::NewColVec *v_columns) {
      // Positional variant: result columns [rk_count, rk_count + ck_count)
      // are collected as column names and every later column as a value.
      // The first rk_count columns are not read. One NewCol per row is
      // appended to `v_columns`.
1323  // Row iterator
1324  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1325  while (cci->CassIteratorNext(riterator.get())) {
1326  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1327  // Iterate over columns
1328  size_t ccount(cci->CassResultColumnCount(result->get()));
1329  // Clustering key
1331  for (size_t i = rk_count; i < rk_count + ck_count; i++) {
1332  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1333  assert(cvalue);
1334  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1335  cnames->push_back(db_value);
1336  }
1337  // Values
1339  for (size_t i = rk_count + ck_count; i < ccount; i++) {
1340  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1341  assert(cvalue);
1342  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1343  values->push_back(db_value);
1344  }
1345  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
1346  v_columns->push_back(column);
1347  }
1348 }
1349 
1351  CassResultPtr *result, size_t rk_count,
1352  size_t ck_count, GenDb::ColListVec *v_col_list) {
      // Positional variant that also groups by partition key: the first
      // rk_count result columns form the row key, the next ck_count are
      // column names and the rest are values. Consecutive rows with an equal
      // row key share one GenDb::ColList in `v_col_list`.
1353  std::auto_ptr<GenDb::ColList> col_list;
1354  // Row iterator
1355  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1356  while (cci->CassIteratorNext(riterator.get())) {
1357  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1358  // Iterate over columns
1359  size_t ccount(cci->CassResultColumnCount(result->get()));
1360  // Partition key
1361  GenDb::DbDataValueVec rkey;
1362  for (size_t i = 0; i < rk_count; i++) {
1363  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1364  assert(cvalue);
1365  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1366  rkey.push_back(db_value);
1367  }
1368  // Clustering key
1370  for (size_t i = rk_count; i < rk_count + ck_count; i++) {
1371  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1372  assert(cvalue);
1373  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1374  cnames->push_back(db_value);
1375  }
1376  // Values
1378  for (size_t i = rk_count + ck_count; i < ccount; i++) {
1379  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1380  assert(cvalue);
1381  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1382  values->push_back(db_value);
1383  }
1384  GenDb::NewCol *column(new GenDb::NewCol(cnames, values, 0));
1385  // Do we need a new ColList?
1386  if (!col_list.get()) {
1387  col_list.reset(new GenDb::ColList);
1388  col_list->rowkey_ = rkey;
1389  }
      // Row key changed: emit the finished list and begin a new one.
1390  if (rkey != col_list->rowkey_) {
1391  v_col_list->push_back(col_list.release());
1392  col_list.reset(new GenDb::ColList);
1393  col_list->rowkey_ = rkey;
1394  }
1395  GenDb::NewColVec *v_columns(&col_list->columns_);
1396  v_columns->push_back(column);
1397  }
      // Flush the last list still being filled.
1398  if (col_list.get()) {
1399  v_col_list->push_back(col_list.release());
1400  }
1401 }
1402 
1404  CassResultPtr *result, GenDb::NewColVec *v_columns) {
      // For every row, turns each result column into a GenDb::NewCol keyed
      // by the column's name and appends it to `v_columns`. Columns whose
      // converted value is blank are skipped.
1405  // Row iterator
1406  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1407  while (cci->CassIteratorNext(riterator.get())) {
1408  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1409  // Iterate over columns
1410  size_t ccount(cci->CassResultColumnCount(result->get()));
1411  for (size_t i = 0; i < ccount; i++) {
1412  CassString cname;
1413  CassError rc(cci->CassResultColumnName(result->get(), i,
1414  &cname.data, &cname.length));
1415  assert(rc == CASS_OK);
1416  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1417  assert(cvalue);
1418  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
      // Blank values produce no column at all.
1419  if (db_value.which() == GenDb::DB_VALUE_BLANK) {
1420  continue;
1421  }
1422  GenDb::NewCol *column(new GenDb::NewCol(
1423  std::string(cname.data, cname.length), db_value, 0));
1424  v_columns->push_back(column);
1425  }
1426  }
1427 }
1428 
1430  CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list) {
      // Grouping variant: the first rk_count result columns form the row
      // key; consecutive rows with an equal key share one GenDb::ColList.
      // Each non-blank result column becomes a NewCol keyed by its name.
1431  std::auto_ptr<GenDb::ColList> col_list;
1432  // Row iterator
1433  CassIteratorPtr riterator(cci->CassIteratorFromResult(result->get()), cci);
1434  while (cci->CassIteratorNext(riterator.get())) {
1435  const CassRow *row(cci->CassIteratorGetRow(riterator.get()));
1436  // Iterate over columns
1437  size_t ccount(cci->CassResultColumnCount(result->get()));
1438  // Partition key
1439  GenDb::DbDataValueVec rkey;
1440  for (size_t i = 0; i < rk_count; i++) {
1441  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1442  assert(cvalue);
1443  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1444  rkey.push_back(db_value);
1445  }
1446  // Do we need a new ColList?
1447  if (!col_list.get()) {
1448  col_list.reset(new GenDb::ColList);
1449  col_list->rowkey_ = rkey;
1450  }
      // Row key changed: emit the finished list and begin a new one.
1451  if (rkey != col_list->rowkey_) {
1452  v_col_list->push_back(col_list.release());
1453  col_list.reset(new GenDb::ColList);
1454  col_list->rowkey_ = rkey;
1455  }
1456  GenDb::NewColVec *v_columns(&col_list->columns_);
      // This loop starts at column 0, so the partition-key columns are
      // emitted as named columns too, in addition to forming `rkey`.
1457  for (size_t i = 0; i < ccount; i++) {
1458  CassString cname;
1459  CassError rc(cci->CassResultColumnName(result->get(), i,
1460  &cname.data, &cname.length));
1461  assert(rc == CASS_OK);
1462  const CassValue *cvalue(cci->CassRowGetColumn(row, i));
1463  assert(cvalue);
1464  GenDb::DbDataValue db_value(CassValue2DbDataValue(cci, cvalue));
1465  if (db_value.which() == GenDb::DB_VALUE_BLANK) {
1466  continue;
1467  }
1468  GenDb::NewCol *column(new GenDb::NewCol(
1469  std::string(cname.data, cname.length), db_value, 0));
1470  v_columns->push_back(column);
1471  }
1472  }
      // Flush the last list still being filled.
1473  if (col_list.get()) {
1474  v_col_list->push_back(col_list.release());
1475  }
1476 }
1477 
// Completion callback for an asynchronous query. `data` is the
// CassAsyncQueryContext supplied when the callback was registered. The
// user callback ctx->cb_ is invoked exactly once on every path: with an
// empty list on error or when there is nothing to parse, otherwise with
// the parsed columns.
1478 static void OnExecuteQueryAsync(CassFuture *future, void *data) {
1479  assert(data);
      // The auto_ptr adopts the context, so it is destroyed when this
      // function returns.
1480  std::auto_ptr<CassAsyncQueryContext> ctx(
1481  boost::reinterpret_pointer_cast<CassAsyncQueryContext>(data));
1482  interface::CassLibrary *cci(ctx->cci_);
1483  CassError rc(cci->CassFutureErrorCode(future));
1485  if (rc != CASS_OK) {
1486  CassString err;
1487  cci->CassFutureErrorMessage(future, &err.data, &err.length);
1488  CQLIF_ERR_TRACE("AsyncQuery: " << ctx->query_id_ << " FAILED: "
1489  << std::string(err.data, err.length));
1490  ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1491  return;
1492  }
      // Results are parsed only when the caller supplied a result context.
1493  if (ctx->result_ctx_) {
1494  CassQueryResultContext *rctx(ctx->result_ctx_.get());
1495  CassResultPtr result(cci->CassFutureGetResult(future), cci);
1496  // In case of select parse the results
1497  if (cci->CassResultColumnCount(result.get())) {
1498  std::auto_ptr<GenDb::ColList> col_list(new GenDb::ColList);
1499  col_list->cfname_ = rctx->cf_name_;
1500  col_list->rowkey_ = rctx->row_key_;
1501  if (rctx->is_dynamic_cf_) {
1502  DynamicCfGetResult(cci, &result, rctx->rk_count_,
1503  rctx->ck_count_, &col_list->columns_);
1504  } else {
1505  StaticCfGetResult(cci, &result, &col_list->columns_);
1506  }
1507  ctx->cb_(db_rc, col_list);
1508  return;
1509  }
1510  }
      // Success with no result context, or a result with zero columns.
1511  ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1512 }
1513 
1515  CassSession *session, const char *qid, CassStatement *qstatement,
1516  CassConsistency consistency, CassAsyncQueryCallback cb,
1517  CassQueryResultContext *rctx = NULL) {
      // Applies `consistency`, submits the statement and registers
      // OnExecuteQueryAsync as the completion callback. `qid` is the label
      // used in traces; `rctx` is optional and is stored in the context.
1518  CQLIF_DEBUG_TRACE( "AsyncQuery: " << qid);
1519  cci->CassStatementSetConsistency(qstatement, consistency);
1520  CassFuturePtr future(cci->CassSessionExecute(session, qstatement), cci);
1521  std::auto_ptr<CassAsyncQueryContext> ctx(
1522  new CassAsyncQueryContext(qid, cb, cci, rctx));
      // release() gives up ownership here; OnExecuteQueryAsync re-adopts the
      // pointer and frees it.
1523  cci->CassFutureSetCallback(future.get(), OnExecuteQueryAsync,
1524  ctx.release());
1525 }
1526 
1528  CassSession *session, const char *query,
1529  CassConsistency consistency, CassAsyncQueryCallback cb) {
      // Asynchronously runs a textual query; the query text doubles as the
      // trace identifier and no result context is supplied.
1530  CQLIF_DEBUG_TRACE( "AsyncQuery: " << query);
1531  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1532  ExecuteQueryAsyncInternal(cci, session, query, statement.get(),
1533  consistency, cb);
1534 }
1535 
1537  CassSession *session, const char *query_id, CassStatement *qstatement,
1538  CassConsistency consistency, CassAsyncQueryCallback cb) {
      // Asynchronously runs an already-built statement; `query_id` is only
      // used as the identifier passed on for tracing.
1539  ExecuteQueryAsyncInternal(cci, session, query_id, qstatement, consistency,
1540  cb);
1541 }
1542 
1544  CassSession *session, const char *query, CassConsistency consistency,
      // Asynchronously runs a textual query and forwards `rctx` so that the
      // completion callback can parse the result set.
1546  CassStatementPtr statement(cci->CassStatementNew(query, 0), cci);
1547  ExecuteQueryAsyncInternal(cci, session, query, statement.get(),
1548  consistency, cb, rctx);
1549 }
1550 
1552  CassSession *session, const char *query, CassConsistency consistency,
1553  impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count,
1554  const std::string &cfname, const GenDb::DbDataValueVec &row_key) {
      // Builds a result context flagged as dynamic (second constructor
      // argument true) carrying the key counts, then submits the query
      // asynchronously. Always returns true.
1555  std::auto_ptr<CassQueryResultContext> rctx(
1556  new CassQueryResultContext(cfname, true, row_key,
1557  rk_count, ck_count));
      // Ownership of the context is released to the callee.
1558  ExecuteQueryResultAsync(cci, session, query, consistency, cb,
1559  rctx.release());
1560  return true;
1561 }
1562 
1564  CassSession *session, const char *query,
1565  const GenDb::FieldNamesToReadVec &read_vec,
1566  CassConsistency consistency, GenDb::NewColVec *v_columns) {
      // Runs `query` synchronously and, on success, parses the rows into
      // `v_columns` according to `read_vec`. Returns false, leaving
      // `v_columns` untouched, when the query fails.
1567  CassResultPtr result(NULL, cci);
1568  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1569  consistency));
1570  if (!success) {
1571  return success;
1572  }
1573  DynamicCfGetResult(cci, &result, read_vec, v_columns);
1574  return success;
1575 }
1576 
1578  CassSession *session, const char *query,
1579  const GenDb::FieldNamesToReadVec &read_vec,
1580  CassConsistency consistency, GenDb::ColListVec *v_columns) {
      // Runs `query` synchronously and, on success, parses the rows into
      // per-row-key column lists in `v_columns` according to `read_vec`.
      // Returns false without parsing when the query fails.
1581  CassResultPtr result(NULL, cci);
1582  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1583  consistency));
1584  if (!success) {
1585  return success;
1586  }
1587  DynamicCfGetResult(cci, &result, read_vec, v_columns);
1588  return success;
1589 }
1590 
1592  CassSession *session, const char *query,
1593  size_t rk_count, size_t ck_count, CassConsistency consistency,
1594  GenDb::NewColVec *v_columns) {
      // Runs `query` synchronously and, on success, parses the rows
      // positionally (rk_count / ck_count) into `v_columns`. Returns false
      // without parsing when the query fails.
1595  CassResultPtr result(NULL, cci);
1596  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1597  consistency));
1598  if (!success) {
1599  return success;
1600  }
1601  DynamicCfGetResult(cci, &result, rk_count, ck_count, v_columns);
1602  return success;
1603 }
1604 
1606  CassSession *session, const char *query,
1607  size_t rk_count, size_t ck_count, CassConsistency consistency,
1608  GenDb::ColListVec *v_col_list) {
      // Runs `query` synchronously and, on success, parses the rows
      // positionally into per-row-key column lists in `v_col_list`.
      // Returns false without parsing when the query fails.
1609  CassResultPtr result(NULL, cci);
1610  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1611  consistency));
1612  if (!success) {
1613  return success;
1614  }
1615  DynamicCfGetResult(cci, &result, rk_count, ck_count, v_col_list);
1616  return success;
1617 }
1618 
1620  CassSession *session, const char *query,
1621  CassConsistency consistency, impl::CassAsyncQueryCallback cb,
1622  const std::string& cfname, const GenDb::DbDataValueVec &row_key) {
      // Builds a result context flagged as non-dynamic (second constructor
      // argument false), then submits the query asynchronously. Always
      // returns true.
1623  std::auto_ptr<CassQueryResultContext> rctx(
1624  new CassQueryResultContext(cfname, false, row_key));
      // Ownership of the context is released to the callee.
1625  ExecuteQueryResultAsync(cci, session, query, consistency, cb,
1626  rctx.release());
1627  return true;
1628 }
1629 
1631  CassSession *session, const char *query,
1632  CassConsistency consistency, GenDb::NewColVec *v_columns) {
      // Runs `query` synchronously and, on success, parses the rows into
      // name-keyed columns in `v_columns`. Returns false without parsing
      // when the query fails.
1633  CassResultPtr result(NULL, cci);
1634  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1635  consistency));
1636  if (!success) {
1637  return success;
1638  }
1639  StaticCfGetResult(cci, &result, v_columns);
1640  return success;
1641 }
1642 
1644  CassSession *session, const char *query, size_t rk_count,
1645  CassConsistency consistency, GenDb::ColListVec *v_col_list) {
      // Runs `query` synchronously and, on success, parses the rows into
      // per-row-key column lists in `v_col_list`, using the first rk_count
      // result columns as the row key. Returns false without parsing when
      // the query fails.
1646  CassResultPtr result(NULL, cci);
1647  bool success(ExecuteQueryResultSync(cci, session, query, &result,
1648  consistency));
1649  if (!success) {
1650  return success;
1651  }
1652  StaticCfGetResult(cci, &result, rk_count, v_col_list);
1653  return success;
1654 }
1655 
1657  CassFuture *future) {
      // Blocks until `future` completes, traces the driver's error message
      // on failure, and returns true only when the result is CASS_OK.
1658  cci->CassFutureWait(future);
1659  CassError rc(cci->CassFutureErrorCode(future));
1660  if (rc != CASS_OK) {
1661  CassString err;
1662  cci->CassFutureErrorMessage(future, &err.data, &err.length);
1663  CQLIF_ERR_TRACE("SyncWait: FAILED: " <<
1664  std::string(err.data, err.length));
1665  }
1666  return rc == CASS_OK;
1667 }
1668 
1669 static const CassTableMeta * GetCassTableMeta(
1670  interface::CassLibrary *cci, const CassSchemaMeta *schema_meta,
1671  const std::string &keyspace, const std::string &table, bool log_error) {
1672  const CassKeyspaceMeta *keyspace_meta(
1673  cci->CassSchemaMetaKeyspaceByName(schema_meta, keyspace.c_str()));
1674  if (keyspace_meta == NULL) {
1675  if (log_error) {
1676  CQLIF_ERR_TRACE("No keyspace schema: Keyspace: " << keyspace <<
1677  ", Table: " << table);
1678  }
1679  return NULL;
1680  }
1681  std::string table_lower(table);
1682  boost::algorithm::to_lower(table_lower);
1683  const CassTableMeta *table_meta(
1684  cci->CassKeyspaceMetaTableByName(keyspace_meta, table_lower.c_str()));
1685  if (table_meta == NULL) {
1686  if (log_error) {
1687  CQLIF_ERR_TRACE("No table schema: Keyspace: " << keyspace <<
1688  ", Table: " << table_lower);
1689  }
1690  return NULL;
1691  }
1692  return table_meta;
1693 }
1694 
1696  CassSession *session,
1697  const std::string &keyspace, const std::string &table) {
      // Reports whether metadata for `table` in `keyspace` can be found.
      // A missing schema is traced at debug level only, and table lookup
      // misses are not logged (log_error is false).
1699  session), cci);
1700  if (schema_meta.get() == NULL) {
1701  CQLIF_DEBUG_TRACE( "No schema meta: Keyspace: " << keyspace <<
1702  ", Table: " << table);
1703  return false;
1704  }
1705  bool log_error(false);
1706  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1707  schema_meta.get(), keyspace, table, log_error));
1708  if (table_meta == NULL) {
1709  return false;
1710  }
1711  return true;
1712 }
1713 
1716  CassSession *session, const std::string &keyspace,
1717  const std::string &table, size_t *ck_count) {
      // Writes the number of clustering-key columns of `table` to *ck_count.
      // Returns false, leaving *ck_count unmodified, when the schema or
      // table metadata cannot be found; those failures are traced as errors.
1719  session), cci);
1720  if (schema_meta.get() == NULL) {
1721  CQLIF_ERR_TRACE("No schema meta: Keyspace: " << keyspace <<
1722  ", Table: " << table);
1723  return false;
1724  }
1725  bool log_error(true);
1726  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1727  schema_meta.get(), keyspace, table, log_error));
1728  if (table_meta == NULL) {
1729  return false;
1730  }
1731  *ck_count = cci->CassTableMetaClusteringKeyCount(table_meta);
1732  return true;
1733 }
1734 
1736  interface::CassLibrary *cci, CassSession *session,
1737  const std::string &keyspace, const std::string &table, size_t *rk_count) {
      // Writes the number of partition-key columns of `table` to *rk_count.
      // Returns false, leaving *rk_count unmodified, when the schema or
      // table metadata cannot be found; those failures are traced as errors.
1739  session), cci);
1740  if (schema_meta.get() == NULL) {
1741  CQLIF_ERR_TRACE("No schema meta: Keyspace: " << keyspace <<
1742  ", Table: " << table);
1743  return false;
1744  }
1745  bool log_error(true);
1746  const CassTableMeta *table_meta(impl::GetCassTableMeta(cci,
1747  schema_meta.get(), keyspace, table, log_error));
1748  if (table_meta == NULL) {
1749  return false;
1750  }
1751  *rk_count = cci->CassTableMetaPartitionKeyCount(table_meta);
1752  return true;
1753 }
1754 
1755 static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel) {
1756  switch (clevel) {
1757  case CASS_LOG_DISABLED:
1758  return log4cplus::OFF_LOG_LEVEL;
1759  case CASS_LOG_CRITICAL:
1760  return log4cplus::FATAL_LOG_LEVEL;
1761  case CASS_LOG_ERROR:
1762  return log4cplus::ERROR_LOG_LEVEL;
1763  case CASS_LOG_WARN:
1764  return log4cplus::WARN_LOG_LEVEL;
1765  case CASS_LOG_INFO:
1766  return log4cplus::INFO_LOG_LEVEL;
1767  case CASS_LOG_DEBUG:
1768  return log4cplus::DEBUG_LOG_LEVEL;
1769  case CASS_LOG_TRACE:
1770  return log4cplus::TRACE_LOG_LEVEL;
1771  default:
1772  return log4cplus::ALL_LOG_LEVEL;
1773  }
1774 }
1775 
1776 static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level) {
1777  switch (level) {
1778  case log4cplus::OFF_LOG_LEVEL:
1779  return CASS_LOG_DISABLED;
1780  case log4cplus::FATAL_LOG_LEVEL:
1781  return CASS_LOG_CRITICAL;
1782  case log4cplus::ERROR_LOG_LEVEL:
1783  return CASS_LOG_ERROR;
1784  case log4cplus::WARN_LOG_LEVEL:
1785  return CASS_LOG_WARN;
1786  case log4cplus::INFO_LOG_LEVEL:
1787  return CASS_LOG_INFO;
1788  case log4cplus::DEBUG_LOG_LEVEL:
1789  return CASS_LOG_DEBUG;
1790  case log4cplus::TRACE_LOG_LEVEL:
1791  return CASS_LOG_TRACE;
1792  default:
1793  assert(false && "Invalid Log4Level");
1794  return CASS_LOG_DISABLED;
1795  }
1796 }
1797 
1798 static void CassLibraryLog(const CassLogMessage* message, void *data) {
1799  if (LoggingDisabled()) {
1800  return;
1801  }
1802  log4cplus::LogLevel log4level(Cass2log4Level(message->severity));
1803  std::stringstream buf;
1804  buf << "CassLibrary: " << message->file << ":" << message->line <<
1805  " " << message->function << "] " << message->message;
1806  CASS_LIB_TRACE(log4level, buf.str());
1807 }
1808 
// Reads the whole file at `ca_certs_path` and returns its contents.
// Returns an empty string when the path is empty or the file cannot be
// opened, so callers cannot tell those cases from an empty file.
static std::string LoadCertFile(const std::string &ca_certs_path) {
    std::string content;
    if (ca_certs_path.empty()) {
        return content;
    }
    std::ifstream cert_stream(ca_certs_path.c_str());
    if (cert_stream) {
        // Slurp everything up to end-of-file in one go.
        content.assign(std::istreambuf_iterator<char>(cert_stream),
                       std::istreambuf_iterator<char>());
    }
    return content;
}
1821 
1822 
// Task that wraps an arbitrary void() callable so that it can be run as a
// Task with the given task id and instance.
1823 class WorkerTask : public Task {
1824  public:
1825  typedef boost::function<void(void)> FunctionPtr;
      // Stores `func` for later execution by Run().
1826  WorkerTask(FunctionPtr func, int task_id, int task_instance) :
1827  Task(task_id, task_instance),
1828  func_(func) {
1829  }
      // Invokes the stored callable once and returns true.
1830  bool Run() {
1831  func_();
1832  return true;
1833  }
      // Fixed, human-readable identifier for this task type.
1834  std::string Description() const {
1835  return "cass::cql::impl::WorkerTask";
1836  }
1837  private:
1839 };
1840 
1841 } // namespace impl
1842 
1843 //
1844 // CqlIfImpl
1845 //
// Creates the driver cluster object and two sessions (data and schema) and
// configures the cluster with contact points, port, optional SSL and
// optional plain-text credentials. No connection call is made in the
// statements shown here.
1846 CqlIfImpl::CqlIfImpl(EventManager *evm,
1847  const std::vector<std::string> &cassandra_ips,
1848  int cassandra_port,
1849  const std::string &cassandra_user,
1850  const std::string &cassandra_password,
1851  bool use_ssl,
1852  const std::string &ca_certs_path,
1853  interface::CassLibrary *cci) :
1854  evm_(evm),
1855  cci_(cci),
1856  cluster_(cci_->CassClusterNew(), cci_),
1857  ssl_(0, cci_),
1858  session_(cci_->CassSessionNew(), cci_),
1859  schema_session_(cci_->CassSessionNew(), cci_),
1860  keyspace_(),
1861  io_thread_count_(2) {
1862  // Set session state to INIT
1865 
      // The first configured server is used as the schema contact point.
1866  if (cassandra_ips.size() > 0) {
1867  schema_contact_point_ = cassandra_ips[0];
1868  boost::system::error_code ec;
      // A parse error means the entry is not a literal IP address.
1869  boost::asio::ip::address::from_string(cassandra_ips[0], ec);
1870  if(ec.value() != 0){
1872  evm->io_service(), cassandra_ips[0]);
1873  }
1874  }
1875 
1876  if (use_ssl) {
1877  ssl_ = impl::CassSslPtr(cci->CassSslNew(), cci_);
1878  /* Only verify the certification and not the identity */
1879  cci_->CassSslSetVerifyFlags(ssl_.get(), CASS_SSL_VERIFY_PEER_CERT);
1880  std::string content = impl::LoadCertFile(ca_certs_path);
      // NOTE(review): LoadCertFile returns an empty string for an empty
      // path, an unreadable file and an empty file alike, so any of those
      // silently switches certificate verification off here.
1881  if (content.length() == 0) {
1882  cci_->CassSslSetVerifyFlags(ssl_.get(), CASS_SSL_VERIFY_NONE);
1883  } else {
1884  cci_->CassSslAddTrustedCert(ssl_.get(), content);
1885  }
1886  cci_->CassClusterSetSsl(cluster_.get(), ssl_.get());
1887  }
      // All configured servers are passed to the driver as one
      // comma-separated list.
1888  std::string contact_points(boost::algorithm::join(cassandra_ips, ","));
1889  cci_->CassClusterSetContactPoints(cluster_.get(), contact_points.c_str());
1890  cci_->CassClusterSetPort(cluster_.get(), cassandra_port);
1891  // Set credentials for plain text authentication
      // Credentials are applied only when both user and password are set.
1892  if (!cassandra_user.empty() && !cassandra_password.empty()) {
1893  cci_->CassClusterSetCredentials(cluster_.get(), cassandra_user.c_str(),
1894  cassandra_password.c_str());
1895  }
1896  // Set number of IO threads to half the number of cores
1902 }
1903 
1905  assert(session_state_ == SessionState::INIT ||
1909 }
1910 
// Formats the create-keyspace statement for `keyspace` with the given
// replication factor and executes it synchronously on the schema session.
// Returns false when the statement does not fit the local buffer or the
// query fails.
1911 bool CqlIfImpl::CreateKeyspaceIfNotExistsSync(const std::string &keyspace,
1912  const std::string &replication_factor, CassConsistency consistency) {
1914  return false;
1915  }
1916  char buf[512];
1917  int n(snprintf(buf, sizeof(buf), kQCreateKeyspaceIfNotExists,
1918  keyspace.c_str(), replication_factor.c_str()));
      // n < 0 is an encoding error; n >= sizeof(buf) means the formatted
      // statement was truncated.
1919  if (n < 0 || n >= (int)sizeof(buf)) {
1920  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1921  keyspace << ", RF: " << replication_factor);
1922  return false;
1923  }
1924  return impl::ExecuteQuerySync(cci_, schema_session_.get(), buf,
1925  consistency);
1926 }
1927 
// Issues the use-keyspace statement for `keyspace` synchronously on the
// schema session and, on success, records it in keyspace_.
1928 bool CqlIfImpl::UseKeyspaceSyncOnSchemaSession(const std::string &keyspace,
1929  CassConsistency consistency) {
1931  return false;
1932  }
1933  char buf[512];
1934  int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str()));
      // Reject both encoding errors and truncated statements.
1935  if (n < 0 || n >= (int)sizeof(buf)) {
1936  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1937  keyspace);
1938  return false;
1939  }
1940  bool success(impl::ExecuteQuerySync(cci_, schema_session_.get(), buf,
1941  consistency));
1942  if (!success) {
1943  return false;
1944  }
1945  // Update keyspace
1946  keyspace_ = keyspace;
1947  return success;
1948 }
1949 
// Issues the use-keyspace statement for `keyspace` synchronously on the
// data session (session_) and, on success, records it in keyspace_.
1950 bool CqlIfImpl::UseKeyspaceSync(const std::string &keyspace,
1951  CassConsistency consistency) {
1953  return false;
1954  }
1955  char buf[512];
1956  int n(snprintf(buf, sizeof(buf), kQUseKeyspace, keyspace.c_str()));
      // Reject both encoding errors and truncated statements.
1957  if (n < 0 || n >= (int)sizeof(buf)) {
1958  CQLIF_ERR_TRACE("FAILED (" << n << "): Keyspace: " <<
1959  keyspace);
1960  return false;
1961  }
1962  bool success(impl::ExecuteQuerySync(cci_, session_.get(), buf,
1963  consistency));
1964  if (!success) {
1965  return false;
1966  }
1967  // Update keyspace
1968  keyspace_ = keyspace;
1969  return success;
1970 }
1971 
1973  const std::string &compaction_strategy, CassConsistency consistency) {
1975  return false;
1976  }
1977  // There are two types of tables - Static (SQL) and Dynamic (NOSQL)
1978  // column family. Static column family has more or less fixed rows,
1979  // and dynamic column family has wide rows
      // The query text depends on the column family type; an unrecognized
      // type makes the call fail without executing anything.
1980  std::string query;
1981  switch (cf.cftype_) {
1983  {
1985  compaction_strategy);
1986  break;
1987  }
1989  {
      // This branch can report a failure through `ec`, in which case no
      // query is executed.
1990  boost::system::error_code ec;
1992  compaction_strategy, &ec);
1993  if (ec) {
1994  return false;
1995  }
1996  break;
1997  }
1998  default:
1999  {
2000  return false;
2001  }
2002  }
      // Table creation is executed on the schema session.
2003  return impl::ExecuteQuerySync(cci_, schema_session_.get(), query.c_str(),
2004  consistency);
2005 }
2006 
// Builds the index-creation query with impl::CassCreateIndexIfNotExists()
// from the table, column, index name and index mode, then executes it
// synchronously on the schema session with the given consistency.
// Returns the result of the query execution.
// NOTE(review): the condition that opens the early-return guard right after
// the signature is absent from this listing.
2007 bool CqlIfImpl::CreateIndexIfNotExistsSync(const std::string &cfname,
2008  const std::string &column, const std::string &indexname,
2009  CassConsistency consistency, const GenDb::ColIndexMode::type index_mode) {
2011  return false;
2012  }
2013  std::string query(impl::CassCreateIndexIfNotExists(cfname, column,
2014  indexname, index_mode));
2015  return impl::ExecuteQuerySync(cci_, schema_session_.get(), query.c_str(),
2016  consistency);
2017 }
2018 
2020  const std::string &table_name(cf.cfname_);
2021  impl::CassPreparedPtr prepared(NULL, cci_);
2022  // Check if the prepared statement exists
2023  if (GetPrepareInsertIntoTable(table_name, &prepared)) {
2024  return true;
2025  }
2026  bool success(PrepareInsertIntoTableSync(cf, &prepared));
2027  if (!success) {
2028  return success;
2029  }
2030  // Store the prepared statement into the map
2031  tbb::mutex::scoped_lock lock(map_mutex_);
2032  success = (insert_prepared_map_.insert(
2033  std::make_pair(table_name, prepared))).second;
2034  assert(success);
2035  return success;
2036 }
2037 
2038 bool CqlIfImpl::GetPrepareInsertIntoTable(const std::string &table_name,
2039  impl::CassPreparedPtr *prepared) const {
2040  tbb::mutex::scoped_lock lock(map_mutex_);
2041  CassPreparedMapType::const_iterator it(
2042  insert_prepared_map_.find(table_name));
2043  if (it == insert_prepared_map_.end()) {
2044  return false;
2045  }
2046  *prepared = it->second;
2047  return true;
2048 }
2049 
// Reports whether `table` is present.
// NOTE(review): this listing is missing both the condition that opens the
// early-return guard and the beginning of the final return expression; only
// its trailing `table` argument is visible. Confirm the actual lookup call
// against the full file before relying on this description.
2050 bool CqlIfImpl::IsTablePresent(const std::string &table) {
2052  return false;
2053  }
2055  table);
2056 }
2057 
// Classifies `table` using a clustering-key count stored in ck_count.
// Returns 1 when ck_count is 0, 0 when it is non-zero, and -1 when the lookup
// that fills ck_count fails.
// NOTE(review): the condition opening the first guard and the start of the
// lookup call are absent from this listing.
// NOTE(review): the first guard returns `false` from an int function, i.e. 0,
// which callers comparing against 0 interpret the same way as a non-zero
// clustering-key count rather than as an error (-1) -- confirm this is
// intended.
2058 int CqlIfImpl::IsTableStatic(const std::string &table) {
2060  return false;
2061  }
2062  size_t ck_count;
2064  keyspace_, table, &ck_count)) {
2065  return -1;
2066  }
2067  if (ck_count == 0) {
2068  return 1;
2069  } else {
2070  return 0;
2071  }
2072 }
2073 
// Asynchronous SELECT of the partition identified by `rkey` from `cfname`.
// The query text comes from impl::PartitionKey2CassSelectFromTable(); the
// branch taken depends on IsTableStatic(cfname): 1 and 0 each dispatch a
// (different) call, any other value returns false.
// NOTE(review): several lines are absent from this listing -- the end of the
// signature (the `cb` parameter used below is declared there), the guard
// condition, and the opening of every call whose argument tail is visible.
// NOTE(review): IsTableStatic() may be evaluated twice for the same table.
2074 bool CqlIfImpl::SelectFromTableAsync(const std::string &cfname,
2075  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
2078  return false;
2079  }
2080  std::string query(impl::PartitionKey2CassSelectFromTable(cfname,rkey));
2081  if (IsTableStatic(cfname) == 1) {
2083  query.c_str(), consistency, cb, cfname.c_str(), rkey);
2084  } else if (IsTableStatic(cfname) == 0) {
// rk_count / ck_count are filled by calls whose first line is missing here.
2085  size_t rk_count;
2087  keyspace_, cfname, &rk_count));
2088  size_t ck_count;
2090  keyspace_, cfname, &ck_count));
2092  query.c_str(), consistency, cb, rk_count, ck_count,
2093  cfname, rkey);
2094  } else {
2095  return false;
2096  }
2097 }
2098 
2100  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
2101  const GenDb::ColumnNameRange &ck_range,
2102  const GenDb::WhereIndexInfoVec &where_vec,
2103  const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency,
2106  return false;
2107  }
2108  std::string query(
2110  rkey, ck_range, where_vec, read_vec));
2111  assert(IsTableDynamic(cfname));
2112  size_t rk_count;
2114  keyspace_, cfname, &rk_count));
2115  size_t ck_count;
2117  keyspace_, cfname, &ck_count));
2119  query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2120  rkey);
2121 }
2122 
2124  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
2125  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2128  return false;
2129  }
2130  std::string query(
2132  rkey, ck_range));
2133  assert(IsTableDynamic(cfname));
2134  size_t rk_count;
2136  keyspace_, cfname, &rk_count));
2137  size_t ck_count;
2139  keyspace_, cfname, &ck_count));
2141  query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2142  rkey);
2143 }
2144 
2145 bool CqlIfImpl::IsTableDynamic(const std::string &table) {
2146  return !IsTableStatic(table);
2147 }
2148 
2149 bool CqlIfImpl::InsertIntoTableSync(std::auto_ptr<GenDb::ColList> v_columns,
2150  CassConsistency consistency) {
2151  return InsertIntoTableInternal(v_columns, consistency, true, NULL);
2152 }
2153 
2154 bool CqlIfImpl::InsertIntoTableAsync(std::auto_ptr<GenDb::ColList> v_columns,
2155  CassConsistency consistency, impl::CassAsyncQueryCallback cb) {
2156  return InsertIntoTableInternal(v_columns, consistency, false, cb);
2157 }
2158 
2159 bool CqlIfImpl::InsertIntoTablePrepareAsync(std::auto_ptr<GenDb::ColList> v_columns,
2160  CassConsistency consistency, impl::CassAsyncQueryCallback cb) {
2161  return InsertIntoTablePrepareInternal(v_columns, consistency, false,
2162  cb);
2163 }
2164 
2165 // COLLECTOR_GLOBAL_TABLE is dynamic table with 6 indexed columns
2166 // for object-id. Some of these might be blank. This creates tombstones
2167 // so we dont want to prepare for inserts.
2168 bool CqlIfImpl::IsInsertIntoTablePrepareSupported(const std::string &table) {
2169  return (IsTableDynamic(table)) &&
2170  (table != "MessageTablev2");
2171 }
2172 
// Synchronous SELECT of the partition identified by `rkey` from `cfname`,
// with results delivered through `out`.
// The query text comes from impl::PartitionKey2CassSelectFromTable(); the
// branch taken depends on IsTableStatic(cfname): 1 and 0 each dispatch a
// (different) call, any other value returns false.
// NOTE(review): the guard condition and the opening of every call whose
// argument tail is visible are absent from this listing.
// NOTE(review): IsTableStatic() may be evaluated twice for the same table.
2173 bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
2174  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
2175  GenDb::NewColVec *out) {
2177  return false;
2178  }
2179  std::string query(impl::PartitionKey2CassSelectFromTable(cfname,
2180  rkey));
2181  if (IsTableStatic(cfname) == 1) {
2183  query.c_str(), consistency, out);
2184  } else if (IsTableStatic(cfname) == 0){
// rk_count / ck_count are filled by calls whose first line is missing here.
2185  size_t rk_count;
2187  keyspace_, cfname, &rk_count));
2188  size_t ck_count;
2190  keyspace_, cfname, &ck_count));
2192  query.c_str(), rk_count, ck_count, consistency, out);
2193  } else {
2194  return false;
2195  }
2196 }
2197 
// Synchronous SELECT over the whole table `cfname`, with results delivered
// through `out`.
// The query text comes from impl::CassSelectFromTable(); the branch taken
// depends on IsTableStatic(cfname): 1 and 0 each dispatch a (different) call,
// any other value returns false.
// NOTE(review): the guard condition and the opening of every call whose
// argument tail is visible are absent from this listing.
// NOTE(review): IsTableStatic() may be evaluated twice for the same table.
2198 bool CqlIfImpl::SelectFromTableSync(const std::string &cfname,
2199  CassConsistency consistency, GenDb::ColListVec *out) {
2201  return false;
2202  }
2203  std::string query(impl::CassSelectFromTable(cfname));
// rk_count is needed by both branches, so it is obtained up front.
2204  size_t rk_count;
2206  keyspace_, cfname, &rk_count));
2207  if (IsTableStatic(cfname) == 1) {
2209  query.c_str(), rk_count, consistency, out);
2210  } else if (IsTableStatic(cfname) == 0){
2211  size_t ck_count;
2213  keyspace_, cfname, &ck_count));
2215  query.c_str(), rk_count, ck_count, consistency, out);
2216  } else {
2217  return false;
2218  }
2219 }
2220 
2221 
2223  const GenDb::DbDataValueVec &rkey,
2224  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2225  const GenDb::FieldNamesToReadVec &read_vec,
2226  GenDb::NewColVec *out) {
2228  return false;
2229  }
2230  std::string query(
2232  rkey, ck_range, read_vec));
2233  assert(IsTableDynamic(cfname));
2235  query.c_str(), read_vec, consistency, out);
2236 }
2237 
2239  const std::vector<GenDb::DbDataValueVec> &rkeys,
2240  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2241  const GenDb::FieldNamesToReadVec &read_vec,
2242  GenDb::ColListVec *out) {
2244  return false;
2245  }
2246  std::string query(
2248  rkeys, ck_range, read_vec));
2249  assert(IsTableDynamic(cfname));
2251  query.c_str(), read_vec, consistency, out);
2252 }
2253 
2254 
2256  const GenDb::DbDataValueVec &rkey,
2257  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
2258  GenDb::NewColVec *out) {
2260  return false;
2261  }
2262  std::string query(
2264  rkey, ck_range));
2265  assert(IsTableDynamic(cfname));
2266  size_t rk_count;
2268  keyspace_, cfname, &rk_count));
2269  size_t ck_count;
2271  keyspace_, cfname, &ck_count));
2273  query.c_str(), rk_count, ck_count, consistency, out);
2274 }
2275 
2276 void CqlIfImpl::SetRequestTimeout(uint32_t timeout_ms) {
2277  CQLIF_DEBUG_TRACE("request timeout set to " << timeout_ms);
2278  cci_->CassClusterSetRequestTimeout(cluster_.get(), timeout_ms);
2279 }
2280 
2282  /* If Connect is called multiple times due to DB failure,
2283  * then it is better to delete previous session and use
2284  * a new one to avoid gradual leak.
2285  */
2286  schema_session_.reset();
2287  impl::CassSessionPtr schema_session(cci_->CassSessionNew(), cci_);
2288  schema_session_.swap(schema_session);
2289 
2290  // First set the cluster whitelist filtering to just one node
2292  schema_contact_point_.c_str());
2293 
2295  cluster_.get()), cci_);
2296  bool success(impl::SyncFutureWait(cci_, future.get()));
2297  if (success) {
2299  CQLIF_INFO_TRACE("ConnectSchemaSync Done");
2300  } else {
2301  CQLIF_ERR_TRACE("ConnectSchemaSync FAILED");
2302  }
2303 
2305  return success;
2306 }
2307 
2309  /* If Connect is called multiple times due to DB failure,
2310  * then it is better to delete previous session and use
2311  * a new one to avoid gradual leak.
2312  */
2313  session_.reset();
2315  session_.swap(session);
2316 
2318  cluster_.get()), cci_);
2319  bool success(impl::SyncFutureWait(cci_, future.get()));
2320  if (success) {
2322  CQLIF_INFO_TRACE("ConnectSync Done");
2323  } else {
2324  CQLIF_ERR_TRACE("ConnectSync FAILED");
2325  }
2326  return success;
2327 }
2328 
2330  // Close all session and pending queries
2332  bool success(impl::SyncFutureWait(cci_, future.get()));
2333  if (success) {
2335  CQLIF_INFO_TRACE("DisconnectSync Done");
2336  } else {
2337  CQLIF_ERR_TRACE("DisconnectSync FAILED");
2338  }
2339  return success;
2340 }
2341 
2343  // Close the schema session
2345  cci_);
2346  bool success(impl::SyncFutureWait(cci_, future.get()));
2347  if (success) {
2349  CQLIF_INFO_TRACE("DisconnectSchemaSync Done");
2350  } else {
2351  CQLIF_ERR_TRACE("DisconnectSchemaSync FAILED");
2352  }
2353  return success;
2354 }
2355 
// Fetches the driver metrics for session_ and copies them field by field into
// *metrics (request latency/rate figures, connection stats, error counters).
// Returns true once the copy is done.
// NOTE(review): the condition that opens the early-return guard right after
// the signature is absent from this listing -- presumably a check on the
// session state; confirm against the full file.
2356 bool CqlIfImpl::GetMetrics(Metrics *metrics) const {
2358  return false;
2359  }
2360  CassMetrics cass_metrics;
2361  cci_->CassSessionGetMetrics(session_.get(), &cass_metrics);
2362  // Requests
2363  metrics->requests.min = cass_metrics.requests.min;
2364  metrics->requests.max = cass_metrics.requests.max;
2365  metrics->requests.mean = cass_metrics.requests.mean;
2366  metrics->requests.stddev = cass_metrics.requests.stddev;
2367  metrics->requests.median = cass_metrics.requests.median;
2368  metrics->requests.percentile_75th =
2369  cass_metrics.requests.percentile_75th;
2370  metrics->requests.percentile_95th =
2371  cass_metrics.requests.percentile_95th;
2372  metrics->requests.percentile_98th =
2373  cass_metrics.requests.percentile_98th;
2374  metrics->requests.percentile_99th =
2375  cass_metrics.requests.percentile_99th;
2376  metrics->requests.percentile_999th =
2377  cass_metrics.requests.percentile_999th;
2378  metrics->requests.mean_rate = cass_metrics.requests.mean_rate;
2379  metrics->requests.one_minute_rate =
2380  cass_metrics.requests.one_minute_rate;
2381  metrics->requests.five_minute_rate =
2382  cass_metrics.requests.five_minute_rate;
2383  metrics->requests.fifteen_minute_rate =
2384  cass_metrics.requests.fifteen_minute_rate;
2385  // Stats
2386  metrics->stats.total_connections =
2387  cass_metrics.stats.total_connections;
2388  metrics->stats.available_connections =
2389  cass_metrics.stats.available_connections;
2390  metrics->stats.exceeded_pending_requests_water_mark =
2391  cass_metrics.stats.exceeded_pending_requests_water_mark;
2392  metrics->stats.exceeded_write_bytes_water_mark =
2393  cass_metrics.stats.exceeded_write_bytes_water_mark;
2394  // Errors
2395  metrics->errors.connection_timeouts =
2396  cass_metrics.errors.connection_timeouts;
2397  metrics->errors.pending_request_timeouts =
2398  cass_metrics.errors.pending_request_timeouts;
2399  metrics->errors.request_timeouts =
2400  cass_metrics.errors.request_timeouts;
2401  return true;
2402 }
2403 
// Builds an INSERT query for v_columns (static or dynamic form, chosen by
// IsTableStatic() on the column list's table name) and executes it on
// session_.
// With sync == true the result of the synchronous execution is returned; with
// sync == false the query is submitted asynchronously with `cb` and true is
// returned immediately.
// Returns false when IsTableStatic() yields neither 1 nor 0.
// NOTE(review): the end of the signature (where `cb` is declared) and the
// condition that opens the early-return guard are absent from this listing.
// NOTE(review): IsTableStatic() may be evaluated twice for the same table.
2404 bool CqlIfImpl::InsertIntoTableInternal(std::auto_ptr<GenDb::ColList> v_columns,
2405  CassConsistency consistency, bool sync,
2408  return false;
2409  }
2410  std::string query;
2411  if (IsTableStatic(v_columns->cfname_) == 1) {
2412  query = impl::StaticCf2CassInsertIntoTable(v_columns.get());
2413  } else if (IsTableStatic(v_columns->cfname_) == 0){
2414  query = impl::DynamicCf2CassInsertIntoTable(v_columns.get());
2415  } else {
2416  return false;
2417  }
2418  if (sync) {
2419  return impl::ExecuteQuerySync(cci_, session_.get(), query.c_str(),
2420  consistency);
2421  } else {
2422  impl::ExecuteQueryAsync(cci_, session_.get(), query.c_str(),
2423  consistency, cb);
2424  return true;
2425  }
2426 }
2427 
2429  impl::CassPreparedPtr *prepared) {
2431  return false;
2432  }
2433  std::string query;
2434  switch (cf.cftype_) {
2436  {
2438  break;
2439  }
2441  {
2442  boost::system::error_code ec;
2444  if (ec.value() != boost::system::errc::success) {
2445  return false;
2446  }
2447  break;
2448  }
2449  default:
2450  {
2451  return false;
2452  }
2453  }
2454  return impl::PrepareSync(cci_, schema_session_.get(), query.c_str(),
2455  prepared);
2456 }
2457 
2459  std::auto_ptr<GenDb::ColList> v_columns,
2460  CassConsistency consistency, bool sync,
2463  return false;
2464  }
2465  impl::CassPreparedPtr prepared(NULL, cci_);
2466  bool success(GetPrepareInsertIntoTable(v_columns->cfname_, &prepared));
2467  if (!success) {
2468  CQLIF_ERR_TRACE("CassPrepared statement NOT found: " <<
2469  v_columns->cfname_);
2470  return false;
2471  }
2472  impl::CassStatementPtr qstatement(cci_->CassPreparedBind(prepared.get()),
2473  cci_);
2474  if (IsTableStatic(v_columns->cfname_) == 1) {
2475  success = impl::StaticCf2CassPrepareBind(cci_, qstatement.get(),
2476  v_columns.get());
2477  } else if (IsTableStatic(v_columns->cfname_) == 0){
2478  success = impl::DynamicCf2CassPrepareBind(cci_, qstatement.get(),
2479  v_columns.get());
2480  } else {
2481  return false;
2482  }
2483  if (!success) {
2484  return false;
2485  }
2486  if (sync) {
2488  qstatement.get(), consistency);
2489  } else {
2490  std::string qid("Prepare: " + v_columns->cfname_);
2491  impl::ExecuteQueryStatementAsync(cci_, session_.get(), qid.c_str(),
2492  qstatement.get(), consistency, cb);
2493  return true;
2494  }
2495 }
2496 
2498  "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH "
2499  "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }");
2500 const char * CqlIfImpl::kQUseKeyspace("USE \"%s\"");
2501 const char * CqlIfImpl::kTaskName("CqlIfImpl::Task");
2502 
2503 //
2504 // CqlIf
2505 //
2507  const std::vector<std::string> &cassandra_ips,
2508  int cassandra_port,
2509  const std::string &cassandra_user,
2510  const std::string &cassandra_password,
2511  bool use_ssl,
2512  const std::string &ca_certs_path,
2513  bool create_schema) :
2514  cci_(new interface::CassDatastaxLibrary),
2515  impl_(new CqlIfImpl(evm, cassandra_ips, cassandra_port,
2516  cassandra_user, cassandra_password, use_ssl,
2517  ca_certs_path, cci_.get())),
2518  use_prepared_for_insert_(true),
2519  create_schema_(create_schema) {
2520  // Setup library logging
2521  cci_->CassLogSetLevel(impl::Log4Level2CassLogLevel(
2522  log4cplus::Logger::getRoot().getLogLevel()));
2523  cci_->CassLogSetCallback(impl::CassLibraryLog, NULL);
2524  initialized_ = false;
2525  BOOST_FOREACH(const std::string &cassandra_ip, cassandra_ips) {
2526  boost::system::error_code ec;
2527  boost::asio::ip::address cassandra_addr(
2528  AddressFromString(cassandra_ip, &ec));
2529  GenDb::Endpoint endpoint(cassandra_addr, cassandra_port);
2530  endpoints_.push_back(endpoint);
2531  }
2532 }
2533 
2534 CqlIf::CqlIf() : impl_(NULL) {
2535 }
2536 
2538 }
2539 
2540 // Init/Uninit
2542  if (create_schema_) {
2543  impl_->SetRequestTimeout(GenDb::g_gendb_constants.SCHEMA_REQUEST_TIMEOUT);
2544  bool success(impl_->ConnectSchemaSync());
2545  if (!success) {
2546  return success;
2547  }
2548  }
2549  return impl_->ConnectSync();
2550 }
2551 
2553  if (create_schema_) {
2554  impl_->DisconnectSchemaSync();
2555  }
2556  impl_->DisconnectSync();
2557 }
2558 
2559 void CqlIf::Db_SetInitDone(bool init_done) {
2560  initialized_ = init_done;
2561  // No need for schema session if initialization is done
2562  if (create_schema_) {
2563  if (initialized_) {
2564  impl_->SetRequestTimeout(GenDb::g_gendb_constants.DEFAULT_REQUEST_TIMEOUT);
2565  impl_->DisconnectSchemaSync();
2566  }
2567  }
2568 }
2569 
2570 // Tablespace
// Creates the keyspace `tablespace` if needed (QUORUM consistency) and then
// selects it on the schema session (ONE consistency).
// Returns false as soon as either step fails, true otherwise.
// NOTE(review): one line is absent from this listing inside each failure
// branch -- presumably error accounting; confirm against the full file.
2571 bool CqlIf::Db_AddSetTablespace(const std::string &tablespace,
2572  const std::string &replication_factor) {
2573  bool success(impl_->CreateKeyspaceIfNotExistsSync(tablespace,
2574  replication_factor, CASS_CONSISTENCY_QUORUM));
2575  if (!success) {
2577  return success;
2578  }
2579  success = impl_->UseKeyspaceSyncOnSchemaSession(tablespace,
2580  CASS_CONSISTENCY_ONE);
2581  if (!success) {
2583  return success;
2584  }
2585  return success;
2586 }
2587 
// Selects the keyspace `tablespace` via impl_->UseKeyspaceSync() with ONE
// consistency and returns that call's result.
// NOTE(review): one line is absent from this listing inside the failure
// branch -- presumably error accounting; confirm against the full file.
2588 bool CqlIf::Db_SetTablespace(const std::string &tablespace) {
2589  bool success(impl_->UseKeyspaceSync(tablespace, CASS_CONSISTENCY_ONE));
2590  if (!success) {
2592  return success;
2593  }
2594  return success;
2595 }
2596 
2597 // Column family
2599  const std::string &compaction_strategy) {
2600  bool success(
2601  impl_->CreateTableIfNotExistsSync(cf, compaction_strategy,
2602  CASS_CONSISTENCY_QUORUM));
2603  if (!success) {
2606  return success;
2607  }
2608  // Locate (add if not exists) INSERT INTO prepare statement
2609  success = impl_->LocatePrepareInsertIntoTable(cf);
2610  if (!success) {
2613  return success;
2614  }
2616  return success;
2617 }
2618 
2620  // Check existence of table
2621  return Db_UseColumnfamily(cf.cfname_);
2622 }
2623 
// Verifies that table `cfname` exists via impl_->IsTablePresent().
// On success a read is counted against the table and true is returned;
// otherwise false is returned.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2624 bool CqlIf::Db_UseColumnfamily(const std::string &cfname) {
2625  // Check existence of table
2626  bool success(impl_->IsTablePresent(cfname));
2627  if (!success) {
2630  return success;
2631  }
2632  IncrementTableReadStats(cfname);
2633  return success;
2634 }
2635 
2636 // Index
// Creates the index `indexname` on `column` of table `cfname` via
// impl_->CreateIndexIfNotExistsSync() with QUORUM consistency and returns
// that call's result.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2637 bool CqlIf::Db_CreateIndex(const std::string &cfname,
2638  const std::string &column, const std::string &indexname,
2639  const GenDb::ColIndexMode::type index_mode) {
2640  bool success(impl_->CreateIndexIfNotExistsSync(cfname, column, indexname,
2641  CASS_CONSISTENCY_QUORUM, index_mode));
2642  if (!success) {
2645  return success;
2646  }
2647  return success;
2648 }
2649 
2650 // Column
2652  std::auto_ptr<GenDb::ColList> row,
2653  std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb) {
2654  if (drc == GenDb::DbOpResult::OK) {
2655  IncrementTableWriteStats(cfname);
2656  } else if (drc == GenDb::DbOpResult::BACK_PRESSURE) {
2659  } else {
2662  }
2663  if (!cb.empty()) {
2664  cb(drc);
2665  }
2666 }
2667 
2670  GenDb::DbOpResult::type drc, std::auto_ptr<GenDb::ColList> row) :
2671  cb_(cb),
2672  drc_(drc),
2673  row_(row) {
2674  }
2677  std::auto_ptr<GenDb::ColList> row_;
2678 };
2679 
2681  boost::shared_ptr<AsyncRowGetCallbackContext> cb_ctx) {
2682  cb_ctx->cb_(cb_ctx->drc_, cb_ctx->row_);
2683 }
2684 
2686  std::auto_ptr<GenDb::ColList> row, std::string cfname,
2687  GenDb::GenDbIf::DbGetRowCb cb, bool use_worker, int task_id,
2688  int task_instance) {
2689  if (drc == GenDb::DbOpResult::OK) {
2690  IncrementTableReadStats(cfname);
2691  } else if (drc == GenDb::DbOpResult::BACK_PRESSURE) {
2694  } else {
2697  }
2698  if (use_worker) {
2699  if (!cb.empty()) {
2700  boost::shared_ptr<AsyncRowGetCallbackContext> ctx(
2701  new AsyncRowGetCallbackContext(cb, drc, row));
2702  impl::WorkerTask *worker(new impl::WorkerTask(
2703  boost::bind(&AsyncRowGetCompletionCallback, ctx),
2704  task_id, task_instance));
2706  scheduler->Enqueue(worker);
2707  }
2708  } else {
2709  if (!cb.empty()) {
2710  cb(drc, row);
2711  }
2712  }
2713 }
2714 
2716  std::auto_ptr<GenDb::ColList> row, std::string cfname,
2718  OnAsyncRowGetCompletion(drc, row, cfname, cb, false, -1, -2);
2719 }
// Asynchronously inserts the column list `cl`, using either the prepared or
// the plain insert path; OnAsyncColumnAddCompletion is bound as the
// completion handler together with the table name and the caller's `cb`.
// Returns false when the instance is not initialized or the insert could not
// be issued.
// NOTE(review): several lines are absent from this listing -- the end of the
// signature (where `cb` is declared), the first half of the condition that
// selects the prepared path, and the bodies of the failure branches.
2720 bool CqlIf::Db_AddColumn(std::auto_ptr<GenDb::ColList> cl,
2721  GenDb::DbConsistency::type dconsistency,
// The table name is copied out first because `cl` is handed over (auto_ptr)
// to the insert call below.
2723  std::string cfname(cl->cfname_);
2724  if (!initialized_) {
2727  return false;
2728  }
2729  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2730  bool success;
2732  impl_->IsInsertIntoTablePrepareSupported(cfname)) {
2733  success = impl_->InsertIntoTablePrepareAsync(cl, consistency,
2734  boost::bind(&CqlIf::OnAsyncColumnAddCompletion, this, _1, _2, cfname,
2735  cb));
2736  } else {
2737  success = impl_->InsertIntoTableAsync(cl, consistency,
2738  boost::bind(&CqlIf::OnAsyncColumnAddCompletion, this, _1, _2, cfname,
2739  cb));
2740  }
2741  if (!success) {
2744  return success;
2745  }
2746  return success;
2747 }
2748 
// Synchronously inserts the column list `cl` via impl_->InsertIntoTableSync().
// On success a write is counted against the table and true is returned;
// otherwise false is returned.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2749 bool CqlIf::Db_AddColumnSync(std::auto_ptr<GenDb::ColList> cl,
2750  GenDb::DbConsistency::type dconsistency) {
// The table name is copied out first because `cl` is handed over (auto_ptr)
// to the insert call below.
2751  std::string cfname(cl->cfname_);
2752  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2753  bool success(impl_->InsertIntoTableSync(cl, consistency));
2754  if (!success) {
2757  return success;
2758  }
2759  IncrementTableWriteStats(cfname);
2760  return success;
2761 }
2762 
2763 // Read
// Asynchronous read of the partition `rowkey` from `cfname`, limited to the
// clustering-key range `crange`. OnAsyncRowGetCompletion is bound with the
// table name and `cb` as the completion handler.
// Returns whether the request could be issued.
// NOTE(review): the end of the signature (where `dconsistency` and `cb` are
// declared) and the body of the failure branch are absent from this listing.
2764 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2765  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2767  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2768  bool success(impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2769  crange, consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this,
2770  _1, _2, cfname, cb)));
2771  if (!success) {
2774  }
2775  return success;
2776 }
2777 
// Asynchronous read of the partition `rowkey` from `cfname`, limited to the
// clustering-key range `crange`. The completion handler is bound with
// use_worker == true plus task_id/task_instance.
// Returns whether the request could be issued.
// NOTE(review): the end of the signature (where `cb` is declared) and the
// body of the failure branch are absent from this listing.
2778 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2779  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2780  GenDb::DbConsistency::type dconsistency, int task_id, int task_instance,
2782  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2783  bool success(impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2784  crange, consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this,
2785  _1, _2, cfname, cb, true, task_id, task_instance)));
2786  if (!success) {
2789  }
2790  return success;
2791 }
2792 
// Asynchronous read of the whole partition `rowkey` from `cfname`.
// OnAsyncRowGetCompletion is bound with the table name and `cb` as the
// completion handler.
// Returns whether the request could be issued.
// NOTE(review): the end of the signature (where `cb` is declared) and the
// body of the failure branch are absent from this listing.
2793 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2794  const GenDb::DbDataValueVec &rowkey,
2795  GenDb::DbConsistency::type dconsistency,
2797  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2798  bool success(impl_->SelectFromTableAsync(cfname, rowkey,
2799  consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2,
2800  cfname, cb)));
2801  if (!success) {
2804  }
2805  return success;
2806 }
2807 
// Asynchronous read of the whole partition `rowkey` from `cfname`. The
// completion handler is bound with use_worker == true plus
// task_id/task_instance.
// Returns whether the request could be issued.
// NOTE(review): the end of the signature (where `cb` is declared) and the
// body of the failure branch are absent from this listing.
2808 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2809  const GenDb::DbDataValueVec &rowkey,
2810  GenDb::DbConsistency::type dconsistency, int task_id, int task_instance,
2812  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2813  bool success(impl_->SelectFromTableAsync(cfname, rowkey,
2814  consistency, boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2,
2815  cfname, cb, true, task_id, task_instance)));
2816  if (!success) {
2819  }
2820  return success;
2821 }
2822 
// Asynchronous read of the partition `rowkey` from `cfname`, limited to the
// clustering-key range `crange` and filtered by the index conditions in
// `where_vec`. An empty FieldNamesToReadVec is passed as the field list.
// Returns whether the request could be issued.
// NOTE(review): the end of the signature (where `dconsistency` and `cb` are
// declared) and the body of the failure branch are absent from this listing.
2823 bool CqlIf::Db_GetRowAsync(const std::string &cfname,
2824  const GenDb::DbDataValueVec &rowkey, const GenDb::ColumnNameRange &crange,
2825  const GenDb::WhereIndexInfoVec &where_vec,
2827  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2828  bool success(impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync(cfname,
2829  rowkey, crange, where_vec, GenDb::FieldNamesToReadVec(), consistency,
2830  boost::bind(&CqlIf::OnAsyncRowGetCompletion, this, _1, _2, cfname, cb)));
2831  if (!success) {
2834  }
2835  return success;
2836 }
2837 
// Synchronous read of the partition `rowkey` from `cfname`; the columns are
// written into out->columns_.
// On success a read is counted against the table and true is returned;
// otherwise false is returned.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2838 bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
2839  const GenDb::DbDataValueVec &rowkey,
2840  GenDb::DbConsistency::type dconsistency) {
2841  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2842  bool success(impl_->SelectFromTableSync(cfname, rowkey,
2843  consistency, &out->columns_));
2844  if (!success) {
2847  return success;
2848  }
2849  IncrementTableReadStats(cfname);
2850  return success;
2851 }
2852 
// Synchronous read of the partition `rowkey` from `cfname`, limited to the
// clustering-key range `crange` and to the fields in `read_vec`; the columns
// are written into out->columns_.
// On success a read is counted against the table and true is returned;
// otherwise false is returned.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2853 bool CqlIf::Db_GetRow(GenDb::ColList *out, const std::string &cfname,
2854  const GenDb::DbDataValueVec &rowkey,
2855  GenDb::DbConsistency::type dconsistency,
2856  const GenDb::ColumnNameRange &crange,
2857  const GenDb::FieldNamesToReadVec &read_vec) {
2858  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2859  bool success(impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2860  rowkey, crange, consistency, read_vec, &out->columns_));
2861  if (!success) {
2864  return success;
2865  }
2866  IncrementTableReadStats(cfname);
2867  return success;
2868 }
2869 
// Reads each partition key in `v_rowkey` from `cfname` one at a time
// (synchronously, ONE consistency) and appends one ColList per key to `out`.
// Stops and returns false at the first failing key; rows appended before the
// failure remain in `out`. On full success v_rowkey.size() reads are counted.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2870 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2871  const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
2872  BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
2873  std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
2874  // Partition Key
2875  v_columns->rowkey_ = rkey;
2876  bool success(impl_->SelectFromTableSync(cfname, rkey,
2877  CASS_CONSISTENCY_ONE, &v_columns->columns_));
2878  if (!success) {
2879  CQLIF_ERR_TRACE("SELECT FROM Table: " << cfname << " Partition Key: "
2880  << GenDb::DbDataValueVecToString(rkey) << " FAILED");
2883  return false;
2884  }
// The raw pointer released from the auto_ptr is what gets stored in `out`.
2885  out->push_back(v_columns.release());
2886  }
2887  IncrementTableReadStats(cfname, v_rowkey.size());
2888  return true;
2889 }
2890 
// Reads each partition key in `v_rowkey` from `cfname` one at a time
// (synchronously, ONE consistency), limited to the clustering-key range
// `crange`, and appends one ColList per key to `out`.
// Stops and returns false at the first failing key; rows appended before the
// failure remain in `out`. On full success v_rowkey.size() reads are counted.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2891 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2892  const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2893  const GenDb::ColumnNameRange &crange) {
2894  BOOST_FOREACH(const GenDb::DbDataValueVec &rkey, v_rowkey) {
2895  std::auto_ptr<GenDb::ColList> v_columns(new GenDb::ColList);
2896  // Partition Key
2897  v_columns->rowkey_ = rkey;
2898  bool success(impl_->SelectFromTableClusteringKeyRangeSync(cfname,
2899  rkey, crange, CASS_CONSISTENCY_ONE, &v_columns->columns_));
2900  if (!success) {
2901  CQLIF_ERR_TRACE("SELECT FROM Table: " << cfname << " Partition Key: "
2902  << GenDb::DbDataValueVecToString(rkey) <<
2903  " Clustering Key Range: " << crange.ToString() << " FAILED");
2906  return false;
2907  }
// The raw pointer released from the auto_ptr is what gets stored in `out`.
2908  out->push_back(v_columns.release());
2909  }
2910  IncrementTableReadStats(cfname, v_rowkey.size());
2911  return true;
2912 }
2913 
// Reads all partition keys in `v_rowkey` from `cfname` with a single call to
// impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(), limited to the
// clustering-key range `crange` and the fields in `read_vec`; results go
// into `out`.
// Returns false on failure; on success v_rowkey.size() reads are counted and
// true is returned.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2914 bool CqlIf::Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname,
2915  const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2916  const GenDb::ColumnNameRange &crange,
2917  const GenDb::FieldNamesToReadVec &read_vec,
2918  GenDb::DbConsistency::type dconsistency) {
2919  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2920  bool success(impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2921  v_rowkey, crange, consistency, read_vec, out));
2922  if (!success) {
2925  return false;
2926  }
2927  IncrementTableReadStats(cfname, v_rowkey.size());
2928  return true;
2929 }
2930 
// Synchronously reads the whole table `cfname` into `out`.
// On success a single read is counted against the table and true is
// returned; otherwise false is returned.
// NOTE(review): two lines are absent from this listing inside the failure
// branch -- presumably failure/error accounting; confirm against the full
// file.
2931 bool CqlIf::Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname,
2932  GenDb::DbConsistency::type dconsistency) {
2933  CassConsistency consistency(impl::Db2CassConsistency(dconsistency));
2934  bool success(impl_->SelectFromTableSync(cfname, consistency, out));
2935  if (!success) {
2938  return success;
2939  }
2940  IncrementTableReadStats(cfname);
2941  return success;
2942 }
2943 
2944 // Queue
2945 bool CqlIf::Db_GetQueueStats(uint64_t *queue_count,
2946  uint64_t *enqueues) const {
2947  //return impl_->Db_GetQueueStats(queue_count, enqueues);
2948  return true;
2949 }
2950 
// No-op: the only statement in the body is a commented-out delegation to
// impl_.
// NOTE(review): the last line of the signature (presumably declaring the
// `cb` parameter named in the commented-out call) is absent from this
// listing.
2951 void CqlIf::Db_SetQueueWaterMark(bool high, size_t queue_count,
2953  //impl_->Db_SetQueueWaterMark(high, queue_count, cb);
2954 }
2955 
2957  //impl_->Db_ResetQueueWaterMarks();
2958 }
2959 
2960 // Stats
2961 bool CqlIf::Db_GetStats(std::vector<GenDb::DbTableInfo> *vdbti,
2962  GenDb::DbErrors *dbe) {
2963  tbb::mutex::scoped_lock lock(stats_mutex_);
2964  stats_.GetDiffs(vdbti, dbe);
2965  return true;
2966 }
2967 
2968 bool CqlIf::Db_GetCumulativeStats(std::vector<GenDb::DbTableInfo> *vdbti,
2969  GenDb::DbErrors *dbe) const {
2970  tbb::mutex::scoped_lock lock(stats_mutex_);
2971  stats_.GetCumulative(vdbti, dbe);
2972  return true;
2973 }
2974 
2975 bool CqlIf::Db_GetCqlMetrics(Metrics *metrics) const {
2976  return impl_->GetMetrics(metrics);
2977 }
2978 
2979 bool CqlIf::Db_GetCqlStats(DbStats *db_stats) const {
2980  Metrics metrics;
2981  bool success(impl_->GetMetrics(&metrics));
2982  if (!success) {
2983  return success;
2984  }
2985  db_stats->requests_one_minute_rate = metrics.requests.one_minute_rate;
2986  db_stats->stats = metrics.stats;
2987  db_stats->errors = metrics.errors;
2988  return success;
2989 }
2990 
2991 void CqlIf::IncrementTableWriteStats(const std::string &table_name) {
2992  tbb::mutex::scoped_lock lock(stats_mutex_);
2993  stats_.IncrementTableWrite(table_name);
2994 }
2995 
2996 void CqlIf::IncrementTableWriteStats(const std::string &table_name,
2997  uint64_t num_writes) {
2998  tbb::mutex::scoped_lock lock(stats_mutex_);
2999  stats_.IncrementTableWrite(table_name, num_writes);
3000 }
3001 
3002 void CqlIf::IncrementTableWriteFailStats(const std::string &table_name) {
3003  tbb::mutex::scoped_lock lock(stats_mutex_);
3004  stats_.IncrementTableWriteFail(table_name);
3005 }
3006 
3007 void CqlIf::IncrementTableWriteFailStats(const std::string &table_name,
3008  uint64_t num_writes) {
3009  tbb::mutex::scoped_lock lock(stats_mutex_);
3010  stats_.IncrementTableWriteFail(table_name, num_writes);
3011 }
3012 
3014  const std::string &table_name) {
3015  tbb::mutex::scoped_lock lock(stats_mutex_);
3017 }
3018 
3020  const std::string &table_name) {
3021  tbb::mutex::scoped_lock lock(stats_mutex_);
3023 }
3024 
3025 void CqlIf::IncrementTableReadStats(const std::string &table_name) {
3026  tbb::mutex::scoped_lock lock(stats_mutex_);
3027  stats_.IncrementTableRead(table_name);
3028 }
3029 
3030 void CqlIf::IncrementTableReadStats(const std::string &table_name,
3031  uint64_t num_reads) {
3032  tbb::mutex::scoped_lock lock(stats_mutex_);
3033  stats_.IncrementTableRead(table_name, num_reads);
3034 }
3035 
3036 void CqlIf::IncrementTableReadFailStats(const std::string &table_name) {
3037  tbb::mutex::scoped_lock lock(stats_mutex_);
3038  stats_.IncrementTableReadFail(table_name);
3039 }
3040 
3041 void CqlIf::IncrementTableReadFailStats(const std::string &table_name,
3042  uint64_t num_reads) {
3043  tbb::mutex::scoped_lock lock(stats_mutex_);
3044  stats_.IncrementTableReadFail(table_name, num_reads);
3045 }
3046 
3048  tbb::mutex::scoped_lock lock(stats_mutex_);
3049  stats_.IncrementErrors(err_type);
3050 }
3051 
3052 // Connection
3053 std::vector<GenDb::Endpoint> CqlIf::Db_GetEndpoints() const {
3054  return endpoints_;
3055 }
3056 
3057 namespace interface {
3058 
3059 //
3060 // CassDatastaxLibrary
3061 //
3063 }
3064 
3066 }
3067 
3068 // CassCluster
3070  return cass_cluster_new();
3071 }
3072 
3073 void CassDatastaxLibrary::CassClusterFree(CassCluster* cluster) {
3074  cass_cluster_free(cluster);
3075 }
3076 
3078  CassCluster* cluster, const char* contact_points) {
3079  return cass_cluster_set_contact_points(cluster, contact_points);
3080 }
3081 
3082 CassError CassDatastaxLibrary::CassClusterSetPort(CassCluster* cluster,
3083  int port) {
3084  return cass_cluster_set_port(cluster, port);
3085 }
3086 
3087 void CassDatastaxLibrary::CassClusterSetSsl(CassCluster* cluster, CassSsl* ssl) {
3088  cass_cluster_set_ssl(cluster, ssl);
3089 }
3090 
3092  const char* username, const char* password) {
3093  cass_cluster_set_credentials(cluster, username, password);
3094 }
3095 
3097  unsigned num_threads) {
3098  return cass_cluster_set_num_threads_io(cluster, num_threads);
3099 }
3100 
3102  CassCluster* cluster, unsigned num_requests) {
3103  return cass_cluster_set_pending_requests_high_water_mark(cluster,
3104  num_requests);
3105 }
3106 
3108  CassCluster* cluster, unsigned num_requests) {
3109  return cass_cluster_set_pending_requests_low_water_mark(cluster,
3110  num_requests);
3111 }
3112 
3114  CassCluster* cluster, unsigned num_bytes) {
3115  return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes);
3116 }
3117 
3119  CassCluster* cluster, unsigned num_bytes) {
3120  return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes);
3121 }
3122 
3124  CassCluster* cluster, const char* hosts) {
3125  cass_cluster_set_whitelist_filtering(cluster, hosts);
3126 }
3127 
3128 // CassSsl
3130  return cass_ssl_new();
3131 }
3132 
3134  return cass_ssl_free(ssl);
3135 }
3136 
3138  const std::string &cert) {
3139  return cass_ssl_add_trusted_cert_n(ssl, cert.c_str(), cert.length());
3140 }
3141 
// Thin wrapper: forwards `ssl` and `flags` to cass_ssl_set_verify_flags().
// `flags` is passed through as-is; its valid values are defined by the
// library, not here.
3142 void CassDatastaxLibrary::CassSslSetVerifyFlags(CassSsl* ssl, int flags) {
3143  cass_ssl_set_verify_flags(ssl, flags);
3144 }
3145 
3146 // CassSession
3148  return cass_session_new();
3149 }
3150 
// Thin wrapper: passes `session` straight to cass_session_free().
// Returns nothing.
3151 void CassDatastaxLibrary::CassSessionFree(CassSession* session) {
3152  cass_session_free(session);
3153 }
3154 
3156  unsigned timeout_ms) {
3157  return cass_cluster_set_request_timeout(cluster, timeout_ms);
3158 }
3159 
// Thin wrapper: forwards `session` and `cluster` to cass_session_connect()
// and returns the CassFuture* it produces. This function does not wait on
// or inspect the future itself.
3160 CassFuture* CassDatastaxLibrary::CassSessionConnect(CassSession* session,
3161  const CassCluster* cluster) {
3162  return cass_session_connect(session, cluster);
3163 }
3164 
// Thin wrapper: forwards `session` to cass_session_close() and returns the
// CassFuture* it produces, without waiting on it here.
3165 CassFuture* CassDatastaxLibrary::CassSessionClose(CassSession* session) {
3166  return cass_session_close(session);
3167 }
3168 
// Thin wrapper: forwards `session` and `statement` to
// cass_session_execute() and returns the resulting CassFuture* unchanged.
3169 CassFuture* CassDatastaxLibrary::CassSessionExecute(CassSession* session,
3170  const CassStatement* statement) {
3171  return cass_session_execute(session, statement);
3172 }
3173 
3175  const CassSession* session) {
3176  return cass_session_get_schema_meta(session);
3177 }
3178 
// Thin wrapper: forwards `session` and the C-string `query` to
// cass_session_prepare() and returns the resulting CassFuture* unchanged.
3179 CassFuture* CassDatastaxLibrary::CassSessionPrepare(CassSession* session,
3180  const char* query) {
3181  return cass_session_prepare(session, query);
3182 }
3183 
// Thin wrapper: forwards `session` and the caller-supplied `output`
// pointer to cass_session_get_metrics(). Nothing is returned; any result
// is delivered through `output` by the callee.
3184 void CassDatastaxLibrary::CassSessionGetMetrics(const CassSession* session,
3185  CassMetrics* output) {
3186  cass_session_get_metrics(session, output);
3187 }
3188 
3189 // CassSchema
3191  const CassSchemaMeta* schema_meta) {
3192  cass_schema_meta_free(schema_meta);
3193 }
3194 
3196  const CassSchemaMeta* schema_meta, const char* keyspace) {
3197  return cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
3198 }
3199 
3201  const CassKeyspaceMeta* keyspace_meta, const char* table) {
3202  return cass_keyspace_meta_table_by_name(keyspace_meta, table);
3203 }
3204 
3206  const CassTableMeta* table_meta) {
3207  return cass_table_meta_partition_key_count(table_meta);
3208 }
3209 
3211  const CassTableMeta* table_meta) {
3212  return cass_table_meta_clustering_key_count(table_meta);
3213 }
3214 
3215 // CassFuture
// Thin wrapper: passes `future` straight to cass_future_free().
// Returns nothing.
3216 void CassDatastaxLibrary::CassFutureFree(CassFuture* future) {
3217  cass_future_free(future);
3218 }
3219 
// Thin wrapper: forwards `future`, `callback` and the opaque `data`
// pointer to cass_future_set_callback() and returns its CassError
// unchanged. `data` is not dereferenced here.
3220 CassError CassDatastaxLibrary::CassFutureSetCallback(CassFuture* future,
3221  CassFutureCallback callback, void* data) {
3222  return cass_future_set_callback(future, callback, data);
3223 }
3224 
// Thin wrapper: passes `future` to cass_future_wait(). Returns nothing.
// NOTE(review): presumably blocks the calling thread until the future is
// set - confirm against the driver documentation.
3225 void CassDatastaxLibrary::CassFutureWait(CassFuture* future) {
3226  cass_future_wait(future);
3227 }
3228 
3230  CassFuture* future) {
3231  return cass_future_get_result(future);
3232 }
3233 
3235  const char** message, size_t* message_length) {
3236  cass_future_error_message(future, message, message_length);
3237 }
3238 
// Thin wrapper: returns whatever cass_future_error_code(future) reports,
// unchanged.
3239 CassError CassDatastaxLibrary::CassFutureErrorCode(CassFuture* future) {
3240  return cass_future_error_code(future);
3241 }
3242 
3244  CassFuture* future) {
3245  return cass_future_get_prepared(future);
3246 }
3247 
3248 // CassResult
// Thin wrapper: passes `result` straight to cass_result_free().
// Returns nothing.
3249 void CassDatastaxLibrary::CassResultFree(const CassResult* result) {
3250  cass_result_free(result);
3251 }
3252 
// Thin wrapper: returns the size_t produced by
// cass_result_column_count(result), unchanged.
3253 size_t CassDatastaxLibrary::CassResultColumnCount(const CassResult* result) {
3254  return cass_result_column_count(result);
3255 }
3256 
// Thin wrapper: forwards `result`, `index` and the two out-pointers
// (`name`, `name_length`) to cass_result_column_name() and returns its
// CassError unchanged. The out-pointers are filled in by the callee, not
// by this function.
3257 CassError CassDatastaxLibrary::CassResultColumnName(const CassResult *result,
3258  size_t index, const char** name, size_t* name_length) {
3259  return cass_result_column_name(result, index, name, name_length);
3260 }
3261 
3262 // CassIterator
// Thin wrapper: passes `iterator` straight to cass_iterator_free().
// Returns nothing.
3263 void CassDatastaxLibrary::CassIteratorFree(CassIterator* iterator) {
3264  cass_iterator_free(iterator);
3265 }
3266 
3268  const CassResult* result) {
3269  return cass_iterator_from_result(result);
3270 }
3271 
// Thin wrapper: returns the cass_bool_t produced by
// cass_iterator_next(iterator), unchanged.
3272 cass_bool_t CassDatastaxLibrary::CassIteratorNext(CassIterator* iterator) {
3273  return cass_iterator_next(iterator);
3274 }
3275 
3277  const CassIterator* iterator) {
3278  return cass_iterator_get_row(iterator);
3279 }
3280 
3281 // CassStatement
// Thin wrapper: forwards the C-string `query` and `parameter_count` to
// cass_statement_new() and returns the CassStatement* it yields.
3282 CassStatement* CassDatastaxLibrary::CassStatementNew(const char* query,
3283  size_t parameter_count) {
3284  return cass_statement_new(query, parameter_count);
3285 }
3286 
// Thin wrapper: passes `statement` straight to cass_statement_free().
// Returns nothing.
3287 void CassDatastaxLibrary::CassStatementFree(CassStatement* statement) {
3288  cass_statement_free(statement);
3289 }
3290 
3292  CassStatement* statement, CassConsistency consistency) {
3293  return cass_statement_set_consistency(statement, consistency);
3294 }
3295 
3297  CassStatement* statement,
3298  size_t index, const char* value, size_t value_length) {
3299  return cass_statement_bind_string_n(statement, index, value, value_length);
3300 }
3301 
// Thin wrapper: binds by position. Forwards `statement`, `index` and the
// 32-bit `value` to cass_statement_bind_int32() and returns its CassError
// unchanged.
3302 CassError CassDatastaxLibrary::CassStatementBindInt32(CassStatement* statement,
3303  size_t index, cass_int32_t value) {
3304  return cass_statement_bind_int32(statement, index, value);
3305 }
3306 
// Thin wrapper: binds by position. Forwards `statement`, `index` and the
// 64-bit `value` to cass_statement_bind_int64() and returns its CassError
// unchanged.
3307 CassError CassDatastaxLibrary::CassStatementBindInt64(CassStatement* statement,
3308  size_t index, cass_int64_t value) {
3309  return cass_statement_bind_int64(statement, index, value);
3310 }
3311 
// Thin wrapper: binds by position. Forwards `statement`, `index` and the
// CassUuid `value` (taken and passed by value) to
// cass_statement_bind_uuid() and returns its CassError unchanged.
3312 CassError CassDatastaxLibrary::CassStatementBindUuid(CassStatement* statement,
3313  size_t index, CassUuid value) {
3314  return cass_statement_bind_uuid(statement, index, value);
3315 }
3316 
3318  CassStatement* statement, size_t index, cass_double_t value) {
3319  return cass_statement_bind_double(statement, index, value);
3320 }
3321 
// Thin wrapper: binds by position. Forwards `statement`, `index` and the
// CassInet `value` (taken and passed by value) to
// cass_statement_bind_inet() and returns its CassError unchanged.
3322 CassError CassDatastaxLibrary::CassStatementBindInet(CassStatement* statement,
3323  size_t index, CassInet value) {
3324  return cass_statement_bind_inet(statement, index, value);
3325 }
3326 
3328  CassStatement* statement,
3329  size_t index, const cass_byte_t* value, size_t value_length) {
3330  return cass_statement_bind_bytes(statement, index, value, value_length);
3331 }
3332 
3334  CassStatement* statement,
3335  const char* name, size_t name_length, const char* value,
3336  size_t value_length) {
3337  return cass_statement_bind_string_by_name_n(statement, name, name_length,
3338  value, value_length);
3339 }
3340 
3342  CassStatement* statement, const char* name, cass_int32_t value) {
3343  return cass_statement_bind_int32_by_name(statement, name, value);
3344 }
3345 
3347  CassStatement* statement, const char* name, cass_int64_t value) {
3348  return cass_statement_bind_int64_by_name(statement, name, value);
3349 }
3350 
3352  CassStatement* statement, const char* name, CassUuid value) {
3353  return cass_statement_bind_uuid_by_name(statement, name, value);
3354 }
3355 
3357  CassStatement* statement, const char* name, cass_double_t value) {
3358  return cass_statement_bind_double_by_name(statement, name, value);
3359 }
3360 
3362  CassStatement* statement, const char* name, CassInet value) {
3363  return cass_statement_bind_inet_by_name(statement, name, value);
3364 }
3365 
3367  CassStatement* statement,
3368  const char* name, size_t name_length, const cass_byte_t* value,
3369  size_t value_length) {
3370  return cass_statement_bind_bytes_by_name_n(statement, name, name_length,
3371  value, value_length);
3372 }
3373 
3374 // CassPrepared
// Thin wrapper: passes `prepared` straight to cass_prepared_free().
// Returns nothing.
3375 void CassDatastaxLibrary::CassPreparedFree(const CassPrepared* prepared) {
3376  cass_prepared_free(prepared);
3377 }
3378 
3380  const CassPrepared* prepared) {
3381  return cass_prepared_bind(prepared);
3382 }
3383 
3384 // CassValue
// Thin wrapper: returns the CassValueType reported by
// cass_value_type(value), unchanged. Note the wrapper is named
// GetCassValueType while the underlying call is cass_value_type().
3385 CassValueType CassDatastaxLibrary::GetCassValueType(const CassValue* value) {
3386  return cass_value_type(value);
3387 }
3388 
// Thin wrapper: forwards `value` and the out-pointers `output` /
// `output_size` to cass_value_get_string() and returns its CassError
// unchanged.
// NOTE(review): the lifetime of the buffer returned through `output` is
// determined by the library - confirm before holding on to it.
3389 CassError CassDatastaxLibrary::CassValueGetString(const CassValue* value,
3390  const char** output, size_t* output_size) {
3391  return cass_value_get_string(value, output, output_size);
3392 }
3393 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_int8() and returns its CassError unchanged.
3394 CassError CassDatastaxLibrary::CassValueGetInt8(const CassValue* value,
3395  cass_int8_t* output) {
3396  return cass_value_get_int8(value, output);
3397 }
3398 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_int16() and returns its CassError unchanged.
3399 CassError CassDatastaxLibrary::CassValueGetInt16(const CassValue* value,
3400  cass_int16_t* output) {
3401  return cass_value_get_int16(value, output);
3402 }
3403 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_int32() and returns its CassError unchanged.
3404 CassError CassDatastaxLibrary::CassValueGetInt32(const CassValue* value,
3405  cass_int32_t* output) {
3406  return cass_value_get_int32(value, output);
3407 }
3408 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_int64() and returns its CassError unchanged.
3409 CassError CassDatastaxLibrary::CassValueGetInt64(const CassValue* value,
3410  cass_int64_t* output) {
3411  return cass_value_get_int64(value, output);
3412 }
3413 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_uuid() and returns its CassError unchanged.
3414 CassError CassDatastaxLibrary::CassValueGetUuid(const CassValue* value,
3415  CassUuid* output) {
3416  return cass_value_get_uuid(value, output);
3417 }
3418 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_double() and returns its CassError unchanged.
3419 CassError CassDatastaxLibrary::CassValueGetDouble(const CassValue* value,
3420  cass_double_t* output) {
3421  return cass_value_get_double(value, output);
3422 }
3423 
// Thin wrapper: forwards `value` and the out-pointer `output` to
// cass_value_get_inet() and returns its CassError unchanged.
3424 CassError CassDatastaxLibrary::CassValueGetInet(const CassValue* value,
3425  CassInet* output) {
3426  return cass_value_get_inet(value, output);
3427 }
3428 
// Thin wrapper: forwards `value` and the out-pointers `output` /
// `output_size` to cass_value_get_bytes() and returns its CassError
// unchanged.
// NOTE(review): the lifetime of the buffer returned through `output` is
// determined by the library - confirm before holding on to it.
3429 CassError CassDatastaxLibrary::CassValueGetBytes(const CassValue* value,
3430  const cass_byte_t** output, size_t* output_size) {
3431  return cass_value_get_bytes(value, output, output_size);
3432 }
3433 
// Thin wrapper: returns the cass_bool_t produced by
// cass_value_is_null(value), unchanged.
3434 cass_bool_t CassDatastaxLibrary::CassValueIsNull(const CassValue* value) {
3435  return cass_value_is_null(value);
3436 }
3437 
3438 // CassInet
3440  const cass_uint8_t* address) {
3441  return cass_inet_init_v4(address);
3442 }
3443 
3445  const cass_uint8_t* address) {
3446  return cass_inet_init_v6(address);
3447 }
3448 
3449 // CassRow
// Thin wrapper: forwards `row` and `index` to cass_row_get_column() and
// returns the const CassValue* it yields. No bounds check on `index` is
// performed at this level.
3450 const CassValue* CassDatastaxLibrary::CassRowGetColumn(const CassRow* row,
3451  size_t index) {
3452  return cass_row_get_column(row, index);
3453 }
3454 
3455 // CassLog
// Thin wrapper: passes `log_level` straight to cass_log_set_level().
// Returns nothing.
3456 void CassDatastaxLibrary::CassLogSetLevel(CassLogLevel log_level) {
3457  cass_log_set_level(log_level);
3458 }
3459 
// Thin wrapper: forwards `callback` and the opaque `data` pointer to
// cass_log_set_callback(). Returns nothing; `data` is not dereferenced
// here.
3460 void CassDatastaxLibrary::CassLogSetCallback(CassLogCallback callback,
3461  void* data) {
3462  cass_log_set_callback(callback, data);
3463 }
3464 
3465 } // namespace interface
3466 } // namespace cql
3467 } // namespace cass
void IncrementTableReadFail(const std::string &table_name)
virtual CassSession * CassSessionNew()
Definition: cql_if.cc:3147
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)
Definition: cql_if.cc:3220
WorkerTask(FunctionPtr func, int task_id, int task_instance)
Definition: cql_if.cc:1826
void IncrementTableWriteStats(const std::string &table_name)
Definition: cql_if.cc:2991
std::string Description() const
Definition: cql_if.cc:1834
static void OnExecuteQueryAsync(CassFuture *future, void *data)
Definition: cql_if.cc:1478
virtual void CassFutureFree(CassFuture *future)
Definition: cql_if.cc:3216
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)=0
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
Definition: cql_if.cc:1830
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:2019
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)=0
static const char * DbDataType2CassType(const GenDb::DbDataType::type &db_type)
Definition: cql_if.cc:164
boost::scoped_ptr< CqlIfImpl > impl_
Definition: cql_if.h:151
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)
Definition: cql_if.cc:3205
std::vector< GenDb::DbDataType::type > DbDataTypeVec
Definition: gendb_if.h:101
tbb::atomic< bool > initialized_
Definition: cql_if.h:152
virtual void CassStatementFree(CassStatement *statement)
Definition: cql_if.cc:3287
virtual const CassResult * CassFutureGetResult(CassFuture *future)=0
void SetRequestTimeout(uint32_t timeout_ms)
Definition: cql_if.cc:2276
virtual void Db_Uninit()
Definition: cql_if.cc:2552
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)=0
static GenDb::DbDataValue CassValue2DbDataValue(interface::CassLibrary *cci, const CassValue *cvalue)
Definition: cql_if.cc:1044
tbb::mutex stats_mutex_
Definition: cql_if.h:154
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)=0
CassQueryPrinter(std::ostream &os)
Definition: cql_if.cc:239
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2159
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)
Definition: cql_if.cc:3169
void operator()(const boost::uuids::uuid &tuuid) const
Definition: cql_if.cc:247
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)
Definition: cql_if.cc:3291
virtual ~CqlIf()
Definition: cql_if.cc:2537
virtual CassValueType GetCassValueType(const CassValue *value)=0
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)
Definition: cql_if.cc:3123
tbb::atomic< SessionState::type > session_state_
Definition: cql_if_impl.h:343
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbGetRowCb cb)
Definition: cql_if.cc:2715
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
virtual bool Db_GetCumulativeStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
Definition: cql_if.cc:2968
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
Definition: task.h:178
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)
Definition: cql_if.cc:3155
CassSharedPtr< CassSsl > CassSslPtr
Definition: cql_if_impl.h:174
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)=0
std::auto_ptr< GenDb::ColList > row_
Definition: cql_if.cc:2677
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)
Definition: cql_if.cc:3409
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1630
static void encode_uuid(char *output, const CassUuid &uuid)
Definition: cql_if.cc:118
void operator()(const IpAddress &tipaddr) const
Definition: cql_if.cc:270
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
Definition: cql_if.cc:741
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)=0
void operator()(const uint16_t &tu16, size_t index) const
Definition: cql_if.cc:305
virtual void CassSessionFree(CassSession *session)
Definition: cql_if.cc:3151
virtual bool Db_Init()
Definition: cql_if.cc:2541
virtual std::vector< GenDb::Endpoint > Db_GetEndpoints() const
Definition: cql_if.cc:3053
static bool PrepareSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassPreparedPtr *prepared)
Definition: cql_if.cc:1135
void operator()(const boost::blank &tblank, size_t index) const
Definition: cql_if.cc:287
void IncrementTableReadBackPressureFailStats(const std::string &table_name)
Definition: cql_if.cc:3019
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)=0
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)
Definition: cql_if.cc:3113
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)=0
virtual const CassResult * CassFutureGetResult(CassFuture *future)
Definition: cql_if.cc:3229
static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx=NULL)
Definition: cql_if.cc:1514
virtual void CassFutureWait(CassFuture *future)=0
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)
Definition: cql_if.cc:3450
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)=0
boost::asio::ip::tcp::endpoint Endpoint
Definition: gendb_if.h:240
static bool ExecuteQuerySync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency)
Definition: cql_if.cc:1178
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)=0
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)
Definition: cql_if.cc:3419
DbDataValueVec rowkey_
Definition: gendb_if.h:198
void IncrementTableWriteFail(const std::string &table_name)
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)
Definition: cql_if.cc:3087
void operator()(const std::string &tstring) const
Definition: cql_if.cc:255
static void AsyncRowGetCompletionCallback(boost::shared_ptr< AsyncRowGetCallbackContext > cb_ctx)
Definition: cql_if.cc:2680
CassSharedPtr< const CassPrepared > CassPreparedPtr
Definition: cql_if_impl.h:180
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)
Definition: cql_if.cc:3282
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)=0
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf)
Definition: cql_if.cc:2619
boost::asio::ip::address IpAddress
Definition: address.h:13
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)=0
ColumnMap cfcolumns_
Definition: gendb_if.h:140
DbDataTypeVec partition_keys_
Definition: gendb_if.h:139
void operator()(const IpAddress &tipaddr, size_t index) const
Definition: cql_if.cc:326
#define CQLIF_DEBUG_TRACE(_Msg)
Definition: cql_if.cc:43
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)
Definition: cql_if.cc:3414
interface::CassLibrary * cci_
Definition: cql_if.cc:415
static const char * kTaskName
Definition: cql_if_impl.h:326
bool IsTableDynamic(const std::string &table)
Definition: cql_if.cc:2145
virtual CassIterator * CassIteratorFromResult(const CassResult *result)
Definition: cql_if.cc:3267
void operator()(const uint8_t &tu8, size_t index) const
Definition: cql_if.cc:301
virtual void CassSslFree(CassSsl *ssl)
Definition: cql_if.cc:3133
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1563
const uint8_t * data() const
Definition: gendb_if.h:60
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)=0
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2099
static bool StaticCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
Definition: cql_if.cc:1619
static CassConsistency Db2CassConsistency(GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:201
void operator()(const uint32_t &tu32) const
Definition: cql_if.cc:263
static bool DynamicCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
Definition: cql_if.cc:1551
boost::asio::io_context * io_service()
Definition: event_manager.h:42
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)
Definition: cql_if.cc:3118
#define CQLIF_ERR
Definition: cql_if.cc:34
virtual bool Db_CreateIndex(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode=GenDb::ColIndexMode::NONE)
Definition: cql_if.cc:2637
tbb::mutex map_mutex_
Definition: cql_if_impl.h:351
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)
Definition: cql_if.cc:3082
impl::CassClusterPtr cluster_
Definition: cql_if_impl.h:339
void operator()(const std::string &tstring, size_t index) const
Definition: cql_if.cc:290
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:659
boost::uuids::uuid uuid
boost::function< void(void)> FunctionPtr
Definition: cql_if.cc:1825
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)
Definition: cql_if.cc:3399
bool create_schema_
Definition: cql_if.h:157
SandeshTraceBufferPtr CqlTraceErrBuf(SandeshTraceBufferCreate(CQLIF_ERR, 20000))
CassString(const char *data, size_t length)
Definition: cql_if.cc:108
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)=0
virtual ~CqlIfImpl()
Definition: cql_if.cc:1904
void IncrementErrors(GenDb::IfErrors::Type err_type)
Definition: cql_if.cc:3047
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)=0
virtual CassFuture * CassSessionClose(CassSession *session)=0
interface::CassLibrary * cci_
Definition: cql_if.cc:344
static const char * kQCreateKeyspaceIfNotExists
Definition: cql_if_impl.h:324
impl::CassSessionPtr session_
Definition: cql_if_impl.h:341
std::string ToString() const
Definition: gendb_if.cc:60
void operator()(const uint8_t &tu8) const
Definition: cql_if.cc:252
bool use_prepared_for_insert_
Definition: cql_if.h:156
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)
Definition: cql_if.cc:3302
virtual bool Db_GetCqlMetrics(Metrics *metrics) const
Definition: cql_if.cc:2975
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)=0
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
Definition: sandesh_trace.h:18
void operator()(const uint64_t &tu64) const
Definition: cql_if.cc:267
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)
Definition: cql_if.cc:3174
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
Definition: cql_if.cc:1010
void operator()(const double &tdouble, size_t index) const
Definition: cql_if.cc:321
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)=0
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)=0
std::string cfname_
Definition: gendb_if.h:197
static std::string DbDataTypes2CassTypes(const GenDb::DbDataTypeVec &v_db_types)
Definition: cql_if.cc:195
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)
Definition: cql_if.cc:3101
virtual void CassClusterFree(CassCluster *cluster)
Definition: cql_if.cc:3073
impl::CassSslPtr ssl_
Definition: cql_if_impl.h:340
static void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1215
virtual void CassFutureWait(CassFuture *future)
Definition: cql_if.cc:3225
void operator()(const std::string &tstring, const char *name) const
Definition: cql_if.cc:358
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2074
boost::ptr_vector< NewCol > NewColVec
Definition: gendb_if.h:186
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)=0
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)=0
virtual bool Db_SetTablespace(const std::string &tablespace)
Definition: cql_if.cc:2588
bool IsEmpty() const
Definition: gendb_if.h:217
GenDb::DbOpResult::type drc_
Definition: cql_if.cc:2676
virtual bool Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2931
virtual bool Db_GetStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
Definition: cql_if.cc:2961
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:999
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)=0
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)=0
DbDataValueVec start_
Definition: gendb_if.h:225
CassPreparedMapType insert_prepared_map_
Definition: cql_if_impl.h:350
void IncrementTableWriteBackPressureFailStats(const std::string &table_name)
Definition: cql_if.cc:3013
void IncrementErrors(IfErrors::Type type)
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)
Definition: cql_if.cc:3107
static std::string ToString(Op::type op)
Definition: gendb_if.cc:81
void operator()(const T &t) const
Definition: cql_if.cc:244
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)=0
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2173
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)=0
virtual void CassPreparedFree(const CassPrepared *prepared)
Definition: cql_if.cc:3375
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:2007
uint8_t type
Definition: load_balance.h:109
void operator()(const boost::uuids::uuid &tuuid, const char *name) const
Definition: cql_if.cc:363
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
Definition: cql_if.cc:2149
CassStatementNameBinder(interface::CassLibrary *cci, CassStatement *statement)
Definition: cql_if.cc:350
static void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1403
static const std::string integerToString(const NumberType &num)
Definition: string_util.h:19
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)
Definition: cql_if.cc:3096
static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel)
Definition: cql_if.cc:1755
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)
Definition: cql_if.cc:3272
static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb)
Definition: cql_if.cc:1536
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)=0
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)=0
virtual CassSession * CassSessionNew()=0
static TaskScheduler * GetInstance()
Definition: task.cc:547
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
Definition: task.cc:636
static std::string DbColIndexMode2String(const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:562
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)
Definition: cql_if.cc:3424
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor="1")
Definition: cql_if.cc:2571
DbDataTypeVec clustering_columns_
Definition: gendb_if.h:141
void operator()(const uint32_t &tu32, const char *name) const
Definition: cql_if.cc:380
boost::asio::ip::address_v6 Ip6Address
Definition: address.h:15
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb)
Definition: cql_if.cc:2651
static const char * kQUseKeyspace
Definition: cql_if_impl.h:325
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)
Definition: cql_if.cc:3379
CassStatementIndexBinder(interface::CassLibrary *cci, CassStatement *statement)
Definition: cql_if.cc:282
static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc)
Definition: cql_if.cc:1202
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)
Definition: cql_if.cc:3429
virtual CassInet CassInetInitV6(const cass_uint8_t *address)
Definition: cql_if.cc:3444
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)
Definition: cql_if.cc:3184
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
Definition: cql_if.cc:2222
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassError CassStatementBindBytes(CassStatement *statement, size_t index, const cass_byte_t *value, size_t value_length)
Definition: cql_if.cc:3327
virtual CassError CassStatementBindInt32ByName(CassStatement *statement, const char *name, cass_int32_t value)
Definition: cql_if.cc:3341
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)=0
virtual size_t CassResultColumnCount(const CassResult *result)=0
virtual CassError CassStatementBindUuidByName(CassStatement *statement, const char *name, CassUuid value)
Definition: cql_if.cc:3351
std::string DbDataValueVecToString(const GenDb::DbDataValueVec &v_db_value)
Definition: gendb_if.cc:102
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count, DbQueueWaterMarkCb cb)
Definition: cql_if.cc:2951
void IncrementTableReadBackPressureFail(const std::string &table_name)
virtual CassValueType GetCassValueType(const CassValue *value)
Definition: cql_if.cc:3385
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:606
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)
Definition: cql_if.cc:3091
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)=0
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)=0
void IncrementTableRead(const std::string &table_name)
static bool SyncFutureWait(interface::CassLibrary *cci, CassFuture *future)
Definition: cql_if.cc:1656
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
Definition: gendb_if.h:85
boost::scoped_ptr< DbDataValueVec > value
Definition: gendb_if.h:181
boost::function< void(DbOpResult::type, std::auto_ptr< ColList >)> DbGetRowCb
Definition: gendb_if.h:256
GenDb::GenDbIf::DbGetRowCb cb_
Definition: cql_if.cc:2675
static bool GetCassTablePartitionKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count)
Definition: cql_if.cc:1735
static bool GetCassTableClusteringKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *ck_count)
Definition: cql_if.cc:1714
std::map< std::string, GenDb::DbDataType::type > ColumnMap
Definition: gendb_if.h:113
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)=0
virtual void Db_ResetQueueWaterMarks()
Definition: cql_if.cc:2956
bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
Definition: cql_if.cc:814
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:430
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:580
virtual void CassResultFree(const CassResult *result)
Definition: cql_if.cc:3249
static void ExecuteQueryAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb)
Definition: cql_if.cc:1527
interface::CassLibrary * cci_
Definition: cql_if_impl.h:338
CassString(const char *data)
Definition: cql_if.cc:103
virtual bool Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname, const std::vector< GenDb::DbDataValueVec > &v_rowkey)
Definition: cql_if.cc:2870
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)
Definition: cql_if.cc:3276
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
Definition: cql_if.cc:1911
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)=0
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1019
boost::asio::ip::address_v4 Ip4Address
Definition: address.h:14
std::vector< WhereIndexInfo > WhereIndexInfoVec
Definition: gendb_if.h:233
void operator()(const boost::blank &tblank, const char *name) const
Definition: cql_if.cc:355
CassQueryPrinter(std::ostream &os, bool quote_strings)
Definition: cql_if.cc:235
NewColVec columns_
Definition: gendb_if.h:199
SandeshTraceBufferPtr CqlTraceDebugBuf(SandeshTraceBufferCreate(CQLIF_DEBUG, 10000))
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2458
static std::string CassSelectFromTableInternal(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec, const GenDb::WhereIndexInfoVec &where_vec)
Definition: cql_if.cc:889
bool GetMetrics(Metrics *metrics) const
Definition: cql_if.cc:2356
virtual bool Db_GetCqlStats(DbStats *db_stats) const
Definition: cql_if.cc:2979
virtual CassError CassFutureErrorCode(CassFuture *future)=0
virtual CassInet CassInetInitV4(const cass_uint8_t *address)
Definition: cql_if.cc:3439
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)=0
virtual CassCluster * CassClusterNew()
Definition: cql_if.cc:3069
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)=0
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
Definition: cql_if_impl.h:184
void operator()(const uint8_t &tu8, const char *name) const
Definition: cql_if.cc:370
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
GenDb::GenDbIfStats stats_
Definition: cql_if.h:155
static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, CassSession *session, CassStatement *statement, CassConsistency consistency)
Definition: cql_if.cc:1195
virtual cass_bool_t CassValueIsNull(const CassValue *value)=0
virtual CassError CassFutureErrorCode(CassFuture *future)
Definition: cql_if.cc:3239
static void CassLibraryLog(const CassLogMessage *message, void *data)
Definition: cql_if.cc:1798
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level)
Definition: cql_if.cc:1776
virtual CassError CassStatementBindInt64(CassStatement *statement, size_t index, cass_int64_t value)
Definition: cql_if.cc:3307
std::string cfname_
Definition: gendb_if.h:137
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)
Definition: cql_if.cc:3234
std::vector< GenDb::Endpoint > endpoints_
Definition: cql_if.h:153
size_t size() const
Definition: gendb_if.h:63
virtual void CassLogSetLevel(CassLogLevel log_level)
Definition: cql_if.cc:3456
static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency)
Definition: cql_if.cc:1155
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
Definition: cql_if.cc:470
void operator()(const boost::uuids::uuid &tuuid, size_t index) const
Definition: cql_if.cc:295
int IsTableStatic(const std::string &table)
Definition: cql_if.cc:2058
virtual CassError CassStatementBindStringN(CassStatement *statement, size_t index, const char *value, size_t value_length)
Definition: cql_if.cc:3296
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)=0
bool IsTablePresent(const std::string &table)
Definition: cql_if.cc:2050
virtual bool Db_GetQueueStats(uint64_t *queue_count, uint64_t *enqueues) const
Definition: cql_if.cc:2945
void IncrementTableWriteFailStats(const std::string &table_name)
Definition: cql_if.cc:3002
bool ConnectSchemaSync()
Definition: cql_if.cc:2281
boost::function< void(size_t)> DbQueueWaterMarkCb
Definition: gendb_if.h:253
virtual CassError CassStatementBindDouble(CassStatement *statement, size_t index, cass_double_t value)
Definition: cql_if.cc:3317
static void ExecuteQueryResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx)
Definition: cql_if.cc:1543
static const std::string kQReadRepairChanceDTCS("read_repair_chance = 0.0")
static const CassTableMeta * GetCassTableMeta(interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error)
Definition: cql_if.cc:1669
std::string GetHostIp(boost::asio::io_context *io_service, const std::string &hostname)
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)=0
static char * decode_uuid(char *input, CassUuid *output)
Definition: cql_if.cc:145
virtual bool Db_AddColumnSync(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2749
DbDataTypeVec columns_
Definition: gendb_if.h:142
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:2598
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0")
void IncrementTableReadStats(const std::string &table_name)
Definition: cql_if.cc:3025
std::string CassSelectFromTable(const std::string &table)
Definition: cql_if.cc:1037
static std::string LoadCertFile(const std::string &ca_certs_path)
Definition: cql_if.cc:1809
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:718
virtual bool Db_AddColumn(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbAddColumnCb cb)
Definition: cql_if.cc:2720
void operator()(const GenDb::Blob &tblob, size_t index) const
Definition: cql_if.cc:339
boost::scoped_ptr< interface::CassLibrary > cci_
Definition: cql_if.h:150
AsyncRowGetCallbackContext(GenDb::GenDbIf::DbGetRowCb cb, GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row)
Definition: cql_if.cc:2669
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)
Definition: cql_if.cc:3142
virtual CassFuture * CassSessionClose(CassSession *session)
Definition: cql_if.cc:3165
void operator()(const GenDb::Blob &tblob, const char *name) const
Definition: cql_if.cc:410
static bool IsCassTableMetaPresent(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table)
Definition: cql_if.cc:1695
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
Definition: cql_if.cc:2038
void operator()(const uint64_t &tu64, size_t index) const
Definition: cql_if.cc:315
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)
Definition: cql_if.cc:3179
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2123
virtual cass_bool_t CassValueIsNull(const CassValue *value)
Definition: cql_if.cc:3434
bool LoggingDisabled()
Definition: logging.cc:24
bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
Definition: cql_if.cc:855
std::string schema_contact_point_
Definition: cql_if_impl.h:345
NewCf::ColumnFamilyType cftype_
Definition: gendb_if.h:179
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)=0
SandeshTraceBufferPtr CqlTraceInfoBuf(SandeshTraceBufferCreate(CQLIF_INFO, 10000))
void operator()(const uint64_t &tu64, const char *name) const
Definition: cql_if.cc:386
boost::function< void(DbOpResult::type)> DbAddColumnCb
Definition: gendb_if.h:254
impl::CassSessionPtr schema_session_
Definition: cql_if_impl.h:342
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)
Definition: cql_if.cc:3195
static const char * kQCompactionStrategy("compaction = {'class': ""'org.apache.cassandra.db.compaction.%s'}")
void IncrementTableReadFailStats(const std::string &table_name)
Definition: cql_if.cc:3036
virtual CassError CassStatementBindStringByNameN(CassStatement *statement, const char *name, size_t name_length, const char *value, size_t value_length)
Definition: cql_if.cc:3333
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2404
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)
Definition: cql_if.cc:3243
#define CQLIF_INFO_TRACE(_Msg)
Definition: cql_if.cc:51
#define CQLIF_INFO
Definition: cql_if.cc:33
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)=0
boost::scoped_ptr< DbDataValueVec > name
Definition: gendb_if.h:180
CassSharedPtr< const CassResult > CassResultPtr
Definition: cql_if_impl.h:178
tbb::atomic< SessionState::type > schema_session_state_
Definition: cql_if_impl.h:344
DbDataValueVec finish_
Definition: gendb_if.h:226
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2154
void GetDiffs(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void operator()(const uint32_t &tu32, size_t index) const
Definition: cql_if.cc:309
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)
Definition: cql_if.cc:3137
virtual void CassLogSetCallback(CassLogCallback callback, void *data)
Definition: cql_if.cc:3460
virtual CassError CassStatementBindInet(CassStatement *statement, size_t index, CassInet value)
Definition: cql_if.cc:3322
virtual CassError CassStatementBindInt64ByName(CassStatement *statement, const char *name, cass_int64_t value)
Definition: cql_if.cc:3346
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)
Definition: cql_if.cc:3160
#define GENERIC_RAW_ARRAY(obj)
Definition: misc_utils.h:32
virtual CassError CassStatementBindDoubleByName(CassStatement *statement, const char *name, cass_double_t value)
Definition: cql_if.cc:3356
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)=0
virtual void CassSchemaMetaFree(const CassSchemaMeta *schema_meta)
Definition: cql_if.cc:3190
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)=0
std::string keyspace_
Definition: cql_if_impl.h:346
virtual bool Db_GetRowAsync(const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbGetRowCb cb)
Definition: cql_if.cc:2793
virtual void CassIteratorFree(CassIterator *iterator)
Definition: cql_if.cc:3263
virtual size_t CassResultColumnCount(const CassResult *result)
Definition: cql_if.cc:3253
Task is a wrapper over tbb::task to support policies.
Definition: task.h:86
void operator()(const double &tdouble, const char *name) const
Definition: cql_if.cc:392
void operator()(const IpAddress &tipaddr, const char *name) const
Definition: cql_if.cc:397
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)
Definition: cql_if.cc:3404
#define CASS_LIB_TRACE(_Level, _Msg)
Definition: cql_if.cc:67
virtual CassIterator * CassIteratorFromResult(const CassResult *result)=0
#define CQLIF_DEBUG
Definition: cql_if.cc:32
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency)
Definition: cql_if.cc:2838
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
Definition: cql_if.cc:2428
void GetCumulative(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2255
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)
Definition: cql_if.cc:3389
virtual CassError CassStatementBindUuid(CassStatement *statement, size_t index, CassUuid value)
Definition: cql_if.cc:3312
bool IsInsertIntoTablePrepareSupported(const std::string &table)
Definition: cql_if.cc:2168
static bool ExecuteQueryResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency)
Definition: cql_if.cc:1186
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)
Definition: cql_if.cc:3210
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)=0
virtual CassError CassStatementBindInetByName(CassStatement *statement, const char *name, CassInet value)
Definition: cql_if.cc:3361
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)
Definition: cql_if.cc:3077
DbDataTypeVec value_
Definition: gendb_if.h:143
virtual CassError CassStatementBindBytesByNameN(CassStatement *statement, const char *name, size_t name_length, const cass_byte_t *value, size_t value_length)
Definition: cql_if.cc:3366
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)
Definition: cql_if.cc:3257
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)
Definition: cql_if.cc:3200
#define CQLIF_ERR_TRACE(_Msg)
Definition: cql_if.cc:59
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1950
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)
Definition: cql_if.cc:3394
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
Definition: cql_if.cc:1972
static EventManager evm
ColumnFamilyType cftype_
Definition: gendb_if.h:138
bool DisconnectSchemaSync()
Definition: cql_if.cc:2342
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
Definition: sandesh_trace.h:46
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1928
virtual CassSsl * CassSslNew()=0
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual void Db_SetInitDone(bool)
Definition: cql_if.cc:2559
void operator()(const uint16_t &tu16, const char *name) const
Definition: cql_if.cc:375
void IncrementTableWrite(const std::string &table_name)
void IncrementTableWriteBackPressureFail(const std::string &table_name)