Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.14

1.2       misho       1: #include "global.h"
                      2: #include "mqttd.h"
1.2.2.1   misho       3: #include "rtlm.h"
1.2       misho       4: #include "mqttd_calls.h"
                      5: 
                      6: 
                      7: int
1.2.2.9   misho       8: cmdPUBLISH(void *srv, int len, void *arg)
1.2       misho       9: {
                     10:        struct mqtthdr *hdr;
1.2.2.1   misho      11:        struct tagSession *sess = (struct tagSession*) arg;
1.2       misho      12: 
                     13:        ioTRACE(2);
                     14: 
                     15:        if (!sess)
                     16:                return -1;
                     17: 
                     18:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     19:        switch (hdr->mqtt_msg.qos) {
                     20:                case MQTT_QOS_ONCE:
                     21:                        break;
                     22:                case MQTT_QOS_ACK:
                     23:                        break;
                     24:                case MQTT_QOS_EXACTLY:
                     25:                        break;
                     26:                default:
                     27:                        ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", 
                     28:                                        hdr->mqtt_msg.qos);
                     29:                        return 0;
                     30:        }
                     31: 
                     32:        return 0;
                     33: }
1.2.2.1   misho      34: 
                     35: int
1.2.2.9   misho      36: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1   misho      37: {
                     38:        struct mqtthdr *hdr;
                     39:        struct tagSession *sess = (struct tagSession*) arg;
                     40: 
                     41:        ioTRACE(2);
                     42: 
                     43:        if (!sess)
                     44:                return -1;
                     45: 
                     46:        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                     47: 
                     48:        return 0;
                     49: }
                     50: 
                     51: int
1.2.2.9   misho      52: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho      53: {
                     54:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7   misho      55:        mqtt_subscr_t *subs = NULL;
                     56:        int siz = 0;
                     57:        u_short mid = 0;
                     58:        register int i;
                     59:        struct tagStore *store;
1.2.2.1   misho      60: 
                     61:        ioTRACE(2);
                     62: 
                     63:        if (!sess)
                     64:                return -1;
                     65: 
1.2.2.7   misho      66:        ioDEBUG(5, "Exec SUBSCRIBE session");
                     67:        siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
                     68:        if (siz == -1) {
                     69:                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     70:                return 0;
                     71:        }
                     72: 
                     73:        /* add to db */
                     74:        for (i = 0; i < siz; i++) {
1.2.2.8   misho      75:                if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, 
                     76:                                sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7   misho      77:                        store = malloc(sizeof(struct tagStore));
                     78:                        if (!store) {
                     79:                                ioSYSERR(0);
                     80:                                goto end;
                     81:                        } else {
                     82:                                store->st_msgid = mid;
1.2.2.8   misho      83:                                mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7   misho      84:                        }
                     85: 
                     86:                        /* add to cache */
                     87:                        SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
                     88: 
                     89:                        subs[i].sub_ret = MQTT_QOS_PASS;
                     90:                } else
                     91:                        subs[i].sub_ret = MQTT_QOS_DENY;
                     92:        }
1.2.2.1   misho      93: 
1.2.2.7   misho      94:        /* send acknowledge */
                     95:        siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
                     96:        if (siz == -1) {
                     97:                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     98:                goto end;
                     99:        }
1.2.2.13  misho     100:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7   misho     101:                ioSYSERR(0);
1.2.2.8   misho     102:        else {
1.2.2.7   misho     103:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     104:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    105:        }
1.2.2.7   misho     106: end:
                    107:        mqtt_subFree(&subs);
1.2.2.1   misho     108:        return 0;
                    109: }
                    110: 
                    111: int
1.2.2.9   misho     112: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1   misho     113: {
                    114:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13  misho     115:        mqtt_subscr_t *subs = NULL;
                    116:        int siz = 0;
                    117:        u_short mid = 0;
                    118:        register int i;
                    119:        struct tagStore *store, *tmp;
1.2.2.1   misho     120: 
                    121:        ioTRACE(2);
                    122: 
                    123:        if (!sess)
                    124:                return -1;
                    125: 
1.2.2.13  misho     126:        ioDEBUG(5, "Exec UNSUBSCRIBE session");
                    127:        siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
                    128:        if (siz == -1) {
                    129:                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    130:                return 0;
                    131:        }
                    132: 
                    133:        /* del from db */
                    134:        for (i = 0; i < siz; i++) {
                    135:                SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
                    136:                        if (store->st_subscr.sub_ret == subs[i].sub_ret && 
                    137:                                        store->st_subscr.sub_topic.msg_base && 
                    138:                                        !strcmp(store->st_subscr.sub_topic.msg_base, 
                    139:                                                subs[i].sub_topic.msg_base)) {
                    140:                                SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14! misho     141: 
        !           142:                                if (store->st_subscr.sub_topic.msg_base)
        !           143:                                        free(store->st_subscr.sub_topic.msg_base);
        !           144:                                if (store->st_subscr.sub_value.msg_base)
        !           145:                                        free(store->st_subscr.sub_value.msg_base);
1.2.2.13  misho     146:                                free(store);
                    147:                        }
                    148:                }
                    149: 
                    150:                call.DeletePUB_subscribe(&cfg, pub, subs[i].sub_topic.msg_base, 
                    151:                                sess->sess_user, sess->sess_addr);
                    152:        }
                    153: 
                    154:        /* send acknowledge */
                    155:        siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
                    156:        if (siz == -1) {
                    157:                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    158:                goto end;
                    159:        }
                    160:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
                    161:                ioSYSERR(0);
                    162:        else {
                    163:                ioDEBUG(5, "Sended %d bytes.", siz);
                    164:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    165:        }
                    166: end:
                    167:        mqtt_subFree(&subs);
1.2.2.1   misho     168:        return 0;
                    169: }
                    170: 
                    171: int
1.2.2.9   misho     172: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1   misho     173: {
                    174:        struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2   misho     175:        int siz = 0;
1.2.2.1   misho     176: 
                    177:        ioTRACE(2);
                    178: 
                    179:        if (!sess)
                    180:                return -1;
                    181: 
1.2.2.7   misho     182:        ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2   misho     183:        siz = mqtt_msgPINGRESP(sess->sess_buf);
                    184:        if (siz == -1) {
                    185:                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    186:                return 0;
                    187:        }
1.2.2.13  misho     188:        if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
1.2.2.2   misho     189:                ioSYSERR(0);
                    190:                return 0;
1.2.2.8   misho     191:        } else {
1.2.2.2   misho     192:                ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8   misho     193:                memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
                    194:        }
1.2.2.1   misho     195: 
                    196:        return 0;
                    197: }
                    198: 
                    199: int
1.2.2.9   misho     200: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     201: {
                    202:        struct tagStore *store;
                    203:        struct tagSession *sess = (struct tagSession*) arg;
                    204: 
                    205:        ioTRACE(2);
                    206: 
                    207:        if (!sess)
                    208:                return -1;
                    209: 
1.2.2.6   misho     210:        ioDEBUG(5, "Exec CONNECT session");
1.2.2.1   misho     211:        TAILQ_REMOVE(&Sessions, sess, sess_node);
                    212: 
                    213:        if (call.FiniSessPUB)
                    214:                call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                    215: 
1.2.2.6   misho     216:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
                    217:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3   misho     218: 
1.2.2.6   misho     219:                if (store->st_subscr.sub_topic.msg_base)
                    220:                        free(store->st_subscr.sub_topic.msg_base);
                    221:                if (store->st_subscr.sub_value.msg_base)
                    222:                        free(store->st_subscr.sub_value.msg_base);
                    223: 
                    224:                free(store);
                    225:        }
1.2.2.1   misho     226: 
                    227:        if (sess->sess_will.msg)
                    228:                free(sess->sess_will.msg);
                    229:        if (sess->sess_will.topic)
                    230:                free(sess->sess_will.topic);
                    231: 
                    232:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    233:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     234: 
1.2.2.12  misho     235:        return -3;      /* reconnect client */
1.2.2.1   misho     236: }
                    237: 
                    238: int
1.2.2.9   misho     239: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1   misho     240: {
                    241:        struct tagSession *sess = (struct tagSession*) arg;
                    242: 
                    243:        ioTRACE(2);
                    244: 
                    245:        if (!sess)
                    246:                return -1;
                    247: 
1.2.2.5   misho     248:        ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10  misho     249: 
1.2.2.1   misho     250:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    251:                        sess->sess_addr, sess->sess_user);
1.2.2.9   misho     252: 
1.2.2.10  misho     253:        return -2;      /* must terminate dispatcher */
1.2.2.1   misho     254: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>