Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.30

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.26  misho       7: static inline ait_val_t *
1.2.2.19  misho       8: mkPkt(void * __restrict data, int dlen)
                      9: {
1.2.2.26  misho      10:        ait_val_t *p = NULL;
1.2.2.19  misho      11: 
1.2.2.26  misho      12:        if (!(p = io_allocVar())) {
1.2.2.19  misho      13:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     14:                return NULL;
                     15:        }
                     16: 
                     17:        if (data && dlen > 0)
1.2.2.26  misho      18:                AIT_SET_BUF(p, data, dlen);
1.2.2.19  misho      19: 
                     20:        return p;
                     21: }
                     22: 
                     23: static inline void
1.2.2.26  misho      24: freePkt(ait_val_t ** __restrict p)
1.2.2.19  misho      25: {
1.2.2.20  misho      26:        if (!p)
1.2.2.19  misho      27:                return;
                     28: 
1.2.2.26  misho      29:        io_freeVar(p);
1.2.2.19  misho      30: }
                     31: 
                     32: static void *
                     33: sendPacket(sched_task_t *task)
                     34: {
1.2.2.26  misho      35:        ait_val_t *p = TASK_ARG(task);
1.2.2.19  misho      36:        register int n, slen;
                     37:        u_char *pos;
                     38: 
1.2.2.26  misho      39:        if (!p || AIT_ISEMPTY(p)) {
1.2.2.19  misho      40:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     41:                return NULL;
                     42:        }
                     43: 
1.2.2.26  misho      44:        for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19  misho      45:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     46:                if (n == -1) {
                     47:                        ioSYSERR(0);
                     48:                        break;
                     49:                }
                     50:        }
                     51: 
1.2.2.26  misho      52:        freePkt(&p);
1.2.2.19  misho      53:        return NULL;
                     54: }
                     55: 
1.2.2.22  misho      56: /* --------------------------------------------------- */
1.2.2.19  misho      57: 
1.2.2.17  misho      58: static int
1.2.2.22  misho      59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17  misho      60: {
1.2.2.26  misho      61:        ait_val_t *p = NULL;
1.2.2.22  misho      62:        struct tagSession *s = NULL;
1.2.2.30! misho      63:        struct tagStore *st_, *st = NULL;
1.2.2.22  misho      64:        regex_t re;
                     65:        regmatch_t match;
                     66:        int ret;
                     67:        char szStr[STRSIZ];
                     68: 
1.2.2.30! misho      69: 
1.2.2.22  misho      70:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30! misho      71:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.22  misho      72:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                     73:                                regerror(ret, &re, szStr, sizeof szStr);
                     74:                                regfree(&re);
                     75:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                     76:                                                st->st_subscr.sub_topic.msg_base, szStr);
                     77:                        }
                     78:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                     79:                                /* MATCH */
                     80:                                p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23  misho      81:                                schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22  misho      82:                        }
                     83: 
                     84:                        regfree(&re);
                     85:                }
                     86:        }
                     87: 
1.2.2.17  misho      88:        return 0;
                     89: }
                     90: 
                     91: static int
1.2.2.26  misho      92: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho      93: {
1.2.2.26  misho      94:        ait_val_t *p = NULL;
                     95:        struct tagSession *s = NULL;
1.2.2.30! misho      96:        struct tagStore *st_, *st = NULL;
1.2.2.26  misho      97:        regex_t re;
                     98:        regmatch_t match;
1.2.2.28  misho      99:        int ret;
1.2.2.26  misho     100:        char szStr[STRSIZ];
                    101:        struct mqtthdr *hdr;
                    102: 
                    103:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    104:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    105: 
                    106:        /* write topic to database */
1.2.2.27  misho     107:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    108:                        sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.26  misho     109:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
1.2.2.27  misho     110:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26  misho     111: 
                    112:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30! misho     113:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.28  misho     114:                        /* check for QoS */
                    115:                        if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
                    116:                                continue;
                    117: 
1.2.2.26  misho     118:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    119:                                regerror(ret, &re, szStr, sizeof szStr);
                    120:                                regfree(&re);
                    121:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    122:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    123:                        }
                    124:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
1.2.2.28  misho     125:                                /* MATCH */
                    126:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    127:                                                s->sess_sock, NULL, 0);
1.2.2.26  misho     128:                        }
                    129: 
                    130:                        regfree(&re);
                    131:                }
                    132:        }
                    133: 
1.2.2.28  misho     134:        /* delete not retain message */
1.2.2.27  misho     135:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    136:                        sess->sess_addr, 0);
1.2.2.28  misho     137: 
                    138:        freePkt(&p);
1.2.2.17  misho     139:        return 0;
                    140: }
                    141: 
                    142: static int
1.2.2.26  misho     143: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17  misho     144: {
1.2.2.28  misho     145:        ait_val_t *p = NULL;
                    146:        struct tagSession *s = NULL;
1.2.2.30! misho     147:        struct tagStore *st_, *st = NULL;
1.2.2.28  misho     148:        regex_t re;
                    149:        regmatch_t match;
                    150:        int ret;
                    151:        char szStr[STRSIZ];
                    152:        struct mqtthdr *hdr;
                    153: 
                    154:        p = mkPkt(sess->sess_buf->msg_base, datlen);
                    155:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    156: 
                    157:        /* write topic to database */
                    158:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 
                    159:                        sess->sess_addr, hdr->mqtt_msg.retain);
                    160:        call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 
                    161:                        sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
                    162: 
                    163:        TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30! misho     164:                SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.28  misho     165:                        /* check for QoS */
                    166:                        if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
                    167:                                continue;
                    168: 
                    169:                        if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                    170:                                regerror(ret, &re, szStr, sizeof szStr);
                    171:                                regfree(&re);
                    172:                                ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
                    173:                                                st->st_subscr.sub_topic.msg_base, szStr);
                    174:                        }
                    175:                        if (!regexec(&re, psTopic, 1, &match, 0)) {
                    176:                                /* MATCH */
                    177:                                schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 
                    178:                                                s->sess_sock, NULL, 0);
                    179:                        }
                    180: 
                    181:                        regfree(&re);
                    182:                }
                    183:        }
                    184: 
                    185:        freePkt(&p);
1.2.2.17  misho     186:        return 0;
                    187: }
                    188: 
                    189: 
1.2       misho     190: int
1.2.2.9   misho     191: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho     192: {
                    193:        struct mqtthdr *hdr;
1.2.2.1   misho     194:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     195:        char szTopic[STRSIZ] = { 0 };
                    196:        int siz = 0;
                    197:        u_short mid = 0;
1.2.2.26  misho     198:        ait_val_t *p = NULL;
1.2       misho     199: 
                    200:        ioTRACE(2);
                    201: 
                    202:        if (!sess)
                    203:                return -1;
                    204: 
1.2.2.17  misho     205:        ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22  misho     206:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17  misho     207:        if (siz == -1) {
                    208:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    209:                return 0;
                    210:        }
                    211: 
1.2       misho     212:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    213:        switch (hdr->mqtt_msg.qos) {
                    214:                case MQTT_QOS_ACK:
1.2.2.26  misho     215:                        if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    216:                                return 0;
1.2.2.17  misho     217:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    218:                        if (siz == -1) {
                    219:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    220:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     221:                                return 0;
1.2.2.17  misho     222:                        }
1.2       misho     223:                        break;
                    224:                case MQTT_QOS_EXACTLY:
1.2.2.26  misho     225:                        if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
                    226:                                return 0;
1.2.2.17  misho     227:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    228:                        if (siz == -1) {
                    229:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    230:                                                mqtt_GetErrno(), mqtt_GetError());
1.2.2.22  misho     231:                                return 0;
1.2.2.17  misho     232:                        }
1.2       misho     233:                        break;
1.2.2.17  misho     234:                case MQTT_QOS_ONCE:
1.2.2.23  misho     235:                        pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2       misho     236:                default:
1.2.2.22  misho     237:                        return 0;
1.2       misho     238:        }
                    239: 
1.2.2.22  misho     240:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    241:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    242:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2       misho     243:        return 0;
                    244: }
1.2.2.1   misho     245: 
                    246: int
1.2.2.9   misho     247: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     248: {
                    249:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     250:        int siz = 0;
                    251:        u_short mid = 0;
1.2.2.26  misho     252:        ait_val_t *p = NULL;
1.2.2.1   misho     253: 
                    254:        ioTRACE(2);
                    255: 
                    256:        if (!sess)
                    257:                return -1;
                    258: 
1.2.2.17  misho     259:        ioDEBUG(5, "Exec PUBREL session");
                    260:        mid = mqtt_readPUBREL(sess->sess_buf);
                    261:        if (mid == (u_short) -1) {
                    262:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    263:                return 0;
                    264:        }
                    265: 
1.2.2.29  misho     266:        /* delete not retain message */
                    267:        call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user, 
                    268:                        sess->sess_addr, 0);
1.2.2.17  misho     269: 
                    270:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    271:        if (siz == -1) {
                    272:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    273:                return 0;
                    274:        }
1.2.2.1   misho     275: 
1.2.2.29  misho     276:        p = mkPkt(sess->sess_buf->msg_base, siz);
                    277:        memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22  misho     278:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     279:        return 0;
                    280: }
                    281: 
                    282: int
1.2.2.9   misho     283: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     284: {
                    285:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     286:        mqtt_subscr_t *subs = NULL;
                    287:        int siz = 0;
                    288:        u_short mid = 0;
                    289:        register int i;
                    290:        struct tagStore *store;
1.2.2.17  misho     291:        char buf[BUFSIZ];
                    292:        void *ptr;
1.2.2.26  misho     293:        ait_val_t *p = NULL;
1.2.2.1   misho     294: 
                    295:        ioTRACE(2);
                    296: 
                    297:        if (!sess)
                    298:                return -1;
                    299: 
1.2.2.7   misho     300:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    301:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    302:        if (siz == -1) {
                    303:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    304:                return 0;
                    305:        }
                    306: 
                    307:        /* add to db */
1.2.2.25  misho     308:        for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17  misho     309:                /* convert topic to sql search statement */
                    310:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    311:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25  misho     312:                        continue;
1.2.2.17  misho     313:                }
                    314:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     315:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     316:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     317:                        if (!store) {
                    318:                                ioSYSERR(0);
1.2.2.25  misho     319:                                continue;
1.2.2.7   misho     320:                        } else {
                    321:                                store->st_msgid = mid;
1.2.2.8   misho     322:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     323:                        }
                    324: 
                    325:                        /* add to cache */
                    326:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    327: 
1.2.2.17  misho     328:                        /* convert topic to regexp */
1.2.2.24  misho     329:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17  misho     330:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24  misho     331: 
                    332:                                subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17  misho     333:                        } else {
                    334:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25  misho     335:                                if (!ptr)
1.2.2.17  misho     336:                                        ioSYSERR(0);
1.2.2.25  misho     337:                                else {
1.2.2.17  misho     338:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    339:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    340:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    341:                                                        store->st_subscr.sub_topic.msg_len);
                    342:                                }
                    343: 
1.2.2.24  misho     344:                                call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    345:                                                store->st_subscr.sub_topic.msg_base, 
                    346:                                                store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17  misho     347: 
1.2.2.24  misho     348:                                subs[i].sub_ret = MQTT_QOS_PASS;
                    349:                        }
1.2.2.25  misho     350:                }
1.2.2.7   misho     351:        }
1.2.2.1   misho     352: 
1.2.2.7   misho     353:        /* send acknowledge */
                    354:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    355:        if (siz == -1) {
                    356:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    357:                goto end;
1.2.2.21  misho     358:        } else {
                    359:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     360:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    361:        }
1.2.2.21  misho     362: 
                    363:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     364: end:
                    365:        mqtt_subFree(&subs);
1.2.2.1   misho     366:        return 0;
                    367: }
                    368: 
                    369: int
1.2.2.9   misho     370: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     371: {
                    372:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     373:        mqtt_subscr_t *subs = NULL;
                    374:        int siz = 0;
                    375:        u_short mid = 0;
                    376:        register int i;
                    377:        struct tagStore *store, *tmp;
1.2.2.26  misho     378:        ait_val_t *p = NULL;
1.2.2.1   misho     379: 
                    380:        ioTRACE(2);
                    381: 
                    382:        if (!sess)
                    383:                return -1;
                    384: 
1.2.2.13  misho     385:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    386:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    387:        if (siz == -1) {
                    388:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    389:                return 0;
                    390:        }
                    391: 
                    392:        /* del from db */
                    393:        for (i = 0; i < siz; i++) {
                    394:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    395:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    396:                                        store->st_subscr.sub_topic.msg_base && 
                    397:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    398:                                                subs[i].sub_topic.msg_base)) {
                    399:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     400: 
                    401:                                if (store->st_subscr.sub_topic.msg_base)
                    402:                                        free(store->st_subscr.sub_topic.msg_base);
                    403:                                if (store->st_subscr.sub_value.msg_base)
                    404:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     405:                                io_free(store);
1.2.2.13  misho     406:                        }
                    407:                }
                    408: 
1.2.2.15  misho     409:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    410:                                sess->sess_user, "%");
1.2.2.13  misho     411:        }
                    412: 
                    413:        /* send acknowledge */
                    414:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    415:        if (siz == -1) {
                    416:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    417:                goto end;
1.2.2.21  misho     418:        } else {
                    419:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     420:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    421:        }
1.2.2.21  misho     422: 
                    423:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     424: end:
                    425:        mqtt_subFree(&subs);
1.2.2.1   misho     426:        return 0;
                    427: }
                    428: 
                    429: int
1.2.2.9   misho     430: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     431: {
                    432:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     433:        int siz = 0;
1.2.2.26  misho     434:        ait_val_t *p = NULL;
1.2.2.1   misho     435: 
                    436:        ioTRACE(2);
                    437: 
                    438:        if (!sess)
                    439:                return -1;
                    440: 
1.2.2.7   misho     441:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     442:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    443:        if (siz == -1) {
                    444:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    445:                return 0;
1.2.2.8   misho     446:        } else {
1.2.2.19  misho     447:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     448:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    449:        }
1.2.2.1   misho     450: 
1.2.2.19  misho     451:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     452:        return 0;
                    453: }
                    454: 
                    455: int
1.2.2.9   misho     456: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     457: {
                    458:        struct tagStore *store;
                    459:        struct tagSession *sess = (struct tagSession*) arg;
                    460: 
                    461:        ioTRACE(2);
                    462: 
                    463:        if (!sess)
                    464:                return -1;
                    465: 
1.2.2.6   misho     466:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     467:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    468: 
1.2.2.16  misho     469:        if (sess->sess_clean) {
                    470:                if (call.FiniSessPUB)
                    471:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    472:                if (call.DeletePUB_subscribe)
                    473:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    474:                if (call.WipePUB_topic)
                    475:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    476:        }
1.2.2.1   misho     477: 
1.2.2.6   misho     478:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    479:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     480: 
1.2.2.6   misho     481:                if (store->st_subscr.sub_topic.msg_base)
                    482:                        free(store->st_subscr.sub_topic.msg_base);
                    483:                if (store->st_subscr.sub_value.msg_base)
                    484:                        free(store->st_subscr.sub_value.msg_base);
                    485: 
1.2.2.18  misho     486:                io_free(store);
1.2.2.6   misho     487:        }
1.2.2.1   misho     488: 
                    489:        if (sess->sess_will.msg)
                    490:                free(sess->sess_will.msg);
                    491:        if (sess->sess_will.topic)
                    492:                free(sess->sess_will.topic);
                    493: 
                    494:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    495:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     496: 
1.2.2.12  misho     497:        return -3;      /* reconnect client */
1.2.2.1   misho     498: }
                    499: 
                    500: int
1.2.2.9   misho     501: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     502: {
                    503:        struct tagSession *sess = (struct tagSession*) arg;
                    504: 
                    505:        ioTRACE(2);
                    506: 
                    507:        if (!sess)
                    508:                return -1;
                    509: 
1.2.2.5   misho     510:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     511: 
1.2.2.1   misho     512:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    513:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     514: 
1.2.2.10  misho     515:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     516: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>