64 #include <cassandra.h> 71 typedef struct CassInstance CassInstance;
/*
 * Log callback handed to the Cassandra driver. It switches on
 * message->severity and forwards the message to a log function, prefixing
 * the text with "Cassandra:" and the message's file, line and function.
 * NOTE(review): `data` is not referenced in the lines visible here.
 */
74 static void cass_log(
const CassLogMessage *message,
void *data)
76 switch (message->severity) {
/*
 * CASS_LEVEL(e, f) names a severity constant `e` and the log function `f`
 * that is called as f(0, ...) for it. The visible pairings are
 * INFO -> info, WARN -> warning, ERROR -> error, CRITICAL -> panic.
 */
78 #define CASS_LEVEL(e, f) \ 80 f(0, "Cassandra: (%s:%d:%s): %s", \ 81 message->file, message->line, message->function, \ 85 CASS_LEVEL(CASS_LOG_INFO,
info)
86 CASS_LEVEL(CASS_LOG_WARN,
warning)
87 CASS_LEVEL(CASS_LOG_ERROR,
error)
88 CASS_LEVEL(CASS_LOG_CRITICAL,
panic)
/*
 * Fetch the error message attached to `future` and write it to the error
 * log with a "Cassandra:" prefix.
 */
96 static void print_error(CassFuture* future)
99 size_t message_length;
/* The driver hands back both a pointer and a length for the message. */
101 cass_future_error_message(future, &message, &message_length);
/*
 * NOTE(review): the message is printed with "%s" and message_length is not
 * used in the visible lines; confirm the driver guarantees NUL termination,
 * otherwise a length-bounded format would be needed.
 */
102 error(0,
"Cassandra: %s", message);
/*
 * Connect `session` to `cluster` and wait for the attempt to complete.
 * The connect future's error code is stored in `rc`; presumably that value
 * is returned (the return statement is not visible in this excerpt).
 */
106 static CassError connect_session(CassSession* session,
const CassCluster* cluster)
108 CassError rc = CASS_OK;
109 CassFuture* future = cass_session_connect(session, cluster);
/* Block until the asynchronous connect has finished, then read its status. */
111 cass_future_wait(future);
112 rc = cass_future_error_code(future);
/* The future is released once its error code has been read. */
116 cass_future_free(future);
/*
 * Open hook registered as .open in cass_ops. It routes driver logging
 * through cass_log, allocates a CassInstance, creates and configures the
 * cluster object, creates a session and connects it to the cluster.
 * NOTE(review): the return statements are not visible in this excerpt.
 */
122 static void *cass_open_conn(
const DBConf *db_conf)
/* Driver log messages at INFO level go through cass_log. */
132 cass_log_set_level(CASS_LOG_INFO);
133 cass_log_set_callback(cass_log, NULL);
/* Allocate the instance that holds the cluster and session handles. */
136 i = gw_malloc(
sizeof(CassInstance));
137 i->cluster = cass_cluster_new();
141 cass_cluster_set_credentials(i->cluster,
/* The idle timeout is taken from the configuration's idle_timeout field. */
145 cass_cluster_set_connection_idle_timeout(i->cluster, conf->
idle_timeout);
147 i->session = cass_session_new();
/* Anything other than CASS_OK from connect_session is logged as an error. */
149 if (connect_session(i->session, i->cluster) != CASS_OK) {
150 error(0,
"Cassandra: Can not connect to cluster!");
154 info(0,
"Cassandra: Connected to cluster <%s>",
/* Presumably the cleanup for the failed-connect path; TODO confirm. */
160 cass_cluster_free(i->cluster);
161 cass_session_free(i->session);
/*
 * Close hook registered as .close in cass_ops. `conn` is treated as a
 * CassInstance: the session is closed, then the cluster and session
 * objects are freed.
 */
168 static void cass_close_conn(
void *conn)
170 CassInstance *i = (CassInstance*) conn;
171 CassFuture *close_future = NULL;
/* Closing is asynchronous: wait for the close future before releasing it. */
176 close_future = cass_session_close(i->session);
177 cass_future_wait(close_future);
178 cass_future_free(close_future);
/* Release the driver objects held by the instance. */
180 cass_cluster_free(i->cluster);
181 cass_session_free(i->session);
/*
 * Check hook registered as .check in cass_ops.
 * NOTE(review): only the signature is visible in this excerpt; the body and
 * the meaning of the return value must be confirmed against the full file.
 */
186 static int cass_check_conn(
void *conn)
/*
 * Select hook registered as .select in cass_ops. It executes a statement on
 * the instance's session, waits for completion, then walks every row and
 * every column of the result, reading each value according to its Cassandra
 * value type.
 * NOTE(review): statement creation, binding, the code that stores values
 * into `res`, and the return statements are not visible in this excerpt.
 */
195 static int cass_select(
void *conn,
const Octstr *sql,
List *binds,
List **res)
197 CassInstance *instance = conn;
198 CassError rc = CASS_OK;
199 CassStatement *statement = NULL;
200 CassFuture *future = NULL;
/* One iteration per bind (loop body not visible in this excerpt). */
211 for (i = 0; i < binds_len; i++) {
/* Run the statement and block until the driver has finished with it. */
217 future = cass_session_execute(instance->session, statement);
218 cass_future_wait(future);
221 rc = cass_future_error_code(future);
/* Presumably the failure path: release future and statement; TODO confirm. */
224 cass_future_free(future);
225 cass_statement_free(statement);
/* Obtain the result set and an iterator over its rows. */
228 const CassResult *result = cass_future_get_result(future);
229 CassIterator *iterator_row = cass_iterator_from_result(result);
/* Outer loop: one pass per row. */
234 while (cass_iterator_next(iterator_row)) {
235 const CassRow *row = cass_iterator_get_row(iterator_row);
236 CassIterator *iterator_column = cass_iterator_from_row(row);
/* Inner loop: one pass per column of the current row. */
241 while (cass_iterator_next(iterator_column)) {
242 const CassValue *value;
/* Buffer that receives the textual form of a UUID column. */
252 char us[CASS_UUID_STRING_LENGTH];
254 value = cass_iterator_get_column(iterator_column);
255 type = cass_value_type(value);
/* Each case reads the value with the getter that matches its type. */
259 case CASS_VALUE_TYPE_INT:
260 cass_value_get_int32(value, &i);
264 case CASS_VALUE_TYPE_BIGINT:
265 cass_value_get_int64(value, &bi);
269 case CASS_VALUE_TYPE_BOOLEAN:
270 cass_value_get_bool(value, &b);
274 case CASS_VALUE_TYPE_DOUBLE:
275 cass_value_get_double(value, &d);
279 case CASS_VALUE_TYPE_FLOAT:
280 cass_value_get_float(value, &f);
/* The three textual types share one getter. */
284 case CASS_VALUE_TYPE_TEXT:
285 case CASS_VALUE_TYPE_ASCII:
286 case CASS_VALUE_TYPE_VARCHAR:
287 cass_value_get_string(value, &s, &s_length);
/* UUIDs are fetched and then rendered into `us`. */
291 case CASS_VALUE_TYPE_UUID:
292 cass_value_get_uuid(value, &u);
293 cass_uuid_string(u, us);
/*
 * Presumably the fallback for types not listed above: null values are
 * checked first, anything else is logged as unhandled; TODO confirm.
 */
298 if (cass_value_is_null(value)) {
301 error(0,
"Cassandra: %s: unhandled type %d", __func__,
type);
/* The column iterator is released after the inner loop. */
308 cass_iterator_free(iterator_column);
/*
 * NOTE(review): the result is freed before the row iterator that was
 * created from it; confirm the driver permits this order.
 */
312 cass_result_free(result);
313 cass_iterator_free(iterator_row);
/* Final release of the future and the statement. */
316 cass_future_free(future);
317 cass_statement_free(statement);
/*
 * Update hook registered as .update in cass_ops. It executes a statement on
 * the instance's session, waits for completion and reads the resulting
 * error code; no result set is read in the visible lines.
 * NOTE(review): statement creation, binding and the return statements are
 * not visible in this excerpt.
 */
323 static int cass_update(
void *conn,
const Octstr *sql,
List *binds)
325 CassInstance *instance = conn;
326 CassError rc = CASS_OK;
327 CassStatement *statement = NULL;
328 CassFuture *future = NULL;
/* One iteration per bind (loop body not visible in this excerpt). */
338 for (i = 0; i < binds_len; i++) {
/* Run the statement and block until the driver has finished with it. */
344 future = cass_session_execute(instance->session, statement);
345 cass_future_wait(future);
348 rc = cass_future_error_code(future);
/* Presumably the failure path: release future and statement; TODO confirm. */
351 cass_future_free(future);
352 cass_statement_free(statement);
/* Presumably the success path performing the same release; TODO confirm. */
356 cass_future_free(future);
357 cass_statement_free(statement);
/*
 * Configuration destroy hook registered as .conf_destroy in cass_ops.
 * NOTE(review): only the signature is visible in this excerpt; which fields
 * of `db_conf` are released must be confirmed against the full file.
 */
363 static void cass_conf_destroy(
DBConf *db_conf)
/*
 * Operation table for the Cassandra backend: each db_ops hook is bound to
 * the matching cass_* function defined in this file.
 */
377 static struct db_ops cass_ops = {
378 .
open = cass_open_conn,
379 .close = cass_close_conn,
380 .check = cass_check_conn,
381 .select = cass_select,
382 .update = cass_update,
383 .conf_destroy = cass_conf_destroy
void error(int err, const char *fmt,...)
void info(int err, const char *fmt,...)
void gwlist_append(List *list, void *item)
long gwlist_len(List *list)
void * gwlist_get(List *list, long pos)
#define octstr_get_cstr(ostr)
Octstr * octstr_imm(const char *cstr)
void warning(int err, const char *fmt,...)
Octstr * octstr_format(const char *fmt,...)
void octstr_destroy(Octstr *ostr)
#define octstr_create(cstr)
#define octstr_create_from_data(data, len)
void *(* open)(const DBConf *conf)