OpenSDN source code
cql_if_impl.h
Go to the documentation of this file.
1 //
2 // Copyright (c) 2016 Juniper Networks, Inc. All rights reserved.
3 //
4 
5 #ifndef DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
6 #define DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
7 
8 #include <string>
9 
10 #include <boost/unordered_map.hpp>
11 
12 #include <linux/version.h>
13 #if defined(RHEL_MAJOR) && (RHEL_MAJOR >= 9)
14 #include <cassandra/cassandra.h>
15 #else
16 #include <cassandra.h>
17 #endif
18 
19 #include <io/event_manager.h>
20 #include <base/timer.h>
21 #include <database/gendb_if.h>
22 #include <database/cassandra/cql/cql_types.h>
24 
25 namespace cass {
26 namespace cql {
27 namespace impl {
28 
30  const std::string &compaction_strategy);
32  const std::string &compaction_strategy,
33  boost::system::error_code *ec);
// Declaration only; the definition is in cql_if.cc.
// Takes the column family name, the column, the index name and the index
// mode, and returns a std::string.
// NOTE(review): presumably the returned string is the CQL
// "CREATE INDEX IF NOT EXISTS" statement text - confirm against cql_if.cc.
34 std::string CassCreateIndexIfNotExists(const std::string &cfname,
35  const std::string &column, const std::string &indexname,
36  const GenDb::ColIndexMode::type index_mode);
// Declaration only; the definition is in cql_if.cc.
// Reads the GenDb::ColList pointed to by v_columns (const, not modified) and
// returns a std::string.
// NOTE(review): presumably the CQL INSERT text for a static column family -
// confirm against cql_if.cc.
37 std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
// Declaration only; the definition is in cql_if.cc.
// Reads the GenDb::ColList pointed to by v_columns (const, not modified) and
// returns a std::string.
// NOTE(review): presumably the CQL INSERT text for a dynamic column family -
// confirm against cql_if.cc.
38 std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns);
41  boost::system::error_code *ec);
// Declaration only; the definition is in cql_if.cc.
// Takes a table name and returns a std::string.
// NOTE(review): presumably an unrestricted CQL SELECT over the whole table -
// confirm against cql_if.cc.
42 std::string CassSelectFromTable(const std::string &table);
44  const std::string &table, const GenDb::DbDataValueVec &rkeys,
45  const GenDb::ColumnNameRange &ck_range,
46  const GenDb::WhereIndexInfoVec &where_vec,
// Declaration only; the definition is in cql_if.cc.
// Takes a table name plus the row-key values (rkeys) and returns a
// std::string.
// NOTE(review): presumably a CQL SELECT restricted to the partition key built
// from rkeys - confirm against cql_if.cc.
48 std::string PartitionKey2CassSelectFromTable(const std::string &table,
49  const GenDb::DbDataValueVec &rkeys);
51  const std::string &table, const GenDb::DbDataValueVec &rkeys,
52  const GenDb::ColumnNameRange &crange,
55  const std::string &table, const std::vector<GenDb::DbDataValueVec> &rkeys,
56  const GenDb::ColumnNameRange &crange,
58 
59 // CQL Library Shared Pointers to handle library free calls
// Primary template: declared here without a definition, so a Deleter<T> is
// usable only for the types that have an explicit specialization.
60 template<class T>
61 struct Deleter;
62 
// Deleter functor for CassCluster handles.
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassClusterFree() and
// does nothing for NULL.
63 template<>
64 struct Deleter<CassCluster> {
66  cci_(cci) {}
67  void operator()(CassCluster *ptr) {
68  if (ptr != NULL) {
69  cci_->CassClusterFree(ptr);
70  }
71  }
73 };
74 
// Deleter functor for CassSsl handles.
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassSslFree() and does
// nothing for NULL.
75 template<>
76 struct Deleter<CassSsl> {
78  cci_(cci) {}
79  void operator()(CassSsl *ptr) {
80  if (ptr != NULL) {
81  cci_->CassSslFree(ptr);
82  }
83  }
85 };
86 
// Deleter functor for CassSession handles.
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassSessionFree() and
// does nothing for NULL.
87 template<>
88 struct Deleter<CassSession> {
90  cci_(cci) {}
91  void operator()(CassSession* ptr) {
92  if (ptr != NULL) {
93  cci_->CassSessionFree(ptr);
94  }
95  }
97 };
98 
// Deleter functor for CassFuture handles.
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassFutureFree() and
// does nothing for NULL.
99 template<>
100 struct Deleter<CassFuture> {
102  cci_(cci) {}
103  void operator()(CassFuture* ptr) {
104  if (ptr != NULL) {
105  cci_->CassFutureFree(ptr);
106  }
107  }
109 };
110 
// Deleter functor for CassStatement handles.
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassStatementFree() and
// does nothing for NULL.
111 template<>
112 struct Deleter<CassStatement> {
114  cci_(cci) {}
115  void operator()(CassStatement* ptr) {
116  if (ptr != NULL) {
117  cci_->CassStatementFree(ptr);
118  }
119  }
121 };
122 
// Deleter functor for const CassResult handles (specialized on the const
// type, so it matches CassSharedPtr<const CassResult>).
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassResultFree() and
// does nothing for NULL.
123 template<>
124 struct Deleter<const CassResult> {
126  cci_(cci) {}
127  void operator()(const CassResult* ptr) {
128  if (ptr != NULL) {
129  cci_->CassResultFree(ptr);
130  }
131  }
133 };
134 
// Deleter functor for CassIterator handles.
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassIteratorFree() and
// does nothing for NULL.
135 template<>
136 struct Deleter<CassIterator> {
138  cci_(cci) {}
139  void operator()(CassIterator* ptr) {
140  if (ptr != NULL) {
141  cci_->CassIteratorFree(ptr);
142  }
143  }
145 };
146 
// Deleter functor for const CassPrepared handles (specialized on the const
// type, so it matches CassSharedPtr<const CassPrepared>).
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassPreparedFree() and
// does nothing for NULL.
147 template<>
148 struct Deleter<const CassPrepared> {
150  cci_(cci) {}
151  void operator()(const CassPrepared* ptr) {
152  if (ptr != NULL) {
153  cci_->CassPreparedFree(ptr);
154  }
155  }
157 };
158 
// Deleter functor for const CassSchemaMeta handles (specialized on the const
// type, so it matches CassSharedPtr<const CassSchemaMeta>).
// The constructor stores its interface::CassLibrary pointer in cci_;
// operator() releases a non-NULL handle through cci_->CassSchemaMetaFree()
// and does nothing for NULL.
159 template<>
160 struct Deleter<const CassSchemaMeta> {
162  cci_(cci) {}
163  void operator()(const CassSchemaMeta* ptr) {
164  if (ptr != NULL) {
165  cci_->CassSchemaMetaFree(ptr);
166  }
167  }
169 };
170 
// boost::shared_ptr<T> subclass for driver handles.
// Its constructor, CassSharedPtr(T *ptr, interface::CassLibrary *cci), passes
// Deleter<T>(cci) to the base class as the custom deleter, so the handle is
// released by the Deleter specialization for T instead of by operator delete.
// T must therefore be a type for which Deleter is specialized.
171 template <class T>
172 class CassSharedPtr : public boost::shared_ptr<T> {
173  public:
175  boost::shared_ptr<T>(ptr, Deleter<T>(cci)) {}
176 };
177 
187 
// Callback signature taken by the *Async methods of CqlIfImpl: called with a
// GenDb::DbOpResult::type and a std::auto_ptr<GenDb::ColList>. The auto_ptr
// is passed by value, so the callee receives ownership of the ColList.
188 typedef boost::function<void(GenDb::DbOpResult::type,
189  std::auto_ptr<GenDb::ColList>)> CassAsyncQueryCallback;
190 
// CassQueryResultContext: plain value holder for the parameters of a query.
// The constructor copies each argument into the member of the same name
// (cf_name_, is_dynamic_cf_, row_key_, rk_count_, ck_count_); rk_count and
// ck_count default to 0 when omitted.
// NOTE(review): rk_count/ck_count presumably are the numbers of row-key and
// clustering-key columns - confirm against the users in cql_if.cc.
192  CassQueryResultContext(const std::string &cf_name, bool is_dynamic_cf,
193  const GenDb::DbDataValueVec &row_key, size_t rk_count = 0,
194  size_t ck_count = 0) :
195  cf_name_(cf_name),
196  is_dynamic_cf_(is_dynamic_cf),
197  row_key_(row_key),
198  rk_count_(rk_count),
199  ck_count_(ck_count) {
200  }
201  std::string cf_name_;
204  size_t rk_count_;
205  size_t ck_count_;
206 };
207 
// CassAsyncQueryContext: bundles the query id, the completion callback, the
// interface::CassLibrary pointer and an optional result context.
// query_id is copied into a std::string. rctx defaults to NULL and is stored
// in a boost::scoped_ptr, so a non-NULL CassQueryResultContext passed here is
// deleted when this context is destroyed; callers must not delete it.
210  interface::CassLibrary *cci, CassQueryResultContext *rctx = NULL) :
211  query_id_(query_id),
212  cb_(cb),
213  cci_(cci),
214  result_ctx_(rctx) {
215  }
216  std::string query_id_;
219  boost::scoped_ptr<CassQueryResultContext> result_ctx_;
220 };
221 
223  CassResultPtr *result, size_t rk_count,
224  size_t ck_count, GenDb::ColListVec *v_col_list);
226  CassResultPtr *result, size_t rk_count,
227  GenDb::ColListVec *v_col_list);
228 
229 } // namespace impl
230 
231 //
232 // CqlIfImpl
233 //
// Declaration of the CQL interface implementation; the member functions are
// defined in cql_if.cc. Every operation below reports its outcome through a
// bool return value, except IsTableStatic (int) and SetRequestTimeout (void).
234 class CqlIfImpl {
235  public:
237  const std::vector<std::string> &cassandra_ips,
238  int cassandra_port,
239  const std::string &cassandra_user,
240  const std::string &cassandra_password,
241  bool use_ssl,
242  const std::string &ca_certs_path,
244  virtual ~CqlIfImpl();
245 
// Keyspace operations. Each takes the CassConsistency to use.
246  bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace,
247  const std::string &replication_factor, CassConsistency consistency);
248  bool UseKeyspaceSync(const std::string &keyspace,
249  CassConsistency consistency);
250  bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace,
251  CassConsistency consistency);
252 
254  const std::string &compaction_strategy, CassConsistency consistency);
255  bool CreateIndexIfNotExistsSync(const std::string &cfname,
256  const std::string &column, const std::string &indexname,
257  CassConsistency consistency,
258  const GenDb::ColIndexMode::type index_mode);
// NOTE(review): IsTableStatic returns int while IsTablePresent and
// IsTableDynamic return bool - check cql_if.cc for what the int encodes
// before treating it as a plain truth value.
260  bool IsTablePresent(const std::string &table);
261  int IsTableStatic(const std::string &table);
262  bool IsTableDynamic(const std::string &table);
263 
// Inserts. v_columns is a std::auto_ptr taken by value, so the caller's
// auto_ptr gives up ownership of the ColList when the call is made.
264  bool InsertIntoTableSync(std::auto_ptr<GenDb::ColList> v_columns,
265  CassConsistency consistency);
266  bool InsertIntoTableAsync(std::auto_ptr<GenDb::ColList> v_columns,
267  CassConsistency consistency, impl::CassAsyncQueryCallback cb);
268  bool InsertIntoTablePrepareAsync(std::auto_ptr<GenDb::ColList> v_columns,
269  CassConsistency consistency, impl::CassAsyncQueryCallback cb);
270  bool IsInsertIntoTablePrepareSupported(const std::string &table);
271 
// Synchronous selects. Results are written through the `out` pointer:
// GenDb::NewColVec for the single row-key overloads, GenDb::ColListVec for
// the whole-table and multi row-key overloads.
272  bool SelectFromTableSync(const std::string &cfname,
273  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
274  GenDb::NewColVec *out);
275  bool SelectFromTableSync(const std::string &cfname,
276  CassConsistency consistency, GenDb::ColListVec *out);
277  bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname,
278  const GenDb::DbDataValueVec &rkey,
279  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
280  GenDb::NewColVec *out);
281  bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname,
282  const GenDb::DbDataValueVec &rkey,
283  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
284  const GenDb::FieldNamesToReadVec &read_vec,
285  GenDb::NewColVec *out);
286 
287  bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname,
288  const std::vector<GenDb::DbDataValueVec> &rkeys,
289  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
290  const GenDb::FieldNamesToReadVec &read_vec,
291  GenDb::ColListVec *out);
292 
// Asynchronous selects. They take an impl::CassAsyncQueryCallback instead of
// an `out` pointer.
293  bool SelectFromTableAsync(const std::string &cfname,
294  const GenDb::DbDataValueVec &rkey, CassConsistency consistency,
296  bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname,
297  const GenDb::DbDataValueVec &rkey,
298  const GenDb::ColumnNameRange &ck_range, CassConsistency consistency,
301  const std::string &cfname, const GenDb::DbDataValueVec &rkey,
302  const GenDb::ColumnNameRange &ck_range,
303  const GenDb::WhereIndexInfoVec &where_vec,
304  const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency,
306 
// Connect/disconnect come in pairs: one for session_ and one for the
// separate schema_session_ member.
// NOTE(review): the pairing is inferred from the member names - confirm.
307  bool ConnectSync();
308  bool ConnectSchemaSync();
309  bool DisconnectSync();
310  bool DisconnectSchemaSync();
311 
312  void SetRequestTimeout(uint32_t timeout_ms);
313 
314  bool GetMetrics(Metrics *metrics) const;
315 
316  private:
317 
// Shared implementations behind the public insert methods; `sync` selects
// between the two modes and `cb` is the callback for the asynchronous case.
// NOTE(review): the exact sync/cb interplay is not visible here - see
// cql_if.cc.
318  bool InsertIntoTableInternal(std::auto_ptr<GenDb::ColList> v_columns,
319  CassConsistency consistency, bool sync,
321  bool GetPrepareInsertIntoTable(const std::string &table_name,
322  impl::CassPreparedPtr *prepared) const;
324  impl::CassPreparedPtr *prepared);
325  bool InsertIntoTablePrepareInternal(std::auto_ptr<GenDb::ColList> v_columns,
326  CassConsistency consistency, bool sync,
328 
// String constants are only declared here; kTaskInstance is initialized
// in-class to -1.
329  static const char * kQCreateKeyspaceIfNotExists;
330  static const char * kQUseKeyspace;
331  static const char * kTaskName;
332  static const int kTaskInstance = -1;
333 
// Wrapper struct that scopes the session-state enumerators as
// SessionState::type.
334  struct SessionState {
335  enum type {
339  };
340  };
341 
// One atomic state variable per session (session_ and schema_session_).
348  tbb::atomic<SessionState::type> session_state_;
349  tbb::atomic<SessionState::type> schema_session_state_;
351  std::string keyspace_;
// Map from std::string to prepared statement (CassPreparedMapType), stored in
// insert_prepared_map_.
// NOTE(review): presumably keyed by table name and guarded by map_mutex_,
// which is mutable so that const members such as GetPrepareInsertIntoTable
// can lock it - confirm in cql_if.cc.
353  typedef boost::unordered_map<std::string, impl::CassPreparedPtr>
356  mutable tbb::mutex map_mutex_;
357 };
358 
359 } // namespace cql
360 } // namespace cass
361 
362 #endif // DATABASE_CASSANDRA_CQL_CQL_IF_IMPL_H_
impl::CassSessionPtr session_
Definition: cql_if_impl.h:346
bool SelectFromTableClusteringKeyRangeSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2260
bool InsertIntoTableAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2159
bool DisconnectSchemaSync()
Definition: cql_if.cc:2347
impl::CassClusterPtr cluster_
Definition: cql_if_impl.h:344
bool CreateKeyspaceIfNotExistsSync(const std::string &keyspace, const std::string &replication_factor, CassConsistency consistency)
Definition: cql_if.cc:1916
bool IsTableDynamic(const std::string &table)
Definition: cql_if.cc:2150
bool PrepareInsertIntoTableSync(const GenDb::NewCf &cf, impl::CassPreparedPtr *prepared)
Definition: cql_if.cc:2433
bool CreateIndexIfNotExistsSync(const std::string &cfname, const std::string &column, const std::string &indexname, CassConsistency consistency, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:2012
impl::CassSslPtr ssl_
Definition: cql_if_impl.h:345
impl::CassSessionPtr schema_session_
Definition: cql_if_impl.h:347
tbb::mutex map_mutex_
Definition: cql_if_impl.h:356
bool InsertIntoTableInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2409
static const char * kTaskName
Definition: cql_if_impl.h:331
bool InsertIntoTablePrepareAsync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2164
bool GetPrepareInsertIntoTable(const std::string &table_name, impl::CassPreparedPtr *prepared) const
Definition: cql_if.cc:2043
bool SelectFromTableAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2079
bool SelectFromTableClusteringKeyRangeFieldNamesSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *out)
Definition: cql_if.cc:2227
virtual ~CqlIfImpl()
Definition: cql_if.cc:1909
bool GetMetrics(Metrics *metrics) const
Definition: cql_if.cc:2361
tbb::atomic< SessionState::type > schema_session_state_
Definition: cql_if_impl.h:349
void SetRequestTimeout(uint32_t timeout_ms)
Definition: cql_if.cc:2281
tbb::atomic< SessionState::type > session_state_
Definition: cql_if_impl.h:348
bool InsertIntoTablePrepareInternal(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency, bool sync, impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2463
CassPreparedMapType insert_prepared_map_
Definition: cql_if_impl.h:355
bool CreateTableIfNotExistsSync(const GenDb::NewCf &cf, const std::string &compaction_strategy, CassConsistency consistency)
Definition: cql_if.cc:1977
bool UseKeyspaceSyncOnSchemaSession(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1933
bool ConnectSchemaSync()
Definition: cql_if.cc:2286
static const char * kQCreateKeyspaceIfNotExists
Definition: cql_if_impl.h:329
bool IsTablePresent(const std::string &table)
Definition: cql_if.cc:2055
CqlIfImpl(EventManager *evm, const std::vector< std::string > &cassandra_ips, int cassandra_port, const std::string &cassandra_user, const std::string &cassandra_password, bool use_ssl, const std::string &ca_certs_path, interface::CassLibrary *cci)
Definition: cql_if.cc:1851
std::string schema_contact_point_
Definition: cql_if_impl.h:350
boost::unordered_map< std::string, impl::CassPreparedPtr > CassPreparedMapType
Definition: cql_if_impl.h:354
interface::CassLibrary * cci_
Definition: cql_if_impl.h:343
bool UseKeyspaceSync(const std::string &keyspace, CassConsistency consistency)
Definition: cql_if.cc:1955
bool LocatePrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:2024
bool SelectFromTableClusteringKeyRangeAndIndexValueAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2104
EventManager * evm_
Definition: cql_if_impl.h:342
bool IsInsertIntoTablePrepareSupported(const std::string &table)
Definition: cql_if.cc:2173
bool SelectFromTableSync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, CassConsistency consistency, GenDb::NewColVec *out)
Definition: cql_if.cc:2178
std::string keyspace_
Definition: cql_if_impl.h:351
static const char * kQUseKeyspace
Definition: cql_if_impl.h:330
static const int kTaskInstance
Definition: cql_if_impl.h:332
int IsTableStatic(const std::string &table)
Definition: cql_if.cc:2063
bool InsertIntoTableSync(std::auto_ptr< GenDb::ColList > v_columns, CassConsistency consistency)
Definition: cql_if.cc:2154
bool SelectFromTableClusteringKeyRangeAsync(const std::string &cfname, const GenDb::DbDataValueVec &rkey, const GenDb::ColumnNameRange &ck_range, CassConsistency consistency, cass::cql::impl::CassAsyncQueryCallback cb)
Definition: cql_if.cc:2128
CassSharedPtr(T *ptr, interface::CassLibrary *cci)
Definition: cql_if_impl.h:174
static EventManager evm
uint8_t type
Definition: load_balance.h:2
std::vector< DbDataValue > DbDataValueVec
Definition: gendb_if.h:100
std::vector< FieldNamesToReadInfo > FieldNamesToReadVec
Definition: gendb_if.h:238
std::vector< WhereIndexInfo > WhereIndexInfoVec
Definition: gendb_if.h:233
boost::ptr_vector< ColList > ColListVec
Definition: gendb_if.h:208
boost::ptr_vector< NewCol > NewColVec
Definition: gendb_if.h:186
std::string StaticCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:611
static void DynamicCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, const GenDb::FieldNamesToReadVec &read_vec, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1220
std::string StaticCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf)
Definition: cql_if.cc:723
std::string PartitionKeyAndClusteringKeyRange2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1024
CassSharedPtr< const CassSchemaMeta > CassSchemaMetaPtr
Definition: cql_if_impl.h:186
CassSharedPtr< CassSsl > CassSslPtr
Definition: cql_if_impl.h:179
std::string DynamicCf2CassPrepareInsertIntoTable(const GenDb::NewCf &cf, boost::system::error_code *ec)
Definition: cql_if.cc:746
CassSharedPtr< CassSession > CassSessionPtr
Definition: cql_if_impl.h:180
CassSharedPtr< const CassPrepared > CassPreparedPtr
Definition: cql_if_impl.h:185
CassSharedPtr< CassIterator > CassIteratorPtr
Definition: cql_if_impl.h:184
std::string DynamicCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy, boost::system::error_code *ec)
Definition: cql_if.cc:475
CassSharedPtr< const CassResult > CassResultPtr
Definition: cql_if_impl.h:183
CassSharedPtr< CassCluster > CassClusterPtr
Definition: cql_if_impl.h:178
std::string CassCreateIndexIfNotExists(const std::string &cfname, const std::string &column, const std::string &indexname, const GenDb::ColIndexMode::type index_mode)
Definition: cql_if.cc:585
std::string CassSelectFromTable(const std::string &table)
Definition: cql_if.cc:1042
static void StaticCfGetResult(interface::CassLibrary *cci, CassResultPtr *result, GenDb::NewColVec *v_columns)
Definition: cql_if.cc:1408
std::string ClusteringKeyRangeAndIndexValue2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys, const GenDb::ColumnNameRange &ck_range, const GenDb::WhereIndexInfoVec &where_vec, const GenDb::FieldNamesToReadVec &read_vec)
Definition: cql_if.cc:1004
std::string DynamicCf2CassInsertIntoTable(const GenDb::ColList *v_columns)
Definition: cql_if.cc:664
boost::function< void(GenDb::DbOpResult::type, std::auto_ptr< GenDb::ColList >)> CassAsyncQueryCallback
Definition: cql_if_impl.h:189
CassSharedPtr< CassStatement > CassStatementPtr
Definition: cql_if_impl.h:182
std::string StaticCf2CassCreateTableIfNotExists(const GenDb::NewCf &cf, const std::string &compaction_strategy)
Definition: cql_if.cc:435
CassSharedPtr< CassFuture > CassFuturePtr
Definition: cql_if_impl.h:181
std::string PartitionKey2CassSelectFromTable(const std::string &table, const GenDb::DbDataValueVec &rkeys)
Definition: cql_if.cc:1015
interface::CassLibrary * cci_
Definition: cql_if_impl.h:218
boost::scoped_ptr< CassQueryResultContext > result_ctx_
Definition: cql_if_impl.h:219
CassAsyncQueryContext(const char *query_id, CassAsyncQueryCallback cb, interface::CassLibrary *cci, CassQueryResultContext *rctx=NULL)
Definition: cql_if_impl.h:209
CassQueryResultContext(const std::string &cf_name, bool is_dynamic_cf, const GenDb::DbDataValueVec &row_key, size_t rk_count=0, size_t ck_count=0)
Definition: cql_if_impl.h:192
interface::CassLibrary * cci_
Definition: cql_if_impl.h:72
void operator()(CassCluster *ptr)
Definition: cql_if_impl.h:67
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:65
interface::CassLibrary * cci_
Definition: cql_if_impl.h:108
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:101
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:137
void operator()(CassIterator *ptr)
Definition: cql_if_impl.h:139
void operator()(CassSession *ptr)
Definition: cql_if_impl.h:91
interface::CassLibrary * cci_
Definition: cql_if_impl.h:96
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:89
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:77
interface::CassLibrary * cci_
Definition: cql_if_impl.h:84
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:113
void operator()(CassStatement *ptr)
Definition: cql_if_impl.h:115
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:149
void operator()(const CassPrepared *ptr)
Definition: cql_if_impl.h:151
Deleter(interface::CassLibrary *cci)
Definition: cql_if_impl.h:125
void operator()(const CassResult *ptr)
Definition: cql_if_impl.h:127
void operator()(const CassSchemaMeta *ptr)
Definition: cql_if_impl.h:163