Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1.2.4

1.1       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.1.1.1.2.4! misho       6: * $Id: sub.c,v 1.1.1.1.2.3 2012/04/26 12:33:14 misho Exp $
1.1       misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.1.1.1.2.1  misho      15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /*
                     50:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                     51:  *
                     52:  * @buf = Message buffer
                     53:  * @Topics = MQTT subscription topics
                     54:  * @msgID = MessageID
                     55:  * @Dup = Duplicate message
                     56:  * @QOS = QoS
                     57:  * return: -1 error or >-1 message size for send
                     58:  */
                     59: int
                     60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     61:                u_short msgID, u_char Dup, u_char QOS)
                     62: {
                     63:        int siz = 0;
                     64:        struct mqtthdr *hdr;
                     65:        mqtthdr_var_t *topic;
                     66:        mqtt_len_t *mid;
                     67:        mqtt_subscr_t *t;
                     68:        u_char *qos;
                     69: 
                     70:        if (!buf || !Topics)
                     71:                return -1;
                     72:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho      73:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho      74:                return -1;
                     75:        }
                     76:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho      77:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho      78:                return -1;
                     79:        }
                     80: 
                     81:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     82:                return -1;
                     83:        else {
                     84:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     85:                siz += sizeof(struct mqtthdr);
                     86:        }
                     87: 
                     88:        /* variable header */
                     89:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                     90:        mid->val = htons(msgID);
                     91:        siz += sizeof(mqtt_len_t);
                     92: 
                     93:        /* payload with subscriptions */
                     94:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                     95:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     96:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                     97:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                     98:                siz += MQTTHDR_VAR_SIZEOF(topic);
                     99:                qos = (buf->msg_base + siz);
                    100:                *qos = t->sub_ret;
                    101:                siz++;
                    102:        }
                    103: 
                    104:        /* fixed header */
                    105:        MQTTHDR_MSGINIT(hdr);
                    106:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
                    107:        hdr->mqtt_msg.qos = QOS;
                    108:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    109:        hdr->mqtt_msg.retain = 0;
                    110:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    111: 
                    112:        return siz;
                    113: }
                    114: 
                    115: /*
                    116:  * mqtt_msgSUBACK() Create SUBACK message
                    117:  *
                    118:  * @buf = Message buffer
                    119:  * @Topics = MQTT subscription topics
                    120:  * @msgID = MessageID
                    121:  * return: -1 error or >-1 message size for send
                    122:  */
                    123: int
                    124: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                    125: {
                    126:        int siz = 0;
                    127:        struct mqtthdr *hdr;
                    128:        mqtt_len_t *v;
                    129:        mqtt_subscr_t *t;
                    130:        u_char *qos;
                    131: 
                    132:        if (!buf || !Topics)
                    133:                return -1;
                    134: 
                    135:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    136:                return -1;
                    137:        else {
                    138:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    139:                siz += sizeof(struct mqtthdr);
                    140:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    141:                siz += sizeof(mqtt_len_t);
                    142:        }
                    143: 
                    144:        /* MessageID */
                    145:        v->val = htons(msgID);
                    146: 
                    147:        /* QoS payload from subscriptions */
                    148:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    149:                qos = (buf->msg_base + siz);
                    150:                *qos = t->sub_ret;
                    151:                siz++;
                    152:        }
                    153: 
                    154:        /* fixed header */
                    155:        MQTTHDR_MSGINIT(hdr);
                    156:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    157:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    158: 
                    159:        return siz;
                    160: }
                    161: 
                    162: /*
                    163:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    164:  *
                    165:  * @buf = Message buffer
                    166:  * @Topics = MQTT subscription topics
                    167:  * @msgID = MessageID
                    168:  * @Dup = Duplicate message
                    169:  * @QOS = QoS
                    170:  * return: -1 error or >-1 message size for send
                    171:  */
                    172: int
                    173: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    174:                u_short msgID, u_char Dup, u_char QOS)
                    175: {
                    176:        int siz = 0;
                    177:        struct mqtthdr *hdr;
                    178:        mqtthdr_var_t *topic;
                    179:        mqtt_len_t *mid;
                    180:        mqtt_subscr_t *t;
                    181: 
                    182:        if (!buf || !Topics)
                    183:                return -1;
                    184:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho     185:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho     186:                return -1;
                    187:        }
                    188:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho     189:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho     190:                return -1;
                    191:        }
                    192: 
                    193:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    194:                return -1;
                    195:        else {
                    196:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    197:                siz += sizeof(struct mqtthdr);
                    198:        }
                    199: 
                    200:        /* variable header */
                    201:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                    202:        mid->val = htons(msgID);
                    203:        siz += sizeof(mqtt_len_t);
                    204: 
                    205:        /* payload with subscriptions */
                    206:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    207:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                    208:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                    209:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                    210:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    211:        }
                    212: 
                    213:        /* fixed header */
                    214:        MQTTHDR_MSGINIT(hdr);
                    215:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    216:        hdr->mqtt_msg.qos = QOS;
                    217:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    218:        hdr->mqtt_msg.retain = 0;
                    219:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    220: 
                    221:        return siz;
                    222: }
                    223: 
                    224: /*
                    225:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    226:  *
                    227:  * @buf = Message buffer
                    228:  * @msgID = MessageID
                    229:  * return: -1 error or >-1 message size for send
                    230:  */
                    231: int
                    232: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    233: {
                    234:        int siz = 0;
                    235:        struct mqtthdr *hdr;
                    236:        mqtt_len_t *v;
                    237: 
                    238:        if (!buf)
                    239:                return -1;
                    240: 
                    241:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
                    242:                return -1;
                    243:        else {
                    244:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    245:                siz += sizeof(struct mqtthdr);
                    246:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    247:                siz += sizeof(mqtt_len_t);
                    248:        }
                    249: 
                    250:        /* fixed header */
                    251:        MQTTHDR_MSGINIT(hdr);
                    252:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    253:        *hdr->mqtt_len = sizeof(mqtt_len_t);
                    254: 
                    255:        /* MessageID */
                    256:        v->val = htons(msgID);
                    257: 
                    258:        return siz;
                    259: }
                    260: 
                    261: 
                    262: /* ============= decode ============ */
                    263: 
                    264: /*
                    265:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
                    266:  *
                    267:  * @buf = Message buffer
                    268:  * @msgID = MessageID
                    269:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2  misho     270:  * return: -1 error or >-1 elements into subscr
1.1       misho     271:  */
1.1.1.1.2.2  misho     272: int
1.1       misho     273: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    274: {
                    275:        register int i;
                    276:        int len, ret;
                    277:        struct mqtthdr *hdr;
                    278:        mqtthdr_var_t *var;
                    279:        mqtt_subscr_t *subs;
                    280:        mqtt_len_t *v;
                    281:        caddr_t pos;
                    282: 
                    283:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2  misho     284:                return -1;
1.1       misho     285: 
                    286:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
                    287:        if (!hdr)
1.1.1.1.2.2  misho     288:                return -1;
1.1       misho     289:        pos = buf->msg_base + ret + 1;
                    290:        v = (mqtt_len_t*) pos;
                    291: 
                    292:        /* MessageID */
                    293:        len -= sizeof(mqtt_len_t);
                    294:        if (len < 0) {
1.1.1.1.2.1  misho     295:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     296:                return -1;
1.1       misho     297:        } else {
                    298:                *msgID = ntohs(v->val);
                    299:                pos += sizeof(mqtt_len_t);
                    300:        }
                    301: 
                    302:        subs = mqtt_subAlloc(0);
                    303:        if (!subs)
1.1.1.1.2.2  misho     304:                return -1;
1.1       misho     305:        else
                    306:                *subscr = subs;
                    307: 
                    308:        /* Subscribes */
                    309:        for (i = 0; len > 0; i++) {
                    310:                var = (mqtthdr_var_t*) pos;
                    311:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                    312:                if (len < 0) {
                    313:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     314:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     315:                        return -1;
1.1       misho     316:                }
1.1.1.1.2.4! misho     317:                if (!mqtt_subRealloc(&subs, i + 1)) {
1.1       misho     318:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     319:                        return -1;
1.1       misho     320:                } else
                    321:                        *subscr = subs;
                    322: 
                    323:                memset(&subs[i], 0, sizeof subs[i]);
                    324:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    325:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    326:                if (!subs[i].sub_topic.msg_base) {
                    327:                        LOGERR;
                    328:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     329:                        return -1;
1.1       misho     330:                } else
                    331:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    332:                pos += MQTTHDR_VAR_SIZEOF(var);
                    333: 
                    334:                subs[i].sub_ret = *pos;
                    335:                pos++;
                    336:        }
                    337: 
1.1.1.1.2.2  misho     338:        return i;
1.1       misho     339: }
                    340: 
                    341: /*
                    342:  * mqtt_readSUBACK() Read SUBACK message
                    343:  *
                    344:  * @buf = Message buffer
                    345:  * @msgID = MessageID
                    346:  * @subqos = Subscribes QoS, must be free after use with free()
                    347:  * return: -1 error or >-1 readed subscribes QoS elements
                    348:  */
                    349: int
                    350: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
                    351: {
                    352:        int len, ret;
                    353:        struct mqtthdr *hdr;
                    354:        mqtt_len_t *v;
                    355:        caddr_t pos;
                    356: 
                    357:        if (!buf || !msgID || !subqos)
                    358:                return -1;
                    359: 
                    360:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
                    361:        if (!hdr)
                    362:                return -1;
                    363:        pos = buf->msg_base + ret + 1;
                    364:        v = (mqtt_len_t*) pos;
                    365: 
                    366:        /* MessageID */
                    367:        len -= sizeof(mqtt_len_t);
                    368:        if (len < 0) {
1.1.1.1.2.1  misho     369:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     370:                return -1;
                    371:        } else {
                    372:                *msgID = ntohs(v->val);
                    373:                pos += sizeof(mqtt_len_t);
                    374:        }
                    375: 
                    376:        /* Subscribes */
                    377:        *subqos = malloc(len);
                    378:        if (!*subqos) {
                    379:                LOGERR;
                    380:                return -1;
                    381:        } else
                    382:                memcpy(*subqos, pos, len);
                    383: 
                    384:        return len;
                    385: }
                    386: 
                    387: /*
                    388:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
                    389:  *
                    390:  * @buf = Message buffer
                    391:  * @msgID = MessageID
                    392:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2  misho     393:  * return: -1 error or >-1 elements into subscr
1.1       misho     394:  */
1.1.1.1.2.2  misho     395: int
1.1       misho     396: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    397: {
                    398:        register int i;
                    399:        int len, ret;
                    400:        struct mqtthdr *hdr;
                    401:        mqtthdr_var_t *var;
                    402:        mqtt_subscr_t *subs;
                    403:        mqtt_len_t *v;
                    404:        caddr_t pos;
                    405: 
                    406:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2  misho     407:                return -1;
1.1       misho     408: 
                    409:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
                    410:        if (!hdr)
1.1.1.1.2.2  misho     411:                return -1;
1.1       misho     412:        pos = buf->msg_base + ret + 1;
                    413:        v = (mqtt_len_t*) pos;
                    414: 
                    415:        /* MessageID */
                    416:        len -= sizeof(mqtt_len_t);
                    417:        if (len < 0) {
1.1.1.1.2.1  misho     418:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     419:                return -1;
1.1       misho     420:        } else {
                    421:                *msgID = ntohs(v->val);
                    422:                pos += sizeof(mqtt_len_t);
                    423:        }
                    424: 
                    425:        subs = mqtt_subAlloc(0);
                    426:        if (!subs)
1.1.1.1.2.2  misho     427:                return -1;
1.1       misho     428:        else
                    429:                *subscr = subs;
                    430: 
                    431:        /* Subscribes */
                    432:        for (i = 0; len > 0; i++) {
                    433:                var = (mqtthdr_var_t*) pos;
                    434:                len -= MQTTHDR_VAR_SIZEOF(var);
                    435:                if (len < 0) {
                    436:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     437:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     438:                        return -1;
1.1       misho     439:                }
1.1.1.1.2.4! misho     440:                if (!mqtt_subRealloc(&subs, i + 1)) {
1.1       misho     441:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     442:                        return -1;
1.1       misho     443:                } else
                    444:                        *subscr = subs;
                    445: 
                    446:                memset(&subs[i], 0, sizeof subs[i]);
                    447:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    448:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    449:                if (!subs[i].sub_topic.msg_base) {
                    450:                        LOGERR;
                    451:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     452:                        return -1;
1.1       misho     453:                } else
                    454:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    455:                pos += MQTTHDR_VAR_SIZEOF(var);
                    456:        }
                    457: 
1.1.1.1.2.2  misho     458:        return i;
1.1       misho     459: }
                    460: 
                    461: /*
                    462:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    463:  *
                    464:  * @buf = Message buffer
                    465:  * return: -1 error or MessageID
                    466:  */
                    467: u_short
                    468: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    469: {
                    470:        int len, ret;
                    471:        struct mqtthdr *hdr;
                    472:        mqtt_len_t *v;
                    473:        caddr_t pos;
                    474: 
                    475:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    476:        if (!hdr)
                    477:                return (u_short) -1;
                    478:        if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1  misho     479:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     480:                return (u_short) -1;
                    481:        } else {
                    482:                pos = buf->msg_base + ret + 1;
                    483:                v = (mqtt_len_t*) pos;
                    484:        }
                    485: 
                    486:        return ntohs(v->val);
                    487: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>