8 #include <tbb/atomic.h>
9 #include <boost/foreach.hpp>
10 #include <boost/algorithm/string.hpp>
11 #include <boost/algorithm/string/join.hpp>
12 #include <boost/unordered_map.hpp>
13 #include <boost/system/error_code.hpp>
15 #include <linux/version.h>
16 #if defined(RHEL_MAJOR) && (RHEL_MAJOR >= 9)
17 #include <cassandra/cassandra.h>
19 #include <cassandra.h>
30 #include <database/gendb_constants.h>
35 using namespace boost::system;
37 #define CQLIF_DEBUG "CqlTraceBufDebug"
38 #define CQLIF_INFO "CqlTraceBufInfo"
39 #define CQLIF_ERR "CqlTraceBufErr"
48 #define CQLIF_DEBUG_TRACE(_Msg) \
50 std::stringstream _ss; \
51 _ss << __func__ << ":" << __FILE__ << ":" << \
52 __LINE__ << ": " << _Msg; \
53 CQL_TRACE_TRACE(CqlTraceDebugBuf, _ss.str()); \
56 #define CQLIF_INFO_TRACE(_Msg) \
58 std::stringstream _ss; \
59 _ss << __func__ << ":" << __FILE__ << ":" << \
60 __LINE__ << ": " << _Msg; \
61 CQL_TRACE_TRACE(CqlTraceInfoBuf, _ss.str()); \
64 #define CQLIF_ERR_TRACE(_Msg) \
66 std::stringstream _ss; \
67 _ss << __func__ << ":" << __FILE__ << ":" << \
68 __LINE__ << ": " << _Msg; \
69 CQL_TRACE_TRACE(CqlTraceErrBuf, _ss.str()); \
72 #define CASS_LIB_TRACE(_Level, _Msg) \
74 if (_Level == log4cplus::ERROR_LOG_LEVEL) { \
75 CQL_TRACE_TRACE(CqlTraceErrBuf, _Msg); \
76 } else if (_Level == log4cplus::DEBUG_LOG_LEVEL) { \
77 CQL_TRACE_TRACE(CqlTraceDebugBuf, _Msg); \
79 CQL_TRACE_TRACE(CqlTraceInfoBuf, _Msg); \
83 #define CQLIF_LOG(_Level, _Msg) \
85 if (LoggingDisabled()) break; \
86 log4cplus::Logger logger = log4cplus::Logger::getRoot(); \
87 LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \
88 __LINE__ << ": " << _Msg); \
91 #define CQLIF_LOG_ERR(_Msg) \
93 LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \
110 length(strlen(data)) {
124 uint64_t time_and_version =
uuid.time_and_version;
125 output[3] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
126 time_and_version >>= 8;
127 output[2] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
128 time_and_version >>= 8;
129 output[1] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
130 time_and_version >>= 8;
131 output[0] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
132 time_and_version >>= 8;
134 output[5] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
135 time_and_version >>= 8;
136 output[4] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
137 time_and_version >>= 8;
139 output[7] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
140 time_and_version >>= 8;
141 output[6] =
static_cast<char>(time_and_version & 0x000000000000000FFLL);
143 uint64_t clock_seq_and_node =
uuid.clock_seq_and_node;
144 for (
size_t i = 0; i < 8; ++i) {
145 output[15 - i] =
static_cast<char>(clock_seq_and_node & 0x00000000000000FFL);
146 clock_seq_and_node >>= 8;
151 output->time_and_version =
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[3]));
152 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[2])) << 8;
153 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[1])) << 16;
154 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[0])) << 24;
156 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[5])) << 32;
157 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[4])) << 40;
159 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[7])) << 48;
160 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[6])) << 56;
162 output->clock_seq_and_node = 0;
163 for (
size_t i = 0; i < 8; ++i) {
164 output->clock_seq_and_node |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[15 - i])) << (8 * i);
172 case GenDb::DbDataType::AsciiType:
174 case GenDb::DbDataType::LexicalUUIDType:
176 case GenDb::DbDataType::TimeUUIDType:
178 case GenDb::DbDataType::Unsigned8Type:
179 case GenDb::DbDataType::Unsigned16Type:
180 case GenDb::DbDataType::Unsigned32Type:
182 case GenDb::DbDataType::Unsigned64Type:
184 case GenDb::DbDataType::DoubleType:
186 case GenDb::DbDataType::UTF8Type:
188 case GenDb::DbDataType::InetType:
190 case GenDb::DbDataType::IntegerType:
192 case GenDb::DbDataType::BlobType:
195 assert(
false &&
"Invalid data type");
202 assert(!v_db_types.empty());
208 switch (dconsistency) {
210 return CASS_CONSISTENCY_ANY;
212 return CASS_CONSISTENCY_ONE;
214 return CASS_CONSISTENCY_TWO;
216 return CASS_CONSISTENCY_THREE;
218 return CASS_CONSISTENCY_QUORUM;
220 return CASS_CONSISTENCY_ALL;
222 return CASS_CONSISTENCY_LOCAL_QUORUM;
224 return CASS_CONSISTENCY_EACH_QUORUM;
226 return CASS_CONSISTENCY_SERIAL;
228 return CASS_CONSISTENCY_LOCAL_SERIAL;
230 return CASS_CONSISTENCY_LOCAL_ONE;
233 return CASS_CONSISTENCY_UNKNOWN;
242 quote_strings_(quote_strings) {
246 quote_strings_(true) {
253 os_ << to_string(tuuid);
258 os_ << (uint16_t)tu8;
261 if (quote_strings_) {
262 os_ <<
"'" << tstring <<
"'";
269 os_ << (int32_t)tu32;
273 os_ << (int64_t)tu64;
276 os_ <<
"'" << tipaddr <<
"'";
288 CassStatement *statement) :
290 statement_(statement) {
292 void operator()(
const boost::blank &tblank,
size_t index)
const {
293 assert(
false &&
"CassStatement bind to boost::blank not supported");
295 void operator()(
const std::string &tstring,
size_t index)
const {
296 CassError rc(cci_->CassStatementBindStringN(statement_, index,
297 tstring.c_str(), tstring.length()));
298 assert(rc == CASS_OK);
303 CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid));
304 assert(rc == CASS_OK);
307 CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8));
308 assert(rc == CASS_OK);
311 CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16));
312 assert(rc == CASS_OK);
315 assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
316 CassError rc(cci_->CassStatementBindInt32(statement_, index,
317 (cass_int32_t)tu32));
318 assert(rc == CASS_OK);
321 assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
322 CassError rc(cci_->CassStatementBindInt64(statement_, index,
323 (cass_int64_t)tu64));
324 assert(rc == CASS_OK);
327 CassError rc(cci_->CassStatementBindDouble(statement_, index,
328 (cass_double_t)tdouble));
329 assert(rc == CASS_OK);
333 if (tipaddr.is_v4()) {
334 boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
337 boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
340 CassError rc(cci_->CassStatementBindInet(statement_, index,
342 assert(rc == CASS_OK);
345 CassError rc(cci_->CassStatementBindBytes(statement_, index,
347 assert(rc == CASS_OK);
356 CassStatement *statement) :
358 statement_(statement) {
360 void operator()(
const boost::blank &tblank,
const char *name)
const {
361 assert(
false &&
"CassStatement bind to boost::blank not supported");
363 void operator()(
const std::string &tstring,
const char *name)
const {
364 CassError rc(cci_->CassStatementBindStringByNameN(statement_, name,
365 strlen(name), tstring.c_str(), tstring.length()));
366 assert(rc == CASS_OK);
371 CassError rc(cci_->CassStatementBindUuidByName(statement_, name,
373 assert(rc == CASS_OK);
375 void operator()(
const uint8_t &tu8,
const char *name)
const {
376 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
378 assert(rc == CASS_OK);
380 void operator()(
const uint16_t &tu16,
const char *name)
const {
381 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
383 assert(rc == CASS_OK);
385 void operator()(
const uint32_t &tu32,
const char *name)
const {
386 assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
387 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
388 (cass_int32_t)tu32));
389 assert(rc == CASS_OK);
391 void operator()(
const uint64_t &tu64,
const char *name)
const {
392 assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
393 CassError rc(cci_->CassStatementBindInt64ByName(statement_, name,
394 (cass_int64_t)tu64));
395 assert(rc == CASS_OK);
397 void operator()(
const double &tdouble,
const char *name)
const {
398 CassError rc(cci_->CassStatementBindDoubleByName(statement_, name,
399 (cass_double_t)tdouble));
400 assert(rc == CASS_OK);
404 if (tipaddr.is_v4()) {
405 boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
408 boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
411 CassError rc(cci_->CassStatementBindInetByName(statement_, name,
413 assert(rc == CASS_OK);
416 CassError rc(cci_->CassStatementBindBytesByNameN(statement_, name,
417 strlen(name), tblob.
data(), tblob.
size()));
418 assert(rc == CASS_OK);
425 "compaction = {'class': "
426 "'org.apache.cassandra.db.compaction.%s'}");
429 "read_repair_chance = 0.0");
436 const std::string &compaction_strategy) {
437 std::ostringstream query;
439 query <<
"CREATE TABLE IF NOT EXISTS " << cf.
cfname_ <<
" ";
442 assert(rkeys.size() == 1);
447 assert(!cfcolumns.empty());
448 BOOST_FOREACH(
const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
450 query <<
", \"" << cfcolumn.first <<
"\" " <<
455 compaction_strategy.c_str()));
456 assert(!(n < 0 || n >= (
int)
sizeof(cbuf)));
463 if (compaction_strategy ==
464 GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
465 query <<
") WITH " << std::string(cbuf) <<
" AND " <<
468 query <<
") WITH " << std::string(cbuf) <<
" AND " <<
476 const std::string &compaction_strategy,
477 boost::system::error_code *ec) {
478 std::ostringstream query;
480 *ec = errc::make_error_code(errc::success);
484 *ec = errc::make_error_code(errc::invalid_argument);
489 query <<
"CREATE TABLE IF NOT EXISTS " << cf.
cfname_ <<
" (";
492 int rk_size(rkeys.size());
493 for (
int i = 0; i < rk_size; i++) {
496 query <<
"key" << key_num;
504 int ccn_size(clustering_columns.size());
505 for (
int i = 0; i < ccn_size; i++) {
507 query <<
"column" << cnum <<
" " <<
512 int cn_size(columns.size());
513 for (
int i = 0; i < cn_size; i++) {
514 int cnum(i + 1 + ccn_size);
515 query <<
"column" << cnum <<
" " <<
520 if (values.size() > 0) {
524 query <<
"PRIMARY KEY (";
525 std::ostringstream rkey_ss;
526 for (
int i = 0; i < rk_size; i++) {
529 rkey_ss <<
", key" << key_num;
535 query <<
"(" << rkey_ss.str() <<
"), ";
537 query << rkey_ss.str() <<
", ";
539 for (
int i = 0; i < ccn_size; i++) {
544 query <<
"column" << cnum;
548 compaction_strategy.c_str()));
549 assert(!(n < 0 || n >= (
int)
sizeof(cbuf)));
556 if (compaction_strategy ==
557 GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
558 query <<
")) WITH " << std::string(cbuf) <<
" AND " <<
561 query <<
")) WITH " << std::string(cbuf) <<
" AND " <<
569 switch (index_mode) {
572 case GenDb::ColIndexMode::PREFIX:
574 case GenDb::ColIndexMode::CONTAINS:
577 assert(
false &&
"INVALID");
586 const std::string &column,
const std::string &indexname,
588 std::ostringstream query;
595 query <<
"INDEX IF NOT EXISTS " << indexname <<
" ";
597 query <<
"ON " << cfname <<
"(\""<< column <<
"\")";
600 query <<
" USING \'org.apache.cassandra.index.sasi.SASIIndex\' " <<
612 std::ostringstream query;
614 const std::string &table(v_columns->
cfname_);
615 query <<
"INSERT INTO " << table <<
" (";
616 std::ostringstream values_ss;
617 values_ss <<
"VALUES (";
621 int rk_size(rkeys.size());
622 for (
int i = 0; i < rk_size; i++) {
625 query <<
", key" << key_num;
632 boost::apply_visitor(values_printer, rkeys[i]);
642 assert(cnames.size() == 1);
645 boost::apply_visitor(cnames_printer, cnames[0]);
650 assert(cvalues.size() == 1);
651 boost::apply_visitor(values_printer, cvalues[0]);
657 query << values_ss.str();
659 query <<
" USING TTL " << cttl;
665 std::ostringstream query;
667 const std::string &table(v_columns->
cfname_);
668 query <<
"INSERT INTO " << table <<
" (";
669 std::ostringstream values_ss;
672 int rk_size(rkeys.size());
674 for (
int i = 0; i < rk_size; i++) {
677 query <<
", key" << key_num;
681 boost::apply_visitor(values_printer, rkeys[i]);
686 assert(columns.size() == 1);
691 int cn_size(cnames.size());
692 for (
int i = 0; i < cn_size; i++) {
695 query <<
", column" << cnum;
696 boost::apply_visitor(values_printer, cnames[i]);
697 if (i != cn_size - 1) {
704 if (cvalues.size() > 0) {
705 query <<
", value) VALUES (";
707 boost::apply_visitor(values_printer, cvalues[0]);
709 query <<
") VALUES (";
712 query << values_ss.str();
713 if (column.
ttl > 0) {
714 query <<
" USING TTL " << column.
ttl;
724 std::ostringstream query;
726 query <<
"INSERT INTO " << cf.
cfname_ <<
" ";
729 assert(rkeys.size() == 1);
730 std::ostringstream values_ss;
732 values_ss <<
") VALUES (?";
735 assert(!cfcolumns.empty());
736 BOOST_FOREACH(
const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
738 query <<
", \"" << cfcolumn.first <<
"\"";
741 query << values_ss.str();
742 query <<
") USING TTL ?";
747 boost::system::error_code *ec) {
748 std::ostringstream query;
750 *ec = errc::make_error_code(errc::success);
754 *ec = errc::make_error_code(errc::invalid_argument);
759 query <<
"INSERT INTO " << cf.
cfname_ <<
" (";
762 int rk_size(rkeys.size());
763 std::ostringstream values_ss;
764 for (
int i = 0; i < rk_size; i++) {
767 query <<
"key" << key_num;
776 int ccn_size(clustering_columns.size());
777 for (
int i = 0; i < ccn_size; i++) {
779 query <<
"column" << cnum;
781 if (i != ccn_size - 1) {
788 int cn_size(columns.size());
793 for (
int i = 0; i < cn_size; i++) {
794 int cnum(i + 1 + ccn_size);
795 query <<
"column" << cnum;
797 if (i != cn_size - 1) {
804 if (values.size() > 0) {
808 query <<
") VALUES (";
810 query << values_ss.str();
811 query <<
" USING TTL ?";
820 CassStatement *statement,
825 int rk_size(rkeys.size());
827 for (; (int) idx < rk_size; idx++) {
830 int key_num(idx + 1);
835 boost::apply_visitor(boost::bind(values_binder, _1, rk_name.c_str()),
843 assert(cnames.size() == 1);
845 std::string cname(boost::get<std::string>(cnames[0]));
847 assert(cvalues.size() == 1);
848 boost::apply_visitor(boost::bind(values_binder, _1, cname.c_str()),
855 (cass_int32_t)cttl));
856 assert(rc == CASS_OK);
861 CassStatement *statement,
866 int rk_size(rkeys.size());
868 for (; (int) idx < rk_size; idx++) {
869 boost::apply_visitor(boost::bind(values_binder, _1, idx), rkeys[idx]);
873 assert(columns.size() == 1);
878 int cn_size(cnames.size());
879 for (
int i = 0; i < cn_size; i++, idx++) {
880 boost::apply_visitor(boost::bind(values_binder, _1, idx), cnames[i]);
884 if (cvalues.size() > 0) {
885 boost::apply_visitor(boost::bind(values_binder, _1, idx++),
889 (cass_int32_t)column.
ttl));
890 assert(rc == CASS_OK);
895 const std::vector<GenDb::DbDataValueVec> &rkeys,
899 std::ostringstream query;
901 if (read_vec.empty()) {
902 query <<
"SELECT * FROM " << table;
905 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
906 it != read_vec.end(); it++) {
907 query << it->get<0>() <<
",";
908 bool read_timestamp = it->get<3>();
909 if (read_timestamp) {
910 query <<
"WRITETIME(" << it->get<0>() <<
"),";
913 query.seekp(-1, query.cur);
914 query <<
" FROM " << table;
916 if (rkeys.size() == 1) {
918 int rk_size(rkey.size());
920 for (
int i = 0; i < rk_size; i++) {
923 query <<
" AND key" << key_num <<
"=";
925 query <<
" WHERE key=";
927 boost::apply_visitor(cprinter, rkey[i]);
930 }
else if (rkeys.size() > 1) {
931 query <<
" WHERE key IN (";
933 int rk_size(rkey.size());
934 assert(rk_size == 1);
936 boost::apply_visitor(cprinter, rkey[0]);
939 query.seekp(-1, query.cur);
942 if (!where_vec.empty()) {
943 for (GenDb::WhereIndexInfoVec::const_iterator it = where_vec.begin();
944 it != where_vec.end(); ++it) {
945 std::ostringstream value_ss;
947 boost::apply_visitor(value_vprinter, it->get<2>());
949 query <<
" " << it->get<0>();
951 query <<
" " << value_ss.str();
955 if (!ck_range.
start_.empty()) {
956 int ck_start_size(ck_range.
start_.size());
957 std::ostringstream start_ss;
961 for (
int i = 0; i < ck_start_size; i++) {
967 query <<
"column" << cnum;
968 boost::apply_visitor(start_vprinter, ck_range.
start_[i]);
972 query << start_ss.str();
974 if (!ck_range.
finish_.empty()) {
975 int ck_finish_size(ck_range.
finish_.size());
976 std::ostringstream finish_ss;
981 for (
int i = 0; i < ck_finish_size; i++) {
987 query <<
"column" << cnum;
988 boost::apply_visitor(finish_vprinter, ck_range.
finish_[i]);
992 query << finish_ss.str();
995 query <<
" LIMIT " << ck_range.
count_;
998 if (where_vec.size() > 1) {
999 query <<
" ALLOW FILTERING";
1009 std::vector<GenDb::DbDataValueVec> rkey_vec;
1010 rkey_vec.push_back(rkeys);
1012 read_vec, where_vec);
1017 std::vector<GenDb::DbDataValueVec> rkey_vec;
1018 rkey_vec.push_back(rkeys);
1028 std::vector<GenDb::DbDataValueVec> rkey_vec;
1029 rkey_vec.push_back(rkeys);
1035 const std::string &table,
const std::vector<GenDb::DbDataValueVec> &rkeys,
1043 std::vector<GenDb::DbDataValueVec> rkey_vec;
1056 case CASS_VALUE_TYPE_ASCII:
1057 case CASS_VALUE_TYPE_VARCHAR:
1058 case CASS_VALUE_TYPE_TEXT: {
1062 assert(rc == CASS_OK);
1063 return std::string(ctstring.
data, ctstring.
length);
1065 case CASS_VALUE_TYPE_UUID: {
1068 assert(rc == CASS_OK);
1073 case CASS_VALUE_TYPE_DOUBLE: {
1074 cass_double_t ctdouble;
1076 assert(rc == CASS_OK);
1077 return (
double)ctdouble;
1079 case CASS_VALUE_TYPE_TINY_INT: {
1082 assert(rc == CASS_OK);
1083 return (uint8_t)ct8;
1085 case CASS_VALUE_TYPE_SMALL_INT: {
1088 assert(rc == CASS_OK);
1089 return (uint16_t)ct16;
1091 case CASS_VALUE_TYPE_INT: {
1094 assert(rc == CASS_OK);
1095 return (uint32_t)ct32;
1097 case CASS_VALUE_TYPE_BIGINT: {
1100 assert(rc == CASS_OK);
1101 return (uint64_t)ct64;
1103 case CASS_VALUE_TYPE_INET: {
1106 assert(rc == CASS_OK);
1108 if (ctinet.address_length == CASS_INET_V4_LENGTH) {
1109 Ip4Address::bytes_type ipv4;
1112 }
else if (ctinet.address_length == CASS_INET_V6_LENGTH) {
1113 Ip6Address::bytes_type ipv6;
1121 case CASS_VALUE_TYPE_BLOB: {
1122 const cass_byte_t *bytes(NULL);
1125 assert(rc == CASS_OK);
1128 case CASS_VALUE_TYPE_UNKNOWN: {
1134 assert(
false &&
"Unhandled value type");
1141 CassSession *session,
const char* query,
1148 if (rc != CASS_OK) {
1157 return rc == CASS_OK;
1161 CassSession *session,
1163 CassConsistency consistency) {
1169 if (rc != CASS_OK) {
1180 return rc == CASS_OK;
1184 CassSession *session,
const char *query, CassConsistency consistency) {
1192 CassSession *session,
const char *query,
1201 CassSession *session, CassStatement *statement,
1202 CassConsistency consistency) {
1211 case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1212 case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1213 case CASS_ERROR_LIB_NO_AVAILABLE_IO_THREAD:
1231 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1232 it != read_vec.end(); it++) {
1233 bool row_key = it->get<1>();
1234 bool row_column = it->get<2>();
1235 bool read_timestamp = it->get<3>();
1244 cnames->push_back(db_value);
1246 values->push_back(db_value);
1247 if (read_timestamp) {
1252 timestamps->push_back(time_value);
1258 v_columns->push_back(column);
1265 std::auto_ptr<GenDb::ColList> col_list;
1275 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1276 it != read_vec.end(); it++) {
1277 bool row_key = it->get<1>();
1278 bool row_column = it->get<2>();
1279 bool read_timestamp = it->get<3>();
1285 rkey.push_back(db_value);
1293 cnames->push_back(db_value);
1295 values->push_back(db_value);
1296 if (read_timestamp) {
1301 timestamps->push_back(time_value);
1308 if (!col_list.get()) {
1310 col_list->rowkey_ = rkey;
1312 if (rkey != col_list->rowkey_) {
1313 v_col_list->push_back(col_list.release());
1315 col_list->rowkey_ = rkey;
1318 v_columns->push_back(column);
1320 if (col_list.get()) {
1321 v_col_list->push_back(col_list.release());
1336 for (
size_t i = rk_count; i < rk_count + ck_count; i++) {
1340 cnames->push_back(db_value);
1344 for (
size_t i = rk_count + ck_count; i < ccount; i++) {
1348 values->push_back(db_value);
1351 v_columns->push_back(column);
1358 std::auto_ptr<GenDb::ColList> col_list;
1367 for (
size_t i = 0; i < rk_count; i++) {
1371 rkey.push_back(db_value);
1375 for (
size_t i = rk_count; i < rk_count + ck_count; i++) {
1379 cnames->push_back(db_value);
1383 for (
size_t i = rk_count + ck_count; i < ccount; i++) {
1387 values->push_back(db_value);
1391 if (!col_list.get()) {
1393 col_list->rowkey_ = rkey;
1395 if (rkey != col_list->rowkey_) {
1396 v_col_list->push_back(col_list.release());
1398 col_list->rowkey_ = rkey;
1401 v_columns->push_back(column);
1403 if (col_list.get()) {
1404 v_col_list->push_back(col_list.release());
1416 for (
size_t i = 0; i < ccount; i++) {
1420 assert(rc == CASS_OK);
1428 std::string(cname.
data, cname.
length), db_value, 0));
1429 v_columns->push_back(column);
1436 std::auto_ptr<GenDb::ColList> col_list;
1445 for (
size_t i = 0; i < rk_count; i++) {
1449 rkey.push_back(db_value);
1452 if (!col_list.get()) {
1454 col_list->rowkey_ = rkey;
1456 if (rkey != col_list->rowkey_) {
1457 v_col_list->push_back(col_list.release());
1459 col_list->rowkey_ = rkey;
1462 for (
size_t i = 0; i < ccount; i++) {
1466 assert(rc == CASS_OK);
1474 std::string(cname.
data, cname.
length), db_value, 0));
1475 v_columns->push_back(column);
1478 if (col_list.get()) {
1479 v_col_list->push_back(col_list.release());
1485 std::auto_ptr<CassAsyncQueryContext> ctx(
1486 boost::reinterpret_pointer_cast<CassAsyncQueryContext>(data));
1490 if (rc != CASS_OK) {
1495 ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1498 if (ctx->result_ctx_) {
1504 col_list->cfname_ = rctx->
cf_name_;
1505 col_list->rowkey_ = rctx->
row_key_;
1512 ctx->cb_(db_rc, col_list);
1516 ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1520 CassSession *session,
const char *qid, CassStatement *qstatement,
1526 std::auto_ptr<CassAsyncQueryContext> ctx(
1533 CassSession *session,
const char *query,
1542 CassSession *session,
const char *query_id, CassStatement *qstatement,
1549 CassSession *session,
const char *query, CassConsistency consistency,
1553 consistency, cb, rctx);
1557 CassSession *session,
const char *query, CassConsistency consistency,
1560 std::auto_ptr<CassQueryResultContext> rctx(
1562 rk_count, ck_count));
1569 CassSession *session,
const char *query,
1583 CassSession *session,
const char *query,
1597 CassSession *session,
const char *query,
1598 size_t rk_count,
size_t ck_count, CassConsistency consistency,
1611 CassSession *session,
const char *query,
1612 size_t rk_count,
size_t ck_count, CassConsistency consistency,
1625 CassSession *session,
const char *query,
1628 std::auto_ptr<CassQueryResultContext> rctx(
1636 CassSession *session,
const char *query,
1649 CassSession *session,
const char *query,
size_t rk_count,
1662 CassFuture *future) {
1665 if (rc != CASS_OK) {
1671 return rc == CASS_OK;
1676 const std::string &keyspace,
const std::string &table,
bool log_error) {
1677 const CassKeyspaceMeta *keyspace_meta(
1679 if (keyspace_meta == NULL) {
1682 ", Table: " << table);
1686 std::string table_lower(table);
1687 boost::algorithm::to_lower(table_lower);
1688 const CassTableMeta *table_meta(
1690 if (table_meta == NULL) {
1693 ", Table: " << table_lower);
1701 CassSession *session,
1702 const std::string &keyspace,
const std::string &table) {
1705 if (schema_meta.get() == NULL) {
1707 ", Table: " << table);
1710 bool log_error(
false);
1712 schema_meta.get(), keyspace, table, log_error));
1713 if (table_meta == NULL) {
1721 CassSession *session,
const std::string &keyspace,
1722 const std::string &table,
size_t *ck_count) {
1725 if (schema_meta.get() == NULL) {
1727 ", Table: " << table);
1730 bool log_error(
true);
1732 schema_meta.get(), keyspace, table, log_error));
1733 if (table_meta == NULL) {
1742 const std::string &keyspace,
const std::string &table,
size_t *rk_count) {
1745 if (schema_meta.get() == NULL) {
1747 ", Table: " << table);
1750 bool log_error(
true);
1752 schema_meta.get(), keyspace, table, log_error));
1753 if (table_meta == NULL) {
1762 case CASS_LOG_DISABLED:
1763 return log4cplus::OFF_LOG_LEVEL;
1764 case CASS_LOG_CRITICAL:
1765 return log4cplus::FATAL_LOG_LEVEL;
1766 case CASS_LOG_ERROR:
1767 return log4cplus::ERROR_LOG_LEVEL;
1769 return log4cplus::WARN_LOG_LEVEL;
1771 return log4cplus::INFO_LOG_LEVEL;
1772 case CASS_LOG_DEBUG:
1773 return log4cplus::DEBUG_LOG_LEVEL;
1774 case CASS_LOG_TRACE:
1775 return log4cplus::TRACE_LOG_LEVEL;
1777 return log4cplus::ALL_LOG_LEVEL;
1783 case log4cplus::OFF_LOG_LEVEL:
1784 return CASS_LOG_DISABLED;
1785 case log4cplus::FATAL_LOG_LEVEL:
1786 return CASS_LOG_CRITICAL;
1787 case log4cplus::ERROR_LOG_LEVEL:
1788 return CASS_LOG_ERROR;
1789 case log4cplus::WARN_LOG_LEVEL:
1790 return CASS_LOG_WARN;
1791 case log4cplus::INFO_LOG_LEVEL:
1792 return CASS_LOG_INFO;
1793 case log4cplus::DEBUG_LOG_LEVEL:
1794 return CASS_LOG_DEBUG;
1795 case log4cplus::TRACE_LOG_LEVEL:
1796 return CASS_LOG_TRACE;
1798 assert(
false &&
"Invalid Log4Level");
1799 return CASS_LOG_DISABLED;
1807 log4cplus::LogLevel log4level(
Cass2log4Level(message->severity));
1808 std::stringstream buf;
1809 buf <<
"CassLibrary: " << message->file <<
":" << message->line <<
1810 " " << message->function <<
"] " << message->message;
1815 if (ca_certs_path.length() == 0) {
1816 return std::string();
1818 std::ifstream file(ca_certs_path.c_str());
1820 return std::string();
1822 std::string content((std::istreambuf_iterator<char>(file)),
1823 std::istreambuf_iterator<char>());
1832 Task(task_id, task_instance),
1840 return "cass::cql::impl::WorkerTask";
1852 const std::vector<std::string> &cassandra_ips,
1854 const std::string &cassandra_user,
1855 const std::string &cassandra_password,
1857 const std::string &ca_certs_path,
1861 cluster_(cci_->CassClusterNew(), cci_),
1863 session_(cci_->CassSessionNew(), cci_),
1864 schema_session_(cci_->CassSessionNew(), cci_),
1866 io_thread_count_(2) {
1871 if (cassandra_ips.size() > 0) {
1873 boost::system::error_code ec;
1874 boost::asio::ip::address::from_string(cassandra_ips[0], ec);
1875 if(ec.value() != 0){
1886 if (content.length() == 0) {
1893 std::string contact_points(boost::algorithm::join(cassandra_ips,
","));
1897 if (!cassandra_user.empty() && !cassandra_password.empty()) {
1899 cassandra_password.c_str());
1917 const std::string &replication_factor, CassConsistency consistency) {
1923 keyspace.c_str(), replication_factor.c_str()));
1924 if (n < 0 || n >= (
int)
sizeof(buf)) {
1926 keyspace <<
", RF: " << replication_factor);
1934 CassConsistency consistency) {
1939 int n(snprintf(buf,
sizeof(buf),
kQUseKeyspace, keyspace.c_str()));
1940 if (n < 0 || n >= (
int)
sizeof(buf)) {
1956 CassConsistency consistency) {
1961 int n(snprintf(buf,
sizeof(buf),
kQUseKeyspace, keyspace.c_str()));
1962 if (n < 0 || n >= (
int)
sizeof(buf)) {
1978 const std::string &compaction_strategy, CassConsistency consistency) {
1990 compaction_strategy);
1995 boost::system::error_code ec;
1997 compaction_strategy, &ec);
2013 const std::string &column,
const std::string &indexname,
2019 indexname, index_mode));
2025 const std::string &table_name(cf.
cfname_);
2038 std::make_pair(table_name, prepared))).second;
2046 CassPreparedMapType::const_iterator it(
2051 *prepared = it->second;
2072 if (ck_count == 0) {
2088 query.c_str(), consistency, cb, cfname.c_str(), rkey);
2097 query.c_str(), consistency, cb, rk_count, ck_count,
2115 rkey, ck_range, where_vec, read_vec));
2124 query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2146 query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2155 CassConsistency consistency) {
2175 (table !=
"MessageTablev2");
2188 query.c_str(), consistency, out);
2197 query.c_str(), rk_count, ck_count, consistency, out);
2214 query.c_str(), rk_count, consistency, out);
2220 query.c_str(), rk_count, ck_count, consistency, out);
2237 rkey, ck_range, read_vec));
2240 query.c_str(), read_vec, consistency, out);
2244 const std::vector<GenDb::DbDataValueVec> &rkeys,
2253 rkeys, ck_range, read_vec));
2256 query.c_str(), read_vec, consistency, out);
2278 query.c_str(), rk_count, ck_count, consistency, out);
2365 CassMetrics cass_metrics;
2368 metrics->requests.min = cass_metrics.requests.min;
2369 metrics->requests.max = cass_metrics.requests.max;
2370 metrics->requests.mean = cass_metrics.requests.mean;
2371 metrics->requests.stddev = cass_metrics.requests.stddev;
2372 metrics->requests.median = cass_metrics.requests.median;
2373 metrics->requests.percentile_75th =
2374 cass_metrics.requests.percentile_75th;
2375 metrics->requests.percentile_95th =
2376 cass_metrics.requests.percentile_95th;
2377 metrics->requests.percentile_98th =
2378 cass_metrics.requests.percentile_98th;
2379 metrics->requests.percentile_99th =
2380 cass_metrics.requests.percentile_99th;
2381 metrics->requests.percentile_999th =
2382 cass_metrics.requests.percentile_999th;
2383 metrics->requests.mean_rate = cass_metrics.requests.mean_rate;
2384 metrics->requests.one_minute_rate =
2385 cass_metrics.requests.one_minute_rate;
2386 metrics->requests.five_minute_rate =
2387 cass_metrics.requests.five_minute_rate;
2388 metrics->requests.fifteen_minute_rate =
2389 cass_metrics.requests.fifteen_minute_rate;
2391 metrics->stats.total_connections =
2392 cass_metrics.stats.total_connections;
2393 metrics->stats.available_connections =
2394 cass_metrics.stats.available_connections;
2395 metrics->stats.exceeded_pending_requests_water_mark =
2396 cass_metrics.stats.exceeded_pending_requests_water_mark;
2397 metrics->stats.exceeded_write_bytes_water_mark =
2398 cass_metrics.stats.exceeded_write_bytes_water_mark;
2400 metrics->errors.connection_timeouts =
2401 cass_metrics.errors.connection_timeouts;
2402 metrics->errors.pending_request_timeouts =
2403 cass_metrics.errors.pending_request_timeouts;
2404 metrics->errors.request_timeouts =
2405 cass_metrics.errors.request_timeouts;
2410 CassConsistency consistency,
bool sync,
2447 boost::system::error_code ec;
2449 if (ec.value() != boost::system::errc::success) {
2464 std::auto_ptr<GenDb::ColList> v_columns,
2465 CassConsistency consistency,
bool sync,
2474 v_columns->cfname_);
2493 qstatement.get(), consistency);
2495 std::string qid(
"Prepare: " + v_columns->cfname_);
2497 qstatement.get(), consistency, cb);
2503 "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH "
2504 "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }");
2512 const std::vector<std::string> &cassandra_ips,
2514 const std::string &cassandra_user,
2515 const std::string &cassandra_password,
2517 const std::string &ca_certs_path,
2518 bool create_schema) :
2519 cci_(new interface::CassDatastaxLibrary),
2520 impl_(new
CqlIfImpl(
evm, cassandra_ips, cassandra_port,
2521 cassandra_user, cassandra_password, use_ssl,
2522 ca_certs_path, cci_.get())),
2523 use_prepared_for_insert_(true),
2524 create_schema_(create_schema) {
2527 log4cplus::Logger::getRoot().getLogLevel()));
2530 BOOST_FOREACH(
const std::string &cassandra_ip, cassandra_ips) {
2531 boost::system::error_code ec;
2532 boost::asio::ip::address cassandra_addr(
2548 impl_->SetRequestTimeout(GenDb::g_gendb_constants.SCHEMA_REQUEST_TIMEOUT);
2549 bool success(
impl_->ConnectSchemaSync());
2554 return impl_->ConnectSync();
2559 impl_->DisconnectSchemaSync();
2561 impl_->DisconnectSync();
2569 impl_->SetRequestTimeout(GenDb::g_gendb_constants.DEFAULT_REQUEST_TIMEOUT);
2570 impl_->DisconnectSchemaSync();
2577 const std::string &replication_factor) {
2578 bool success(
impl_->CreateKeyspaceIfNotExistsSync(tablespace,
2579 replication_factor, CASS_CONSISTENCY_QUORUM));
2584 success =
impl_->UseKeyspaceSyncOnSchemaSession(tablespace,
2585 CASS_CONSISTENCY_ONE);
2594 bool success(
impl_->UseKeyspaceSync(tablespace, CASS_CONSISTENCY_ONE));
2604 const std::string &compaction_strategy) {
2606 impl_->CreateTableIfNotExistsSync(cf, compaction_strategy,
2607 CASS_CONSISTENCY_QUORUM));
2614 success =
impl_->LocatePrepareInsertIntoTable(cf);
2631 bool success(
impl_->IsTablePresent(cfname));
2643 const std::string &column,
const std::string &indexname,
2645 bool success(
impl_->CreateIndexIfNotExistsSync(cfname, column, indexname,
2646 CASS_CONSISTENCY_QUORUM, index_mode));
2657 std::auto_ptr<GenDb::ColList> row,
2682 std::auto_ptr<GenDb::ColList>
row_;
2686 boost::shared_ptr<AsyncRowGetCallbackContext> cb_ctx) {
2687 cb_ctx->cb_(cb_ctx->drc_, cb_ctx->row_);
2691 std::auto_ptr<GenDb::ColList> row, std::string cfname,
2693 int task_instance) {
2705 boost::shared_ptr<AsyncRowGetCallbackContext> ctx(
2709 task_id, task_instance));
2721 std::auto_ptr<GenDb::ColList> row, std::string cfname,
2728 std::string cfname(cl->cfname_);
2737 impl_->IsInsertIntoTablePrepareSupported(cfname)) {
2738 success =
impl_->InsertIntoTablePrepareAsync(cl, consistency,
2742 success =
impl_->InsertIntoTableAsync(cl, consistency,
2756 std::string cfname(cl->cfname_);
2758 bool success(
impl_->InsertIntoTableSync(cl, consistency));
2773 bool success(
impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2775 _1, _2, cfname, cb)));
2788 bool success(
impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2790 _1, _2, cfname, cb,
true, task_id, task_instance)));
2803 bool success(
impl_->SelectFromTableAsync(cfname, rowkey,
2818 bool success(
impl_->SelectFromTableAsync(cfname, rowkey,
2820 cfname, cb,
true, task_id, task_instance)));
2833 bool success(
impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync(cfname,
2847 bool success(
impl_->SelectFromTableSync(cfname, rowkey,
2864 bool success(
impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2865 rowkey, crange, consistency, read_vec, &out->
columns_));
2876 const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
2880 v_columns->rowkey_ = rkey;
2881 bool success(
impl_->SelectFromTableSync(cfname, rkey,
2882 CASS_CONSISTENCY_ONE, &v_columns->columns_));
2890 out->push_back(v_columns.release());
2897 const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2902 v_columns->rowkey_ = rkey;
2903 bool success(
impl_->SelectFromTableClusteringKeyRangeSync(cfname,
2904 rkey, crange, CASS_CONSISTENCY_ONE, &v_columns->columns_));
2908 " Clustering Key Range: " << crange.
ToString() <<
" FAILED");
2913 out->push_back(v_columns.release());
2920 const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2925 bool success(
impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2926 v_rowkey, crange, consistency, read_vec, out));
2939 bool success(
impl_->SelectFromTableSync(cfname, consistency, out));
2951 uint64_t *enqueues)
const {
2967 GenDb::DbErrors *dbe) {
2974 GenDb::DbErrors *dbe)
const {
2981 return impl_->GetMetrics(metrics);
2986 bool success(
impl_->GetMetrics(&metrics));
2990 db_stats->requests_one_minute_rate = metrics.requests.one_minute_rate;
2991 db_stats->stats = metrics.stats;
2992 db_stats->errors = metrics.errors;
3002 uint64_t num_writes) {
3013 uint64_t num_writes) {
3019 const std::string &table_name) {
3025 const std::string &table_name) {
3036 uint64_t num_reads) {
3047 uint64_t num_reads) {
3062 namespace interface {
3075 return cass_cluster_new();
3079 cass_cluster_free(cluster);
3083 CassCluster* cluster,
const char* contact_points) {
3084 return cass_cluster_set_contact_points(cluster, contact_points);
3089 return cass_cluster_set_port(cluster, port);
3093 cass_cluster_set_ssl(cluster, ssl);
3097 const char* username,
const char* password) {
3098 cass_cluster_set_credentials(cluster, username, password);
3102 unsigned num_threads) {
3103 return cass_cluster_set_num_threads_io(cluster, num_threads);
3107 CassCluster* cluster,
unsigned num_requests) {
3108 return cass_cluster_set_pending_requests_high_water_mark(cluster,
3113 CassCluster* cluster,
unsigned num_requests) {
3114 return cass_cluster_set_pending_requests_low_water_mark(cluster,
3119 CassCluster* cluster,
unsigned num_bytes) {
3120 return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes);
3124 CassCluster* cluster,
unsigned num_bytes) {
3125 return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes);
3129 CassCluster* cluster,
const char* hosts) {
3130 cass_cluster_set_whitelist_filtering(cluster, hosts);
3135 return cass_ssl_new();
3139 return cass_ssl_free(ssl);
3143 const std::string &cert) {
3144 return cass_ssl_add_trusted_cert_n(ssl, cert.c_str(), cert.length());
3148 cass_ssl_set_verify_flags(ssl, flags);
3153 return cass_session_new();
3157 cass_session_free(session);
3161 unsigned timeout_ms) {
3162 return cass_cluster_set_request_timeout(cluster, timeout_ms);
3166 const CassCluster* cluster) {
3167 return cass_session_connect(session, cluster);
3171 return cass_session_close(session);
3175 const CassStatement* statement) {
3176 return cass_session_execute(session, statement);
3180 const CassSession* session) {
3181 return cass_session_get_schema_meta(session);
3185 const char* query) {
3186 return cass_session_prepare(session, query);
3190 CassMetrics* output) {
3191 cass_session_get_metrics(session, output);
3196 const CassSchemaMeta* schema_meta) {
3197 cass_schema_meta_free(schema_meta);
3201 const CassSchemaMeta* schema_meta,
const char* keyspace) {
3202 return cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
3206 const CassKeyspaceMeta* keyspace_meta,
const char* table) {
3207 return cass_keyspace_meta_table_by_name(keyspace_meta, table);
3211 const CassTableMeta* table_meta) {
3212 return cass_table_meta_partition_key_count(table_meta);
3216 const CassTableMeta* table_meta) {
3217 return cass_table_meta_clustering_key_count(table_meta);
3222 cass_future_free(future);
3226 CassFutureCallback callback,
void* data) {
3227 return cass_future_set_callback(future, callback, data);
3231 cass_future_wait(future);
3235 CassFuture* future) {
3236 return cass_future_get_result(future);
3240 const char** message,
size_t* message_length) {
3241 cass_future_error_message(future, message, message_length);
3245 return cass_future_error_code(future);
3249 CassFuture* future) {
3250 return cass_future_get_prepared(future);
3255 cass_result_free(result);
3259 return cass_result_column_count(result);
3263 size_t index,
const char** name,
size_t* name_length) {
3264 return cass_result_column_name(result, index, name, name_length);
3269 cass_iterator_free(iterator);
3273 const CassResult* result) {
3274 return cass_iterator_from_result(result);
3278 return cass_iterator_next(iterator);
3282 const CassIterator* iterator) {
3283 return cass_iterator_get_row(iterator);
3288 size_t parameter_count) {
3289 return cass_statement_new(query, parameter_count);
3293 cass_statement_free(statement);
3297 CassStatement* statement, CassConsistency consistency) {
3298 return cass_statement_set_consistency(statement, consistency);
3302 CassStatement* statement,
3303 size_t index,
const char* value,
size_t value_length) {
3304 return cass_statement_bind_string_n(statement, index, value, value_length);
3308 size_t index, cass_int32_t value) {
3309 return cass_statement_bind_int32(statement, index, value);
3313 size_t index, cass_int64_t value) {
3314 return cass_statement_bind_int64(statement, index, value);
3318 size_t index, CassUuid value) {
3319 return cass_statement_bind_uuid(statement, index, value);
3323 CassStatement* statement,
size_t index, cass_double_t value) {
3324 return cass_statement_bind_double(statement, index, value);
3328 size_t index, CassInet value) {
3329 return cass_statement_bind_inet(statement, index, value);
3333 CassStatement* statement,
3334 size_t index,
const cass_byte_t* value,
size_t value_length) {
3335 return cass_statement_bind_bytes(statement, index, value, value_length);
3339 CassStatement* statement,
3340 const char* name,
size_t name_length,
const char* value,
3341 size_t value_length) {
3342 return cass_statement_bind_string_by_name_n(statement, name, name_length,
3343 value, value_length);
3347 CassStatement* statement,
const char* name, cass_int32_t value) {
3348 return cass_statement_bind_int32_by_name(statement, name, value);
3352 CassStatement* statement,
const char* name, cass_int64_t value) {
3353 return cass_statement_bind_int64_by_name(statement, name, value);
3357 CassStatement* statement,
const char* name, CassUuid value) {
3358 return cass_statement_bind_uuid_by_name(statement, name, value);
3362 CassStatement* statement,
const char* name, cass_double_t value) {
3363 return cass_statement_bind_double_by_name(statement, name, value);
3367 CassStatement* statement,
const char* name, CassInet value) {
3368 return cass_statement_bind_inet_by_name(statement, name, value);
3372 CassStatement* statement,
3373 const char* name,
size_t name_length,
const cass_byte_t* value,
3374 size_t value_length) {
3375 return cass_statement_bind_bytes_by_name_n(statement, name, name_length,
3376 value, value_length);
3381 cass_prepared_free(prepared);
3385 const CassPrepared* prepared) {
3386 return cass_prepared_bind(prepared);
3391 return cass_value_type(value);
3395 const char** output,
size_t* output_size) {
3396 return cass_value_get_string(value, output, output_size);
3400 cass_int8_t* output) {
3401 return cass_value_get_int8(value, output);
3405 cass_int16_t* output) {
3406 return cass_value_get_int16(value, output);
3410 cass_int32_t* output) {
3411 return cass_value_get_int32(value, output);
3415 cass_int64_t* output) {
3416 return cass_value_get_int64(value, output);
3421 return cass_value_get_uuid(value, output);
3425 cass_double_t* output) {
3426 return cass_value_get_double(value, output);
3431 return cass_value_get_inet(value, output);
3435 const cass_byte_t** output,
size_t* output_size) {
3436 return cass_value_get_bytes(value, output, output_size);
3440 return cass_value_is_null(value);
3445 const cass_uint8_t* address) {
3446 return cass_inet_init_v4(address);
3450 const cass_uint8_t* address) {
3451 return cass_inet_init_v6(address);
3457 return cass_row_get_column(row, index);
3462 cass_log_set_level(log_level);
3467 cass_log_set_callback(callback, data);
boost::asio::ip::address_v6 Ip6Address
boost::asio::ip::address IpAddress
boost::asio::ip::address_v4 Ip4Address
std::string GetHostIp(boost::asio::io_context *io_service, const std::string &hostname)
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
boost::asio::io_context * io_service()
void GetDiffs(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void IncrementTableReadFail(const std::string &table_name)
void IncrementTableRead(const std::string &table_name)
void IncrementErrors(IfErrors::Type type)
void IncrementTableWriteBackPressureFail(const std::string &table_name)
void IncrementTableReadBackPressureFail(const std::string &table_name)
void IncrementTableWriteFail(const std::string &table_name)
void GetCumulative(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
void IncrementTableWrite(const std::string &table_name)
boost::function< void(size_t)> DbQueueWaterMarkCb
boost::function< void(DbOpResult::type, std::auto_ptr< ColList >)> DbGetRowCb
boost::function< void(DbOpResult::type)> DbAddColumnCb
@ ERR_WRITE_COLUMN_FAMILY
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a wrapper over tbb::task to support policies.
impl::CassSessionPtr session_
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
bool DisconnectSchemaSync()
impl::CassClusterPtr cluster_
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
bool IsTableDynamic(const std::string &table)
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
impl::CassSessionPtr schema_session_
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
static const char * kTaskName
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
bool GetMetrics(Metrics *metrics) const
tbb::atomic< SessionState::type > schema_session_state_
void SetRequestTimeout(uint32_t timeout_ms)
tbb::atomic< SessionState::type > session_state_
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
CassPreparedMapType insert_prepared_map_
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
static const char * kQCreateKeyspaceIfNotExists
bool IsTablePresent(const std::string &table)
std::string schema_contact_point_
interface::CassLibrary * cci_
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
bool IsInsertIntoTablePrepareSupported(const std::string &table)
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
static const char * kQUseKeyspace
int IsTableStatic(const std::string &table)
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
boost::scoped_ptr< CqlIfImpl > impl_
boost::scoped_ptr< interface::CassLibrary > cci_
void IncrementTableWriteStats(const std::string &table_name)
tbb::atomic< bool > initialized_
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count, DbQueueWaterMarkCb cb)
virtual bool Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname, const std::vector< GenDb::DbDataValueVec > &v_rowkey)
virtual bool Db_GetCumulativeStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency)
virtual bool Db_CreateIndex(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode=GenDb::ColIndexMode::NONE)
virtual void Db_SetInitDone(bool)
virtual bool Db_GetCqlMetrics(Metrics *metrics) const
GenDb::GenDbIfStats stats_
virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor="1")
virtual void Db_ResetQueueWaterMarks()
virtual bool Db_GetCqlStats(DbStats *db_stats) const
void IncrementErrors(GenDb::IfErrors::Type err_type)
virtual bool Db_GetRowAsync(const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbGetRowCb cb)
void IncrementTableWriteFailStats(const std::string &table_name)
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, const std::string &compaction_strategy)
virtual bool Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname, GenDb::DbConsistency::type dconsistency)
virtual std::vector< GenDb::Endpoint > Db_GetEndpoints() const
virtual bool Db_AddColumn(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbAddColumnCb cb)
virtual bool Db_AddColumnSync(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency)
void IncrementTableReadFailStats(const std::string &table_name)
void IncrementTableReadBackPressureFailStats(const std::string &table_name)
virtual bool Db_GetQueueStats(uint64_t *queue_count, uint64_t *enqueues) const
virtual bool Db_SetTablespace(const std::string &tablespace)
void IncrementTableWriteBackPressureFailStats(const std::string &table_name)
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb)
virtual bool Db_GetStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbGetRowCb cb)
std::vector< GenDb::Endpoint > endpoints_
void IncrementTableReadStats(const std::string &table_name)
bool use_prepared_for_insert_
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf)
void operator()(const boost::uuids::uuid &tuuid) const
void operator()(const uint64_t &tu64) const
CassQueryPrinter(std::ostream &os, bool quote_strings)
void operator()(const IpAddress &tipaddr) const
void operator()(const std::string &tstring) const
void operator()(const uint32_t &tu32) const
void operator()(const T &t) const
void operator()(const uint8_t &tu8) const
CassQueryPrinter(std::ostream &os)
void operator()(const double &tdouble, size_t index) const
void operator()(const IpAddress &tipaddr, size_t index) const
void operator()(const boost::uuids::uuid &tuuid, size_t index) const
void operator()(const uint16_t &tu16, size_t index) const
void operator()(const std::string &tstring, size_t index) const
void operator()(const uint32_t &tu32, size_t index) const
CassStatementIndexBinder(interface::CassLibrary *cci, CassStatement *statement)
void operator()(const uint64_t &tu64, size_t index) const
void operator()(const GenDb::Blob &tblob, size_t index) const
void operator()(const uint8_t &tu8, size_t index) const
void operator()(const boost::blank &tblank, size_t index) const
interface::CassLibrary * cci_
CassStatement * statement_
void operator()(const uint32_t &tu32, const char *name) const
CassStatementNameBinder(interface::CassLibrary *cci, CassStatement *statement)
void operator()(const std::string &tstring, const char *name) const
void operator()(const boost::blank &tblank, const char *name) const
void operator()(const boost::uuids::uuid &tuuid, const char *name) const
void operator()(const GenDb::Blob &tblob, const char *name) const
void operator()(const double &tdouble, const char *name) const
CassStatement * statement_
void operator()(const uint64_t &tu64, const char *name) const
void operator()(const uint8_t &tu8, const char *name) const
interface::CassLibrary * cci_
void operator()(const IpAddress &tipaddr, const char *name) const
void operator()(const uint16_t &tu16, const char *name) const
std::string Description() const
WorkerTask(FunctionPtr func, int task_id, int task_instance)
boost::function< void(void)> FunctionPtr
bool Run()
Code to execute. Returns true if task is completed. Return false to reschedule the task.
virtual CassError CassStatementBindDouble(CassStatement *statement, size_t index, cass_double_t value)
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)
virtual void CassFutureFree(CassFuture *future)
virtual CassInet CassInetInitV6(const cass_uint8_t *address)
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)
virtual void CassResultFree(const CassResult *result)
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)
virtual CassInet CassInetInitV4(const cass_uint8_t *address)
virtual CassError CassStatementBindDoubleByName(CassStatement *statement, const char *name, cass_double_t value)
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)
virtual CassError CassStatementBindStringByNameN(CassStatement *statement, const char *name, size_t name_length, const char *value, size_t value_length)
virtual void CassSchemaMetaFree(const CassSchemaMeta *schema_meta)
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)
virtual void CassLogSetLevel(CassLogLevel log_level)
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)
virtual CassError CassStatementBindUuid(CassStatement *statement, size_t index, CassUuid value)
virtual void CassIteratorFree(CassIterator *iterator)
virtual CassIterator * CassIteratorFromResult(const CassResult *result)
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)
virtual CassError CassStatementBindBytes(CassStatement *statement, size_t index, const cass_byte_t *value, size_t value_length)
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)
virtual CassFuture * CassSessionClose(CassSession *session)
virtual CassSsl * CassSslNew()
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)
virtual ~CassDatastaxLibrary()
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)
virtual void CassPreparedFree(const CassPrepared *prepared)
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)
virtual size_t CassResultColumnCount(const CassResult *result)
virtual void CassClusterFree(CassCluster *cluster)
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)
virtual CassError CassStatementBindStringN(CassStatement *statement, size_t index, const char *value, size_t value_length)
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)
virtual void CassSslFree(CassSsl *ssl)
virtual const CassResult * CassFutureGetResult(CassFuture *future)
virtual CassValueType GetCassValueType(const CassValue *value)
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)
virtual CassError CassStatementBindInet(CassStatement *statement, size_t index, CassInet value)
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)
virtual cass_bool_t CassValueIsNull(const CassValue *value)
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)
virtual void CassFutureWait(CassFuture *future)
virtual CassError CassStatementBindBytesByNameN(CassStatement *statement, const char *name, size_t name_length, const cass_byte_t *value, size_t value_length)
virtual CassError CassStatementBindUuidByName(CassStatement *statement, const char *name, CassUuid value)
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)
virtual CassSession * CassSessionNew()
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)
virtual CassError CassStatementBindInt64(CassStatement *statement, size_t index, cass_int64_t value)
virtual void CassLogSetCallback(CassLogCallback callback, void *data)
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)
virtual CassError CassStatementBindInt64ByName(CassStatement *statement, const char *name, cass_int64_t value)
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)
virtual void CassSessionFree(CassSession *session)
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)
virtual void CassStatementFree(CassStatement *statement)
virtual CassError CassStatementBindInt32ByName(CassStatement *statement, const char *name, cass_int32_t value)
virtual CassError CassStatementBindInetByName(CassStatement *statement, const char *name, CassInet value)
virtual CassCluster * CassClusterNew()
virtual CassError CassFutureErrorCode(CassFuture *future)
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)=0
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)=0
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)=0
virtual size_t CassResultColumnCount(const CassResult *result)=0
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)=0
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)=0
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)=0
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSsl * CassSslNew()=0
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)=0
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)=0
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)=0
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSession * CassSessionNew()=0
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)=0
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)=0
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)=0
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)=0
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)=0
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)=0
virtual CassValueType GetCassValueType(const CassValue *value)=0
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)=0
virtual const CassResult * CassFutureGetResult(CassFuture *future)=0
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)=0
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)=0
virtual CassIterator * CassIteratorFromResult(const CassResult *result)=0
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)=0
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)=0
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)=0
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)=0
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)=0
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)=0
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)=0
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)=0
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)=0
virtual void CassFutureWait(CassFuture *future)=0
virtual CassError CassFutureErrorCode(CassFuture *future)=0
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)=0
virtual cass_bool_t CassValueIsNull(const CassValue *value)=0
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)=0
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)=0
virtual CassFuture * CassSessionClose(CassSession *session)=0
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)=0
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)=0
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)=0
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)=0
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)=0
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)=0
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)=0
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)=0
SandeshTraceBufferPtr CqlTraceDebugBuf(SandeshTraceBufferCreate(CQLIF_DEBUG, 10000))
#define CQLIF_INFO_TRACE(_Msg)
#define CASS_LIB_TRACE(_Level, _Msg)
SandeshTraceBufferPtr CqlTraceErrBuf(SandeshTraceBufferCreate(CQLIF_ERR, 20000))
#define CQLIF_DEBUG_TRACE(_Msg)
#define CQLIF_ERR_TRACE(_Msg)
SandeshTraceBufferPtr CqlTraceInfoBuf(SandeshTraceBufferCreate(CQLIF_INFO, 10000))
#define GENERIC_RAW_ARRAY(obj)
std::string DbDataValueVecToString(const GenDb::DbDataValueVec &v_db_value)
std::vector< DbDataValue > DbDataValueVec
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
std::vector< GenDb::DbDataType::type > DbDataTypeVec
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
boost::asio::ip::tcp::endpoint Endpoint
std::vector< WhereIndexInfo > WhereIndexInfoVec
boost::ptr_vector< ColList > ColListVec
boost::ptr_vector< NewCol > NewColVec
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
static std::string LoadCertFile(const std::string &ca_certs_path)
static void OnExecuteQueryAsync(CassFuture *future, void *data)
static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx=NULL)
static bool DynamicCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
static void CassLibraryLog(const CassLogMessage *message, void *data)
void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list)
static void ExecuteQueryResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx)
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
static const char * kQCompactionStrategy("compaction = {'class': " "'org.apache.cassandra.db.compaction.%s'}")
static GenDb::DbDataValue CassValue2DbDataValue(interface::CassLibrary *cci, const CassValue *cvalue)
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
static bool GetCassTablePartitionKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count)
bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
static std::string DbDataTypes2CassTypes(const GenDb::DbDataTypeVec &v_db_types)
static std::string CassSelectFromTableInternal(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec, const GenDb::WhereIndexInfoVec &where_vec)
CassSharedPtr< CassSsl > CassSslPtr
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, size_t ck_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
static bool GetCassTableClusteringKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *ck_count)
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
CassSharedPtr< const CassPrepared > CassPreparedPtr
static bool PrepareSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassPreparedPtr *prepared)
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
static bool StaticCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
static bool IsCassTableMetaPresent(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table)
static std::string DbColIndexMode2String(const GenDb::ColIndexMode::type index_mode)
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
static CassConsistency Db2CassConsistency(GenDb::DbConsistency::type dconsistency)
static const std::string kQReadRepairChanceDTCS("read_repair_chance = 0.0")
CassSharedPtr< const CassResult > CassResultPtr
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
static bool ExecuteQueryResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency)
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
std::string CassSelectFromTable(const std::string &table)
static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb)
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
static const char * DbDataType2CassType(const GenDb::DbDataType::type &db_type)
static void ExecuteQueryAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb)
static void encode_uuid(char *output, const CassUuid &uuid)
bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc)
static char * decode_uuid(char *input, CassUuid *output)
void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, size_t ck_count, GenDb::ColListVec *v_col_list)
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns)
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel)
static bool SyncFutureWait(interface::CassLibrary *cci, CassFuture *future)
static const CassTableMeta * GetCassTableMeta(interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error)
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, CassSession *session, CassStatement *statement, CassConsistency consistency)
static bool ExecuteQuerySync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency)
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, GenDb::NewColVec *v_columns)
static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level)
static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency)
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0")
static void AsyncRowGetCompletionCallback(boost::shared_ptr< AsyncRowGetCallbackContext > cb_ctx)
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
static const std::string integerToString(const NumberType &num)
const uint8_t * data() const
std::string ToString() const
std::map< std::string, GenDb::DbDataType::type > ColumnMap
DbDataTypeVec clustering_columns_
DbDataTypeVec partition_keys_
boost::scoped_ptr< DbDataValueVec > value
boost::scoped_ptr< DbDataValueVec > name
NewCf::ColumnFamilyType cftype_
static std::string ToString(Op::type op)
std::auto_ptr< GenDb::ColList > row_
GenDb::GenDbIf::DbGetRowCb cb_
AsyncRowGetCallbackContext(GenDb::GenDbIf::DbGetRowCb cb, GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row)
GenDb::DbOpResult::type drc_
GenDb::DbDataValueVec row_key_
CassString(const char *data)
CassString(const char *data, size_t length)