Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.28

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.26  misho       7: static inline ait_val_t *
1.2.2.19  misho       8: mkPkt(void * __restrict data, int dlen)
                      9: {
1.2.2.26  misho      10:        ait_val_t *p = NULL;
1.2.2.19  misho      11: 
1.2.2.26  misho      12:        if (!(p = io_allocVar())) {
1.2.2.19  misho      13:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     14:                return NULL;
                     15:        }
                     16: 
                     17:        if (data && dlen > 0)
1.2.2.26  misho      18:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      19: 
                     20:        return p;
                     21: }
                     22: 
                     23: static inline void
1.2.2.26  misho      24: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      25: {
1.2.2.20  misho      26:        if (!p)
1.2.2.19  misho      27:                return;
                     28: 
1.2.2.26  misho      29:        io_freeVar(p);
1.2.2.19  misho      30: }
                     31: 
                     32: static void *
                     33: sendPacket(sched_task_t *task)
                     34: {
1.2.2.26  misho      35:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      36:        register int n, slen;
                     37:        u_char *pos;
                     38: 
1.2.2.26  misho      39:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      40:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     41:                return NULL;
                     42:        }
                     43: 
1.2.2.26  misho      44:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      45:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     46:                if (n == -1) {
                     47:                        ioSYSERR(0);
                     48:                        break;
                     49:                }
                     50:        }
                     51: 
1.2.2.26  misho      52:        freePkt(&p);
1.2.2.19  misho      53:        return NULL;
                     54: }
                     55: 
1.2.2.22  misho      56: /* --------------------------------------------------- */
1.2.2.19  misho      57: 
1.2.2.17  misho      58: static int
1.2.2.22  misho      59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      60: {
1.2.2.26  misho      61:        ait_val_t *p = NULL;
1.2.2.22  misho      62:        struct tagSession *s = NULL;
                     63:        struct tagStore *st = NULL;
                     64:        regex_t re;
                     65:        regmatch_t match;
                     66:        int ret;
                     67:        char szStr[STRSIZ];
                     68: 
                     69:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                     70:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
                     71:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     72:                                regerror(ret, &re, szStr, sizeof szStr);
                     73:                                regfree(&re);
                     74:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     75:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     76:                        }
                     77:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     78:                                /* MATCH */
                     79:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      80:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      81:                        }
                     82: 
                     83:                        regfree(&re);
                     84:                }
                     85:        }
                     86: 
1.2.2.17  misho      87:        return 0;
                     88: }
                     89: 
                     90: static int
1.2.2.26  misho      91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho      92: {
1.2.2.26  misho      93:        ait_val_t *p = NULL;
                     94:        struct tagSession *s = NULL;
                     95:        struct tagStore *st = NULL;
                     96:        regex_t re;
                     97:        regmatch_t match;
1.2.2.28! misho      98:        int ret;
1.2.2.26  misho      99:        char szStr[STRSIZ];
                    100:        struct mqtthdr *hdr;
                    101: 
                    102:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    103:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    104: 
                    105:        /* write topic to database */
1.2.2.27  misho     106:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    107:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.26  misho     108:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
1.2.2.27  misho     109:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     110: 
                    111:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                    112:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
1.2.2.28! misho     113:                        /* check for QoS */
        !           114:                        if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
        !           115:                                continue;
        !           116: 
1.2.2.26  misho     117:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    118:                                regerror(ret, &re, szStr, sizeof szStr);
                    119:                                regfree(&re);
                    120:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    121:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    122:                        }
                    123:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
1.2.2.28! misho     124:                                /* MATCH */
        !           125:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
        !           126:                                                s->sess_sock, NULL, 0);
1.2.2.26  misho     127:                        }
                    128: 
                    129:                        regfree(&re);
                    130:                }
                    131:        }
                    132: 
1.2.2.28! misho     133:        /* delete not retain message */
1.2.2.27  misho     134:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    135:                        sess->sess_addr, 0);
1.2.2.28! misho     136: 
        !           137:        freePkt(&p);
1.2.2.17  misho     138:        return 0;
                    139: }
                    140: 
                    141: static int
1.2.2.26  misho     142: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     143: {
1.2.2.28! misho     144:        ait_val_t *p = NULL;
        !           145:        struct tagSession *s = NULL;
        !           146:        struct tagStore *st = NULL;
        !           147:        regex_t re;
        !           148:        regmatch_t match;
        !           149:        int ret;
        !           150:        char szStr[STRSIZ];
        !           151:        struct mqtthdr *hdr;
        !           152: 
        !           153:        p = mkPkt(sess->sess_buf->msg_base, datlen);
        !           154:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
        !           155: 
        !           156:        /* write topic to database */
        !           157:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
        !           158:                        sess->sess_addr, hdr->mqtt_msg.retain);
        !           159:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
        !           160:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
        !           161: 
        !           162:        TAILQ_FOREACH(s, &Sessions, sess_node) {
        !           163:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
        !           164:                        /* check for QoS */
        !           165:                        if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
        !           166:                                continue;
        !           167: 
        !           168:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
        !           169:                                regerror(ret, &re, szStr, sizeof szStr);
        !           170:                                regfree(&re);
        !           171:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
        !           172:                                                st->st_subscr.sub_topic.msg_base, szStr);
        !           173:                        }
        !           174:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
        !           175:                                /* MATCH */
        !           176:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
        !           177:                                                s->sess_sock, NULL, 0);
        !           178:                        }
        !           179: 
        !           180:                        regfree(&re);
        !           181:                }
        !           182:        }
        !           183: 
        !           184:        freePkt(&p);
1.2.2.17  misho     185:        return 0;
                    186: }
                    187: 
                    188: 
1.2       misho     189: int
1.2.2.9   misho     190: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     191: {
                    192:        struct mqtthdr *hdr;
1.2.2.1   misho     193:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     194:        char szTopic[STRSIZ] = { 0 };
                    195:        int siz = 0;
                    196:        u_short mid = 0;
1.2.2.26  misho     197:        ait_val_t *p = NULL;
1.2       misho     198: 
                    199:        ioTRACE(2);
                    200: 
                    201:        if (!sess)
                    202:                return -1;
                    203: 
1.2.2.17  misho     204:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     205:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     206:        if (siz == -1) {
                    207:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    208:                return 0;
                    209:        }
                    210: 
1.2       misho     211:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    212:        switch (hdr->mqtt_msg.qos) {
                    213:                case MQTT_QOS_ACK:
1.2.2.26  misho     214:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    215:                                return 0;
1.2.2.17  misho     216:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    217:                        if (siz == -1) {
                    218:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    219:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     220:                                return 0;
1.2.2.17  misho     221:                        }
1.2       misho     222:                        break;
                    223:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     224:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    225:                                return 0;
1.2.2.17  misho     226:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    227:                        if (siz == -1) {
                    228:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    229:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     230:                                return 0;
1.2.2.17  misho     231:                        }
1.2       misho     232:                        break;
1.2.2.17  misho     233:                case MQTT_QOS_ONCE:
1.2.2.23  misho     234:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     235:                default:
1.2.2.22  misho     236:                        return 0;
1.2       misho     237:        }
                    238: 
1.2.2.22  misho     239:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    240:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    241:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     242:        return 0;
                    243: }
1.2.2.1   misho     244: 
                    245: int
1.2.2.9   misho     246: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     247: {
                    248:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     249:        int siz = 0;
                    250:        u_short mid = 0;
1.2.2.26  misho     251:        ait_val_t *p = NULL;
1.2.2.1   misho     252: 
                    253:        ioTRACE(2);
                    254: 
                    255:        if (!sess)
                    256:                return -1;
                    257: 
1.2.2.17  misho     258:        ioDEBUG(5, "Exec PUBREL session");
                    259:        mid = mqtt_readPUBREL(sess->sess_buf);
                    260:        if (mid == (u_short) -1) {
                    261:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    262:                return 0;
                    263:        }
                    264: 
                    265:        // TODO:: Delete from database topic
                    266: 
                    267:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    268:        if (siz == -1) {
                    269:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    270:                return 0;
1.2.2.22  misho     271:        } else {
                    272:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17  misho     273:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    274:        }
1.2.2.1   misho     275: 
1.2.2.22  misho     276:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     277:        return 0;
                    278: }
                    279: 
                    280: int
1.2.2.9   misho     281: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     282: {
                    283:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     284:        mqtt_subscr_t *subs = NULL;
                    285:        int siz = 0;
                    286:        u_short mid = 0;
                    287:        register int i;
                    288:        struct tagStore *store;
1.2.2.17  misho     289:        char buf[BUFSIZ];
                    290:        void *ptr;
1.2.2.26  misho     291:        ait_val_t *p = NULL;
1.2.2.1   misho     292: 
                    293:        ioTRACE(2);
                    294: 
                    295:        if (!sess)
                    296:                return -1;
                    297: 
1.2.2.7   misho     298:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    299:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    300:        if (siz == -1) {
                    301:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    302:                return 0;
                    303:        }
                    304: 
                    305:        /* add to db */
1.2.2.25  misho     306:        for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17  misho     307:                /* convert topic to sql search statement */
                    308:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    309:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25  misho     310:                        continue;
1.2.2.17  misho     311:                }
                    312:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     313:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     314:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     315:                        if (!store) {
                    316:                                ioSYSERR(0);
1.2.2.25  misho     317:                                continue;
1.2.2.7   misho     318:                        } else {
                    319:                                store->st_msgid = mid;
1.2.2.8   misho     320:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     321:                        }
                    322: 
                    323:                        /* add to cache */
                    324:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    325: 
1.2.2.17  misho     326:                        /* convert topic to regexp */
1.2.2.24  misho     327:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17  misho     328:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24  misho     329: 
                    330:                                subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     331:                        } else {
                    332:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25  misho     333:                                if (!ptr)
1.2.2.17  misho     334:                                        ioSYSERR(0);
1.2.2.25  misho     335:                                else {
1.2.2.17  misho     336:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    337:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    338:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    339:                                                        store->st_subscr.sub_topic.msg_len);
                    340:                                }
                    341: 
1.2.2.24  misho     342:                                call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    343:                                                store->st_subscr.sub_topic.msg_base, 
                    344:                                                store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17  misho     345: 
1.2.2.24  misho     346:                                subs[i].sub_ret = MQTT_QOS_PASS;
                    347:                        }
1.2.2.25  misho     348:                }
1.2.2.7   misho     349:        }
1.2.2.1   misho     350: 
1.2.2.7   misho     351:        /* send acknowledge */
                    352:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    353:        if (siz == -1) {
                    354:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    355:                goto end;
1.2.2.21  misho     356:        } else {
                    357:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     358:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    359:        }
1.2.2.21  misho     360: 
                    361:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     362: end:
                    363:        mqtt_subFree(&subs);
1.2.2.1   misho     364:        return 0;
                    365: }
                    366: 
                    367: int
1.2.2.9   misho     368: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     369: {
                    370:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     371:        mqtt_subscr_t *subs = NULL;
                    372:        int siz = 0;
                    373:        u_short mid = 0;
                    374:        register int i;
                    375:        struct tagStore *store, *tmp;
1.2.2.26  misho     376:        ait_val_t *p = NULL;
1.2.2.1   misho     377: 
                    378:        ioTRACE(2);
                    379: 
                    380:        if (!sess)
                    381:                return -1;
                    382: 
1.2.2.13  misho     383:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    384:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    385:        if (siz == -1) {
                    386:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    387:                return 0;
                    388:        }
                    389: 
                    390:        /* del from db */
                    391:        for (i = 0; i < siz; i++) {
                    392:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    393:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    394:                                        store->st_subscr.sub_topic.msg_base && 
                    395:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    396:                                                subs[i].sub_topic.msg_base)) {
                    397:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     398: 
                    399:                                if (store->st_subscr.sub_topic.msg_base)
                    400:                                        free(store->st_subscr.sub_topic.msg_base);
                    401:                                if (store->st_subscr.sub_value.msg_base)
                    402:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     403:                                io_free(store);
1.2.2.13  misho     404:                        }
                    405:                }
                    406: 
1.2.2.15  misho     407:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    408:                                sess->sess_user, "%");
1.2.2.13  misho     409:        }
                    410: 
                    411:        /* send acknowledge */
                    412:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    413:        if (siz == -1) {
                    414:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    415:                goto end;
1.2.2.21  misho     416:        } else {
                    417:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     418:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    419:        }
1.2.2.21  misho     420: 
                    421:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     422: end:
                    423:        mqtt_subFree(&subs);
1.2.2.1   misho     424:        return 0;
                    425: }
                    426: 
                    427: int
1.2.2.9   misho     428: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     429: {
                    430:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     431:        int siz = 0;
1.2.2.26  misho     432:        ait_val_t *p = NULL;
1.2.2.1   misho     433: 
                    434:        ioTRACE(2);
                    435: 
                    436:        if (!sess)
                    437:                return -1;
                    438: 
1.2.2.7   misho     439:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     440:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    441:        if (siz == -1) {
                    442:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    443:                return 0;
1.2.2.8   misho     444:        } else {
1.2.2.19  misho     445:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     446:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    447:        }
1.2.2.1   misho     448: 
1.2.2.19  misho     449:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     450:        return 0;
                    451: }
                    452: 
                    453: int
1.2.2.9   misho     454: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     455: {
                    456:        struct tagStore *store;
                    457:        struct tagSession *sess = (struct tagSession*) arg;
                    458: 
                    459:        ioTRACE(2);
                    460: 
                    461:        if (!sess)
                    462:                return -1;
                    463: 
1.2.2.6   misho     464:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     465:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    466: 
1.2.2.16  misho     467:        if (sess->sess_clean) {
                    468:                if (call.FiniSessPUB)
                    469:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    470:                if (call.DeletePUB_subscribe)
                    471:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    472:                if (call.WipePUB_topic)
                    473:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    474:        }
1.2.2.1   misho     475: 
1.2.2.6   misho     476:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    477:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     478: 
1.2.2.6   misho     479:                if (store->st_subscr.sub_topic.msg_base)
                    480:                        free(store->st_subscr.sub_topic.msg_base);
                    481:                if (store->st_subscr.sub_value.msg_base)
                    482:                        free(store->st_subscr.sub_value.msg_base);
                    483: 
1.2.2.18  misho     484:                io_free(store);
1.2.2.6   misho     485:        }
1.2.2.1   misho     486: 
                    487:        if (sess->sess_will.msg)
                    488:                free(sess->sess_will.msg);
                    489:        if (sess->sess_will.topic)
                    490:                free(sess->sess_will.topic);
                    491: 
                    492:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    493:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     494: 
1.2.2.12  misho     495:        return -3;      /* reconnect client */
1.2.2.1   misho     496: }
                    497: 
                    498: int
1.2.2.9   misho     499: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     500: {
                    501:        struct tagSession *sess = (struct tagSession*) arg;
                    502: 
                    503:        ioTRACE(2);
                    504: 
                    505:        if (!sess)
                    506:                return -1;
                    507: 
1.2.2.5   misho     508:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     509: 
1.2.2.1   misho     510:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    511:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     512: 
1.2.2.10  misho     513:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     514: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>