Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.13

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
                      7: int
1.2.2.9   misho       8: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho       9: {
                     10:        struct mqtthdr *hdr;
1.2.2.1   misho      11:        struct tagSession *sess = (struct tagSession*) arg;
1.2       misho      12: 
                     13:        ioTRACE(2);
                     14: 
                     15:        if (!sess)
                     16:                return -1;
                     17: 
                     18:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     19:        switch (hdr->mqtt_msg.qos) {
                     20:                case MQTT_QOS_ONCE:
                     21:                        break;
                     22:                case MQTT_QOS_ACK:
                     23:                        break;
                     24:                case MQTT_QOS_EXACTLY:
                     25:                        break;
                     26:                default:
                     27:                        ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", 
                     28:                                        hdr->mqtt_msg.qos);
                     29:                        return 0;
                     30:        }
                     31: 
                     32:        return 0;
                     33: }
1.2.2.1   misho      34: 
                     35: int
1.2.2.9   misho      36: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho      37: {
                     38:        struct mqtthdr *hdr;
                     39:        struct tagSession *sess = (struct tagSession*) arg;
                     40: 
                     41:        ioTRACE(2);
                     42: 
                     43:        if (!sess)
                     44:                return -1;
                     45: 
                     46:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     47: 
                     48:        return 0;
                     49: }
                     50: 
                     51: int
1.2.2.9   misho      52: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho      53: {
                     54:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho      55:        mqtt_subscr_t *subs = NULL;
                     56:        int siz = 0;
                     57:        u_short mid = 0;
                     58:        register int i;
                     59:        struct tagStore *store;
1.2.2.1   misho      60: 
                     61:        ioTRACE(2);
                     62: 
                     63:        if (!sess)
                     64:                return -1;
                     65: 
1.2.2.7   misho      66:        ioDEBUG(5, "Exec SUBSCRIBE session");
                     67:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                     68:        if (siz == -1) {
                     69:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     70:                return 0;
                     71:        }
                     72: 
                     73:        /* add to db */
                     74:        for (i = 0; i < siz; i++) {
1.2.2.8   misho      75:                if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, 
                     76:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7   misho      77:                        store = malloc(sizeof(struct tagStore));
                     78:                        if (!store) {
                     79:                                ioSYSERR(0);
                     80:                                goto end;
                     81:                        } else {
                     82:                                store->st_msgid = mid;
1.2.2.8   misho      83:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho      84:                        }
                     85: 
                     86:                        /* add to cache */
                     87:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                     88: 
                     89:                        subs[i].sub_ret = MQTT_QOS_PASS;
                     90:                } else
                     91:                        subs[i].sub_ret = MQTT_QOS_DENY;
                     92:        }
1.2.2.1   misho      93: 
1.2.2.7   misho      94:        /* send acknowledge */
                     95:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                     96:        if (siz == -1) {
                     97:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     98:                goto end;
                     99:        }
1.2.2.13! misho     100:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7   misho     101:                ioSYSERR(0);
1.2.2.8   misho     102:        else {
1.2.2.7   misho     103:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     104:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    105:        }
1.2.2.7   misho     106: end:
                    107:        mqtt_subFree(&subs);
1.2.2.1   misho     108:        return 0;
                    109: }
                    110: 
                    111: int
1.2.2.9   misho     112: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     113: {
                    114:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13! misho     115:        mqtt_subscr_t *subs = NULL;
        !           116:        int siz = 0;
        !           117:        u_short mid = 0;
        !           118:        register int i;
        !           119:        struct tagStore *store, *tmp;
1.2.2.1   misho     120: 
                    121:        ioTRACE(2);
                    122: 
                    123:        if (!sess)
                    124:                return -1;
                    125: 
1.2.2.13! misho     126:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
        !           127:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
        !           128:        if (siz == -1) {
        !           129:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           130:                return 0;
        !           131:        }
        !           132: 
        !           133:        /* del from db */
        !           134:        for (i = 0; i < siz; i++) {
        !           135:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
        !           136:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
        !           137:                                        store->st_subscr.sub_topic.msg_base && 
        !           138:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
        !           139:                                                subs[i].sub_topic.msg_base)) {
        !           140:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
        !           141:                                free(store);
        !           142:                        }
        !           143:                }
        !           144: 
        !           145:                call.DeletePUB_subscribe(&cfg, pub, subs[i].sub_topic.msg_base, 
        !           146:                                sess->sess_user, sess->sess_addr);
        !           147:        }
        !           148: 
        !           149:        /* send acknowledge */
        !           150:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
        !           151:        if (siz == -1) {
        !           152:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           153:                goto end;
        !           154:        }
        !           155:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
        !           156:                ioSYSERR(0);
        !           157:        else {
        !           158:                ioDEBUG(5, "Sended %d bytes.", siz);
        !           159:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
        !           160:        }
        !           161: end:
        !           162:        mqtt_subFree(&subs);
1.2.2.1   misho     163:        return 0;
                    164: }
                    165: 
                    166: int
1.2.2.9   misho     167: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     168: {
                    169:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     170:        int siz = 0;
1.2.2.1   misho     171: 
                    172:        ioTRACE(2);
                    173: 
                    174:        if (!sess)
                    175:                return -1;
                    176: 
1.2.2.7   misho     177:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     178:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    179:        if (siz == -1) {
                    180:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    181:                return 0;
                    182:        }
1.2.2.13! misho     183:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
1.2.2.2   misho     184:                ioSYSERR(0);
                    185:                return 0;
1.2.2.8   misho     186:        } else {
1.2.2.2   misho     187:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     188:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    189:        }
1.2.2.1   misho     190: 
                    191:        return 0;
                    192: }
                    193: 
                    194: int
1.2.2.9   misho     195: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     196: {
                    197:        struct tagStore *store;
                    198:        struct tagSession *sess = (struct tagSession*) arg;
                    199: 
                    200:        ioTRACE(2);
                    201: 
                    202:        if (!sess)
                    203:                return -1;
                    204: 
1.2.2.6   misho     205:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     206:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    207: 
                    208:        if (call.FiniSessPUB)
                    209:                call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    210: 
1.2.2.6   misho     211:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    212:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     213: 
1.2.2.6   misho     214:                if (store->st_subscr.sub_topic.msg_base)
                    215:                        free(store->st_subscr.sub_topic.msg_base);
                    216:                if (store->st_subscr.sub_value.msg_base)
                    217:                        free(store->st_subscr.sub_value.msg_base);
                    218: 
                    219:                free(store);
                    220:        }
1.2.2.1   misho     221: 
                    222:        if (sess->sess_will.msg)
                    223:                free(sess->sess_will.msg);
                    224:        if (sess->sess_will.topic)
                    225:                free(sess->sess_will.topic);
                    226: 
                    227:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    228:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     229: 
1.2.2.12  misho     230:        return -3;      /* reconnect client */
1.2.2.1   misho     231: }
                    232: 
                    233: int
1.2.2.9   misho     234: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     235: {
                    236:        struct tagSession *sess = (struct tagSession*) arg;
                    237: 
                    238:        ioTRACE(2);
                    239: 
                    240:        if (!sess)
                    241:                return -1;
                    242: 
1.2.2.5   misho     243:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     244: 
1.2.2.1   misho     245:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    246:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     247: 
1.2.2.10  misho     248:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     249: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>