8 #include <boost/foreach.hpp>
9 #include <boost/algorithm/string.hpp>
10 #include <boost/algorithm/string/join.hpp>
11 #include <boost/unordered_map.hpp>
12 #include <boost/system/error_code.hpp>
14 #include <linux/version.h>
15 #if defined(RHEL_MAJOR) && (RHEL_MAJOR >= 9)
16 #include <cassandra/cassandra.h>
18 #include <cassandra.h>
29 #include <database/gendb_constants.h>
34 using namespace boost::system;
36 #define CQLIF_DEBUG "CqlTraceBufDebug"
37 #define CQLIF_INFO "CqlTraceBufInfo"
38 #define CQLIF_ERR "CqlTraceBufErr"
47 #define CQLIF_DEBUG_TRACE(_Msg) \
49 std::stringstream _ss; \
50 _ss << __func__ << ":" << __FILE__ << ":" << \
51 __LINE__ << ": " << _Msg; \
52 CQL_TRACE_TRACE(CqlTraceDebugBuf, _ss.str()); \
55 #define CQLIF_INFO_TRACE(_Msg) \
57 std::stringstream _ss; \
58 _ss << __func__ << ":" << __FILE__ << ":" << \
59 __LINE__ << ": " << _Msg; \
60 CQL_TRACE_TRACE(CqlTraceInfoBuf, _ss.str()); \
63 #define CQLIF_ERR_TRACE(_Msg) \
65 std::stringstream _ss; \
66 _ss << __func__ << ":" << __FILE__ << ":" << \
67 __LINE__ << ": " << _Msg; \
68 CQL_TRACE_TRACE(CqlTraceErrBuf, _ss.str()); \
71 #define CASS_LIB_TRACE(_Level, _Msg) \
73 if (_Level == log4cplus::ERROR_LOG_LEVEL) { \
74 CQL_TRACE_TRACE(CqlTraceErrBuf, _Msg); \
75 } else if (_Level == log4cplus::DEBUG_LOG_LEVEL) { \
76 CQL_TRACE_TRACE(CqlTraceDebugBuf, _Msg); \
78 CQL_TRACE_TRACE(CqlTraceInfoBuf, _Msg); \
82 #define CQLIF_LOG(_Level, _Msg) \
84 if (LoggingDisabled()) break; \
85 log4cplus::Logger logger = log4cplus::Logger::getRoot(); \
86 LOG4CPLUS_##_Level(logger, __func__ << ":" << __FILE__ << ":" << \
87 __LINE__ << ": " << _Msg); \
90 #define CQLIF_LOG_ERR(_Msg) \
92 LOG(ERROR, __func__ << ":" << __FILE__ << ":" << __LINE__ << ": " \
109 length(strlen(data)) {
// Pack uuid.time_and_version into output[0..7]. The value is consumed one byte
// at a time from its least-significant end (mask with 0xFF, then shift right
// by 8), so within each group the lower-order byte lands at the higher index.
123 uint64_t time_and_version =
uuid.time_and_version;
// Lowest four bytes of the value -> output[3], output[2], output[1], output[0].
124 output[3] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
125 time_and_version >>= 8;
126 output[2] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
127 time_and_version >>= 8;
128 output[1] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
129 time_and_version >>= 8;
130 output[0] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
131 time_and_version >>= 8;
// Next two bytes of the value -> output[5], output[4].
133 output[5] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
134 time_and_version >>= 8;
135 output[4] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
136 time_and_version >>= 8;
// Highest two bytes of the value -> output[7], output[6].
138 output[7] =
static_cast<char>(time_and_version & 0x00000000000000FFLL);
139 time_and_version >>= 8;
// NOTE(review): this mask literal has one more leading zero than the others;
// the value is still 0xFF, so the result is unaffected.
140 output[6] =
static_cast<char>(time_and_version & 0x000000000000000FFLL);
// Pack uuid.clock_seq_and_node into output[8..15]: the least-significant byte
// goes to output[15] and the most-significant byte to output[8].
142 uint64_t clock_seq_and_node =
uuid.clock_seq_and_node;
143 for (
size_t i = 0; i < 8; ++i) {
// NOTE(review): this mask uses an L suffix where the masks above use LL; the
// masked value is 0xFF either way.
144 output[15 - i] =
static_cast<char>(clock_seq_and_node & 0x00000000000000FFL);
145 clock_seq_and_node >>= 8;
// Rebuild output->time_and_version from input[0..7]. Each byte is converted to
// uint8_t and then widened to uint64_t before shifting.
// NOTE(review): presumably input is a (possibly signed) char buffer and the
// uint8_t step is there to prevent sign extension - confirm against the
// signature, which is not visible here.
//
// Bit placement: input[3] -> bits 0-7, input[2] -> 8-15, input[1] -> 16-23,
// input[0] -> 24-31, input[5] -> 32-39, input[4] -> 40-47, input[7] -> 48-55,
// input[6] -> 56-63.
// The first statement assigns (rather than ORs), which also clears any
// previous contents of the field.
150 output->time_and_version =
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[3]));
151 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[2])) << 8;
152 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[1])) << 16;
153 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[0])) << 24;
155 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[5])) << 32;
156 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[4])) << 40;
158 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[7])) << 48;
159 output->time_and_version |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[6])) << 56;
// Rebuild output->clock_seq_and_node from input[8..15]: input[15] supplies the
// least-significant byte and input[8] the most-significant byte.
161 output->clock_seq_and_node = 0;
162 for (
size_t i = 0; i < 8; ++i) {
163 output->clock_seq_and_node |=
static_cast<uint64_t
>(
static_cast<uint8_t
>(input[15 - i])) << (8 * i);
171 case GenDb::DbDataType::AsciiType:
173 case GenDb::DbDataType::LexicalUUIDType:
175 case GenDb::DbDataType::TimeUUIDType:
177 case GenDb::DbDataType::Unsigned8Type:
178 case GenDb::DbDataType::Unsigned16Type:
179 case GenDb::DbDataType::Unsigned32Type:
181 case GenDb::DbDataType::Unsigned64Type:
183 case GenDb::DbDataType::DoubleType:
185 case GenDb::DbDataType::UTF8Type:
187 case GenDb::DbDataType::InetType:
189 case GenDb::DbDataType::IntegerType:
191 case GenDb::DbDataType::BlobType:
194 assert(
false &&
"Invalid data type");
201 assert(!v_db_types.empty());
// Translate dconsistency into the driver's CASS_CONSISTENCY_* enumerator.
// NOTE(review): the case labels that select each return are not visible in
// this view; only the returned driver values are.
207 switch (dconsistency) {
209 return CASS_CONSISTENCY_ANY;
211 return CASS_CONSISTENCY_ONE;
213 return CASS_CONSISTENCY_TWO;
215 return CASS_CONSISTENCY_THREE;
217 return CASS_CONSISTENCY_QUORUM;
219 return CASS_CONSISTENCY_ALL;
221 return CASS_CONSISTENCY_LOCAL_QUORUM;
223 return CASS_CONSISTENCY_EACH_QUORUM;
225 return CASS_CONSISTENCY_SERIAL;
227 return CASS_CONSISTENCY_LOCAL_SERIAL;
229 return CASS_CONSISTENCY_LOCAL_ONE;
// NOTE(review): presumably the fallback for an unrecognised input - confirm
// against the missing label.
232 return CASS_CONSISTENCY_UNKNOWN;
241 quote_strings_(quote_strings) {
245 quote_strings_(true) {
252 os_ << to_string(tuuid);
257 os_ << (uint16_t)tu8;
260 if (quote_strings_) {
261 os_ <<
"'" << tstring <<
"'";
268 os_ << (int32_t)tu32;
272 os_ << (int64_t)tu64;
275 os_ <<
"'" << tipaddr <<
"'";
287 CassStatement *statement) :
289 statement_(statement) {
// Call operators that bind one value to statement_ at a positional `index`.
// Every bind result is checked only with assert(rc == CASS_OK); if the build
// defines NDEBUG those checks compile away and a failed bind goes unnoticed.

// A blank value cannot be bound: this overload always trips the assert.
291 void operator()(
const boost::blank &tblank,
size_t index)
const {
292 assert(
false &&
"CassStatement bind to boost::blank not supported");
// Strings are bound with an explicit length taken from tstring.length().
294 void operator()(
const std::string &tstring,
size_t index)
const {
295 CassError rc(cci_->CassStatementBindStringN(statement_, index,
296 tstring.c_str(), tstring.length()));
297 assert(rc == CASS_OK);
// UUID bind; cuuid is prepared on lines not visible here.
302 CassError rc(cci_->CassStatementBindUuid(statement_, index, cuuid));
303 assert(rc == CASS_OK);
// tu8 and tu16 are both bound through the 32-bit integer call.
306 CassError rc(cci_->CassStatementBindInt32(statement_, index, tu8));
307 assert(rc == CASS_OK);
310 CassError rc(cci_->CassStatementBindInt32(statement_, index, tu16));
311 assert(rc == CASS_OK);
// tu32 is bound as a signed 32-bit value, so anything above INT32_MAX is
// rejected by the assert before the cast.
314 assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
315 CassError rc(cci_->CassStatementBindInt32(statement_, index,
316 (cass_int32_t)tu32));
317 assert(rc == CASS_OK);
// Same range guard for tu64, which is bound as a signed 64-bit value.
320 assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
321 CassError rc(cci_->CassStatementBindInt64(statement_, index,
322 (cass_int64_t)tu64));
323 assert(rc == CASS_OK);
326 CassError rc(cci_->CassStatementBindDouble(statement_, index,
327 (cass_double_t)tdouble));
328 assert(rc == CASS_OK);
// IP addresses: the v4 and v6 forms are extracted separately before the inet
// bind; the conversion into the driver's inet value is not visible here.
332 if (tipaddr.is_v4()) {
333 boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
336 boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
339 CassError rc(cci_->CassStatementBindInet(statement_, index,
341 assert(rc == CASS_OK);
// Byte-buffer bind; its arguments are on a line not visible here.
344 CassError rc(cci_->CassStatementBindBytes(statement_, index,
346 assert(rc == CASS_OK);
355 CassStatement *statement) :
357 statement_(statement) {
// Call operators that bind one value to statement_ by column `name`.
// Every bind result is checked only with assert(rc == CASS_OK); if the build
// defines NDEBUG those checks compile away and a failed bind goes unnoticed.

// A blank value cannot be bound: this overload always trips the assert.
359 void operator()(
const boost::blank &tblank,
const char *name)
const {
360 assert(
false &&
"CassStatement bind to boost::blank not supported");
// Strings: both the column name and the value are passed with explicit
// lengths (strlen(name) and tstring.length()).
362 void operator()(
const std::string &tstring,
const char *name)
const {
363 CassError rc(cci_->CassStatementBindStringByNameN(statement_, name,
364 strlen(name), tstring.c_str(), tstring.length()));
365 assert(rc == CASS_OK);
// UUID bind; the value argument is on a line not visible here.
370 CassError rc(cci_->CassStatementBindUuidByName(statement_, name,
372 assert(rc == CASS_OK);
// uint8_t and uint16_t values are both bound through the 32-bit integer call.
374 void operator()(
const uint8_t &tu8,
const char *name)
const {
375 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
377 assert(rc == CASS_OK);
379 void operator()(
const uint16_t &tu16,
const char *name)
const {
380 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
382 assert(rc == CASS_OK);
// tu32 is bound as a signed 32-bit value, so anything above INT32_MAX is
// rejected by the assert before the cast.
384 void operator()(
const uint32_t &tu32,
const char *name)
const {
385 assert(tu32 <= (uint32_t)std::numeric_limits<int32_t>::max());
386 CassError rc(cci_->CassStatementBindInt32ByName(statement_, name,
387 (cass_int32_t)tu32));
388 assert(rc == CASS_OK);
// Same range guard for tu64, which is bound as a signed 64-bit value.
390 void operator()(
const uint64_t &tu64,
const char *name)
const {
391 assert(tu64 <= (uint64_t)std::numeric_limits<int64_t>::max());
392 CassError rc(cci_->CassStatementBindInt64ByName(statement_, name,
393 (cass_int64_t)tu64));
394 assert(rc == CASS_OK);
396 void operator()(
const double &tdouble,
const char *name)
const {
397 CassError rc(cci_->CassStatementBindDoubleByName(statement_, name,
398 (cass_double_t)tdouble));
399 assert(rc == CASS_OK);
// IP addresses: the v4 and v6 forms are extracted separately before the inet
// bind; the conversion into the driver's inet value is not visible here.
403 if (tipaddr.is_v4()) {
404 boost::asio::ip::address_v4 tv4(tipaddr.to_v4());
407 boost::asio::ip::address_v6 tv6(tipaddr.to_v6());
410 CassError rc(cci_->CassStatementBindInetByName(statement_, name,
412 assert(rc == CASS_OK);
// Blob: bound as raw bytes using tblob.data() / tblob.size(), with the column
// name length given by strlen(name).
415 CassError rc(cci_->CassStatementBindBytesByNameN(statement_, name,
416 strlen(name), tblob.
data(), tblob.
size()));
417 assert(rc == CASS_OK);
424 "compaction = {'class': "
425 "'org.apache.cassandra.db.compaction.%s'}");
428 "read_repair_chance = 0.0");
435 const std::string &compaction_strategy) {
436 std::ostringstream query;
438 query <<
"CREATE TABLE IF NOT EXISTS " << cf.
cfname_ <<
" ";
441 assert(rkeys.size() == 1);
446 assert(!cfcolumns.empty());
447 BOOST_FOREACH(
const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
449 query <<
", \"" << cfcolumn.first <<
"\" " <<
454 compaction_strategy.c_str()));
455 assert(!(n < 0 || n >= (
int)
sizeof(cbuf)));
462 if (compaction_strategy ==
463 GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
464 query <<
") WITH " << std::string(cbuf) <<
" AND " <<
467 query <<
") WITH " << std::string(cbuf) <<
" AND " <<
475 const std::string &compaction_strategy,
476 boost::system::error_code *ec) {
477 std::ostringstream query;
479 *ec = errc::make_error_code(errc::success);
483 *ec = errc::make_error_code(errc::invalid_argument);
488 query <<
"CREATE TABLE IF NOT EXISTS " << cf.
cfname_ <<
" (";
491 int rk_size(rkeys.size());
492 for (
int i = 0; i < rk_size; i++) {
495 query <<
"key" << key_num;
503 int ccn_size(clustering_columns.size());
504 for (
int i = 0; i < ccn_size; i++) {
506 query <<
"column" << cnum <<
" " <<
511 int cn_size(columns.size());
512 for (
int i = 0; i < cn_size; i++) {
513 int cnum(i + 1 + ccn_size);
514 query <<
"column" << cnum <<
" " <<
519 if (values.size() > 0) {
523 query <<
"PRIMARY KEY (";
524 std::ostringstream rkey_ss;
525 for (
int i = 0; i < rk_size; i++) {
528 rkey_ss <<
", key" << key_num;
534 query <<
"(" << rkey_ss.str() <<
"), ";
536 query << rkey_ss.str() <<
", ";
538 for (
int i = 0; i < ccn_size; i++) {
543 query <<
"column" << cnum;
547 compaction_strategy.c_str()));
548 assert(!(n < 0 || n >= (
int)
sizeof(cbuf)));
555 if (compaction_strategy ==
556 GenDb::g_gendb_constants.DATE_TIERED_COMPACTION_STRATEGY) {
557 query <<
")) WITH " << std::string(cbuf) <<
" AND " <<
560 query <<
")) WITH " << std::string(cbuf) <<
" AND " <<
568 switch (index_mode) {
571 case GenDb::ColIndexMode::PREFIX:
573 case GenDb::ColIndexMode::CONTAINS:
576 assert(
false &&
"INVALID");
585 const std::string &column,
const std::string &indexname,
587 std::ostringstream query;
594 query <<
"INDEX IF NOT EXISTS " << indexname <<
" ";
596 query <<
"ON " << cfname <<
"(\""<< column <<
"\")";
599 query <<
" USING \'org.apache.cassandra.index.sasi.SASIIndex\' " <<
611 std::ostringstream query;
613 const std::string &table(v_columns->
cfname_);
614 query <<
"INSERT INTO " << table <<
" (";
615 std::ostringstream values_ss;
616 values_ss <<
"VALUES (";
620 int rk_size(rkeys.size());
621 for (
int i = 0; i < rk_size; i++) {
624 query <<
", key" << key_num;
631 boost::apply_visitor(values_printer, rkeys[i]);
641 assert(cnames.size() == 1);
644 boost::apply_visitor(cnames_printer, cnames[0]);
649 assert(cvalues.size() == 1);
650 boost::apply_visitor(values_printer, cvalues[0]);
656 query << values_ss.str();
658 query <<
" USING TTL " << cttl;
664 std::ostringstream query;
666 const std::string &table(v_columns->
cfname_);
667 query <<
"INSERT INTO " << table <<
" (";
668 std::ostringstream values_ss;
671 int rk_size(rkeys.size());
673 for (
int i = 0; i < rk_size; i++) {
676 query <<
", key" << key_num;
680 boost::apply_visitor(values_printer, rkeys[i]);
685 assert(columns.size() == 1);
690 int cn_size(cnames.size());
691 for (
int i = 0; i < cn_size; i++) {
694 query <<
", column" << cnum;
695 boost::apply_visitor(values_printer, cnames[i]);
696 if (i != cn_size - 1) {
703 if (cvalues.size() > 0) {
704 query <<
", value) VALUES (";
706 boost::apply_visitor(values_printer, cvalues[0]);
708 query <<
") VALUES (";
711 query << values_ss.str();
712 if (column.
ttl > 0) {
713 query <<
" USING TTL " << column.
ttl;
723 std::ostringstream query;
725 query <<
"INSERT INTO " << cf.
cfname_ <<
" ";
728 assert(rkeys.size() == 1);
729 std::ostringstream values_ss;
731 values_ss <<
") VALUES (?";
734 assert(!cfcolumns.empty());
735 BOOST_FOREACH(
const GenDb::NewCf::ColumnMap::value_type &cfcolumn,
737 query <<
", \"" << cfcolumn.first <<
"\"";
740 query << values_ss.str();
741 query <<
") USING TTL ?";
746 boost::system::error_code *ec) {
747 std::ostringstream query;
749 *ec = errc::make_error_code(errc::success);
753 *ec = errc::make_error_code(errc::invalid_argument);
758 query <<
"INSERT INTO " << cf.
cfname_ <<
" (";
761 int rk_size(rkeys.size());
762 std::ostringstream values_ss;
763 for (
int i = 0; i < rk_size; i++) {
766 query <<
"key" << key_num;
775 int ccn_size(clustering_columns.size());
776 for (
int i = 0; i < ccn_size; i++) {
778 query <<
"column" << cnum;
780 if (i != ccn_size - 1) {
787 int cn_size(columns.size());
792 for (
int i = 0; i < cn_size; i++) {
793 int cnum(i + 1 + ccn_size);
794 query <<
"column" << cnum;
796 if (i != cn_size - 1) {
803 if (values.size() > 0) {
807 query <<
") VALUES (";
809 query << values_ss.str();
810 query <<
" USING TTL ?";
819 CassStatement *statement,
824 int rk_size(rkeys.size());
826 for (; (int) idx < rk_size; idx++) {
829 int key_num(idx + 1);
834 boost::apply_visitor(boost::bind(values_binder, _1, rk_name.c_str()),
842 assert(cnames.size() == 1);
844 std::string cname(boost::get<std::string>(cnames[0]));
846 assert(cvalues.size() == 1);
847 boost::apply_visitor(boost::bind(values_binder, _1, cname.c_str()),
854 (cass_int32_t)cttl));
855 assert(rc == CASS_OK);
860 CassStatement *statement,
865 int rk_size(rkeys.size());
867 for (; (int) idx < rk_size; idx++) {
868 boost::apply_visitor(boost::bind(values_binder, _1, idx), rkeys[idx]);
872 assert(columns.size() == 1);
877 int cn_size(cnames.size());
878 for (
int i = 0; i < cn_size; i++, idx++) {
879 boost::apply_visitor(boost::bind(values_binder, _1, idx), cnames[i]);
883 if (cvalues.size() > 0) {
884 boost::apply_visitor(boost::bind(values_binder, _1, idx++),
888 (cass_int32_t)column.
ttl));
889 assert(rc == CASS_OK);
894 const std::vector<GenDb::DbDataValueVec> &rkeys,
// Assemble the text of a CQL SELECT statement in `query`.
898 std::ostringstream query;
// No explicit field list: select every column.
900 if (read_vec.empty()) {
901 query <<
"SELECT * FROM " << table;
// Otherwise emit each requested field (tuple element 0) followed by a comma;
// when tuple element 3 is true, also emit WRITETIME(<field>) for that field.
904 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
905 it != read_vec.end(); it++) {
906 query << it->get<0>() <<
",";
907 bool read_timestamp = it->get<3>();
908 if (read_timestamp) {
909 query <<
"WRITETIME(" << it->get<0>() <<
"),";
// Step the write position back one character so that the " FROM " written
// next overwrites the trailing comma left by the loop above.
912 query.seekp(-1, query.cur);
913 query <<
" FROM " << table;
// Exactly one row key: emit a predicate per key part, printing each part's
// value with cprinter. Two prefixes are used (" AND key<N>=" and
// " WHERE key="); the condition choosing between them is not visible here.
915 if (rkeys.size() == 1) {
917 int rk_size(rkey.size());
919 for (
int i = 0; i < rk_size; i++) {
922 query <<
" AND key" << key_num <<
"=";
924 query <<
" WHERE key=";
926 boost::apply_visitor(cprinter, rkey[i]);
929 }
else if (rkeys.size() > 1) {
// Several row keys: emit an IN (...) list. Each key must have exactly one
// part (asserted), and only that part is printed.
930 query <<
" WHERE key IN (";
932 int rk_size(rkey.size());
933 assert(rk_size == 1);
935 boost::apply_visitor(cprinter, rkey[0]);
// NOTE(review): presumably backs up over a trailing separator written after
// the last key - the separator write is not visible here; confirm.
938 query.seekp(-1, query.cur);
// Extra predicates from where_vec: each entry contributes its element 0 and a
// value rendered into value_ss from element 2.
941 if (!where_vec.empty()) {
942 for (GenDb::WhereIndexInfoVec::const_iterator it = where_vec.begin();
943 it != where_vec.end(); ++it) {
944 std::ostringstream value_ss;
946 boost::apply_visitor(value_vprinter, it->get<2>());
948 query <<
" " << it->get<0>();
950 query <<
" " << value_ss.str();
// Clustering-key range, start side: one "column<cnum>" name per start value;
// the text collected in start_ss is appended after the loop.
954 if (!ck_range.
start_.empty()) {
955 int ck_start_size(ck_range.
start_.size());
956 std::ostringstream start_ss;
960 for (
int i = 0; i < ck_start_size; i++) {
966 query <<
"column" << cnum;
967 boost::apply_visitor(start_vprinter, ck_range.
start_[i]);
971 query << start_ss.str();
// Clustering-key range, finish side: same shape as the start side.
973 if (!ck_range.
finish_.empty()) {
974 int ck_finish_size(ck_range.
finish_.size());
975 std::ostringstream finish_ss;
980 for (
int i = 0; i < ck_finish_size; i++) {
986 query <<
"column" << cnum;
987 boost::apply_visitor(finish_vprinter, ck_range.
finish_[i]);
991 query << finish_ss.str();
// Row limit taken from ck_range.count_; the guard around it is not visible.
994 query <<
" LIMIT " << ck_range.
count_;
// ALLOW FILTERING is appended only when there is more than one where_vec
// entry.
997 if (where_vec.size() > 1) {
998 query <<
" ALLOW FILTERING";
1008 std::vector<GenDb::DbDataValueVec> rkey_vec;
1009 rkey_vec.push_back(rkeys);
1011 read_vec, where_vec);
1016 std::vector<GenDb::DbDataValueVec> rkey_vec;
1017 rkey_vec.push_back(rkeys);
1027 std::vector<GenDb::DbDataValueVec> rkey_vec;
1028 rkey_vec.push_back(rkeys);
1034 const std::string &table,
const std::vector<GenDb::DbDataValueVec> &rkeys,
1042 std::vector<GenDb::DbDataValueVec> rkey_vec;
// Per-type decoding of a driver value. The driver "get" calls themselves are
// on lines not visible here; each one's result is checked only by
// assert(rc == CASS_OK).

// All three textual types are returned as a std::string built from the
// (data, length) pair in ctstring.
1055 case CASS_VALUE_TYPE_ASCII:
1056 case CASS_VALUE_TYPE_VARCHAR:
1057 case CASS_VALUE_TYPE_TEXT: {
1061 assert(rc == CASS_OK);
1062 return std::string(ctstring.
data, ctstring.
length);
1064 case CASS_VALUE_TYPE_UUID: {
1067 assert(rc == CASS_OK);
1072 case CASS_VALUE_TYPE_DOUBLE: {
1073 cass_double_t ctdouble;
1075 assert(rc == CASS_OK);
1076 return (
double)ctdouble;
// Integer types are returned as the unsigned type of matching width:
// TINY_INT -> uint8_t, SMALL_INT -> uint16_t, INT -> uint32_t,
// BIGINT -> uint64_t.
1078 case CASS_VALUE_TYPE_TINY_INT: {
1081 assert(rc == CASS_OK);
1082 return (uint8_t)ct8;
1084 case CASS_VALUE_TYPE_SMALL_INT: {
1087 assert(rc == CASS_OK);
1088 return (uint16_t)ct16;
1090 case CASS_VALUE_TYPE_INT: {
1093 assert(rc == CASS_OK);
1094 return (uint32_t)ct32;
1096 case CASS_VALUE_TYPE_BIGINT: {
1099 assert(rc == CASS_OK);
1100 return (uint64_t)ct64;
// Inet values: address_length selects between the IPv4 and IPv6 byte arrays.
1102 case CASS_VALUE_TYPE_INET: {
1105 assert(rc == CASS_OK);
1107 if (ctinet.address_length == CASS_INET_V4_LENGTH) {
1108 Ip4Address::bytes_type ipv4;
1111 }
else if (ctinet.address_length == CASS_INET_V6_LENGTH) {
1112 Ip6Address::bytes_type ipv6;
1120 case CASS_VALUE_TYPE_BLOB: {
1121 const cass_byte_t *bytes(NULL);
1124 assert(rc == CASS_OK);
1127 case CASS_VALUE_TYPE_UNKNOWN: {
// NOTE(review): presumably the catch-all for types with no case above - the
// label preceding this assert is not visible; confirm.
1133 assert(
false &&
"Unhandled value type");
1140 CassSession *session,
const char* query,
1147 if (rc != CASS_OK) {
1156 return rc == CASS_OK;
1160 CassSession *session,
1162 CassConsistency consistency) {
1168 if (rc != CASS_OK) {
1179 return rc == CASS_OK;
1183 CassSession *session,
const char *query, CassConsistency consistency) {
1191 CassSession *session,
const char *query,
1200 CassSession *session, CassStatement *statement,
1201 CassConsistency consistency) {
1210 case CASS_ERROR_LIB_NO_HOSTS_AVAILABLE:
1211 case CASS_ERROR_LIB_REQUEST_QUEUE_FULL:
1212 case CASS_ERROR_LIB_NO_AVAILABLE_IO_THREAD:
1230 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1231 it != read_vec.end(); it++) {
1232 bool row_key = it->get<1>();
1233 bool row_column = it->get<2>();
1234 bool read_timestamp = it->get<3>();
1243 cnames->push_back(db_value);
1245 values->push_back(db_value);
1246 if (read_timestamp) {
1251 timestamps->push_back(time_value);
1257 v_columns->push_back(column);
1264 std::auto_ptr<GenDb::ColList> col_list;
1274 for (GenDb::FieldNamesToReadVec::const_iterator it = read_vec.begin();
1275 it != read_vec.end(); it++) {
1276 bool row_key = it->get<1>();
1277 bool row_column = it->get<2>();
1278 bool read_timestamp = it->get<3>();
1284 rkey.push_back(db_value);
1292 cnames->push_back(db_value);
1294 values->push_back(db_value);
1295 if (read_timestamp) {
1300 timestamps->push_back(time_value);
1307 if (!col_list.get()) {
1309 col_list->rowkey_ = rkey;
1311 if (rkey != col_list->rowkey_) {
1312 v_col_list->push_back(col_list.release());
1314 col_list->rowkey_ = rkey;
1317 v_columns->push_back(column);
1319 if (col_list.get()) {
1320 v_col_list->push_back(col_list.release());
1335 for (
size_t i = rk_count; i < rk_count + ck_count; i++) {
1339 cnames->push_back(db_value);
1343 for (
size_t i = rk_count + ck_count; i < ccount; i++) {
1347 values->push_back(db_value);
1350 v_columns->push_back(column);
1357 std::auto_ptr<GenDb::ColList> col_list;
1366 for (
size_t i = 0; i < rk_count; i++) {
1370 rkey.push_back(db_value);
1374 for (
size_t i = rk_count; i < rk_count + ck_count; i++) {
1378 cnames->push_back(db_value);
1382 for (
size_t i = rk_count + ck_count; i < ccount; i++) {
1386 values->push_back(db_value);
1390 if (!col_list.get()) {
1392 col_list->rowkey_ = rkey;
1394 if (rkey != col_list->rowkey_) {
1395 v_col_list->push_back(col_list.release());
1397 col_list->rowkey_ = rkey;
1400 v_columns->push_back(column);
1402 if (col_list.get()) {
1403 v_col_list->push_back(col_list.release());
1415 for (
size_t i = 0; i < ccount; i++) {
1419 assert(rc == CASS_OK);
1427 std::string(cname.
data, cname.
length), db_value, 0));
1428 v_columns->push_back(column);
1435 std::auto_ptr<GenDb::ColList> col_list;
1444 for (
size_t i = 0; i < rk_count; i++) {
1448 rkey.push_back(db_value);
1451 if (!col_list.get()) {
1453 col_list->rowkey_ = rkey;
1455 if (rkey != col_list->rowkey_) {
1456 v_col_list->push_back(col_list.release());
1458 col_list->rowkey_ = rkey;
1461 for (
size_t i = 0; i < ccount; i++) {
1465 assert(rc == CASS_OK);
1473 std::string(cname.
data, cname.
length), db_value, 0));
1474 v_columns->push_back(column);
1477 if (col_list.get()) {
1478 v_col_list->push_back(col_list.release());
1484 std::auto_ptr<CassAsyncQueryContext> ctx(
1485 boost::reinterpret_pointer_cast<CassAsyncQueryContext>(data));
1489 if (rc != CASS_OK) {
1494 ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1497 if (ctx->result_ctx_) {
1503 col_list->cfname_ = rctx->
cf_name_;
1504 col_list->rowkey_ = rctx->
row_key_;
1511 ctx->cb_(db_rc, col_list);
1515 ctx->cb_(db_rc, std::auto_ptr<GenDb::ColList>());
1519 CassSession *session,
const char *qid, CassStatement *qstatement,
1525 std::auto_ptr<CassAsyncQueryContext> ctx(
1532 CassSession *session,
const char *query,
1541 CassSession *session,
const char *query_id, CassStatement *qstatement,
1548 CassSession *session,
const char *query, CassConsistency consistency,
1552 consistency, cb, rctx);
1556 CassSession *session,
const char *query, CassConsistency consistency,
1559 std::auto_ptr<CassQueryResultContext> rctx(
1561 rk_count, ck_count));
1568 CassSession *session,
const char *query,
1582 CassSession *session,
const char *query,
1596 CassSession *session,
const char *query,
1597 size_t rk_count,
size_t ck_count, CassConsistency consistency,
1610 CassSession *session,
const char *query,
1611 size_t rk_count,
size_t ck_count, CassConsistency consistency,
1624 CassSession *session,
const char *query,
1627 std::auto_ptr<CassQueryResultContext> rctx(
1635 CassSession *session,
const char *query,
1648 CassSession *session,
const char *query,
size_t rk_count,
1661 CassFuture *future) {
1664 if (rc != CASS_OK) {
1670 return rc == CASS_OK;
1675 const std::string &keyspace,
const std::string &table,
bool log_error) {
1676 const CassKeyspaceMeta *keyspace_meta(
1678 if (keyspace_meta == NULL) {
1681 ", Table: " << table);
1685 std::string table_lower(table);
1686 boost::algorithm::to_lower(table_lower);
1687 const CassTableMeta *table_meta(
1689 if (table_meta == NULL) {
1692 ", Table: " << table_lower);
1700 CassSession *session,
1701 const std::string &keyspace,
const std::string &table) {
1704 if (schema_meta.get() == NULL) {
1706 ", Table: " << table);
1709 bool log_error(
false);
1711 schema_meta.get(), keyspace, table, log_error));
1712 if (table_meta == NULL) {
1720 CassSession *session,
const std::string &keyspace,
1721 const std::string &table,
size_t *ck_count) {
1724 if (schema_meta.get() == NULL) {
1726 ", Table: " << table);
1729 bool log_error(
true);
1731 schema_meta.get(), keyspace, table, log_error));
1732 if (table_meta == NULL) {
1741 const std::string &keyspace,
const std::string &table,
size_t *rk_count) {
1744 if (schema_meta.get() == NULL) {
1746 ", Table: " << table);
1749 bool log_error(
true);
1751 schema_meta.get(), keyspace, table, log_error));
1752 if (table_meta == NULL) {
// Map a driver CASS_LOG_* severity to the corresponding log4cplus level.
1761 case CASS_LOG_DISABLED:
1762 return log4cplus::OFF_LOG_LEVEL;
1763 case CASS_LOG_CRITICAL:
1764 return log4cplus::FATAL_LOG_LEVEL;
1765 case CASS_LOG_ERROR:
1766 return log4cplus::ERROR_LOG_LEVEL;
// NOTE(review): the labels for the next two returns are not visible here;
// presumably the driver's warn and info severities - confirm.
1768 return log4cplus::WARN_LOG_LEVEL;
1770 return log4cplus::INFO_LOG_LEVEL;
1771 case CASS_LOG_DEBUG:
1772 return log4cplus::DEBUG_LOG_LEVEL;
1773 case CASS_LOG_TRACE:
1774 return log4cplus::TRACE_LOG_LEVEL;
// NOTE(review): label not visible; presumably the fallback for any other
// severity - confirm.
1776 return log4cplus::ALL_LOG_LEVEL;
// Map a log4cplus level to the corresponding driver CASS_LOG_* severity.
1782 case log4cplus::OFF_LOG_LEVEL:
1783 return CASS_LOG_DISABLED;
1784 case log4cplus::FATAL_LOG_LEVEL:
1785 return CASS_LOG_CRITICAL;
1786 case log4cplus::ERROR_LOG_LEVEL:
1787 return CASS_LOG_ERROR;
1788 case log4cplus::WARN_LOG_LEVEL:
1789 return CASS_LOG_WARN;
1790 case log4cplus::INFO_LOG_LEVEL:
1791 return CASS_LOG_INFO;
1792 case log4cplus::DEBUG_LOG_LEVEL:
1793 return CASS_LOG_DEBUG;
1794 case log4cplus::TRACE_LOG_LEVEL:
1795 return CASS_LOG_TRACE;
// Any other level is treated as a programming error. If the assert is
// compiled out (NDEBUG), the return below yields CASS_LOG_DISABLED instead.
1797 assert(
false &&
"Invalid Log4Level");
1798 return CASS_LOG_DISABLED;
1806 log4cplus::LogLevel log4level(
Cass2log4Level(message->severity));
1807 std::stringstream buf;
1808 buf <<
"CassLibrary: " << message->file <<
":" << message->line <<
1809 " " << message->function <<
"] " << message->message;
// Load the contents of the file at ca_certs_path into a string.
// An empty path yields an empty string without touching the filesystem.
1814 if (ca_certs_path.length() == 0) {
1815 return std::string();
1817 std::ifstream file(ca_certs_path.c_str());
// NOTE(review): the condition guarding this return is not visible here;
// presumably it covers a file that failed to open - confirm.
1819 return std::string();
// Read the whole stream into `content` via an istreambuf_iterator range. The
// extra parentheses around the first argument keep this from being parsed as
// a function declaration.
1821 std::string content((std::istreambuf_iterator<char>(file)),
1822 std::istreambuf_iterator<char>());
1831 Task(task_id, task_instance),
1839 return "cass::cql::impl::WorkerTask";
1851 const std::vector<std::string> &cassandra_ips,
1853 const std::string &cassandra_user,
1854 const std::string &cassandra_password,
1856 const std::string &ca_certs_path,
1860 cluster_(cci_->CassClusterNew(), cci_),
1862 session_(cci_->CassSessionNew(), cci_),
1863 schema_session_(cci_->CassSessionNew(), cci_),
1865 io_thread_count_(2) {
1870 if (cassandra_ips.size() > 0) {
1872 boost::system::error_code ec;
1873 boost::asio::ip::address::from_string(cassandra_ips[0], ec);
1874 if(ec.value() != 0){
1885 if (content.length() == 0) {
1892 std::string contact_points(boost::algorithm::join(cassandra_ips,
","));
1896 if (!cassandra_user.empty() && !cassandra_password.empty()) {
1898 cassandra_password.c_str());
1916 const std::string &replication_factor, CassConsistency consistency) {
1922 keyspace.c_str(), replication_factor.c_str()));
1923 if (n < 0 || n >= (
int)
sizeof(buf)) {
1925 keyspace <<
", RF: " << replication_factor);
1933 CassConsistency consistency) {
1938 int n(snprintf(buf,
sizeof(buf),
kQUseKeyspace, keyspace.c_str()));
1939 if (n < 0 || n >= (
int)
sizeof(buf)) {
1955 CassConsistency consistency) {
1960 int n(snprintf(buf,
sizeof(buf),
kQUseKeyspace, keyspace.c_str()));
1961 if (n < 0 || n >= (
int)
sizeof(buf)) {
1977 const std::string &compaction_strategy, CassConsistency consistency) {
1989 compaction_strategy);
1994 boost::system::error_code ec;
1996 compaction_strategy, &ec);
2012 const std::string &column,
const std::string &indexname,
2018 indexname, index_mode));
2024 const std::string &table_name(cf.
cfname_);
2037 std::make_pair(table_name, prepared))).second;
2045 CassPreparedMapType::const_iterator it(
2050 *prepared = it->second;
2071 if (ck_count == 0) {
2087 query.c_str(), consistency, cb, cfname.c_str(), rkey);
2096 query.c_str(), consistency, cb, rk_count, ck_count,
2114 rkey, ck_range, where_vec, read_vec));
2123 query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2145 query.c_str(), consistency, cb, rk_count, ck_count, cfname.c_str(),
2154 CassConsistency consistency) {
2174 (table !=
"MessageTablev2");
2187 query.c_str(), consistency, out);
2196 query.c_str(), rk_count, ck_count, consistency, out);
2213 query.c_str(), rk_count, consistency, out);
2219 query.c_str(), rk_count, ck_count, consistency, out);
2236 rkey, ck_range, read_vec));
2239 query.c_str(), read_vec, consistency, out);
2243 const std::vector<GenDb::DbDataValueVec> &rkeys,
2252 rkeys, ck_range, read_vec));
2255 query.c_str(), read_vec, consistency, out);
2277 query.c_str(), rk_count, ck_count, consistency, out);
// Copy the driver's metrics snapshot field by field into `metrics`.
// NOTE(review): cass_metrics is declared uninitialised here; the call that
// fills it is on lines not visible in this view.
2364 CassMetrics cass_metrics;
// Request statistics: min/max/mean/stddev/median, percentiles, and rates.
2367 metrics->requests.min = cass_metrics.requests.min;
2368 metrics->requests.max = cass_metrics.requests.max;
2369 metrics->requests.mean = cass_metrics.requests.mean;
2370 metrics->requests.stddev = cass_metrics.requests.stddev;
2371 metrics->requests.median = cass_metrics.requests.median;
2372 metrics->requests.percentile_75th =
2373 cass_metrics.requests.percentile_75th;
2374 metrics->requests.percentile_95th =
2375 cass_metrics.requests.percentile_95th;
2376 metrics->requests.percentile_98th =
2377 cass_metrics.requests.percentile_98th;
2378 metrics->requests.percentile_99th =
2379 cass_metrics.requests.percentile_99th;
2380 metrics->requests.percentile_999th =
2381 cass_metrics.requests.percentile_999th;
2382 metrics->requests.mean_rate = cass_metrics.requests.mean_rate;
2383 metrics->requests.one_minute_rate =
2384 cass_metrics.requests.one_minute_rate;
2385 metrics->requests.five_minute_rate =
2386 cass_metrics.requests.five_minute_rate;
2387 metrics->requests.fifteen_minute_rate =
2388 cass_metrics.requests.fifteen_minute_rate;
// Connection statistics and water-mark counters.
2390 metrics->stats.total_connections =
2391 cass_metrics.stats.total_connections;
2392 metrics->stats.available_connections =
2393 cass_metrics.stats.available_connections;
2394 metrics->stats.exceeded_pending_requests_water_mark =
2395 cass_metrics.stats.exceeded_pending_requests_water_mark;
2396 metrics->stats.exceeded_write_bytes_water_mark =
2397 cass_metrics.stats.exceeded_write_bytes_water_mark;
// Timeout error counters.
2399 metrics->errors.connection_timeouts =
2400 cass_metrics.errors.connection_timeouts;
2401 metrics->errors.pending_request_timeouts =
2402 cass_metrics.errors.pending_request_timeouts;
2403 metrics->errors.request_timeouts =
2404 cass_metrics.errors.request_timeouts;
2409 CassConsistency consistency,
bool sync,
2446 boost::system::error_code ec;
2448 if (ec.value() != boost::system::errc::success) {
2463 std::auto_ptr<GenDb::ColList> v_columns,
2464 CassConsistency consistency,
bool sync,
2473 v_columns->cfname_);
2492 qstatement.get(), consistency);
2494 std::string qid(
"Prepare: " + v_columns->cfname_);
2496 qstatement.get(), consistency, cb);
2502 "CREATE KEYSPACE IF NOT EXISTS \"%s\" WITH "
2503 "replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %s }");
2511 const std::vector<std::string> &cassandra_ips,
2513 const std::string &cassandra_user,
2514 const std::string &cassandra_password,
2516 const std::string &ca_certs_path,
2517 bool create_schema) :
2518 cci_(new interface::CassDatastaxLibrary),
2519 impl_(new
CqlIfImpl(
evm, cassandra_ips, cassandra_port,
2520 cassandra_user, cassandra_password, use_ssl,
2521 ca_certs_path, cci_.get())),
2522 use_prepared_for_insert_(true),
2523 create_schema_(create_schema) {
2526 log4cplus::Logger::getRoot().getLogLevel()));
2529 BOOST_FOREACH(
const std::string &cassandra_ip, cassandra_ips) {
2530 boost::system::error_code ec;
2531 boost::asio::ip::address cassandra_addr(
2547 impl_->SetRequestTimeout(GenDb::g_gendb_constants.SCHEMA_REQUEST_TIMEOUT);
2548 bool success(
impl_->ConnectSchemaSync());
2553 return impl_->ConnectSync();
2558 impl_->DisconnectSchemaSync();
2560 impl_->DisconnectSync();
2568 impl_->SetRequestTimeout(GenDb::g_gendb_constants.DEFAULT_REQUEST_TIMEOUT);
2569 impl_->DisconnectSchemaSync();
2576 const std::string &replication_factor) {
2577 bool success(
impl_->CreateKeyspaceIfNotExistsSync(tablespace,
2578 replication_factor, CASS_CONSISTENCY_QUORUM));
2583 success =
impl_->UseKeyspaceSyncOnSchemaSession(tablespace,
2584 CASS_CONSISTENCY_ONE);
2593 bool success(
impl_->UseKeyspaceSync(tablespace, CASS_CONSISTENCY_ONE));
2603 const std::string &compaction_strategy) {
2605 impl_->CreateTableIfNotExistsSync(cf, compaction_strategy,
2606 CASS_CONSISTENCY_QUORUM));
2613 success =
impl_->LocatePrepareInsertIntoTable(cf);
2630 bool success(
impl_->IsTablePresent(cfname));
2642 const std::string &column,
const std::string &indexname,
2644 bool success(
impl_->CreateIndexIfNotExistsSync(cfname, column, indexname,
2645 CASS_CONSISTENCY_QUORUM, index_mode));
2656 std::auto_ptr<GenDb::ColList> row,
2681 std::auto_ptr<GenDb::ColList>
row_;
2685 boost::shared_ptr<AsyncRowGetCallbackContext> cb_ctx) {
2686 cb_ctx->cb_(cb_ctx->drc_, cb_ctx->row_);
2690 std::auto_ptr<GenDb::ColList> row, std::string cfname,
2692 int task_instance) {
2704 boost::shared_ptr<AsyncRowGetCallbackContext> ctx(
2708 task_id, task_instance));
2720 std::auto_ptr<GenDb::ColList> row, std::string cfname,
2727 std::string cfname(cl->cfname_);
2736 impl_->IsInsertIntoTablePrepareSupported(cfname)) {
2737 success =
impl_->InsertIntoTablePrepareAsync(cl, consistency,
2741 success =
impl_->InsertIntoTableAsync(cl, consistency,
2755 std::string cfname(cl->cfname_);
2757 bool success(
impl_->InsertIntoTableSync(cl, consistency));
2772 bool success(
impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2774 _1, _2, cfname, cb)));
2787 bool success(
impl_->SelectFromTableClusteringKeyRangeAsync(cfname, rowkey,
2789 _1, _2, cfname, cb,
true, task_id, task_instance)));
2802 bool success(
impl_->SelectFromTableAsync(cfname, rowkey,
2817 bool success(
impl_->SelectFromTableAsync(cfname, rowkey,
2819 cfname, cb,
true, task_id, task_instance)));
2832 bool success(
impl_->SelectFromTableClusteringKeyRangeAndIndexValueAsync(cfname,
2846 bool success(
impl_->SelectFromTableSync(cfname, rowkey,
2863 bool success(
impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2864 rowkey, crange, consistency, read_vec, &out->
columns_));
2875 const std::vector<GenDb::DbDataValueVec> &v_rowkey) {
2879 v_columns->rowkey_ = rkey;
2880 bool success(
impl_->SelectFromTableSync(cfname, rkey,
2881 CASS_CONSISTENCY_ONE, &v_columns->columns_));
2889 out->push_back(v_columns.release());
2896 const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2901 v_columns->rowkey_ = rkey;
2902 bool success(
impl_->SelectFromTableClusteringKeyRangeSync(cfname,
2903 rkey, crange, CASS_CONSISTENCY_ONE, &v_columns->columns_));
2907 " Clustering Key Range: " << crange.
ToString() <<
" FAILED");
2912 out->push_back(v_columns.release());
2919 const std::vector<GenDb::DbDataValueVec> &v_rowkey,
2924 bool success(
impl_->SelectFromTableClusteringKeyRangeFieldNamesSync(cfname,
2925 v_rowkey, crange, consistency, read_vec, out));
2938 bool success(
impl_->SelectFromTableSync(cfname, consistency, out));
2950 uint64_t *enqueues)
const {
2966 GenDb::DbErrors *dbe) {
2973 GenDb::DbErrors *dbe)
const {
2980 return impl_->GetMetrics(metrics);
2985 bool success(
impl_->GetMetrics(&metrics));
2989 db_stats->requests_one_minute_rate = metrics.requests.one_minute_rate;
2990 db_stats->stats = metrics.stats;
2991 db_stats->errors = metrics.errors;
3001 uint64_t num_writes) {
3012 uint64_t num_writes) {
3018 const std::string &table_name) {
3024 const std::string &table_name) {
3035 uint64_t num_reads) {
3046 uint64_t num_reads) {
3061 namespace interface {
3074 return cass_cluster_new();
3078 cass_cluster_free(cluster);
3082 CassCluster* cluster,
const char* contact_points) {
3083 return cass_cluster_set_contact_points(cluster, contact_points);
3088 return cass_cluster_set_port(cluster, port);
3092 cass_cluster_set_ssl(cluster, ssl);
3096 const char* username,
const char* password) {
3097 cass_cluster_set_credentials(cluster, username, password);
3101 unsigned num_threads) {
3102 return cass_cluster_set_num_threads_io(cluster, num_threads);
3106 CassCluster* cluster,
unsigned num_requests) {
3107 return cass_cluster_set_pending_requests_high_water_mark(cluster,
3112 CassCluster* cluster,
unsigned num_requests) {
3113 return cass_cluster_set_pending_requests_low_water_mark(cluster,
3118 CassCluster* cluster,
unsigned num_bytes) {
3119 return cass_cluster_set_write_bytes_high_water_mark(cluster, num_bytes);
3123 CassCluster* cluster,
unsigned num_bytes) {
3124 return cass_cluster_set_write_bytes_low_water_mark(cluster, num_bytes);
3128 CassCluster* cluster,
const char* hosts) {
3129 cass_cluster_set_whitelist_filtering(cluster, hosts);
3134 return cass_ssl_new();
3138 return cass_ssl_free(ssl);
3142 const std::string &cert) {
3143 return cass_ssl_add_trusted_cert_n(ssl, cert.c_str(), cert.length());
3147 cass_ssl_set_verify_flags(ssl, flags);
3152 return cass_session_new();
3156 cass_session_free(session);
3160 unsigned timeout_ms) {
3161 return cass_cluster_set_request_timeout(cluster, timeout_ms);
3165 const CassCluster* cluster) {
3166 return cass_session_connect(session, cluster);
3170 return cass_session_close(session);
3174 const CassStatement* statement) {
3175 return cass_session_execute(session, statement);
3179 const CassSession* session) {
3180 return cass_session_get_schema_meta(session);
3184 const char* query) {
3185 return cass_session_prepare(session, query);
3189 CassMetrics* output) {
3190 cass_session_get_metrics(session, output);
3195 const CassSchemaMeta* schema_meta) {
3196 cass_schema_meta_free(schema_meta);
3200 const CassSchemaMeta* schema_meta,
const char* keyspace) {
3201 return cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
3205 const CassKeyspaceMeta* keyspace_meta,
const char* table) {
3206 return cass_keyspace_meta_table_by_name(keyspace_meta, table);
3210 const CassTableMeta* table_meta) {
3211 return cass_table_meta_partition_key_count(table_meta);
3215 const CassTableMeta* table_meta) {
3216 return cass_table_meta_clustering_key_count(table_meta);
3221 cass_future_free(future);
3225 CassFutureCallback callback,
void* data) {
3226 return cass_future_set_callback(future, callback, data);
3230 cass_future_wait(future);
3234 CassFuture* future) {
3235 return cass_future_get_result(future);
3239 const char** message,
size_t* message_length) {
3240 cass_future_error_message(future, message, message_length);
3244 return cass_future_error_code(future);
3248 CassFuture* future) {
3249 return cass_future_get_prepared(future);
3254 cass_result_free(result);
3258 return cass_result_column_count(result);
3262 size_t index,
const char** name,
size_t* name_length) {
3263 return cass_result_column_name(result, index, name, name_length);
3268 cass_iterator_free(iterator);
3272 const CassResult* result) {
3273 return cass_iterator_from_result(result);
3277 return cass_iterator_next(iterator);
3281 const CassIterator* iterator) {
3282 return cass_iterator_get_row(iterator);
3287 size_t parameter_count) {
3288 return cass_statement_new(query, parameter_count);
3292 cass_statement_free(statement);
3296 CassStatement* statement, CassConsistency consistency) {
3297 return cass_statement_set_consistency(statement, consistency);
3301 CassStatement* statement,
3302 size_t index,
const char* value,
size_t value_length) {
3303 return cass_statement_bind_string_n(statement, index, value, value_length);
3307 size_t index, cass_int32_t value) {
3308 return cass_statement_bind_int32(statement, index, value);
3312 size_t index, cass_int64_t value) {
3313 return cass_statement_bind_int64(statement, index, value);
3317 size_t index, CassUuid value) {
3318 return cass_statement_bind_uuid(statement, index, value);
3322 CassStatement* statement,
size_t index, cass_double_t value) {
3323 return cass_statement_bind_double(statement, index, value);
3327 size_t index, CassInet value) {
3328 return cass_statement_bind_inet(statement, index, value);
3332 CassStatement* statement,
3333 size_t index,
const cass_byte_t* value,
size_t value_length) {
3334 return cass_statement_bind_bytes(statement, index, value, value_length);
3338 CassStatement* statement,
3339 const char* name,
size_t name_length,
const char* value,
3340 size_t value_length) {
3341 return cass_statement_bind_string_by_name_n(statement, name, name_length,
3342 value, value_length);
3346 CassStatement* statement,
const char* name, cass_int32_t value) {
3347 return cass_statement_bind_int32_by_name(statement, name, value);
3351 CassStatement* statement,
const char* name, cass_int64_t value) {
3352 return cass_statement_bind_int64_by_name(statement, name, value);
3356 CassStatement* statement,
const char* name, CassUuid value) {
3357 return cass_statement_bind_uuid_by_name(statement, name, value);
3361 CassStatement* statement,
const char* name, cass_double_t value) {
3362 return cass_statement_bind_double_by_name(statement, name, value);
3366 CassStatement* statement,
const char* name, CassInet value) {
3367 return cass_statement_bind_inet_by_name(statement, name, value);
3371 CassStatement* statement,
3372 const char* name,
size_t name_length,
const cass_byte_t* value,
3373 size_t value_length) {
3374 return cass_statement_bind_bytes_by_name_n(statement, name, name_length,
3375 value, value_length);
3380 cass_prepared_free(prepared);
3384 const CassPrepared* prepared) {
3385 return cass_prepared_bind(prepared);
3390 return cass_value_type(value);
3394 const char** output,
size_t* output_size) {
3395 return cass_value_get_string(value, output, output_size);
3399 cass_int8_t* output) {
3400 return cass_value_get_int8(value, output);
3404 cass_int16_t* output) {
3405 return cass_value_get_int16(value, output);
3409 cass_int32_t* output) {
3410 return cass_value_get_int32(value, output);
3414 cass_int64_t* output) {
3415 return cass_value_get_int64(value, output);
3420 return cass_value_get_uuid(value, output);
3424 cass_double_t* output) {
3425 return cass_value_get_double(value, output);
3430 return cass_value_get_inet(value, output);
3434 const cass_byte_t** output,
size_t* output_size) {
3435 return cass_value_get_bytes(value, output, output_size);
3439 return cass_value_is_null(value);
3444 const cass_uint8_t* address) {
3445 return cass_inet_init_v4(address);
3449 const cass_uint8_t* address) {
3450 return cass_inet_init_v6(address);
3456 return cass_row_get_column(row, index);
3461 cass_log_set_level(log_level);
3466 cass_log_set_callback(callback, data);
boost::asio::ip::address_v6 Ip6Address
boost::asio::ip::address IpAddress
boost::asio::ip::address_v4 Ip4Address
std::string GetHostIp(boost::asio::io_context *io_service, const std::string &hostname)
IpAddress AddressFromString(const std::string &ip_address_str, boost::system::error_code *ec)
boost::asio::io_context * io_service()
void GetDiffs(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void IncrementTableReadFail(const std::string &table_name)
void IncrementTableRead(const std::string &table_name)
void IncrementErrors(IfErrors::Type type)
void IncrementTableWriteBackPressureFail(const std::string &table_name)
void IncrementTableReadBackPressureFail(const std::string &table_name)
void IncrementTableWriteFail(const std::string &table_name)
void GetCumulative(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
void IncrementTableWrite(const std::string &table_name)
boost::function< void(size_t)> DbQueueWaterMarkCb
boost::function< void(DbOpResult::type, std::auto_ptr< ColList >)> DbGetRowCb
boost::function< void(DbOpResult::type)> DbAddColumnCb
@ ERR_WRITE_COLUMN_FAMILY
The TaskScheduler keeps track of what tasks are currently schedulable. When a task is enqueued it is ...
void Enqueue(Task *task)
Enqueues a task for running. Starts task if all policy rules are met else puts task in waitq....
static TaskScheduler * GetInstance()
Task is a class to describe a computational task within OpenSDN control plane applications....
impl::CassSessionPtr session_
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
bool DisconnectSchemaSync()
impl::CassClusterPtr cluster_
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
bool IsTableDynamic(const std::string &table)
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
impl::CassSessionPtr schema_session_
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
static const char * kTaskName
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
bool GetMetrics(Metrics *metrics) const
void SetRequestTimeout(uint32_t timeout_ms)
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
CassPreparedMapType insert_prepared_map_
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
static const char * kQCreateKeyspaceIfNotExists
bool IsTablePresent(const std::string &table)
std::string schema_contact_point_
interface::CassLibrary * cci_
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
bool IsInsertIntoTablePrepareSupported(const std::string &table)
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
std::atomic< SessionState::type > session_state_
std::atomic< SessionState::type > schema_session_state_
static const char * kQUseKeyspace
int IsTableStatic(const std::string &table)
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
boost::scoped_ptr< CqlIfImpl > impl_
boost::scoped_ptr< interface::CassLibrary > cci_
void IncrementTableWriteStats(const std::string &table_name)
virtual void Db_SetQueueWaterMark(bool high, size_t queue_count, DbQueueWaterMarkCb cb)
virtual bool Db_GetMultiRow(GenDb::ColListVec *out, const std::string &cfname, const std::vector< GenDb::DbDataValueVec > &v_rowkey)
virtual bool Db_GetCumulativeStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe) const
virtual bool Db_GetRow(GenDb::ColList *out, const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency)
virtual bool Db_CreateIndex(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode=GenDb::ColIndexMode::NONE)
virtual void Db_SetInitDone(bool)
virtual bool Db_GetCqlMetrics(Metrics *metrics) const
GenDb::GenDbIfStats stats_
virtual bool Db_AddSetTablespace(const std::string &tablespace, const std::string &replication_factor="1")
virtual void Db_ResetQueueWaterMarks()
virtual bool Db_GetCqlStats(DbStats *db_stats) const
void IncrementErrors(GenDb::IfErrors::Type err_type)
virtual bool Db_GetRowAsync(const std::string &cfname, const GenDb::DbDataValueVec &rowkey, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbGetRowCb cb)
void IncrementTableWriteFailStats(const std::string &table_name)
virtual bool Db_AddColumnfamily(const GenDb::NewCf &cf, const std::string &compaction_strategy)
virtual bool Db_GetAllRows(GenDb::ColListVec *out, const std::string &cfname, GenDb::DbConsistency::type dconsistency)
virtual std::vector< GenDb::Endpoint > Db_GetEndpoints() const
virtual bool Db_AddColumn(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency, GenDb::GenDbIf::DbAddColumnCb cb)
virtual bool Db_AddColumnSync(std::auto_ptr< GenDb::ColList > cl, GenDb::DbConsistency::type dconsistency)
void IncrementTableReadFailStats(const std::string &table_name)
void IncrementTableReadBackPressureFailStats(const std::string &table_name)
virtual bool Db_GetQueueStats(uint64_t *queue_count, uint64_t *enqueues) const
virtual bool Db_SetTablespace(const std::string &tablespace)
void IncrementTableWriteBackPressureFailStats(const std::string &table_name)
void OnAsyncColumnAddCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbAddColumnCb cb)
virtual bool Db_GetStats(std::vector< GenDb::DbTableInfo > *vdbti, GenDb::DbErrors *dbe)
void OnAsyncRowGetCompletion(GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row, std::string cfname, GenDb::GenDbIf::DbGetRowCb cb)
std::vector< GenDb::Endpoint > endpoints_
std::atomic< bool > initialized_
void IncrementTableReadStats(const std::string &table_name)
bool use_prepared_for_insert_
virtual bool Db_UseColumnfamily(const GenDb::NewCf &cf)
void operator()(const boost::uuids::uuid &tuuid) const
void operator()(const uint64_t &tu64) const
CassQueryPrinter(std::ostream &os, bool quote_strings)
void operator()(const IpAddress &tipaddr) const
void operator()(const std::string &tstring) const
void operator()(const uint32_t &tu32) const
void operator()(const T &t) const
void operator()(const uint8_t &tu8) const
CassQueryPrinter(std::ostream &os)
void operator()(const double &tdouble, size_t index) const
void operator()(const IpAddress &tipaddr, size_t index) const
void operator()(const boost::uuids::uuid &tuuid, size_t index) const
void operator()(const uint16_t &tu16, size_t index) const
void operator()(const std::string &tstring, size_t index) const
void operator()(const uint32_t &tu32, size_t index) const
CassStatementIndexBinder(interface::CassLibrary *cci, CassStatement *statement)
void operator()(const uint64_t &tu64, size_t index) const
void operator()(const GenDb::Blob &tblob, size_t index) const
void operator()(const uint8_t &tu8, size_t index) const
void operator()(const boost::blank &tblank, size_t index) const
interface::CassLibrary * cci_
CassStatement * statement_
void operator()(const uint32_t &tu32, const char *name) const
CassStatementNameBinder(interface::CassLibrary *cci, CassStatement *statement)
void operator()(const std::string &tstring, const char *name) const
void operator()(const boost::blank &tblank, const char *name) const
void operator()(const boost::uuids::uuid &tuuid, const char *name) const
void operator()(const GenDb::Blob &tblob, const char *name) const
void operator()(const double &tdouble, const char *name) const
CassStatement * statement_
void operator()(const uint64_t &tu64, const char *name) const
void operator()(const uint8_t &tu8, const char *name) const
interface::CassLibrary * cci_
void operator()(const IpAddress &tipaddr, const char *name) const
void operator()(const uint16_t &tu16, const char *name) const
std::string Description() const
Gives a description of the task.
WorkerTask(FunctionPtr func, int task_id, int task_instance)
boost::function< void(void)> FunctionPtr
bool Run()
Code to execute in a task. Returns true if task is completed. Return false to reschedule the task.
virtual CassError CassStatementBindDouble(CassStatement *statement, size_t index, cass_double_t value)
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)
virtual void CassFutureFree(CassFuture *future)
virtual CassInet CassInetInitV6(const cass_uint8_t *address)
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)
virtual void CassResultFree(const CassResult *result)
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)
virtual CassInet CassInetInitV4(const cass_uint8_t *address)
virtual CassError CassStatementBindDoubleByName(CassStatement *statement, const char *name, cass_double_t value)
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)
virtual CassError CassStatementBindStringByNameN(CassStatement *statement, const char *name, size_t name_length, const char *value, size_t value_length)
virtual void CassSchemaMetaFree(const CassSchemaMeta *schema_meta)
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)
virtual void CassLogSetLevel(CassLogLevel log_level)
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)
virtual CassError CassStatementBindUuid(CassStatement *statement, size_t index, CassUuid value)
virtual void CassIteratorFree(CassIterator *iterator)
virtual CassIterator * CassIteratorFromResult(const CassResult *result)
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)
virtual CassError CassStatementBindBytes(CassStatement *statement, size_t index, const cass_byte_t *value, size_t value_length)
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)
virtual CassFuture * CassSessionClose(CassSession *session)
virtual CassSsl * CassSslNew()
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)
virtual ~CassDatastaxLibrary()
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)
virtual void CassPreparedFree(const CassPrepared *prepared)
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)
virtual size_t CassResultColumnCount(const CassResult *result)
virtual void CassClusterFree(CassCluster *cluster)
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)
virtual CassError CassStatementBindStringN(CassStatement *statement, size_t index, const char *value, size_t value_length)
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)
virtual void CassSslFree(CassSsl *ssl)
virtual const CassResult * CassFutureGetResult(CassFuture *future)
virtual CassValueType GetCassValueType(const CassValue *value)
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)
virtual CassError CassStatementBindInet(CassStatement *statement, size_t index, CassInet value)
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)
virtual cass_bool_t CassValueIsNull(const CassValue *value)
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)
virtual void CassFutureWait(CassFuture *future)
virtual CassError CassStatementBindBytesByNameN(CassStatement *statement, const char *name, size_t name_length, const cass_byte_t *value, size_t value_length)
virtual CassError CassStatementBindUuidByName(CassStatement *statement, const char *name, CassUuid value)
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)
virtual CassSession * CassSessionNew()
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)
virtual CassError CassStatementBindInt64(CassStatement *statement, size_t index, cass_int64_t value)
virtual void CassLogSetCallback(CassLogCallback callback, void *data)
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)
virtual CassError CassStatementBindInt64ByName(CassStatement *statement, const char *name, cass_int64_t value)
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)
virtual void CassSessionFree(CassSession *session)
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)
virtual void CassStatementFree(CassStatement *statement)
virtual CassError CassStatementBindInt32ByName(CassStatement *statement, const char *name, cass_int32_t value)
virtual CassError CassStatementBindInetByName(CassStatement *statement, const char *name, CassInet value)
virtual CassCluster * CassClusterNew()
virtual CassError CassFutureErrorCode(CassFuture *future)
virtual CassError CassValueGetInet(const CassValue *value, CassInet *output)=0
virtual CassError CassValueGetDouble(const CassValue *value, cass_double_t *output)=0
virtual CassError CassClusterSetPendingRequestsLowWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual CassStatement * CassStatementNew(const char *query, size_t parameter_count)=0
virtual size_t CassResultColumnCount(const CassResult *result)=0
virtual void CassClusterSetCredentials(CassCluster *cluster, const char *username, const char *password)=0
virtual const CassValue * CassRowGetColumn(const CassRow *row, size_t index)=0
virtual const CassKeyspaceMeta * CassSchemaMetaKeyspaceByName(const CassSchemaMeta *schema_meta, const char *keyspace)=0
virtual CassError CassClusterSetWriteBytesHighWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSsl * CassSslNew()=0
virtual void CassClusterSetWhitelistFiltering(CassCluster *cluster, const char *hosts)=0
virtual void CassSessionGetMetrics(const CassSession *session, CassMetrics *output)=0
virtual CassError CassClusterSetPendingRequestsHighWaterMark(CassCluster *cluster, unsigned num_requests)=0
virtual cass_bool_t CassIteratorNext(CassIterator *iterator)=0
virtual CassError CassClusterSetWriteBytesLowWaterMark(CassCluster *cluster, unsigned num_bytes)=0
virtual CassSession * CassSessionNew()=0
virtual CassError CassValueGetInt32(const CassValue *value, cass_int32_t *output)=0
virtual size_t CassTableMetaClusteringKeyCount(const CassTableMeta *table_meta)=0
virtual CassFuture * CassSessionConnect(CassSession *session, const CassCluster *cluster)=0
virtual CassError CassResultColumnName(const CassResult *result, size_t index, const char **name, size_t *name_length)=0
virtual CassError CassClusterSetNumThreadsIo(CassCluster *cluster, unsigned num_threads)=0
virtual CassFuture * CassSessionPrepare(CassSession *session, const char *query)=0
virtual CassValueType GetCassValueType(const CassValue *value)=0
virtual CassError CassStatementSetConsistency(CassStatement *statement, CassConsistency consistency)=0
virtual const CassResult * CassFutureGetResult(CassFuture *future)=0
virtual CassError CassValueGetInt64(const CassValue *value, cass_int64_t *output)=0
virtual CassError CassValueGetInt8(const CassValue *value, cass_int8_t *output)=0
virtual CassIterator * CassIteratorFromResult(const CassResult *result)=0
virtual const CassRow * CassIteratorGetRow(const CassIterator *iterator)=0
virtual CassError CassFutureSetCallback(CassFuture *future, CassFutureCallback callback, void *data)=0
virtual size_t CassTableMetaPartitionKeyCount(const CassTableMeta *table_meta)=0
virtual CassError CassValueGetInt16(const CassValue *value, cass_int16_t *output)=0
virtual const CassSchemaMeta * CassSessionGetSchemaMeta(const CassSession *session)=0
virtual CassFuture * CassSessionExecute(CassSession *session, const CassStatement *statement)=0
virtual CassError CassValueGetUuid(const CassValue *value, CassUuid *output)=0
virtual CassError CassClusterSetContactPoints(CassCluster *cluster, const char *contact_points)=0
virtual CassStatement * CassPreparedBind(const CassPrepared *prepared)=0
virtual void CassFutureWait(CassFuture *future)=0
virtual CassError CassFutureErrorCode(CassFuture *future)=0
virtual void CassClusterSetSsl(CassCluster *cluster, CassSsl *ssl)=0
virtual cass_bool_t CassValueIsNull(const CassValue *value)=0
virtual CassError CassSslAddTrustedCert(CassSsl *ssl, const std::string &cert)=0
virtual CassError CassValueGetBytes(const CassValue *value, const cass_byte_t **output, size_t *output_size)=0
virtual CassFuture * CassSessionClose(CassSession *session)=0
virtual CassError CassValueGetString(const CassValue *value, const char **output, size_t *output_size)=0
virtual const CassTableMeta * CassKeyspaceMetaTableByName(const CassKeyspaceMeta *keyspace_meta, const char *table)=0
virtual const CassPrepared * CassFutureGetPrepared(CassFuture *future)=0
virtual void CassSslSetVerifyFlags(CassSsl *ssl, int flags)=0
virtual void CassClusterSetRequestTimeout(CassCluster *cluster, unsigned timeout_ms)=0
virtual void CassFutureErrorMessage(CassFuture *future, const char **message, size_t *message_length)=0
virtual CassError CassClusterSetPort(CassCluster *cluster, int port)=0
virtual CassError CassStatementBindInt32(CassStatement *statement, size_t index, cass_int32_t value)=0
SandeshTraceBufferPtr CqlTraceDebugBuf(SandeshTraceBufferCreate(CQLIF_DEBUG, 10000))
#define CQLIF_INFO_TRACE(_Msg)
#define CASS_LIB_TRACE(_Level, _Msg)
SandeshTraceBufferPtr CqlTraceErrBuf(SandeshTraceBufferCreate(CQLIF_ERR, 20000))
#define CQLIF_DEBUG_TRACE(_Msg)
#define CQLIF_ERR_TRACE(_Msg)
SandeshTraceBufferPtr CqlTraceInfoBuf(SandeshTraceBufferCreate(CQLIF_INFO, 10000))
#define GENERIC_RAW_ARRAY(obj)
std::string DbDataValueVecToString(const GenDb::DbDataValueVec &v_db_value)
std::vector< DbDataValue > DbDataValueVec
boost::variant< boost::blank, std::string, uint64_t, uint32_t, boost::uuids::uuid, uint8_t, uint16_t, double, IpAddress, Blob > DbDataValue
std::vector< GenDb::DbDataType::type > DbDataTypeVec
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
boost::asio::ip::tcp::endpoint Endpoint
std::vector< WhereIndexInfo > WhereIndexInfoVec
boost::ptr_vector< ColList > ColListVec
boost::ptr_vector< NewCol > NewColVec
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
static std::string LoadCertFile(const std::string &ca_certs_path)
static void OnExecuteQueryAsync(CassFuture *future, void *data)
static void ExecuteQueryAsyncInternal(interface::CassLibrary *cci, CassSession *session, const char *qid, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx=NULL)
static bool DynamicCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, size_t rk_count, size_t ck_count, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
static void CassLibraryLog(const CassLogMessage *message, void *data)
void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, GenDb::ColListVec *v_col_list)
static void ExecuteQueryResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb, CassQueryResultContext *rctx)
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
static const char * kQCompactionStrategy("compaction = {'class': " "'org.apache.cassandra.db.compaction.%s'}")
static GenDb::DbDataValue CassValue2DbDataValue(interface::CassLibrary *cci, const CassValue *cvalue)
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
static bool GetCassTablePartitionKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *rk_count)
bool DynamicCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
static std::string DbDataTypes2CassTypes(const GenDb::DbDataTypeVec &v_db_types)
static std::string CassSelectFromTableInternal(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec, const GenDb::WhereIndexInfoVec &where_vec)
CassSharedPtr< CassSsl > CassSslPtr
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, size_t ck_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
static bool GetCassTableClusteringKeyCount(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table, size_t *ck_count)
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
CassSharedPtr< const CassPrepared > CassPreparedPtr
static bool PrepareSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassPreparedPtr *prepared)
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const std::vector< GenDb::DbDataValueVec > &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
static bool StaticCfGetResultAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, impl::CassAsyncQueryCallback cb, const std::string &cfname, const GenDb::DbDataValueVec &row_key)
static bool IsCassTableMetaPresent(interface::CassLibrary *cci, CassSession *session, const std::string &keyspace, const std::string &table)
static std::string DbColIndexMode2String(const GenDb::ColIndexMode::type index_mode)
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
static CassConsistency Db2CassConsistency(GenDb::DbConsistency::type dconsistency)
static const std::string kQReadRepairChanceDTCS("read_repair_chance = 0.0")
CassSharedPtr< const CassResult > CassResultPtr
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, size_t rk_count, CassConsistency consistency, GenDb::ColListVec *v_col_list)
static bool ExecuteQueryResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassResultPtr *result, CassConsistency consistency)
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
std::string CassSelectFromTable(const std::string &table)
static void ExecuteQueryStatementAsync(interface::CassLibrary *cci, CassSession *session, const char *query_id, CassStatement *qstatement, CassConsistency consistency, CassAsyncQueryCallback cb)
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
static const char * DbDataType2CassType(const GenDb::DbDataType::type &db_type)
static void ExecuteQueryAsync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, CassAsyncQueryCallback cb)
static void encode_uuid(char *output, const CassUuid &uuid)
bool StaticCf2CassPrepareBind(interface::CassLibrary *cci, CassStatement *statement, const GenDb::ColList *v_columns)
static GenDb::DbOpResult::type CassError2DbOpResult(CassError rc)
static char * decode_uuid(char *input, CassUuid *output)
void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, size_t rk_count, size_t ck_count, GenDb::ColListVec *v_col_list)
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
static bool StaticCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency, GenDb::NewColVec *v_columns)
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
static log4cplus::LogLevel Cass2log4Level(CassLogLevel clevel)
static bool SyncFutureWait(interface::CassLibrary *cci, CassFuture *future)
static const CassTableMeta * GetCassTableMeta(interface::CassLibrary *cci, const CassSchemaMeta *schema_meta, const std::string &keyspace, const std::string &table, bool log_error)
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
static bool ExecuteQueryStatementSync(interface::CassLibrary *cci, CassSession *session, CassStatement *statement, CassConsistency consistency)
static bool ExecuteQuerySync(interface::CassLibrary *cci, CassSession *session, const char *query, CassConsistency consistency)
static bool DynamicCfGetResultSync(interface::CassLibrary *cci, CassSession *session, const char *query, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, GenDb::NewColVec *v_columns)
static CassLogLevel Log4Level2CassLogLevel(log4cplus::LogLevel level)
static bool ExecuteQuerySyncInternal(interface::CassLibrary *cci, CassSession *session, CassStatement *qstatement, CassResultPtr *result, CassConsistency consistency)
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
static const std::string kQGCGraceSeconds("gc_grace_seconds = 0")
static void AsyncRowGetCompletionCallback(boost::shared_ptr< AsyncRowGetCallbackContext > cb_ctx)
boost::shared_ptr< TraceBuffer< SandeshTrace > > SandeshTraceBufferPtr
SandeshTraceBufferPtr SandeshTraceBufferCreate(const std::string &buf_name, size_t buf_size, bool trace_enable=true)
static const std::string integerToString(const NumberType &num)
const uint8_t * data() const
std::string ToString() const
std::map< std::string, GenDb::DbDataType::type > ColumnMap
DbDataTypeVec clustering_columns_
DbDataTypeVec partition_keys_
boost::scoped_ptr< DbDataValueVec > value
boost::scoped_ptr< DbDataValueVec > name
NewCf::ColumnFamilyType cftype_
static std::string ToString(Op::type op)
std::auto_ptr< GenDb::ColList > row_
GenDb::GenDbIf::DbGetRowCb cb_
AsyncRowGetCallbackContext(GenDb::GenDbIf::DbGetRowCb cb, GenDb::DbOpResult::type drc, std::auto_ptr< GenDb::ColList > row)
GenDb::DbOpResult::type drc_
GenDb::DbDataValueVec row_key_
CassString(const char *data)
CassString(const char *data, size_t length)