Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.9

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
                      7: int
1.2.2.9 ! misho       8: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho       9: {
                     10:        struct mqtthdr *hdr;
1.2.2.1   misho      11:        struct tagSession *sess = (struct tagSession*) arg;
1.2       misho      12: 
                     13:        ioTRACE(2);
                     14: 
                     15:        if (!sess)
                     16:                return -1;
                     17: 
                     18:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     19:        switch (hdr->mqtt_msg.qos) {
                     20:                case MQTT_QOS_ONCE:
                     21:                        break;
                     22:                case MQTT_QOS_ACK:
                     23:                        break;
                     24:                case MQTT_QOS_EXACTLY:
                     25:                        break;
                     26:                default:
                     27:                        ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", 
                     28:                                        hdr->mqtt_msg.qos);
                     29:                        return 0;
                     30:        }
                     31: 
                     32:        return 0;
                     33: }
1.2.2.1   misho      34: 
                     35: int
1.2.2.9 ! misho      36: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho      37: {
                     38:        struct mqtthdr *hdr;
                     39:        struct tagSession *sess = (struct tagSession*) arg;
                     40: 
                     41:        ioTRACE(2);
                     42: 
                     43:        if (!sess)
                     44:                return -1;
                     45: 
                     46:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     47: 
                     48:        return 0;
                     49: }
                     50: 
                     51: int
1.2.2.9 ! misho      52: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho      53: {
                     54:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho      55:        mqtt_subscr_t *subs = NULL;
                     56:        int siz = 0;
                     57:        u_short mid = 0;
                     58:        register int i;
                     59:        struct tagStore *store;
1.2.2.1   misho      60: 
                     61:        ioTRACE(2);
                     62: 
                     63:        if (!sess)
                     64:                return -1;
                     65: 
1.2.2.7   misho      66:        ioDEBUG(5, "Exec SUBSCRIBE session");
                     67:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                     68:        if (siz == -1) {
                     69:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     70:                return 0;
                     71:        }
                     72: 
                     73:        /* add to db */
                     74:        for (i = 0; i < siz; i++) {
1.2.2.8   misho      75:                if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, 
                     76:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7   misho      77:                        store = malloc(sizeof(struct tagStore));
                     78:                        if (!store) {
                     79:                                ioSYSERR(0);
                     80:                                goto end;
                     81:                        } else {
                     82:                                store->st_msgid = mid;
1.2.2.8   misho      83:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho      84:                        }
                     85: 
                     86:                        /* add to cache */
                     87:                        SESS_ELEM_LOCK(sess);
                     88:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                     89:                        SESS_ELEM_UNLOCK(sess);
                     90: 
                     91:                        subs[i].sub_ret = MQTT_QOS_PASS;
                     92:                } else
                     93:                        subs[i].sub_ret = MQTT_QOS_DENY;
                     94:        }
1.2.2.1   misho      95: 
1.2.2.7   misho      96:        /* send acknowledge */
                     97:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                     98:        if (siz == -1) {
                     99:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    100:                goto end;
                    101:        }
                    102:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1)
                    103:                ioSYSERR(0);
1.2.2.8   misho     104:        else {
1.2.2.7   misho     105:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     106:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    107:        }
1.2.2.7   misho     108: end:
                    109:        mqtt_subFree(&subs);
1.2.2.1   misho     110:        return 0;
                    111: }
                    112: 
                    113: int
1.2.2.9 ! misho     114: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     115: {
                    116:        struct tagSession *sess = (struct tagSession*) arg;
                    117: 
                    118:        ioTRACE(2);
                    119: 
                    120:        if (!sess)
                    121:                return -1;
                    122: 
                    123:        return 0;
                    124: }
                    125: 
                    126: int
1.2.2.9 ! misho     127: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     128: {
                    129:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     130:        int siz = 0;
1.2.2.1   misho     131: 
                    132:        ioTRACE(2);
                    133: 
                    134:        if (!sess)
                    135:                return -1;
                    136: 
1.2.2.7   misho     137:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     138:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    139:        if (siz == -1) {
                    140:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    141:                return 0;
                    142:        }
                    143:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
                    144:                ioSYSERR(0);
                    145:                return 0;
1.2.2.8   misho     146:        } else {
1.2.2.2   misho     147:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     148:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    149:        }
1.2.2.1   misho     150: 
                    151:        return 0;
                    152: }
                    153: 
                    154: int
1.2.2.9 ! misho     155: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     156: {
                    157:        struct tagStore *store;
                    158:        struct tagSession *sess = (struct tagSession*) arg;
                    159: 
                    160:        ioTRACE(2);
                    161: 
                    162:        if (!sess)
                    163:                return -1;
                    164: 
1.2.2.6   misho     165:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.3   misho     166:        SESS_LOCK;
1.2.2.1   misho     167:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3   misho     168:        SESS_UNLOCK;
1.2.2.1   misho     169: 
                    170:        if (call.FiniSessPUB)
                    171:                call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    172: 
1.2.2.3   misho     173:        SESS_ELEM_LOCK(sess);
1.2.2.6   misho     174:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    175:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     176: 
1.2.2.6   misho     177:                if (store->st_subscr.sub_topic.msg_base)
                    178:                        free(store->st_subscr.sub_topic.msg_base);
                    179:                if (store->st_subscr.sub_value.msg_base)
                    180:                        free(store->st_subscr.sub_value.msg_base);
                    181: 
                    182:                free(store);
                    183:        }
1.2.2.3   misho     184:        SESS_ELEM_UNLOCK(sess);
1.2.2.1   misho     185: 
                    186:        if (sess->sess_will.msg)
                    187:                free(sess->sess_will.msg);
                    188:        if (sess->sess_will.topic)
                    189:                free(sess->sess_will.topic);
                    190: 
                    191:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    192:                        sess->sess_addr, sess->sess_user);
1.2.2.9 ! misho     193: 
        !           194: //     schedEvent(root, startSession, NULL, (u_long) sess->sess_sock, sess, ret);
1.2.2.1   misho     195:        return 0;
                    196: }
                    197: 
                    198: int
1.2.2.9 ! misho     199: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     200: {
                    201:        struct tagSession *sess = (struct tagSession*) arg;
                    202: 
                    203:        ioTRACE(2);
                    204: 
                    205:        if (!sess)
                    206:                return -1;
                    207: 
1.2.2.5   misho     208:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.3   misho     209:        SESS_LOCK;
1.2.2.1   misho     210:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3   misho     211:        SESS_UNLOCK;
1.2.2.1   misho     212: 
                    213:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    214:                        sess->sess_addr, sess->sess_user);
1.2.2.9 ! misho     215: 
        !           216:        finiSession(sess);
1.2.2.1   misho     217:        return 0;
                    218: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>