OpenSDN source code
cql_if_impl.h
Go to the documentation of this file.
1 //
2 // Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #ifndef DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
6 #define DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
7 
8 #include <atomic>
9 #include <string>
10 #include <mutex>
11 
12 #include <boost/unordered_map.hpp>
13 
14 #include <linux/version.h>
15 #if defined(RHEL_MAJOR) && (RHEL_MAJOR >= 9)
16 #include <cassandra/cassandra.h>
17 #else
18 #include <cassandra.h>
19 #endif
20 
21 #include <io/event_manager.h>
22 #include <base/timer.h>
23 #include <database/gendb_if.h>
24 #include <database/cassandra/cql/cql_types.h>
26 
27 namespace cass {
28 namespace cql {
29 namespace impl {
30 
32  const std::string &compaction_strategy);
34  const std::string &compaction_strategy,
35  boost::system::error_code *ec);
36 std::string CassCreateIndexIfNotExists(const std::string &cfname,
37  const std::string &column, const std::string &indexname,
38  const GenDb::ColIndexMode::type index_mode);
39 std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
40 std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
43  boost::system::error_code *ec);
44 std::string CassSelectFromTable(const std::string &table);
46  const std::string &table, const GenDb::DbDataValueVec &rkeys,
47  const GenDb::ColumnNameRange &ck_range,
48  const GenDb::WhereIndexInfoVec &where_vec,
50 std::string PartitionKey2CassSelectFromTable(const std::string &table,
51  const GenDb::DbDataValueVec &rkeys);
53  const std::string &table, const GenDb::DbDataValueVec &rkeys,
54  const GenDb::ColumnNameRange &crange,
57  const std::string &table, const std::vector<GenDb::DbDataValueVec> &rkeys,
58  const GenDb::ColumnNameRange &crange,
60 
61 // CQL Library Shared Pointers to handle library free calls
62 template<class T>
63 struct Deleter;
64 
65 template<>
66 struct Deleter<CassCluster> {
68  cci_(cci) {}
69  void operator()(CassCluster *ptr) {
70  if (ptr != NULL) {
71  cci_->CassClusterFree(ptr);
72  }
73  }
75 };
76 
77 template<>
78 struct Deleter<CassSsl> {
80  cci_(cci) {}
81  void operator()(CassSsl *ptr) {
82  if (ptr != NULL) {
83  cci_->CassSslFree(ptr);
84  }
85  }
87 };
88 
89 template<>
90 struct Deleter<CassSession> {
92  cci_(cci) {}
93  void operator()(CassSession* ptr) {
94  if (ptr != NULL) {
95  cci_->CassSessionFree(ptr);
96  }
97  }
99 };
100 
101 template<>
102 struct Deleter<CassFuture> {
104  cci_(cci) {}
105  void operator()(CassFuture* ptr) {
106  if (ptr != NULL) {
107  cci_->CassFutureFree(ptr);
108  }
109  }
111 };
112 
113 template<>
114 struct Deleter<CassStatement> {
116  cci_(cci) {}
117  void operator()(CassStatement* ptr) {
118  if (ptr != NULL) {
119  cci_->CassStatementFree(ptr);
120  }
121  }
123 };
124 
125 template<>
126 struct Deleter<const CassResult> {
128  cci_(cci) {}
129  void operator()(const CassResult* ptr) {
130  if (ptr != NULL) {
131  cci_->CassResultFree(ptr);
132  }
133  }
135 };
136 
137 template<>
138 struct Deleter<CassIterator> {
140  cci_(cci) {}
141  void operator()(CassIterator* ptr) {
142  if (ptr != NULL) {
143  cci_->CassIteratorFree(ptr);
144  }
145  }
147 };
148 
149 template<>
150 struct Deleter<const CassPrepared> {
152  cci_(cci) {}
153  void operator()(const CassPrepared* ptr) {
154  if (ptr != NULL) {
155  cci_->CassPreparedFree(ptr);
156  }
157  }
159 };
160 
161 template<>
162 struct Deleter<const CassSchemaMeta> {
164  cci_(cci) {}
165  void operator()(const CassSchemaMeta* ptr) {
166  if (ptr != NULL) {
167  cci_->CassSchemaMetaFree(ptr);
168  }
169  }
171 };
172 
173 template <class T>
174 class CassSharedPtr : public boost::shared_ptr<T> {
175  public:
177  boost::shared_ptr<T>(ptr, Deleter<T>(cci)) {}
178 };
179 
189 
190 typedef boost::function<void(GenDb::DbOpResult::type,
191  std::auto_ptr<GenDb::ColList>)> CassAsyncQueryCallback;
192 
194  CassQueryResultContext(const std::string &cf_name, bool is_dynamic_cf,
195  const GenDb::DbDataValueVec &row_key, size_t rk_count = 0,
196  size_t ck_count = 0) :
197  cf_name_(cf_name),
198  is_dynamic_cf_(is_dynamic_cf),
199  row_key_(row_key),
200  rk_count_(rk_count),
201  ck_count_(ck_count) {
202  }
203  std::string cf_name_;
206  size_t rk_count_;
207  size_t ck_count_;
208 };
209 
212  interface::CassLibrary *cci, CassQueryResultContext *rctx = NULL) :
213  query_id_(query_id),
214  cb_(cb),
215  cci_(cci),
216  result_ctx_(rctx) {
217  }
218  std::string query_id_;
221  boost::scoped_ptr<CassQueryResultContext> result_ctx_;
222 };
223 
225  CassResultPtr *result, size_t rk_count,
226  size_t ck_count, GenDb::ColListVec *v_col_list);
228  CassResultPtr *result, size_t rk_count,
229  GenDb::ColListVec *v_col_list);
230 
231 } // namespace impl
232 
233 //
234 // CqlIfImpl
235 //
236 class CqlIfImpl {
237  public:
239  const std::vector<std::string> &cassandra_ips,
240  int cassandra_port,
241  const std::string &cassandra_user,
242  const std::string &cassandra_password,
243  bool use_ssl,
244  const std::string &ca_certs_path,
246  virtual ~CqlIfImpl();
247 
248  bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace,
249  const std::string &replication_factor, CassConsistency consistency);
250  bool UseKeyspaceSync(const std::string &keyspace,
251  CassConsistency consistency);
252  bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace,
253  CassConsistency consistency);
254 
256  const std::string &compaction_strategy, CassConsistency consistency);
257  bool CreateIndexIfNotExistsSync(const std::string &cfname,
258  const std::string &column, const std::string &indexname,
259  CassConsistency consistency,
260  const GenDb::ColIndexMode::type index_mode);
262  bool IsTablePresent(const std::string &table);
263  int IsTableStatic(const std::string &table);
264  bool IsTableDynamic(const std::string &table);
265 
266  bool InsertIntoTableSync(std::auto_ptr<GenDb::ColList> v_columns,
267  CassConsistency consistency);
268  bool InsertIntoTableAsync(std::auto_ptr<GenDb::ColList> v_columns,
269  CassConsistency consistency, impl::CassAsyncQueryCallback cb);
270  bool InsertIntoTablePrepareAsync(std::auto_ptr<GenDb::ColList> v_columns,
271  CassConsistency consistency, impl::CassAsyncQueryCallback cb);
272  bool IsInsertIntoTablePrepareSupported(const std::string &table);
273 
274  bool SelectFromTableSync(const std::string &cfname,
275  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
276  GenDb::NewColVec *out);
277  bool SelectFromTableSync(const std::string &cfname,
278  CassConsistency consistency, GenDb::ColListVec *out);
279  bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname,
280  const GenDb::DbDataValueVec &rkey,
281  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
282  GenDb::NewColVec *out);
283  bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname,
284  const GenDb::DbDataValueVec &rkey,
285  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
286  const GenDb::FieldNamesToReadVec &read_vec,
287  GenDb::NewColVec *out);
288 
289  bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname,
290  const std::vector<GenDb::DbDataValueVec> &rkeys,
291  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
292  const GenDb::FieldNamesToReadVec &read_vec,
293  GenDb::ColListVec *out);
294 
295  bool SelectFromTableAsync(const std::string &cfname,
296  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
298  bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname,
299  const GenDb::DbDataValueVec &rkey,
300  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
303  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
304  const GenDb::ColumnNameRange &ck_range,
305  const GenDb::WhereIndexInfoVec &where_vec,
306  const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency,
308 
309  bool ConnectSync();
310  bool ConnectSchemaSync();
311  bool DisconnectSync();
312  bool DisconnectSchemaSync();
313 
314  void SetRequestTimeout(uint32_t timeout_ms);
315 
316  bool GetMetrics(Metrics *metrics) const;
317 
318  private:
319 
320  bool InsertIntoTableInternal(std::auto_ptr<GenDb::ColList> v_columns,
321  CassConsistency consistency, bool sync,
323  bool GetPrepareInsertIntoTable(const std::string &table_name,
324  impl::CassPreparedPtr *prepared) const;
326  impl::CassPreparedPtr *prepared);
327  bool InsertIntoTablePrepareInternal(std::auto_ptr<GenDb::ColList> v_columns,
328  CassConsistency consistency, bool sync,
330 
331  static const char * kQCreateKeyspaceIfNotExists;
332  static const char * kQUseKeyspace;
333  static const char * kTaskName;
334  static const int kTaskInstance = -1;
335 
336  struct SessionState {
337  enum type {
341  };
342  };
343 
350  std::atomic<SessionState::type> session_state_;
351  std::atomic<SessionState::type> schema_session_state_;
353  std::string keyspace_;
355  typedef boost::unordered_map<std::string, impl::CassPreparedPtr>
358  mutable std::mutex map_mutex_;
359 };
360 
361 } // namespace cql
362 } // namespace cass
363 
364 #endif // DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
impl::CassSessionPtr session_
Definition: cql_if_impl.h:348
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2259
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2158
bool DisconnectSchemaSync()
Definition: cql_if.cc:2346
impl::CassClusterPtr cluster_
Definition: cql_if_impl.h:346
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
Definition: cql_if.cc:1915
bool IsTableDynamic(const std::string &table)
Definition: cql_if.cc:2149
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
Definition: cql_if.cc:2432
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:2011
impl::CassSslPtr ssl_
Definition: cql_if_impl.h:347
impl::CassSessionPtr schema_session_
Definition: cql_if_impl.h:349
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2408
static const char * kTaskName
Definition: cql_if_impl.h:333
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2163
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
Definition: cql_if.cc:2042
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2078
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
Definition: cql_if.cc:2226
virtual ~CqlIfImpl()
Definition: cql_if.cc:1908
bool GetMetrics(Metrics *metrics) const
Definition: cql_if.cc:2360
void SetRequestTimeout(uint32_t timeout_ms)
Definition: cql_if.cc:2280
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2462
CassPreparedMapType insert_prepared_map_
Definition: cql_if_impl.h:357
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
Definition: cql_if.cc:1976
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1932
bool ConnectSchemaSync()
Definition: cql_if.cc:2285
static const char * kQCreateKeyspaceIfNotExists
Definition: cql_if_impl.h:331
bool IsTablePresent(const std::string &table)
Definition: cql_if.cc:2054
CqlIfImpl(EventManager *evm, const std::vector< std::string > &cassandra_ips, int cassandra_port, const std::string &cassandra_user, const std::string &cassandra_password, bool use_ssl, const std::string &ca_certs_path, interface::CassLibrary *cci)
Definition: cql_if.cc:1850
std::string schema_contact_point_
Definition: cql_if_impl.h:352
boost::unordered_map< std::string, impl::CassPreparedPtr > CassPreparedMapType
Definition: cql_if_impl.h:356
interface::CassLibrary * cci_
Definition: cql_if_impl.h:345
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1954
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:2023
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2103
EventManager * evm_
Definition: cql_if_impl.h:344
bool IsInsertIntoTablePrepareSupported(const std::string &table)
Definition: cql_if.cc:2172
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2177
std::atomic< SessionState::type > session_state_
Definition: cql_if_impl.h:350
std::string keyspace_
Definition: cql_if_impl.h:353
std::atomic< SessionState::type > schema_session_state_
Definition: cql_if_impl.h:351
std::mutex map_mutex_
Definition: cql_if_impl.h:358
static const char * kQUseKeyspace
Definition: cql_if_impl.h:332
static const int kTaskInstance
Definition: cql_if_impl.h:334
int IsTableStatic(const std::string &table)
Definition: cql_if.cc:2062
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
Definition: cql_if.cc:2153
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2127
CassSharedPtr(T *ptr, interface::CassLibrary *cci)
Definition: cql_if_impl.h:176
static EventManager evm
uint8_t type
Definition: load_balance.h:2
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
std::vector< WhereIndexInfo > WhereIndexInfoVec
Definition: gendb_if.h:233
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
boost::ptr_vector< NewCol > NewColVec
Definition: gendb_if.h:186
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:610
static void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1219
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:722
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1023
CassSharedPtr< const CassSchemaMeta > CassSchemaMetaPtr
Definition: cql_if_impl.h:188
CassSharedPtr< CassSsl > CassSslPtr
Definition: cql_if_impl.h:181
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
Definition: cql_if.cc:745
CassSharedPtr< CassSession > CassSessionPtr
Definition: cql_if_impl.h:182
CassSharedPtr< const CassPrepared > CassPreparedPtr
Definition: cql_if_impl.h:187
CassSharedPtr< CassIterator > CassIteratorPtr
Definition: cql_if_impl.h:186
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
Definition: cql_if.cc:474
CassSharedPtr< const CassResult > CassResultPtr
Definition: cql_if_impl.h:185
CassSharedPtr< CassCluster > CassClusterPtr
Definition: cql_if_impl.h:180
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:584
std::string CassSelectFromTable(const std::string &table)
Definition: cql_if.cc:1041
static void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1407
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1003
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:663
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
Definition: cql_if_impl.h:191
CassSharedPtr< CassStatement > CassStatementPtr
Definition: cql_if_impl.h:184
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:434
CassSharedPtr< CassFuture > CassFuturePtr
Definition: cql_if_impl.h:183
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
Definition: cql_if.cc:1014
interface::CassLibrary * cci_
Definition: cql_if_impl.h:220
boost::scoped_ptr< CassQueryResultContext > result_ctx_
Definition: cql_if_impl.h:221
CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb, interface::CassLibrary *cci, CassQueryResultContext *rctx=NULL)
Definition: cql_if_impl.h:211
CassQueryResultContext(const std::string &cf_name, bool is_dynamic_cf, const GenDb::DbDataValueVec &row_key, size_t rk_count=0, size_t ck_count=0)
Definition: cql_if_impl.h:194
interface::CassLibrary * cci_
Definition: cql_if_impl.h:74
void operator()(CassCluster *ptr)
Definition: cql_if_impl.h:69
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:67
interface::CassLibrary * cci_
Definition: cql_if_impl.h:110
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:103
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:139
void operator()(CassIterator *ptr)
Definition: cql_if_impl.h:141
void operator()(CassSession *ptr)
Definition: cql_if_impl.h:93
interface::CassLibrary * cci_
Definition: cql_if_impl.h:98
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:91
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:79
interface::CassLibrary * cci_
Definition: cql_if_impl.h:86
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:115
void operator()(CassStatement *ptr)
Definition: cql_if_impl.h:117
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:151
void operator()(const CassPrepared *ptr)
Definition: cql_if_impl.h:153
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:127
void operator()(const CassResult *ptr)
Definition: cql_if_impl.h:129
void operator()(const CassSchemaMeta *ptr)
Definition: cql_if_impl.h:165