Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.19

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.19! misho       7: static inline struct tagPkt *
        !             8: mkPkt(void * __restrict data, int dlen)
        !             9: {
        !            10:        struct tagPkt *p = NULL;
        !            11: 
        !            12:        p = io_malloc(sizeof(struct tagPkt));
        !            13:        if (!p) {
        !            14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
        !            15:                return NULL;
        !            16:        } else
        !            17:                memset(p, 0, sizeof(struct tagPkt));
        !            18: 
        !            19:        p->pkt_data = io_allocVar();
        !            20:        if (!p->pkt_data) {
        !            21:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
        !            22:                io_free(p);
        !            23:                return NULL;
        !            24:        }
        !            25: 
        !            26:        if (data && dlen > 0)
        !            27:                AIT_SET_BUF(p->pkt_data, data, dlen);
        !            28: 
        !            29:        return p;
        !            30: }
        !            31: 
        !            32: static inline void
        !            33: freePkt(struct tagPkt ** __restrict p)
        !            34: {
        !            35:        if (!p || !*p)
        !            36:                return;
        !            37: 
        !            38:        io_freeVar(&(*p)->pkt_data);
        !            39:        io_free(*p);
        !            40:        *p = NULL;
        !            41: }
        !            42: 
        !            43: static void *
        !            44: sendPacket(sched_task_t *task)
        !            45: {
        !            46:        struct tagPkt *p = TASK_ARG(task);
        !            47:        register int n, slen;
        !            48:        u_char *pos;
        !            49: 
        !            50:        if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
        !            51:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
        !            52:                return NULL;
        !            53:        }
        !            54: 
        !            55:        for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; 
        !            56:                        slen -= n, pos += n) {
        !            57:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
        !            58:                if (n == -1) {
        !            59:                        ioSYSERR(0);
        !            60:                        break;
        !            61:                }
        !            62:        }
        !            63: 
        !            64:        return NULL;
        !            65: }
        !            66: 
        !            67: 
1.2.2.17  misho      68: static int
                     69: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     70:                int topicLen, char * __restrict data, int datlen)
                     71: {
                     72:        return 0;
                     73: }
                     74: 
                     75: static int
                     76: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     77:                int topicLen, char * __restrict data, int datlen)
                     78: {
                     79:        return 0;
                     80: }
                     81: 
                     82: static int
                     83: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     84:                int topicLen, char * __restrict data, int datlen)
                     85: {
                     86:        return 0;
                     87: }
                     88: 
                     89: 
1.2       misho      90: int
1.2.2.9   misho      91: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho      92: {
                     93:        struct mqtthdr *hdr;
1.2.2.1   misho      94:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho      95:        void *data = NULL;
                     96:        char szTopic[STRSIZ] = { 0 };
                     97:        int siz = 0;
                     98:        u_short mid = 0;
1.2       misho      99: 
                    100:        ioTRACE(2);
                    101: 
                    102:        if (!sess)
                    103:                return -1;
                    104: 
1.2.2.17  misho     105:        ioDEBUG(5, "Exec PUBLISH session");
                    106:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
                    107:        if (siz == -1) {
                    108:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    109:                return 0;
                    110:        }
                    111: 
1.2       misho     112:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    113:        switch (hdr->mqtt_msg.qos) {
                    114:                case MQTT_QOS_ACK:
1.2.2.17  misho     115:                        pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
                    116:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    117:                        if (siz == -1) {
                    118:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    119:                                                mqtt_GetErrno(), mqtt_GetError());
                    120:                                goto end;
                    121:                        }
1.2       misho     122:                        break;
                    123:                case MQTT_QOS_EXACTLY:
1.2.2.17  misho     124:                        pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
                    125:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    126:                        if (siz == -1) {
                    127:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    128:                                                mqtt_GetErrno(), mqtt_GetError());
                    129:                                goto end;
                    130:                        }
1.2       misho     131:                        break;
1.2.2.17  misho     132:                case MQTT_QOS_ONCE:
                    133:                        pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2       misho     134:                default:
1.2.2.17  misho     135:                        goto end;
1.2       misho     136:        }
                    137: 
1.2.2.17  misho     138:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    139:                ioSYSERR(0);
                    140:        else {
                    141:                ioDEBUG(5, "Sended %d bytes.", siz);
                    142:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    143:        }
                    144: end:
                    145:        if (data)
                    146:                free(data);
1.2       misho     147:        return 0;
                    148: }
1.2.2.1   misho     149: 
                    150: int
1.2.2.9   misho     151: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     152: {
                    153:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     154:        int siz = 0;
                    155:        u_short mid = 0;
1.2.2.1   misho     156: 
                    157:        ioTRACE(2);
                    158: 
                    159:        if (!sess)
                    160:                return -1;
                    161: 
1.2.2.17  misho     162:        ioDEBUG(5, "Exec PUBREL session");
                    163:        mid = mqtt_readPUBREL(sess->sess_buf);
                    164:        if (mid == (u_short) -1) {
                    165:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    166:                return 0;
                    167:        }
                    168: 
                    169:        // TODO:: Delete from database topic
                    170: 
                    171:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    172:        if (siz == -1) {
                    173:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    174:                return 0;
                    175:        }
                    176:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    177:                ioSYSERR(0);
                    178:        else {
                    179:                ioDEBUG(5, "Sended %d bytes.", siz);
                    180:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    181:        }
1.2.2.1   misho     182: 
                    183:        return 0;
                    184: }
                    185: 
                    186: int
1.2.2.9   misho     187: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     188: {
                    189:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     190:        mqtt_subscr_t *subs = NULL;
                    191:        int siz = 0;
                    192:        u_short mid = 0;
                    193:        register int i;
                    194:        struct tagStore *store;
1.2.2.17  misho     195:        char buf[BUFSIZ];
                    196:        void *ptr;
1.2.2.1   misho     197: 
                    198:        ioTRACE(2);
                    199: 
                    200:        if (!sess)
                    201:                return -1;
                    202: 
1.2.2.7   misho     203:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    204:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    205:        if (siz == -1) {
                    206:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    207:                return 0;
                    208:        }
                    209: 
                    210:        /* add to db */
                    211:        for (i = 0; i < siz; i++) {
1.2.2.17  misho     212:                /* convert topic to sql search statement */
                    213:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    214:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    215:                        goto end;
                    216:                }
                    217:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     218:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     219:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     220:                        if (!store) {
                    221:                                ioSYSERR(0);
                    222:                                goto end;
                    223:                        } else {
                    224:                                store->st_msgid = mid;
1.2.2.8   misho     225:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     226:                        }
                    227: 
                    228:                        /* add to cache */
                    229:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    230: 
1.2.2.17  misho     231:                        /* convert topic to regexp */
                    232:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
                    233:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    234:                                goto end;
                    235:                        } else {
                    236:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    237:                                if (!ptr) {
                    238:                                        ioSYSERR(0);
                    239:                                        goto end;
                    240:                                } else {
                    241:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    242:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    243:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    244:                                                        store->st_subscr.sub_topic.msg_len);
                    245:                                }
                    246:                        }
                    247: 
                    248:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    249:                                        store->st_subscr.sub_topic.msg_base, 
                    250:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
                    251: 
1.2.2.7   misho     252:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    253:                } else
                    254:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    255:        }
1.2.2.1   misho     256: 
1.2.2.7   misho     257:        /* send acknowledge */
                    258:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    259:        if (siz == -1) {
                    260:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    261:                goto end;
                    262:        }
1.2.2.13  misho     263:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7   misho     264:                ioSYSERR(0);
1.2.2.8   misho     265:        else {
1.2.2.7   misho     266:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     267:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    268:        }
1.2.2.7   misho     269: end:
                    270:        mqtt_subFree(&subs);
1.2.2.1   misho     271:        return 0;
                    272: }
                    273: 
                    274: int
1.2.2.9   misho     275: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     276: {
                    277:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     278:        mqtt_subscr_t *subs = NULL;
                    279:        int siz = 0;
                    280:        u_short mid = 0;
                    281:        register int i;
                    282:        struct tagStore *store, *tmp;
1.2.2.1   misho     283: 
                    284:        ioTRACE(2);
                    285: 
                    286:        if (!sess)
                    287:                return -1;
                    288: 
1.2.2.13  misho     289:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    290:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    291:        if (siz == -1) {
                    292:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    293:                return 0;
                    294:        }
                    295: 
                    296:        /* del from db */
                    297:        for (i = 0; i < siz; i++) {
                    298:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    299:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    300:                                        store->st_subscr.sub_topic.msg_base && 
                    301:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    302:                                                subs[i].sub_topic.msg_base)) {
                    303:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     304: 
                    305:                                if (store->st_subscr.sub_topic.msg_base)
                    306:                                        free(store->st_subscr.sub_topic.msg_base);
                    307:                                if (store->st_subscr.sub_value.msg_base)
                    308:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     309:                                io_free(store);
1.2.2.13  misho     310:                        }
                    311:                }
                    312: 
1.2.2.15  misho     313:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    314:                                sess->sess_user, "%");
1.2.2.13  misho     315:        }
                    316: 
                    317:        /* send acknowledge */
                    318:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    319:        if (siz == -1) {
                    320:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    321:                goto end;
                    322:        }
                    323:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    324:                ioSYSERR(0);
                    325:        else {
                    326:                ioDEBUG(5, "Sended %d bytes.", siz);
                    327:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    328:        }
                    329: end:
                    330:        mqtt_subFree(&subs);
1.2.2.1   misho     331:        return 0;
                    332: }
                    333: 
                    334: int
1.2.2.9   misho     335: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     336: {
                    337:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     338:        int siz = 0;
1.2.2.19! misho     339:        struct tagPkt *p = NULL;
1.2.2.1   misho     340: 
                    341:        ioTRACE(2);
                    342: 
                    343:        if (!sess)
                    344:                return -1;
                    345: 
1.2.2.7   misho     346:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     347:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    348:        if (siz == -1) {
                    349:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    350:                return 0;
1.2.2.8   misho     351:        } else {
1.2.2.19! misho     352:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     353:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    354:        }
1.2.2.1   misho     355: 
1.2.2.19! misho     356:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     357:        return 0;
                    358: }
                    359: 
                    360: int
1.2.2.9   misho     361: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     362: {
                    363:        struct tagStore *store;
1.2.2.19! misho     364:        struct tagPkt *p;
1.2.2.1   misho     365:        struct tagSession *sess = (struct tagSession*) arg;
                    366: 
                    367:        ioTRACE(2);
                    368: 
                    369:        if (!sess)
                    370:                return -1;
                    371: 
1.2.2.6   misho     372:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     373:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    374: 
1.2.2.16  misho     375:        if (sess->sess_clean) {
                    376:                if (call.FiniSessPUB)
                    377:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    378:                if (call.DeletePUB_subscribe)
                    379:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    380:                if (call.WipePUB_topic)
                    381:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    382:        }
1.2.2.1   misho     383: 
1.2.2.6   misho     384:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    385:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     386: 
1.2.2.6   misho     387:                if (store->st_subscr.sub_topic.msg_base)
                    388:                        free(store->st_subscr.sub_topic.msg_base);
                    389:                if (store->st_subscr.sub_value.msg_base)
                    390:                        free(store->st_subscr.sub_value.msg_base);
                    391: 
1.2.2.18  misho     392:                io_free(store);
1.2.2.6   misho     393:        }
1.2.2.1   misho     394: 
1.2.2.19! misho     395:        while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
        !           396:                SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
        !           397: 
        !           398:                io_freeVar(&p->pkt_data);
        !           399:                io_free(p);
        !           400:        }
        !           401: 
1.2.2.1   misho     402:        if (sess->sess_will.msg)
                    403:                free(sess->sess_will.msg);
                    404:        if (sess->sess_will.topic)
                    405:                free(sess->sess_will.topic);
                    406: 
                    407:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    408:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     409: 
1.2.2.12  misho     410:        return -3;      /* reconnect client */
1.2.2.1   misho     411: }
                    412: 
                    413: int
1.2.2.9   misho     414: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     415: {
                    416:        struct tagSession *sess = (struct tagSession*) arg;
                    417: 
                    418:        ioTRACE(2);
                    419: 
                    420:        if (!sess)
                    421:                return -1;
                    422: 
1.2.2.5   misho     423:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     424: 
1.2.2.1   misho     425:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    426:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     427: 
1.2.2.10  misho     428:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     429: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>