Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.18

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
1.2.2.17  misho       7: static int
                      8: pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                      9:                int topicLen, char * __restrict data, int datlen)
                     10: {
                     11:        return 0;
                     12: }
                     13: 
                     14: static int
                     15: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     16:                int topicLen, char * __restrict data, int datlen)
                     17: {
                     18:        return 0;
                     19: }
                     20: 
                     21: static int
                     22: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, 
                     23:                int topicLen, char * __restrict data, int datlen)
                     24: {
                     25:        return 0;
                     26: }
                     27: 
                     28: 
1.2       misho      29: int
1.2.2.9   misho      30: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho      31: {
                     32:        struct mqtthdr *hdr;
1.2.2.1   misho      33:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho      34:        void *data = NULL;
                     35:        char szTopic[STRSIZ] = { 0 };
                     36:        int siz = 0;
                     37:        u_short mid = 0;
1.2       misho      38: 
                     39:        ioTRACE(2);
                     40: 
                     41:        if (!sess)
                     42:                return -1;
                     43: 
1.2.2.17  misho      44:        ioDEBUG(5, "Exec PUBLISH session");
                     45:        siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, &data);
                     46:        if (siz == -1) {
                     47:                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     48:                return 0;
                     49:        }
                     50: 
1.2       misho      51:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     52:        switch (hdr->mqtt_msg.qos) {
                     53:                case MQTT_QOS_ACK:
1.2.2.17  misho      54:                        pubAck(sess, mid, szTopic, sizeof szTopic, data, siz);
                     55:                        siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                     56:                        if (siz == -1) {
                     57:                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 
                     58:                                                mqtt_GetErrno(), mqtt_GetError());
                     59:                                goto end;
                     60:                        }
1.2       misho      61:                        break;
                     62:                case MQTT_QOS_EXACTLY:
1.2.2.17  misho      63:                        pubExactly(sess, mid, szTopic, sizeof szTopic, data, siz);
                     64:                        siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                     65:                        if (siz == -1) {
                     66:                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 
                     67:                                                mqtt_GetErrno(), mqtt_GetError());
                     68:                                goto end;
                     69:                        }
1.2       misho      70:                        break;
1.2.2.17  misho      71:                case MQTT_QOS_ONCE:
                     72:                        pubOnce(sess, mid, szTopic, sizeof szTopic, data, siz);
1.2       misho      73:                default:
1.2.2.17  misho      74:                        goto end;
1.2       misho      75:        }
                     76: 
1.2.2.17  misho      77:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                     78:                ioSYSERR(0);
                     79:        else {
                     80:                ioDEBUG(5, "Sended %d bytes.", siz);
                     81:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                     82:        }
                     83: end:
                     84:        if (data)
                     85:                free(data);
1.2       misho      86:        return 0;
                     87: }
1.2.2.1   misho      88: 
                     89: int
1.2.2.9   misho      90: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho      91: {
                     92:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17  misho      93:        int siz = 0;
                     94:        u_short mid = 0;
1.2.2.1   misho      95: 
                     96:        ioTRACE(2);
                     97: 
                     98:        if (!sess)
                     99:                return -1;
                    100: 
1.2.2.17  misho     101:        ioDEBUG(5, "Exec PUBREL session");
                    102:        mid = mqtt_readPUBREL(sess->sess_buf);
                    103:        if (mid == (u_short) -1) {
                    104:                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    105:                return 0;
                    106:        }
                    107: 
                    108:        // TODO:: Delete from database topic
                    109: 
                    110:        siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
                    111:        if (siz == -1) {
                    112:                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    113:                return 0;
                    114:        }
                    115:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    116:                ioSYSERR(0);
                    117:        else {
                    118:                ioDEBUG(5, "Sended %d bytes.", siz);
                    119:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    120:        }
1.2.2.1   misho     121: 
                    122:        return 0;
                    123: }
                    124: 
                    125: int
1.2.2.9   misho     126: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     127: {
                    128:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho     129:        mqtt_subscr_t *subs = NULL;
                    130:        int siz = 0;
                    131:        u_short mid = 0;
                    132:        register int i;
                    133:        struct tagStore *store;
1.2.2.17  misho     134:        char buf[BUFSIZ];
                    135:        void *ptr;
1.2.2.1   misho     136: 
                    137:        ioTRACE(2);
                    138: 
                    139:        if (!sess)
                    140:                return -1;
                    141: 
1.2.2.7   misho     142:        ioDEBUG(5, "Exec SUBSCRIBE session");
                    143:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    144:        if (siz == -1) {
                    145:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    146:                return 0;
                    147:        }
                    148: 
                    149:        /* add to db */
                    150:        for (i = 0; i < siz; i++) {
1.2.2.17  misho     151:                /* convert topic to sql search statement */
                    152:                if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
                    153:                        ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    154:                        goto end;
                    155:                }
                    156:                if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 
1.2.2.8   misho     157:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18! misho     158:                        store = io_malloc(sizeof(struct tagStore));
1.2.2.7   misho     159:                        if (!store) {
                    160:                                ioSYSERR(0);
                    161:                                goto end;
                    162:                        } else {
                    163:                                store->st_msgid = mid;
1.2.2.8   misho     164:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho     165:                        }
                    166: 
                    167:                        /* add to cache */
                    168:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                    169: 
1.2.2.17  misho     170:                        /* convert topic to regexp */
                    171:                        if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
                    172:                                ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    173:                                goto end;
                    174:                        } else {
                    175:                                ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                    176:                                if (!ptr) {
                    177:                                        ioSYSERR(0);
                    178:                                        goto end;
                    179:                                } else {
                    180:                                        store->st_subscr.sub_topic.msg_base = ptr;
                    181:                                        store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
                    182:                                        memcpy(store->st_subscr.sub_topic.msg_base, buf, 
                    183:                                                        store->st_subscr.sub_topic.msg_len);
                    184:                                }
                    185:                        }
                    186: 
                    187:                        call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 
                    188:                                        store->st_subscr.sub_topic.msg_base, 
                    189:                                        store->st_subscr.sub_topic.msg_len, sess->sess_addr);
                    190: 
1.2.2.7   misho     191:                        subs[i].sub_ret = MQTT_QOS_PASS;
                    192:                } else
                    193:                        subs[i].sub_ret = MQTT_QOS_DENY;
                    194:        }
1.2.2.1   misho     195: 
1.2.2.7   misho     196:        /* send acknowledge */
                    197:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                    198:        if (siz == -1) {
                    199:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    200:                goto end;
                    201:        }
1.2.2.13  misho     202:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7   misho     203:                ioSYSERR(0);
1.2.2.8   misho     204:        else {
1.2.2.7   misho     205:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     206:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    207:        }
1.2.2.7   misho     208: end:
                    209:        mqtt_subFree(&subs);
1.2.2.1   misho     210:        return 0;
                    211: }
                    212: 
                    213: int
1.2.2.9   misho     214: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     215: {
                    216:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     217:        mqtt_subscr_t *subs = NULL;
                    218:        int siz = 0;
                    219:        u_short mid = 0;
                    220:        register int i;
                    221:        struct tagStore *store, *tmp;
1.2.2.1   misho     222: 
                    223:        ioTRACE(2);
                    224: 
                    225:        if (!sess)
                    226:                return -1;
                    227: 
1.2.2.13  misho     228:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    229:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    230:        if (siz == -1) {
                    231:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    232:                return 0;
                    233:        }
                    234: 
                    235:        /* del from db */
                    236:        for (i = 0; i < siz; i++) {
                    237:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    238:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    239:                                        store->st_subscr.sub_topic.msg_base && 
                    240:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    241:                                                subs[i].sub_topic.msg_base)) {
                    242:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14  misho     243: 
                    244:                                if (store->st_subscr.sub_topic.msg_base)
                    245:                                        free(store->st_subscr.sub_topic.msg_base);
                    246:                                if (store->st_subscr.sub_value.msg_base)
                    247:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.18! misho     248:                                io_free(store);
1.2.2.13  misho     249:                        }
                    250:                }
                    251: 
1.2.2.15  misho     252:                call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
                    253:                                sess->sess_user, "%");
1.2.2.13  misho     254:        }
                    255: 
                    256:        /* send acknowledge */
                    257:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    258:        if (siz == -1) {
                    259:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    260:                goto end;
                    261:        }
                    262:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    263:                ioSYSERR(0);
                    264:        else {
                    265:                ioDEBUG(5, "Sended %d bytes.", siz);
                    266:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    267:        }
                    268: end:
                    269:        mqtt_subFree(&subs);
1.2.2.1   misho     270:        return 0;
                    271: }
                    272: 
                    273: int
1.2.2.9   misho     274: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     275: {
                    276:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     277:        int siz = 0;
1.2.2.1   misho     278: 
                    279:        ioTRACE(2);
                    280: 
                    281:        if (!sess)
                    282:                return -1;
                    283: 
1.2.2.7   misho     284:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     285:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    286:        if (siz == -1) {
                    287:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    288:                return 0;
                    289:        }
1.2.2.13  misho     290:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
1.2.2.2   misho     291:                ioSYSERR(0);
                    292:                return 0;
1.2.2.8   misho     293:        } else {
1.2.2.2   misho     294:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     295:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    296:        }
1.2.2.1   misho     297: 
                    298:        return 0;
                    299: }
                    300: 
                    301: int
1.2.2.9   misho     302: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     303: {
                    304:        struct tagStore *store;
                    305:        struct tagSession *sess = (struct tagSession*) arg;
                    306: 
                    307:        ioTRACE(2);
                    308: 
                    309:        if (!sess)
                    310:                return -1;
                    311: 
1.2.2.6   misho     312:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     313:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    314: 
1.2.2.16  misho     315:        if (sess->sess_clean) {
                    316:                if (call.FiniSessPUB)
                    317:                        call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    318:                if (call.DeletePUB_subscribe)
                    319:                        call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
                    320:                if (call.WipePUB_topic)
                    321:                        call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
                    322:        }
1.2.2.1   misho     323: 
1.2.2.6   misho     324:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    325:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     326: 
1.2.2.6   misho     327:                if (store->st_subscr.sub_topic.msg_base)
                    328:                        free(store->st_subscr.sub_topic.msg_base);
                    329:                if (store->st_subscr.sub_value.msg_base)
                    330:                        free(store->st_subscr.sub_value.msg_base);
                    331: 
1.2.2.18! misho     332:                io_free(store);
1.2.2.6   misho     333:        }
1.2.2.1   misho     334: 
                    335:        if (sess->sess_will.msg)
                    336:                free(sess->sess_will.msg);
                    337:        if (sess->sess_will.topic)
                    338:                free(sess->sess_will.topic);
                    339: 
                    340:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    341:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     342: 
1.2.2.12  misho     343:        return -3;      /* reconnect client */
1.2.2.1   misho     344: }
                    345: 
                    346: int
1.2.2.9   misho     347: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     348: {
                    349:        struct tagSession *sess = (struct tagSession*) arg;
                    350: 
                    351:        ioTRACE(2);
                    352: 
                    353:        if (!sess)
                    354:                return -1;
                    355: 
1.2.2.5   misho     356:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     357: 
1.2.2.1   misho     358:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    359:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     360: 
1.2.2.10  misho     361:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     362: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>