Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.34

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.32  misho       3: #include "utils.h"
1.2.2.1   misho       4: #include "rtlm.h"
1.2       misho       5: #include "mqttd_calls.h"
                      6: 
                      7: 
1.2.2.26  misho       8: static inline ait_val_t *
1.2.2.19  misho       9: mkPkt(void * __restrict data, int dlen)
                     10: {
1.2.2.26  misho      11:        ait_val_t *p = NULL;
1.2.2.19  misho      12: 
1.2.2.26  misho      13:        if (!(p = io_allocVar())) {
1.2.2.19  misho      14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        }
                     17: 
                     18:        if (data && dlen > 0)
1.2.2.26  misho      19:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      20: 
                     21:        return p;
                     22: }
                     23: 
                     24: static inline void
1.2.2.26  misho      25: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      26: {
1.2.2.20  misho      27:        if (!p)
1.2.2.19  misho      28:                return;
                     29: 
1.2.2.26  misho      30:        io_freeVar(p);
1.2.2.19  misho      31: }
                     32: 
                     33: static void *
                     34: sendPacket(sched_task_t *task)
                     35: {
1.2.2.26  misho      36:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      37:        register int n, slen;
                     38:        u_char *pos;
                     39: 
1.2.2.26  misho      40:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      41:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     42:                return NULL;
                     43:        }
                     44: 
1.2.2.33  misho      45:        ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), TASK_FD(task));
                     46: 
1.2.2.26  misho      47:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      48:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     49:                if (n == -1) {
                     50:                        ioSYSERR(0);
                     51:                        break;
                     52:                }
                     53:        }
                     54: 
1.2.2.26  misho      55:        freePkt(&p);
1.2.2.19  misho      56:        return NULL;
                     57: }
                     58: 
1.2.2.17  misho      59: static int
1.2.2.33  misho      60: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
1.2.2.17  misho      61: {
1.2.2.33  misho      62:        regex_t re;
                     63:        regmatch_t match;
1.2.2.26  misho      64:        ait_val_t *p = NULL;
1.2.2.22  misho      65:        struct tagSession *s = NULL;
1.2.2.30  misho      66:        struct tagStore *st_, *st = NULL;
1.2.2.22  misho      67:        char szStr[STRSIZ];
1.2.2.33  misho      68:        int ret;
1.2.2.22  misho      69: 
1.2.2.33  misho      70:        assert(sess);
1.2.2.30  misho      71: 
1.2.2.22  misho      72:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho      73:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.33  misho      74:                        /* check for QoS */
                     75:                        if (st->st_subscr.sub_ret >= qos) {
                     76:                                if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     77:                                        regerror(ret, &re, szStr, sizeof szStr);
                     78:                                        regfree(&re);
                     79:                                        ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     80:                                                        st->st_subscr.sub_topic.msg_base, szStr);
                     81:                                }
                     82:                                if (!regexec(&re, topic, 1, &match, 0)) {
                     83:                                        /* MATCH */
                     84:                                        p = mkPkt(sess->sess_buf->msg_base, datlen);
                     85:                                        schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
                     86:                                }
                     87: 
1.2.2.22  misho      88:                                regfree(&re);
                     89:                        }
                     90:                }
                     91:        }
                     92: 
1.2.2.17  misho      93:        return 0;
                     94: }
                     95: 
1.2.2.33  misho      96: /* --------------------------------------------------- */
                     97: 
1.2.2.34! misho      98: void *
        !            99: sendRetain(sched_task_t *task)
        !           100: {
        !           101:        mqtt_subscr_t *subs, *s;
        !           102:        struct tagSession *sess;
        !           103:        int siz;
        !           104: 
        !           105:        ioTRACE(2);
        !           106: 
        !           107:        assert(task);
        !           108: 
        !           109:        sess = TASK_ARG(task);
        !           110:        assert(sess);
        !           111: 
        !           112:        if (!sess->sess_buf) {
        !           113:                ioDEBUG(9, "WARNING! No allocated buffer!?!\n");
        !           114:                return NULL;
        !           115:        }
        !           116: 
        !           117:        subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
        !           118:        if (!subs)
        !           119:                return NULL;
        !           120: 
        !           121:        for (s = subs; s && s->sub_topic.msg_base; s++) {
        !           122:                siz = s->sub_value.msg_len;
        !           123:                memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, 
        !           124:                                MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
        !           125:                ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", 
        !           126:                                siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
        !           127:                if (siz > 0)
        !           128:                        search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
        !           129:        }
        !           130: 
        !           131:        return NULL;
        !           132: }
        !           133: 
1.2.2.33  misho     134: int
                    135: pubWill(struct tagSession * __restrict sess)
                    136: {
                    137:        int datlen;
                    138: 
                    139:        ioTRACE(2);
                    140: 
                    141:        /* prepare will packet */
                    142:        datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0, 
                    143:                        sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
                    144:        if (datlen == -1)
                    145:                return -1;      /* error */
                    146: 
                    147:        return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
                    148: }
                    149: 
1.2.2.17  misho     150: static int
1.2.2.33  misho     151: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     152: {
1.2.2.33  misho     153:        return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
                    154: }
1.2.2.26  misho     155: 
1.2.2.33  misho     156: static int
                    157: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
                    158: {
                    159:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.2.2.26  misho     160: 
                    161:        /* write topic to database */
1.2.2.27  misho     162:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    163:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.33  misho     164:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    165:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    166:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     167: 
1.2.2.33  misho     168:        search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
1.2.2.26  misho     169: 
1.2.2.28  misho     170:        /* delete not retain message */
1.2.2.27  misho     171:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    172:                        sess->sess_addr, 0);
1.2.2.17  misho     173:        return 0;
                    174: }
                    175: 
                    176: static int
1.2.2.26  misho     177: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     178: {
1.2.2.33  misho     179:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.2.2.28  misho     180: 
                    181:        /* write topic to database */
                    182:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    183:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.33  misho     184:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    185:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    186:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.28  misho     187: 
1.2.2.33  misho     188:        return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
1.2.2.17  misho     189: }
                    190: 
                    191: 
1.2       misho     192: int
1.2.2.9   misho     193: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     194: {
                    195:        struct mqtthdr *hdr;
1.2.2.1   misho     196:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     197:        char szTopic[STRSIZ] = { 0 };
                    198:        int siz = 0;
                    199:        u_short mid = 0;
1.2.2.26  misho     200:        ait_val_t *p = NULL;
1.2       misho     201: 
                    202:        ioTRACE(2);
                    203: 
                    204:        if (!sess)
                    205:                return -1;
                    206: 
1.2.2.17  misho     207:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     208:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     209:        if (siz == -1) {
                    210:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    211:                return 0;
                    212:        }
                    213: 
1.2       misho     214:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    215:        switch (hdr->mqtt_msg.qos) {
                    216:                case MQTT_QOS_ACK:
1.2.2.26  misho     217:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    218:                                return 0;
1.2.2.17  misho     219:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    220:                        if (siz == -1) {
                    221:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    222:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     223:                                return 0;
1.2.2.17  misho     224:                        }
1.2       misho     225:                        break;
                    226:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     227:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    228:                                return 0;
1.2.2.17  misho     229:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    230:                        if (siz == -1) {
                    231:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    232:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     233:                                return 0;
1.2.2.17  misho     234:                        }
1.2       misho     235:                        break;
1.2.2.17  misho     236:                case MQTT_QOS_ONCE:
1.2.2.23  misho     237:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     238:                default:
1.2.2.22  misho     239:                        return 0;
1.2       misho     240:        }
                    241: 
1.2.2.22  misho     242:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    243:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    244:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     245:        return 0;
                    246: }
1.2.2.1   misho     247: 
                    248: int
1.2.2.9   misho     249: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     250: {
                    251:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     252:        int siz = 0;
                    253:        u_short mid = 0;
1.2.2.26  misho     254:        ait_val_t *p = NULL;
1.2.2.1   misho     255: 
                    256:        ioTRACE(2);
                    257: 
                    258:        if (!sess)
                    259:                return -1;
                    260: 
1.2.2.17  misho     261:        ioDEBUG(5, "Exec PUBREL session");
                    262:        mid = mqtt_readPUBREL(sess->sess_buf);
                    263:        if (mid == (u_short) -1) {
                    264:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    265:                return 0;
                    266:        }
                    267: 
1.2.2.29  misho     268:        /* delete not retain message */
                    269:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    270:                        sess->sess_addr, 0);
1.2.2.17  misho     271: 
                    272:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    273:        if (siz == -1) {
                    274:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    275:                return 0;
                    276:        }
1.2.2.1   misho     277: 
1.2.2.29  misho     278:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    279:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22  misho     280:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     281:        return 0;
                    282: }
                    283: 
                    284: int
1.2.2.9   misho     285: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     286: {
                    287:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     288:        mqtt_subscr_t *subs = NULL;
                    289:        int siz = 0;
                    290:        u_short mid = 0;
                    291:        register int i;
                    292:        struct tagStore *store;
1.2.2.17  misho     293:        char buf[BUFSIZ];
                    294:        void *ptr;
1.2.2.26  misho     295:        ait_val_t *p = NULL;
1.2.2.1   misho     296: 
                    297:        ioTRACE(2);
                    298: 
                    299:        if (!sess)
                    300:                return -1;
                    301: 
1.2.2.7   misho     302:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    303:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    304:        if (siz == -1) {
                    305:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    306:                return 0;
                    307:        }
                    308: 
                    309:        /* add to db */
1.2.2.33  misho     310:        for (i = 0; i < siz; i++) {
1.2.2.31  misho     311:                store = io_malloc(sizeof(struct tagStore));
                    312:                if (!store) {
                    313:                        ioSYSERR(0);
1.2.2.25  misho     314:                        continue;
1.2.2.31  misho     315:                } else {
                    316:                        store->st_msgid = mid;
                    317:                        mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.33  misho     318:                        subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     319:                }
1.2.2.31  misho     320: 
                    321:                /* add to cache */
                    322:                SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    323: 
                    324:                /* convert topic to regexp */
                    325:                if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
                    326:                        ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    327:                } else {
                    328:                        ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    329:                        if (!ptr) {
1.2.2.7   misho     330:                                ioSYSERR(0);
1.2.2.25  misho     331:                                continue;
1.2.2.7   misho     332:                        } else {
1.2.2.31  misho     333:                                store->st_subscr.sub_topic.msg_base = ptr;
                    334:                                store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    335:                                memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    336:                                                store->st_subscr.sub_topic.msg_len);
1.2.2.7   misho     337:                        }
                    338: 
1.2.2.33  misho     339:                        /* store to db */
                    340:                        call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
                    341:                                        sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
                    342:                        /* subscribe pass */
1.2.2.31  misho     343:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    344: 
1.2.2.33  misho     345:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid, 
                    346:                                        store->st_subscr.sub_topic.msg_base, 
                    347:                                        store->st_subscr.sub_topic.msg_len, 
                    348:                                        store->st_subscr.sub_ret, sess->sess_addr);
                    349:                }
1.2.2.7   misho     350:        }
1.2.2.1   misho     351: 
1.2.2.7   misho     352:        /* send acknowledge */
                    353:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    354:        if (siz == -1) {
                    355:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    356:                goto end;
1.2.2.21  misho     357:        } else {
                    358:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     359:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    360:        }
1.2.2.21  misho     361: 
                    362:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     363: end:
                    364:        mqtt_subFree(&subs);
1.2.2.1   misho     365:        return 0;
                    366: }
                    367: 
                    368: int
1.2.2.9   misho     369: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     370: {
                    371:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     372:        mqtt_subscr_t *subs = NULL;
                    373:        int siz = 0;
                    374:        u_short mid = 0;
                    375:        register int i;
                    376:        struct tagStore *store, *tmp;
1.2.2.26  misho     377:        ait_val_t *p = NULL;
1.2.2.1   misho     378: 
                    379:        ioTRACE(2);
                    380: 
                    381:        if (!sess)
                    382:                return -1;
                    383: 
1.2.2.13  misho     384:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    385:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    386:        if (siz == -1) {
                    387:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    388:                return 0;
                    389:        }
                    390: 
                    391:        /* del from db */
                    392:        for (i = 0; i < siz; i++) {
                    393:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    394:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    395:                                        store->st_subscr.sub_topic.msg_base && 
                    396:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    397:                                                subs[i].sub_topic.msg_base)) {
                    398:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     399: 
                    400:                                if (store->st_subscr.sub_topic.msg_base)
                    401:                                        free(store->st_subscr.sub_topic.msg_base);
                    402:                                if (store->st_subscr.sub_value.msg_base)
                    403:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     404:                                io_free(store);
1.2.2.13  misho     405:                        }
                    406:                }
                    407: 
1.2.2.15  misho     408:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    409:                                sess->sess_user, "%");
1.2.2.13  misho     410:        }
                    411: 
                    412:        /* send acknowledge */
                    413:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    414:        if (siz == -1) {
                    415:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    416:                goto end;
1.2.2.21  misho     417:        } else {
                    418:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     419:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    420:        }
1.2.2.21  misho     421: 
                    422:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     423: end:
                    424:        mqtt_subFree(&subs);
1.2.2.1   misho     425:        return 0;
                    426: }
                    427: 
                    428: int
1.2.2.9   misho     429: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     430: {
                    431:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     432:        int siz = 0;
1.2.2.26  misho     433:        ait_val_t *p = NULL;
1.2.2.1   misho     434: 
                    435:        ioTRACE(2);
                    436: 
                    437:        if (!sess)
                    438:                return -1;
                    439: 
1.2.2.7   misho     440:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     441:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    442:        if (siz == -1) {
                    443:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    444:                return 0;
1.2.2.8   misho     445:        } else {
1.2.2.19  misho     446:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     447:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    448:        }
1.2.2.1   misho     449: 
1.2.2.19  misho     450:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     451:        return 0;
                    452: }
                    453: 
                    454: int
1.2.2.9   misho     455: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     456: {
                    457:        struct tagStore *store;
                    458:        struct tagSession *sess = (struct tagSession*) arg;
                    459: 
                    460:        ioTRACE(2);
                    461: 
                    462:        if (!sess)
                    463:                return -1;
                    464: 
1.2.2.6   misho     465:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     466:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    467: 
1.2.2.34! misho     468:        schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
        !           469: 
1.2.2.16  misho     470:        if (sess->sess_clean) {
                    471:                if (call.FiniSessPUB)
                    472:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    473:                if (call.DeletePUB_subscribe)
                    474:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    475:                if (call.WipePUB_topic)
                    476:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    477:        }
1.2.2.1   misho     478: 
1.2.2.6   misho     479:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    480:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     481: 
1.2.2.6   misho     482:                if (store->st_subscr.sub_topic.msg_base)
                    483:                        free(store->st_subscr.sub_topic.msg_base);
                    484:                if (store->st_subscr.sub_value.msg_base)
                    485:                        free(store->st_subscr.sub_value.msg_base);
                    486: 
1.2.2.18  misho     487:                io_free(store);
1.2.2.6   misho     488:        }
1.2.2.1   misho     489: 
1.2.2.32  misho     490:        if (sess->sess_will.flag)
1.2.2.33  misho     491:                pubWill(sess);
1.2.2.32  misho     492: 
1.2.2.1   misho     493:        if (sess->sess_will.msg)
                    494:                free(sess->sess_will.msg);
                    495:        if (sess->sess_will.topic)
                    496:                free(sess->sess_will.topic);
                    497: 
                    498:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    499:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     500: 
1.2.2.12  misho     501:        return -3;      /* reconnect client */
1.2.2.1   misho     502: }
                    503: 
                    504: int
1.2.2.9   misho     505: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     506: {
                    507:        struct tagSession *sess = (struct tagSession*) arg;
                    508: 
                    509:        ioTRACE(2);
                    510: 
                    511:        if (!sess)
                    512:                return -1;
                    513: 
1.2.2.5   misho     514:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     515: 
1.2.2.1   misho     516:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    517:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     518: 
1.2.2.10  misho     519:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     520: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>