8 #include <tbb/atomic.h>
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string.hpp>
11 #include <boost/algorithm/string/join.hpp>
12 #include <boost/unordered_map.hpp>
13 #include <boost/system/error_code.hpp>
15 #include <cassandra.h>
25 #include <database/gendb_constants.h>
30 using namespace boost::system;
32 #define CQLIF_DEBUG "CqlTraceBufDebug"
33 #define CQLIF_INFO "CqlTraceBufInfo"
34 #define CQLIF_ERR "CqlTraceBufErr"
43 #define CQLIF_DEBUG_TRACE(_Msg) \
45 std::stringstream _ss; \
46 _ss << __func__ << ":" << __FILE__ << ":" << \
47 __LINE__ << ": " << _Msg; \
48 CQL_TRACE_TRACE(CqlTraceDebugBuf, _ss.str()); \
51 #define CQLIF_INFO_TRACE(_Msg) \
53 std::stringstream _ss; \
54 _ss << __func__ << ":" << __FILE__ << ":" << \
55 __LINE__ << ": " << _Msg; \
56 CQL_TRACE_TRACE(CqlTraceInfoBuf, _ss.str()); \
59 #define CQLIF_ERR_TRACE(_Msg) \
61 std::stringstream _ss; \
62 _ss << __func__ << ":" << __FILE__ << ":" << \
63 __LINE__ << ": " << _Msg; \
64 CQL_TRACE_TRACE(CqlTraceErrBuf, _ss.str()); \
67 #define CASS_LIB_TRACE(_Level, _Msg) \
69 if (_Level == log4cplus::ERROR_LOG_LEVEL) { \
70 CQL_TRACE_TRACE(CqlTraceErrBuf, _Msg); \
71 } else if (_Level == log4cplus::DEBUG_LOG_LEVEL) { \
72 CQL_TRACE_TRACE(CqlTraceDebugBuf, _Msg); \
74 CQL_TRACE_TRACE(CqlTraceInfoBuf, _Msg); \
78 #define CQLIF_LOG(_Level, _Msg) \
80 if (LoggingDisabled()) break; \
81 log4cplus::Logger logger = log4cplus::Logger::getRoot(); \
82 LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \
83 __LINE__ << ": " << _Msg); \
86 #define CQLIF_LOG_ERR(_Msg) \
88 LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \
105 length(strlen(data)) {
119 uint64_t time_and_version = uuid.time_and_version;
120 output[3] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
121 time_and_version >>= 8;
122 output[2] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
123 time_and_version >>= 8;
124 output[1] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
125 time_and_version >>= 8;
126 output[0] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
127 time_and_version >>= 8;
129 output[5] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
130 time_and_version >>= 8;
131 output[4] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
132 time_and_version >>= 8;
134 output[7] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
135 time_and_version >>= 8;
136 output[6] =
static_cast<char>(time_and_version & 0x000000000000000FFLL);
138 uint64_t clock_seq_and_node = uuid.clock_seq_and_node;
139 for (
size_t i = 0; i < 8; ++i) {
140 output[15 - i] =
static_cast<char>(clock_seq_and_node & 0x00000000000000FFL);
141 clock_seq_and_node >>= 8;
146 output->time_and_version =
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[3]));
147 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[2])) << 8;
148 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[1])) << 16;
149 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[0])) << 24;
151 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[5])) << 32;
152 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[4])) << 40;
154 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[7])) << 48;
155 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[6])) << 56;
157 output->clock_seq_and_node = 0;
158 for (
size_t i = 0; i < 8; ++i) {
159 output->clock_seq_and_node |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[15 - i])) << (8 * i);
167 case GenDb::DbDataType::AsciiType:
169 case GenDb::DbDataType::LexicalUUIDType:
171 case GenDb::DbDataType::TimeUUIDType:
173 case GenDb::DbDataType::Unsigned8Type:
174 case GenDb::DbDataType::Unsigned16Type:
175 case GenDb::DbDataType::Unsigned32Type:
177 case GenDb::DbDataType::Unsigned64Type:
179 case GenDb::DbDataType::DoubleType:
181 case GenDb::DbDataType::UTF8Type:
183 case GenDb::DbDataType::InetType:
185 case GenDb::DbDataType::IntegerType:
187 case GenDb::DbDataType::BlobType:
190 assert(
false &&
"Invalid data type");
197 assert(!v_db_types.empty());
203 switch (dconsistency) {
205 return CASS_CONSISTENCY_ANY;
207 return CASS_CONSISTENCY_ONE;
209 return CASS_CONSISTENCY_TWO;
211 return CASS_CONSISTENCY_THREE;
213 return CASS_CONSISTENCY_QUORUM;
215 return CASS_CONSISTENCY_ALL;
217 return CASS_CONSISTENCY_LOCAL_QUORUM;
219 return CASS_CONSISTENCY_EACH_QUORUM;
221 return CASS_CONSISTENCY_SERIAL;
223 return CASS_CONSISTENCY_LOCAL_SERIAL;
225 return CASS_CONSISTENCY_LOCAL_ONE;
228 return CASS_CONSISTENCY_UNKNOWN;
237 quote_strings_(quote_strings) {
241 quote_strings_(true) {
248 os_ << to_string(tuuid);
253 os_ << (uint16_t)tu8;
256 if (quote_strings_) {
257 os_ <<
"'" << tstring <<
"'";
264 os_ << (int32_t)tu32;
268 os_ << (int64_t)tu64;
271 os_ <<
"'" << tipaddr <<
"'";
283 CassStatement *statement) :
285 statement_(statement) {
287 void operator()(
const boost::blank &tblank,
size_t index)
const {
288 assert(
false &&
"CassStatement bind to boost::blank not supported");
290 void operator()(
const std::string &tstring,
size_t index)
const {
291 CassError rc(cci_->CassStatementBindStringN(statement_, index,
292 tstring.c_str(), tstring.length()));
293 assert(rc == CASS_OK);
298 CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid));
299 assert(rc == CASS_OK);
302 CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8));
303 assert(rc == CASS_OK);
306 CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16));
307 assert(rc == CASS_OK);
310 assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
311 CassError rc(cci_->CassStatementBindInt32(statement_, index,
312 (cass_int32_t)tu32));
313 assert(rc == CASS_OK);
316 assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
317 CassError rc(cci_->CassStatementBindInt64(statement_, index,
318 (cass_int64_t)tu64));
319 assert(rc == CASS_OK);
322 CassError rc(cci_->CassStatementBindDouble(statement_, index,
323 (cass_double_t)tdouble));
324 assert(rc == CASS_OK);
328 if (tipaddr.is_v4()) {
329 boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
332 boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
335 CassError rc(cci_->CassStatementBindInet(statement_, index,
337 assert(rc == CASS_OK);
340 CassError rc(cci_->CassStatementBindBytes(statement_, index,
342 assert(rc == CASS_OK);
351 CassStatement *statement) :
353 statement_(statement) {
355 void operator()(
const boost::blank &tblank,
const char *name)
const {
356 assert(
false &&
"CassStatement bind to boost::blank not supported");
358 void operator()(
const std::string &tstring,
const char *name)
const {
359 CassError rc(cci_->CassStatementBindStringByNameN(statement_, name,
360 strlen(name), tstring.c_str(), tstring.length()));
361 assert(rc == CASS_OK);
366 CassError rc(cci_->CassStatementBindUuidByName(statement_, name,
368 assert(rc == CASS_OK);
370 void operator()(
const uint8_t &tu8,
const char *name)
const {
371 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
373 assert(rc == CASS_OK);
375 void operator()(
const uint16_t &tu16,
const char *name)
const {
376 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
378 assert(rc == CASS_OK);
380 void operator()(
const uint32_t &tu32,
const char *name)
const {
381 assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
382 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
383 (cass_int32_t)tu32));
384 assert(rc == CASS_OK);
386 void operator()(
const uint64_t &tu64,
const char *name)
const {
387 assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
388 CassError rc(cci_->CassStatementBindInt64ByName(statement_, name,
389 (cass_int64_t)tu64));
390 assert(rc == CASS_OK);
392 void operator()(
const double &tdouble,
const char *name)
const {
393 CassError rc(cci_->CassStatementBindDoubleByName(statement_, name,
394 (cass_double_t)tdouble));
395 assert(rc == CASS_OK);
399 if (tipaddr.is_v4()) {
400 boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
403 boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
406 CassError rc(cci_->CassStatementBindInetByName(statement_, name,
408 assert(rc == CASS_OK);
411 CassError rc(cci_->CassStatementBindBytesByNameN(statement_, name,
412 strlen(name), tblob.
data(), tblob.
size()));
413 assert(rc == CASS_OK);
420 "compaction = {'class': "
421 "'org.apache.cassandra.db.compaction.%s'}");
424 "read_repair_chance = 0.0");
431 const std::string &compaction_strategy) {
432 std::ostringstream query;
434 query <<
"CREATE TABLE IF NOT EXISTS " << cf.
cfname_ <<
" ";
437 assert(rkeys.size() == 1);
442 assert(!cfcolumns.empty());
443 BOOST_FOREACH(
const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
445 query <<
", \"" << cfcolumn.first <<
"\" " <<
450 compaction_strategy.c_str()));
451 assert(!(n < 0 || n >= (
int)
sizeof(cbuf)));
458 if (compaction_strategy ==
459 GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
460 query <<
") WITH " << std::string(cbuf) <<
" AND " <<
463 query <<
") WITH " << std::string(cbuf) <<
" AND " <<
471 const std::string &compaction_strategy,
472 boost::system::error_code *ec) {
473 std::ostringstream query;
475 *ec = errc::make_error_code(errc::success);
479 *ec = errc::make_error_code(errc::invalid_argument);
484 query <<
"CREATE TABLE IF NOT EXISTS " << cf.
cfname_ <<
" (";
487 int rk_size(rkeys.size());
488 for (
int i = 0; i < rk_size; i++) {
491 query <<
"key" << key_num;
499 int ccn_size(clustering_columns.size());
500 for (
int i = 0; i < ccn_size; i++) {
502 query <<
"column" << cnum <<
" " <<
507 int cn_size(columns.size());
508 for (
int i = 0; i < cn_size; i++) {
509 int cnum(i + 1 + ccn_size);
510 query <<
"column" << cnum <<
" " <<
515 if (values.size() > 0) {
519 query <<
"PRIMARY KEY (";
520 std::ostringstream rkey_ss;
521 for (
int i = 0; i < rk_size; i++) {
524 rkey_ss <<
", key" << key_num;
530 query <<
"(" << rkey_ss.str() <<
"), ";
532 query << rkey_ss.str() <<
", ";
534 for (
int i = 0; i < ccn_size; i++) {
539 query <<
"column" << cnum;
543 compaction_strategy.c_str()));
544 assert(!(n < 0 || n >= (
int)
sizeof(cbuf)));
551 if (compaction_strategy ==
552 GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
553 query <<
")) WITH " << std::string(cbuf) <<
" AND " <<
556 query <<
")) WITH " << std::string(cbuf) <<
" AND " <<
564 switch (index_mode) {
567 case GenDb::ColIndexMode::PREFIX:
569 case GenDb::ColIndexMode::CONTAINS:
572 assert(
false &&
"INVALID");
581 const std::string &column,
const std::string &indexname,
583 std::ostringstream query;
590 query <<
"INDEX IF NOT EXISTS " << indexname <<
" ";
592 query <<
"ON " << cfname <<
"(\""<< column <<
"\")";
595 query <<
" USING \'org.apache.cassandra.index.sasi.SASIIndex\' " <<
607 std::ostringstream query;
609 const std::string &table(v_columns->
cfname_);
610 query <<
"INSERT INTO " << table <<
" (";
611 std::ostringstream values_ss;
612 values_ss <<
"VALUES (";
616 int rk_size(rkeys.size());
617 for (
int i = 0; i < rk_size; i++) {
620 query <<
", key" << key_num;
627 boost::apply_visitor(values_printer, rkeys[i]);
637 assert(cnames.size() == 1);
640 boost::apply_visitor(cnames_printer, cnames[0]);
645 assert(cvalues.size() == 1);
646 boost::apply_visitor(values_printer, cvalues[0]);
652 query << values_ss.str();
654 query <<
" USING TTL " << cttl;
660 std::ostringstream query;
662 const std::string &table(v_columns->
cfname_);
663 query <<
"INSERT INTO " << table <<
" (";
664 std::ostringstream values_ss;
667 int rk_size(rkeys.size());
669 for (
int i = 0; i < rk_size; i++) {
672 query <<
", key" << key_num;
676 boost::apply_visitor(values_printer, rkeys[i]);
681 assert(columns.size() == 1);
686 int cn_size(cnames.size());
687 for (
int i = 0; i < cn_size; i++) {
690 query <<
", column" << cnum;
691 boost::apply_visitor(values_printer, cnames[i]);
692 if (i != cn_size - 1) {
699 if (cvalues.size() > 0) {
700 query <<
", value) VALUES (";
702 boost::apply_visitor(values_printer, cvalues[0]);
704 query <<
") VALUES (";
707 query << values_ss.str();
708 if (column.ttl > 0) {
709 query <<
" USING TTL " << column.ttl;
719 std::ostringstream query;
721 query <<
"INSERT INTO " << cf.
cfname_ <<
" ";
724 assert(rkeys.size() == 1);
725 std::ostringstream values_ss;
727 values_ss <<
") VALUES (?";
730 assert(!cfcolumns.empty());
731 BOOST_FOREACH(
const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
733 query <<
", \"" << cfcolumn.first <<
"\"";
736 query << values_ss.str();
737 query <<
") USING TTL ?";
742 boost::system::error_code *ec) {
743 std::ostringstream query;
745 *ec = errc::make_error_code(errc::success);
749 *ec = errc::make_error_code(errc::invalid_argument);
754 query <<
"INSERT INTO " << cf.
cfname_ <<
" (";
757 int rk_size(rkeys.size());
758 std::ostringstream values_ss;
759 for (
int i = 0; i < rk_size; i++) {
762 query <<
"key" << key_num;
771 int ccn_size(clustering_columns.size());
772 for (
int i = 0; i < ccn_size; i++) {
774 query <<
"column" << cnum;
776 if (i != ccn_size - 1) {
783 int cn_size(columns.size());
788 for (
int i = 0; i < cn_size; i++) {
789 int cnum(i + 1 + ccn_size);
790 query <<
"column" << cnum;
792 if (i != cn_size - 1) {
799 if (values.size() > 0) {
803 query <<
") VALUES (";
805 query << values_ss.str();
806 query <<
" USING TTL ?";
815 CassStatement *statement,
820 int rk_size(rkeys.size());
822 for (; (int) idx < rk_size; idx++) {
825 int key_num(idx + 1);
830 boost::apply_visitor(boost::bind(values_binder, _1, rk_name.c_str()),
838 assert(cnames.size() == 1);
840 std::string cname(boost::get<std::string>(cnames[0]));
842 assert(cvalues.size() == 1);
843 boost::apply_visitor(boost::bind(values_binder, _1, cname.c_str()),
850 (cass_int32_t)cttl));
851 assert(rc == CASS_OK);
856 CassStatement *statement,
861 int rk_size(rkeys.size());
863 for (; (int) idx < rk_size; idx++) {
864 boost::apply_visitor(boost::bind(values_binder, _1, idx), rkeys[idx]);
868 assert(columns.size() == 1);
873 int cn_size(cnames.size());
874 for (
int i = 0; i < cn_size; i++, idx++) {
875 boost::apply_visitor(boost::bind(values_binder, _1, idx), cnames[i]);
879 if (cvalues.size() > 0) {
880 boost::apply_visitor(boost::bind(values_binder, _1, idx++),
884 (cass_int32_t)column.ttl));
885 assert(rc == CASS_OK);
890 const std::vector<GenDb::DbDataValueVec> &rkeys,
894 std::ostringstream query;
896 if (read_vec.empty()) {
897 query <<
"SELECT * FROM " << table;
900 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
901 it != read_vec.end(); it++) {
902 query << it->get<0>() <<
",";
903 bool read_timestamp = it->get<3>();
904 if (read_timestamp) {
905 query <<
"WRITETIME(" << it->get<0>() <<
"),";
908 query.seekp(-1, query.cur);
909 query <<
" FROM " << table;
911 if (rkeys.size() == 1) {
913 int rk_size(rkey.size());
915 for (
int i = 0; i < rk_size; i++) {
918 query <<
" AND key" << key_num <<
"=";
920 query <<
" WHERE key=";
922 boost::apply_visitor(cprinter, rkey[i]);
925 }
else if (rkeys.size() > 1) {
926 query <<
" WHERE key IN (";
928 int rk_size(rkey.size());
929 assert(rk_size == 1);
931 boost::apply_visitor(cprinter, rkey[0]);
934 query.seekp(-1, query.cur);
937 if (!where_vec.empty()) {
938 for (GenDb::WhereIndexInfoVec::const_iterator it = where_vec.begin();
939 it != where_vec.end(); ++it) {
940 std::ostringstream value_ss;
942 boost::apply_visitor(value_vprinter, it->get<2>());
944 query <<
" " << it->get<0>();
946 query <<
" " << value_ss.str();
950 if (!ck_range.
start_.empty()) {
951 int ck_start_size(ck_range.
start_.size());
952 std::ostringstream start_ss;
956 for (
int i = 0; i < ck_start_size; i++) {
962 query <<
"column" << cnum;
963 boost::apply_visitor(start_vprinter, ck_range.
start_[i]);
967 query << start_ss.str();
969 if (!ck_range.
finish_.empty()) {
970 int ck_finish_size(ck_range.
finish_.size());
971 std::ostringstream finish_ss;
976 for (
int i = 0; i < ck_finish_size; i++) {
982 query <<
"column" << cnum;
983 boost::apply_visitor(finish_vprinter, ck_range.
finish_[i]);
987 query << finish_ss.str();
990 query <<
" LIMIT " << ck_range.
count_;
993 if (where_vec.size() > 1) {
994 query <<
" ALLOW FILTERING";
1004 std::vector<GenDb::DbDataValueVec> rkey_vec;
1005 rkey_vec.push_back(rkeys);
1007 read_vec, where_vec);
1012 std::vector<GenDb::DbDataValueVec> rkey_vec;
1013 rkey_vec.push_back(rkeys);
1023 std::vector<GenDb::DbDataValueVec> rkey_vec;
1024 rkey_vec.push_back(rkeys);
1030 const std::string &table,
const std::vector<GenDb::DbDataValueVec> &rkeys,
1038 std::vector<GenDb::DbDataValueVec> rkey_vec;
1051 case CASS_VALUE_TYPE_ASCII:
1052 case CASS_VALUE_TYPE_VARCHAR:
1053 case CASS_VALUE_TYPE_TEXT: {
1057 assert(rc == CASS_OK);
1058 return std::string(ctstring.
data, ctstring.
length);
1060 case CASS_VALUE_TYPE_UUID: {
1063 assert(rc == CASS_OK);
1068 case CASS_VALUE_TYPE_DOUBLE: {
1069 cass_double_t ctdouble;
1071 assert(rc == CASS_OK);
1072 return (
double)ctdouble;
1074 case CASS_VALUE_TYPE_TINY_INT: {
1077 assert(rc == CASS_OK);
1078 return (uint8_t)ct8;
1080 case CASS_VALUE_TYPE_SMALL_INT: {
1083 assert(rc == CASS_OK);
1084 return (uint16_t)ct16;
1086 case CASS_VALUE_TYPE_INT: {
1089 assert(rc == CASS_OK);
1090 return (uint32_t)ct32;
1092 case CASS_VALUE_TYPE_BIGINT: {
1095 assert(rc == CASS_OK);
1096 return (uint64_t)ct64;
1098 case CASS_VALUE_TYPE_INET: {
1101 assert(rc == CASS_OK);
1103 if (ctinet.address_length == CASS_INET_V4_LENGTH) {
1104 Ip4Address::bytes_type ipv4;
1107 }
else if (ctinet.address_length == CASS_INET_V6_LENGTH) {
1108 Ip6Address::bytes_type ipv6;
1116 case CASS_VALUE_TYPE_BLOB: {
1117 const cass_byte_t *bytes(NULL);
1120 assert(rc == CASS_OK);
1123 case CASS_VALUE_TYPE_UNKNOWN: {
1129 assert(
false &&
"Unhandled value type");
1136 CassSession *session,
const char* query,
1143 if (rc != CASS_OK) {
1152 return rc == CASS_OK;
1156 CassSession *session,
1158 CassConsistency consistency) {
1164 if (rc != CASS_OK) {
1175 return rc == CASS_OK;
1179 CassSession *session,
const char *query, CassConsistency consistency) {
1187 CassSession *session,
const char *query,
1196 CassSession *session, CassStatement *statement,
1197 CassConsistency consistency) {
1206 case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1207 case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1208 case CASS_ERROR_LIB_NO_AVAILABLE_IO_THREAD:
1226 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1227 it != read_vec.end(); it++) {
1228 bool row_key = it->get<1>();
1229 bool row_column = it->get<2>();
1230 bool read_timestamp = it->get<3>();
1239 cnames->push_back(db_value);
1241 values->push_back(db_value);
1242 if (read_timestamp) {
1247 timestamps->push_back(time_value);
1253 v_columns->push_back(column);
1260 std::auto_ptr<GenDb::ColList> col_list;
1270 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1271 it != read_vec.end(); it++) {
1272 bool row_key = it->get<1>();
1273 bool row_column = it->get<2>();
1274 bool read_timestamp = it->get<3>();
1280 rkey.push_back(db_value);
1288 cnames->push_back(db_value);
1290 values->push_back(db_value);
1291 if (read_timestamp) {
1296 timestamps->push_back(time_value);
1303 if (!col_list.get()) {
1305 col_list->rowkey_ = rkey;
1307 if (rkey != col_list->rowkey_) {
1308 v_col_list->push_back(col_list.release());
1310 col_list->rowkey_ = rkey;
1313 v_columns->push_back(column);
1315 if (col_list.get()) {
1316 v_col_list->push_back(col_list.release());
1331 for (
size_t i = rk_count; i < rk_count + ck_count; i++) {
1335 cnames->push_back(db_value);
1339 for (
size_t i = rk_count + ck_count; i < ccount; i++) {
1343 values->push_back(db_value);
1346 v_columns->push_back(column);
1353 std::auto_ptr<GenDb::ColList> col_list;
1362 for (
size_t i = 0; i < rk_count; i++) {
1366 rkey.push_back(db_value);
1370 for (
size_t i = rk_count; i < rk_count + ck_count; i++) {
1374 cnames->push_back(db_value);
1378 for (
size_t i = rk_count + ck_count; i < ccount; i++) {
1382 values->push_back(db_value);
1386 if (!col_list.get()) {
1388 col_list->rowkey_ = rkey;
1390 if (rkey != col_list->rowkey_) {
1391 v_col_list->push_back(col_list.release());
1393 col_list->rowkey_ = rkey;
1396 v_columns->push_back(column);
1398 if (col_list.get()) {
1399 v_col_list->push_back(col_list.release());
1411 for (
size_t i = 0; i < ccount; i++) {
1415 assert(rc == CASS_OK);
1423 std::string(cname.
data, cname.
length), db_value, 0));
1424 v_columns->push_back(column);
1431 std::auto_ptr<GenDb::ColList> col_list;
1440 for (
size_t i = 0; i < rk_count; i++) {
1444 rkey.push_back(db_value);
1447 if (!col_list.get()) {
1449 col_list->rowkey_ = rkey;
1451 if (rkey != col_list->rowkey_) {
1452 v_col_list->push_back(col_list.release());
1454 col_list->rowkey_ = rkey;
1457 for (
size_t i = 0; i < ccount; i++) {
1461 assert(rc == CASS_OK);
1469 std::string(cname.
data, cname.
length), db_value, 0));
1470 v_columns->push_back(column);
1473 if (col_list.get()) {
1474 v_col_list->push_back(col_list.release());
1480 std::auto_ptr<CassAsyncQueryContext> ctx(
1481 boost::reinterpret_pointer_cast<CassAsyncQueryContext>(data));
1485 if (rc != CASS_OK) {
1490 ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1493 if (ctx->result_ctx_) {
1499 col_list->cfname_ = rctx->cf_name_;
1500 col_list->rowkey_ = rctx->row_key_;
1501 if (rctx->is_dynamic_cf_) {
1503 rctx->ck_count_, &col_list->columns_);
1507 ctx->cb_(db_rc, col_list);
1511 ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1515 CassSession *session,
const char *qid, CassStatement *qstatement,
1521 std::auto_ptr<CassAsyncQueryContext> ctx(
1528 CassSession *session,
const char *query,
1537 CassSession *session,
const char *query_id, CassStatement *qstatement,
1544 CassSession *session,
const char *query, CassConsistency consistency,
1548 consistency, cb, rctx);
1552 CassSession *session,
const char *query, CassConsistency consistency,
1555 std::auto_ptr<CassQueryResultContext> rctx(
1557 rk_count, ck_count));
1564 CassSession *session,
const char *query,
1578 CassSession *session,
const char *query,
1592 CassSession *session,
const char *query,
1593 size_t rk_count,
size_t ck_count, CassConsistency consistency,
1606 CassSession *session,
const char *query,
1607 size_t rk_count,
size_t ck_count, CassConsistency consistency,
1620 CassSession *session,
const char *query,
1623 std::auto_ptr<CassQueryResultContext> rctx(
1631 CassSession *session,
const char *query,
1644 CassSession *session,
const char *query,
size_t rk_count,
1657 CassFuture *future) {
1660 if (rc != CASS_OK) {
1666 return rc == CASS_OK;
1671 const std::string &keyspace,
const std::string &table,
bool log_error) {
1672 const CassKeyspaceMeta *keyspace_meta(
1674 if (keyspace_meta == NULL) {
1677 ", Table: " << table);
1681 std::string table_lower(table);
1682 boost::algorithm::to_lower(table_lower);
1683 const CassTableMeta *table_meta(
1685 if (table_meta == NULL) {
1688 ", Table: " << table_lower);
1696 CassSession *session,
1697 const std::string &keyspace,
const std::string &table) {
1700 if (schema_meta.get() == NULL) {
1702 ", Table: " << table);
1705 bool log_error(
false);
1707 schema_meta.get(), keyspace, table, log_error));
1708 if (table_meta == NULL) {
1716 CassSession *session,
const std::string &keyspace,
1717 const std::string &table,
size_t *ck_count) {
1720 if (schema_meta.get() == NULL) {
1722 ", Table: " << table);
1725 bool log_error(
true);
1727 schema_meta.get(), keyspace, table, log_error));
1728 if (table_meta == NULL) {
1737 const std::string &keyspace,
const std::string &table,
size_t *rk_count) {
1740 if (schema_meta.get() == NULL) {
1742 ", Table: " << table);
1745 bool log_error(
true);
1747 schema_meta.get(), keyspace, table, log_error));
1748 if (table_meta == NULL) {
1757 case CASS_LOG_DISABLED:
1758 return log4cplus::OFF_LOG_LEVEL;
1759 case CASS_LOG_CRITICAL:
1760 return log4cplus::FATAL_LOG_LEVEL;
1761 case CASS_LOG_ERROR:
1762 return log4cplus::ERROR_LOG_LEVEL;
1764 return log4cplus::WARN_LOG_LEVEL;
1766 return log4cplus::INFO_LOG_LEVEL;
1767 case CASS_LOG_DEBUG:
1768 return log4cplus::DEBUG_LOG_LEVEL;
1769 case CASS_LOG_TRACE:
1770 return log4cplus::TRACE_LOG_LEVEL;
1772 return log4cplus::ALL_LOG_LEVEL;
1778 case log4cplus::OFF_LOG_LEVEL:
1779 return CASS_LOG_DISABLED;
1780 case log4cplus::FATAL_LOG_LEVEL:
1781 return CASS_LOG_CRITICAL;
1782 case log4cplus::ERROR_LOG_LEVEL:
1783 return CASS_LOG_ERROR;
1784 case log4cplus::WARN_LOG_LEVEL:
1785 return CASS_LOG_WARN;
1786 case log4cplus::INFO_LOG_LEVEL:
1787 return CASS_LOG_INFO;
1788 case log4cplus::DEBUG_LOG_LEVEL:
1789 return CASS_LOG_DEBUG;
1790 case log4cplus::TRACE_LOG_LEVEL:
1791 return CASS_LOG_TRACE;
1793 assert(
false &&
"Invalid Log4Level");
1794 return CASS_LOG_DISABLED;
1802 log4cplus::LogLevel log4level(
Cass2log4Level(message->severity));
1803 std::stringstream buf;
1804 buf <<
"CassLibrary: " << message->file <<
":" << message->line <<
1805 " " << message->function <<
"] " << message->message;
1810 if (ca_certs_path.length() == 0) {
1811 return std::string();
1813 std::ifstream file(ca_certs_path.c_str());
1815 return std::string();
1817 std::string content((std::istreambuf_iterator<char>(file)),
1818 std::istreambuf_iterator<char>());
1827 Task(task_id, task_instance),
1835 return "cass::cql::impl::WorkerTask";
1847 const std::vector<std::string> &cassandra_ips,
1849 const std::string &cassandra_user,
1850 const std::string &cassandra_password,
1852 const std::string &ca_certs_path,
1856 cluster_(cci_->CassClusterNew(), cci_),
1858 session_(cci_->CassSessionNew(), cci_),
1859 schema_session_(cci_->CassSessionNew(), cci_),
1861 io_thread_count_(2) {
1866 if (cassandra_ips.size() > 0) {
1868 boost::system::error_code ec;
1869 boost::asio::ip::address::from_string(cassandra_ips[0], ec);
1870 if(ec.value() != 0){
1881 if (content.length() == 0) {
1888 std::string contact_points(boost::algorithm::join(cassandra_ips,
","));
1892 if (!cassandra_user.empty() && !cassandra_password.empty()) {
1894 cassandra_password.c_str());
1912 const std::string &replication_factor, CassConsistency consistency) {
1918 keyspace.c_str(), replication_factor.c_str()));
1919 if (n < 0 || n >= (
int)
sizeof(buf)) {
1921 keyspace <<
", RF: " << replication_factor);
1929 CassConsistency consistency) {
1934 int n(snprintf(buf,
sizeof(buf),
kQUseKeyspace, keyspace.c_str()));
1935 if (n < 0 || n >= (
int)
sizeof(buf)) {
1951 CassConsistency consistency) {
1956 int n(snprintf(buf,
sizeof(buf),
kQUseKeyspace, keyspace.c_str()));
1957 if (n < 0 || n >= (
int)
sizeof(buf)) {
1973 const std::string &compaction_strategy, CassConsistency consistency) {
1985 compaction_strategy);
1990 boost::system::error_code ec;
1992 compaction_strategy, &ec);
2008 const std::string &column,
const std::string &indexname,
2014 indexname, index_mode));
2020 const std::string &table_name(cf.
cfname_);
2033 std::make_pair(table_name, prepared))).second;
2041 CassPreparedMapType::const_iterator it(
2046 *prepared = it->second;
2067 if (ck_count == 0) {
2083 query.c_str(), consistency, cb, cfname.c_str(), rkey);
2092 query.c_str(), consistency, cb, rk_count, ck_count,
2110 rkey, ck_range, where_vec, read_vec));
2119 query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2141 query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2150 CassConsistency consistency) {
2170 (table !=
"MessageTablev2");
2183 query.c_str(), consistency, out);
2192 query.c_str(), rk_count, ck_count, consistency, out);
2209 query.c_str(), rk_count, consistency, out);
2215 query.c_str(), rk_count, ck_count, consistency, out);
2232 rkey, ck_range, read_vec));
2235 query.c_str(), read_vec, consistency, out);
2239 const std::vector<GenDb::DbDataValueVec> &rkeys,
2248 rkeys, ck_range, read_vec));
2251 query.c_str(), read_vec, consistency, out);
2273 query.c_str(), rk_count, ck_count, consistency, out);
2360 CassMetrics cass_metrics;
2363 metrics->requests.min = cass_metrics.requests.min;
2364 metrics->requests.max = cass_metrics.requests.max;
2365 metrics->requests.mean = cass_metrics.requests.mean;
2366 metrics->requests.stddev = cass_metrics.requests.stddev;
2367 metrics->requests.median = cass_metrics.requests.median;
2368 metrics->requests.percentile_75th =
2369 cass_metrics.requests.percentile_75th;
2370 metrics->requests.percentile_95th =
2371 cass_metrics.requests.percentile_95th;
2372 metrics->requests.percentile_98th =
2373 cass_metrics.requests.percentile_98th;
2374 metrics->requests.percentile_99th =
2375 cass_metrics.requests.percentile_99th;
2376 metrics->requests.percentile_999th =
2377 cass_metrics.requests.percentile_999th;
2378 metrics->requests.mean_rate = cass_metrics.requests.mean_rate;
2379 metrics->requests.one_minute_rate =
2380 cass_metrics.requests.one_minute_rate;
2381 metrics->requests.five_minute_rate =
2382 cass_metrics.requests.five_minute_rate;
2383 metrics->requests.fifteen_minute_rate =
2384 cass_metrics.requests.fifteen_minute_rate;
2386 metrics->stats.total_connections =
2387 cass_metrics.stats.total_connections;
2388 metrics->stats.available_connections =
2389 cass_metrics.stats.available_connections;
2390 metrics->stats.exceeded_pending_requests_water_mark =
2391 cass_metrics.stats.exceeded_pending_requests_water_mark;
2392 metrics->stats.exceeded_write_bytes_water_mark =
2393 cass_metrics.stats.exceeded_write_bytes_water_mark;
2395 metrics->errors.connection_timeouts =
2396 cass_metrics.errors.connection_timeouts;
2397 metrics->errors.pending_request_timeouts =
2398 cass_metrics.errors.pending_request_timeouts;
2399 metrics->errors.request_timeouts =
2400 cass_metrics.errors.request_timeouts;
2405 CassConsistency consistency,
bool sync,
2442 boost::system::error_code ec;
2444 if (ec.value() != boost::system::errc::success) {
2459 std::auto_ptr<GenDb::ColList> v_columns,
2460 CassConsistency consistency,
bool sync,
2469 v_columns->cfname_);
2488 qstatement.get(), consistency);
2490 std::string qid(
"Prepare: " + v_columns->cfname_);
2492 qstatement.get(), consistency, cb);
2498 "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH "
2499 "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }");
2507 const std::vector<std::string> &cassandra_ips,
2509 const std::string &cassandra_user,
2510 const std::string &cassandra_password,
2512 const std::string &ca_certs_path,
2513 bool create_schema) :
2514 cci_(new interface::CassDatastaxLibrary),
2515 impl_(new
CqlIfImpl(evm, cassandra_ips, cassandra_port,
2516 cassandra_user, cassandra_password, use_ssl,
2517 ca_certs_path, cci_.get())),
2518 use_prepared_for_insert_(true),
2519 create_schema_(create_schema) {
2522 log4cplus::Logger::getRoot().getLogLevel()));
2525 BOOST_FOREACH(
const std::string &cassandra_ip, cassandra_ips) {
2526 boost::system::error_code ec;
2527 boost::asio::ip::address cassandra_addr(
2543 impl_->SetRequestTimeout(GenDb::g_gendb_constants.SCHEMA_REQUEST_TIMEOUT);
2544 bool success(
impl_->ConnectSchemaSync());
2549 return impl_->ConnectSync();
2554 impl_->DisconnectSchemaSync();
2556 impl_->DisconnectSync();
2564 impl_->SetRequestTimeout(GenDb::g_gendb_constants.DEFAULT_REQUEST_TIMEOUT);
2565 impl_->DisconnectSchemaSync();
2572 const std::string &replication_factor) {
2573 bool success(
impl_->CreateKeyspaceIfNotExistsSync(tablespace,
2574 replication_factor, CASS_CONSISTENCY_QUORUM));
2579 success =
impl_->UseKeyspaceSyncOnSchemaSession(tablespace,
2580 CASS_CONSISTENCY_ONE);
2589 bool success(
impl_->UseKeyspaceSync(tablespace, CASS_CONSISTENCY_ONE));
2599 const std::string &compaction_strategy) {
2601 impl_->CreateTableIfNotExistsSync(cf, compaction_strategy,
2602 CASS_CONSISTENCY_QUORUM));
2609 success =
impl_->LocatePrepareInsertIntoTable(cf);
2626 bool success(
impl_->IsTablePresent(cfname));
2638 const std::string &column,
const std::string &indexname,
2640 bool success(
impl_->CreateIndexIfNotExistsSync(cfname, column, indexname,
2641 CASS_CONSISTENCY_QUORUM, index_mode));
2652 std::auto_ptr<GenDb::ColList> row,
2677 std::auto_ptr<GenDb::ColList>
row_;
2681 boost::shared_ptr<AsyncRowGetCallbackContext> cb_ctx) {
2682 cb_ctx->cb_(cb_ctx->drc_, cb_ctx->row_);
2686 std::auto_ptr<GenDb::ColList> row, std::string cfname,
2688 int task_instance) {
2700 boost::shared_ptr<AsyncRowGetCallbackContext> ctx(
2704 task_id, task_instance));
2716 std::auto_ptr<GenDb::ColList> row, std::string cfname,
2723 std::string cfname(cl->cfname_);
2732 impl_->IsInsertIntoTablePrepareSupported(cfname)) {
2733 success =
impl_->InsertIntoTablePrepareAsync(cl, consistency,
2737 success =
impl_->InsertIntoTableAsync(cl, consistency,
2751 std::string cfname(cl->cfname_);
2753 bool success(
impl_->InsertIntoTableSync(cl, consistency));
2768 bool success(
impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2770 _1, _2, cfname, cb)));
2783 bool success(
impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2785 _1, _2, cfname, cb,
true, task_id, task_instance)));
2798 bool success(
impl_->SelectFromTableAsync(cfname, rowkey,
2813 bool success(
impl_->SelectFromTableAsync(cfname, rowkey,
2815 cfname, cb,
true, task_id, task_instance)));
2828 bool success(
impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync(cfname,
2842 bool success(
impl_->SelectFromTableSync(cfname, rowkey,
2859 bool success(
impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2860 rowkey, crange, consistency, read_vec, &out->
columns_));
2871 const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
2875 v_columns->rowkey_ = rkey;
2876 bool success(
impl_->SelectFromTableSync(cfname, rkey,
2877 CASS_CONSISTENCY_ONE, &v_columns->columns_));
2885 out->push_back(v_columns.release());
2892 const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2897 v_columns->rowkey_ = rkey;
2898 bool success(
impl_->SelectFromTableClusteringKeyRangeSync(cfname,
2899 rkey, crange, CASS_CONSISTENCY_ONE, &v_columns->columns_));
2903 " Clustering Key Range: " << crange.
ToString() <<
" FAILED");
2908 out->push_back(v_columns.release());
2915 const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2920 bool success(
impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2921 v_rowkey, crange, consistency, read_vec, out));
2934 bool success(
impl_->SelectFromTableSync(cfname, consistency, out));
2946 uint64_t *enqueues)
const {
2962 GenDb::DbErrors *dbe) {
2969 GenDb::DbErrors *dbe)
const {
2976 return impl_->GetMetrics(metrics);
2981 bool success(
impl_->GetMetrics(&metrics));
2985 db_stats->requests_one_minute_rate = metrics.requests.one_minute_rate;
2986 db_stats->stats = metrics.stats;
2987 db_stats->errors = metrics.errors;
2997 uint64_t num_writes) {
3008 uint64_t num_writes) {
3014 const std::string &table_name) {
3020 const std::string &table_name) {
3031 uint64_t num_reads) {
3042 uint64_t num_reads) {
3057 namespace interface {
3070 return cass_cluster_new();
3074 cass_cluster_free(cluster);
3078 CassCluster* cluster,
const char* contact_points) {
3079 return cass_cluster_set_contact_points(cluster, contact_points);
3084 return cass_cluster_set_port(cluster, port);
3088 cass_cluster_set_ssl(cluster, ssl);
3092 const char* username,
const char* password) {
3093 cass_cluster_set_credentials(cluster, username, password);
3097 unsigned num_threads) {
3098 return cass_cluster_set_num_threads_io(cluster, num_threads);
3102 CassCluster* cluster,
unsigned num_requests) {
3103 return cass_cluster_set_pending_requests_high_water_mark(cluster,
3108 CassCluster* cluster,
unsigned num_requests) {
3109 return cass_cluster_set_pending_requests_low_water_mark(cluster,
3114 CassCluster* cluster,
unsigned num_bytes) {
3115 return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes);
3119 CassCluster* cluster,
unsigned num_bytes) {
3120 return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes);
3124 CassCluster* cluster,
const char* hosts) {
3125 cass_cluster_set_whitelist_filtering(cluster, hosts);
3130 return cass_ssl_new();
3134 return cass_ssl_free(ssl);
3138 const std::string &cert) {
3139 return cass_ssl_add_trusted_cert_n(ssl, cert.c_str(), cert.length());
3143 cass_ssl_set_verify_flags(ssl, flags);
3148 return cass_session_new();
3152 cass_session_free(session);
3156 unsigned timeout_ms) {
3157 return cass_cluster_set_request_timeout(cluster, timeout_ms);
3161 const CassCluster* cluster) {
3162 return cass_session_connect(session, cluster);
3166 return cass_session_close(session);
3170 const CassStatement* statement) {
3171 return cass_session_execute(session, statement);
3175 const CassSession* session) {
3176 return cass_session_get_schema_meta(session);
3180 const char* query) {
3181 return cass_session_prepare(session, query);
3185 CassMetrics* output) {
3186 cass_session_get_metrics(session, output);
3191 const CassSchemaMeta* schema_meta) {
3192 cass_schema_meta_free(schema_meta);
3196 const CassSchemaMeta* schema_meta,
const char* keyspace) {
3197 return cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
3201 const CassKeyspaceMeta* keyspace_meta,
const char* table) {
3202 return cass_keyspace_meta_table_by_name(keyspace_meta, table);
3206 const CassTableMeta* table_meta) {
3207 return cass_table_meta_partition_key_count(table_meta);
3211 const CassTableMeta* table_meta) {
3212 return cass_table_meta_clustering_key_count(table_meta);
3217 cass_future_free(future);
3221 CassFutureCallback callback,
void* data) {
3222 return cass_future_set_callback(future, callback, data);
3226 cass_future_wait(future);
3230 CassFuture* future) {
3231 return cass_future_get_result(future);
3235 const char** message,
size_t* message_length) {
3236 cass_future_error_message(future, message, message_length);
3240 return cass_future_error_code(future);
3244 CassFuture* future) {
3245 return cass_future_get_prepared(future);
3250 cass_result_free(result);
3254 return cass_result_column_count(result);
3258 size_t index,
const char** name,
size_t* name_length) {
3259 return cass_result_column_name(result, index, name, name_length);
3264 cass_iterator_free(iterator);
3268 const CassResult* result) {
3269 return cass_iterator_from_result(result);
3273 return cass_iterator_next(iterator);
3277 const CassIterator* iterator) {
3278 return cass_iterator_get_row(iterator);
3283 size_t parameter_count) {
3284 return cass_statement_new(query, parameter_count);
3288 cass_statement_free(statement);
3292 CassStatement* statement, CassConsistency consistency) {
3293 return cass_statement_set_consistency(statement, consistency);
3297 CassStatement* statement,
3298 size_t index,
const char* value,
size_t value_length) {
3299 return cass_statement_bind_string_n(statement, index, value, value_length);
3303 size_t index, cass_int32_t value) {
3304 return cass_statement_bind_int32(statement, index, value);
3308 size_t index, cass_int64_t value) {
3309 return cass_statement_bind_int64(statement, index, value);
3313 size_t index, CassUuid value) {
3314 return cass_statement_bind_uuid(statement, index, value);
3318 CassStatement* statement,
size_t index, cass_double_t value) {
3319 return cass_statement_bind_double(statement, index, value);
3323 size_t index, CassInet value) {
3324 return cass_statement_bind_inet(statement, index, value);
3328 CassStatement* statement,
3329 size_t index,
const cass_byte_t* value,
size_t value_length) {
3330 return cass_statement_bind_bytes(statement, index, value, value_length);
3334 CassStatement* statement,
3335 const char* name,
size_t name_length,
const char* value,
3336 size_t value_length) {
3337 return cass_statement_bind_string_by_name_n(statement, name, name_length,
3338 value, value_length);
3342 CassStatement* statement,
const char* name, cass_int32_t value) {
3343 return cass_statement_bind_int32_by_name(statement, name, value);
3347 CassStatement* statement,
const char* name, cass_int64_t value) {
3348 return cass_statement_bind_int64_by_name(statement, name, value);
3352 CassStatement* statement,
const char* name, CassUuid value) {
3353 return cass_statement_bind_uuid_by_name(statement, name, value);
3357 CassStatement* statement,
const char* name, cass_double_t value) {
3358 return cass_statement_bind_double_by_name(statement, name, value);
3362 CassStatement* statement,
const char* name, CassInet value) {
3363 return cass_statement_bind_inet_by_name(statement, name, value);
3367 CassStatement* statement,
3368 const char* name,
size_t name_length,
const cass_byte_t* value,
3369 size_t value_length) {
3370 return cass_statement_bind_bytes_by_name_n(statement, name, name_length,
3371 value, value_length);
3376 cass_prepared_free(prepared);
3380 const CassPrepared* prepared) {
3381 return cass_prepared_bind(prepared);
3386 return cass_value_type(value);
3390 const char** output,
size_t* output_size) {
3391 return cass_value_get_string(value, output, output_size);
3395 cass_int8_t* output) {
3396 return cass_value_get_int8(value, output);
3400 cass_int16_t* output) {
3401 return cass_value_get_int16(value, output);
3405 cass_int32_t* output) {
3406 return cass_value_get_int32(value, output);
3410 cass_int64_t* output) {
3411 return cass_value_get_int64(value, output);
3416 return cass_value_get_uuid(value, output);
3420 cass_double_t* output) {
3421 return cass_value_get_double(value, output);
3426 return cass_value_get_inet(value, output);
3430 const cass_byte_t** output,
size_t* output_size) {
3431 return cass_value_get_bytes(value, output, output_size);
3435 return cass_value_is_null(value);
3440 const cass_uint8_t* address) {
3441 return cass_inet_init_v4(address);
3445 const cass_uint8_t* address) {
3446 return cass_inet_init_v6(address);
3452 return cass_row_get_column(row, index);
3457 cass_log_set_level(log_level);
3462 cass_log_set_callback(callback, data);
void IncrementTableReadFail(const std::string &table_name)
virtual CassSession * CassSessionNew()
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)
WorkerTask(FunctionPtr func, int task_id, int task_instance)
void IncrementTableWriteStats(const std::string &table_name)
std::string Description() const
static void OnExecuteQueryAsync(CassFuture *future, void *data)
virtual void CassFutureFree(CassFuture *future)
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)=0
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task...
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)=0
static const char * DbDataType2CassType(const GenDb::DbDataType::type &db_type)
boost::scoped_ptr< CqlIfImpl > impl_
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)
std::vector< GenDb::DbDataType::type > DbDataTypeVec
tbb::atomic< bool > initialized_
virtual void CassStatementFree(CassStatement *statement)
virtual const CassResult * CassFutureGetResult(CassFuture *future)=0
void SetRequestTimeout(uint32_t timeout_ms)
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)=0
static GenDb::DbDataValue CassValue2DbDataValue(interface::CassLibrary *cci, const CassValue *cvalue)
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)=0
CassQueryPrinter(std::ostream &os)
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)
void operator()(const boost::uuids::uuid &tuuid) const
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)
virtual CassValueType GetCassValueType(const CassValue *value)=0
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)
tbb::atomic< SessionState::type > session_state_
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbGetRowCb cb)
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
virtual bool Db_GetCumulativeStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)
CassSharedPtr< CassSsl > CassSslPtr
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)=0
std::auto_ptr< GenDb::ColList > row_
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns)
static void encode_uuid(char *output, const CassUuid &uuid)
void operator()(const IpAddress &tipaddr) const
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)=0
void operator()(const uint16_t &tu16, size_t index) const
virtual void CassSessionFree(CassSession *session)
virtual std::vector< GenDb::Endpoint > Db_GetEndpoints() const
static bool PrepareSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassPreparedPtr *prepared)
void operator()(const boost::blank &tblank, size_t index) const
void IncrementTableReadBackPressureFailStats(const std::string &table_name)
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)=0
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)=0
virtual const CassResult * CassFutureGetResult(CassFuture *future)
static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx=NULL)
virtual void CassFutureWait(CassFuture *future)=0
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)=0
boost::asio::ip::tcp::endpoint Endpoint
static bool ExecuteQuerySync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency)
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)=0
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)
void IncrementTableWriteFail(const std::string &table_name)
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)
void operator()(const std::string &tstring) const
static void AsyncRowGetCompletionCallback(boost::shared_ptr< AsyncRowGetCallbackContext > cb_ctx)
CassSharedPtr< const CassPrepared > CassPreparedPtr
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)=0
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf)
boost::asio::ip::address IpAddress
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)=0
DbDataTypeVec partition_keys_
void operator()(const IpAddress &tipaddr, size_t index) const
#define CQLIF_DEBUG_TRACE(_Msg)
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)
interface::CassLibrary * cci_
static const char * kTaskName
bool IsTableDynamic(const std::string &table)
virtual CassIterator * CassIteratorFromResult(const CassResult *result)
void operator()(const uint8_t &tu8, size_t index) const
virtual void CassSslFree(CassSsl *ssl)
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, GenDb::NewColVec *v_columns)
const uint8_t * data() const
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)=0
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
static bool StaticCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
static CassConsistency Db2CassConsistency(GenDb::DbConsistency::type dconsistency)
void operator()(const uint32_t &tu32) const
static bool DynamicCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
boost::asio::io_context * io_service()
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)
virtual bool Db_CreateIndex(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode=GenDb::ColIndexMode::NONE)
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)
impl::CassClusterPtr cluster_
void operator()(const std::string &tstring, size_t index) const
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
boost::function< void(void)> FunctionPtr
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)
SandeshTraceBufferPtr CqlTraceErrBuf(SandeshTraceBufferCreate(CQLIF_ERR, 20000))
CassString(const char *data, size_t length)
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)=0
void IncrementErrors(GenDb::IfErrors::Type err_type)
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)=0
virtual CassFuture * CassSessionClose(CassSession *session)=0
interface::CassLibrary * cci_
static const char * kQCreateKeyspaceIfNotExists
impl::CassSessionPtr session_
std::string ToString() const
void operator()(const uint8_t &tu8) const
bool use_prepared_for_insert_
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)
virtual bool Db_GetCqlMetrics(Metrics *metrics) const
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)=0
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
void operator()(const uint64_t &tu64) const
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
void operator()(const double &tdouble, size_t index) const
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)=0
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)=0
static std::string DbDataTypes2CassTypes(const GenDb::DbDataTypeVec &v_db_types)
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)
virtual void CassClusterFree(CassCluster *cluster)
static void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *v_columns)
virtual void CassFutureWait(CassFuture *future)
void operator()(const std::string &tstring, const char *name) const
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
boost::ptr_vector< NewCol > NewColVec
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)=0
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)=0
virtual bool Db_SetTablespace(const std::string &tablespace)
GenDb::DbOpResult::type drc_
virtual bool Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname, GenDb::DbConsistency::type dconsistency)
virtual CassSsl * CassSslNew()
virtual bool Db_GetStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)=0
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)=0
CassPreparedMapType insert_prepared_map_
void IncrementTableWriteBackPressureFailStats(const std::string &table_name)
void IncrementErrors(IfErrors::Type type)
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)
static std::string ToString(Op::type op)
void operator()(const T &t) const
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)=0
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)=0
virtual void CassPreparedFree(const CassPrepared *prepared)
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
void operator()(const boost::uuids::uuid &tuuid, const char *name) const
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
CassStatementNameBinder(interface::CassLibrary *cci, CassStatement *statement)
static void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, GenDb::NewColVec *v_columns)
static const std::string integerToString(const NumberType &num)
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)
static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel)
CassStatement * statement_
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)
static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb)
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)=0
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)=0
virtual CassSession * CassSessionNew()=0
static TaskScheduler * GetInstance()
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq...
static std::string DbColIndexMode2String(const GenDb::ColIndexMode::type index_mode)
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)
std::vector< DbDataValue > DbDataValueVec
virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor="1")
DbDataTypeVec clustering_columns_
void operator()(const uint32_t &tu32, const char *name) const
boost::asio::ip::address_v6 Ip6Address
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb)
static const char * kQUseKeyspace
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)
CassStatementIndexBinder(interface::CassLibrary *cci, CassStatement *statement)
static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc)
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)
virtual CassInet CassInetInitV6(const cass_uint8_t *address)
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassError CassStatementBindBytes(CassStatement *statement, size_t index, const cass_byte_t *value, size_t value_length)
virtual CassError CassStatementBindInt32ByName(CassStatement *statement, const char *name, cass_int32_t value)
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)=0
virtual size_t CassResultColumnCount(const CassResult *result)=0
virtual CassError CassStatementBindUuidByName(CassStatement *statement, const char *name, CassUuid value)
std::string DbDataValueVecToString(const GenDb::DbDataValueVec &v_db_value)
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count, DbQueueWaterMarkCb cb)
void IncrementTableReadBackPressureFail(const std::string &table_name)
virtual CassValueType GetCassValueType(const CassValue *value)
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)=0
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)=0
void IncrementTableRead(const std::string &table_name)
static bool SyncFutureWait(interface::CassLibrary *cci, CassFuture *future)
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
boost::scoped_ptr< DbDataValueVec > value
boost::function< void(DbOpResult::type, std::auto_ptr< ColList >)> DbGetRowCb
GenDb::GenDbIf::DbGetRowCb cb_
static bool GetCassTablePartitionKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count)
static bool GetCassTableClusteringKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *ck_count)
std::map< std::string, GenDb::DbDataType::type > ColumnMap
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)=0
virtual void Db_ResetQueueWaterMarks()
bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
virtual void CassResultFree(const CassResult *result)
static void ExecuteQueryAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb)
interface::CassLibrary * cci_
CassString(const char *data)
virtual bool Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname, const std::vector< GenDb::DbDataValueVec > &v_rowkey)
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)=0
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
boost::asio::ip::address_v4 Ip4Address
std::vector< WhereIndexInfo > WhereIndexInfoVec
void operator()(const boost::blank &tblank, const char *name) const
CassQueryPrinter(std::ostream &os, bool quote_strings)
SandeshTraceBufferPtr CqlTraceDebugBuf(SandeshTraceBufferCreate(CQLIF_DEBUG, 10000))
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
static std::string CassSelectFromTableInternal(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec, const GenDb::WhereIndexInfoVec &where_vec)
bool GetMetrics(Metrics *metrics) const
virtual bool Db_GetCqlStats(DbStats *db_stats) const
virtual CassError CassFutureErrorCode(CassFuture *future)=0
virtual CassInet CassInetInitV4(const cass_uint8_t *address)
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)=0
virtual CassCluster * CassClusterNew()
CassStatement * statement_
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)=0
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
void operator()(const uint8_t &tu8, const char *name) const
virtual ~CassDatastaxLibrary()
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
GenDb::GenDbIfStats stats_
static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, CassSession *session, CassStatement *statement, CassConsistency consistency)
virtual cass_bool_t CassValueIsNull(const CassValue *value)=0
virtual CassError CassFutureErrorCode(CassFuture *future)
static void CassLibraryLog(const CassLogMessage *message, void *data)
boost::ptr_vector< ColList > ColListVec
static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level)
virtual CassError CassStatementBindInt64(CassStatement *statement, size_t index, cass_int64_t value)
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)
std::vector< GenDb::Endpoint > endpoints_
virtual void CassLogSetLevel(CassLogLevel log_level)
static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency)
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
void operator()(const boost::uuids::uuid &tuuid, size_t index) const
int IsTableStatic(const std::string &table)
virtual CassError CassStatementBindStringN(CassStatement *statement, size_t index, const char *value, size_t value_length)
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)=0
bool IsTablePresent(const std::string &table)
virtual bool Db_GetQueueStats(uint64_t *queue_count, uint64_t *enqueues) const
void IncrementTableWriteFailStats(const std::string &table_name)
boost::function< void(size_t)> DbQueueWaterMarkCb
virtual CassError CassStatementBindDouble(CassStatement *statement, size_t index, cass_double_t value)
static void ExecuteQueryResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx)
static const std::string kQReadRepairChanceDTCS("read_repair_chance = 0.0")
static const CassTableMeta * GetCassTableMeta(interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error)
std::string GetHostIp(boost::asio::io_context *io_service, const std::string &hostname)
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)=0
static char * decode_uuid(char *input, CassUuid *output)
virtual bool Db_AddColumnSync(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency)
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, const std::string &compaction_strategy)
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0")
void IncrementTableReadStats(const std::string &table_name)
std::string CassSelectFromTable(const std::string &table)
static std::string LoadCertFile(const std::string &ca_certs_path)
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
virtual bool Db_AddColumn(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbAddColumnCb cb)
void operator()(const GenDb::Blob &tblob, size_t index) const
boost::scoped_ptr< interface::CassLibrary > cci_
AsyncRowGetCallbackContext(GenDb::GenDbIf::DbGetRowCb cb, GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row)
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)
virtual CassFuture * CassSessionClose(CassSession *session)
void operator()(const GenDb::Blob &tblob, const char *name) const
static bool IsCassTableMetaPresent(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table)
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
void operator()(const uint64_t &tu64, size_t index) const
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
virtual cass_bool_t CassValueIsNull(const CassValue *value)
bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
std::string schema_contact_point_
NewCf::ColumnFamilyType cftype_
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)=0
SandeshTraceBufferPtr CqlTraceInfoBuf(SandeshTraceBufferCreate(CQLIF_INFO, 10000))
void operator()(const uint64_t &tu64, const char *name) const
boost::function< void(DbOpResult::type)> DbAddColumnCb
impl::CassSessionPtr schema_session_
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)
static const char * kQCompactionStrategy("compaction = {'class': ""'org.apache.cassandra.db.compaction.%s'}")
void IncrementTableReadFailStats(const std::string &table_name)
virtual CassError CassStatementBindStringByNameN(CassStatement *statement, const char *name, size_t name_length, const char *value, size_t value_length)
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)
#define CQLIF_INFO_TRACE(_Msg)
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)=0
boost::scoped_ptr< DbDataValueVec > name
CassSharedPtr< const CassResult > CassResultPtr
tbb::atomic< SessionState::type > schema_session_state_
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
void GetDiffs(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void operator()(const uint32_t &tu32, size_t index) const
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)
virtual void CassLogSetCallback(CassLogCallback callback, void *data)
virtual CassError CassStatementBindInet(CassStatement *statement, size_t index, CassInet value)
virtual CassError CassStatementBindInt64ByName(CassStatement *statement, const char *name, cass_int64_t value)
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)
#define GENERIC_RAW_ARRAY(obj)
virtual CassError CassStatementBindDoubleByName(CassStatement *statement, const char *name, cass_double_t value)
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)=0
virtual void CassSchemaMetaFree(const CassSchemaMeta *schema_meta)
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)=0
virtual bool Db_GetRowAsync(const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbGetRowCb cb)
virtual void CassIteratorFree(CassIterator *iterator)
virtual size_t CassResultColumnCount(const CassResult *result)
Task is a wrapper over tbb::task to support policies.
void operator()(const double &tdouble, const char *name) const
void operator()(const IpAddress &tipaddr, const char *name) const
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)
#define CASS_LIB_TRACE(_Level, _Msg)
virtual CassIterator * CassIteratorFromResult(const CassResult *result)=0
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency)
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
void GetCumulative(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)
virtual CassError CassStatementBindUuid(CassStatement *statement, size_t index, CassUuid value)
bool IsInsertIntoTablePrepareSupported(const std::string &table)
static bool ExecuteQueryResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency)
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)=0
virtual CassError CassStatementBindInetByName(CassStatement *statement, const char *name, CassInet value)
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)
virtual CassError CassStatementBindBytesByNameN(CassStatement *statement, const char *name, size_t name_length, const cass_byte_t *value, size_t value_length)
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)
#define CQLIF_ERR_TRACE(_Msg)
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
bool DisconnectSchemaSync()
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
virtual CassSsl * CassSslNew()=0
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual void Db_SetInitDone(bool)
void operator()(const uint16_t &tu16, const char *name) const
void IncrementTableWrite(const std::string &table_name)
void IncrementTableWriteBackPressureFail(const std::string &table_name)