Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1.2.5

1.1       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.1.1.1.2.5! misho       6: * $Id: sub.c,v 1.1.1.1.2.4 2012/04/27 08:12:30 misho Exp $
1.1       misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.1.1.1.2.1  misho      15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /*
                     50:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                     51:  *
                     52:  * @buf = Message buffer
                     53:  * @Topics = MQTT subscription topics
                     54:  * @msgID = MessageID
                     55:  * @Dup = Duplicate message
                     56:  * @QOS = QoS
                     57:  * return: -1 error or >-1 message size for send
                     58:  */
                     59: int
                     60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     61:                u_short msgID, u_char Dup, u_char QOS)
                     62: {
                     63:        int siz = 0;
                     64:        struct mqtthdr *hdr;
                     65:        mqtthdr_var_t *topic;
                     66:        mqtt_len_t *mid;
                     67:        mqtt_subscr_t *t;
                     68:        u_char *qos;
                     69: 
                     70:        if (!buf || !Topics)
                     71:                return -1;
                     72:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho      73:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho      74:                return -1;
                     75:        }
                     76:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho      77:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho      78:                return -1;
                     79:        }
                     80: 
                     81:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     82:                return -1;
                     83:        else {
                     84:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     85:                siz += sizeof(struct mqtthdr);
                     86:        }
                     87: 
                     88:        /* variable header */
                     89:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                     90:        mid->val = htons(msgID);
                     91:        siz += sizeof(mqtt_len_t);
                     92: 
                     93:        /* payload with subscriptions */
                     94:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                     95:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     96:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                     97:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                     98:                siz += MQTTHDR_VAR_SIZEOF(topic);
                     99:                qos = (buf->msg_base + siz);
                    100:                *qos = t->sub_ret;
                    101:                siz++;
                    102:        }
                    103: 
                    104:        /* fixed header */
                    105:        MQTTHDR_MSGINIT(hdr);
                    106:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
                    107:        hdr->mqtt_msg.qos = QOS;
                    108:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    109:        hdr->mqtt_msg.retain = 0;
                    110:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    111: 
                    112:        return siz;
                    113: }
                    114: 
                    115: /*
                    116:  * mqtt_msgSUBACK() Create SUBACK message
                    117:  *
                    118:  * @buf = Message buffer
                    119:  * @Topics = MQTT subscription topics
                    120:  * @msgID = MessageID
                    121:  * return: -1 error or >-1 message size for send
                    122:  */
                    123: int
                    124: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                    125: {
                    126:        int siz = 0;
                    127:        struct mqtthdr *hdr;
                    128:        mqtt_len_t *v;
                    129:        mqtt_subscr_t *t;
                    130:        u_char *qos;
                    131: 
                    132:        if (!buf || !Topics)
                    133:                return -1;
                    134: 
                    135:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    136:                return -1;
                    137:        else {
                    138:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    139:                siz += sizeof(struct mqtthdr);
                    140:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    141:                siz += sizeof(mqtt_len_t);
                    142:        }
                    143: 
                    144:        /* MessageID */
                    145:        v->val = htons(msgID);
                    146: 
                    147:        /* QoS payload from subscriptions */
                    148:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    149:                qos = (buf->msg_base + siz);
                    150:                *qos = t->sub_ret;
                    151:                siz++;
                    152:        }
                    153: 
                    154:        /* fixed header */
                    155:        MQTTHDR_MSGINIT(hdr);
                    156:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    157:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    158: 
                    159:        return siz;
                    160: }
                    161: 
                    162: /*
                    163:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    164:  *
                    165:  * @buf = Message buffer
                    166:  * @Topics = MQTT subscription topics
                    167:  * @msgID = MessageID
                    168:  * @Dup = Duplicate message
                    169:  * @QOS = QoS
                    170:  * return: -1 error or >-1 message size for send
                    171:  */
                    172: int
                    173: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    174:                u_short msgID, u_char Dup, u_char QOS)
                    175: {
                    176:        int siz = 0;
                    177:        struct mqtthdr *hdr;
                    178:        mqtthdr_var_t *topic;
                    179:        mqtt_len_t *mid;
                    180:        mqtt_subscr_t *t;
                    181: 
                    182:        if (!buf || !Topics)
                    183:                return -1;
                    184:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho     185:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho     186:                return -1;
                    187:        }
                    188:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho     189:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho     190:                return -1;
                    191:        }
                    192: 
                    193:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    194:                return -1;
                    195:        else {
                    196:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    197:                siz += sizeof(struct mqtthdr);
                    198:        }
                    199: 
                    200:        /* variable header */
                    201:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                    202:        mid->val = htons(msgID);
                    203:        siz += sizeof(mqtt_len_t);
                    204: 
                    205:        /* payload with subscriptions */
                    206:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    207:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                    208:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                    209:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                    210:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    211:        }
                    212: 
                    213:        /* fixed header */
                    214:        MQTTHDR_MSGINIT(hdr);
                    215:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    216:        hdr->mqtt_msg.qos = QOS;
                    217:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    218:        hdr->mqtt_msg.retain = 0;
                    219:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    220: 
                    221:        return siz;
                    222: }
                    223: 
                    224: /*
                    225:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    226:  *
                    227:  * @buf = Message buffer
                    228:  * @msgID = MessageID
                    229:  * return: -1 error or >-1 message size for send
                    230:  */
                    231: int
                    232: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    233: {
                    234:        int siz = 0;
                    235:        struct mqtthdr *hdr;
                    236:        mqtt_len_t *v;
                    237: 
                    238:        if (!buf)
                    239:                return -1;
                    240: 
                    241:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
                    242:                return -1;
                    243:        else {
                    244:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    245:                siz += sizeof(struct mqtthdr);
                    246:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    247:                siz += sizeof(mqtt_len_t);
                    248:        }
                    249: 
                    250:        /* fixed header */
                    251:        MQTTHDR_MSGINIT(hdr);
                    252:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    253:        *hdr->mqtt_len = sizeof(mqtt_len_t);
                    254: 
                    255:        /* MessageID */
                    256:        v->val = htons(msgID);
                    257: 
                    258:        return siz;
                    259: }
                    260: 
                    261: 
                    262: /* ============= decode ============ */
                    263: 
                    264: /*
                    265:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
                    266:  *
                    267:  * @buf = Message buffer
                    268:  * @msgID = MessageID
                    269:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2  misho     270:  * return: -1 error or >-1 elements into subscr
1.1       misho     271:  */
1.1.1.1.2.2  misho     272: int
1.1       misho     273: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    274: {
                    275:        register int i;
                    276:        int len, ret;
                    277:        struct mqtthdr *hdr;
                    278:        mqtthdr_var_t *var;
                    279:        mqtt_subscr_t *subs;
                    280:        mqtt_len_t *v;
                    281:        caddr_t pos;
                    282: 
                    283:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2  misho     284:                return -1;
1.1       misho     285: 
                    286:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
                    287:        if (!hdr)
1.1.1.1.2.2  misho     288:                return -1;
1.1       misho     289:        pos = buf->msg_base + ret + 1;
                    290:        v = (mqtt_len_t*) pos;
                    291: 
                    292:        /* MessageID */
                    293:        len -= sizeof(mqtt_len_t);
                    294:        if (len < 0) {
1.1.1.1.2.1  misho     295:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     296:                return -1;
1.1       misho     297:        } else {
                    298:                *msgID = ntohs(v->val);
                    299:                pos += sizeof(mqtt_len_t);
                    300:        }
                    301: 
                    302:        subs = mqtt_subAlloc(0);
                    303:        if (!subs)
1.1.1.1.2.2  misho     304:                return -1;
1.1       misho     305:        else
                    306:                *subscr = subs;
                    307: 
                    308:        /* Subscribes */
                    309:        for (i = 0; len > 0; i++) {
                    310:                var = (mqtthdr_var_t*) pos;
                    311:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                    312:                if (len < 0) {
                    313:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     314:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     315:                        return -1;
1.1       misho     316:                }
1.1.1.1.2.4  misho     317:                if (!mqtt_subRealloc(&subs, i + 1)) {
1.1       misho     318:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     319:                        return -1;
1.1       misho     320:                } else
                    321:                        *subscr = subs;
                    322: 
                    323:                memset(&subs[i], 0, sizeof subs[i]);
                    324:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.1.1.1.2.5! misho     325:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1       misho     326:                if (!subs[i].sub_topic.msg_base) {
                    327:                        LOGERR;
                    328:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     329:                        return -1;
1.1.1.1.2.5! misho     330:                } else {
1.1       misho     331:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.1.1.1.2.5! misho     332:                        ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
        !           333:                }
1.1       misho     334:                pos += MQTTHDR_VAR_SIZEOF(var);
                    335: 
                    336:                subs[i].sub_ret = *pos;
                    337:                pos++;
                    338:        }
                    339: 
1.1.1.1.2.2  misho     340:        return i;
1.1       misho     341: }
                    342: 
                    343: /*
                    344:  * mqtt_readSUBACK() Read SUBACK message
                    345:  *
                    346:  * @buf = Message buffer
                    347:  * @msgID = MessageID
                    348:  * @subqos = Subscribes QoS, must be free after use with free()
                    349:  * return: -1 error or >-1 readed subscribes QoS elements
                    350:  */
                    351: int
                    352: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
                    353: {
                    354:        int len, ret;
                    355:        struct mqtthdr *hdr;
                    356:        mqtt_len_t *v;
                    357:        caddr_t pos;
                    358: 
                    359:        if (!buf || !msgID || !subqos)
                    360:                return -1;
                    361: 
                    362:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
                    363:        if (!hdr)
                    364:                return -1;
                    365:        pos = buf->msg_base + ret + 1;
                    366:        v = (mqtt_len_t*) pos;
                    367: 
                    368:        /* MessageID */
                    369:        len -= sizeof(mqtt_len_t);
                    370:        if (len < 0) {
1.1.1.1.2.1  misho     371:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     372:                return -1;
                    373:        } else {
                    374:                *msgID = ntohs(v->val);
                    375:                pos += sizeof(mqtt_len_t);
                    376:        }
                    377: 
                    378:        /* Subscribes */
                    379:        *subqos = malloc(len);
                    380:        if (!*subqos) {
                    381:                LOGERR;
                    382:                return -1;
                    383:        } else
                    384:                memcpy(*subqos, pos, len);
                    385: 
                    386:        return len;
                    387: }
                    388: 
                    389: /*
                    390:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
                    391:  *
                    392:  * @buf = Message buffer
                    393:  * @msgID = MessageID
                    394:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2  misho     395:  * return: -1 error or >-1 elements into subscr
1.1       misho     396:  */
1.1.1.1.2.2  misho     397: int
1.1       misho     398: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    399: {
                    400:        register int i;
                    401:        int len, ret;
                    402:        struct mqtthdr *hdr;
                    403:        mqtthdr_var_t *var;
                    404:        mqtt_subscr_t *subs;
                    405:        mqtt_len_t *v;
                    406:        caddr_t pos;
                    407: 
                    408:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2  misho     409:                return -1;
1.1       misho     410: 
                    411:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
                    412:        if (!hdr)
1.1.1.1.2.2  misho     413:                return -1;
1.1       misho     414:        pos = buf->msg_base + ret + 1;
                    415:        v = (mqtt_len_t*) pos;
                    416: 
                    417:        /* MessageID */
                    418:        len -= sizeof(mqtt_len_t);
                    419:        if (len < 0) {
1.1.1.1.2.1  misho     420:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     421:                return -1;
1.1       misho     422:        } else {
                    423:                *msgID = ntohs(v->val);
                    424:                pos += sizeof(mqtt_len_t);
                    425:        }
                    426: 
                    427:        subs = mqtt_subAlloc(0);
                    428:        if (!subs)
1.1.1.1.2.2  misho     429:                return -1;
1.1       misho     430:        else
                    431:                *subscr = subs;
                    432: 
                    433:        /* Subscribes */
                    434:        for (i = 0; len > 0; i++) {
                    435:                var = (mqtthdr_var_t*) pos;
                    436:                len -= MQTTHDR_VAR_SIZEOF(var);
                    437:                if (len < 0) {
                    438:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     439:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     440:                        return -1;
1.1       misho     441:                }
1.1.1.1.2.4  misho     442:                if (!mqtt_subRealloc(&subs, i + 1)) {
1.1       misho     443:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     444:                        return -1;
1.1       misho     445:                } else
                    446:                        *subscr = subs;
                    447: 
                    448:                memset(&subs[i], 0, sizeof subs[i]);
                    449:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
1.1.1.1.2.5! misho     450:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
1.1       misho     451:                if (!subs[i].sub_topic.msg_base) {
                    452:                        LOGERR;
                    453:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     454:                        return -1;
1.1.1.1.2.5! misho     455:                } else {
1.1       misho     456:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
1.1.1.1.2.5! misho     457:                        ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
        !           458:                }
1.1       misho     459:                pos += MQTTHDR_VAR_SIZEOF(var);
                    460:        }
                    461: 
1.1.1.1.2.2  misho     462:        return i;
1.1       misho     463: }
                    464: 
                    465: /*
                    466:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    467:  *
                    468:  * @buf = Message buffer
                    469:  * return: -1 error or MessageID
                    470:  */
                    471: u_short
                    472: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    473: {
                    474:        int len, ret;
                    475:        struct mqtthdr *hdr;
                    476:        mqtt_len_t *v;
                    477:        caddr_t pos;
                    478: 
                    479:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    480:        if (!hdr)
                    481:                return (u_short) -1;
                    482:        if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1  misho     483:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     484:                return (u_short) -1;
                    485:        } else {
                    486:                pos = buf->msg_base + ret + 1;
                    487:                v = (mqtt_len_t*) pos;
                    488:        }
                    489: 
                    490:        return ntohs(v->val);
                    491: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>