Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.8

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
                      7: int
1.2.2.1   misho       8: cmdPUBLISH(void *srv, void *arg)
1.2       misho       9: {
                     10:        struct mqtthdr *hdr;
1.2.2.1   misho      11:        struct tagSession *sess = (struct tagSession*) arg;
1.2       misho      12: 
                     13:        ioTRACE(2);
                     14: 
                     15:        if (!sess)
                     16:                return -1;
                     17: 
                     18:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     19:        switch (hdr->mqtt_msg.qos) {
                     20:                case MQTT_QOS_ONCE:
                     21:                        break;
                     22:                case MQTT_QOS_ACK:
                     23:                        break;
                     24:                case MQTT_QOS_EXACTLY:
                     25:                        break;
                     26:                default:
                     27:                        ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", 
                     28:                                        hdr->mqtt_msg.qos);
                     29:                        return 0;
                     30:        }
                     31: 
                     32:        return 0;
                     33: }
1.2.2.1   misho      34: 
                     35: int
                     36: cmdPUBREL(void *srv, void *arg)
                     37: {
                     38:        struct mqtthdr *hdr;
                     39:        struct tagSession *sess = (struct tagSession*) arg;
                     40: 
                     41:        ioTRACE(2);
                     42: 
                     43:        if (!sess)
                     44:                return -1;
                     45: 
                     46:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     47: 
                     48:        return 0;
                     49: }
                     50: 
                     51: int
                     52: cmdSUBSCRIBE(void *srv, void *arg)
                     53: {
                     54:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho      55:        mqtt_subscr_t *subs = NULL;
                     56:        int siz = 0;
                     57:        u_short mid = 0;
                     58:        register int i;
                     59:        struct tagStore *store;
1.2.2.1   misho      60: 
                     61:        ioTRACE(2);
                     62: 
                     63:        if (!sess)
                     64:                return -1;
                     65: 
1.2.2.7   misho      66:        ioDEBUG(5, "Exec SUBSCRIBE session");
                     67:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                     68:        if (siz == -1) {
                     69:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     70:                return 0;
                     71:        }
                     72: 
                     73:        /* add to db */
                     74:        for (i = 0; i < siz; i++) {
1.2.2.8 ! misho      75:                if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, 
        !            76:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7   misho      77:                        store = malloc(sizeof(struct tagStore));
                     78:                        if (!store) {
                     79:                                ioSYSERR(0);
                     80:                                goto end;
                     81:                        } else {
                     82:                                store->st_msgid = mid;
1.2.2.8 ! misho      83:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho      84:                        }
                     85: 
                     86:                        /* add to cache */
                     87:                        SESS_ELEM_LOCK(sess);
                     88:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                     89:                        SESS_ELEM_UNLOCK(sess);
                     90: 
                     91:                        subs[i].sub_ret = MQTT_QOS_PASS;
                     92:                } else
                     93:                        subs[i].sub_ret = MQTT_QOS_DENY;
                     94:        }
1.2.2.1   misho      95: 
1.2.2.7   misho      96:        /* send acknowledge */
                     97:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                     98:        if (siz == -1) {
                     99:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    100:                goto end;
                    101:        }
                    102:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1)
                    103:                ioSYSERR(0);
1.2.2.8 ! misho     104:        else {
1.2.2.7   misho     105:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 ! misho     106:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
        !           107:        }
1.2.2.7   misho     108: end:
                    109:        mqtt_subFree(&subs);
1.2.2.1   misho     110:        return 0;
                    111: }
                    112: 
                    113: int
                    114: cmdUNSUBSCRIBE(void *srv, void *arg)
                    115: {
                    116:        struct tagSession *sess = (struct tagSession*) arg;
                    117: 
                    118:        ioTRACE(2);
                    119: 
                    120:        if (!sess)
                    121:                return -1;
                    122: 
                    123:        return 0;
                    124: }
                    125: 
                    126: int
                    127: cmdPINGREQ(void *srv, void *arg)
                    128: {
                    129:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     130:        int siz = 0;
1.2.2.1   misho     131: 
                    132:        ioTRACE(2);
                    133: 
                    134:        if (!sess)
                    135:                return -1;
                    136: 
1.2.2.7   misho     137:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     138:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    139:        if (siz == -1) {
                    140:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    141:                return 0;
                    142:        }
                    143:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
                    144:                ioSYSERR(0);
                    145:                return 0;
1.2.2.8 ! misho     146:        } else {
1.2.2.2   misho     147:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 ! misho     148:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
        !           149:        }
1.2.2.1   misho     150: 
                    151:        return 0;
                    152: }
                    153: 
                    154: int
                    155: cmdCONNECT(void *srv, void *arg)
                    156: {
                    157:        struct tagStore *store;
                    158:        struct tagSession *sess = (struct tagSession*) arg;
                    159: 
                    160:        ioTRACE(2);
                    161: 
                    162:        if (!sess)
                    163:                return -1;
                    164: 
1.2.2.6   misho     165:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.3   misho     166:        SESS_LOCK;
1.2.2.1   misho     167:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3   misho     168:        SESS_UNLOCK;
1.2.2.1   misho     169: 
                    170:        if (call.FiniSessPUB)
                    171:                call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    172: 
1.2.2.3   misho     173:        SESS_ELEM_LOCK(sess);
1.2.2.6   misho     174:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    175:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     176: 
1.2.2.6   misho     177:                if (store->st_subscr.sub_topic.msg_base)
                    178:                        free(store->st_subscr.sub_topic.msg_base);
                    179:                if (store->st_subscr.sub_value.msg_base)
                    180:                        free(store->st_subscr.sub_value.msg_base);
                    181: 
                    182:                free(store);
                    183:        }
1.2.2.3   misho     184:        SESS_ELEM_UNLOCK(sess);
1.2.2.1   misho     185: 
                    186:        if (sess->sess_will.msg)
                    187:                free(sess->sess_will.msg);
                    188:        if (sess->sess_will.topic)
                    189:                free(sess->sess_will.topic);
                    190: 
                    191:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    192:                        sess->sess_addr, sess->sess_user);
                    193:        return 0;
                    194: }
                    195: 
                    196: int
                    197: cmdDISCONNECT(void *srv, void *arg)
                    198: {
                    199:        struct tagSession *sess = (struct tagSession*) arg;
                    200: 
                    201:        ioTRACE(2);
                    202: 
                    203:        if (!sess)
                    204:                return -1;
                    205: 
1.2.2.5   misho     206:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.3   misho     207:        SESS_LOCK;
1.2.2.1   misho     208:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3   misho     209:        SESS_UNLOCK;
1.2.2.1   misho     210: 
                    211:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    212:                        sess->sess_addr, sess->sess_user);
                    213:        return 0;
                    214: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>