Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.31

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.26  misho       7: static inline ait_val_t *
1.2.2.19  misho       8: mkPkt(void * __restrict data, int dlen)
                      9: {
1.2.2.26  misho      10:        ait_val_t *p = NULL;
1.2.2.19  misho      11: 
1.2.2.26  misho      12:        if (!(p = io_allocVar())) {
1.2.2.19  misho      13:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     14:                return NULL;
                     15:        }
                     16: 
                     17:        if (data && dlen > 0)
1.2.2.26  misho      18:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      19: 
                     20:        return p;
                     21: }
                     22: 
                     23: static inline void
1.2.2.26  misho      24: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      25: {
1.2.2.20  misho      26:        if (!p)
1.2.2.19  misho      27:                return;
                     28: 
1.2.2.26  misho      29:        io_freeVar(p);
1.2.2.19  misho      30: }
                     31: 
                     32: static void *
                     33: sendPacket(sched_task_t *task)
                     34: {
1.2.2.26  misho      35:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      36:        register int n, slen;
                     37:        u_char *pos;
                     38: 
1.2.2.26  misho      39:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      40:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     41:                return NULL;
                     42:        }
                     43: 
1.2.2.26  misho      44:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      45:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     46:                if (n == -1) {
                     47:                        ioSYSERR(0);
                     48:                        break;
                     49:                }
                     50:        }
                     51: 
1.2.2.26  misho      52:        freePkt(&p);
1.2.2.19  misho      53:        return NULL;
                     54: }
                     55: 
1.2.2.22  misho      56: /* --------------------------------------------------- */
1.2.2.19  misho      57: 
1.2.2.17  misho      58: static int
1.2.2.22  misho      59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      60: {
1.2.2.26  misho      61:        ait_val_t *p = NULL;
1.2.2.22  misho      62:        struct tagSession *s = NULL;
1.2.2.30  misho      63:        struct tagStore *st_, *st = NULL;
1.2.2.22  misho      64:        regex_t re;
                     65:        regmatch_t match;
                     66:        int ret;
                     67:        char szStr[STRSIZ];
                     68: 
1.2.2.30  misho      69: 
1.2.2.22  misho      70:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho      71:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.22  misho      72:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     73:                                regerror(ret, &re, szStr, sizeof szStr);
                     74:                                regfree(&re);
                     75:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     76:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     77:                        }
                     78:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     79:                                /* MATCH */
                     80:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      81:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      82:                        }
                     83: 
                     84:                        regfree(&re);
                     85:                }
                     86:        }
                     87: 
1.2.2.17  misho      88:        return 0;
                     89: }
                     90: 
                     91: static int
1.2.2.26  misho      92: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho      93: {
1.2.2.26  misho      94:        ait_val_t *p = NULL;
                     95:        struct tagSession *s = NULL;
1.2.2.30  misho      96:        struct tagStore *st_, *st = NULL;
1.2.2.26  misho      97:        regex_t re;
                     98:        regmatch_t match;
1.2.2.28  misho      99:        int ret;
1.2.2.26  misho     100:        char szStr[STRSIZ];
                    101:        struct mqtthdr *hdr;
                    102: 
                    103:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    104:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    105: 
                    106:        /* write topic to database */
1.2.2.27  misho     107:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    108:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.26  misho     109:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
1.2.2.27  misho     110:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     111: 
                    112:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho     113:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.28  misho     114:                        /* check for QoS */
                    115:                        if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
                    116:                                continue;
                    117: 
1.2.2.26  misho     118:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    119:                                regerror(ret, &re, szStr, sizeof szStr);
                    120:                                regfree(&re);
                    121:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    122:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    123:                        }
                    124:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
1.2.2.28  misho     125:                                /* MATCH */
                    126:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    127:                                                s->sess_sock, NULL, 0);
1.2.2.26  misho     128:                        }
                    129: 
                    130:                        regfree(&re);
                    131:                }
                    132:        }
                    133: 
1.2.2.28  misho     134:        /* delete not retain message */
1.2.2.27  misho     135:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    136:                        sess->sess_addr, 0);
1.2.2.28  misho     137: 
                    138:        freePkt(&p);
1.2.2.17  misho     139:        return 0;
                    140: }
                    141: 
                    142: static int
1.2.2.26  misho     143: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     144: {
1.2.2.28  misho     145:        ait_val_t *p = NULL;
                    146:        struct tagSession *s = NULL;
1.2.2.30  misho     147:        struct tagStore *st_, *st = NULL;
1.2.2.28  misho     148:        regex_t re;
                    149:        regmatch_t match;
                    150:        int ret;
                    151:        char szStr[STRSIZ];
                    152:        struct mqtthdr *hdr;
                    153: 
                    154:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    155:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    156: 
                    157:        /* write topic to database */
                    158:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    159:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    160:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
                    161:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    162: 
                    163:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30  misho     164:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.28  misho     165:                        /* check for QoS */
                    166:                        if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
                    167:                                continue;
                    168: 
                    169:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    170:                                regerror(ret, &re, szStr, sizeof szStr);
                    171:                                regfree(&re);
                    172:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    173:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    174:                        }
                    175:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                    176:                                /* MATCH */
                    177:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    178:                                                s->sess_sock, NULL, 0);
                    179:                        }
                    180: 
                    181:                        regfree(&re);
                    182:                }
                    183:        }
                    184: 
                    185:        freePkt(&p);
1.2.2.17  misho     186:        return 0;
                    187: }
                    188: 
                    189: 
1.2       misho     190: int
1.2.2.9   misho     191: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     192: {
                    193:        struct mqtthdr *hdr;
1.2.2.1   misho     194:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     195:        char szTopic[STRSIZ] = { 0 };
                    196:        int siz = 0;
                    197:        u_short mid = 0;
1.2.2.26  misho     198:        ait_val_t *p = NULL;
1.2       misho     199: 
                    200:        ioTRACE(2);
                    201: 
                    202:        if (!sess)
                    203:                return -1;
                    204: 
1.2.2.17  misho     205:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     206:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     207:        if (siz == -1) {
                    208:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    209:                return 0;
                    210:        }
                    211: 
1.2       misho     212:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    213:        switch (hdr->mqtt_msg.qos) {
                    214:                case MQTT_QOS_ACK:
1.2.2.26  misho     215:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    216:                                return 0;
1.2.2.17  misho     217:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    218:                        if (siz == -1) {
                    219:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    220:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     221:                                return 0;
1.2.2.17  misho     222:                        }
1.2       misho     223:                        break;
                    224:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     225:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    226:                                return 0;
1.2.2.17  misho     227:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    228:                        if (siz == -1) {
                    229:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    230:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     231:                                return 0;
1.2.2.17  misho     232:                        }
1.2       misho     233:                        break;
1.2.2.17  misho     234:                case MQTT_QOS_ONCE:
1.2.2.23  misho     235:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     236:                default:
1.2.2.22  misho     237:                        return 0;
1.2       misho     238:        }
                    239: 
1.2.2.22  misho     240:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    241:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    242:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     243:        return 0;
                    244: }
1.2.2.1   misho     245: 
                    246: int
1.2.2.9   misho     247: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     248: {
                    249:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     250:        int siz = 0;
                    251:        u_short mid = 0;
1.2.2.26  misho     252:        ait_val_t *p = NULL;
1.2.2.1   misho     253: 
                    254:        ioTRACE(2);
                    255: 
                    256:        if (!sess)
                    257:                return -1;
                    258: 
1.2.2.17  misho     259:        ioDEBUG(5, "Exec PUBREL session");
                    260:        mid = mqtt_readPUBREL(sess->sess_buf);
                    261:        if (mid == (u_short) -1) {
                    262:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    263:                return 0;
                    264:        }
                    265: 
1.2.2.29  misho     266:        /* delete not retain message */
                    267:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    268:                        sess->sess_addr, 0);
1.2.2.17  misho     269: 
                    270:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    271:        if (siz == -1) {
                    272:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    273:                return 0;
                    274:        }
1.2.2.1   misho     275: 
1.2.2.29  misho     276:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    277:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22  misho     278:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     279:        return 0;
                    280: }
                    281: 
                    282: int
1.2.2.9   misho     283: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     284: {
                    285:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     286:        mqtt_subscr_t *subs = NULL;
                    287:        int siz = 0;
                    288:        u_short mid = 0;
                    289:        register int i;
                    290:        struct tagStore *store;
1.2.2.17  misho     291:        char buf[BUFSIZ];
                    292:        void *ptr;
1.2.2.26  misho     293:        ait_val_t *p = NULL;
1.2.2.1   misho     294: 
                    295:        ioTRACE(2);
                    296: 
                    297:        if (!sess)
                    298:                return -1;
                    299: 
1.2.2.7   misho     300:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    301:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    302:        if (siz == -1) {
                    303:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    304:                return 0;
                    305:        }
                    306: 
                    307:        /* add to db */
1.2.2.25  misho     308:        for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.31! misho     309:                store = io_malloc(sizeof(struct tagStore));
        !           310:                if (!store) {
        !           311:                        ioSYSERR(0);
1.2.2.25  misho     312:                        continue;
1.2.2.31! misho     313:                } else {
        !           314:                        store->st_msgid = mid;
        !           315:                        mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.17  misho     316:                }
1.2.2.31! misho     317: 
        !           318:                /* add to cache */
        !           319:                SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
        !           320: 
        !           321:                /* convert topic to regexp */
        !           322:                if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
        !           323:                        ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           324: 
        !           325:                        subs[i].sub_ret = MQTT_QOS_DENY;
        !           326:                } else {
        !           327:                        ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
        !           328:                        if (!ptr) {
1.2.2.7   misho     329:                                ioSYSERR(0);
1.2.2.25  misho     330:                                continue;
1.2.2.7   misho     331:                        } else {
1.2.2.31! misho     332:                                store->st_subscr.sub_topic.msg_base = ptr;
        !           333:                                store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
        !           334:                                memcpy(store->st_subscr.sub_topic.msg_base, buf, 
        !           335:                                                store->st_subscr.sub_topic.msg_len);
1.2.2.7   misho     336:                        }
                    337: 
1.2.2.31! misho     338:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
        !           339:                                        store->st_subscr.sub_topic.msg_base, 
        !           340:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.7   misho     341: 
1.2.2.31! misho     342:                        subs[i].sub_ret = MQTT_QOS_PASS;
1.2.2.25  misho     343:                }
1.2.2.31! misho     344: 
        !           345:                call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
        !           346:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret);
1.2.2.7   misho     347:        }
1.2.2.1   misho     348: 
1.2.2.7   misho     349:        /* send acknowledge */
                    350:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    351:        if (siz == -1) {
                    352:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    353:                goto end;
1.2.2.21  misho     354:        } else {
                    355:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     356:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    357:        }
1.2.2.21  misho     358: 
                    359:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     360: end:
                    361:        mqtt_subFree(&subs);
1.2.2.1   misho     362:        return 0;
                    363: }
                    364: 
                    365: int
1.2.2.9   misho     366: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     367: {
                    368:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     369:        mqtt_subscr_t *subs = NULL;
                    370:        int siz = 0;
                    371:        u_short mid = 0;
                    372:        register int i;
                    373:        struct tagStore *store, *tmp;
1.2.2.26  misho     374:        ait_val_t *p = NULL;
1.2.2.1   misho     375: 
                    376:        ioTRACE(2);
                    377: 
                    378:        if (!sess)
                    379:                return -1;
                    380: 
1.2.2.13  misho     381:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    382:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    383:        if (siz == -1) {
                    384:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    385:                return 0;
                    386:        }
                    387: 
                    388:        /* del from db */
                    389:        for (i = 0; i < siz; i++) {
                    390:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    391:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    392:                                        store->st_subscr.sub_topic.msg_base && 
                    393:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    394:                                                subs[i].sub_topic.msg_base)) {
                    395:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     396: 
                    397:                                if (store->st_subscr.sub_topic.msg_base)
                    398:                                        free(store->st_subscr.sub_topic.msg_base);
                    399:                                if (store->st_subscr.sub_value.msg_base)
                    400:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     401:                                io_free(store);
1.2.2.13  misho     402:                        }
                    403:                }
                    404: 
1.2.2.15  misho     405:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    406:                                sess->sess_user, "%");
1.2.2.13  misho     407:        }
                    408: 
                    409:        /* send acknowledge */
                    410:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    411:        if (siz == -1) {
                    412:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    413:                goto end;
1.2.2.21  misho     414:        } else {
                    415:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     416:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    417:        }
1.2.2.21  misho     418: 
                    419:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     420: end:
                    421:        mqtt_subFree(&subs);
1.2.2.1   misho     422:        return 0;
                    423: }
                    424: 
                    425: int
1.2.2.9   misho     426: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     427: {
                    428:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     429:        int siz = 0;
1.2.2.26  misho     430:        ait_val_t *p = NULL;
1.2.2.1   misho     431: 
                    432:        ioTRACE(2);
                    433: 
                    434:        if (!sess)
                    435:                return -1;
                    436: 
1.2.2.7   misho     437:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     438:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    439:        if (siz == -1) {
                    440:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    441:                return 0;
1.2.2.8   misho     442:        } else {
1.2.2.19  misho     443:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     444:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    445:        }
1.2.2.1   misho     446: 
1.2.2.19  misho     447:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     448:        return 0;
                    449: }
                    450: 
                    451: int
1.2.2.9   misho     452: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     453: {
                    454:        struct tagStore *store;
                    455:        struct tagSession *sess = (struct tagSession*) arg;
                    456: 
                    457:        ioTRACE(2);
                    458: 
                    459:        if (!sess)
                    460:                return -1;
                    461: 
1.2.2.6   misho     462:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     463:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    464: 
1.2.2.16  misho     465:        if (sess->sess_clean) {
                    466:                if (call.FiniSessPUB)
                    467:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    468:                if (call.DeletePUB_subscribe)
                    469:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    470:                if (call.WipePUB_topic)
                    471:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    472:        }
1.2.2.1   misho     473: 
1.2.2.6   misho     474:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    475:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     476: 
1.2.2.6   misho     477:                if (store->st_subscr.sub_topic.msg_base)
                    478:                        free(store->st_subscr.sub_topic.msg_base);
                    479:                if (store->st_subscr.sub_value.msg_base)
                    480:                        free(store->st_subscr.sub_value.msg_base);
                    481: 
1.2.2.18  misho     482:                io_free(store);
1.2.2.6   misho     483:        }
1.2.2.1   misho     484: 
                    485:        if (sess->sess_will.msg)
                    486:                free(sess->sess_will.msg);
                    487:        if (sess->sess_will.topic)
                    488:                free(sess->sess_will.topic);
                    489: 
                    490:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    491:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     492: 
1.2.2.12  misho     493:        return -3;      /* reconnect client */
1.2.2.1   misho     494: }
                    495: 
                    496: int
1.2.2.9   misho     497: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     498: {
                    499:        struct tagSession *sess = (struct tagSession*) arg;
                    500: 
                    501:        ioTRACE(2);
                    502: 
                    503:        if (!sess)
                    504:                return -1;
                    505: 
1.2.2.5   misho     506:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     507: 
1.2.2.1   misho     508:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    509:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     510: 
1.2.2.10  misho     511:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     512: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>