Annotation of mqtt/src/mqttd_calls.c, revision 1.5

1.4       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.5     ! misho       6: * $Id: mqttd_calls.c,v 1.4.4.2 2013/07/16 14:34:58 misho Exp $
1.4       misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.5     ! misho      15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.4       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
1.2       misho      46: #include "global.h"
                     47: #include "mqttd.h"
1.3       misho      48: #include "utils.h"
                     49: #include "rtlm.h"
1.2       misho      50: #include "mqttd_calls.h"
                     51: 
                     52: 
1.3       misho      53: static inline ait_val_t *
                     54: mkPkt(void * __restrict data, int dlen)
                     55: {
                     56:        ait_val_t *p = NULL;
                     57: 
1.5     ! misho      58:        if (!(p = ait_allocVar())) {
        !            59:                EVERBOSE(7, "Error:: in send packet prepare #%d - %s", 
        !            60:                                elwix_GetErrno(), elwix_GetError());
1.3       misho      61:                return NULL;
                     62:        }
                     63: 
                     64:        if (data && dlen > 0)
                     65:                AIT_SET_BUF(p, data, dlen);
                     66: 
                     67:        return p;
                     68: }
                     69: 
                     70: static inline void
                     71: freePkt(ait_val_t ** __restrict p)
                     72: {
                     73:        if (!p)
                     74:                return;
                     75: 
1.5     ! misho      76:        ait_freeVar(p);
1.3       misho      77: }
                     78: 
                     79: static void *
                     80: sendPacket(sched_task_t *task)
                     81: {
                     82:        ait_val_t *p = TASK_ARG(task);
                     83:        register int n, slen;
                     84:        u_char *pos;
                     85: 
                     86:        if (!p || AIT_ISEMPTY(p)) {
1.5     ! misho      87:                EVERBOSE(9, "Error:: invalid packet or found empty content ...");
1.3       misho      88:                return NULL;
                     89:        }
                     90: 
1.5     ! misho      91:        EVERBOSE(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));
1.3       misho      92: 
                     93:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
                     94:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     95:                if (n == -1) {
1.5     ! misho      96:                        ESYSERR(0);
1.3       misho      97:                        break;
                     98:                }
                     99:        }
                    100: 
                    101:        freePkt(&p);
                    102:        return NULL;
                    103: }
                    104: 
                    105: static int
                    106: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
                    107: {
                    108:        regex_t re;
                    109:        regmatch_t match;
                    110:        ait_val_t *p = NULL;
                    111:        struct tagSession *s = NULL;
                    112:        struct tagStore *st_, *st = NULL;
                    113:        char szStr[STRSIZ];
                    114:        int ret;
                    115: 
                    116:        assert(sess);
                    117: 
                    118:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                    119:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
                    120:                        /* check for QoS */
                    121:                        if (st->st_subscr.sub_ret >= qos) {
                    122:                                if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    123:                                        regerror(ret, &re, szStr, sizeof szStr);
                    124:                                        regfree(&re);
1.5     ! misho     125:                                        EVERBOSE(3, "Error:: regcomp(%s) %s\n", (char*)
1.3       misho     126:                                                        st->st_subscr.sub_topic.msg_base, szStr);
                    127:                                }
                    128:                                if (!regexec(&re, topic, 1, &match, 0)) {
                    129:                                        /* MATCH */
                    130:                                        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    131:                                        schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
                    132:                                }
                    133: 
                    134:                                regfree(&re);
                    135:                        }
                    136:                }
                    137:        }
                    138: 
                    139:        return 0;
                    140: }
                    141: 
                    142: /* --------------------------------------------------- */
                    143: 
                    144: void *
                    145: sendRetain(sched_task_t *task)
                    146: {
                    147:        mqtt_subscr_t *subs, *s;
                    148:        struct tagSession *sess;
                    149:        int siz;
                    150: 
1.5     ! misho     151:        ETRACE();
1.3       misho     152: 
                    153:        assert(task);
                    154: 
                    155:        sess = TASK_ARG(task);
                    156:        assert(sess);
                    157: 
                    158:        if (!sess->sess_buf) {
1.5     ! misho     159:                EVERBOSE(9, "WARNING! No allocated buffer!?!\n");
1.3       misho     160:                return NULL;
                    161:        }
                    162: 
                    163:        subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
                    164:        if (!subs)
                    165:                return NULL;
                    166: 
                    167:        for (s = subs; s && s->sub_topic.msg_base; s++) {
                    168:                siz = s->sub_value.msg_len;
                    169:                memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, 
                    170:                                MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
1.5     ! misho     171:                EVERBOSE(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", 
1.3       misho     172:                                siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
                    173:                if (siz > 0)
                    174:                        search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
                    175:        }
                    176: 
                    177:        mqtt_subFree(&subs);
                    178:        return NULL;
                    179: }
                    180: 
1.2       misho     181: int
1.3       misho     182: pubWill(struct tagSession * __restrict sess)
                    183: {
                    184:        int datlen;
                    185: 
1.5     ! misho     186:        ETRACE();
1.3       misho     187: 
                    188:        /* prepare will packet */
                    189:        datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0, 
                    190:                        sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
                    191:        if (datlen == -1)
                    192:                return -1;      /* error */
                    193: 
                    194:        return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
                    195: }
                    196: 
                    197: static int
                    198: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
                    199: {
                    200:        return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
                    201: }
                    202: 
                    203: static int
                    204: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
                    205: {
                    206:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    207: 
                    208:        /* write topic to database */
                    209:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    210:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    211:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    212:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    213:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    214: 
                    215:        search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
                    216: 
                    217:        /* delete not retain message */
                    218:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    219:                        sess->sess_addr, 0);
                    220:        return 0;
                    221: }
                    222: 
                    223: static int
                    224: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
                    225: {
                    226:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    227: 
                    228:        /* write topic to database */
                    229:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    230:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    231:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    232:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    233:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    234: 
                    235:        return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
                    236: }
                    237: 
                    238: 
                    239: int
                    240: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     241: {
                    242:        struct mqtthdr *hdr;
1.3       misho     243:        struct tagSession *sess = (struct tagSession*) arg;
                    244:        char szTopic[STRSIZ] = { 0 };
                    245:        int siz = 0;
                    246:        u_short mid = 0;
                    247:        ait_val_t *p = NULL;
1.2       misho     248: 
1.5     ! misho     249:        ETRACE();
1.2       misho     250: 
                    251:        if (!sess)
                    252:                return -1;
                    253: 
1.5     ! misho     254:        EVERBOSE(5, "Exec PUBLISH session");
1.3       misho     255:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
                    256:        if (siz == -1) {
1.5     ! misho     257:                EVERBOSE(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     258:                return 0;
                    259:        }
                    260: 
1.2       misho     261:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    262:        switch (hdr->mqtt_msg.qos) {
                    263:                case MQTT_QOS_ACK:
1.3       misho     264:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    265:                                return 0;
                    266:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    267:                        if (siz == -1) {
1.5     ! misho     268:                                EVERBOSE(5, "Error:: in msgPUBACK #%d - %s", 
1.3       misho     269:                                                mqtt_GetErrno(), mqtt_GetError());
                    270:                                return 0;
                    271:                        }
1.2       misho     272:                        break;
                    273:                case MQTT_QOS_EXACTLY:
1.3       misho     274:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    275:                                return 0;
                    276:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    277:                        if (siz == -1) {
1.5     ! misho     278:                                EVERBOSE(5, "Error:: in msgPUBREC #%d - %s", 
1.3       misho     279:                                                mqtt_GetErrno(), mqtt_GetError());
                    280:                                return 0;
                    281:                        }
1.2       misho     282:                        break;
1.3       misho     283:                case MQTT_QOS_ONCE:
                    284:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     285:                default:
                    286:                        return 0;
                    287:        }
                    288: 
1.3       misho     289:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    290:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    291:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     292:        return 0;
                    293: }
1.3       misho     294: 
                    295: int
                    296: cmdPUBREL(void *srv, int len, void *arg)
                    297: {
                    298:        struct tagSession *sess = (struct tagSession*) arg;
                    299:        int siz = 0;
                    300:        u_short mid = 0;
                    301:        ait_val_t *p = NULL;
                    302: 
1.5     ! misho     303:        ETRACE();
1.3       misho     304: 
                    305:        if (!sess)
                    306:                return -1;
                    307: 
1.5     ! misho     308:        EVERBOSE(5, "Exec PUBREL session");
1.3       misho     309:        mid = mqtt_readPUBREL(sess->sess_buf);
                    310:        if (mid == (u_short) -1) {
1.5     ! misho     311:                EVERBOSE(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     312:                return 0;
                    313:        }
                    314: 
                    315:        /* delete not retain message */
                    316:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    317:                        sess->sess_addr, 0);
                    318: 
                    319:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    320:        if (siz == -1) {
1.5     ! misho     321:                EVERBOSE(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     322:                return 0;
                    323:        }
                    324: 
                    325:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    326:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    327:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    328:        return 0;
                    329: }
                    330: 
                    331: int
                    332: cmdSUBSCRIBE(void *srv, int len, void *arg)
                    333: {
                    334:        struct tagSession *sess = (struct tagSession*) arg;
                    335:        mqtt_subscr_t *subs = NULL;
                    336:        int siz = 0;
                    337:        u_short mid = 0;
                    338:        register int i;
                    339:        struct tagStore *store;
                    340:        char buf[BUFSIZ];
                    341:        void *ptr;
                    342:        ait_val_t *p = NULL;
                    343: 
1.5     ! misho     344:        ETRACE();
1.3       misho     345: 
                    346:        if (!sess)
                    347:                return -1;
                    348: 
1.5     ! misho     349:        EVERBOSE(5, "Exec SUBSCRIBE session");
1.3       misho     350:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    351:        if (siz == -1) {
1.5     ! misho     352:                EVERBOSE(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     353:                return 0;
                    354:        }
                    355: 
                    356:        /* add to db */
                    357:        for (i = 0; i < siz; i++) {
1.5     ! misho     358:                store = e_malloc(sizeof(struct tagStore));
1.3       misho     359:                if (!store) {
1.5     ! misho     360:                        ELIBERR(elwix);
1.3       misho     361:                        continue;
                    362:                } else {
                    363:                        store->st_msgid = mid;
                    364:                        mqtt_subCopy(&store->st_subscr, &subs[i]);
                    365:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    366:                }
                    367: 
                    368:                /* add to cache */
                    369:                SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    370: 
                    371:                /* convert topic to regexp */
                    372:                if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.5     ! misho     373:                        EVERBOSE(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     374:                } else {
                    375:                        ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    376:                        if (!ptr) {
1.5     ! misho     377:                                ESYSERR(0);
1.3       misho     378:                                continue;
                    379:                        } else {
                    380:                                store->st_subscr.sub_topic.msg_base = ptr;
                    381:                                store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    382:                                memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    383:                                                store->st_subscr.sub_topic.msg_len);
                    384:                        }
                    385: 
                    386:                        /* store to db */
                    387:                        call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
                    388:                                        sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
                    389:                        /* subscribe pass */
                    390:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    391: 
                    392:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid, 
                    393:                                        store->st_subscr.sub_topic.msg_base, 
                    394:                                        store->st_subscr.sub_topic.msg_len, 
                    395:                                        store->st_subscr.sub_ret, sess->sess_addr);
                    396:                }
                    397:        }
                    398: 
                    399:        /* send acknowledge */
                    400:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    401:        if (siz == -1) {
1.5     ! misho     402:                EVERBOSE(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     403:                goto end;
                    404:        } else {
                    405:                p = mkPkt(sess->sess_buf->msg_base, siz);
                    406:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    407:        }
                    408: 
                    409:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    410: end:
                    411:        mqtt_subFree(&subs);
                    412:        return 0;
                    413: }
                    414: 
                    415: int
                    416: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
                    417: {
                    418:        struct tagSession *sess = (struct tagSession*) arg;
                    419:        mqtt_subscr_t *subs = NULL;
                    420:        int siz = 0;
                    421:        u_short mid = 0;
                    422:        register int i;
                    423:        struct tagStore *store, *tmp;
                    424:        ait_val_t *p = NULL;
                    425: 
1.5     ! misho     426:        ETRACE();
1.3       misho     427: 
                    428:        if (!sess)
                    429:                return -1;
                    430: 
1.5     ! misho     431:        EVERBOSE(5, "Exec UNSUBSCRIBE session");
1.3       misho     432:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    433:        if (siz == -1) {
1.5     ! misho     434:                EVERBOSE(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     435:                return 0;
                    436:        }
                    437: 
                    438:        /* del from db */
                    439:        for (i = 0; i < siz; i++) {
                    440:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    441:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    442:                                        store->st_subscr.sub_topic.msg_base && 
                    443:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    444:                                                subs[i].sub_topic.msg_base)) {
                    445:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
                    446: 
                    447:                                if (store->st_subscr.sub_topic.msg_base)
                    448:                                        free(store->st_subscr.sub_topic.msg_base);
                    449:                                if (store->st_subscr.sub_value.msg_base)
                    450:                                        free(store->st_subscr.sub_value.msg_base);
1.5     ! misho     451:                                e_free(store);
1.3       misho     452:                        }
                    453:                }
                    454: 
                    455:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    456:                                sess->sess_user, "%");
                    457:        }
                    458: 
                    459:        /* send acknowledge */
                    460:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    461:        if (siz == -1) {
1.5     ! misho     462:                EVERBOSE(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     463:                goto end;
                    464:        } else {
                    465:                p = mkPkt(sess->sess_buf->msg_base, siz);
                    466:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    467:        }
                    468: 
                    469:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    470: end:
                    471:        mqtt_subFree(&subs);
                    472:        return 0;
                    473: }
                    474: 
                    475: int
                    476: cmdPINGREQ(void *srv, int len, void *arg)
                    477: {
                    478:        struct tagSession *sess = (struct tagSession*) arg;
                    479:        int siz = 0;
                    480:        ait_val_t *p = NULL;
                    481: 
1.5     ! misho     482:        ETRACE();
1.3       misho     483: 
                    484:        if (!sess)
                    485:                return -1;
                    486: 
1.5     ! misho     487:        EVERBOSE(5, "Exec PINGREQ session");
1.3       misho     488:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    489:        if (siz == -1) {
1.5     ! misho     490:                EVERBOSE(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.3       misho     491:                return 0;
                    492:        } else {
                    493:                p = mkPkt(sess->sess_buf->msg_base, siz);
                    494:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    495:        }
                    496: 
                    497:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    498:        return 0;
                    499: }
                    500: 
                    501: int
                    502: cmdCONNECT(void *srv, int len, void *arg)
                    503: {
                    504:        struct tagStore *store;
                    505:        struct tagSession *sess = (struct tagSession*) arg;
                    506: 
1.5     ! misho     507:        ETRACE();
1.3       misho     508: 
                    509:        if (!sess)
                    510:                return -1;
                    511: 
1.5     ! misho     512:        EVERBOSE(5, "Exec CONNECT session");
1.3       misho     513:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    514: 
                    515:        schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
                    516: 
                    517:        if (sess->sess_clean) {
                    518:                if (call.FiniSessPUB)
                    519:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    520:                if (call.DeletePUB_subscribe)
                    521:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    522:                if (call.WipePUB_topic)
                    523:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    524:        }
                    525: 
                    526:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    527:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
                    528: 
                    529:                if (store->st_subscr.sub_topic.msg_base)
                    530:                        free(store->st_subscr.sub_topic.msg_base);
                    531:                if (store->st_subscr.sub_value.msg_base)
                    532:                        free(store->st_subscr.sub_value.msg_base);
                    533: 
1.5     ! misho     534:                e_free(store);
1.3       misho     535:        }
                    536: 
                    537:        if (sess->sess_will.flag)
                    538:                pubWill(sess);
                    539: 
                    540:        if (sess->sess_will.msg)
                    541:                free(sess->sess_will.msg);
                    542:        if (sess->sess_will.topic)
                    543:                free(sess->sess_will.topic);
                    544: 
                    545:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    546:                        sess->sess_addr, sess->sess_user);
                    547: 
                    548:        return -3;      /* reconnect client */
                    549: }
                    550: 
                    551: int
                    552: cmdDISCONNECT(void *srv, int len, void *arg)
                    553: {
                    554:        struct tagSession *sess = (struct tagSession*) arg;
                    555: 
1.5     ! misho     556:        ETRACE();
1.3       misho     557: 
                    558:        if (!sess)
                    559:                return -1;
                    560: 
1.5     ! misho     561:        EVERBOSE(5, "Exec DISCONNECT session");
1.3       misho     562: 
                    563:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    564:                        sess->sess_addr, sess->sess_user);
                    565: 
                    566:        return -2;      /* must terminate dispatcher */
                    567: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>