Annotation of mqtt/src/sub.c, revision 1.1.2.10

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
                      4: /* ------------------------------------------------------------------- */
                      5: 
1.1.2.5   misho       6: /*
                      7:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                      8:  *
                      9:  * @buf = Message buffer
                     10:  * @Topics = MQTT subscription topics
                     11:  * @msgID = MessageID
                     12:  * @Dup = Duplicate message
                     13:  * @QOS = QoS
                     14:  * return: -1 error or >-1 message size for send
                     15:  */
1.1.2.2   misho      16: int
1.1.2.5   misho      17: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     18:                u_short msgID, u_char Dup, u_char QOS)
1.1.2.2   misho      19: {
1.1.2.3   misho      20:        int siz = 0;
                     21:        struct mqtthdr *hdr;
                     22:        mqtthdr_var_t *topic;
                     23:        mqtt_v_t *mid;
1.1.2.5   misho      24:        mqtt_subscr_t *t;
                     25:        u_char *qos;
1.1.2.3   misho      26: 
1.1.2.5   misho      27:        if (!buf || !Topics)
1.1.2.3   misho      28:                return -1;
                     29:        if (QOS > MQTT_QOS_EXACTLY) {
                     30:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                     31:                return -1;
                     32:        }
                     33:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                     34:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                     35:                return -1;
                     36:        }
                     37: 
                     38:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     39:                return -1;
                     40:        else {
                     41:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     42:                siz += sizeof(struct mqtthdr);
                     43:        }
                     44: 
                     45:        /* variable header */
1.1.2.4   misho      46:        mid = (mqtt_v_t*) (buf->msg_base + siz);
                     47:        mid->val = htons(msgID);
                     48:        siz += sizeof(mqtt_v_t);
                     49: 
1.1.2.5   misho      50:        /* payload with subscriptions */
1.1.2.8   misho      51:        for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.5   misho      52:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
1.1.2.8   misho      53:                topic->var_sb.val = htons(t->sub_topic._size);
                     54:                memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
1.1.2.5   misho      55:                siz += MQTTHDR_VAR_SIZEOF(topic);
                     56:                qos = (buf->msg_base + siz);
1.1.2.8   misho      57:                *qos = t->sub_ret;
1.1.2.5   misho      58:                siz++;
                     59:        }
1.1.2.3   misho      60: 
                     61:        /* fixed header */
1.1.2.7   misho      62:        MQTTHDR_MSGINIT(hdr);
1.1.2.4   misho      63:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
1.1.2.3   misho      64:        hdr->mqtt_msg.qos = QOS;
                     65:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
1.1.2.4   misho      66:        hdr->mqtt_msg.retain = 0;
1.1.2.3   misho      67:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                     68: 
                     69:        mqtt_msgRealloc(buf, siz);
                     70:        return siz;
1.1.2.2   misho      71: }
1.1.2.6   misho      72: 
                     73: /*
                     74:  * mqtt_msgSUBACK() Create SUBACK message
                     75:  *
                     76:  * @buf = Message buffer
                     77:  * @Topics = MQTT subscription topics
                     78:  * @msgID = MessageID
                     79:  * return: -1 error or >-1 message size for send
                     80:  */
                     81: int
                     82: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                     83: {
                     84:        int siz = 0;
                     85:        struct mqtthdr *hdr;
                     86:        mqtt_v_t *v;
                     87:        mqtt_subscr_t *t;
                     88:        u_char *qos;
                     89: 
                     90:        if (!buf || !Topics)
                     91:                return -1;
                     92: 
                     93:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     94:                return -1;
                     95:        else {
                     96:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     97:                siz += sizeof(struct mqtthdr);
                     98:                v = (mqtt_v_t*) (buf->msg_base + siz);
                     99:                siz += sizeof(mqtt_v_t);
                    100:        }
                    101: 
                    102:        /* MessageID */
                    103:        v->val = htons(msgID);
                    104: 
                    105:        /* QoS payload from subscriptions */
1.1.2.8   misho     106:        for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.6   misho     107:                qos = (buf->msg_base + siz);
1.1.2.8   misho     108:                *qos = t->sub_ret;
1.1.2.6   misho     109:                siz++;
                    110:        }
                    111: 
                    112:        /* fixed header */
1.1.2.7   misho     113:        MQTTHDR_MSGINIT(hdr);
1.1.2.6   misho     114:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    115:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    116: 
                    117:        mqtt_msgRealloc(buf, siz);
                    118:        return siz;
                    119: }
1.1.2.7   misho     120: 
                    121: /*
                    122:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    123:  *
                    124:  * @buf = Message buffer
                    125:  * @Topics = MQTT subscription topics
                    126:  * @msgID = MessageID
                    127:  * @Dup = Duplicate message
                    128:  * @QOS = QoS
                    129:  * return: -1 error or >-1 message size for send
                    130:  */
                    131: int
                    132: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    133:                u_short msgID, u_char Dup, u_char QOS)
                    134: {
                    135:        int siz = 0;
                    136:        struct mqtthdr *hdr;
                    137:        mqtthdr_var_t *topic;
                    138:        mqtt_v_t *mid;
                    139:        mqtt_subscr_t *t;
                    140: 
                    141:        if (!buf || !Topics)
                    142:                return -1;
                    143:        if (QOS > MQTT_QOS_EXACTLY) {
                    144:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                    145:                return -1;
                    146:        }
                    147:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                    148:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                    149:                return -1;
                    150:        }
                    151: 
                    152:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    153:                return -1;
                    154:        else {
                    155:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    156:                siz += sizeof(struct mqtthdr);
                    157:        }
                    158: 
                    159:        /* variable header */
                    160:        mid = (mqtt_v_t*) (buf->msg_base + siz);
                    161:        mid->val = htons(msgID);
                    162:        siz += sizeof(mqtt_v_t);
                    163: 
                    164:        /* payload with subscriptions */
1.1.2.8   misho     165:        for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.7   misho     166:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
1.1.2.8   misho     167:                topic->var_sb.val = htons(t->sub_topic._size);
                    168:                memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
1.1.2.7   misho     169:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    170:        }
                    171: 
                    172:        /* fixed header */
                    173:        MQTTHDR_MSGINIT(hdr);
                    174:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    175:        hdr->mqtt_msg.qos = QOS;
                    176:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    177:        hdr->mqtt_msg.retain = 0;
                    178:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    179: 
                    180:        mqtt_msgRealloc(buf, siz);
                    181:        return siz;
                    182: }
                    183: 
                    184: /*
                    185:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    186:  *
                    187:  * @buf = Message buffer
                    188:  * @msgID = MessageID
                    189:  * return: -1 error or >-1 message size for send
                    190:  */
                    191: int
                    192: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    193: {
                    194:        int siz = 0;
                    195:        struct mqtthdr *hdr;
                    196:        mqtt_v_t *v;
                    197: 
                    198:        if (!buf)
                    199:                return -1;
                    200: 
                    201:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
                    202:                return -1;
                    203:        else {
                    204:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    205:                siz += sizeof(struct mqtthdr);
                    206:                v = (mqtt_v_t*) (buf->msg_base + siz);
                    207:                siz += sizeof(mqtt_v_t);
                    208:        }
                    209: 
                    210:        /* fixed header */
                    211:        MQTTHDR_MSGINIT(hdr);
                    212:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    213:        *hdr->mqtt_len = sizeof(mqtt_v_t);
                    214: 
                    215:        /* MessageID */
                    216:        v->val = htons(msgID);
                    217: 
                    218:        return siz;
                    219: }
1.1.2.9   misho     220: 
                    221: 
                    222: /* ============= decode ============ */
                    223: 
                    224: /*
1.1.2.10! misho     225:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
        !           226:  *
        !           227:  * @buf = Message buffer
        !           228:  * @msgID = MessageID
        !           229:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
        !           230:  * return: NULL error or !=NULL MQTT fixed header
        !           231:  */
        !           232: struct mqtthdr *
        !           233: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
        !           234: {
        !           235:        register int i;
        !           236:        int len, ret;
        !           237:        struct mqtthdr *hdr;
        !           238:        mqtthdr_var_t *var;
        !           239:        mqtt_subscr_t *subs;
        !           240:        mqtt_v_t *v;
        !           241:        caddr_t pos;
        !           242: 
        !           243:        if (!buf || !msgID || !subscr)
        !           244:                return NULL;
        !           245: 
        !           246:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
        !           247:        if (!hdr)
        !           248:                return NULL;
        !           249:        pos = buf->msg_base + ret + 1;
        !           250:        v = (mqtt_v_t*) pos;
        !           251: 
        !           252:        /* MessageID */
        !           253:        len -= sizeof(mqtt_v_t);
        !           254:        if (len < 0) {
        !           255:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           256:                return NULL;
        !           257:        } else {
        !           258:                *msgID = ntohs(v->val);
        !           259:                pos += sizeof(mqtt_v_t);
        !           260:        }
        !           261: 
        !           262:        subs = mqtt_subAlloc(0);
        !           263:        if (!subs)
        !           264:                return NULL;
        !           265:        else
        !           266:                *subscr = subs;
        !           267: 
        !           268:        /* Subscribes */
        !           269:        for (i = 0; len > 0; i++) {
        !           270:                var = (mqtthdr_var_t*) pos;
        !           271:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
        !           272:                if (len < 0) {
        !           273:                        mqtt_subFree(subscr);
        !           274:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           275:                        return NULL;
        !           276:                }
        !           277:                subs = mqtt_subRealloc(subs, i + 1);
        !           278:                if (!subs) {
        !           279:                        mqtt_subFree(subscr);
        !           280:                        return NULL;
        !           281:                } else
        !           282:                        *subscr = subs;
        !           283: 
        !           284:                memset(&subs[i], 0, sizeof subs[i]);
        !           285:                subs[i].sub_topic._size = ntohs(var->var_sb.val);
        !           286:                subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
        !           287:                if (!subs[i].sub_topic._base) {
        !           288:                        LOGERR;
        !           289:                        mqtt_subFree(subscr);
        !           290:                        return NULL;
        !           291:                } else
        !           292:                        memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
        !           293:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           294: 
        !           295:                subs[i].sub_ret = *pos;
        !           296:                pos++;
        !           297:        }
        !           298: 
        !           299:        return hdr;
        !           300: }
        !           301: 
        !           302: /*
1.1.2.9   misho     303:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    304:  *
                    305:  * @buf = Message buffer
                    306:  * return: -1 error or MessageID
                    307:  */
                    308: u_short
                    309: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    310: {
                    311:        int len, ret;
                    312:        struct mqtthdr *hdr;
                    313:        mqtt_v_t *v;
                    314:        caddr_t pos;
                    315: 
                    316:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    317:        if (!hdr)
                    318:                return (u_short) -1;
                    319:        if (len < sizeof(mqtt_v_t)) {
                    320:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    321:                return (u_short) -1;
                    322:        } else {
                    323:                pos = buf->msg_base + ret + 1;
                    324:                v = (mqtt_v_t*) pos;
                    325:        }
                    326: 
                    327:        return ntohs(v->val);
                    328: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>