Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.21

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.19  misho       7: static inline struct tagPkt *
                      8: mkPkt(void * __restrict data, int dlen)
                      9: {
                     10:        struct tagPkt *p = NULL;
                     11: 
                     12:        p = io_malloc(sizeof(struct tagPkt));
                     13:        if (!p) {
                     14:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     15:                return NULL;
                     16:        } else
                     17:                memset(p, 0, sizeof(struct tagPkt));
                     18: 
                     19:        p->pkt_data = io_allocVar();
                     20:        if (!p->pkt_data) {
                     21:                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
                     22:                io_free(p);
                     23:                return NULL;
                     24:        }
                     25: 
                     26:        if (data && dlen > 0)
                     27:                AIT_SET_BUF(p->pkt_data, data, dlen);
                     28: 
                     29:        return p;
                     30: }
                     31: 
                     32: static inline void
1.2.2.20  misho      33: freePkt(struct tagPkt * __restrict p)
1.2.2.19  misho      34: {
1.2.2.20  misho      35:        if (!p)
1.2.2.19  misho      36:                return;
                     37: 
1.2.2.20  misho      38:        io_freeVar(&p->pkt_data);
                     39:        io_free(p);
1.2.2.19  misho      40: }
                     41: 
                     42: static void *
                     43: sendPacket(sched_task_t *task)
                     44: {
                     45:        struct tagPkt *p = TASK_ARG(task);
                     46:        register int n, slen;
                     47:        u_char *pos;
                     48: 
                     49:        if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
                     50:                ioDEBUG(9, "Error:: invalid packet or found empty content ...");
                     51:                return NULL;
                     52:        }
                     53: 
                     54:        for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; 
                     55:                        slen -= n, pos += n) {
                     56:                n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                     57:                if (n == -1) {
                     58:                        ioSYSERR(0);
                     59:                        break;
                     60:                }
                     61:        }
                     62: 
1.2.2.20  misho      63:        freePkt(p);
1.2.2.19  misho      64:        return NULL;
                     65: }
                     66: 
                     67: 
1.2.2.17  misho      68: static int
                     69: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     70:                int topicLen, char * __restrict data, int datlen)
                     71: {
                     72:        return 0;
                     73: }
                     74: 
                     75: static int
                     76: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     77:                int topicLen, char * __restrict data, int datlen)
                     78: {
                     79:        return 0;
                     80: }
                     81: 
                     82: static int
                     83: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     84:                int topicLen, char * __restrict data, int datlen)
                     85: {
                     86:        return 0;
                     87: }
                     88: 
                     89: 
1.2       misho      90: int
1.2.2.9   misho      91: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho      92: {
                     93:        struct mqtthdr *hdr;
1.2.2.1   misho      94:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho      95:        void *data = NULL;
                     96:        char szTopic[STRSIZ] = { 0 };
                     97:        int siz = 0;
                     98:        u_short mid = 0;
1.2       misho      99: 
                    100:        ioTRACE(2);
                    101: 
                    102:        if (!sess)
                    103:                return -1;
                    104: 
1.2.2.17  misho     105:        ioDEBUG(5, "Exec PUBLISH session");
                    106:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
                    107:        if (siz == -1) {
                    108:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    109:                return 0;
                    110:        }
                    111: 
1.2       misho     112:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    113:        switch (hdr->mqtt_msg.qos) {
                    114:                case MQTT_QOS_ACK:
1.2.2.17  misho     115:                        pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
                    116:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                    117:                        if (siz == -1) {
                    118:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                    119:                                                mqtt_GetErrno(), mqtt_GetError());
                    120:                                goto end;
                    121:                        }
1.2       misho     122:                        break;
                    123:                case MQTT_QOS_EXACTLY:
1.2.2.17  misho     124:                        pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
                    125:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                    126:                        if (siz == -1) {
                    127:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                    128:                                                mqtt_GetErrno(), mqtt_GetError());
                    129:                                goto end;
                    130:                        }
1.2       misho     131:                        break;
1.2.2.17  misho     132:                case MQTT_QOS_ONCE:
                    133:                        pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2       misho     134:                default:
1.2.2.17  misho     135:                        goto end;
1.2       misho     136:        }
                    137: 
1.2.2.17  misho     138:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    139:                ioSYSERR(0);
                    140:        else {
                    141:                ioDEBUG(5, "Sended %d bytes.", siz);
                    142:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    143:        }
                    144: end:
                    145:        if (data)
                    146:                free(data);
1.2       misho     147:        return 0;
                    148: }
1.2.2.1   misho     149: 
                    150: int
1.2.2.9   misho     151: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho     152: {
                    153:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho     154:        int siz = 0;
                    155:        u_short mid = 0;
1.2.2.1   misho     156: 
                    157:        ioTRACE(2);
                    158: 
                    159:        if (!sess)
                    160:                return -1;
                    161: 
1.2.2.17  misho     162:        ioDEBUG(5, "Exec PUBREL session");
                    163:        mid = mqtt_readPUBREL(sess->sess_buf);
                    164:        if (mid == (u_short) -1) {
                    165:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    166:                return 0;
                    167:        }
                    168: 
                    169:        // TODO:: Delete from database topic
                    170: 
                    171:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    172:        if (siz == -1) {
                    173:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    174:                return 0;
                    175:        }
                    176:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    177:                ioSYSERR(0);
                    178:        else {
                    179:                ioDEBUG(5, "Sended %d bytes.", siz);
                    180:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    181:        }
1.2.2.1   misho     182: 
                    183:        return 0;
                    184: }
                    185: 
                    186: int
1.2.2.9   misho     187: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     188: {
                    189:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     190:        mqtt_subscr_t *subs = NULL;
                    191:        int siz = 0;
                    192:        u_short mid = 0;
                    193:        register int i;
                    194:        struct tagStore *store;
1.2.2.17  misho     195:        char buf[BUFSIZ];
                    196:        void *ptr;
1.2.2.21! misho     197:        struct tagPkt *p = NULL;
1.2.2.1   misho     198: 
                    199:        ioTRACE(2);
                    200: 
                    201:        if (!sess)
                    202:                return -1;
                    203: 
1.2.2.7   misho     204:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    205:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    206:        if (siz == -1) {
                    207:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    208:                return 0;
                    209:        }
                    210: 
                    211:        /* add to db */
                    212:        for (i = 0; i < siz; i++) {
1.2.2.17  misho     213:                /* convert topic to sql search statement */
                    214:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    215:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    216:                        goto end;
                    217:                }
                    218:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     219:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18  misho     220:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     221:                        if (!store) {
                    222:                                ioSYSERR(0);
                    223:                                goto end;
                    224:                        } else {
                    225:                                store->st_msgid = mid;
1.2.2.8   misho     226:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     227:                        }
                    228: 
                    229:                        /* add to cache */
                    230:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    231: 
1.2.2.17  misho     232:                        /* convert topic to regexp */
                    233:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
                    234:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    235:                                goto end;
                    236:                        } else {
                    237:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    238:                                if (!ptr) {
                    239:                                        ioSYSERR(0);
                    240:                                        goto end;
                    241:                                } else {
                    242:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    243:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    244:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    245:                                                        store->st_subscr.sub_topic.msg_len);
                    246:                                }
                    247:                        }
                    248: 
                    249:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    250:                                        store->st_subscr.sub_topic.msg_base, 
                    251:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
                    252: 
1.2.2.7   misho     253:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    254:                } else
                    255:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    256:        }
1.2.2.1   misho     257: 
1.2.2.7   misho     258:        /* send acknowledge */
                    259:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    260:        if (siz == -1) {
                    261:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    262:                goto end;
1.2.2.21! misho     263:        } else {
        !           264:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     265:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    266:        }
1.2.2.21! misho     267: 
        !           268:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7   misho     269: end:
                    270:        mqtt_subFree(&subs);
1.2.2.1   misho     271:        return 0;
                    272: }
                    273: 
                    274: int
1.2.2.9   misho     275: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     276: {
                    277:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     278:        mqtt_subscr_t *subs = NULL;
                    279:        int siz = 0;
                    280:        u_short mid = 0;
                    281:        register int i;
                    282:        struct tagStore *store, *tmp;
1.2.2.21! misho     283:        struct tagPkt *p = NULL;
1.2.2.1   misho     284: 
                    285:        ioTRACE(2);
                    286: 
                    287:        if (!sess)
                    288:                return -1;
                    289: 
1.2.2.13  misho     290:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    291:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    292:        if (siz == -1) {
                    293:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    294:                return 0;
                    295:        }
                    296: 
                    297:        /* del from db */
                    298:        for (i = 0; i < siz; i++) {
                    299:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    300:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    301:                                        store->st_subscr.sub_topic.msg_base && 
                    302:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    303:                                                subs[i].sub_topic.msg_base)) {
                    304:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     305: 
                    306:                                if (store->st_subscr.sub_topic.msg_base)
                    307:                                        free(store->st_subscr.sub_topic.msg_base);
                    308:                                if (store->st_subscr.sub_value.msg_base)
                    309:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18  misho     310:                                io_free(store);
1.2.2.13  misho     311:                        }
                    312:                }
                    313: 
1.2.2.15  misho     314:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    315:                                sess->sess_user, "%");
1.2.2.13  misho     316:        }
                    317: 
                    318:        /* send acknowledge */
                    319:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    320:        if (siz == -1) {
                    321:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    322:                goto end;
1.2.2.21! misho     323:        } else {
        !           324:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13  misho     325:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    326:        }
1.2.2.21! misho     327: 
        !           328:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13  misho     329: end:
                    330:        mqtt_subFree(&subs);
1.2.2.1   misho     331:        return 0;
                    332: }
                    333: 
                    334: int
1.2.2.9   misho     335: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     336: {
                    337:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     338:        int siz = 0;
1.2.2.19  misho     339:        struct tagPkt *p = NULL;
1.2.2.1   misho     340: 
                    341:        ioTRACE(2);
                    342: 
                    343:        if (!sess)
                    344:                return -1;
                    345: 
1.2.2.7   misho     346:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     347:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    348:        if (siz == -1) {
                    349:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    350:                return 0;
1.2.2.8   misho     351:        } else {
1.2.2.19  misho     352:                p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8   misho     353:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    354:        }
1.2.2.1   misho     355: 
1.2.2.19  misho     356:        schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1   misho     357:        return 0;
                    358: }
                    359: 
                    360: int
1.2.2.9   misho     361: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     362: {
                    363:        struct tagStore *store;
1.2.2.19  misho     364:        struct tagPkt *p;
1.2.2.1   misho     365:        struct tagSession *sess = (struct tagSession*) arg;
                    366: 
                    367:        ioTRACE(2);
                    368: 
                    369:        if (!sess)
                    370:                return -1;
                    371: 
1.2.2.6   misho     372:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     373:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    374: 
1.2.2.16  misho     375:        if (sess->sess_clean) {
                    376:                if (call.FiniSessPUB)
                    377:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    378:                if (call.DeletePUB_subscribe)
                    379:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    380:                if (call.WipePUB_topic)
                    381:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    382:        }
1.2.2.1   misho     383: 
1.2.2.6   misho     384:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    385:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     386: 
1.2.2.6   misho     387:                if (store->st_subscr.sub_topic.msg_base)
                    388:                        free(store->st_subscr.sub_topic.msg_base);
                    389:                if (store->st_subscr.sub_value.msg_base)
                    390:                        free(store->st_subscr.sub_value.msg_base);
                    391: 
1.2.2.18  misho     392:                io_free(store);
1.2.2.6   misho     393:        }
1.2.2.1   misho     394: 
1.2.2.19  misho     395:        while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
                    396:                SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
                    397: 
                    398:                io_freeVar(&p->pkt_data);
                    399:                io_free(p);
                    400:        }
                    401: 
1.2.2.1   misho     402:        if (sess->sess_will.msg)
                    403:                free(sess->sess_will.msg);
                    404:        if (sess->sess_will.topic)
                    405:                free(sess->sess_will.topic);
                    406: 
                    407:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    408:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     409: 
1.2.2.12  misho     410:        return -3;      /* reconnect client */
1.2.2.1   misho     411: }
                    412: 
                    413: int
1.2.2.9   misho     414: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     415: {
                    416:        struct tagSession *sess = (struct tagSession*) arg;
                    417: 
                    418:        ioTRACE(2);
                    419: 
                    420:        if (!sess)
                    421:                return -1;
                    422: 
1.2.2.5   misho     423:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     424: 
1.2.2.1   misho     425:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    426:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     427: 
1.2.2.10  misho     428:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     429: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>