Kannel: Open Source WAP and SMS gateway  svn-r5335
dbpool_cass.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * dbpool_cass.c - implement Cassandra 2.x, 3.0 operations for generic database connection pool
59  *
60  * Stipe Tolj <stolj at kannel.org>, 2015-09-25
61  */
62 
63 #ifdef HAVE_CASS
64 #include <cassandra.h>
65 
66 struct CassInstance {
67  CassCluster *cluster;
68  CassSession *session;
69 };
70 
71 typedef struct CassInstance CassInstance;
72 
73 
74 static void cass_log(const CassLogMessage *message, void *data)
75 {
76  switch (message->severity) {
77 
78 #define CASS_LEVEL(e, f) \
79  case e: \
80  f(0, "Cassandra: (%s:%d:%s): %s", \
81  message->file, message->line, message->function, \
82  message->message); \
83  break;
84 
85  CASS_LEVEL(CASS_LOG_INFO, info)
86  CASS_LEVEL(CASS_LOG_WARN, warning)
87  CASS_LEVEL(CASS_LOG_ERROR, error)
88  CASS_LEVEL(CASS_LOG_CRITICAL, panic)
89 
90  default:
91  break;
92  }
93 }
94 
95 
96 static void print_error(CassFuture* future)
97 {
98  const char* message;
99  size_t message_length;
100 
101  cass_future_error_message(future, &message, &message_length);
102  error(0, "Cassandra: %s", message);
103 }
104 
105 
106 static CassError connect_session(CassSession* session, const CassCluster* cluster)
107 {
108  CassError rc = CASS_OK;
109  CassFuture* future = cass_session_connect(session, cluster);
110 
111  cass_future_wait(future);
112  rc = cass_future_error_code(future);
113  if (rc != CASS_OK) {
114  print_error(future);
115  }
116  cass_future_free(future);
117 
118  return rc;
119 }
120 
121 
122 static void *cass_open_conn(const DBConf *db_conf)
123 {
124  CassInstance *i;
125  CassConf *conf = db_conf->cass; /* make compiler happy */
126 
127  /* sanity check */
128  if (conf == NULL)
129  return NULL;
130 
131  /* set logging callback */
132  cass_log_set_level(CASS_LOG_INFO);
133  cass_log_set_callback(cass_log, NULL);
134 
135  /* create instance */
136  i = gw_malloc(sizeof(CassInstance));
137  i->cluster = cass_cluster_new();
138  cass_cluster_set_contact_points(i->cluster, octstr_get_cstr(conf->host));
139 
140  /* set authentication credentials */
141  cass_cluster_set_credentials(i->cluster,
143 
144  /* change idle timeout (default 60s) */
145  cass_cluster_set_connection_idle_timeout(i->cluster, conf->idle_timeout);
146 
147  i->session = cass_session_new();
148 
149  if (connect_session(i->session, i->cluster) != CASS_OK) {
150  error(0, "Cassandra: Can not connect to cluster!");
151  goto failed;
152  }
153 
154  info(0, "Cassandra: Connected to cluster <%s>",
155  octstr_get_cstr(conf->host));
156 
157  return i;
158 
159 failed:
160  cass_cluster_free(i->cluster);
161  cass_session_free(i->session);
162  gw_free(i);
163 
164  return NULL;
165 }
166 
167 
168 static void cass_close_conn(void *conn)
169 {
170  CassInstance *i = (CassInstance*) conn;
171  CassFuture *close_future = NULL;
172 
173  if (conn == NULL)
174  return;
175 
176  close_future = cass_session_close(i->session);
177  cass_future_wait(close_future);
178  cass_future_free(close_future);
179 
180  cass_cluster_free(i->cluster);
181  cass_session_free(i->session);
182  gw_free(i);
183 }
184 
185 
/*
 * Check a pooled connection.
 *
 * Only verifies that a connection handle exists.
 *
 * Returns 0 if conn is non-NULL, -1 otherwise.
 */
static int cass_check_conn(void *conn)
{
    return (conn != NULL) ? 0 : -1;
}
193 
194 
195 static int cass_select(void *conn, const Octstr *sql, List *binds, List **res)
196 {
197  CassInstance *instance = conn;
198  CassError rc = CASS_OK;
199  CassStatement *statement = NULL;
200  CassFuture *future = NULL;
201  long i, binds_len;
202  List *res_row;
203 
204  binds_len = gwlist_len(binds);
205 
206  /* allocate statement handle */
207  statement = cass_statement_new(octstr_get_cstr(sql), binds_len);
208 
209  /* bind parameters if any */
210  if (binds_len > 0) {
211  for (i = 0; i < binds_len; i++) {
212  cass_statement_bind_string(statement, i, octstr_get_cstr(gwlist_get(binds, i)));
213  }
214  }
215 
216  /* execute statement */
217  future = cass_session_execute(instance->session, statement);
218  cass_future_wait(future);
219 
220  /* evaluate result */
221  rc = cass_future_error_code(future);
222  if (rc != CASS_OK) {
223  print_error(future);
224  cass_future_free(future);
225  cass_statement_free(statement);
226  return -1;
227  } else {
228  const CassResult *result = cass_future_get_result(future);
229  CassIterator *iterator_row = cass_iterator_from_result(result);
230 
231  *res = gwlist_create();
232 
233  /* iterate through all rows */
234  while (cass_iterator_next(iterator_row)) {
235  const CassRow *row = cass_iterator_get_row(iterator_row);
236  CassIterator *iterator_column = cass_iterator_from_row(row);
237 
238  res_row = gwlist_create();
239 
240  /* iterate through all columns (of one row) */
241  while (cass_iterator_next(iterator_column)) {
242  const CassValue *value;
243  CassValueType type;
244  cass_int32_t i;
245  cass_int64_t bi;
246  cass_bool_t b;
247  cass_double_t d;
248  cass_float_t f;
249  const char* s;
250  size_t s_length;
251  CassUuid u;
252  char us[CASS_UUID_STRING_LENGTH];
253 
254  value = cass_iterator_get_column(iterator_column);
255  type = cass_value_type(value);
256 
257  /* determine value type and convert */
258  switch (type) {
259  case CASS_VALUE_TYPE_INT:
260  cass_value_get_int32(value, &i);
261  gwlist_append(res_row, octstr_format("%d", i));
262  break;
263 
264  case CASS_VALUE_TYPE_BIGINT:
265  cass_value_get_int64(value, &bi);
266  gwlist_append(res_row, octstr_format("%ld", bi));
267  break;
268 
269  case CASS_VALUE_TYPE_BOOLEAN:
270  cass_value_get_bool(value, &b);
271  gwlist_append(res_row, (b ? octstr_imm("true") : octstr_imm("false")));
272  break;
273 
274  case CASS_VALUE_TYPE_DOUBLE:
275  cass_value_get_double(value, &d);
276  gwlist_append(res_row, octstr_format("%g", d));
277  break;
278 
279  case CASS_VALUE_TYPE_FLOAT:
280  cass_value_get_float(value, &f);
281  gwlist_append(res_row, octstr_format("%f", f));
282  break;
283 
284  case CASS_VALUE_TYPE_TEXT:
285  case CASS_VALUE_TYPE_ASCII:
286  case CASS_VALUE_TYPE_VARCHAR:
287  cass_value_get_string(value, &s, &s_length);
288  gwlist_append(res_row, octstr_create_from_data(s, s_length));
289  break;
290 
291  case CASS_VALUE_TYPE_UUID:
292  cass_value_get_uuid(value, &u);
293  cass_uuid_string(u, us);
294  gwlist_append(res_row, octstr_create(us));
295  break;
296 
297  default:
298  if (cass_value_is_null(value)) {
299  gwlist_append(res_row, octstr_imm("_NULL_"));
300  } else {
301  error(0, "Cassandra: %s: unhandled type %d", __func__, type);
302  }
303  break;
304  }
305 
306  }
307  gwlist_append(*res, res_row);
308  cass_iterator_free(iterator_column);
309 
310  }
311 
312  cass_result_free(result);
313  cass_iterator_free(iterator_row);
314  }
315 
316  cass_future_free(future);
317  cass_statement_free(statement);
318 
319  return 0;
320 }
321 
322 
323 static int cass_update(void *conn, const Octstr *sql, List *binds)
324 {
325  CassInstance *instance = conn;
326  CassError rc = CASS_OK;
327  CassStatement *statement = NULL;
328  CassFuture *future = NULL;
329  long i, binds_len;
330 
331  binds_len = gwlist_len(binds);
332 
333  /* allocate statement handle */
334  statement = cass_statement_new(octstr_get_cstr(sql), binds_len);
335 
336  /* bind parameters if any */
337  if (binds_len > 0) {
338  for (i = 0; i < binds_len; i++) {
339  cass_statement_bind_string(statement, i, octstr_get_cstr(gwlist_get(binds, i)));
340  }
341  }
342 
343  /* execute statement */
344  future = cass_session_execute(instance->session, statement);
345  cass_future_wait(future);
346 
347  /* evaluate result */
348  rc = cass_future_error_code(future);
349  if (rc != CASS_OK) {
350  print_error(future);
351  cass_future_free(future);
352  cass_statement_free(statement);
353  return -1;
354  }
355 
356  cass_future_free(future);
357  cass_statement_free(statement);
358 
359  return 0;
360 }
361 
362 
363 static void cass_conf_destroy(DBConf *db_conf)
364 {
365  CassConf *conf = db_conf->cass;
366 
367  octstr_destroy(conf->host);
368  octstr_destroy(conf->username);
369  octstr_destroy(conf->password);
370  octstr_destroy(conf->database);
371 
372  gw_free(conf);
373  gw_free(db_conf);
374 }
375 
376 
377 static struct db_ops cass_ops = {
378  .open = cass_open_conn,
379  .close = cass_close_conn,
380  .check = cass_check_conn,
381  .select = cass_select,
382  .update = cass_update,
383  .conf_destroy = cass_conf_destroy
384 };
385 
386 #endif /* HAVE_CASS */
void error(int err, const char *fmt,...)
Definition: log.c:648
void info(int err, const char *fmt,...)
Definition: log.c:672
Octstr * database
Definition: dbpool.h:160
void gwlist_append(List *list, void *item)
Definition: list.c:179
long gwlist_len(List *list)
Definition: list.c:166
void * gwlist_get(List *list, long pos)
Definition: list.c:292
int type
Definition: smsc_cimd2.c:215
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:283
CassConf * cass
Definition: dbpool.h:173
void warning(int err, const char *fmt,...)
Definition: log.c:660
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2464
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:324
#define octstr_create(cstr)
Definition: octstr.h:125
Definition: dbpool.h:164
Octstr * host
Definition: dbpool.h:156
Definition: octstr.c:118
#define panic
Definition: log.h:87
long idle_timeout
Definition: dbpool.h:161
#define gwlist_create()
Definition: list.h:136
Octstr * password
Definition: dbpool.h:159
Octstr * username
Definition: dbpool.h:158
#define octstr_create_from_data(data, len)
Definition: octstr.h:134
Definition: list.c:102
void *(* open)(const DBConf *conf)
Definition: dbpool_p.h:73
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.