Annotation of libaitmqtt/src/sub.c, revision 1.1

1.1     ! misho       1: /*************************************************************************
        !             2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
        !             3: *  by Michael Pounov <misho@openbsd-bg.org>
        !             4: *
        !             5: * $Author: misho $
        !             6: * $Id: aitsched.c,v 1.5 2012/01/24 21:59:47 misho Exp $
        !             7: *
        !             8: **************************************************************************
        !             9: The ELWIX and AITNET software is distributed under the following
        !            10: terms:
        !            11: 
        !            12: All of the documentation and software included in the ELWIX and AITNET
        !            13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
        !            14: 
        !            15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
        !            16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
        !            17: 
        !            18: Redistribution and use in source and binary forms, with or without
        !            19: modification, are permitted provided that the following conditions
        !            20: are met:
        !            21: 1. Redistributions of source code must retain the above copyright
        !            22:    notice, this list of conditions and the following disclaimer.
        !            23: 2. Redistributions in binary form must reproduce the above copyright
        !            24:    notice, this list of conditions and the following disclaimer in the
        !            25:    documentation and/or other materials provided with the distribution.
        !            26: 3. All advertising materials mentioning features or use of this software
        !            27:    must display the following acknowledgement:
        !            28: This product includes software developed by Michael Pounov <misho@elwix.org>
        !            29: ELWIX - Embedded LightWeight unIX and its contributors.
        !            30: 4. Neither the name of AITNET nor the names of its contributors
        !            31:    may be used to endorse or promote products derived from this software
        !            32:    without specific prior written permission.
        !            33: 
        !            34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
        !            35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
        !            36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
        !            37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
        !            38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
        !            39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
        !            40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
        !            41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
        !            42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
        !            43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
        !            44: SUCH DAMAGE.
        !            45: */
        !            46: #include "global.h"
        !            47: 
        !            48: 
        !            49: /* ------------------------------------------------------------------- */
        !            50: 
        !            51: /*
        !            52:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
        !            53:  *
        !            54:  * @buf = Message buffer
        !            55:  * @Topics = MQTT subscription topics
        !            56:  * @msgID = MessageID
        !            57:  * @Dup = Duplicate message
        !            58:  * @QOS = QoS
        !            59:  * return: -1 error or >-1 message size for send
        !            60:  */
        !            61: int
        !            62: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
        !            63:                u_short msgID, u_char Dup, u_char QOS)
        !            64: {
        !            65:        int siz = 0;
        !            66:        struct mqtthdr *hdr;
        !            67:        mqtthdr_var_t *topic;
        !            68:        mqtt_len_t *mid;
        !            69:        mqtt_subscr_t *t;
        !            70:        u_char *qos;
        !            71: 
        !            72:        if (!buf || !Topics)
        !            73:                return -1;
        !            74:        if (QOS > MQTT_QOS_EXACTLY) {
        !            75:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
        !            76:                return -1;
        !            77:        }
        !            78:        if (!msgID && QOS != MQTT_QOS_ONCE) {
        !            79:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
        !            80:                return -1;
        !            81:        }
        !            82: 
        !            83:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
        !            84:                return -1;
        !            85:        else {
        !            86:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !            87:                siz += sizeof(struct mqtthdr);
        !            88:        }
        !            89: 
        !            90:        /* variable header */
        !            91:        mid = (mqtt_len_t*) (buf->msg_base + siz);
        !            92:        mid->val = htons(msgID);
        !            93:        siz += sizeof(mqtt_len_t);
        !            94: 
        !            95:        /* payload with subscriptions */
        !            96:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
        !            97:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
        !            98:                topic->var_sb.val = htons(t->sub_topic.msg_len);
        !            99:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
        !           100:                siz += MQTTHDR_VAR_SIZEOF(topic);
        !           101:                qos = (buf->msg_base + siz);
        !           102:                *qos = t->sub_ret;
        !           103:                siz++;
        !           104:        }
        !           105: 
        !           106:        /* fixed header */
        !           107:        MQTTHDR_MSGINIT(hdr);
        !           108:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
        !           109:        hdr->mqtt_msg.qos = QOS;
        !           110:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
        !           111:        hdr->mqtt_msg.retain = 0;
        !           112:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
        !           113: 
        !           114:        mqtt_msgRealloc(buf, siz);
        !           115:        return siz;
        !           116: }
        !           117: 
        !           118: /*
        !           119:  * mqtt_msgSUBACK() Create SUBACK message
        !           120:  *
        !           121:  * @buf = Message buffer
        !           122:  * @Topics = MQTT subscription topics
        !           123:  * @msgID = MessageID
        !           124:  * return: -1 error or >-1 message size for send
        !           125:  */
        !           126: int
        !           127: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
        !           128: {
        !           129:        int siz = 0;
        !           130:        struct mqtthdr *hdr;
        !           131:        mqtt_len_t *v;
        !           132:        mqtt_subscr_t *t;
        !           133:        u_char *qos;
        !           134: 
        !           135:        if (!buf || !Topics)
        !           136:                return -1;
        !           137: 
        !           138:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
        !           139:                return -1;
        !           140:        else {
        !           141:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !           142:                siz += sizeof(struct mqtthdr);
        !           143:                v = (mqtt_len_t*) (buf->msg_base + siz);
        !           144:                siz += sizeof(mqtt_len_t);
        !           145:        }
        !           146: 
        !           147:        /* MessageID */
        !           148:        v->val = htons(msgID);
        !           149: 
        !           150:        /* QoS payload from subscriptions */
        !           151:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
        !           152:                qos = (buf->msg_base + siz);
        !           153:                *qos = t->sub_ret;
        !           154:                siz++;
        !           155:        }
        !           156: 
        !           157:        /* fixed header */
        !           158:        MQTTHDR_MSGINIT(hdr);
        !           159:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
        !           160:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
        !           161: 
        !           162:        mqtt_msgRealloc(buf, siz);
        !           163:        return siz;
        !           164: }
        !           165: 
        !           166: /*
        !           167:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
        !           168:  *
        !           169:  * @buf = Message buffer
        !           170:  * @Topics = MQTT subscription topics
        !           171:  * @msgID = MessageID
        !           172:  * @Dup = Duplicate message
        !           173:  * @QOS = QoS
        !           174:  * return: -1 error or >-1 message size for send
        !           175:  */
        !           176: int
        !           177: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
        !           178:                u_short msgID, u_char Dup, u_char QOS)
        !           179: {
        !           180:        int siz = 0;
        !           181:        struct mqtthdr *hdr;
        !           182:        mqtthdr_var_t *topic;
        !           183:        mqtt_len_t *mid;
        !           184:        mqtt_subscr_t *t;
        !           185: 
        !           186:        if (!buf || !Topics)
        !           187:                return -1;
        !           188:        if (QOS > MQTT_QOS_EXACTLY) {
        !           189:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
        !           190:                return -1;
        !           191:        }
        !           192:        if (!msgID && QOS != MQTT_QOS_ONCE) {
        !           193:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
        !           194:                return -1;
        !           195:        }
        !           196: 
        !           197:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
        !           198:                return -1;
        !           199:        else {
        !           200:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !           201:                siz += sizeof(struct mqtthdr);
        !           202:        }
        !           203: 
        !           204:        /* variable header */
        !           205:        mid = (mqtt_len_t*) (buf->msg_base + siz);
        !           206:        mid->val = htons(msgID);
        !           207:        siz += sizeof(mqtt_len_t);
        !           208: 
        !           209:        /* payload with subscriptions */
        !           210:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
        !           211:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
        !           212:                topic->var_sb.val = htons(t->sub_topic.msg_len);
        !           213:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
        !           214:                siz += MQTTHDR_VAR_SIZEOF(topic);
        !           215:        }
        !           216: 
        !           217:        /* fixed header */
        !           218:        MQTTHDR_MSGINIT(hdr);
        !           219:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
        !           220:        hdr->mqtt_msg.qos = QOS;
        !           221:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
        !           222:        hdr->mqtt_msg.retain = 0;
        !           223:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
        !           224: 
        !           225:        mqtt_msgRealloc(buf, siz);
        !           226:        return siz;
        !           227: }
        !           228: 
        !           229: /*
        !           230:  * mqtt_msgUNSUBACK() Create UNSUBACK message
        !           231:  *
        !           232:  * @buf = Message buffer
        !           233:  * @msgID = MessageID
        !           234:  * return: -1 error or >-1 message size for send
        !           235:  */
        !           236: int
        !           237: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
        !           238: {
        !           239:        int siz = 0;
        !           240:        struct mqtthdr *hdr;
        !           241:        mqtt_len_t *v;
        !           242: 
        !           243:        if (!buf)
        !           244:                return -1;
        !           245: 
        !           246:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
        !           247:                return -1;
        !           248:        else {
        !           249:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
        !           250:                siz += sizeof(struct mqtthdr);
        !           251:                v = (mqtt_len_t*) (buf->msg_base + siz);
        !           252:                siz += sizeof(mqtt_len_t);
        !           253:        }
        !           254: 
        !           255:        /* fixed header */
        !           256:        MQTTHDR_MSGINIT(hdr);
        !           257:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
        !           258:        *hdr->mqtt_len = sizeof(mqtt_len_t);
        !           259: 
        !           260:        /* MessageID */
        !           261:        v->val = htons(msgID);
        !           262: 
        !           263:        return siz;
        !           264: }
        !           265: 
        !           266: 
        !           267: /* ============= decode ============ */
        !           268: 
        !           269: /*
        !           270:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
        !           271:  *
        !           272:  * @buf = Message buffer
        !           273:  * @msgID = MessageID
        !           274:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
        !           275:  * return: NULL error or !=NULL MQTT fixed header
        !           276:  */
        !           277: struct mqtthdr *
        !           278: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
        !           279: {
        !           280:        register int i;
        !           281:        int len, ret;
        !           282:        struct mqtthdr *hdr;
        !           283:        mqtthdr_var_t *var;
        !           284:        mqtt_subscr_t *subs;
        !           285:        mqtt_len_t *v;
        !           286:        caddr_t pos;
        !           287: 
        !           288:        if (!buf || !msgID || !subscr)
        !           289:                return NULL;
        !           290: 
        !           291:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
        !           292:        if (!hdr)
        !           293:                return NULL;
        !           294:        pos = buf->msg_base + ret + 1;
        !           295:        v = (mqtt_len_t*) pos;
        !           296: 
        !           297:        /* MessageID */
        !           298:        len -= sizeof(mqtt_len_t);
        !           299:        if (len < 0) {
        !           300:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           301:                return NULL;
        !           302:        } else {
        !           303:                *msgID = ntohs(v->val);
        !           304:                pos += sizeof(mqtt_len_t);
        !           305:        }
        !           306: 
        !           307:        subs = mqtt_subAlloc(0);
        !           308:        if (!subs)
        !           309:                return NULL;
        !           310:        else
        !           311:                *subscr = subs;
        !           312: 
        !           313:        /* Subscribes */
        !           314:        for (i = 0; len > 0; i++) {
        !           315:                var = (mqtthdr_var_t*) pos;
        !           316:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
        !           317:                if (len < 0) {
        !           318:                        mqtt_subFree(subscr);
        !           319:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           320:                        return NULL;
        !           321:                }
        !           322:                subs = mqtt_subRealloc(subs, i + 1);
        !           323:                if (!subs) {
        !           324:                        mqtt_subFree(subscr);
        !           325:                        return NULL;
        !           326:                } else
        !           327:                        *subscr = subs;
        !           328: 
        !           329:                memset(&subs[i], 0, sizeof subs[i]);
        !           330:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
        !           331:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
        !           332:                if (!subs[i].sub_topic.msg_base) {
        !           333:                        LOGERR;
        !           334:                        mqtt_subFree(subscr);
        !           335:                        return NULL;
        !           336:                } else
        !           337:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
        !           338:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           339: 
        !           340:                subs[i].sub_ret = *pos;
        !           341:                pos++;
        !           342:        }
        !           343: 
        !           344:        return hdr;
        !           345: }
        !           346: 
        !           347: /*
        !           348:  * mqtt_readSUBACK() Read SUBACK message
        !           349:  *
        !           350:  * @buf = Message buffer
        !           351:  * @msgID = MessageID
        !           352:  * @subqos = Subscribes QoS, must be free after use with free()
        !           353:  * return: -1 error or >-1 readed subscribes QoS elements
        !           354:  */
        !           355: int
        !           356: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
        !           357: {
        !           358:        int len, ret;
        !           359:        struct mqtthdr *hdr;
        !           360:        mqtt_len_t *v;
        !           361:        caddr_t pos;
        !           362: 
        !           363:        if (!buf || !msgID || !subqos)
        !           364:                return -1;
        !           365: 
        !           366:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
        !           367:        if (!hdr)
        !           368:                return -1;
        !           369:        pos = buf->msg_base + ret + 1;
        !           370:        v = (mqtt_len_t*) pos;
        !           371: 
        !           372:        /* MessageID */
        !           373:        len -= sizeof(mqtt_len_t);
        !           374:        if (len < 0) {
        !           375:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           376:                return -1;
        !           377:        } else {
        !           378:                *msgID = ntohs(v->val);
        !           379:                pos += sizeof(mqtt_len_t);
        !           380:        }
        !           381: 
        !           382:        /* Subscribes */
        !           383:        *subqos = malloc(len);
        !           384:        if (!*subqos) {
        !           385:                LOGERR;
        !           386:                return -1;
        !           387:        } else
        !           388:                memcpy(*subqos, pos, len);
        !           389: 
        !           390:        return len;
        !           391: }
        !           392: 
        !           393: /*
        !           394:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
        !           395:  *
        !           396:  * @buf = Message buffer
        !           397:  * @msgID = MessageID
        !           398:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
        !           399:  * return: NULL error or !=NULL MQTT fixed header
        !           400:  */
        !           401: struct mqtthdr *
        !           402: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
        !           403: {
        !           404:        register int i;
        !           405:        int len, ret;
        !           406:        struct mqtthdr *hdr;
        !           407:        mqtthdr_var_t *var;
        !           408:        mqtt_subscr_t *subs;
        !           409:        mqtt_len_t *v;
        !           410:        caddr_t pos;
        !           411: 
        !           412:        if (!buf || !msgID || !subscr)
        !           413:                return NULL;
        !           414: 
        !           415:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
        !           416:        if (!hdr)
        !           417:                return NULL;
        !           418:        pos = buf->msg_base + ret + 1;
        !           419:        v = (mqtt_len_t*) pos;
        !           420: 
        !           421:        /* MessageID */
        !           422:        len -= sizeof(mqtt_len_t);
        !           423:        if (len < 0) {
        !           424:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           425:                return NULL;
        !           426:        } else {
        !           427:                *msgID = ntohs(v->val);
        !           428:                pos += sizeof(mqtt_len_t);
        !           429:        }
        !           430: 
        !           431:        subs = mqtt_subAlloc(0);
        !           432:        if (!subs)
        !           433:                return NULL;
        !           434:        else
        !           435:                *subscr = subs;
        !           436: 
        !           437:        /* Subscribes */
        !           438:        for (i = 0; len > 0; i++) {
        !           439:                var = (mqtthdr_var_t*) pos;
        !           440:                len -= MQTTHDR_VAR_SIZEOF(var);
        !           441:                if (len < 0) {
        !           442:                        mqtt_subFree(subscr);
        !           443:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           444:                        return NULL;
        !           445:                }
        !           446:                subs = mqtt_subRealloc(subs, i + 1);
        !           447:                if (!subs) {
        !           448:                        mqtt_subFree(subscr);
        !           449:                        return NULL;
        !           450:                } else
        !           451:                        *subscr = subs;
        !           452: 
        !           453:                memset(&subs[i], 0, sizeof subs[i]);
        !           454:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
        !           455:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
        !           456:                if (!subs[i].sub_topic.msg_base) {
        !           457:                        LOGERR;
        !           458:                        mqtt_subFree(subscr);
        !           459:                        return NULL;
        !           460:                } else
        !           461:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
        !           462:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           463:        }
        !           464: 
        !           465:        return hdr;
        !           466: }
        !           467: 
        !           468: /*
        !           469:  * mqtt_readUNSUBACK() Read UNSUBACK message
        !           470:  *
        !           471:  * @buf = Message buffer
        !           472:  * return: -1 error or MessageID
        !           473:  */
        !           474: u_short
        !           475: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
        !           476: {
        !           477:        int len, ret;
        !           478:        struct mqtthdr *hdr;
        !           479:        mqtt_len_t *v;
        !           480:        caddr_t pos;
        !           481: 
        !           482:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
        !           483:        if (!hdr)
        !           484:                return (u_short) -1;
        !           485:        if (len < sizeof(mqtt_len_t)) {
        !           486:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           487:                return (u_short) -1;
        !           488:        } else {
        !           489:                pos = buf->msg_base + ret + 1;
        !           490:                v = (mqtt_len_t*) pos;
        !           491:        }
        !           492: 
        !           493:        return ntohs(v->val);
        !           494: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>