Annotation of mqtt/src/mqttd_calls.c, revision 1.4

1.4     ! misho       1: /*************************************************************************
        !             2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
        !             3: *  by Michael Pounov <misho@openbsd-bg.org>
        !             4: *
        !             5: * $Author: misho $
        !             6: * $Id: mqttd_calls.c,v 1.3.2.1 2012/07/03 12:22:56 misho Exp $
        !             7: *
        !             8: **************************************************************************
        !             9: The ELWIX and AITNET software is distributed under the following
        !            10: terms:
        !            11: 
        !            12: All of the documentation and software included in the ELWIX and AITNET
        !            13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
        !            14: 
        !            15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
        !            16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
        !            17: 
        !            18: Redistribution and use in source and binary forms, with or without
        !            19: modification, are permitted provided that the following conditions
        !            20: are met:
        !            21: 1. Redistributions of source code must retain the above copyright
        !            22:    notice, this list of conditions and the following disclaimer.
        !            23: 2. Redistributions in binary form must reproduce the above copyright
        !            24:    notice, this list of conditions and the following disclaimer in the
        !            25:    documentation and/or other materials provided with the distribution.
        !            26: 3. All advertising materials mentioning features or use of this software
        !            27:    must display the following acknowledgement:
        !            28: This product includes software developed by Michael Pounov <misho@elwix.org>
        !            29: ELWIX - Embedded LightWeight unIX and its contributors.
        !            30: 4. Neither the name of AITNET nor the names of its contributors
        !            31:    may be used to endorse or promote products derived from this software
        !            32:    without specific prior written permission.
        !            33: 
        !            34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
        !            35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
        !            36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
        !            37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
        !            38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
        !            39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
        !            40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
        !            41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
        !            42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
        !            43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
        !            44: SUCH DAMAGE.
        !            45: */
1.2       misho      46: #include "global.h"
                     47: #include "mqttd.h"
1.3       misho      48: #include "utils.h"
                     49: #include "rtlm.h"
1.2       misho      50: #include "mqttd_calls.h"
                     51: 
                     52: 
1.3       misho      53: static inline ait_val_t *
                     54: mkPkt(void * __restrict data, int dlen)
                     55: {
                     56:        ait_val_t *p = NULL;
                     57: 
                     58:        if (!(p = io_allocVar())) {
                     59:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     60:                return NULL;
                     61:        }
                     62: 
                     63:        if (data && dlen > 0)
                     64:                AIT_SET_BUF(p, data, dlen);
                     65: 
                     66:        return p;
                     67: }
                     68: 
                     69: static inline void
                     70: freePkt(ait_val_t ** __restrict p)
                     71: {
                     72:        if (!p)
                     73:                return;
                     74: 
                     75:        io_freeVar(p);
                     76: }
                     77: 
                     78: static void *
                     79: sendPacket(sched_task_t *task)
                     80: {
                     81:        ait_val_t *p = TASK_ARG(task);
                     82:        register int n, slen;
                     83:        u_char *pos;
                     84: 
                     85:        if (!p || AIT_ISEMPTY(p)) {
                     86:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     87:                return NULL;
                     88:        }
                     89: 
                     90:        ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));
                     91: 
                     92:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
                     93:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     94:                if (n == -1) {
                     95:                        ioSYSERR(0);
                     96:                        break;
                     97:                }
                     98:        }
                     99: 
                    100:        freePkt(&p);
                    101:        return NULL;
                    102: }
                    103: 
                    104: static int
                    105: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
                    106: {
                    107:        regex_t re;
                    108:        regmatch_t match;
                    109:        ait_val_t *p = NULL;
                    110:        struct tagSession *s = NULL;
                    111:        struct tagStore *st_, *st = NULL;
                    112:        char szStr[STRSIZ];
                    113:        int ret;
                    114: 
                    115:        assert(sess);
                    116: 
                    117:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                    118:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
                    119:                        /* check for QoS */
                    120:                        if (st->st_subscr.sub_ret >= qos) {
                    121:                                if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    122:                                        regerror(ret, &re, szStr, sizeof szStr);
                    123:                                        regfree(&re);
                    124:                                        ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    125:                                                        st->st_subscr.sub_topic.msg_base, szStr);
                    126:                                }
                    127:                                if (!regexec(&re, topic, 1, &match, 0)) {
                    128:                                        /* MATCH */
                    129:                                        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    130:                                        schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
                    131:                                }
                    132: 
                    133:                                regfree(&re);
                    134:                        }
                    135:                }
                    136:        }
                    137: 
                    138:        return 0;
                    139: }
                    140: 
                    141: /* --------------------------------------------------- */
                    142: 
                    143: void *
                    144: sendRetain(sched_task_t *task)
                    145: {
                    146:        mqtt_subscr_t *subs, *s;
                    147:        struct tagSession *sess;
                    148:        int siz;
                    149: 
                    150:        ioTRACE(2);
                    151: 
                    152:        assert(task);
                    153: 
                    154:        sess = TASK_ARG(task);
                    155:        assert(sess);
                    156: 
                    157:        if (!sess->sess_buf) {
                    158:                ioDEBUG(9, "WARNING! No allocated buffer!?!\n");
                    159:                return NULL;
                    160:        }
                    161: 
                    162:        subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
                    163:        if (!subs)
                    164:                return NULL;
                    165: 
                    166:        for (s = subs; s && s->sub_topic.msg_base; s++) {
                    167:                siz = s->sub_value.msg_len;
                    168:                memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, 
                    169:                                MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
                    170:                ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", 
                    171:                                siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
                    172:                if (siz > 0)
                    173:                        search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
                    174:        }
                    175: 
                    176:        mqtt_subFree(&subs);
                    177:        return NULL;
                    178: }
                    179: 
1.2       misho     180: int
1.3       misho     181: pubWill(struct tagSession * __restrict sess)
                    182: {
                    183:        int datlen;
                    184: 
                    185:        ioTRACE(2);
                    186: 
                    187:        /* prepare will packet */
                    188:        datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0, 
                    189:                        sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
                    190:        if (datlen == -1)
                    191:                return -1;      /* error */
                    192: 
                    193:        return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
                    194: }
                    195: 
                    196: static int
                    197: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
                    198: {
                    199:        return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
                    200: }
                    201: 
                    202: static int
                    203: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
                    204: {
                    205:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    206: 
                    207:        /* write topic to database */
                    208:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    209:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    210:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    211:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    212:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    213: 
                    214:        search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
                    215: 
                    216:        /* delete not retain message */
                    217:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    218:                        sess->sess_addr, 0);
                    219:        return 0;
                    220: }
                    221: 
                    222: static int
                    223: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
                    224: {
                    225:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    226: 
                    227:        /* write topic to database */
                    228:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    229:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    230:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    231:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    232:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    233: 
                    234:        return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
                    235: }
                    236: 
                    237: 
                    238: int
                    239: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     240: {
                    241:        struct mqtthdr *hdr;
1.3       misho     242:        struct tagSession *sess = (struct tagSession*) arg;
                    243:        char szTopic[STRSIZ] = { 0 };
                    244:        int siz = 0;
                    245:        u_short mid = 0;
                    246:        ait_val_t *p = NULL;
1.2       misho     247: 
                    248:        ioTRACE(2);
                    249: 
                    250:        if (!sess)
                    251:                return -1;
                    252: 
1.3       misho     253:        ioDEBUG(5, "Exec PUBLISH session");
                    254:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
                    255:        if (siz == -1) {
                    256:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    257:                return 0;
                    258:        }
                    259: 
1.2       misho     260:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    261:        switch (hdr->mqtt_msg.qos) {
                    262:                case MQTT_QOS_ACK:
1.3       misho     263:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    264:                                return 0;
                    265:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    266:                        if (siz == -1) {
                    267:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    268:                                                mqtt_GetErrno(), mqtt_GetError());
                    269:                                return 0;
                    270:                        }
1.2       misho     271:                        break;
                    272:                case MQTT_QOS_EXACTLY:
1.3       misho     273:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    274:                                return 0;
                    275:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    276:                        if (siz == -1) {
                    277:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    278:                                                mqtt_GetErrno(), mqtt_GetError());
                    279:                                return 0;
                    280:                        }
1.2       misho     281:                        break;
1.3       misho     282:                case MQTT_QOS_ONCE:
                    283:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     284:                default:
                    285:                        return 0;
                    286:        }
                    287: 
1.3       misho     288:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    289:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    290:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     291:        return 0;
                    292: }
1.3       misho     293: 
                    294: int
                    295: cmdPUBREL(void *srv, int len, void *arg)
                    296: {
                    297:        struct tagSession *sess = (struct tagSession*) arg;
                    298:        int siz = 0;
                    299:        u_short mid = 0;
                    300:        ait_val_t *p = NULL;
                    301: 
                    302:        ioTRACE(2);
                    303: 
                    304:        if (!sess)
                    305:                return -1;
                    306: 
                    307:        ioDEBUG(5, "Exec PUBREL session");
                    308:        mid = mqtt_readPUBREL(sess->sess_buf);
                    309:        if (mid == (u_short) -1) {
                    310:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    311:                return 0;
                    312:        }
                    313: 
                    314:        /* delete not retain message */
                    315:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    316:                        sess->sess_addr, 0);
                    317: 
                    318:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    319:        if (siz == -1) {
                    320:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    321:                return 0;
                    322:        }
                    323: 
                    324:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    325:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    326:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    327:        return 0;
                    328: }
                    329: 
                    330: int
                    331: cmdSUBSCRIBE(void *srv, int len, void *arg)
                    332: {
                    333:        struct tagSession *sess = (struct tagSession*) arg;
                    334:        mqtt_subscr_t *subs = NULL;
                    335:        int siz = 0;
                    336:        u_short mid = 0;
                    337:        register int i;
                    338:        struct tagStore *store;
                    339:        char buf[BUFSIZ];
                    340:        void *ptr;
                    341:        ait_val_t *p = NULL;
                    342: 
                    343:        ioTRACE(2);
                    344: 
                    345:        if (!sess)
                    346:                return -1;
                    347: 
                    348:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    349:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    350:        if (siz == -1) {
                    351:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    352:                return 0;
                    353:        }
                    354: 
                    355:        /* add to db */
                    356:        for (i = 0; i < siz; i++) {
                    357:                store = io_malloc(sizeof(struct tagStore));
                    358:                if (!store) {
                    359:                        ioSYSERR(0);
                    360:                        continue;
                    361:                } else {
                    362:                        store->st_msgid = mid;
                    363:                        mqtt_subCopy(&store->st_subscr, &subs[i]);
                    364:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    365:                }
                    366: 
                    367:                /* add to cache */
                    368:                SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    369: 
                    370:                /* convert topic to regexp */
                    371:                if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
                    372:                        ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    373:                } else {
                    374:                        ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    375:                        if (!ptr) {
                    376:                                ioSYSERR(0);
                    377:                                continue;
                    378:                        } else {
                    379:                                store->st_subscr.sub_topic.msg_base = ptr;
                    380:                                store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    381:                                memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    382:                                                store->st_subscr.sub_topic.msg_len);
                    383:                        }
                    384: 
                    385:                        /* store to db */
                    386:                        call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
                    387:                                        sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
                    388:                        /* subscribe pass */
                    389:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    390: 
                    391:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid, 
                    392:                                        store->st_subscr.sub_topic.msg_base, 
                    393:                                        store->st_subscr.sub_topic.msg_len, 
                    394:                                        store->st_subscr.sub_ret, sess->sess_addr);
                    395:                }
                    396:        }
                    397: 
                    398:        /* send acknowledge */
                    399:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    400:        if (siz == -1) {
                    401:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    402:                goto end;
                    403:        } else {
                    404:                p = mkPkt(sess->sess_buf->msg_base, siz);
                    405:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    406:        }
                    407: 
                    408:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    409: end:
                    410:        mqtt_subFree(&subs);
                    411:        return 0;
                    412: }
                    413: 
                    414: int
                    415: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
                    416: {
                    417:        struct tagSession *sess = (struct tagSession*) arg;
                    418:        mqtt_subscr_t *subs = NULL;
                    419:        int siz = 0;
                    420:        u_short mid = 0;
                    421:        register int i;
                    422:        struct tagStore *store, *tmp;
                    423:        ait_val_t *p = NULL;
                    424: 
                    425:        ioTRACE(2);
                    426: 
                    427:        if (!sess)
                    428:                return -1;
                    429: 
                    430:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    431:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    432:        if (siz == -1) {
                    433:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    434:                return 0;
                    435:        }
                    436: 
                    437:        /* del from db */
                    438:        for (i = 0; i < siz; i++) {
                    439:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    440:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    441:                                        store->st_subscr.sub_topic.msg_base && 
                    442:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    443:                                                subs[i].sub_topic.msg_base)) {
                    444:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
                    445: 
                    446:                                if (store->st_subscr.sub_topic.msg_base)
                    447:                                        free(store->st_subscr.sub_topic.msg_base);
                    448:                                if (store->st_subscr.sub_value.msg_base)
                    449:                                        free(store->st_subscr.sub_value.msg_base);
                    450:                                io_free(store);
                    451:                        }
                    452:                }
                    453: 
                    454:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    455:                                sess->sess_user, "%");
                    456:        }
                    457: 
                    458:        /* send acknowledge */
                    459:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    460:        if (siz == -1) {
                    461:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    462:                goto end;
                    463:        } else {
                    464:                p = mkPkt(sess->sess_buf->msg_base, siz);
                    465:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    466:        }
                    467: 
                    468:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    469: end:
                    470:        mqtt_subFree(&subs);
                    471:        return 0;
                    472: }
                    473: 
                    474: int
                    475: cmdPINGREQ(void *srv, int len, void *arg)
                    476: {
                    477:        struct tagSession *sess = (struct tagSession*) arg;
                    478:        int siz = 0;
                    479:        ait_val_t *p = NULL;
                    480: 
                    481:        ioTRACE(2);
                    482: 
                    483:        if (!sess)
                    484:                return -1;
                    485: 
                    486:        ioDEBUG(5, "Exec PINGREQ session");
                    487:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    488:        if (siz == -1) {
                    489:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    490:                return 0;
                    491:        } else {
                    492:                p = mkPkt(sess->sess_buf->msg_base, siz);
                    493:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    494:        }
                    495: 
                    496:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
                    497:        return 0;
                    498: }
                    499: 
                    500: int
                    501: cmdCONNECT(void *srv, int len, void *arg)
                    502: {
                    503:        struct tagStore *store;
                    504:        struct tagSession *sess = (struct tagSession*) arg;
                    505: 
                    506:        ioTRACE(2);
                    507: 
                    508:        if (!sess)
                    509:                return -1;
                    510: 
                    511:        ioDEBUG(5, "Exec CONNECT session");
                    512:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    513: 
                    514:        schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
                    515: 
                    516:        if (sess->sess_clean) {
                    517:                if (call.FiniSessPUB)
                    518:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    519:                if (call.DeletePUB_subscribe)
                    520:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    521:                if (call.WipePUB_topic)
                    522:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    523:        }
                    524: 
                    525:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    526:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
                    527: 
                    528:                if (store->st_subscr.sub_topic.msg_base)
                    529:                        free(store->st_subscr.sub_topic.msg_base);
                    530:                if (store->st_subscr.sub_value.msg_base)
                    531:                        free(store->st_subscr.sub_value.msg_base);
                    532: 
                    533:                io_free(store);
                    534:        }
                    535: 
                    536:        if (sess->sess_will.flag)
                    537:                pubWill(sess);
                    538: 
                    539:        if (sess->sess_will.msg)
                    540:                free(sess->sess_will.msg);
                    541:        if (sess->sess_will.topic)
                    542:                free(sess->sess_will.topic);
                    543: 
                    544:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    545:                        sess->sess_addr, sess->sess_user);
                    546: 
                    547:        return -3;      /* reconnect client */
                    548: }
                    549: 
                    550: int
                    551: cmdDISCONNECT(void *srv, int len, void *arg)
                    552: {
                    553:        struct tagSession *sess = (struct tagSession*) arg;
                    554: 
                    555:        ioTRACE(2);
                    556: 
                    557:        if (!sess)
                    558:                return -1;
                    559: 
                    560:        ioDEBUG(5, "Exec DISCONNECT session");
                    561: 
                    562:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    563:                        sess->sess_addr, sess->sess_user);
                    564: 
                    565:        return -2;      /* must terminate dispatcher */
                    566: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>