Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.29

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.26  misho       7: static inline ait_val_t *
1.2.2.19  misho       8: mkPkt(void * __restrict data, int dlen)
                      9: {
1.2.2.26  misho      10:        ait_val_t *p = NULL;
1.2.2.19  misho      11: 
1.2.2.26  misho      12:        if (!(p = io_allocVar())) {
1.2.2.19  misho      13:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     14:                return NULL;
                     15:        }
                     16: 
                     17:        if (data && dlen > 0)
1.2.2.26  misho      18:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      19: 
                     20:        return p;
                     21: }
                     22: 
                     23: static inline void
1.2.2.26  misho      24: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      25: {
1.2.2.20  misho      26:        if (!p)
1.2.2.19  misho      27:                return;
                     28: 
1.2.2.26  misho      29:        io_freeVar(p);
1.2.2.19  misho      30: }
                     31: 
                     32: static void *
                     33: sendPacket(sched_task_t *task)
                     34: {
1.2.2.26  misho      35:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      36:        register int n, slen;
                     37:        u_char *pos;
                     38: 
1.2.2.26  misho      39:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      40:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     41:                return NULL;
                     42:        }
                     43: 
1.2.2.26  misho      44:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      45:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     46:                if (n == -1) {
                     47:                        ioSYSERR(0);
                     48:                        break;
                     49:                }
                     50:        }
                     51: 
1.2.2.26  misho      52:        freePkt(&p);
1.2.2.19  misho      53:        return NULL;
                     54: }
                     55: 
1.2.2.22  misho      56: /* --------------------------------------------------- */
1.2.2.19  misho      57: 
1.2.2.17  misho      58: static int
1.2.2.22  misho      59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      60: {
1.2.2.26  misho      61:        ait_val_t *p = NULL;
1.2.2.22  misho      62:        struct tagSession *s = NULL;
                     63:        struct tagStore *st = NULL;
                     64:        regex_t re;
                     65:        regmatch_t match;
                     66:        int ret;
                     67:        char szStr[STRSIZ];
                     68: 
                     69:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                     70:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
                     71:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     72:                                regerror(ret, &re, szStr, sizeof szStr);
                     73:                                regfree(&re);
                     74:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     75:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     76:                        }
                     77:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     78:                                /* MATCH */
                     79:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      80:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      81:                        }
                     82: 
                     83:                        regfree(&re);
                     84:                }
                     85:        }
                     86: 
1.2.2.17  misho      87:        return 0;
                     88: }
                     89: 
                     90: static int
1.2.2.26  misho      91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho      92: {
1.2.2.26  misho      93:        ait_val_t *p = NULL;
                     94:        struct tagSession *s = NULL;
                     95:        struct tagStore *st = NULL;
                     96:        regex_t re;
                     97:        regmatch_t match;
1.2.2.28  misho      98:        int ret;
1.2.2.26  misho      99:        char szStr[STRSIZ];
                    100:        struct mqtthdr *hdr;
                    101: 
                    102:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    103:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    104: 
                    105:        /* write topic to database */
1.2.2.27  misho     106:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    107:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.26  misho     108:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
1.2.2.27  misho     109:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     110: 
                    111:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                    112:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
1.2.2.28  misho     113:                        /* check for QoS */
                    114:                        if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
                    115:                                continue;
                    116: 
1.2.2.26  misho     117:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    118:                                regerror(ret, &re, szStr, sizeof szStr);
                    119:                                regfree(&re);
                    120:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    121:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    122:                        }
                    123:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
1.2.2.28  misho     124:                                /* MATCH */
                    125:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    126:                                                s->sess_sock, NULL, 0);
1.2.2.26  misho     127:                        }
                    128: 
                    129:                        regfree(&re);
                    130:                }
                    131:        }
                    132: 
1.2.2.28  misho     133:        /* delete not retain message */
1.2.2.27  misho     134:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    135:                        sess->sess_addr, 0);
1.2.2.28  misho     136: 
                    137:        freePkt(&p);
1.2.2.17  misho     138:        return 0;
                    139: }
                    140: 
                    141: static int
1.2.2.26  misho     142: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     143: {
1.2.2.28  misho     144:        ait_val_t *p = NULL;
                    145:        struct tagSession *s = NULL;
                    146:        struct tagStore *st = NULL;
                    147:        regex_t re;
                    148:        regmatch_t match;
                    149:        int ret;
                    150:        char szStr[STRSIZ];
                    151:        struct mqtthdr *hdr;
                    152: 
                    153:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    154:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    155: 
                    156:        /* write topic to database */
                    157:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    158:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    159:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
                    160:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    161: 
                    162:        TAILQ_FOREACH(s, &Sessions, sess_node) {
                    163:                SLIST_FOREACH(st, &s->sess_subscr, st_node) {
                    164:                        /* check for QoS */
                    165:                        if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
                    166:                                continue;
                    167: 
                    168:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    169:                                regerror(ret, &re, szStr, sizeof szStr);
                    170:                                regfree(&re);
                    171:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    172:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    173:                        }
                    174:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                    175:                                /* MATCH */
                    176:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    177:                                                s->sess_sock, NULL, 0);
                    178:                        }
                    179: 
                    180:                        regfree(&re);
                    181:                }
                    182:        }
                    183: 
                    184:        freePkt(&p);
1.2.2.17  misho     185:        return 0;
                    186: }
                    187: 
                    188: 
1.2       misho     189: int
1.2.2.9   misho     190: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     191: {
                    192:        struct mqtthdr *hdr;
1.2.2.1   misho     193:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     194:        char szTopic[STRSIZ] = { 0 };
                    195:        int siz = 0;
                    196:        u_short mid = 0;
1.2.2.26  misho     197:        ait_val_t *p = NULL;
1.2       misho     198: 
                    199:        ioTRACE(2);
                    200: 
                    201:        if (!sess)
                    202:                return -1;
                    203: 
1.2.2.17  misho     204:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     205:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     206:        if (siz == -1) {
                    207:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    208:                return 0;
                    209:        }
                    210: 
1.2       misho     211:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    212:        switch (hdr->mqtt_msg.qos) {
                    213:                case MQTT_QOS_ACK:
1.2.2.26  misho     214:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    215:                                return 0;
1.2.2.17  misho     216:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    217:                        if (siz == -1) {
                    218:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    219:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     220:                                return 0;
1.2.2.17  misho     221:                        }
1.2       misho     222:                        break;
                    223:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     224:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    225:                                return 0;
1.2.2.17  misho     226:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    227:                        if (siz == -1) {
                    228:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    229:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     230:                                return 0;
1.2.2.17  misho     231:                        }
1.2       misho     232:                        break;
1.2.2.17  misho     233:                case MQTT_QOS_ONCE:
1.2.2.23  misho     234:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     235:                default:
1.2.2.22  misho     236:                        return 0;
1.2       misho     237:        }
                    238: 
1.2.2.22  misho     239:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    240:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    241:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     242:        return 0;
                    243: }
1.2.2.1   misho     244: 
                    245: int
1.2.2.9   misho     246: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     247: {
                    248:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     249:        int siz = 0;
                    250:        u_short mid = 0;
1.2.2.26  misho     251:        ait_val_t *p = NULL;
1.2.2.1   misho     252: 
                    253:        ioTRACE(2);
                    254: 
                    255:        if (!sess)
                    256:                return -1;
                    257: 
1.2.2.17  misho     258:        ioDEBUG(5, "Exec PUBREL session");
                    259:        mid = mqtt_readPUBREL(sess->sess_buf);
                    260:        if (mid == (u_short) -1) {
                    261:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    262:                return 0;
                    263:        }
                    264: 
1.2.2.29! misho     265:        /* delete not retain message */
        !           266:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
        !           267:                        sess->sess_addr, 0);
1.2.2.17  misho     268: 
                    269:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    270:        if (siz == -1) {
                    271:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    272:                return 0;
                    273:        }
1.2.2.1   misho     274: 
1.2.2.29! misho     275:        p = mkPkt(sess->sess_buf->msg_base, siz);
        !           276:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22  misho     277:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     278:        return 0;
                    279: }
                    280: 
                    281: int
1.2.2.9   misho     282: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     283: {
                    284:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     285:        mqtt_subscr_t *subs = NULL;
                    286:        int siz = 0;
                    287:        u_short mid = 0;
                    288:        register int i;
                    289:        struct tagStore *store;
1.2.2.17  misho     290:        char buf[BUFSIZ];
                    291:        void *ptr;
1.2.2.26  misho     292:        ait_val_t *p = NULL;
1.2.2.1   misho     293: 
                    294:        ioTRACE(2);
                    295: 
                    296:        if (!sess)
                    297:                return -1;
                    298: 
1.2.2.7   misho     299:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    300:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    301:        if (siz == -1) {
                    302:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    303:                return 0;
                    304:        }
                    305: 
                    306:        /* add to db */
1.2.2.25  misho     307:        for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17  misho     308:                /* convert topic to sql search statement */
                    309:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    310:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25  misho     311:                        continue;
1.2.2.17  misho     312:                }
                    313:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     314:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     315:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     316:                        if (!store) {
                    317:                                ioSYSERR(0);
1.2.2.25  misho     318:                                continue;
1.2.2.7   misho     319:                        } else {
                    320:                                store->st_msgid = mid;
1.2.2.8   misho     321:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     322:                        }
                    323: 
                    324:                        /* add to cache */
                    325:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    326: 
1.2.2.17  misho     327:                        /* convert topic to regexp */
1.2.2.24  misho     328:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17  misho     329:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24  misho     330: 
                    331:                                subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     332:                        } else {
                    333:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25  misho     334:                                if (!ptr)
1.2.2.17  misho     335:                                        ioSYSERR(0);
1.2.2.25  misho     336:                                else {
1.2.2.17  misho     337:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    338:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    339:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    340:                                                        store->st_subscr.sub_topic.msg_len);
                    341:                                }
                    342: 
1.2.2.24  misho     343:                                call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    344:                                                store->st_subscr.sub_topic.msg_base, 
                    345:                                                store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17  misho     346: 
1.2.2.24  misho     347:                                subs[i].sub_ret = MQTT_QOS_PASS;
                    348:                        }
1.2.2.25  misho     349:                }
1.2.2.7   misho     350:        }
1.2.2.1   misho     351: 
1.2.2.7   misho     352:        /* send acknowledge */
                    353:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    354:        if (siz == -1) {
                    355:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    356:                goto end;
1.2.2.21  misho     357:        } else {
                    358:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     359:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    360:        }
1.2.2.21  misho     361: 
                    362:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     363: end:
                    364:        mqtt_subFree(&subs);
1.2.2.1   misho     365:        return 0;
                    366: }
                    367: 
                    368: int
1.2.2.9   misho     369: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     370: {
                    371:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     372:        mqtt_subscr_t *subs = NULL;
                    373:        int siz = 0;
                    374:        u_short mid = 0;
                    375:        register int i;
                    376:        struct tagStore *store, *tmp;
1.2.2.26  misho     377:        ait_val_t *p = NULL;
1.2.2.1   misho     378: 
                    379:        ioTRACE(2);
                    380: 
                    381:        if (!sess)
                    382:                return -1;
                    383: 
1.2.2.13  misho     384:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    385:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    386:        if (siz == -1) {
                    387:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    388:                return 0;
                    389:        }
                    390: 
                    391:        /* del from db */
                    392:        for (i = 0; i < siz; i++) {
                    393:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    394:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    395:                                        store->st_subscr.sub_topic.msg_base && 
                    396:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    397:                                                subs[i].sub_topic.msg_base)) {
                    398:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     399: 
                    400:                                if (store->st_subscr.sub_topic.msg_base)
                    401:                                        free(store->st_subscr.sub_topic.msg_base);
                    402:                                if (store->st_subscr.sub_value.msg_base)
                    403:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     404:                                io_free(store);
1.2.2.13  misho     405:                        }
                    406:                }
                    407: 
1.2.2.15  misho     408:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    409:                                sess->sess_user, "%");
1.2.2.13  misho     410:        }
                    411: 
                    412:        /* send acknowledge */
                    413:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    414:        if (siz == -1) {
                    415:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    416:                goto end;
1.2.2.21  misho     417:        } else {
                    418:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     419:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    420:        }
1.2.2.21  misho     421: 
                    422:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     423: end:
                    424:        mqtt_subFree(&subs);
1.2.2.1   misho     425:        return 0;
                    426: }
                    427: 
                    428: int
1.2.2.9   misho     429: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     430: {
                    431:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     432:        int siz = 0;
1.2.2.26  misho     433:        ait_val_t *p = NULL;
1.2.2.1   misho     434: 
                    435:        ioTRACE(2);
                    436: 
                    437:        if (!sess)
                    438:                return -1;
                    439: 
1.2.2.7   misho     440:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     441:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    442:        if (siz == -1) {
                    443:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    444:                return 0;
1.2.2.8   misho     445:        } else {
1.2.2.19  misho     446:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     447:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    448:        }
1.2.2.1   misho     449: 
1.2.2.19  misho     450:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     451:        return 0;
                    452: }
                    453: 
                    454: int
1.2.2.9   misho     455: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     456: {
                    457:        struct tagStore *store;
                    458:        struct tagSession *sess = (struct tagSession*) arg;
                    459: 
                    460:        ioTRACE(2);
                    461: 
                    462:        if (!sess)
                    463:                return -1;
                    464: 
1.2.2.6   misho     465:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     466:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    467: 
1.2.2.16  misho     468:        if (sess->sess_clean) {
                    469:                if (call.FiniSessPUB)
                    470:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    471:                if (call.DeletePUB_subscribe)
                    472:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    473:                if (call.WipePUB_topic)
                    474:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    475:        }
1.2.2.1   misho     476: 
1.2.2.6   misho     477:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    478:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     479: 
1.2.2.6   misho     480:                if (store->st_subscr.sub_topic.msg_base)
                    481:                        free(store->st_subscr.sub_topic.msg_base);
                    482:                if (store->st_subscr.sub_value.msg_base)
                    483:                        free(store->st_subscr.sub_value.msg_base);
                    484: 
1.2.2.18  misho     485:                io_free(store);
1.2.2.6   misho     486:        }
1.2.2.1   misho     487: 
                    488:        if (sess->sess_will.msg)
                    489:                free(sess->sess_will.msg);
                    490:        if (sess->sess_will.topic)
                    491:                free(sess->sess_will.topic);
                    492: 
                    493:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    494:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     495: 
1.2.2.12  misho     496:        return -3;      /* reconnect client */
1.2.2.1   misho     497: }
                    498: 
                    499: int
1.2.2.9   misho     500: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     501: {
                    502:        struct tagSession *sess = (struct tagSession*) arg;
                    503: 
                    504:        ioTRACE(2);
                    505: 
                    506:        if (!sess)
                    507:                return -1;
                    508: 
1.2.2.5   misho     509:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     510: 
1.2.2.1   misho     511:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    512:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     513: 
1.2.2.10  misho     514:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     515: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>