Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.7

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
                      7: int
1.2.2.1   misho       8: cmdPUBLISH(void *srv, void *arg)
1.2       misho       9: {
                     10:        struct mqtthdr *hdr;
1.2.2.1   misho      11:        struct tagSession *sess = (struct tagSession*) arg;
1.2       misho      12: 
                     13:        ioTRACE(2);
                     14: 
                     15:        if (!sess)
                     16:                return -1;
                     17: 
                     18:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     19:        switch (hdr->mqtt_msg.qos) {
                     20:                case MQTT_QOS_ONCE:
                     21:                        break;
                     22:                case MQTT_QOS_ACK:
                     23:                        break;
                     24:                case MQTT_QOS_EXACTLY:
                     25:                        break;
                     26:                default:
                     27:                        ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", 
                     28:                                        hdr->mqtt_msg.qos);
                     29:                        return 0;
                     30:        }
                     31: 
                     32:        return 0;
                     33: }
1.2.2.1   misho      34: 
                     35: int
                     36: cmdPUBREL(void *srv, void *arg)
                     37: {
                     38:        struct mqtthdr *hdr;
                     39:        struct tagSession *sess = (struct tagSession*) arg;
                     40: 
                     41:        ioTRACE(2);
                     42: 
                     43:        if (!sess)
                     44:                return -1;
                     45: 
                     46:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     47: 
                     48:        return 0;
                     49: }
                     50: 
                     51: int
                     52: cmdSUBSCRIBE(void *srv, void *arg)
                     53: {
                     54:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 ! misho      55:        mqtt_subscr_t *subs = NULL;
        !            56:        int siz = 0;
        !            57:        u_short mid = 0;
        !            58:        register int i;
        !            59:        struct tagStore *store;
1.2.2.1   misho      60: 
                     61:        ioTRACE(2);
                     62: 
                     63:        if (!sess)
                     64:                return -1;
                     65: 
1.2.2.7 ! misho      66:        ioDEBUG(5, "Exec SUBSCRIBE session");
        !            67:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
        !            68:        if (siz == -1) {
        !            69:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !            70:                return 0;
        !            71:        }
        !            72: 
        !            73:        /* add to db */
        !            74:        for (i = 0; i < siz; i++) {
        !            75:                if ((siz = call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, 
        !            76:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret)) > 0) {
        !            77:                        store = malloc(sizeof(struct tagStore));
        !            78:                        if (!store) {
        !            79:                                ioSYSERR(0);
        !            80:                                goto end;
        !            81:                        } else {
        !            82:                                store->st_msgid = mid;
        !            83:                                store->st_subscr = subs[i];
        !            84:                        }
        !            85: 
        !            86:                        /* add to cache */
        !            87:                        SESS_ELEM_LOCK(sess);
        !            88:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
        !            89:                        SESS_ELEM_UNLOCK(sess);
        !            90: 
        !            91:                        subs[i].sub_ret = MQTT_QOS_PASS;
        !            92:                } else
        !            93:                        subs[i].sub_ret = MQTT_QOS_DENY;
        !            94:        }
1.2.2.1   misho      95: 
1.2.2.7 ! misho      96:        /* send acknowledge */
        !            97:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
        !            98:        if (siz == -1) {
        !            99:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
        !           100:                goto end;
        !           101:        }
        !           102:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1)
        !           103:                ioSYSERR(0);
        !           104:        else
        !           105:                ioDEBUG(5, "Sended %d bytes.", siz);
        !           106: end:
        !           107:        mqtt_subFree(&subs);
1.2.2.1   misho     108:        return 0;
                    109: }
                    110: 
                    111: int
                    112: cmdUNSUBSCRIBE(void *srv, void *arg)
                    113: {
                    114:        struct tagSession *sess = (struct tagSession*) arg;
                    115: 
                    116:        ioTRACE(2);
                    117: 
                    118:        if (!sess)
                    119:                return -1;
                    120: 
                    121:        return 0;
                    122: }
                    123: 
                    124: int
                    125: cmdPINGREQ(void *srv, void *arg)
                    126: {
                    127:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     128:        int siz = 0;
1.2.2.1   misho     129: 
                    130:        ioTRACE(2);
                    131: 
                    132:        if (!sess)
                    133:                return -1;
                    134: 
1.2.2.7 ! misho     135:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     136:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    137:        if (siz == -1) {
                    138:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    139:                return 0;
                    140:        }
                    141:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
                    142:                ioSYSERR(0);
                    143:                return 0;
                    144:        } else
                    145:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.1   misho     146: 
                    147:        return 0;
                    148: }
                    149: 
                    150: int
                    151: cmdCONNECT(void *srv, void *arg)
                    152: {
                    153:        struct tagStore *store;
                    154:        struct tagSession *sess = (struct tagSession*) arg;
                    155: 
                    156:        ioTRACE(2);
                    157: 
                    158:        if (!sess)
                    159:                return -1;
                    160: 
1.2.2.6   misho     161:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.3   misho     162:        SESS_LOCK;
1.2.2.1   misho     163:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3   misho     164:        SESS_UNLOCK;
1.2.2.1   misho     165: 
                    166:        if (call.FiniSessPUB)
                    167:                call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    168: 
1.2.2.3   misho     169:        SESS_ELEM_LOCK(sess);
1.2.2.6   misho     170:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    171:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     172: 
1.2.2.6   misho     173:                if (store->st_subscr.sub_topic.msg_base)
                    174:                        free(store->st_subscr.sub_topic.msg_base);
                    175:                if (store->st_subscr.sub_value.msg_base)
                    176:                        free(store->st_subscr.sub_value.msg_base);
                    177: 
                    178:                free(store);
                    179:        }
1.2.2.3   misho     180:        SESS_ELEM_UNLOCK(sess);
1.2.2.1   misho     181: 
                    182:        if (sess->sess_will.msg)
                    183:                free(sess->sess_will.msg);
                    184:        if (sess->sess_will.topic)
                    185:                free(sess->sess_will.topic);
                    186: 
                    187:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    188:                        sess->sess_addr, sess->sess_user);
                    189:        return 0;
                    190: }
                    191: 
                    192: int
                    193: cmdDISCONNECT(void *srv, void *arg)
                    194: {
                    195:        struct tagSession *sess = (struct tagSession*) arg;
                    196: 
                    197:        ioTRACE(2);
                    198: 
                    199:        if (!sess)
                    200:                return -1;
                    201: 
1.2.2.5   misho     202:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.3   misho     203:        SESS_LOCK;
1.2.2.1   misho     204:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3   misho     205:        SESS_UNLOCK;
1.2.2.1   misho     206: 
                    207:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    208:                        sess->sess_addr, sess->sess_user);
                    209:        return 0;
                    210: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>