Kannel: Open Source WAP and SMS gateway  svn-r5335
bb_boxc.c
Go to the documentation of this file.
1 /* ====================================================================
2  * The Kannel Software License, Version 1.0
3  *
4  * Copyright (c) 2001-2018 Kannel Group
5  * Copyright (c) 1998-2001 WapIT Ltd.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  * notice, this list of conditions and the following disclaimer.
14  *
15  * 2. Redistributions in binary form must reproduce the above copyright
16  * notice, this list of conditions and the following disclaimer in
17  * the documentation and/or other materials provided with the
18  * distribution.
19  *
20  * 3. The end-user documentation included with the redistribution,
21  * if any, must include the following acknowledgment:
22  * "This product includes software developed by the
23  * Kannel Group (http://www.kannel.org/)."
24  * Alternately, this acknowledgment may appear in the software itself,
25  * if and wherever such third-party acknowledgments normally appear.
26  *
27  * 4. The names "Kannel" and "Kannel Group" must not be used to
28  * endorse or promote products derived from this software without
29  * prior written permission. For written permission, please
30  * contact org@kannel.org.
31  *
32  * 5. Products derived from this software may not be called "Kannel",
33  * nor may "Kannel" appear in their name, without prior written
34  * permission of the Kannel Group.
35  *
36  * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39  * DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
40  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
41  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
42  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
43  * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
44  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
45  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
46  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
47  * ====================================================================
48  *
49  * This software consists of voluntary contributions made by many
50  * individuals on behalf of the Kannel Group. For more information on
51  * the Kannel Group, please see <http://www.kannel.org/>.
52  *
53  * Portions of this software are based upon software originally written at
54  * WapIT Ltd., Helsinki, Finland for the Kannel project.
55  */
56 
57 /*
58  * bb_boxc.c : bearerbox box connection module
59  *
60  * handles start/restart/stop/suspend/die operations of the sms and
61  * wapbox connections
62  *
63  * Kalle Marjola 2000 for project Kannel
64  * Alexander Malysh (various fixes)
65  */
66 
67 #include <errno.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <time.h>
71 #include <string.h>
72 #include <sys/time.h>
73 #include <sys/types.h>
74 #include <sys/socket.h>
75 #include <unistd.h>
76 #include <signal.h>
77 
78 #include "gwlib/gwlib.h"
79 #include "msg.h"
80 #include "bearerbox.h"
81 #include "bb_smscconn_cb.h"
82 
83 #define SMSBOX_MAX_PENDING 100
84 
85 /* passed from bearerbox core */
86 
87 extern volatile sig_atomic_t bb_status;
88 extern volatile sig_atomic_t restart;
89 extern List *incoming_sms;
90 extern List *outgoing_sms;
91 extern List *incoming_wdp;
92 extern List *outgoing_wdp;
93 
94 extern List *flow_threads;
95 extern List *suspended;
96 
97 /* incoming/outgoing sms queue control */
98 extern long max_incoming_sms_qlength;
99 
100 
101 /* our own thingies */
102 
103 static volatile sig_atomic_t smsbox_running;
104 static volatile sig_atomic_t wapbox_running;
108 
109 /* dictionaries for holding the smsbox routing information */
115 
116 static long smsbox_port;
117 static int smsbox_port_ssl;
119 static long wapbox_port;
120 static int wapbox_port_ssl;
121 
122 /* max pending messages on the line to smsbox */
123 static long smsbox_max_pending;
124 
127 
128 
129 static Counter *boxid;
130 
131 /* sms_to_smsboxes thread-id */
132 static long sms_dequeue_thread;
133 
134 
135 typedef struct _boxc {
137  int is_wap;
138  long id;
139  int load;
140  time_t connect_time;
141  Octstr *client_ip;
142  List *incoming;
143  List *retry; /* If sending fails */
144  List *outgoing;
145  Dict *sent;
147  volatile sig_atomic_t alive;
148  Octstr *boxc_id; /* identifies the connected smsbox instance */
149  /* used to mark connection usable or still waiting for ident. msg */
150  volatile int routable;
151 } Boxc;
152 
153 
154 /* forward declaration */
155 static void sms_to_smsboxes(void *arg);
156 static int send_msg(Boxc *boxconn, Msg *pmsg);
157 static void boxc_sent_push(Boxc*, Msg*);
158 static void boxc_sent_pop(Boxc*, Msg*, Msg**);
159 static void boxc_gwlist_destroy(List *list);
160 
161 
162 /*-------------------------------------------------
163  * receiver thingies
164  */
165 
166 static Msg *read_from_box(Boxc *boxconn)
167 {
168  int ret;
169  Octstr *pack;
170  Msg *msg;
171 
172  pack = NULL;
173  while (bb_status != BB_DEAD && boxconn->alive) {
174  /* XXX: if box doesn't send (just keep conn open) we block here while shutdown */
175  pack = conn_read_withlen(boxconn->conn);
176  gw_claim_area(pack);
177  if (pack != NULL)
178  break;
179  if (conn_error(boxconn->conn)) {
180  info(0, "Read error when reading from box <%s>, disconnecting",
181  octstr_get_cstr(boxconn->client_ip));
182  return NULL;
183  }
184  if (conn_eof(boxconn->conn)) {
185  info(0, "Connection closed by the box <%s>",
186  octstr_get_cstr(boxconn->client_ip));
187  return NULL;
188  }
189 
190  ret = conn_wait(boxconn->conn, -1.0);
191  if (ret < 0) {
192  error(0, "Connection to box <%s> broke.",
193  octstr_get_cstr(boxconn->client_ip));
194  return NULL;
195  }
196  }
197 
198  if (pack == NULL)
199  return NULL;
200 
201  msg = msg_unpack(pack);
202  octstr_destroy(pack);
203 
204  if (msg == NULL)
205  error(0, "Failed to unpack data!");
206  return msg;
207 }
208 
209 
210 /*
211  * Try to deliver message to internal or smscconn queue
212  * and generate ack/nack for smsbox connections.
213  */
214 static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
215 {
216  Msg *mack;
217  int rc;
218 
219  /*
220  * save modifies ID and time, so if the smsbox uses it, save
221  * it FIRST for the reply message!!!
222  */
223  mack = msg_create(ack);
224  gw_assert(mack != NULL);
225  uuid_copy(mack->ack.id, msg->sms.id);
226  mack->ack.time = msg->sms.time;
227 
228  store_save(msg);
229 
230  rc = smsc2_rout(msg, 0);
231  switch (rc) {
232 
233  case SMSCCONN_SUCCESS:
234  mack->ack.nack = ack_success;
235  break;
236 
237  case SMSCCONN_QUEUED:
238  mack->ack.nack = ack_buffered;
239  break;
240 
241  case SMSCCONN_FAILED_DISCARDED: /* no router at all */
242  warning(0, "Message rejected by bearerbox, no router!");
243 
244  /*
245  * we don't store_save_ack() here, since the call to
246  * bb_smscconn_send_failed() within smsc2_route() did
247  * it already.
248  */
249  mack->ack.nack = ack_failed;
250 
251  /* destroy original message */
252  msg_destroy(msg);
253  break;
254 
255  case SMSCCONN_FAILED_QFULL: /* queue full */
256  warning(0, "Message rejected by bearerbox, %s!",
257  (rc == SMSCCONN_FAILED_DISCARDED) ? "no router" : "queue full");
258  /*
259  * first create nack for store-file, in order to delete
260  * message from store-file.
261  */
262  mack->ack.nack = ack_failed_tmp;
264 
265  /* destroy original message */
266  msg_destroy(msg);
267  break;
268 
269  case SMSCCONN_FAILED_EXPIRED: /* validity expired */
270  warning(0, "Message rejected by bearerbox, validity expired!");
271 
272  /*
273  * we don't store_save_ack() here, since the call to
274  * bb_smscconn_send_failed() within smsc2_route() did
275  * it already.
276  */
277  mack->ack.nack = ack_failed;
278 
279  /* destroy original message */
280  msg_destroy(msg);
281  break;
282 
283  case SMSCCONN_FAILED_REJECTED: /* white/black-list rejection */
284  warning(0, "Message rejected by bearerbox, white/black listed!");
285 
286  mack->ack.nack = ack_failed;
287 
288  /* destroy original message */
289  msg_destroy(msg);
290  break;
291 
292  default:
293  break;
294  }
295 
296  /* put ack into incoming queue of conn */
297  send_msg(conn, mack);
298  msg_destroy(mack);
299 }
300 
301 
302 static void boxc_receiver(void *arg)
303 {
304  Boxc *conn = arg;
305  Msg *msg, *mack;
306 
307  /* remove messages from socket until it is closed */
308  while (bb_status != BB_DEAD && conn->alive) {
309 
310  gwlist_consume(suspended); /* block here if suspended */
311 
312  msg = read_from_box(conn);
313 
314  if (msg == NULL) { /* garbage/connection lost */
315  conn->alive = 0;
316  break;
317  }
318 
319  /* we don't accept new messages in shutdown phase */
320  if ((bb_status == BB_SHUTDOWN || bb_status == BB_DEAD) && msg_type(msg) == sms) {
321  mack = msg_create(ack);
322  uuid_copy(mack->ack.id, msg->sms.id);
323  mack->ack.time = msg->sms.time;
324  mack->ack.nack = ack_failed_tmp;
325  msg_destroy(msg);
326  send_msg(conn, mack);
327  msg_destroy(mack);
328  continue;
329  }
330 
331  if (msg_type(msg) == sms && conn->is_wap == 0) {
332  debug("bb.boxc", 0, "boxc_receiver: sms received");
333 
334  /* deliver message to queue */
335  deliver_sms_to_queue(msg, conn);
336 
337  if (conn->routable == 0) {
338  conn->routable = 1;
339  /* wakeup the dequeue thread */
341  }
342  } else if (msg_type(msg) == wdp_datagram && conn->is_wap) {
343  debug("bb.boxc", 0, "boxc_receiver: got wdp from wapbox");
344 
345  /* XXX we should block these in SHUTDOWN phase too, but
346  we need ack/nack msgs implemented first. */
347  gwlist_produce(conn->outgoing, msg);
348 
349  } else if (msg_type(msg) == sms && conn->is_wap) {
350  debug("bb.boxc", 0, "boxc_receiver: got sms from wapbox");
351 
352  /* should be a WAP push message, so tried it the same way */
353  deliver_sms_to_queue(msg, conn);
354 
355  if (conn->routable == 0) {
356  conn->routable = 1;
357  /* wakeup the dequeue thread */
359  }
360  } else {
361  if (msg_type(msg) == heartbeat) {
362  if (msg->heartbeat.load != conn->load)
363  debug("bb.boxc", 0, "boxc_receiver: heartbeat with "
364  "load value %ld received", msg->heartbeat.load);
365  conn->load = msg->heartbeat.load;
366  }
367  else if (msg_type(msg) == ack) {
368  if (msg->ack.nack == ack_failed_tmp) {
369  Msg *orig;
370  boxc_sent_pop(conn, msg, &orig);
371  if (orig != NULL) /* retry this message */
372  gwlist_append(conn->retry, orig);
373  } else {
374  boxc_sent_pop(conn, msg, NULL);
375  store_save(msg);
376  }
377  debug("bb.boxc", 0, "boxc_receiver: got ack");
378  }
379  /* if this is an identification message from an smsbox instance */
380  else if (msg_type(msg) == admin && msg->admin.command == cmd_identify) {
381 
382  /*
383  * any smsbox sends this command even if boxc_id is NULL,
384  * but we will only consider real identified boxes
385  */
386  if (msg->admin.boxc_id != NULL) {
387 
388  /* Only interested if the connection is not named, or its a different name */
389  if (conn->boxc_id == NULL ||
390  octstr_compare(conn->boxc_id, msg->admin.boxc_id)) {
391  List *boxc_id_list = NULL;
392 
393  /*
394  * Different name, need to remove it from the old list.
395  *
396  * I Don't think this case should ever arise, but might as well
397  * be safe.
398  */
399  if (conn->boxc_id != NULL) {
400 
401  /* Get the list for this box id */
402  boxc_id_list = dict_get(smsbox_by_id, conn->boxc_id);
403 
404  /* Delete the connection from the list */
405  if (boxc_id_list != NULL) {
406  gwlist_delete_equal(boxc_id_list, conn);
407  }
408 
409  octstr_destroy(conn->boxc_id);
410  }
411 
412  /* Get the list for this box id */
413  boxc_id_list = dict_get(smsbox_by_id, msg->admin.boxc_id);
414 
415  /* No list yet, so create it */
416  if (boxc_id_list == NULL) {
417  boxc_id_list = gwlist_create();
418  if (!dict_put_once(smsbox_by_id, msg->admin.boxc_id, boxc_id_list))
419  /* list already added */
420  boxc_id_list = dict_get(smsbox_by_id, msg->admin.boxc_id);
421  }
422 
423  /* Add the connection into the list */
424  gwlist_append(boxc_id_list, conn);
425 
426  conn->boxc_id = msg->admin.boxc_id;
427  }
428  else {
429  octstr_destroy(msg->admin.boxc_id);
430  }
431 
432  msg->admin.boxc_id = NULL;
433 
434  debug("bb.boxc", 0, "boxc_receiver: got boxc_id <%s> from <%s>",
435  octstr_get_cstr(conn->boxc_id),
436  octstr_get_cstr(conn->client_ip));
437  }
438 
439  conn->routable = 1;
440  /* wakeup the dequeue thread */
442  }
443  else
444  warning(0, "boxc_receiver: unknown msg received from <%s>, "
445  "ignored", octstr_get_cstr(conn->client_ip));
446  msg_destroy(msg);
447  }
448  }
449 }
450 
451 
452 /*---------------------------------------------
453  * sender thingies
454  */
455 
456 static int send_msg(Boxc *boxconn, Msg *pmsg)
457 {
458  Octstr *pack;
459 
460  pack = msg_pack(pmsg);
461 
462  if (pack == NULL)
463  return -1;
464 
465  if (boxconn->boxc_id != NULL)
466  debug("bb.boxc", 0, "send_msg: sending msg to boxc: <%s>",
467  octstr_get_cstr(boxconn->boxc_id));
468  else
469  debug("bb.boxc", 0, "send_msg: sending msg to box: <%s>",
470  octstr_get_cstr(boxconn->client_ip));
471 
472  if (conn_write_withlen(boxconn->conn, pack) == -1) {
473  error(0, "Couldn't write Msg to box <%s>, disconnecting",
474  octstr_get_cstr(boxconn->client_ip));
475  octstr_destroy(pack);
476  return -1;
477  }
478 
479  octstr_destroy(pack);
480  return 0;
481 }
482 
483 
484 static void boxc_sent_push(Boxc *conn, Msg *m)
485 {
486  Octstr *os;
487  char id[UUID_STR_LEN + 1];
488 
489  if (conn->is_wap || !conn->sent || !m || msg_type(m) != sms)
490  return;
491 
492  uuid_unparse(m->sms.id, id);
493  os = octstr_create(id);
494  dict_put(conn->sent, os, msg_duplicate(m));
495  semaphore_down(conn->pending);
496  octstr_destroy(os);
497 }
498 
499 
500 /*
501  * Remove msg from sent queue.
502  * Return 0 if message should be deleted from store and 1 if not (e.g. tmp nack)
503  */
504 static void boxc_sent_pop(Boxc *conn, Msg *m, Msg **orig)
505 {
506  Octstr *os;
507  char id[UUID_STR_LEN + 1];
508  Msg *msg;
509 
510  if (conn->is_wap || !conn->sent || !m || (msg_type(m) != ack && msg_type(m) != sms))
511  return;
512 
513  if (orig != NULL)
514  *orig = NULL;
515 
516  uuid_unparse((msg_type(m) == sms ? m->sms.id : m->ack.id), id);
517  os = octstr_create(id);
518  msg = dict_remove(conn->sent, os);
519  octstr_destroy(os);
520  if (!msg) {
521  error(0, "BOXC: Got ack for nonexistend message!");
522  msg_dump(m, 0);
523  return;
524  }
525  semaphore_up(conn->pending);
526  if (orig == NULL)
527  msg_destroy(msg);
528  else
529  *orig = msg;
530 }
531 
532 
533 static void boxc_sender(void *arg)
534 {
535  Msg *msg;
536  Boxc *conn = arg;
537 
539 
540  while (bb_status != BB_DEAD && conn->alive) {
541 
542  /*
543  * Make sure there's no data left in the outgoing connection before
544  * doing the potentially blocking gwlist_consume()s
545  */
546  conn_flush(conn->conn);
547 
548  gwlist_consume(suspended); /* block here if suspended */
549 
550  if ((msg = gwlist_consume(conn->incoming)) == NULL) {
551  /* tell sms/wapbox to die */
552  msg = msg_create(admin);
553  msg->admin.command = restart ? cmd_restart : cmd_shutdown;
554  send_msg(conn, msg);
555  msg_destroy(msg);
556  break;
557  }
558  if (msg_type(msg) == heartbeat) {
559  debug("bb.boxc", 0, "boxc_sender: catch an heartbeat - we are alive");
560  msg_destroy(msg);
561  continue;
562  }
563  boxc_sent_push(conn, msg);
564  if (!conn->alive || send_msg(conn, msg) == -1) {
565  /* we got message here */
566  boxc_sent_pop(conn, msg, NULL);
567  gwlist_produce(conn->retry, msg);
568  break;
569  }
570  msg_destroy(msg);
571  debug("bb.boxc", 0, "boxc_sender: sent message to <%s>",
572  octstr_get_cstr(conn->client_ip));
573  }
574  /* the client closes the connection, after that die in receiver */
575  /* conn->alive = 0; */
576 
577  /* set conn to unroutable */
578  conn->routable = 0;
579 
581 }
582 
583 /*---------------------------------------------------------------
584  * accept/create/kill thingies
585  */
586 
587 
588 static Boxc *boxc_create(int fd, Octstr *ip, int ssl)
589 {
590  Boxc *boxc;
591 
592  boxc = gw_malloc(sizeof(Boxc));
593  boxc->is_wap = 0;
594  boxc->load = 0;
595  boxc->conn = conn_wrap_fd(fd, ssl);
596  boxc->id = counter_increase(boxid);
597  boxc->client_ip = ip;
598  boxc->alive = 1;
599  boxc->connect_time = time(NULL);
600  boxc->boxc_id = NULL;
601  boxc->routable = 0;
602  return boxc;
603 }
604 
605 static void boxc_destroy(Boxc *boxc)
606 {
607  if (boxc == NULL)
608  return;
609 
610  /* do nothing to the lists, as they are only references */
611 
612  if (boxc->conn)
613  conn_destroy(boxc->conn);
614  octstr_destroy(boxc->client_ip);
615  octstr_destroy(boxc->boxc_id);
616  gw_free(boxc);
617 }
618 
619 
620 
621 static Boxc *accept_boxc(int fd, int ssl)
622 {
623  Boxc *newconn;
624  Octstr *ip;
625 
626  int newfd;
627  struct sockaddr_in client_addr;
628  socklen_t client_addr_len;
629 
630  client_addr_len = sizeof(client_addr);
631 
632  newfd = accept(fd, (struct sockaddr *)&client_addr, &client_addr_len);
633  if (newfd < 0)
634  return NULL;
635 
636  ip = host_ip(client_addr);
637 
638  if (is_allowed_ip(box_allow_ip, box_deny_ip, ip) == 0) {
639  info(0, "Box connection tried from denied host <%s>, disconnected",
640  octstr_get_cstr(ip));
641  octstr_destroy(ip);
642  close(newfd);
643  return NULL;
644  }
645  newconn = boxc_create(newfd, ip, ssl);
646 
647  /*
648  * check if the SSL handshake was successfull, otherwise
649  * this is no valid box connection any more
650  */
651 #ifdef HAVE_LIBSSL
652  if (ssl && !conn_get_ssl(newconn->conn))
653  return NULL;
654 #endif
655 
656  info(0, "Client connected from <%s> %s", octstr_get_cstr(ip), ssl?"using SSL":"");
657 
658  /* XXX TODO: do the hand-shake, baby, yeah-yeah! */
659 
660  return newconn;
661 }
662 
663 
664 
665 static void run_smsbox(void *arg)
666 {
667  Boxc *newconn;
668  long sender;
669  Msg *msg;
670  List *keys;
671  Octstr *key;
672 
674  newconn = arg;
675  newconn->incoming = gwlist_create();
676  gwlist_add_producer(newconn->incoming);
677  newconn->retry = incoming_sms;
678  newconn->outgoing = outgoing_sms;
679  newconn->sent = dict_create(smsbox_max_pending, NULL);
681 
682  sender = gwthread_create(boxc_sender, newconn);
683  if (sender == -1) {
684  error(0, "Failed to start a new thread, disconnecting client <%s>",
685  octstr_get_cstr(newconn->client_ip));
686  goto cleanup;
687  }
688  /*
689  * We register newconn in the smsbox_list here but mark newconn as routable
690  * after identification or first message received from smsbox. So we can avoid
691  * a race condition for routable smsboxes (otherwise between startup and
692  * registration we will forward some messages to smsbox).
693  */
695  gwlist_append(smsbox_list, newconn);
697 
698  gwlist_add_producer(newconn->outgoing);
699  boxc_receiver(newconn);
701 
702  /* remove us from smsbox routing list */
705  if (newconn->boxc_id) {
706 
707  /* Get the list, and remove the connection from it */
708  List *boxc_id_list = dict_get(smsbox_by_id, newconn->boxc_id);
709 
710  if(boxc_id_list != NULL) {
711  gwlist_delete_equal(boxc_id_list, newconn);
712  }
713  }
714 
716 
717  /*
718  * check if we in the shutdown phase and sms dequeueing thread
719  * has removed the producer already
720  */
721  if (gwlist_producer_count(newconn->incoming) > 0)
723 
724  /* check if we are still waiting for ack's and semaphore locked */
725  if (dict_key_count(newconn->sent) >= smsbox_max_pending)
726  semaphore_up(newconn->pending); /* allow sender to go down */
727 
728  gwthread_join(sender);
729 
730  /* put not acked msgs into incoming queue */
731  keys = dict_keys(newconn->sent);
732  while((key = gwlist_extract_first(keys)) != NULL) {
733  msg = dict_remove(newconn->sent, key);
735  octstr_destroy(key);
736  }
737  gw_assert(gwlist_len(keys) == 0);
739 
740  /* clear our send queue */
741  while((msg = gwlist_extract_first(newconn->incoming)) != NULL) {
743  }
744 
745 cleanup:
746  gw_assert(gwlist_len(newconn->incoming) == 0);
747  gwlist_destroy(newconn->incoming, NULL);
748  gw_assert(dict_key_count(newconn->sent) == 0);
749  dict_destroy(newconn->sent);
750  semaphore_destroy(newconn->pending);
751  boxc_destroy(newconn);
752 
753  /* wakeup the dequeueing thread */
755 
757 }
758 
759 
760 
761 static void run_wapbox(void *arg)
762 {
763  Boxc *newconn;
764  List *newlist;
765  long sender;
766 
768  newconn = arg;
769  newconn->is_wap = 1;
770 
771  /*
772  * create a new incoming list for just that box,
773  * and add it to list of list pointers, so we can start
774  * to route messages to it.
775  */
776 
777  debug("bb", 0, "setting up systems for new wapbox");
778 
779  newlist = gwlist_create();
780  /* this is released by the sender/receiver if it exits */
781  gwlist_add_producer(newlist);
782 
783  newconn->incoming = newlist;
784  newconn->retry = incoming_wdp;
785  newconn->outgoing = outgoing_wdp;
786 
787  sender = gwthread_create(boxc_sender, newconn);
788  if (sender == -1) {
789  error(0, "Failed to start a new thread, disconnecting client <%s>",
790  octstr_get_cstr(newconn->client_ip));
791  goto cleanup;
792  }
793  gwlist_append(wapbox_list, newconn);
794  gwlist_add_producer(newconn->outgoing);
795  boxc_receiver(newconn);
796 
797  /* cleanup after receiver has exited */
798 
803 
804  while (gwlist_producer_count(newlist) > 0)
805  gwlist_remove_producer(newlist);
806 
807  newconn->alive = 0;
808 
809  gwthread_join(sender);
810 
811 cleanup:
812  gw_assert(gwlist_len(newlist) == 0);
813  gwlist_destroy(newlist, NULL);
814  boxc_destroy(newconn);
815 
817 }
818 
819 
820 /*------------------------------------------------
821  * main single thread functions
822  */
823 
824 typedef struct _addrpar {
826  int port;
827  int wapboxid;
828 } AddrPar;
829 
830 static void ap_destroy(AddrPar *addr)
831 {
832  octstr_destroy(addr->address);
833  gw_free(addr);
834 }
835 
836 static int cmp_route(void *ap, void *ms)
837 {
838  AddrPar *addr = ap;
839  Msg *msg = ms;
840 
841  if (msg->wdp_datagram.source_port == addr->port &&
842  octstr_compare(msg->wdp_datagram.source_address, addr->address)==0)
843  return 1;
844 
845  return 0;
846 }
847 
848 static int cmp_boxc(void *bc, void *ap)
849 {
850  Boxc *boxc = bc;
851  AddrPar *addr = ap;
852 
853  if (boxc->id == addr->wapboxid) return 1;
854  return 0;
855 }
856 
857 static Boxc *route_msg(List *route_info, Msg *msg)
858 {
859  AddrPar *ap;
860  Boxc *conn, *best;
861  int i, b, len;
862 
863  ap = gwlist_search(route_info, msg, cmp_route);
864  if (ap == NULL) {
865  debug("bb.boxc", 0, "Did not find previous routing info for WDP, "
866  "generating new");
867 route:
868 
869  if (gwlist_len(wapbox_list) == 0)
870  return NULL;
871 
873 
874  /* take random wapbox from list, and then check all wapboxes
875  * and select the one with lowest load level - if tied, the first
876  * one
877  */
878  len = gwlist_len(wapbox_list);
879  b = gw_rand() % len;
880  best = gwlist_get(wapbox_list, b);
881 
882  for(i = 0; i < gwlist_len(wapbox_list); i++) {
883  conn = gwlist_get(wapbox_list, (i+b) % len);
884  if (conn != NULL && best != NULL)
885  if (conn->load < best->load)
886  best = conn;
887  }
888  if (best == NULL) {
889  warning(0, "wapbox_list empty!");
891  return NULL;
892  }
893  conn = best;
894  conn->load++; /* simulate new client until we get new values */
895 
896  ap = gw_malloc(sizeof(AddrPar));
897  ap->address = octstr_duplicate(msg->wdp_datagram.source_address);
898  ap->port = msg->wdp_datagram.source_port;
899  ap->wapboxid = conn->id;
900  gwlist_produce(route_info, ap);
901 
903  } else
904  conn = gwlist_search(wapbox_list, ap, cmp_boxc);
905 
906  if (conn == NULL) {
907  /* routing failed; wapbox has disappeared!
908  * ..remove routing info and re-route */
909 
910  debug("bb.boxc", 0, "Old wapbox has disappeared, re-routing");
911 
912  gwlist_delete_equal(route_info, ap);
913  ap_destroy(ap);
914  goto route;
915  }
916  return conn;
917 }
918 
919 
920 /*
921  * this thread listens to incoming_wdp list
922  * and then routs messages to proper wapbox
923  */
924 static void wdp_to_wapboxes(void *arg)
925 {
926  List *route_info;
927  AddrPar *ap;
928  Boxc *conn;
929  Msg *msg;
930  int i;
931 
934 
935  route_info = gwlist_create();
936 
937 
938  while(bb_status != BB_DEAD) {
939 
940  gwlist_consume(suspended); /* block here if suspended */
941 
942  if ((msg = gwlist_consume(incoming_wdp)) == NULL)
943  break;
944 
945  gw_assert(msg_type(msg) == wdp_datagram);
946 
947  conn = route_msg(route_info, msg);
948  if (conn == NULL) {
949  warning(0, "Cannot route message, discard it");
950  msg_destroy(msg);
951  continue;
952  }
953  gwlist_produce(conn->incoming, msg);
954  }
955  debug("bb", 0, "wdp_to_wapboxes: destroying lists");
956  while((ap = gwlist_extract_first(route_info)) != NULL)
957  ap_destroy(ap);
958 
959  gw_assert(gwlist_len(route_info) == 0);
960  gwlist_destroy(route_info, NULL);
961 
963  for(i=0; i < gwlist_len(wapbox_list); i++) {
964  conn = gwlist_get(wapbox_list, i);
966  conn->alive = 0;
967  }
969 
972 }
973 
974 
975 static void wait_for_connections(int fd, void (*function) (void *arg),
976  List *waited, int ssl)
977 {
978  int ret;
979  int timeout = 10; /* 10 sec. */
980 
981  gw_assert(function != NULL);
982 
983  while(bb_status != BB_DEAD) {
984 
985  /* if we are being shutdowned, as long as there is
986  * messages in incoming list allow new connections, but when
987  * list is empty, exit.
988  * Note: We have timeout (defined above) for which we allow new connections.
989  * Otherwise we wait here for ever!
990  */
991  if (bb_status == BB_SHUTDOWN) {
992  ret = gwlist_wait_until_nonempty(waited);
993  if (ret == -1 || !timeout)
994  break;
995  else
996  timeout--;
997  }
998 
999  /* block here if suspended */
1001 
1002  ret = gwthread_pollfd(fd, POLLIN, 1.0);
1003  if (ret > 0) {
1004  Boxc *newconn = accept_boxc(fd, ssl);
1005  if (newconn != NULL) {
1006  gwthread_create(function, newconn);
1007  gwthread_sleep(1.0);
1008  } else {
1009  error(0, "Failed to create new boxc connection.");
1010  }
1011  } else if (ret < 0 && errno != EINTR && errno != EAGAIN)
1012  error(errno, "bb_boxc::wait_for_connections failed");
1013  }
1014 }
1015 
1016 
1017 
1018 static void smsboxc_run(void *arg)
1019 {
1020  int fd;
1021 
1024 
1026  /* XXX add interface_name if required */
1027 
1028  if (fd < 0) {
1029  panic(0, "Could not open smsbox port %ld", smsbox_port);
1030  }
1031 
1032  /*
1033  * infinitely wait for new connections;
1034  * to shut down the system, SIGTERM is send and then
1035  * select drops with error, so we can check the status
1036  */
1038 
1040 
1041  /* continue avalanche */
1043 
1044  /* all connections do the same, so that all must remove() before it
1045  * is completely over
1046  */
1048  gwthread_sleep(1.0);
1049 
1050  /* close listen socket */
1051  close(fd);
1052 
1055 
1056  gwlist_destroy(smsbox_list, NULL);
1057  smsbox_list = NULL;
1059  smsbox_list_rwlock = NULL;
1060 
1061  /* destroy things related to smsbox routing */
1063  smsbox_by_id = NULL;
1065  smsbox_by_smsc = NULL;
1067  smsbox_by_receiver = NULL;
1069  smsbox_by_smsc_receiver = NULL;
1071  smsbox_by_default = NULL;
1072 
1074 }
1075 
1076 
1077 static void wapboxc_run(void *arg)
1078 {
1079  int fd, port;
1080 
1083  port = (int) *((long*)arg);
1084 
1085  fd = make_server_socket(port, NULL);
1086  /* XXX add interface_name if required */
1087 
1088  if (fd < 0) {
1089  panic(0, "Could not open wapbox port %d", port);
1090  }
1091 
1093 
1094  /* continue avalanche */
1095 
1097 
1098 
1099  /* wait for all connections to die and then remove list
1100  */
1101 
1103  gwthread_sleep(1.0);
1104 
1105  /* wait for wdp_to_wapboxes to exit */
1106  while(gwlist_consume(wapbox_list)!=NULL)
1107  ;
1108 
1109  /* close listen socket */
1110  close(fd);
1111 
1112  gwlist_destroy(wapbox_list, NULL);
1113  wapbox_list = NULL;
1114 
1116 }
1117 
1118 #define RELOAD_PANIC(...) \
1119  if (reload) { error(__VA_ARGS__); continue; } \
1120  else panic(__VA_ARGS__);
1121 
1122 /*
1123  * Populates the corresponding smsbox_by_foobar dictionary hash tables
1124  */
1125 static void init_smsbox_routes(Cfg *cfg, int reload)
1126 {
1127  CfgGroup *grp;
1128  List *list, *items;
1129  Octstr *boxc_id, *smsc_ids, *shortcuts;
1130  int i, j;
1131 
1132  boxc_id = smsc_ids = shortcuts = NULL;
1133 
1134  list = cfg_get_multi_group(cfg, octstr_imm("smsbox-route"));
1135 
1136  /* loop multi-group "smsbox-route" */
1137  while (list && (grp = gwlist_extract_first(list)) != NULL) {
1138 
1139  if ((boxc_id = cfg_get(grp, octstr_imm("smsbox-id"))) == NULL) {
1140  grp_dump(grp);
1141  RELOAD_PANIC(0,"'smsbox-route' group without valid 'smsbox-id' directive!");
1142  }
1143 
1144  /*
1145  * If smsc-id is given, then any message comming from the specified
1146  * smsc-id in the list will be routed to this smsbox instance.
1147  * If shortcode is given, then any message with receiver number
1148  * matching those will be routed to this smsbox instance.
1149  * If both are given, then only receiver within shortcode originating
1150  * from smsc-id list will be routed to this smsbox instance. So if both
1151  * are present then this is a logical AND operation.
1152  */
1153  smsc_ids = cfg_get(grp, octstr_imm("smsc-id"));
1154  shortcuts = cfg_get(grp, octstr_imm("shortcode"));
1155 
1156  /* consider now the 3 possibilities: */
1157  if (smsc_ids && !shortcuts) {
1158  /* smsc-id only, so all MO traffic */
1159  items = octstr_split(smsc_ids, octstr_imm(";"));
1160  for (i = 0; i < gwlist_len(items); i++) {
1161  Octstr *item = gwlist_get(items, i);
1162  octstr_strip_blanks(item);
1163 
1164  debug("bb.boxc",0,"Adding smsbox routing to id <%s> for smsc id <%s>",
1165  octstr_get_cstr(boxc_id), octstr_get_cstr(item));
1166 
1167  if (!dict_put_once(smsbox_by_smsc, item, octstr_duplicate(boxc_id))) {
1168  RELOAD_PANIC(0, "Routing for smsc-id <%s> already exists!",
1169  octstr_get_cstr(item));
1170  }
1171  }
1173  octstr_destroy(smsc_ids);
1174  }
1175  else if (!smsc_ids && shortcuts) {
1176  /* shortcode only, so these MOs from all smscs */
1177  items = octstr_split(shortcuts, octstr_imm(";"));
1178  for (i = 0; i < gwlist_len(items); i++) {
1179  Octstr *item = gwlist_get(items, i);
1180  octstr_strip_blanks(item);
1181 
1182  debug("bb.boxc",0,"Adding smsbox routing to id <%s> for receiver no <%s>",
1183  octstr_get_cstr(boxc_id), octstr_get_cstr(item));
1184 
1185  if (!dict_put_once(smsbox_by_receiver, item, octstr_duplicate(boxc_id))) {
1186  RELOAD_PANIC(0, "Routing for receiver no <%s> already exists!",
1187  octstr_get_cstr(item));
1188  }
1189  }
1191  octstr_destroy(shortcuts);
1192  }
1193  else if (smsc_ids && shortcuts) {
1194  /* both, so only specified MOs from specified smscs */
1195  items = octstr_split(shortcuts, octstr_imm(";"));
1196  for (i = 0; i < gwlist_len(items); i++) {
1197  List *subitems;
1198  Octstr *item = gwlist_get(items, i);
1199  octstr_strip_blanks(item);
1200  subitems = octstr_split(smsc_ids, octstr_imm(";"));
1201  for (j = 0; j < gwlist_len(subitems); j++) {
1202  Octstr *subitem = gwlist_get(subitems, j);
1203  octstr_strip_blanks(subitem);
1204 
1205  debug("bb.boxc",0,"Adding smsbox routing to id <%s> "
1206  "for receiver no <%s> and smsc id <%s>",
1207  octstr_get_cstr(boxc_id), octstr_get_cstr(item),
1208  octstr_get_cstr(subitem));
1209 
1210  /* construct the dict key '<shortcode>:<smsc-id>' */
1211  octstr_insert(subitem, item, 0);
1212  octstr_insert_char(subitem, octstr_len(item), ':');
1213  if (!dict_put_once(smsbox_by_smsc_receiver, subitem, octstr_duplicate(boxc_id))) {
1214  RELOAD_PANIC(0, "Routing for receiver:smsc <%s> already exists!",
1215  octstr_get_cstr(subitem));
1216  }
1217  }
1219  }
1221  octstr_destroy(shortcuts);
1222  octstr_destroy(smsc_ids);
1223  }
1224  else { /* !smscids && !shortcuts */
1225  if (!smsbox_by_default) {
1226  debug("bb.boxc",0,"Adding smsbox default routing to id <%s>",
1227  octstr_get_cstr(boxc_id));
1229  } else {
1230  RELOAD_PANIC(0, "Default smsbox routing to id <%s> already exists!",
1232  }
1233  }
1234 
1235  octstr_destroy(boxc_id);
1236  }
1237 
1238  gwlist_destroy(list, NULL);
1239 }
1240 
1241 #undef RELOAD_PANIC
1242 
1243 
1244 /*-------------------------------------------------------------
1245  * public functions
1246  *
1247  * SMSBOX
1248  */
1249 
1251 {
1252  CfgGroup *grp;
1253 
1254  if (smsbox_running) return -1;
1255 
1256  debug("bb", 0, "starting smsbox connection module");
1257 
1258  grp = cfg_get_single_group(cfg, octstr_imm("core"));
1259  if (cfg_get_integer(&smsbox_port, grp, octstr_imm("smsbox-port")) == -1) {
1260  error(0, "Missing smsbox-port variable, cannot start smsboxes");
1261  return -1;
1262  }
1263 #ifdef HAVE_LIBSSL
1264  cfg_get_bool(&smsbox_port_ssl, grp, octstr_imm("smsbox-port-ssl"));
1265 #endif /* HAVE_LIBSSL */
1266 
1267  if (smsbox_port_ssl)
1268  debug("bb", 0, "smsbox connection module is SSL-enabled");
1269 
1270  smsbox_interface = cfg_get(grp, octstr_imm("smsbox-interface"));
1271 
1272  if (cfg_get_integer(&smsbox_max_pending, grp, octstr_imm("smsbox-max-pending")) == -1) {
1274  info(0, "BOXC: 'smsbox-max-pending' not set, using default (%ld).", smsbox_max_pending);
1275  }
1276 
1277  box_allow_ip = cfg_get(grp, octstr_imm("box-allow-ip"));
1278  if (box_allow_ip == NULL)
1280  box_deny_ip = cfg_get(grp, octstr_imm("box-deny-ip"));
1281  if (box_deny_ip == NULL)
1282  box_deny_ip = octstr_create("");
1283  if (box_allow_ip != NULL && box_deny_ip == NULL)
1284  info(0, "Box connection allowed IPs defined without any denied...");
1285 
1286  smsbox_list = gwlist_create(); /* have a list of connections */
1288  if (!boxid)
1289  boxid = counter_create();
1290 
1291  /* the smsbox routing specific inits */
1292  smsbox_by_id = dict_create(10, (void(*)(void *)) boxc_gwlist_destroy);
1293  smsbox_by_smsc = dict_create(30, (void(*)(void *)) octstr_destroy);
1294  smsbox_by_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
1295  smsbox_by_smsc_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
1296  smsbox_by_default = NULL;
1297 
1298  /* load the defined smsbox routing rules */
1299  init_smsbox_routes(cfg, 0);
1300 
1303 
1304  smsbox_running = 1;
1305 
1306  if ((sms_dequeue_thread = gwthread_create(sms_to_smsboxes, NULL)) == -1)
1307  panic(0, "Failed to start a new thread for smsbox routing");
1308 
1309  if (gwthread_create(smsboxc_run, NULL) == -1)
1310  panic(0, "Failed to start a new thread for smsbox connections");
1311 
1312  return 0;
1313 }
1314 
1315 
1317 {
1318  if (!smsbox_running) return -1;
1319 
1324  smsbox_by_smsc = dict_create(30, (void(*)(void *)) octstr_destroy);
1325  smsbox_by_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
1326  smsbox_by_smsc_receiver = dict_create(50, (void(*)(void *)) octstr_destroy);
1328  smsbox_by_default = NULL;
1329  init_smsbox_routes(cfg, 1);
1331 
1332  return 0;
1333 }
1334 
1335 
1336 
1337 /* WAPBOX */
1338 
1340 {
1341  CfgGroup *grp;
1342 
1343  if (wapbox_running) return -1;
1344 
1345  debug("bb", 0, "starting wapbox connection module");
1346 
1347  grp = cfg_get_single_group(cfg, octstr_imm("core"));
1348 
1349  if (cfg_get_integer(&wapbox_port, grp, octstr_imm("wapbox-port")) == -1) {
1350  error(0, "Missing wapbox-port variable, cannot start WAP");
1351  return -1;
1352  }
1353 #ifdef HAVE_LIBSSL
1354  cfg_get_bool(&wapbox_port_ssl, grp, octstr_imm("wapbox-port-ssl"));
1355 #endif /* HAVE_LIBSSL */
1356 
1357  if (box_allow_ip == NULL) {
1358  box_allow_ip = cfg_get(grp, octstr_imm("box-allow-ip"));
1359  if (box_allow_ip == NULL)
1361  }
1362  if (box_deny_ip == NULL) {
1363  box_deny_ip = cfg_get(grp, octstr_imm("box-deny-ip"));
1364  if (box_deny_ip == NULL)
1365  box_deny_ip = octstr_create("");
1366  }
1367  if (box_allow_ip != NULL && box_deny_ip == NULL)
1368  info(0, "Box connection allowed IPs defined without any denied...");
1369 
1370  wapbox_list = gwlist_create(); /* have a list of connections */
1372  if (!boxid)
1373  boxid = counter_create();
1374 
1375  if (gwthread_create(wdp_to_wapboxes, NULL) == -1)
1376  panic(0, "Failed to start a new thread for wapbox routing");
1377 
1379  panic(0, "Failed to start a new thread for wapbox connections");
1380 
1381  wapbox_running = 1;
1382  return 0;
1383 }
1384 
1385 
1386 Octstr *boxc_status(int status_type)
1387 {
1388  Octstr *tmp;
1389  char *lb, *ws;
1390  int i, boxes, para = 0;
1391  time_t orig, t;
1392  Boxc *bi;
1393 
1394  orig = time(NULL);
1395 
1396  /*
1397  * XXX: this will cause segmentation fault if this is called
1398  * between 'destroy_list and setting list to NULL calls.
1399  * Ok, this has to be fixed, but now I am too tired.
1400  */
1401 
1402  if ((lb = bb_status_linebreak(status_type))==NULL)
1403  return octstr_create("Un-supported format");
1404 
1405  if (status_type == BBSTATUS_HTML)
1406  ws = "&nbsp;&nbsp;&nbsp;&nbsp;";
1407  else if (status_type == BBSTATUS_TEXT)
1408  ws = " ";
1409  else
1410  ws = "";
1411 
1412  if (status_type == BBSTATUS_HTML || status_type == BBSTATUS_WML)
1413  para = 1;
1414 
1415  if (status_type == BBSTATUS_XML) {
1416  tmp = octstr_create ("");
1417  octstr_append_cstr(tmp, "<boxes>\n\t");
1418  }
1419  else
1420  tmp = octstr_format("%sBox connections:%s", para ? "<p>" : "", lb);
1421  boxes = 0;
1422 
1423  if (wapbox_list) {
1425  for(i=0; i < gwlist_len(wapbox_list); i++) {
1426  bi = gwlist_get(wapbox_list, i);
1427  if (bi->alive == 0)
1428  continue;
1429  t = orig - bi->connect_time;
1430  if (status_type == BBSTATUS_XML)
1432  "<box>\n\t\t<type>wapbox</type>\n\t\t<IP>%s</IP>\n"
1433  "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
1434  "\t\t<ssl>%s</ssl>\n\t</box>\n",
1436  t/3600/24, t/3600%24, t/60%60, t%60,
1437 #ifdef HAVE_LIBSSL
1438  conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
1439 #else
1440  "not installed"
1441 #endif
1442  );
1443  else
1445  "%swapbox, IP %s (on-line %ldd %ldh %ldm %lds) %s %s",
1446  ws, octstr_get_cstr(bi->client_ip),
1447  t/3600/24, t/3600%24, t/60%60, t%60,
1448 #ifdef HAVE_LIBSSL
1449  conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
1450 #else
1451  "",
1452 #endif
1453  lb);
1454  boxes++;
1455  }
1457  }
1458  if (smsbox_list) {
1460  for(i=0; i < gwlist_len(smsbox_list); i++) {
1461  bi = gwlist_get(smsbox_list, i);
1462  if (bi->alive == 0)
1463  continue;
1464  t = orig - bi->connect_time;
1465  if (status_type == BBSTATUS_XML)
1466  octstr_format_append(tmp, "<box>\n\t\t<type>smsbox</type>\n"
1467  "\t\t<id>%s</id>\n\t\t<IP>%s</IP>\n"
1468  "\t\t<queue>%ld</queue>\n"
1469  "\t\t<status>on-line %ldd %ldh %ldm %lds</status>\n"
1470  "\t\t<ssl>%s</ssl>\n\t</box>",
1471  (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : ""),
1473  gwlist_len(bi->incoming) + dict_key_count(bi->sent),
1474  t/3600/24, t/3600%24, t/60%60, t%60,
1475 #ifdef HAVE_LIBSSL
1476  conn_get_ssl(bi->conn) != NULL ? "yes" : "no"
1477 #else
1478  "not installed"
1479 #endif
1480  );
1481  else
1482  octstr_format_append(tmp, "%ssmsbox:%s, IP %s (%ld queued), (on-line %ldd %ldh %ldm %lds) %s %s",
1483  ws, (bi->boxc_id ? octstr_get_cstr(bi->boxc_id) : "(none)"),
1485  t/3600/24, t/3600%24, t/60%60, t%60,
1486 #ifdef HAVE_LIBSSL
1487  conn_get_ssl(bi->conn) != NULL ? "using SSL" : "",
1488 #else
1489  "",
1490 #endif
1491  lb);
1492  boxes++;
1493  }
1495  }
1496  if (boxes == 0 && status_type != BBSTATUS_XML) {
1497  octstr_destroy(tmp);
1498  tmp = octstr_format("%sNo boxes connected", para ? "<p>" : "");
1499  }
1500  if (para)
1501  octstr_append_cstr(tmp, "</p>");
1502  if (status_type == BBSTATUS_XML)
1503  octstr_append_cstr(tmp, "</boxes>\n");
1504  else
1505  octstr_append_cstr(tmp, "\n\n");
1506  return tmp;
1507 }
1508 
1509 
1511 {
1512  int i, q = 0;
1513  Boxc *boxc;
1514 
1515  if (wapbox_list) {
1517  for(i=0; i < gwlist_len(wapbox_list); i++) {
1518  boxc = gwlist_get(wapbox_list, i);
1519  q += gwlist_len(boxc->incoming);
1520  }
1522  }
1523  return q;
1524 }
1525 
1526 
1527 void boxc_cleanup(void)
1528 {
1531  box_allow_ip = NULL;
1532  box_deny_ip = NULL;
1534  boxid = NULL;
1536  smsbox_interface = NULL;
1537 }
1538 
1539 
1540 /*
1541  * Route the incoming message to one of the following input queues:
1542  * a specific smsbox conn
1543  * a random smsbox conn if no shortcut routing and msg->sms.boxc_id match
1544  *
1545  * BEWARE: All logic inside here should be fast, hence speed processing
1546  * optimized, because every single MO message passes this function and we
1547  * have to ensure that no unnecessary overhead is done.
1548  */
1550 {
1551  Boxc *bc = NULL;
1552  Octstr *s, *r, *rs, *boxc_id = NULL;
1553  long len, b, i;
1554  int full_found = 0;
1555 
1556  gw_assert(msg_type(msg) == sms);
1557 
1558  /* msg_dump(msg, 0); */
1559 
1560  /* Check we have at least one smsbox connected! */
1562  if (gwlist_len(smsbox_list) == 0) {
1564  warning(0, "smsbox_list empty!");
1565  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1567  return 0;
1568  } else {
1569  return -1;
1570  }
1571  }
1572 
1573  /*
1574  * Do we have a specific smsbox-id route to pass this msg to?
1575  */
1576  if (octstr_len(msg->sms.boxc_id) > 0) {
1577  boxc_id = msg->sms.boxc_id;
1578  } else {
1579  /*
1580  * Check if we have a "smsbox-route" for this msg.
1581  * Where the shortcode route has a higher priority then the smsc-id rule.
1582  * Highest priority has the combined <shortcode>:<smsc-id> route.
1583  */
1584  Octstr *os = octstr_format("%s:%s",
1585  octstr_get_cstr(msg->sms.receiver),
1586  octstr_get_cstr(msg->sms.smsc_id));
1587  s = (msg->sms.smsc_id ? dict_get(smsbox_by_smsc, msg->sms.smsc_id) : NULL);
1588  r = (msg->sms.receiver ? dict_get(smsbox_by_receiver, msg->sms.receiver) : NULL);
1589  rs = (os ? dict_get(smsbox_by_smsc_receiver, os) : NULL);
1590  octstr_destroy(os);
1591 
1592  if (rs)
1593  boxc_id = rs;
1594  else if (r)
1595  boxc_id = r;
1596  else if (s)
1597  boxc_id = s;
1598  else if (smsbox_by_default)
1599  boxc_id = smsbox_by_default;
1600  }
1601 
1602  /* We have a specific smsbox-id to use */
1603  if (boxc_id != NULL) {
1604 
1605  List *boxc_id_list = dict_get(smsbox_by_id, boxc_id);
1606  if (gwlist_len(boxc_id_list) == 0) {
1607  /*
1608  * something is wrong, this was the smsbox connection we used
1609  * for sending, so it seems this smsbox is gone
1610  */
1611  warning(0, "Could not route message to smsbox id <%s>, smsbox is gone!",
1612  octstr_get_cstr(boxc_id));
1614  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1616  return 0;
1617  } else {
1618  return -1;
1619  }
1620  }
1621 
1622  /*
1623  * Take random smsbox from list, as long as it has space we will use it,
1624  * otherwise check the next one.
1625  */
1626  len = gwlist_len(boxc_id_list);
1627  b = gw_rand() % len;
1628 
1629  for (i = 0; i < len; i++) {
1630  bc = gwlist_get(boxc_id_list, (i+b) % len);
1631 
1632  if (bc != NULL && max_incoming_sms_qlength > 0 &&
1634  bc = NULL;
1635  }
1636 
1637  if (bc != NULL) {
1638  break;
1639  }
1640  }
1641 
1642  if (bc != NULL) {
1643  bc->load++;
1644  gwlist_produce(bc->incoming, msg);
1646  return 1; /* we are done */
1647  }
1648  else {
1649  /*
1650  * we have routing defined, but no smsbox connected at the moment.
1651  * put msg into global incoming queue and wait until smsbox with
1652  * such boxc_id connected.
1653  */
1655  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1657  return 0;
1658  } else {
1659  return -1;
1660  }
1661  }
1662  }
1663 
1664  /*
1665  * Ok, none of the specific routing things applied previously,
1666  * so route it to a random smsbox.
1667  * Take random smsbox from list, as long as it has space we will
1668  * use it, therwise check the next one.
1669  */
1670  len = gwlist_len(smsbox_list);
1671  b = gw_rand() % len;
1672 
1673  for (i = 0; i < len; i++) {
1674  bc = gwlist_get(smsbox_list, (i+b) % len);
1675 
1676  if (bc->boxc_id != NULL || bc->routable == 0)
1677  bc = NULL;
1678 
1679  if (bc != NULL && max_incoming_sms_qlength > 0 &&
1681  full_found = 1;
1682  bc = NULL;
1683  }
1684 
1685  if (bc != NULL) {
1686  break;
1687  }
1688  }
1689 
1690  if (bc != NULL) {
1691  bc->load++;
1692  gwlist_produce(bc->incoming, msg);
1693  }
1694 
1696 
1697  if (bc == NULL && full_found == 0) {
1698  warning(0, "smsbox_list empty!");
1699  if (max_incoming_sms_qlength < 0 || max_incoming_sms_qlength > gwlist_len(incoming_sms)) {
1701  return 0;
1702  } else {
1703  return -1;
1704  }
1705  } else if (bc == NULL && full_found == 1) {
1706  return -1;
1707  }
1708 
1709  return 1;
1710 }
1711 
1712 
1713 static void sms_to_smsboxes(void *arg)
1714 {
1715  Msg *newmsg, *startmsg, *msg;
1716  long i, len;
1717  int ret = -1;
1718  Boxc *boxc;
1719 
1721 
1722  newmsg = startmsg = msg = NULL;
1723 
1724  while (bb_status != BB_SHUTDOWN && bb_status != BB_DEAD) {
1725 
1726  if (newmsg == startmsg) {
1727  /* check if we are in shutdown phase */
1729  break;
1730 
1731  if (ret == 0 || ret == -1) {
1732  /* debug("", 0, "time to sleep"); */
1733  gwthread_sleep(60.0);
1734  /* debug("", 0, "wake up list len %ld", gwlist_len(incoming_sms)); */
1735  /* shutdown ? */
1737  break;
1738  }
1739  startmsg = msg = gwlist_consume(incoming_sms);
1740  /* debug("", 0, "gwlist_consume done 1"); */
1741  newmsg = NULL;
1742  }
1743  else {
1744  newmsg = msg = gwlist_consume(incoming_sms);
1745 
1746  /* Back at the first message? */
1747  if (newmsg == startmsg) {
1749  continue;
1750  }
1751  }
1752 
1753  if (msg == NULL)
1754  break;
1755 
1756  gw_assert(msg_type(msg) == sms);
1757 
1758  /* debug("bb.sms", 0, "sms_boxc_router: handling message (%p vs %p)",
1759  msg, startmsg); */
1760 
1761  ret = route_incoming_to_boxc(msg);
1762  if (ret == 1)
1763  startmsg = newmsg = NULL;
1764  else if (ret == -1) {
1766  }
1767  }
1768 
1770  len = gwlist_len(smsbox_list);
1771  for (i=0; i < len; i++) {
1772  boxc = gwlist_get(smsbox_list, i);
1774  }
1776 
1778 }
1779 
1780 
1781 /*
1782  * Simple wrapper to allow the named smsbox Lists to be
1783  * destroyed when the smsbox_by_id Dict is destroyed
1784  *
1785  */
1786 static void boxc_gwlist_destroy(List *list)
1787 {
1788  gwlist_destroy(list, NULL);
1789 }
1790 
Dict * dict_create(long size_hint, void(*destroy_value)(void *))
Definition: dict.c:192
void msg_dump(Msg *msg, int level)
Definition: msg.c:152
Connection * conn
Definition: bb_boxc.c:136
Octstr * address
Definition: bb_boxc.c:825
static volatile sig_atomic_t smsbox_running
Definition: bb_boxc.c:103
void error(int err, const char *fmt,...)
Definition: log.c:648
static void run_wapbox(void *arg)
Definition: bb_boxc.c:761
void info(int err, const char *fmt,...)
Definition: log.c:672
int boxc_incoming_wdp_queue(void)
Definition: bb_boxc.c:1510
List * incoming
Definition: opensmppbox.c:154
static Boxc * route_msg(List *route_info, Msg *msg)
Definition: bb_boxc.c:857
Msg * msg_duplicate(Msg *msg)
Definition: msg.c:111
Definition: http.c:2014
void * gwlist_search(List *list, void *pattern, int(*cmp)(void *, void *))
Definition: list.c:486
static void sms_to_smsboxes(void *arg)
Definition: bb_boxc.c:1713
#define msg_unpack(os)
Definition: msg.h:183
List * outgoing_wdp
Definition: bearerbox.c:88
static Msg * read_from_box(Boxc *boxconn)
Definition: bb_boxc.c:166
gw_assert(wtls_machine->packet_to_send !=NULL)
int ssl
void dict_put(Dict *dict, Octstr *key, void *value)
Definition: dict.c:240
void counter_destroy(Counter *counter)
Definition: counter.c:110
void gwlist_append(List *list, void *item)
Definition: list.c:179
char * bb_status_linebreak(int status_type)
Definition: bearerbox.c:1109
static int wapbox_port_ssl
Definition: bb_boxc.c:120
static void boxc_destroy(Boxc *boxc)
Definition: bb_boxc.c:605
Octstr * client_ip
Definition: opensmppbox.c:153
static Dict * smsbox_by_smsc
Definition: bb_boxc.c:111
void semaphore_destroy(Semaphore *semaphore)
Definition: gw-semaphore.c:104
void gwlist_produce(List *list, void *item)
Definition: list.c:411
List * outgoing
Definition: opensmppbox.c:156
void gwthread_join(long thread)
static RWLock * smsbox_list_rwlock
Definition: bb_boxc.c:107
long gwlist_len(List *list)
Definition: list.c:166
int(* store_save_ack)(Msg *msg, ack_status_t status)
Definition: bb_store.c:73
void gw_rwlock_destroy(RWLock *lock)
Definition: gw-rwlock.c:112
int gw_rwlock_wrlock(RWLock *lock)
Definition: gw-rwlock.c:177
volatile sig_atomic_t restart
Definition: bearerbox.c:149
void * gwlist_get(List *list, long pos)
Definition: list.c:292
static int cmp_boxc(void *bc, void *ap)
Definition: bb_boxc.c:848
msg_type
Definition: msg.h:73
static void boxc_sent_pop(Boxc *, Msg *, Msg **)
Definition: bb_boxc.c:504
#define cfg_get(grp, varname)
Definition: cfg.h:86
int load
Definition: opensmppbox.c:148
void uuid_unparse(const uuid_t uu, char *out)
Definition: gw_uuid.c:562
List * incoming_sms
Definition: bearerbox.c:84
static int smsbox_port_ssl
Definition: bb_boxc.c:117
RWLock * gw_rwlock_create(void)
Definition: gw-rwlock.c:77
#define msg_create(type)
Definition: msg.h:136
int gw_rwlock_rdlock(RWLock *lock)
Definition: gw-rwlock.c:134
void octstr_append_cstr(Octstr *ostr, const char *cstr)
Definition: octstr.c:1511
static Cfg * cfg
Definition: opensmppbox.c:95
static Octstr * box_deny_ip
Definition: bb_boxc.c:126
int conn_eof(Connection *conn)
Definition: conn.c:705
void octstr_strip_blanks(Octstr *text)
Definition: octstr.c:1346
int gwlist_wait_until_nonempty(List *list)
Definition: list.c:361
#define octstr_get_cstr(ostr)
Definition: octstr.h:233
unsigned long counter_increase(Counter *counter)
Definition: counter.c:123
static Octstr * smsbox_by_default
Definition: bb_boxc.c:114
int is_allowed_ip(Octstr *allow_ip, Octstr *deny_ip, Octstr *ip)
Definition: utils.c:815
static void boxc_receiver(void *arg)
Definition: bb_boxc.c:302
static Dict * smsbox_by_receiver
Definition: bb_boxc.c:112
volatile sig_atomic_t alive
Definition: opensmppbox.c:159
#define RELOAD_PANIC(...)
Definition: bb_boxc.c:1118
void semaphore_down(Semaphore *semaphore)
Definition: gw-semaphore.c:132
void gwlist_unlock(List *list)
Definition: list.c:354
List * retry
Definition: opensmppbox.c:155
volatile sig_atomic_t bb_status
Definition: bearerbox.c:132
static int port
Definition: fakesmsc.c:121
static Boxc * boxc_create(int fd, Octstr *ip, int ssl)
Definition: bb_boxc.c:588
#define POLLIN
Definition: gwpoll.h:91
long max_incoming_sms_qlength
Definition: bearerbox.c:98
Octstr * octstr_imm(const char *cstr)
Definition: octstr.c:283
static int send_msg(Boxc *boxconn, Msg *pmsg)
Definition: bb_boxc.c:456
Definition: msg.h:79
Definition: cfg.c:164
void octstr_insert(Octstr *ostr1, const Octstr *ostr2, long pos)
Definition: octstr.c:1303
void * dict_remove(Dict *dict, Octstr *key)
Definition: dict.c:307
Octstr * boxc_id
Definition: opensmppbox.c:160
Counter * counter_create(void)
Definition: counter.c:94
struct _addrpar AddrPar
void * gwlist_extract_first(List *list)
Definition: list.c:305
List * outgoing_sms
Definition: bearerbox.c:85
void grp_dump(CfgGroup *grp)
Definition: cfg.c:811
static void wait_for_connections(int fd, void(*function)(void *arg), List *waited, int ssl)
Definition: bb_boxc.c:975
static void ap_destroy(AddrPar *addr)
Definition: bb_boxc.c:830
void * dict_get(Dict *dict, Octstr *key)
Definition: dict.c:286
List * suspended
Definition: bearerbox.c:122
void gwlist_remove_producer(List *list)
Definition: list.c:401
void conn_destroy(Connection *conn)
Definition: conn.c:627
static void deliver_sms_to_queue(Msg *msg, Boxc *conn)
Definition: bb_boxc.c:214
void octstr_insert_char(Octstr *ostr, long pos, const char c)
Definition: octstr.c:1481
int port
Definition: bb_boxc.c:826
int route_incoming_to_boxc(Msg *msg)
Definition: bb_boxc.c:1549
List * incoming_wdp
Definition: bearerbox.c:87
Definition: dict.c:116
#define octstr_duplicate(ostr)
Definition: octstr.h:187
List * cfg_get_multi_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:645
static Dict * smsbox_by_id
Definition: bb_boxc.c:110
int smsbox_start(Cfg *cfg)
Definition: bb_boxc.c:1250
long dict_key_count(Dict *dict)
Definition: dict.c:335
long gwlist_delete_equal(List *list, void *item)
Definition: list.c:266
void uuid_copy(uuid_t dst, const uuid_t src)
Definition: gw_uuid.c:150
void msg_destroy(Msg *msg)
Definition: msg.c:132
int gw_rwlock_unlock(RWLock *lock)
Definition: gw-rwlock.c:155
static Octstr * smsbox_interface
Definition: bb_boxc.c:118
static void smsboxc_run(void *arg)
Definition: bb_boxc.c:1018
static void wapboxc_run(void *arg)
Definition: bb_boxc.c:1077
static void boxc_gwlist_destroy(List *list)
Definition: bb_boxc.c:1786
int make_server_socket(int port, const char *interface_name)
Definition: socket.c:93
time_t connect_time
Definition: opensmppbox.c:151
void warning(int err, const char *fmt,...)
Definition: log.c:660
Octstr * octstr_format(const char *fmt,...)
Definition: octstr.c:2464
void octstr_destroy(Octstr *ostr)
Definition: octstr.c:324
#define gwthread_create(func, arg)
Definition: gwthread.h:90
#define octstr_create(cstr)
Definition: octstr.h:125
void octstr_destroy_item(void *os)
Definition: octstr.c:336
static long smsbox_max_pending
Definition: bb_boxc.c:123
void gwthread_sleep(double seconds)
static Dict * smsbox_by_smsc_receiver
Definition: bb_boxc.c:113
Octstr * conn_read_withlen(Connection *conn)
Definition: conn.c:1169
int conn_write_withlen(Connection *conn, Octstr *data)
Definition: conn.c:1075
int smsbox_restart(Cfg *cfg)
Definition: bb_boxc.c:1316
static Counter * boxid
Definition: bb_boxc.c:129
void gwlist_insert(List *list, long pos, void *item)
Definition: list.c:214
#define UUID_STR_LEN
Definition: gw_uuid.h:19
int gwthread_pollfd(int fd, int events, double timeout)
void gwlist_lock(List *list)
Definition: list.c:347
void boxc_cleanup(void)
Definition: bb_boxc.c:1527
void semaphore_up(Semaphore *semaphore)
Definition: gw-semaphore.c:118
static void init_smsbox_routes(Cfg *cfg, int reload)
Definition: bb_boxc.c:1125
long octstr_len(const Octstr *ostr)
Definition: octstr.c:342
void dict_destroy(Dict *dict)
Definition: dict.c:215
struct _boxc Boxc
int cfg_get_bool(int *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:759
Definition: octstr.c:118
int conn_wait(Connection *conn, double seconds)
Definition: conn.c:904
void * gwlist_consume(List *list)
Definition: list.c:427
Octstr * host_ip(struct sockaddr_in addr)
Definition: socket.c:615
static long smsbox_port
Definition: bb_boxc.c:116
void debug(const char *place, int err, const char *fmt,...)
Definition: log.c:726
int cfg_get_integer(long *n, CfgGroup *grp, Octstr *varname)
Definition: cfg.c:742
int wapbox_start(Cfg *cfg)
Definition: bb_boxc.c:1339
#define panic
Definition: log.h:87
void gwthread_wakeup(long thread)
Definition: cfg.c:73
int socklen_t
Definition: socket.h:73
static Octstr * box_allow_ip
Definition: bb_boxc.c:125
void octstr_format_append(Octstr *os, const char *fmt,...)
Definition: octstr.c:2507
static List * smsbox_list
Definition: bb_boxc.c:106
static void wdp_to_wapboxes(void *arg)
Definition: bb_boxc.c:924
static void boxc_sent_push(Boxc *, Msg *)
Definition: bb_boxc.c:484
Octstr * msg_pack(Msg *msg)
Definition: msg.c:181
List * dict_keys(Dict *dict)
Definition: dict.c:347
#define gwlist_create()
Definition: list.h:136
static Boxc * accept_boxc(int fd, int ssl)
Definition: bb_boxc.c:621
static List * wapbox_list
Definition: bb_boxc.c:105
Octstr * boxc_status(int status_type)
Definition: bb_boxc.c:1386
int(* store_save)(Msg *msg)
Definition: bb_store.c:72
static void boxc_sender(void *arg)
Definition: bb_boxc.c:533
static long sms_dequeue_thread
Definition: bb_boxc.c:132
long id
Definition: opensmppbox.c:147
int dict_put_once(Dict *dict, Octstr *key, void *value)
Definition: dict.c:271
int conn_error(Connection *conn)
Definition: conn.c:716
int is_wap
Definition: opensmppbox.c:146
#define MAIN_THREAD_ID
Definition: gwthread.h:77
List * flow_threads
Definition: bearerbox.c:116
Dict * sent
Definition: opensmppbox.c:157
static void run_smsbox(void *arg)
Definition: bb_boxc.c:665
int wapboxid
Definition: bb_boxc.c:827
CfgGroup * cfg_get_single_group(Cfg *cfg, Octstr *name)
Definition: cfg.c:639
void gwlist_add_producer(List *list)
Definition: list.c:383
volatile int routable
Definition: opensmppbox.c:167
static volatile sig_atomic_t wapbox_running
Definition: bb_boxc.c:104
Semaphore * pending
Definition: opensmppbox.c:158
Semaphore * semaphore_create(long n)
Definition: gw-semaphore.c:81
int gw_rand(void)
Definition: protected.c:174
static int cmp_route(void *ap, void *ms)
Definition: bb_boxc.c:836
List * octstr_split(const Octstr *os, const Octstr *sep)
Definition: octstr.c:1640
Definition: list.c:102
static XMLRPCDocument * msg
Definition: test_xmlrpc.c:86
static long wapbox_port
Definition: bb_boxc.c:119
#define SMSBOX_MAX_PENDING
Definition: bb_boxc.c:83
int gwlist_producer_count(List *list)
Definition: list.c:391
Connection * conn_wrap_fd(int fd, int ssl)
Definition: conn.c:566
int conn_flush(Connection *conn)
Definition: conn.c:995
int octstr_compare(const Octstr *ostr1, const Octstr *ostr2)
Definition: octstr.c:871
long smsc2_rout(Msg *msg, int resend)
Definition: bb_smscconn.c:1716
void gwlist_destroy(List *list, gwlist_item_destructor_t *destructor)
Definition: list.c:145
See file LICENSE for details about the license agreement for using, modifying, copying or deriving work from this software.