OpenSDN source code
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
cql_if_impl.h
Go to the documentation of this file.
1 //
2 // Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #ifndef DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
6 #define DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
7 
8 #include <string>
9 
10 #include <boost/unordered_map.hpp>
11 
12 #include <cassandra.h>
13 
14 #include <io/event_manager.h>
15 #include <base/timer.h>
16 #include <database/gendb_if.h>
17 #include <database/cassandra/cql/cql_types.h>
19 
20 namespace cass {
21 namespace cql {
22 namespace impl {
23 
25  const std::string &compaction_strategy);
27  const std::string &compaction_strategy,
28  boost::system::error_code *ec);
29 std::string CassCreateIndexIfNotExists(const std::string &cfname,
30  const std::string &column, const std::string &indexname,
31  const GenDb::ColIndexMode::type index_mode);
32 std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
33 std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
36  boost::system::error_code *ec);
37 std::string CassSelectFromTable(const std::string &table);
39  const std::string &table, const GenDb::DbDataValueVec &rkeys,
40  const GenDb::ColumnNameRange &ck_range,
41  const GenDb::WhereIndexInfoVec &where_vec,
43 std::string PartitionKey2CassSelectFromTable(const std::string &table,
44  const GenDb::DbDataValueVec &rkeys);
46  const std::string &table, const GenDb::DbDataValueVec &rkeys,
47  const GenDb::ColumnNameRange &crange,
50  const std::string &table, const std::vector<GenDb::DbDataValueVec> &rkeys,
51  const GenDb::ColumnNameRange &crange,
53 
54 // CQL Library Shared Pointers to handle library free calls
55 template<class T>
56 struct Deleter;
57 
58 template<>
59 struct Deleter<CassCluster> {
61  cci_(cci) {}
62  void operator()(CassCluster *ptr) {
63  if (ptr != NULL) {
64  cci_->CassClusterFree(ptr);
65  }
66  }
68 };
69 
70 template<>
71 struct Deleter<CassSsl> {
73  cci_(cci) {}
74  void operator()(CassSsl *ptr) {
75  if (ptr != NULL) {
76  cci_->CassSslFree(ptr);
77  }
78  }
80 };
81 
82 template<>
83 struct Deleter<CassSession> {
85  cci_(cci) {}
86  void operator()(CassSession* ptr) {
87  if (ptr != NULL) {
88  cci_->CassSessionFree(ptr);
89  }
90  }
92 };
93 
94 template<>
95 struct Deleter<CassFuture> {
97  cci_(cci) {}
98  void operator()(CassFuture* ptr) {
99  if (ptr != NULL) {
100  cci_->CassFutureFree(ptr);
101  }
102  }
104 };
105 
106 template<>
107 struct Deleter<CassStatement> {
109  cci_(cci) {}
110  void operator()(CassStatement* ptr) {
111  if (ptr != NULL) {
112  cci_->CassStatementFree(ptr);
113  }
114  }
116 };
117 
118 template<>
119 struct Deleter<const CassResult> {
121  cci_(cci) {}
122  void operator()(const CassResult* ptr) {
123  if (ptr != NULL) {
124  cci_->CassResultFree(ptr);
125  }
126  }
128 };
129 
130 template<>
131 struct Deleter<CassIterator> {
133  cci_(cci) {}
134  void operator()(CassIterator* ptr) {
135  if (ptr != NULL) {
136  cci_->CassIteratorFree(ptr);
137  }
138  }
140 };
141 
142 template<>
143 struct Deleter<const CassPrepared> {
145  cci_(cci) {}
146  void operator()(const CassPrepared* ptr) {
147  if (ptr != NULL) {
148  cci_->CassPreparedFree(ptr);
149  }
150  }
152 };
153 
154 template<>
155 struct Deleter<const CassSchemaMeta> {
157  cci_(cci) {}
158  void operator()(const CassSchemaMeta* ptr) {
159  if (ptr != NULL) {
160  cci_->CassSchemaMetaFree(ptr);
161  }
162  }
164 };
165 
166 template <class T>
167 class CassSharedPtr : public boost::shared_ptr<T> {
168  public:
170  boost::shared_ptr<T>(ptr, Deleter<T>(cci)) {}
171 };
172 
182 
183 typedef boost::function<void(GenDb::DbOpResult::type,
184  std::auto_ptr<GenDb::ColList>)> CassAsyncQueryCallback;
185 
187  CassQueryResultContext(const std::string &cf_name, bool is_dynamic_cf,
188  const GenDb::DbDataValueVec &row_key, size_t rk_count = 0,
189  size_t ck_count = 0) :
190  cf_name_(cf_name),
191  is_dynamic_cf_(is_dynamic_cf),
192  row_key_(row_key),
193  rk_count_(rk_count),
194  ck_count_(ck_count) {
195  }
196  std::string cf_name_;
199  size_t rk_count_;
200  size_t ck_count_;
201 };
202 
205  interface::CassLibrary *cci, CassQueryResultContext *rctx = NULL) :
206  query_id_(query_id),
207  cb_(cb),
208  cci_(cci),
209  result_ctx_(rctx) {
210  }
211  std::string query_id_;
214  boost::scoped_ptr<CassQueryResultContext> result_ctx_;
215 };
216 
218  CassResultPtr *result, size_t rk_count,
219  size_t ck_count, GenDb::ColListVec *v_col_list);
221  CassResultPtr *result, size_t rk_count,
222  GenDb::ColListVec *v_col_list);
223 
224 } // namespace impl
225 
226 //
227 // CqlIfImpl
228 //
229 class CqlIfImpl {
230  public:
232  const std::vector<std::string> &cassandra_ips,
233  int cassandra_port,
234  const std::string &cassandra_user,
235  const std::string &cassandra_password,
236  bool use_ssl,
237  const std::string &ca_certs_path,
239  virtual ~CqlIfImpl();
240 
241  bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace,
242  const std::string &replication_factor, CassConsistency consistency);
243  bool UseKeyspaceSync(const std::string &keyspace,
244  CassConsistency consistency);
245  bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace,
246  CassConsistency consistency);
247 
249  const std::string &compaction_strategy, CassConsistency consistency);
250  bool CreateIndexIfNotExistsSync(const std::string &cfname,
251  const std::string &column, const std::string &indexname,
252  CassConsistency consistency,
253  const GenDb::ColIndexMode::type index_mode);
255  bool IsTablePresent(const std::string &table);
256  int IsTableStatic(const std::string &table);
257  bool IsTableDynamic(const std::string &table);
258 
259  bool InsertIntoTableSync(std::auto_ptr<GenDb::ColList> v_columns,
260  CassConsistency consistency);
261  bool InsertIntoTableAsync(std::auto_ptr<GenDb::ColList> v_columns,
262  CassConsistency consistency, impl::CassAsyncQueryCallback cb);
263  bool InsertIntoTablePrepareAsync(std::auto_ptr<GenDb::ColList> v_columns,
264  CassConsistency consistency, impl::CassAsyncQueryCallback cb);
265  bool IsInsertIntoTablePrepareSupported(const std::string &table);
266 
267  bool SelectFromTableSync(const std::string &cfname,
268  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
269  GenDb::NewColVec *out);
270  bool SelectFromTableSync(const std::string &cfname,
271  CassConsistency consistency, GenDb::ColListVec *out);
272  bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname,
273  const GenDb::DbDataValueVec &rkey,
274  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
275  GenDb::NewColVec *out);
276  bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname,
277  const GenDb::DbDataValueVec &rkey,
278  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
279  const GenDb::FieldNamesToReadVec &read_vec,
280  GenDb::NewColVec *out);
281 
282  bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname,
283  const std::vector<GenDb::DbDataValueVec> &rkeys,
284  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
285  const GenDb::FieldNamesToReadVec &read_vec,
286  GenDb::ColListVec *out);
287 
288  bool SelectFromTableAsync(const std::string &cfname,
289  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
291  bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname,
292  const GenDb::DbDataValueVec &rkey,
293  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
296  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
297  const GenDb::ColumnNameRange &ck_range,
298  const GenDb::WhereIndexInfoVec &where_vec,
299  const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency,
301 
302  bool ConnectSync();
303  bool ConnectSchemaSync();
304  bool DisconnectSync();
305  bool DisconnectSchemaSync();
306 
307  void SetRequestTimeout(uint32_t timeout_ms);
308 
309  bool GetMetrics(Metrics *metrics) const;
310 
311  private:
312 
313  bool InsertIntoTableInternal(std::auto_ptr<GenDb::ColList> v_columns,
314  CassConsistency consistency, bool sync,
316  bool GetPrepareInsertIntoTable(const std::string &table_name,
317  impl::CassPreparedPtr *prepared) const;
319  impl::CassPreparedPtr *prepared);
320  bool InsertIntoTablePrepareInternal(std::auto_ptr<GenDb::ColList> v_columns,
321  CassConsistency consistency, bool sync,
323 
324  static const char * kQCreateKeyspaceIfNotExists;
325  static const char * kQUseKeyspace;
326  static const char * kTaskName;
327  static const int kTaskInstance = -1;
328 
329  struct SessionState {
330  enum type {
334  };
335  };
336 
343  tbb::atomic<SessionState::type> session_state_;
344  tbb::atomic<SessionState::type> schema_session_state_;
346  std::string keyspace_;
348  typedef boost::unordered_map<std::string, impl::CassPreparedPtr>
351  mutable tbb::mutex map_mutex_;
352 };
353 
354 } // namespace cql
355 } // namespace cass
356 
357 #endif // DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
CassSharedPtr(T *ptr, interface::CassLibrary *cci)
Definition: cql_if_impl.h:169
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:2019
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:84
void SetRequestTimeout(uint32_t timeout_ms)
Definition: cql_if.cc:2276
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2159
interface::CassLibrary * cci_
Definition: cql_if_impl.h:67
tbb::atomic< SessionState::type > session_state_
Definition: cql_if_impl.h:343
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
CassSharedPtr< CassSsl > CassSslPtr
Definition: cql_if_impl.h:174
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
Definition: cql_if.cc:741
interface::CassLibrary * cci_
Definition: cql_if_impl.h:79
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:108
CassSharedPtr< const CassPrepared > CassPreparedPtr
Definition: cql_if_impl.h:180
CassSharedPtr< CassIterator > CassIteratorPtr
Definition: cql_if_impl.h:179
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:72
static const char * kTaskName
Definition: cql_if_impl.h:326
bool IsTableDynamic(const std::string &table)
Definition: cql_if.cc:2145
CqlIfImpl(EventManager *evm, const std::vector< std::string > &cassandra_ips, int cassandra_port, const std::string &cassandra_user, const std::string &cassandra_password, bool use_ssl, const std::string &ca_certs_path, interface::CassLibrary *cci)
Definition: cql_if.cc:1846
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2099
tbb::mutex map_mutex_
Definition: cql_if_impl.h:351
impl::CassClusterPtr cluster_
Definition: cql_if_impl.h:339
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:659
virtual ~CqlIfImpl()
Definition: cql_if.cc:1904
static const char * kQCreateKeyspaceIfNotExists
Definition: cql_if_impl.h:324
impl::CassSessionPtr session_
Definition: cql_if_impl.h:341
CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb, interface::CassLibrary *cci, CassQueryResultContext *rctx=NULL)
Definition: cql_if_impl.h:204
boost::unordered_map< std::string, impl::CassPreparedPtr > CassPreparedMapType
Definition: cql_if_impl.h:349
void operator()(CassSession *ptr)
Definition: cql_if_impl.h:86
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
Definition: cql_if.cc:1010
void operator()(const CassResult *ptr)
Definition: cql_if_impl.h:122
impl::CassSslPtr ssl_
Definition: cql_if_impl.h:340
static void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1215
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2074
static const int kTaskInstance
Definition: cql_if_impl.h:327
boost::ptr_vector< NewCol > NewColVec
Definition: gendb_if.h:186
CassSharedPtr< CassSession > CassSessionPtr
Definition: cql_if_impl.h:175
CassQueryResultContext(const std::string &cf_name, bool is_dynamic_cf, const GenDb::DbDataValueVec &row_key, size_t rk_count=0, size_t ck_count=0)
Definition: cql_if_impl.h:187
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:999
CassPreparedMapType insert_prepared_map_
Definition: cql_if_impl.h:350
boost::scoped_ptr< CassQueryResultContext > result_ctx_
Definition: cql_if_impl.h:214
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2173
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:2007
uint8_t type
Definition: load_balance.h:109
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
Definition: cql_if.cc:2149
static void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1403
CassSharedPtr< CassStatement > CassStatementPtr
Definition: cql_if_impl.h:177
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
static const char * kQUseKeyspace
Definition: cql_if_impl.h:325
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:120
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
Definition: cql_if.cc:2222
CassSharedPtr< CassCluster > CassClusterPtr
Definition: cql_if_impl.h:173
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:606
void operator()(CassFuture *ptr)
Definition: cql_if_impl.h:98
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:430
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:580
interface::CassLibrary * cci_
Definition: cql_if_impl.h:338
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
Definition: cql_if.cc:1911
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1019
std::vector< WhereIndexInfo > WhereIndexInfoVec
Definition: gendb_if.h:233
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2458
bool GetMetrics(Metrics *metrics) const
Definition: cql_if.cc:2356
EventManager * evm_
Definition: cql_if_impl.h:337
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
Definition: cql_if_impl.h:184
CassSharedPtr< CassFuture > CassFuturePtr
Definition: cql_if_impl.h:176
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
Definition: cql_if.cc:470
int IsTableStatic(const std::string &table)
Definition: cql_if.cc:2058
bool IsTablePresent(const std::string &table)
Definition: cql_if.cc:2050
void operator()(CassCluster *ptr)
Definition: cql_if_impl.h:62
bool ConnectSchemaSync()
Definition: cql_if.cc:2281
void operator()(CassIterator *ptr)
Definition: cql_if_impl.h:134
std::string CassSelectFromTable(const std::string &table)
Definition: cql_if.cc:1037
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:718
interface::CassLibrary * cci_
Definition: cql_if_impl.h:103
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:132
void operator()(const CassSchemaMeta *ptr)
Definition: cql_if_impl.h:158
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
Definition: cql_if.cc:2038
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:96
CassSharedPtr< const CassSchemaMeta > CassSchemaMetaPtr
Definition: cql_if_impl.h:181
void operator()(CassStatement *ptr)
Definition: cql_if_impl.h:110
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2123
std::string schema_contact_point_
Definition: cql_if_impl.h:345
impl::CassSessionPtr schema_session_
Definition: cql_if_impl.h:342
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2404
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:144
CassSharedPtr< const CassResult > CassResultPtr
Definition: cql_if_impl.h:178
tbb::atomic< SessionState::type > schema_session_state_
Definition: cql_if_impl.h:344
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2154
std::string keyspace_
Definition: cql_if_impl.h:346
interface::CassLibrary * cci_
Definition: cql_if_impl.h:91
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:60
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
Definition: cql_if.cc:2428
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2255
bool IsInsertIntoTablePrepareSupported(const std::string &table)
Definition: cql_if.cc:2168
void operator()(const CassPrepared *ptr)
Definition: cql_if_impl.h:146
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1950
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
Definition: cql_if.cc:1972
static EventManager evm
bool DisconnectSchemaSync()
Definition: cql_if.cc:2342
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1928
interface::CassLibrary * cci_
Definition: cql_if_impl.h:213