| version 1.2.2.28, 2012/06/26 14:51:33 | version 1.4.4.1, 2013/01/18 10:17:22 | 
| Line 1 | Line 1 | 
 |  | /************************************************************************* | 
 |  | * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com> | 
 |  | *  by Michael Pounov <misho@openbsd-bg.org> | 
 |  | * | 
 |  | * $Author$ | 
 |  | * $Id$ | 
 |  | * | 
 |  | ************************************************************************** | 
 |  | The ELWIX and AITNET software is distributed under the following | 
 |  | terms: | 
 |  |  | 
 |  | All of the documentation and software included in the ELWIX and AITNET | 
 |  | Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org> | 
 |  |  | 
 |  | Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 | 
 |  | by Michael Pounov <misho@elwix.org>.  All rights reserved. | 
 |  |  | 
 |  | Redistribution and use in source and binary forms, with or without | 
 |  | modification, are permitted provided that the following conditions | 
 |  | are met: | 
 |  | 1. Redistributions of source code must retain the above copyright | 
 |  | notice, this list of conditions and the following disclaimer. | 
 |  | 2. Redistributions in binary form must reproduce the above copyright | 
 |  | notice, this list of conditions and the following disclaimer in the | 
 |  | documentation and/or other materials provided with the distribution. | 
 |  | 3. All advertising materials mentioning features or use of this software | 
 |  | must display the following acknowledgement: | 
 |  | This product includes software developed by Michael Pounov <misho@elwix.org> | 
 |  | ELWIX - Embedded LightWeight unIX and its contributors. | 
 |  | 4. Neither the name of AITNET nor the names of its contributors | 
 |  | may be used to endorse or promote products derived from this software | 
 |  | without specific prior written permission. | 
 |  |  | 
 |  | THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND | 
 |  | ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | 
 |  | IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | 
 |  | ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE | 
 |  | FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | 
 |  | DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS | 
 |  | OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) | 
 |  | HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | 
 |  | LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | 
 |  | OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF | 
 |  | SUCH DAMAGE. | 
 |  | */ | 
 | #include "global.h" | #include "global.h" | 
 | #include "mqttd.h" | #include "mqttd.h" | 
 |  | #include "utils.h" | 
 | #include "rtlm.h" | #include "rtlm.h" | 
 | #include "mqttd_calls.h" | #include "mqttd_calls.h" | 
 |  |  | 
| Line 9  mkPkt(void * __restrict data, int dlen) | Line 55  mkPkt(void * __restrict data, int dlen) | 
 | { | { | 
 | ait_val_t *p = NULL; | ait_val_t *p = NULL; | 
 |  |  | 
| if (!(p = io_allocVar())) { | if (!(p = ait_allocVar())) { | 
| ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError()); | EVERBOSE(7, "Error:: in send packet prepare #%d - %s", | 
|  | elwix_GetErrno(), elwix_GetError()); | 
 | return NULL; | return NULL; | 
 | } | } | 
 |  |  | 
| Line 26  freePkt(ait_val_t ** __restrict p) | Line 73  freePkt(ait_val_t ** __restrict p) | 
 | if (!p) | if (!p) | 
 | return; | return; | 
 |  |  | 
| io_freeVar(p); | ait_freeVar(p); | 
 | } | } | 
 |  |  | 
 | static void * | static void * | 
| Line 37  sendPacket(sched_task_t *task) | Line 84  sendPacket(sched_task_t *task) | 
 | u_char *pos; | u_char *pos; | 
 |  |  | 
 | if (!p || AIT_ISEMPTY(p)) { | if (!p || AIT_ISEMPTY(p)) { | 
| ioDEBUG(9, "Error:: invalid packet or found empty content ..."); | EVERBOSE(9, "Error:: invalid packet or found empty content ..."); | 
 | return NULL; | return NULL; | 
 | } | } | 
 |  |  | 
 |  | EVERBOSE(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task)); | 
 |  |  | 
 | for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) { | for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) { | 
 | n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL); | n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL); | 
 | if (n == -1) { | if (n == -1) { | 
| ioSYSERR(0); | ESYSERR(0); | 
 | break; | break; | 
 | } | } | 
 | } | } | 
| Line 53  sendPacket(sched_task_t *task) | Line 102  sendPacket(sched_task_t *task) | 
 | return NULL; | return NULL; | 
 | } | } | 
 |  |  | 
 | /* --------------------------------------------------- */ |  | 
 |  |  | 
 | static int | static int | 
| pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen) | search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos) | 
 | { | { | 
 | ait_val_t *p = NULL; |  | 
 | struct tagSession *s = NULL; |  | 
 | struct tagStore *st = NULL; |  | 
 | regex_t re; | regex_t re; | 
 | regmatch_t match; | regmatch_t match; | 
| int ret; | ait_val_t *p = NULL; | 
|  | struct tagSession *s = NULL; | 
|  | struct tagStore *st_, *st = NULL; | 
 | char szStr[STRSIZ]; | char szStr[STRSIZ]; | 
 |  | int ret; | 
 |  |  | 
 |  | assert(sess); | 
 |  |  | 
 | TAILQ_FOREACH(s, &Sessions, sess_node) { | TAILQ_FOREACH(s, &Sessions, sess_node) { | 
| SLIST_FOREACH(st, &s->sess_subscr, st_node) { | SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) { | 
| if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { | /* check for QoS */ | 
| regerror(ret, &re, szStr, sizeof szStr); | if (st->st_subscr.sub_ret >= qos) { | 
|  | if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { | 
|  | regerror(ret, &re, szStr, sizeof szStr); | 
|  | regfree(&re); | 
|  | EVERBOSE(3, "Error:: regcomp(%s) %s\n", (char*) | 
|  | st->st_subscr.sub_topic.msg_base, szStr); | 
|  | } | 
|  | if (!regexec(&re, topic, 1, &match, 0)) { | 
|  | /* MATCH */ | 
|  | p = mkPkt(sess->sess_buf->msg_base, datlen); | 
|  | schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0); | 
|  | } | 
|  |  | 
 | regfree(&re); | regfree(&re); | 
 | ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) |  | 
 | st->st_subscr.sub_topic.msg_base, szStr); |  | 
 | } | } | 
 | if (!regexec(&re, psTopic, 1, &match, 0)) { |  | 
 | /* MATCH */ |  | 
 | p = mkPkt(sess->sess_buf->msg_base, datlen); |  | 
 | schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0); |  | 
 | } |  | 
 |  |  | 
 | regfree(&re); |  | 
 | } | } | 
 | } | } | 
 |  |  | 
 | return 0; | return 0; | 
 | } | } | 
 |  |  | 
 |  | /* --------------------------------------------------- */ | 
 |  |  | 
 |  | void * | 
 |  | sendRetain(sched_task_t *task) | 
 |  | { | 
 |  | mqtt_subscr_t *subs, *s; | 
 |  | struct tagSession *sess; | 
 |  | int siz; | 
 |  |  | 
 |  | ETRACE(2); | 
 |  |  | 
 |  | assert(task); | 
 |  |  | 
 |  | sess = TASK_ARG(task); | 
 |  | assert(sess); | 
 |  |  | 
 |  | if (!sess->sess_buf) { | 
 |  | EVERBOSE(9, "WARNING! No allocated buffer!?!\n"); | 
 |  | return NULL; | 
 |  | } | 
 |  |  | 
 |  | subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1); | 
 |  | if (!subs) | 
 |  | return NULL; | 
 |  |  | 
 |  | for (s = subs; s && s->sub_topic.msg_base; s++) { | 
 |  | siz = s->sub_value.msg_len; | 
 |  | memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, | 
 |  | MIN(sess->sess_buf->msg_len, s->sub_value.msg_len)); | 
 |  | EVERBOSE(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", | 
 |  | siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len); | 
 |  | if (siz > 0) | 
 |  | search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret); | 
 |  | } | 
 |  |  | 
 |  | mqtt_subFree(&subs); | 
 |  | return NULL; | 
 |  | } | 
 |  |  | 
 |  | int | 
 |  | pubWill(struct tagSession * __restrict sess) | 
 |  | { | 
 |  | int datlen; | 
 |  |  | 
 |  | ETRACE(2); | 
 |  |  | 
 |  | /* prepare will packet */ | 
 |  | datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0, | 
 |  | sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0); | 
 |  | if (datlen == -1) | 
 |  | return -1;      /* error */ | 
 |  |  | 
 |  | return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK); | 
 |  | } | 
 |  |  | 
 | static int | static int | 
 |  | pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen) | 
 |  | { | 
 |  | return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE); | 
 |  | } | 
 |  |  | 
 |  | static int | 
 | pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen) | pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen) | 
 | { | { | 
| ait_val_t *p = NULL; | struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | 
| struct tagSession *s = NULL; |  | 
| struct tagStore *st = NULL; |  | 
| regex_t re; |  | 
| regmatch_t match; |  | 
| int ret; |  | 
| char szStr[STRSIZ]; |  | 
| struct mqtthdr *hdr; |  | 
 |  |  | 
 | p = mkPkt(sess->sess_buf->msg_base, datlen); |  | 
 | hdr = (struct mqtthdr*) sess->sess_buf->msg_base; |  | 
 |  |  | 
 | /* write topic to database */ | /* write topic to database */ | 
 | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, | 
 | sess->sess_addr, hdr->mqtt_msg.retain); | sess->sess_addr, hdr->mqtt_msg.retain); | 
| call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), | call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, | 
| sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain); | sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, | 
|  | hdr->mqtt_msg.qos, hdr->mqtt_msg.retain); | 
 |  |  | 
| TAILQ_FOREACH(s, &Sessions, sess_node) { | search4send(sess, psTopic, datlen, MQTT_QOS_ACK); | 
| SLIST_FOREACH(st, &s->sess_subscr, st_node) { |  | 
| /* check for QoS */ |  | 
| if (st->st_subscr.sub_ret < MQTT_QOS_ACK) |  | 
| continue; |  | 
 |  |  | 
 | if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { |  | 
 | regerror(ret, &re, szStr, sizeof szStr); |  | 
 | regfree(&re); |  | 
 | ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) |  | 
 | st->st_subscr.sub_topic.msg_base, szStr); |  | 
 | } |  | 
 | if (!regexec(&re, psTopic, 1, &match, 0)) { |  | 
 | /* MATCH */ |  | 
 | schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), |  | 
 | s->sess_sock, NULL, 0); |  | 
 | } |  | 
 |  |  | 
 | regfree(&re); |  | 
 | } |  | 
 | } |  | 
 |  |  | 
 | /* delete not retain message */ | /* delete not retain message */ | 
 | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, | 
 | sess->sess_addr, 0); | sess->sess_addr, 0); | 
 |  |  | 
 | freePkt(&p); |  | 
 | return 0; | return 0; | 
 | } | } | 
 |  |  | 
 | static int | static int | 
 | pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen) | pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen) | 
 | { | { | 
| ait_val_t *p = NULL; | struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | 
| struct tagSession *s = NULL; |  | 
| struct tagStore *st = NULL; |  | 
| regex_t re; |  | 
| regmatch_t match; |  | 
| int ret; |  | 
| char szStr[STRSIZ]; |  | 
| struct mqtthdr *hdr; |  | 
 |  |  | 
 | p = mkPkt(sess->sess_buf->msg_base, datlen); |  | 
 | hdr = (struct mqtthdr*) sess->sess_buf->msg_base; |  | 
 |  |  | 
 | /* write topic to database */ | /* write topic to database */ | 
 | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, | 
 | sess->sess_addr, hdr->mqtt_msg.retain); | sess->sess_addr, hdr->mqtt_msg.retain); | 
| call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), | call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, | 
| sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain); | sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, | 
|  | hdr->mqtt_msg.qos, hdr->mqtt_msg.retain); | 
 |  |  | 
| TAILQ_FOREACH(s, &Sessions, sess_node) { | return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY); | 
| SLIST_FOREACH(st, &s->sess_subscr, st_node) { |  | 
| /* check for QoS */ |  | 
| if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY) |  | 
| continue; |  | 
|  |  | 
| if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { |  | 
| regerror(ret, &re, szStr, sizeof szStr); |  | 
| regfree(&re); |  | 
| ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) |  | 
| st->st_subscr.sub_topic.msg_base, szStr); |  | 
| } |  | 
| if (!regexec(&re, psTopic, 1, &match, 0)) { |  | 
| /* MATCH */ |  | 
| schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), |  | 
| s->sess_sock, NULL, 0); |  | 
| } |  | 
|  |  | 
| regfree(&re); |  | 
| } |  | 
| } |  | 
|  |  | 
| freePkt(&p); |  | 
| return 0; |  | 
 | } | } | 
 |  |  | 
 |  |  | 
| Line 196  cmdPUBLISH(void *srv, int len, void *arg) | Line 246  cmdPUBLISH(void *srv, int len, void *arg) | 
 | u_short mid = 0; | u_short mid = 0; | 
 | ait_val_t *p = NULL; | ait_val_t *p = NULL; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec PUBLISH session"); | EVERBOSE(5, "Exec PUBLISH session"); | 
 | siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL); | siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } | } | 
 |  |  | 
| Line 215  cmdPUBLISH(void *srv, int len, void *arg) | Line 265  cmdPUBLISH(void *srv, int len, void *arg) | 
 | return 0; | return 0; | 
 | siz = mqtt_msgPUBACK(sess->sess_buf, mid); | siz = mqtt_msgPUBACK(sess->sess_buf, mid); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", | EVERBOSE(5, "Error:: in msgPUBACK #%d - %s", | 
 | mqtt_GetErrno(), mqtt_GetError()); | mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } | } | 
| Line 225  cmdPUBLISH(void *srv, int len, void *arg) | Line 275  cmdPUBLISH(void *srv, int len, void *arg) | 
 | return 0; | return 0; | 
 | siz = mqtt_msgPUBREC(sess->sess_buf, mid); | siz = mqtt_msgPUBREC(sess->sess_buf, mid); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", | EVERBOSE(5, "Error:: in msgPUBREC #%d - %s", | 
 | mqtt_GetErrno(), mqtt_GetError()); | mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } | } | 
| Line 250  cmdPUBREL(void *srv, int len, void *arg) | Line 300  cmdPUBREL(void *srv, int len, void *arg) | 
 | u_short mid = 0; | u_short mid = 0; | 
 | ait_val_t *p = NULL; | ait_val_t *p = NULL; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec PUBREL session"); | EVERBOSE(5, "Exec PUBREL session"); | 
 | mid = mqtt_readPUBREL(sess->sess_buf); | mid = mqtt_readPUBREL(sess->sess_buf); | 
 | if (mid == (u_short) -1) { | if (mid == (u_short) -1) { | 
| ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } | } | 
 |  |  | 
| // TODO:: Delete from database topic | /* delete not retain message */ | 
|  | call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, | 
|  | sess->sess_addr, 0); | 
 |  |  | 
 | siz = mqtt_msgPUBCOMP(sess->sess_buf, mid); | siz = mqtt_msgPUBCOMP(sess->sess_buf, mid); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } else { |  | 
 | p = mkPkt(sess->sess_buf->msg_base, siz); |  | 
 | memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); |  | 
 | } | } | 
 |  |  | 
 |  | p = mkPkt(sess->sess_buf->msg_base, siz); | 
 |  | memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | 
 | schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); | schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); | 
 | return 0; | return 0; | 
 | } | } | 
| Line 290  cmdSUBSCRIBE(void *srv, int len, void *arg) | Line 341  cmdSUBSCRIBE(void *srv, int len, void *arg) | 
 | void *ptr; | void *ptr; | 
 | ait_val_t *p = NULL; | ait_val_t *p = NULL; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec SUBSCRIBE session"); | EVERBOSE(5, "Exec SUBSCRIBE session"); | 
 | siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs); | siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } | } | 
 |  |  | 
 | /* add to db */ | /* add to db */ | 
| for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) { | for (i = 0; i < siz; i++) { | 
| /* convert topic to sql search statement */ | store = e_malloc(sizeof(struct tagStore)); | 
| if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) { | if (!store) { | 
| ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | ELIBERR(elwix); | 
 | continue; | continue; | 
 |  | } else { | 
 |  | store->st_msgid = mid; | 
 |  | mqtt_subCopy(&store->st_subscr, &subs[i]); | 
 |  | subs[i].sub_ret = MQTT_QOS_DENY; | 
 | } | } | 
| if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, |  | 
| sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { | /* add to cache */ | 
| store = io_malloc(sizeof(struct tagStore)); | SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); | 
| if (!store) { |  | 
| ioSYSERR(0); | /* convert topic to regexp */ | 
|  | if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) { | 
|  | EVERBOSE(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
|  | } else { | 
|  | ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); | 
|  | if (!ptr) { | 
|  | ESYSERR(0); | 
 | continue; | continue; | 
 | } else { | } else { | 
| store->st_msgid = mid; | store->st_subscr.sub_topic.msg_base = ptr; | 
| mqtt_subCopy(&store->st_subscr, &subs[i]); | store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; | 
|  | memcpy(store->st_subscr.sub_topic.msg_base, buf, | 
|  | store->st_subscr.sub_topic.msg_len); | 
 | } | } | 
 |  |  | 
| /* add to cache */ | /* store to db */ | 
| SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); | call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, | 
|  | sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret); | 
|  | /* subscribe pass */ | 
|  | subs[i].sub_ret = MQTT_QOS_PASS; | 
 |  |  | 
| /* convert topic to regexp */ | call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid, | 
| if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) { | store->st_subscr.sub_topic.msg_base, | 
| ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | store->st_subscr.sub_topic.msg_len, | 
|  | store->st_subscr.sub_ret, sess->sess_addr); | 
| subs[i].sub_ret = MQTT_QOS_DENY; |  | 
| } else { |  | 
| ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); |  | 
| if (!ptr) |  | 
| ioSYSERR(0); |  | 
| else { |  | 
| store->st_subscr.sub_topic.msg_base = ptr; |  | 
| store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; |  | 
| memcpy(store->st_subscr.sub_topic.msg_base, buf, |  | 
| store->st_subscr.sub_topic.msg_len); |  | 
| } |  | 
|  |  | 
| call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, |  | 
| store->st_subscr.sub_topic.msg_base, |  | 
| store->st_subscr.sub_topic.msg_len, sess->sess_addr); |  | 
|  |  | 
| subs[i].sub_ret = MQTT_QOS_PASS; |  | 
| } |  | 
 | } | } | 
 | } | } | 
 |  |  | 
 | /* send acknowledge */ | /* send acknowledge */ | 
 | siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid); | siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | goto end; | goto end; | 
 | } else { | } else { | 
 | p = mkPkt(sess->sess_buf->msg_base, siz); | p = mkPkt(sess->sess_buf->msg_base, siz); | 
| Line 375  cmdUNSUBSCRIBE(void *srv, int len, void *arg) | Line 423  cmdUNSUBSCRIBE(void *srv, int len, void *arg) | 
 | struct tagStore *store, *tmp; | struct tagStore *store, *tmp; | 
 | ait_val_t *p = NULL; | ait_val_t *p = NULL; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec UNSUBSCRIBE session"); | EVERBOSE(5, "Exec UNSUBSCRIBE session"); | 
 | siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs); | siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } | } | 
 |  |  | 
| Line 400  cmdUNSUBSCRIBE(void *srv, int len, void *arg) | Line 448  cmdUNSUBSCRIBE(void *srv, int len, void *arg) | 
 | free(store->st_subscr.sub_topic.msg_base); | free(store->st_subscr.sub_topic.msg_base); | 
 | if (store->st_subscr.sub_value.msg_base) | if (store->st_subscr.sub_value.msg_base) | 
 | free(store->st_subscr.sub_value.msg_base); | free(store->st_subscr.sub_value.msg_base); | 
| io_free(store); | e_free(store); | 
 | } | } | 
 | } | } | 
 |  |  | 
| Line 411  cmdUNSUBSCRIBE(void *srv, int len, void *arg) | Line 459  cmdUNSUBSCRIBE(void *srv, int len, void *arg) | 
 | /* send acknowledge */ | /* send acknowledge */ | 
 | siz = mqtt_msgUNSUBACK(sess->sess_buf, mid); | siz = mqtt_msgUNSUBACK(sess->sess_buf, mid); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | goto end; | goto end; | 
 | } else { | } else { | 
 | p = mkPkt(sess->sess_buf->msg_base, siz); | p = mkPkt(sess->sess_buf->msg_base, siz); | 
| Line 431  cmdPINGREQ(void *srv, int len, void *arg) | Line 479  cmdPINGREQ(void *srv, int len, void *arg) | 
 | int siz = 0; | int siz = 0; | 
 | ait_val_t *p = NULL; | ait_val_t *p = NULL; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec PINGREQ session"); | EVERBOSE(5, "Exec PINGREQ session"); | 
 | siz = mqtt_msgPINGRESP(sess->sess_buf); | siz = mqtt_msgPINGRESP(sess->sess_buf); | 
 | if (siz == -1) { | if (siz == -1) { | 
| ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | EVERBOSE(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | 
 | return 0; | return 0; | 
 | } else { | } else { | 
 | p = mkPkt(sess->sess_buf->msg_base, siz); | p = mkPkt(sess->sess_buf->msg_base, siz); | 
| Line 456  cmdCONNECT(void *srv, int len, void *arg) | Line 504  cmdCONNECT(void *srv, int len, void *arg) | 
 | struct tagStore *store; | struct tagStore *store; | 
 | struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec CONNECT session"); | EVERBOSE(5, "Exec CONNECT session"); | 
 | TAILQ_REMOVE(&Sessions, sess, sess_node); | TAILQ_REMOVE(&Sessions, sess, sess_node); | 
 |  |  | 
 |  | schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL); | 
 |  |  | 
 | if (sess->sess_clean) { | if (sess->sess_clean) { | 
 | if (call.FiniSessPUB) | if (call.FiniSessPUB) | 
 | call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | 
| Line 481  cmdCONNECT(void *srv, int len, void *arg) | Line 531  cmdCONNECT(void *srv, int len, void *arg) | 
 | if (store->st_subscr.sub_value.msg_base) | if (store->st_subscr.sub_value.msg_base) | 
 | free(store->st_subscr.sub_value.msg_base); | free(store->st_subscr.sub_value.msg_base); | 
 |  |  | 
| io_free(store); | e_free(store); | 
 | } | } | 
 |  |  | 
 |  | if (sess->sess_will.flag) | 
 |  | pubWill(sess); | 
 |  |  | 
 | if (sess->sess_will.msg) | if (sess->sess_will.msg) | 
 | free(sess->sess_will.msg); | free(sess->sess_will.msg); | 
 | if (sess->sess_will.topic) | if (sess->sess_will.topic) | 
| Line 500  cmdDISCONNECT(void *srv, int len, void *arg) | Line 553  cmdDISCONNECT(void *srv, int len, void *arg) | 
 | { | { | 
 | struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; | 
 |  |  | 
| ioTRACE(2); | ETRACE(2); | 
 |  |  | 
 | if (!sess) | if (!sess) | 
 | return -1; | return -1; | 
 |  |  | 
| ioDEBUG(5, "Exec DISCONNECT session"); | EVERBOSE(5, "Exec DISCONNECT session"); | 
 |  |  | 
 | call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, | call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, | 
 | sess->sess_addr, sess->sess_user); | sess->sess_addr, sess->sess_user); |