Annotation of mqtt/src/sub.c, revision 1.1.2.11

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
                      4: /* ------------------------------------------------------------------- */
                      5: 
1.1.2.5   misho       6: /*
                      7:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                      8:  *
                      9:  * @buf = Message buffer
                     10:  * @Topics = MQTT subscription topics
                     11:  * @msgID = MessageID
                     12:  * @Dup = Duplicate message
                     13:  * @QOS = QoS
                     14:  * return: -1 error or >-1 message size for send
                     15:  */
1.1.2.2   misho      16: int
1.1.2.5   misho      17: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     18:                u_short msgID, u_char Dup, u_char QOS)
1.1.2.2   misho      19: {
1.1.2.3   misho      20:        int siz = 0;
                     21:        struct mqtthdr *hdr;
                     22:        mqtthdr_var_t *topic;
                     23:        mqtt_v_t *mid;
1.1.2.5   misho      24:        mqtt_subscr_t *t;
                     25:        u_char *qos;
1.1.2.3   misho      26: 
1.1.2.5   misho      27:        if (!buf || !Topics)
1.1.2.3   misho      28:                return -1;
                     29:        if (QOS > MQTT_QOS_EXACTLY) {
                     30:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                     31:                return -1;
                     32:        }
                     33:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                     34:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                     35:                return -1;
                     36:        }
                     37: 
                     38:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     39:                return -1;
                     40:        else {
                     41:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     42:                siz += sizeof(struct mqtthdr);
                     43:        }
                     44: 
                     45:        /* variable header */
1.1.2.4   misho      46:        mid = (mqtt_v_t*) (buf->msg_base + siz);
                     47:        mid->val = htons(msgID);
                     48:        siz += sizeof(mqtt_v_t);
                     49: 
1.1.2.5   misho      50:        /* payload with subscriptions */
1.1.2.8   misho      51:        for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.5   misho      52:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
1.1.2.8   misho      53:                topic->var_sb.val = htons(t->sub_topic._size);
                     54:                memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
1.1.2.5   misho      55:                siz += MQTTHDR_VAR_SIZEOF(topic);
                     56:                qos = (buf->msg_base + siz);
1.1.2.8   misho      57:                *qos = t->sub_ret;
1.1.2.5   misho      58:                siz++;
                     59:        }
1.1.2.3   misho      60: 
                     61:        /* fixed header */
1.1.2.7   misho      62:        MQTTHDR_MSGINIT(hdr);
1.1.2.4   misho      63:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
1.1.2.3   misho      64:        hdr->mqtt_msg.qos = QOS;
                     65:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
1.1.2.4   misho      66:        hdr->mqtt_msg.retain = 0;
1.1.2.3   misho      67:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                     68: 
                     69:        mqtt_msgRealloc(buf, siz);
                     70:        return siz;
1.1.2.2   misho      71: }
1.1.2.6   misho      72: 
                     73: /*
                     74:  * mqtt_msgSUBACK() Create SUBACK message
                     75:  *
                     76:  * @buf = Message buffer
                     77:  * @Topics = MQTT subscription topics
                     78:  * @msgID = MessageID
                     79:  * return: -1 error or >-1 message size for send
                     80:  */
                     81: int
                     82: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                     83: {
                     84:        int siz = 0;
                     85:        struct mqtthdr *hdr;
                     86:        mqtt_v_t *v;
                     87:        mqtt_subscr_t *t;
                     88:        u_char *qos;
                     89: 
                     90:        if (!buf || !Topics)
                     91:                return -1;
                     92: 
                     93:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     94:                return -1;
                     95:        else {
                     96:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     97:                siz += sizeof(struct mqtthdr);
                     98:                v = (mqtt_v_t*) (buf->msg_base + siz);
                     99:                siz += sizeof(mqtt_v_t);
                    100:        }
                    101: 
                    102:        /* MessageID */
                    103:        v->val = htons(msgID);
                    104: 
                    105:        /* QoS payload from subscriptions */
1.1.2.8   misho     106:        for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.6   misho     107:                qos = (buf->msg_base + siz);
1.1.2.8   misho     108:                *qos = t->sub_ret;
1.1.2.6   misho     109:                siz++;
                    110:        }
                    111: 
                    112:        /* fixed header */
1.1.2.7   misho     113:        MQTTHDR_MSGINIT(hdr);
1.1.2.6   misho     114:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    115:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    116: 
                    117:        mqtt_msgRealloc(buf, siz);
                    118:        return siz;
                    119: }
1.1.2.7   misho     120: 
                    121: /*
                    122:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    123:  *
                    124:  * @buf = Message buffer
                    125:  * @Topics = MQTT subscription topics
                    126:  * @msgID = MessageID
                    127:  * @Dup = Duplicate message
                    128:  * @QOS = QoS
                    129:  * return: -1 error or >-1 message size for send
                    130:  */
                    131: int
                    132: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    133:                u_short msgID, u_char Dup, u_char QOS)
                    134: {
                    135:        int siz = 0;
                    136:        struct mqtthdr *hdr;
                    137:        mqtthdr_var_t *topic;
                    138:        mqtt_v_t *mid;
                    139:        mqtt_subscr_t *t;
                    140: 
                    141:        if (!buf || !Topics)
                    142:                return -1;
                    143:        if (QOS > MQTT_QOS_EXACTLY) {
                    144:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                    145:                return -1;
                    146:        }
                    147:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                    148:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                    149:                return -1;
                    150:        }
                    151: 
                    152:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    153:                return -1;
                    154:        else {
                    155:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    156:                siz += sizeof(struct mqtthdr);
                    157:        }
                    158: 
                    159:        /* variable header */
                    160:        mid = (mqtt_v_t*) (buf->msg_base + siz);
                    161:        mid->val = htons(msgID);
                    162:        siz += sizeof(mqtt_v_t);
                    163: 
                    164:        /* payload with subscriptions */
1.1.2.8   misho     165:        for (t = Topics; t && t->sub_topic._base; t++) {
1.1.2.7   misho     166:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
1.1.2.8   misho     167:                topic->var_sb.val = htons(t->sub_topic._size);
                    168:                memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
1.1.2.7   misho     169:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    170:        }
                    171: 
                    172:        /* fixed header */
                    173:        MQTTHDR_MSGINIT(hdr);
                    174:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    175:        hdr->mqtt_msg.qos = QOS;
                    176:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    177:        hdr->mqtt_msg.retain = 0;
                    178:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    179: 
                    180:        mqtt_msgRealloc(buf, siz);
                    181:        return siz;
                    182: }
                    183: 
                    184: /*
                    185:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    186:  *
                    187:  * @buf = Message buffer
                    188:  * @msgID = MessageID
                    189:  * return: -1 error or >-1 message size for send
                    190:  */
                    191: int
                    192: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    193: {
                    194:        int siz = 0;
                    195:        struct mqtthdr *hdr;
                    196:        mqtt_v_t *v;
                    197: 
                    198:        if (!buf)
                    199:                return -1;
                    200: 
                    201:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
                    202:                return -1;
                    203:        else {
                    204:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    205:                siz += sizeof(struct mqtthdr);
                    206:                v = (mqtt_v_t*) (buf->msg_base + siz);
                    207:                siz += sizeof(mqtt_v_t);
                    208:        }
                    209: 
                    210:        /* fixed header */
                    211:        MQTTHDR_MSGINIT(hdr);
                    212:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    213:        *hdr->mqtt_len = sizeof(mqtt_v_t);
                    214: 
                    215:        /* MessageID */
                    216:        v->val = htons(msgID);
                    217: 
                    218:        return siz;
                    219: }
1.1.2.9   misho     220: 
                    221: 
                    222: /* ============= decode ============ */
                    223: 
                    224: /*
1.1.2.10  misho     225:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
                    226:  *
                    227:  * @buf = Message buffer
                    228:  * @msgID = MessageID
                    229:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
                    230:  * return: NULL error or !=NULL MQTT fixed header
                    231:  */
                    232: struct mqtthdr *
                    233: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    234: {
                    235:        register int i;
                    236:        int len, ret;
                    237:        struct mqtthdr *hdr;
                    238:        mqtthdr_var_t *var;
                    239:        mqtt_subscr_t *subs;
                    240:        mqtt_v_t *v;
                    241:        caddr_t pos;
                    242: 
                    243:        if (!buf || !msgID || !subscr)
                    244:                return NULL;
                    245: 
                    246:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
                    247:        if (!hdr)
                    248:                return NULL;
                    249:        pos = buf->msg_base + ret + 1;
                    250:        v = (mqtt_v_t*) pos;
                    251: 
                    252:        /* MessageID */
                    253:        len -= sizeof(mqtt_v_t);
                    254:        if (len < 0) {
                    255:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    256:                return NULL;
                    257:        } else {
                    258:                *msgID = ntohs(v->val);
                    259:                pos += sizeof(mqtt_v_t);
                    260:        }
                    261: 
                    262:        subs = mqtt_subAlloc(0);
                    263:        if (!subs)
                    264:                return NULL;
                    265:        else
                    266:                *subscr = subs;
                    267: 
                    268:        /* Subscribes */
                    269:        for (i = 0; len > 0; i++) {
                    270:                var = (mqtthdr_var_t*) pos;
                    271:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                    272:                if (len < 0) {
                    273:                        mqtt_subFree(subscr);
                    274:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    275:                        return NULL;
                    276:                }
                    277:                subs = mqtt_subRealloc(subs, i + 1);
                    278:                if (!subs) {
                    279:                        mqtt_subFree(subscr);
                    280:                        return NULL;
                    281:                } else
                    282:                        *subscr = subs;
                    283: 
                    284:                memset(&subs[i], 0, sizeof subs[i]);
                    285:                subs[i].sub_topic._size = ntohs(var->var_sb.val);
                    286:                subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
                    287:                if (!subs[i].sub_topic._base) {
                    288:                        LOGERR;
                    289:                        mqtt_subFree(subscr);
                    290:                        return NULL;
                    291:                } else
                    292:                        memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
                    293:                pos += MQTTHDR_VAR_SIZEOF(var);
                    294: 
                    295:                subs[i].sub_ret = *pos;
                    296:                pos++;
                    297:        }
                    298: 
                    299:        return hdr;
                    300: }
                    301: 
                    302: /*
1.1.2.11! misho     303:  * mqtt_readSUBACK() Read SUBACK message
        !           304:  *
        !           305:  * @buf = Message buffer
        !           306:  * @msgID = MessageID
        !           307:  * @subqos = Subscribes QoS, must be free after use with free()
        !           308:  * return: -1 error or >-1 readed subscribes QoS elements
        !           309:  */
        !           310: int
        !           311: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
        !           312: {
        !           313:        int len, ret;
        !           314:        struct mqtthdr *hdr;
        !           315:        mqtt_v_t *v;
        !           316:        caddr_t pos;
        !           317: 
        !           318:        if (!buf || !msgID || !subqos)
        !           319:                return -1;
        !           320: 
        !           321:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
        !           322:        if (!hdr)
        !           323:                return -1;
        !           324:        pos = buf->msg_base + ret + 1;
        !           325:        v = (mqtt_v_t*) pos;
        !           326: 
        !           327:        /* MessageID */
        !           328:        len -= sizeof(mqtt_v_t);
        !           329:        if (len < 0) {
        !           330:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           331:                return -1;
        !           332:        } else {
        !           333:                *msgID = ntohs(v->val);
        !           334:                pos += sizeof(mqtt_v_t);
        !           335:        }
        !           336: 
        !           337:        /* Subscribes */
        !           338:        *subqos = malloc(len);
        !           339:        if (!*subqos) {
        !           340:                LOGERR;
        !           341:                return -1;
        !           342:        } else
        !           343:                memcpy(*subqos, pos, len);
        !           344: 
        !           345:        return len;
        !           346: }
        !           347: 
        !           348: /*
        !           349:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
        !           350:  *
        !           351:  * @buf = Message buffer
        !           352:  * @msgID = MessageID
        !           353:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
        !           354:  * return: NULL error or !=NULL MQTT fixed header
        !           355:  */
        !           356: struct mqtthdr *
        !           357: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
        !           358: {
        !           359:        register int i;
        !           360:        int len, ret;
        !           361:        struct mqtthdr *hdr;
        !           362:        mqtthdr_var_t *var;
        !           363:        mqtt_subscr_t *subs;
        !           364:        mqtt_v_t *v;
        !           365:        caddr_t pos;
        !           366: 
        !           367:        if (!buf || !msgID || !subscr)
        !           368:                return NULL;
        !           369: 
        !           370:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
        !           371:        if (!hdr)
        !           372:                return NULL;
        !           373:        pos = buf->msg_base + ret + 1;
        !           374:        v = (mqtt_v_t*) pos;
        !           375: 
        !           376:        /* MessageID */
        !           377:        len -= sizeof(mqtt_v_t);
        !           378:        if (len < 0) {
        !           379:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           380:                return NULL;
        !           381:        } else {
        !           382:                *msgID = ntohs(v->val);
        !           383:                pos += sizeof(mqtt_v_t);
        !           384:        }
        !           385: 
        !           386:        subs = mqtt_subAlloc(0);
        !           387:        if (!subs)
        !           388:                return NULL;
        !           389:        else
        !           390:                *subscr = subs;
        !           391: 
        !           392:        /* Subscribes */
        !           393:        for (i = 0; len > 0; i++) {
        !           394:                var = (mqtthdr_var_t*) pos;
        !           395:                len -= MQTTHDR_VAR_SIZEOF(var);
        !           396:                if (len < 0) {
        !           397:                        mqtt_subFree(subscr);
        !           398:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           399:                        return NULL;
        !           400:                }
        !           401:                subs = mqtt_subRealloc(subs, i + 1);
        !           402:                if (!subs) {
        !           403:                        mqtt_subFree(subscr);
        !           404:                        return NULL;
        !           405:                } else
        !           406:                        *subscr = subs;
        !           407: 
        !           408:                memset(&subs[i], 0, sizeof subs[i]);
        !           409:                subs[i].sub_topic._size = ntohs(var->var_sb.val);
        !           410:                subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
        !           411:                if (!subs[i].sub_topic._base) {
        !           412:                        LOGERR;
        !           413:                        mqtt_subFree(subscr);
        !           414:                        return NULL;
        !           415:                } else
        !           416:                        memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
        !           417:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           418:        }
        !           419: 
        !           420:        return hdr;
        !           421: }
        !           422: 
        !           423: /*
1.1.2.9   misho     424:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    425:  *
                    426:  * @buf = Message buffer
                    427:  * return: -1 error or MessageID
                    428:  */
                    429: u_short
                    430: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    431: {
                    432:        int len, ret;
                    433:        struct mqtthdr *hdr;
                    434:        mqtt_v_t *v;
                    435:        caddr_t pos;
                    436: 
                    437:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    438:        if (!hdr)
                    439:                return (u_short) -1;
                    440:        if (len < sizeof(mqtt_v_t)) {
                    441:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    442:                return (u_short) -1;
                    443:        } else {
                    444:                pos = buf->msg_base + ret + 1;
                    445:                v = (mqtt_v_t*) pos;
                    446:        }
                    447: 
                    448:        return ntohs(v->val);
                    449: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>