Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.35

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.32  misho       3: #include "utils.h"
1.2.2.1   misho       4: #include "rtlm.h"
1.2       misho       5: #include "mqttd_calls.h"
                      6: 
                      7: 
1.2.2.26  misho       8: static inline ait_val_t *
1.2.2.19  misho       9: mkPkt(void * __restrict data, int dlen)
                     10: {
1.2.2.26  misho      11:        ait_val_t *p = NULL;
1.2.2.19  misho      12: 
1.2.2.26  misho      13:        if (!(p = io_allocVar())) {
1.2.2.19  misho      14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        }
                     17: 
                     18:        if (data && dlen > 0)
1.2.2.26  misho      19:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      20: 
                     21:        return p;
                     22: }
                     23: 
                     24: static inline void
1.2.2.26  misho      25: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      26: {
1.2.2.20  misho      27:        if (!p)
1.2.2.19  misho      28:                return;
                     29: 
1.2.2.26  misho      30:        io_freeVar(p);
1.2.2.19  misho      31: }
                     32: 
                     33: static void *
                     34: sendPacket(sched_task_t *task)
                     35: {
1.2.2.26  misho      36:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      37:        register int n, slen;
                     38:        u_char *pos;
                     39: 
1.2.2.26  misho      40:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      41:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     42:                return NULL;
                     43:        }
                     44: 
1.2.2.33  misho      45:        ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), TASK_FD(task));
                     46: 
1.2.2.26  misho      47:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      48:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     49:                if (n == -1) {
                     50:                        ioSYSERR(0);
                     51:                        break;
                     52:                }
                     53:        }
                     54: 
1.2.2.26  misho      55:        freePkt(&p);
1.2.2.19  misho      56:        return NULL;
                     57: }
                     58: 
1.2.2.17  misho      59: static int
1.2.2.33  misho      60: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
1.2.2.17  misho      61: {
1.2.2.33  misho      62:        regex_t re;
                     63:        regmatch_t match;
1.2.2.26  misho      64:        ait_val_t *p = NULL;
1.2.2.22  misho      65:        struct tagSession *s = NULL;
1.2.2.30  misho      66:        struct tagStore *st_, *st = NULL;
1.2.2.22  misho      67:        char szStr[STRSIZ];
1.2.2.33  misho      68:        int ret;
1.2.2.22  misho      69: 
1.2.2.33  misho      70:        assert(sess);
1.2.2.30  misho      71: 
1.2.2.22  misho      72:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho      73:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.33  misho      74:                        /* check for QoS */
                     75:                        if (st->st_subscr.sub_ret >= qos) {
                     76:                                if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     77:                                        regerror(ret, &re, szStr, sizeof szStr);
                     78:                                        regfree(&re);
                     79:                                        ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     80:                                                        st->st_subscr.sub_topic.msg_base, szStr);
                     81:                                }
                     82:                                if (!regexec(&re, topic, 1, &match, 0)) {
                     83:                                        /* MATCH */
                     84:                                        p = mkPkt(sess->sess_buf->msg_base, datlen);
                     85:                                        schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
                     86:                                }
                     87: 
1.2.2.22  misho      88:                                regfree(&re);
                     89:                        }
                     90:                }
                     91:        }
                     92: 
1.2.2.17  misho      93:        return 0;
                     94: }
                     95: 
1.2.2.33  misho      96: /* --------------------------------------------------- */
                     97: 
1.2.2.34  misho      98: void *
                     99: sendRetain(sched_task_t *task)
                    100: {
                    101:        mqtt_subscr_t *subs, *s;
                    102:        struct tagSession *sess;
                    103:        int siz;
                    104: 
                    105:        ioTRACE(2);
                    106: 
                    107:        assert(task);
                    108: 
                    109:        sess = TASK_ARG(task);
                    110:        assert(sess);
                    111: 
                    112:        if (!sess->sess_buf) {
                    113:                ioDEBUG(9, "WARNING! No allocated buffer!?!\n");
                    114:                return NULL;
                    115:        }
                    116: 
                    117:        subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
                    118:        if (!subs)
                    119:                return NULL;
                    120: 
                    121:        for (s = subs; s && s->sub_topic.msg_base; s++) {
                    122:                siz = s->sub_value.msg_len;
                    123:                memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, 
                    124:                                MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
                    125:                ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", 
                    126:                                siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
                    127:                if (siz > 0)
                    128:                        search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
                    129:        }
                    130: 
1.2.2.35! misho     131:        mqtt_subFree(&subs);
1.2.2.34  misho     132:        return NULL;
                    133: }
                    134: 
1.2.2.33  misho     135: int
                    136: pubWill(struct tagSession * __restrict sess)
                    137: {
                    138:        int datlen;
                    139: 
                    140:        ioTRACE(2);
                    141: 
                    142:        /* prepare will packet */
                    143:        datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0, 
                    144:                        sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
                    145:        if (datlen == -1)
                    146:                return -1;      /* error */
                    147: 
                    148:        return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
                    149: }
                    150: 
1.2.2.17  misho     151: static int
1.2.2.33  misho     152: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho     153: {
1.2.2.33  misho     154:        return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
                    155: }
1.2.2.26  misho     156: 
1.2.2.33  misho     157: static int
                    158: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
                    159: {
                    160:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.2.2.26  misho     161: 
                    162:        /* write topic to database */
1.2.2.27  misho     163:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    164:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.33  misho     165:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    166:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    167:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     168: 
1.2.2.33  misho     169:        search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
1.2.2.26  misho     170: 
1.2.2.28  misho     171:        /* delete not retain message */
1.2.2.27  misho     172:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    173:                        sess->sess_addr, 0);
1.2.2.17  misho     174:        return 0;
                    175: }
                    176: 
                    177: static int
1.2.2.26  misho     178: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     179: {
1.2.2.33  misho     180:        struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.2.2.28  misho     181: 
                    182:        /* write topic to database */
                    183:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    184:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.33  misho     185:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, 
                    186:                        sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr, 
                    187:                        hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.28  misho     188: 
1.2.2.33  misho     189:        return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
1.2.2.17  misho     190: }
                    191: 
                    192: 
1.2       misho     193: int
1.2.2.9   misho     194: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     195: {
                    196:        struct mqtthdr *hdr;
1.2.2.1   misho     197:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     198:        char szTopic[STRSIZ] = { 0 };
                    199:        int siz = 0;
                    200:        u_short mid = 0;
1.2.2.26  misho     201:        ait_val_t *p = NULL;
1.2       misho     202: 
                    203:        ioTRACE(2);
                    204: 
                    205:        if (!sess)
                    206:                return -1;
                    207: 
1.2.2.17  misho     208:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     209:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     210:        if (siz == -1) {
                    211:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    212:                return 0;
                    213:        }
                    214: 
1.2       misho     215:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    216:        switch (hdr->mqtt_msg.qos) {
                    217:                case MQTT_QOS_ACK:
1.2.2.26  misho     218:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    219:                                return 0;
1.2.2.17  misho     220:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    221:                        if (siz == -1) {
                    222:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    223:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     224:                                return 0;
1.2.2.17  misho     225:                        }
1.2       misho     226:                        break;
                    227:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     228:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    229:                                return 0;
1.2.2.17  misho     230:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    231:                        if (siz == -1) {
                    232:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    233:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     234:                                return 0;
1.2.2.17  misho     235:                        }
1.2       misho     236:                        break;
1.2.2.17  misho     237:                case MQTT_QOS_ONCE:
1.2.2.23  misho     238:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     239:                default:
1.2.2.22  misho     240:                        return 0;
1.2       misho     241:        }
                    242: 
1.2.2.22  misho     243:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    244:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    245:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     246:        return 0;
                    247: }
1.2.2.1   misho     248: 
                    249: int
1.2.2.9   misho     250: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     251: {
                    252:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     253:        int siz = 0;
                    254:        u_short mid = 0;
1.2.2.26  misho     255:        ait_val_t *p = NULL;
1.2.2.1   misho     256: 
                    257:        ioTRACE(2);
                    258: 
                    259:        if (!sess)
                    260:                return -1;
                    261: 
1.2.2.17  misho     262:        ioDEBUG(5, "Exec PUBREL session");
                    263:        mid = mqtt_readPUBREL(sess->sess_buf);
                    264:        if (mid == (u_short) -1) {
                    265:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    266:                return 0;
                    267:        }
                    268: 
1.2.2.29  misho     269:        /* delete not retain message */
                    270:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    271:                        sess->sess_addr, 0);
1.2.2.17  misho     272: 
                    273:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    274:        if (siz == -1) {
                    275:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    276:                return 0;
                    277:        }
1.2.2.1   misho     278: 
1.2.2.29  misho     279:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    280:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22  misho     281:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     282:        return 0;
                    283: }
                    284: 
                    285: int
1.2.2.9   misho     286: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     287: {
                    288:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     289:        mqtt_subscr_t *subs = NULL;
                    290:        int siz = 0;
                    291:        u_short mid = 0;
                    292:        register int i;
                    293:        struct tagStore *store;
1.2.2.17  misho     294:        char buf[BUFSIZ];
                    295:        void *ptr;
1.2.2.26  misho     296:        ait_val_t *p = NULL;
1.2.2.1   misho     297: 
                    298:        ioTRACE(2);
                    299: 
                    300:        if (!sess)
                    301:                return -1;
                    302: 
1.2.2.7   misho     303:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    304:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    305:        if (siz == -1) {
                    306:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    307:                return 0;
                    308:        }
                    309: 
                    310:        /* add to db */
1.2.2.33  misho     311:        for (i = 0; i < siz; i++) {
1.2.2.31  misho     312:                store = io_malloc(sizeof(struct tagStore));
                    313:                if (!store) {
                    314:                        ioSYSERR(0);
1.2.2.25  misho     315:                        continue;
1.2.2.31  misho     316:                } else {
                    317:                        store->st_msgid = mid;
                    318:                        mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.33  misho     319:                        subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     320:                }
1.2.2.31  misho     321: 
                    322:                /* add to cache */
                    323:                SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    324: 
                    325:                /* convert topic to regexp */
                    326:                if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
                    327:                        ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    328:                } else {
                    329:                        ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    330:                        if (!ptr) {
1.2.2.7   misho     331:                                ioSYSERR(0);
1.2.2.25  misho     332:                                continue;
1.2.2.7   misho     333:                        } else {
1.2.2.31  misho     334:                                store->st_subscr.sub_topic.msg_base = ptr;
                    335:                                store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    336:                                memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    337:                                                store->st_subscr.sub_topic.msg_len);
1.2.2.7   misho     338:                        }
                    339: 
1.2.2.33  misho     340:                        /* store to db */
                    341:                        call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
                    342:                                        sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
                    343:                        /* subscribe pass */
1.2.2.31  misho     344:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    345: 
1.2.2.33  misho     346:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid, 
                    347:                                        store->st_subscr.sub_topic.msg_base, 
                    348:                                        store->st_subscr.sub_topic.msg_len, 
                    349:                                        store->st_subscr.sub_ret, sess->sess_addr);
                    350:                }
1.2.2.7   misho     351:        }
1.2.2.1   misho     352: 
1.2.2.7   misho     353:        /* send acknowledge */
                    354:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    355:        if (siz == -1) {
                    356:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    357:                goto end;
1.2.2.21  misho     358:        } else {
                    359:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     360:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    361:        }
1.2.2.21  misho     362: 
                    363:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     364: end:
                    365:        mqtt_subFree(&subs);
1.2.2.1   misho     366:        return 0;
                    367: }
                    368: 
                    369: int
1.2.2.9   misho     370: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     371: {
                    372:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     373:        mqtt_subscr_t *subs = NULL;
                    374:        int siz = 0;
                    375:        u_short mid = 0;
                    376:        register int i;
                    377:        struct tagStore *store, *tmp;
1.2.2.26  misho     378:        ait_val_t *p = NULL;
1.2.2.1   misho     379: 
                    380:        ioTRACE(2);
                    381: 
                    382:        if (!sess)
                    383:                return -1;
                    384: 
1.2.2.13  misho     385:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    386:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    387:        if (siz == -1) {
                    388:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    389:                return 0;
                    390:        }
                    391: 
                    392:        /* del from db */
                    393:        for (i = 0; i < siz; i++) {
                    394:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    395:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    396:                                        store->st_subscr.sub_topic.msg_base && 
                    397:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    398:                                                subs[i].sub_topic.msg_base)) {
                    399:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     400: 
                    401:                                if (store->st_subscr.sub_topic.msg_base)
                    402:                                        free(store->st_subscr.sub_topic.msg_base);
                    403:                                if (store->st_subscr.sub_value.msg_base)
                    404:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     405:                                io_free(store);
1.2.2.13  misho     406:                        }
                    407:                }
                    408: 
1.2.2.15  misho     409:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    410:                                sess->sess_user, "%");
1.2.2.13  misho     411:        }
                    412: 
                    413:        /* send acknowledge */
                    414:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    415:        if (siz == -1) {
                    416:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    417:                goto end;
1.2.2.21  misho     418:        } else {
                    419:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     420:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    421:        }
1.2.2.21  misho     422: 
                    423:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     424: end:
                    425:        mqtt_subFree(&subs);
1.2.2.1   misho     426:        return 0;
                    427: }
                    428: 
                    429: int
1.2.2.9   misho     430: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     431: {
                    432:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     433:        int siz = 0;
1.2.2.26  misho     434:        ait_val_t *p = NULL;
1.2.2.1   misho     435: 
                    436:        ioTRACE(2);
                    437: 
                    438:        if (!sess)
                    439:                return -1;
                    440: 
1.2.2.7   misho     441:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     442:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    443:        if (siz == -1) {
                    444:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    445:                return 0;
1.2.2.8   misho     446:        } else {
1.2.2.19  misho     447:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     448:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    449:        }
1.2.2.1   misho     450: 
1.2.2.19  misho     451:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     452:        return 0;
                    453: }
                    454: 
                    455: int
1.2.2.9   misho     456: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     457: {
                    458:        struct tagStore *store;
                    459:        struct tagSession *sess = (struct tagSession*) arg;
                    460: 
                    461:        ioTRACE(2);
                    462: 
                    463:        if (!sess)
                    464:                return -1;
                    465: 
1.2.2.6   misho     466:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     467:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    468: 
1.2.2.34  misho     469:        schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
                    470: 
1.2.2.16  misho     471:        if (sess->sess_clean) {
                    472:                if (call.FiniSessPUB)
                    473:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    474:                if (call.DeletePUB_subscribe)
                    475:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    476:                if (call.WipePUB_topic)
                    477:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    478:        }
1.2.2.1   misho     479: 
1.2.2.6   misho     480:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    481:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     482: 
1.2.2.6   misho     483:                if (store->st_subscr.sub_topic.msg_base)
                    484:                        free(store->st_subscr.sub_topic.msg_base);
                    485:                if (store->st_subscr.sub_value.msg_base)
                    486:                        free(store->st_subscr.sub_value.msg_base);
                    487: 
1.2.2.18  misho     488:                io_free(store);
1.2.2.6   misho     489:        }
1.2.2.1   misho     490: 
1.2.2.32  misho     491:        if (sess->sess_will.flag)
1.2.2.33  misho     492:                pubWill(sess);
1.2.2.32  misho     493: 
1.2.2.1   misho     494:        if (sess->sess_will.msg)
                    495:                free(sess->sess_will.msg);
                    496:        if (sess->sess_will.topic)
                    497:                free(sess->sess_will.topic);
                    498: 
                    499:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    500:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     501: 
1.2.2.12  misho     502:        return -3;      /* reconnect client */
1.2.2.1   misho     503: }
                    504: 
                    505: int
1.2.2.9   misho     506: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     507: {
                    508:        struct tagSession *sess = (struct tagSession*) arg;
                    509: 
                    510:        ioTRACE(2);
                    511: 
                    512:        if (!sess)
                    513:                return -1;
                    514: 
1.2.2.5   misho     515:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     516: 
1.2.2.1   misho     517:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    518:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     519: 
1.2.2.10  misho     520:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     521: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>