Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.32

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.32! misho       3: #include "utils.h"
1.2.2.1   misho       4: #include "rtlm.h"
1.2       misho       5: #include "mqttd_calls.h"
                      6: 
                      7: 
1.2.2.26  misho       8: static inline ait_val_t *
1.2.2.19  misho       9: mkPkt(void * __restrict data, int dlen)
                     10: {
1.2.2.26  misho      11:        ait_val_t *p = NULL;
1.2.2.19  misho      12: 
1.2.2.26  misho      13:        if (!(p = io_allocVar())) {
1.2.2.19  misho      14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        }
                     17: 
                     18:        if (data && dlen > 0)
1.2.2.26  misho      19:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      20: 
                     21:        return p;
                     22: }
                     23: 
                     24: static inline void
1.2.2.26  misho      25: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      26: {
1.2.2.20  misho      27:        if (!p)
1.2.2.19  misho      28:                return;
                     29: 
1.2.2.26  misho      30:        io_freeVar(p);
1.2.2.19  misho      31: }
                     32: 
                     33: static void *
                     34: sendPacket(sched_task_t *task)
                     35: {
1.2.2.26  misho      36:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      37:        register int n, slen;
                     38:        u_char *pos;
                     39: 
1.2.2.26  misho      40:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      41:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     42:                return NULL;
                     43:        }
                     44: 
1.2.2.26  misho      45:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      46:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     47:                if (n == -1) {
                     48:                        ioSYSERR(0);
                     49:                        break;
                     50:                }
                     51:        }
                     52: 
1.2.2.26  misho      53:        freePkt(&p);
1.2.2.19  misho      54:        return NULL;
                     55: }
                     56: 
1.2.2.22  misho      57: /* --------------------------------------------------- */
1.2.2.19  misho      58: 
1.2.2.17  misho      59: static int
1.2.2.22  misho      60: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      61: {
1.2.2.26  misho      62:        ait_val_t *p = NULL;
1.2.2.22  misho      63:        struct tagSession *s = NULL;
1.2.2.30  misho      64:        struct tagStore *st_, *st = NULL;
1.2.2.22  misho      65:        regex_t re;
                     66:        regmatch_t match;
                     67:        int ret;
                     68:        char szStr[STRSIZ];
                     69: 
1.2.2.30  misho      70: 
1.2.2.22  misho      71:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho      72:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.22  misho      73:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     74:                                regerror(ret, &re, szStr, sizeof szStr);
                     75:                                regfree(&re);
                     76:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     77:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     78:                        }
                     79:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     80:                                /* MATCH */
                     81:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      82:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      83:                        }
                     84: 
                     85:                        regfree(&re);
                     86:                }
                     87:        }
                     88: 
1.2.2.17  misho      89:        return 0;
                     90: }
                     91: 
                     92: static int
1.2.2.26  misho      93: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho      94: {
1.2.2.26  misho      95:        ait_val_t *p = NULL;
                     96:        struct tagSession *s = NULL;
1.2.2.30  misho      97:        struct tagStore *st_, *st = NULL;
1.2.2.26  misho      98:        regex_t re;
                     99:        regmatch_t match;
1.2.2.28  misho     100:        int ret;
1.2.2.26  misho     101:        char szStr[STRSIZ];
                    102:        struct mqtthdr *hdr;
                    103: 
                    104:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    105:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    106: 
                    107:        /* write topic to database */
1.2.2.27  misho     108:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    109:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.26  misho     110:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
1.2.2.27  misho     111:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     112: 
                    113:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho     114:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.28  misho     115:                        /* check for QoS */
                    116:                        if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
                    117:                                continue;
                    118: 
1.2.2.26  misho     119:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    120:                                regerror(ret, &re, szStr, sizeof szStr);
                    121:                                regfree(&re);
                    122:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    123:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    124:                        }
                    125:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
1.2.2.28  misho     126:                                /* MATCH */
                    127:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    128:                                                s->sess_sock, NULL, 0);
1.2.2.26  misho     129:                        }
                    130: 
                    131:                        regfree(&re);
                    132:                }
                    133:        }
                    134: 
1.2.2.28  misho     135:        /* delete not retain message */
1.2.2.27  misho     136:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    137:                        sess->sess_addr, 0);
1.2.2.28  misho     138: 
                    139:        freePkt(&p);
1.2.2.17  misho     140:        return 0;
                    141: }
                    142: 
                    143: static int
1.2.2.26  misho     144: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     145: {
1.2.2.28  misho     146:        ait_val_t *p = NULL;
                    147:        struct tagSession *s = NULL;
1.2.2.30  misho     148:        struct tagStore *st_, *st = NULL;
1.2.2.28  misho     149:        regex_t re;
                    150:        regmatch_t match;
                    151:        int ret;
                    152:        char szStr[STRSIZ];
                    153:        struct mqtthdr *hdr;
                    154: 
                    155:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    156:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    157: 
                    158:        /* write topic to database */
                    159:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    160:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    161:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
                    162:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    163: 
                    164:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho     165:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.28  misho     166:                        /* check for QoS */
                    167:                        if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
                    168:                                continue;
                    169: 
                    170:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    171:                                regerror(ret, &re, szStr, sizeof szStr);
                    172:                                regfree(&re);
                    173:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    174:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    175:                        }
                    176:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                    177:                                /* MATCH */
                    178:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    179:                                                s->sess_sock, NULL, 0);
                    180:                        }
                    181: 
                    182:                        regfree(&re);
                    183:                }
                    184:        }
                    185: 
                    186:        freePkt(&p);
1.2.2.17  misho     187:        return 0;
                    188: }
                    189: 
                    190: 
1.2       misho     191: int
1.2.2.9   misho     192: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     193: {
                    194:        struct mqtthdr *hdr;
1.2.2.1   misho     195:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     196:        char szTopic[STRSIZ] = { 0 };
                    197:        int siz = 0;
                    198:        u_short mid = 0;
1.2.2.26  misho     199:        ait_val_t *p = NULL;
1.2       misho     200: 
                    201:        ioTRACE(2);
                    202: 
                    203:        if (!sess)
                    204:                return -1;
                    205: 
1.2.2.17  misho     206:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     207:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     208:        if (siz == -1) {
                    209:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    210:                return 0;
                    211:        }
                    212: 
1.2       misho     213:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    214:        switch (hdr->mqtt_msg.qos) {
                    215:                case MQTT_QOS_ACK:
1.2.2.26  misho     216:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    217:                                return 0;
1.2.2.17  misho     218:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    219:                        if (siz == -1) {
                    220:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    221:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     222:                                return 0;
1.2.2.17  misho     223:                        }
1.2       misho     224:                        break;
                    225:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     226:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    227:                                return 0;
1.2.2.17  misho     228:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    229:                        if (siz == -1) {
                    230:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    231:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     232:                                return 0;
1.2.2.17  misho     233:                        }
1.2       misho     234:                        break;
1.2.2.17  misho     235:                case MQTT_QOS_ONCE:
1.2.2.23  misho     236:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     237:                default:
1.2.2.22  misho     238:                        return 0;
1.2       misho     239:        }
                    240: 
1.2.2.22  misho     241:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    242:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    243:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     244:        return 0;
                    245: }
1.2.2.1   misho     246: 
                    247: int
1.2.2.9   misho     248: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     249: {
                    250:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     251:        int siz = 0;
                    252:        u_short mid = 0;
1.2.2.26  misho     253:        ait_val_t *p = NULL;
1.2.2.1   misho     254: 
                    255:        ioTRACE(2);
                    256: 
                    257:        if (!sess)
                    258:                return -1;
                    259: 
1.2.2.17  misho     260:        ioDEBUG(5, "Exec PUBREL session");
                    261:        mid = mqtt_readPUBREL(sess->sess_buf);
                    262:        if (mid == (u_short) -1) {
                    263:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    264:                return 0;
                    265:        }
                    266: 
1.2.2.29  misho     267:        /* delete not retain message */
                    268:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    269:                        sess->sess_addr, 0);
1.2.2.17  misho     270: 
                    271:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    272:        if (siz == -1) {
                    273:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    274:                return 0;
                    275:        }
1.2.2.1   misho     276: 
1.2.2.29  misho     277:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    278:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22  misho     279:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     280:        return 0;
                    281: }
                    282: 
                    283: int
1.2.2.9   misho     284: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     285: {
                    286:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     287:        mqtt_subscr_t *subs = NULL;
                    288:        int siz = 0;
                    289:        u_short mid = 0;
                    290:        register int i;
                    291:        struct tagStore *store;
1.2.2.17  misho     292:        char buf[BUFSIZ];
                    293:        void *ptr;
1.2.2.26  misho     294:        ait_val_t *p = NULL;
1.2.2.1   misho     295: 
                    296:        ioTRACE(2);
                    297: 
                    298:        if (!sess)
                    299:                return -1;
                    300: 
1.2.2.7   misho     301:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    302:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    303:        if (siz == -1) {
                    304:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    305:                return 0;
                    306:        }
                    307: 
                    308:        /* add to db */
1.2.2.25  misho     309:        for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.31  misho     310:                store = io_malloc(sizeof(struct tagStore));
                    311:                if (!store) {
                    312:                        ioSYSERR(0);
1.2.2.25  misho     313:                        continue;
1.2.2.31  misho     314:                } else {
                    315:                        store->st_msgid = mid;
                    316:                        mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.17  misho     317:                }
1.2.2.31  misho     318: 
                    319:                /* add to cache */
                    320:                SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    321: 
                    322:                /* convert topic to regexp */
                    323:                if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
                    324:                        ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    325: 
                    326:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    327:                } else {
                    328:                        ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    329:                        if (!ptr) {
1.2.2.7   misho     330:                                ioSYSERR(0);
1.2.2.25  misho     331:                                continue;
1.2.2.7   misho     332:                        } else {
1.2.2.31  misho     333:                                store->st_subscr.sub_topic.msg_base = ptr;
                    334:                                store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    335:                                memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    336:                                                store->st_subscr.sub_topic.msg_len);
1.2.2.7   misho     337:                        }
                    338: 
1.2.2.31  misho     339:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    340:                                        store->st_subscr.sub_topic.msg_base, 
                    341:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.7   misho     342: 
1.2.2.31  misho     343:                        subs[i].sub_ret = MQTT_QOS_PASS;
1.2.2.25  misho     344:                }
1.2.2.31  misho     345: 
                    346:                call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
                    347:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret);
1.2.2.7   misho     348:        }
1.2.2.1   misho     349: 
1.2.2.7   misho     350:        /* send acknowledge */
                    351:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    352:        if (siz == -1) {
                    353:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    354:                goto end;
1.2.2.21  misho     355:        } else {
                    356:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     357:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    358:        }
1.2.2.21  misho     359: 
                    360:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     361: end:
                    362:        mqtt_subFree(&subs);
1.2.2.1   misho     363:        return 0;
                    364: }
                    365: 
                    366: int
1.2.2.9   misho     367: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     368: {
                    369:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     370:        mqtt_subscr_t *subs = NULL;
                    371:        int siz = 0;
                    372:        u_short mid = 0;
                    373:        register int i;
                    374:        struct tagStore *store, *tmp;
1.2.2.26  misho     375:        ait_val_t *p = NULL;
1.2.2.1   misho     376: 
                    377:        ioTRACE(2);
                    378: 
                    379:        if (!sess)
                    380:                return -1;
                    381: 
1.2.2.13  misho     382:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    383:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    384:        if (siz == -1) {
                    385:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    386:                return 0;
                    387:        }
                    388: 
                    389:        /* del from db */
                    390:        for (i = 0; i < siz; i++) {
                    391:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    392:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    393:                                        store->st_subscr.sub_topic.msg_base && 
                    394:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    395:                                                subs[i].sub_topic.msg_base)) {
                    396:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     397: 
                    398:                                if (store->st_subscr.sub_topic.msg_base)
                    399:                                        free(store->st_subscr.sub_topic.msg_base);
                    400:                                if (store->st_subscr.sub_value.msg_base)
                    401:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     402:                                io_free(store);
1.2.2.13  misho     403:                        }
                    404:                }
                    405: 
1.2.2.15  misho     406:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    407:                                sess->sess_user, "%");
1.2.2.13  misho     408:        }
                    409: 
                    410:        /* send acknowledge */
                    411:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    412:        if (siz == -1) {
                    413:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    414:                goto end;
1.2.2.21  misho     415:        } else {
                    416:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     417:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    418:        }
1.2.2.21  misho     419: 
                    420:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     421: end:
                    422:        mqtt_subFree(&subs);
1.2.2.1   misho     423:        return 0;
                    424: }
                    425: 
                    426: int
1.2.2.9   misho     427: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     428: {
                    429:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     430:        int siz = 0;
1.2.2.26  misho     431:        ait_val_t *p = NULL;
1.2.2.1   misho     432: 
                    433:        ioTRACE(2);
                    434: 
                    435:        if (!sess)
                    436:                return -1;
                    437: 
1.2.2.7   misho     438:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     439:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    440:        if (siz == -1) {
                    441:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    442:                return 0;
1.2.2.8   misho     443:        } else {
1.2.2.19  misho     444:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     445:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    446:        }
1.2.2.1   misho     447: 
1.2.2.19  misho     448:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     449:        return 0;
                    450: }
                    451: 
                    452: int
1.2.2.9   misho     453: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     454: {
                    455:        struct tagStore *store;
                    456:        struct tagSession *sess = (struct tagSession*) arg;
                    457: 
                    458:        ioTRACE(2);
                    459: 
                    460:        if (!sess)
                    461:                return -1;
                    462: 
1.2.2.6   misho     463:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     464:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    465: 
1.2.2.16  misho     466:        if (sess->sess_clean) {
                    467:                if (call.FiniSessPUB)
                    468:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    469:                if (call.DeletePUB_subscribe)
                    470:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    471:                if (call.WipePUB_topic)
                    472:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    473:        }
1.2.2.1   misho     474: 
1.2.2.6   misho     475:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    476:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     477: 
1.2.2.6   misho     478:                if (store->st_subscr.sub_topic.msg_base)
                    479:                        free(store->st_subscr.sub_topic.msg_base);
                    480:                if (store->st_subscr.sub_value.msg_base)
                    481:                        free(store->st_subscr.sub_value.msg_base);
                    482: 
1.2.2.18  misho     483:                io_free(store);
1.2.2.6   misho     484:        }
1.2.2.1   misho     485: 
1.2.2.32! misho     486:        if (sess->sess_will.flag)
        !           487:                srv_Will(sess);
        !           488: 
1.2.2.1   misho     489:        if (sess->sess_will.msg)
                    490:                free(sess->sess_will.msg);
                    491:        if (sess->sess_will.topic)
                    492:                free(sess->sess_will.topic);
                    493: 
                    494:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    495:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     496: 
1.2.2.12  misho     497:        return -3;      /* reconnect client */
1.2.2.1   misho     498: }
                    499: 
                    500: int
1.2.2.9   misho     501: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     502: {
                    503:        struct tagSession *sess = (struct tagSession*) arg;
                    504: 
                    505:        ioTRACE(2);
                    506: 
                    507:        if (!sess)
                    508:                return -1;
                    509: 
1.2.2.5   misho     510:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     511: 
1.2.2.1   misho     512:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    513:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     514: 
1.2.2.10  misho     515:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     516: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>