Annotation of mqtt/src/sub.c, revision 1.2

1.2     ! misho       1: #include "global.h"
        !             2: 
        !             3: 
        !             4: /* ------------------------------------------------------------------- */
        !             5: 
        !             6: /*
        !             7:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
        !             8:  *
        !             9:  * @buf = Message buffer
        !            10:  * @Topics = MQTT subscription topics
        !            11:  * @msgID = MessageID
        !            12:  * @Dup = Duplicate message
        !            13:  * @QOS = QoS
        !            14:  * return: -1 error or >-1 message size for send
        !            15:  */
        !            16: int
        !            17: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
        !            18:                u_short msgID, u_char Dup, u_char QOS)
        !            19: {
        !            20:        int siz = 0;
        !            21:        struct mqtthdr *hdr;
        !            22:        mqtthdr_var_t *topic;
        !            23:        mqtt_v_t *mid;
        !            24:        mqtt_subscr_t *t;
        !            25:        u_char *qos;
        !            26: 
        !            27:        if (!buf || !Topics)
        !            28:                return -1;
        !            29:        if (QOS > MQTT_QOS_EXACTLY) {
        !            30:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
        !            31:                return -1;
        !            32:        }
        !            33:        if (!msgID && QOS != MQTT_QOS_ONCE) {
        !            34:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
        !            35:                return -1;
        !            36:        }
        !            37: 
        !            38:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
        !            39:                return -1;
        !            40:        else {
        !            41:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !            42:                siz += sizeof(struct mqtthdr);
        !            43:        }
        !            44: 
        !            45:        /* variable header */
        !            46:        mid = (mqtt_v_t*) (buf->msg_base + siz);
        !            47:        mid->val = htons(msgID);
        !            48:        siz += sizeof(mqtt_v_t);
        !            49: 
        !            50:        /* payload with subscriptions */
        !            51:        for (t = Topics; t && t->sub_topic._base; t++) {
        !            52:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
        !            53:                topic->var_sb.val = htons(t->sub_topic._size);
        !            54:                memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
        !            55:                siz += MQTTHDR_VAR_SIZEOF(topic);
        !            56:                qos = (buf->msg_base + siz);
        !            57:                *qos = t->sub_ret;
        !            58:                siz++;
        !            59:        }
        !            60: 
        !            61:        /* fixed header */
        !            62:        MQTTHDR_MSGINIT(hdr);
        !            63:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
        !            64:        hdr->mqtt_msg.qos = QOS;
        !            65:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
        !            66:        hdr->mqtt_msg.retain = 0;
        !            67:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
        !            68: 
        !            69:        mqtt_msgRealloc(buf, siz);
        !            70:        return siz;
        !            71: }
        !            72: 
        !            73: /*
        !            74:  * mqtt_msgSUBACK() Create SUBACK message
        !            75:  *
        !            76:  * @buf = Message buffer
        !            77:  * @Topics = MQTT subscription topics
        !            78:  * @msgID = MessageID
        !            79:  * return: -1 error or >-1 message size for send
        !            80:  */
        !            81: int
        !            82: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
        !            83: {
        !            84:        int siz = 0;
        !            85:        struct mqtthdr *hdr;
        !            86:        mqtt_v_t *v;
        !            87:        mqtt_subscr_t *t;
        !            88:        u_char *qos;
        !            89: 
        !            90:        if (!buf || !Topics)
        !            91:                return -1;
        !            92: 
        !            93:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
        !            94:                return -1;
        !            95:        else {
        !            96:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !            97:                siz += sizeof(struct mqtthdr);
        !            98:                v = (mqtt_v_t*) (buf->msg_base + siz);
        !            99:                siz += sizeof(mqtt_v_t);
        !           100:        }
        !           101: 
        !           102:        /* MessageID */
        !           103:        v->val = htons(msgID);
        !           104: 
        !           105:        /* QoS payload from subscriptions */
        !           106:        for (t = Topics; t && t->sub_topic._base; t++) {
        !           107:                qos = (buf->msg_base + siz);
        !           108:                *qos = t->sub_ret;
        !           109:                siz++;
        !           110:        }
        !           111: 
        !           112:        /* fixed header */
        !           113:        MQTTHDR_MSGINIT(hdr);
        !           114:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
        !           115:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
        !           116: 
        !           117:        mqtt_msgRealloc(buf, siz);
        !           118:        return siz;
        !           119: }
        !           120: 
        !           121: /*
        !           122:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
        !           123:  *
        !           124:  * @buf = Message buffer
        !           125:  * @Topics = MQTT subscription topics
        !           126:  * @msgID = MessageID
        !           127:  * @Dup = Duplicate message
        !           128:  * @QOS = QoS
        !           129:  * return: -1 error or >-1 message size for send
        !           130:  */
        !           131: int
        !           132: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
        !           133:                u_short msgID, u_char Dup, u_char QOS)
        !           134: {
        !           135:        int siz = 0;
        !           136:        struct mqtthdr *hdr;
        !           137:        mqtthdr_var_t *topic;
        !           138:        mqtt_v_t *mid;
        !           139:        mqtt_subscr_t *t;
        !           140: 
        !           141:        if (!buf || !Topics)
        !           142:                return -1;
        !           143:        if (QOS > MQTT_QOS_EXACTLY) {
        !           144:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
        !           145:                return -1;
        !           146:        }
        !           147:        if (!msgID && QOS != MQTT_QOS_ONCE) {
        !           148:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
        !           149:                return -1;
        !           150:        }
        !           151: 
        !           152:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
        !           153:                return -1;
        !           154:        else {
        !           155:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !           156:                siz += sizeof(struct mqtthdr);
        !           157:        }
        !           158: 
        !           159:        /* variable header */
        !           160:        mid = (mqtt_v_t*) (buf->msg_base + siz);
        !           161:        mid->val = htons(msgID);
        !           162:        siz += sizeof(mqtt_v_t);
        !           163: 
        !           164:        /* payload with subscriptions */
        !           165:        for (t = Topics; t && t->sub_topic._base; t++) {
        !           166:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
        !           167:                topic->var_sb.val = htons(t->sub_topic._size);
        !           168:                memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
        !           169:                siz += MQTTHDR_VAR_SIZEOF(topic);
        !           170:        }
        !           171: 
        !           172:        /* fixed header */
        !           173:        MQTTHDR_MSGINIT(hdr);
        !           174:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
        !           175:        hdr->mqtt_msg.qos = QOS;
        !           176:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
        !           177:        hdr->mqtt_msg.retain = 0;
        !           178:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
        !           179: 
        !           180:        mqtt_msgRealloc(buf, siz);
        !           181:        return siz;
        !           182: }
        !           183: 
        !           184: /*
        !           185:  * mqtt_msgUNSUBACK() Create UNSUBACK message
        !           186:  *
        !           187:  * @buf = Message buffer
        !           188:  * @msgID = MessageID
        !           189:  * return: -1 error or >-1 message size for send
        !           190:  */
        !           191: int
        !           192: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
        !           193: {
        !           194:        int siz = 0;
        !           195:        struct mqtthdr *hdr;
        !           196:        mqtt_v_t *v;
        !           197: 
        !           198:        if (!buf)
        !           199:                return -1;
        !           200: 
        !           201:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
        !           202:                return -1;
        !           203:        else {
        !           204:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !           205:                siz += sizeof(struct mqtthdr);
        !           206:                v = (mqtt_v_t*) (buf->msg_base + siz);
        !           207:                siz += sizeof(mqtt_v_t);
        !           208:        }
        !           209: 
        !           210:        /* fixed header */
        !           211:        MQTTHDR_MSGINIT(hdr);
        !           212:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
        !           213:        *hdr->mqtt_len = sizeof(mqtt_v_t);
        !           214: 
        !           215:        /* MessageID */
        !           216:        v->val = htons(msgID);
        !           217: 
        !           218:        return siz;
        !           219: }
        !           220: 
        !           221: 
        !           222: /* ============= decode ============ */
        !           223: 
        !           224: /*
        !           225:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
        !           226:  *
        !           227:  * @buf = Message buffer
        !           228:  * @msgID = MessageID
        !           229:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
        !           230:  * return: NULL error or !=NULL MQTT fixed header
        !           231:  */
        !           232: struct mqtthdr *
        !           233: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
        !           234: {
        !           235:        register int i;
        !           236:        int len, ret;
        !           237:        struct mqtthdr *hdr;
        !           238:        mqtthdr_var_t *var;
        !           239:        mqtt_subscr_t *subs;
        !           240:        mqtt_v_t *v;
        !           241:        caddr_t pos;
        !           242: 
        !           243:        if (!buf || !msgID || !subscr)
        !           244:                return NULL;
        !           245: 
        !           246:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
        !           247:        if (!hdr)
        !           248:                return NULL;
        !           249:        pos = buf->msg_base + ret + 1;
        !           250:        v = (mqtt_v_t*) pos;
        !           251: 
        !           252:        /* MessageID */
        !           253:        len -= sizeof(mqtt_v_t);
        !           254:        if (len < 0) {
        !           255:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           256:                return NULL;
        !           257:        } else {
        !           258:                *msgID = ntohs(v->val);
        !           259:                pos += sizeof(mqtt_v_t);
        !           260:        }
        !           261: 
        !           262:        subs = mqtt_subAlloc(0);
        !           263:        if (!subs)
        !           264:                return NULL;
        !           265:        else
        !           266:                *subscr = subs;
        !           267: 
        !           268:        /* Subscribes */
        !           269:        for (i = 0; len > 0; i++) {
        !           270:                var = (mqtthdr_var_t*) pos;
        !           271:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
        !           272:                if (len < 0) {
        !           273:                        mqtt_subFree(subscr);
        !           274:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           275:                        return NULL;
        !           276:                }
        !           277:                subs = mqtt_subRealloc(subs, i + 1);
        !           278:                if (!subs) {
        !           279:                        mqtt_subFree(subscr);
        !           280:                        return NULL;
        !           281:                } else
        !           282:                        *subscr = subs;
        !           283: 
        !           284:                memset(&subs[i], 0, sizeof subs[i]);
        !           285:                subs[i].sub_topic._size = ntohs(var->var_sb.val);
        !           286:                subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
        !           287:                if (!subs[i].sub_topic._base) {
        !           288:                        LOGERR;
        !           289:                        mqtt_subFree(subscr);
        !           290:                        return NULL;
        !           291:                } else
        !           292:                        memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
        !           293:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           294: 
        !           295:                subs[i].sub_ret = *pos;
        !           296:                pos++;
        !           297:        }
        !           298: 
        !           299:        return hdr;
        !           300: }
        !           301: 
        !           302: /*
        !           303:  * mqtt_readSUBACK() Read SUBACK message
        !           304:  *
        !           305:  * @buf = Message buffer
        !           306:  * @msgID = MessageID
        !           307:  * @subqos = Subscribes QoS, must be free after use with free()
        !           308:  * return: -1 error or >-1 readed subscribes QoS elements
        !           309:  */
        !           310: int
        !           311: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
        !           312: {
        !           313:        int len, ret;
        !           314:        struct mqtthdr *hdr;
        !           315:        mqtt_v_t *v;
        !           316:        caddr_t pos;
        !           317: 
        !           318:        if (!buf || !msgID || !subqos)
        !           319:                return -1;
        !           320: 
        !           321:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
        !           322:        if (!hdr)
        !           323:                return -1;
        !           324:        pos = buf->msg_base + ret + 1;
        !           325:        v = (mqtt_v_t*) pos;
        !           326: 
        !           327:        /* MessageID */
        !           328:        len -= sizeof(mqtt_v_t);
        !           329:        if (len < 0) {
        !           330:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           331:                return -1;
        !           332:        } else {
        !           333:                *msgID = ntohs(v->val);
        !           334:                pos += sizeof(mqtt_v_t);
        !           335:        }
        !           336: 
        !           337:        /* Subscribes */
        !           338:        *subqos = malloc(len);
        !           339:        if (!*subqos) {
        !           340:                LOGERR;
        !           341:                return -1;
        !           342:        } else
        !           343:                memcpy(*subqos, pos, len);
        !           344: 
        !           345:        return len;
        !           346: }
        !           347: 
        !           348: /*
        !           349:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
        !           350:  *
        !           351:  * @buf = Message buffer
        !           352:  * @msgID = MessageID
        !           353:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
        !           354:  * return: NULL error or !=NULL MQTT fixed header
        !           355:  */
        !           356: struct mqtthdr *
        !           357: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
        !           358: {
        !           359:        register int i;
        !           360:        int len, ret;
        !           361:        struct mqtthdr *hdr;
        !           362:        mqtthdr_var_t *var;
        !           363:        mqtt_subscr_t *subs;
        !           364:        mqtt_v_t *v;
        !           365:        caddr_t pos;
        !           366: 
        !           367:        if (!buf || !msgID || !subscr)
        !           368:                return NULL;
        !           369: 
        !           370:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
        !           371:        if (!hdr)
        !           372:                return NULL;
        !           373:        pos = buf->msg_base + ret + 1;
        !           374:        v = (mqtt_v_t*) pos;
        !           375: 
        !           376:        /* MessageID */
        !           377:        len -= sizeof(mqtt_v_t);
        !           378:        if (len < 0) {
        !           379:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           380:                return NULL;
        !           381:        } else {
        !           382:                *msgID = ntohs(v->val);
        !           383:                pos += sizeof(mqtt_v_t);
        !           384:        }
        !           385: 
        !           386:        subs = mqtt_subAlloc(0);
        !           387:        if (!subs)
        !           388:                return NULL;
        !           389:        else
        !           390:                *subscr = subs;
        !           391: 
        !           392:        /* Subscribes */
        !           393:        for (i = 0; len > 0; i++) {
        !           394:                var = (mqtthdr_var_t*) pos;
        !           395:                len -= MQTTHDR_VAR_SIZEOF(var);
        !           396:                if (len < 0) {
        !           397:                        mqtt_subFree(subscr);
        !           398:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           399:                        return NULL;
        !           400:                }
        !           401:                subs = mqtt_subRealloc(subs, i + 1);
        !           402:                if (!subs) {
        !           403:                        mqtt_subFree(subscr);
        !           404:                        return NULL;
        !           405:                } else
        !           406:                        *subscr = subs;
        !           407: 
        !           408:                memset(&subs[i], 0, sizeof subs[i]);
        !           409:                subs[i].sub_topic._size = ntohs(var->var_sb.val);
        !           410:                subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
        !           411:                if (!subs[i].sub_topic._base) {
        !           412:                        LOGERR;
        !           413:                        mqtt_subFree(subscr);
        !           414:                        return NULL;
        !           415:                } else
        !           416:                        memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
        !           417:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           418:        }
        !           419: 
        !           420:        return hdr;
        !           421: }
        !           422: 
        !           423: /*
        !           424:  * mqtt_readUNSUBACK() Read UNSUBACK message
        !           425:  *
        !           426:  * @buf = Message buffer
        !           427:  * return: -1 error or MessageID
        !           428:  */
        !           429: u_short
        !           430: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
        !           431: {
        !           432:        int len, ret;
        !           433:        struct mqtthdr *hdr;
        !           434:        mqtt_v_t *v;
        !           435:        caddr_t pos;
        !           436: 
        !           437:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
        !           438:        if (!hdr)
        !           439:                return (u_short) -1;
        !           440:        if (len < sizeof(mqtt_v_t)) {
        !           441:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           442:                return (u_short) -1;
        !           443:        } else {
        !           444:                pos = buf->msg_base + ret + 1;
        !           445:                v = (mqtt_v_t*) pos;
        !           446:        }
        !           447: 
        !           448:        return ntohs(v->val);
        !           449: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>