Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1.2.3

1.1       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.1.1.1.2.3! misho       6: * $Id: sub.c,v 1.1.1.1.2.2 2012/04/26 11:49:12 misho Exp $
1.1       misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.1.1.1.2.1  misho      15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /*
                     50:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                     51:  *
                     52:  * @buf = Message buffer
                     53:  * @Topics = MQTT subscription topics
                     54:  * @msgID = MessageID
                     55:  * @Dup = Duplicate message
                     56:  * @QOS = QoS
                     57:  * return: -1 error or >-1 message size for send
                     58:  */
                     59: int
                     60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     61:                u_short msgID, u_char Dup, u_char QOS)
                     62: {
                     63:        int siz = 0;
                     64:        struct mqtthdr *hdr;
                     65:        mqtthdr_var_t *topic;
                     66:        mqtt_len_t *mid;
                     67:        mqtt_subscr_t *t;
                     68:        u_char *qos;
                     69: 
                     70:        if (!buf || !Topics)
                     71:                return -1;
                     72:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho      73:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho      74:                return -1;
                     75:        }
                     76:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho      77:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho      78:                return -1;
                     79:        }
                     80: 
                     81:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     82:                return -1;
                     83:        else {
                     84:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     85:                siz += sizeof(struct mqtthdr);
                     86:        }
                     87: 
                     88:        /* variable header */
                     89:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                     90:        mid->val = htons(msgID);
                     91:        siz += sizeof(mqtt_len_t);
                     92: 
                     93:        /* payload with subscriptions */
                     94:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                     95:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     96:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                     97:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                     98:                siz += MQTTHDR_VAR_SIZEOF(topic);
                     99:                qos = (buf->msg_base + siz);
                    100:                *qos = t->sub_ret;
                    101:                siz++;
                    102:        }
                    103: 
                    104:        /* fixed header */
                    105:        MQTTHDR_MSGINIT(hdr);
                    106:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
                    107:        hdr->mqtt_msg.qos = QOS;
                    108:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    109:        hdr->mqtt_msg.retain = 0;
                    110:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    111: 
                    112:        return siz;
                    113: }
                    114: 
                    115: /*
                    116:  * mqtt_msgSUBACK() Create SUBACK message
                    117:  *
                    118:  * @buf = Message buffer
                    119:  * @Topics = MQTT subscription topics
                    120:  * @msgID = MessageID
                    121:  * return: -1 error or >-1 message size for send
                    122:  */
                    123: int
                    124: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                    125: {
                    126:        int siz = 0;
                    127:        struct mqtthdr *hdr;
                    128:        mqtt_len_t *v;
                    129:        mqtt_subscr_t *t;
                    130:        u_char *qos;
                    131: 
                    132:        if (!buf || !Topics)
                    133:                return -1;
                    134: 
                    135:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    136:                return -1;
                    137:        else {
                    138:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    139:                siz += sizeof(struct mqtthdr);
                    140:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    141:                siz += sizeof(mqtt_len_t);
                    142:        }
                    143: 
                    144:        /* MessageID */
                    145:        v->val = htons(msgID);
                    146: 
                    147:        /* QoS payload from subscriptions */
                    148:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    149:                qos = (buf->msg_base + siz);
                    150:                *qos = t->sub_ret;
                    151:                siz++;
                    152:        }
                    153: 
                    154:        /* fixed header */
                    155:        MQTTHDR_MSGINIT(hdr);
                    156:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    157:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    158: 
                    159:        return siz;
                    160: }
                    161: 
                    162: /*
                    163:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    164:  *
                    165:  * @buf = Message buffer
                    166:  * @Topics = MQTT subscription topics
                    167:  * @msgID = MessageID
                    168:  * @Dup = Duplicate message
                    169:  * @QOS = QoS
                    170:  * return: -1 error or >-1 message size for send
                    171:  */
                    172: int
                    173: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    174:                u_short msgID, u_char Dup, u_char QOS)
                    175: {
                    176:        int siz = 0;
                    177:        struct mqtthdr *hdr;
                    178:        mqtthdr_var_t *topic;
                    179:        mqtt_len_t *mid;
                    180:        mqtt_subscr_t *t;
                    181: 
                    182:        if (!buf || !Topics)
                    183:                return -1;
                    184:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho     185:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho     186:                return -1;
                    187:        }
                    188:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho     189:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho     190:                return -1;
                    191:        }
                    192: 
                    193:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    194:                return -1;
                    195:        else {
                    196:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    197:                siz += sizeof(struct mqtthdr);
                    198:        }
                    199: 
                    200:        /* variable header */
                    201:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                    202:        mid->val = htons(msgID);
                    203:        siz += sizeof(mqtt_len_t);
                    204: 
                    205:        /* payload with subscriptions */
                    206:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    207:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                    208:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                    209:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                    210:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    211:        }
                    212: 
                    213:        /* fixed header */
                    214:        MQTTHDR_MSGINIT(hdr);
                    215:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    216:        hdr->mqtt_msg.qos = QOS;
                    217:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    218:        hdr->mqtt_msg.retain = 0;
                    219:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    220: 
                    221:        return siz;
                    222: }
                    223: 
                    224: /*
                    225:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    226:  *
                    227:  * @buf = Message buffer
                    228:  * @msgID = MessageID
                    229:  * return: -1 error or >-1 message size for send
                    230:  */
                    231: int
                    232: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    233: {
                    234:        int siz = 0;
                    235:        struct mqtthdr *hdr;
                    236:        mqtt_len_t *v;
                    237: 
                    238:        if (!buf)
                    239:                return -1;
                    240: 
                    241:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
                    242:                return -1;
                    243:        else {
                    244:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    245:                siz += sizeof(struct mqtthdr);
                    246:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    247:                siz += sizeof(mqtt_len_t);
                    248:        }
                    249: 
                    250:        /* fixed header */
                    251:        MQTTHDR_MSGINIT(hdr);
                    252:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    253:        *hdr->mqtt_len = sizeof(mqtt_len_t);
                    254: 
                    255:        /* MessageID */
                    256:        v->val = htons(msgID);
                    257: 
                    258:        return siz;
                    259: }
                    260: 
                    261: 
                    262: /* ============= decode ============ */
                    263: 
                    264: /*
                    265:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
                    266:  *
                    267:  * @buf = Message buffer
                    268:  * @msgID = MessageID
                    269:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2  misho     270:  * return: -1 error or >-1 elements into subscr
1.1       misho     271:  */
1.1.1.1.2.2  misho     272: int
1.1       misho     273: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    274: {
                    275:        register int i;
                    276:        int len, ret;
                    277:        struct mqtthdr *hdr;
                    278:        mqtthdr_var_t *var;
                    279:        mqtt_subscr_t *subs;
                    280:        mqtt_len_t *v;
                    281:        caddr_t pos;
                    282: 
                    283:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2  misho     284:                return -1;
1.1       misho     285: 
                    286:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
                    287:        if (!hdr)
1.1.1.1.2.2  misho     288:                return -1;
1.1       misho     289:        pos = buf->msg_base + ret + 1;
                    290:        v = (mqtt_len_t*) pos;
                    291: 
                    292:        /* MessageID */
                    293:        len -= sizeof(mqtt_len_t);
                    294:        if (len < 0) {
1.1.1.1.2.1  misho     295:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     296:                return -1;
1.1       misho     297:        } else {
                    298:                *msgID = ntohs(v->val);
                    299:                pos += sizeof(mqtt_len_t);
                    300:        }
                    301: 
                    302:        subs = mqtt_subAlloc(0);
                    303:        if (!subs)
1.1.1.1.2.2  misho     304:                return -1;
1.1       misho     305:        else
                    306:                *subscr = subs;
                    307: 
                    308:        /* Subscribes */
                    309:        for (i = 0; len > 0; i++) {
                    310:                var = (mqtthdr_var_t*) pos;
                    311:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                    312:                if (len < 0) {
                    313:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     314:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     315:                        return -1;
1.1       misho     316:                }
                    317:                subs = mqtt_subRealloc(subs, i + 1);
                    318:                if (!subs) {
                    319:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     320:                        return -1;
1.1       misho     321:                } else
                    322:                        *subscr = subs;
                    323: 
                    324:                memset(&subs[i], 0, sizeof subs[i]);
                    325:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    326:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    327:                if (!subs[i].sub_topic.msg_base) {
                    328:                        LOGERR;
                    329:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     330:                        return -1;
1.1       misho     331:                } else
                    332:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    333:                pos += MQTTHDR_VAR_SIZEOF(var);
                    334: 
                    335:                subs[i].sub_ret = *pos;
                    336:                pos++;
                    337:        }
                    338: 
1.1.1.1.2.2  misho     339:        return i;
1.1       misho     340: }
                    341: 
                    342: /*
                    343:  * mqtt_readSUBACK() Read SUBACK message
                    344:  *
                    345:  * @buf = Message buffer
                    346:  * @msgID = MessageID
                    347:  * @subqos = Subscribes QoS, must be free after use with free()
                    348:  * return: -1 error or >-1 readed subscribes QoS elements
                    349:  */
                    350: int
                    351: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
                    352: {
                    353:        int len, ret;
                    354:        struct mqtthdr *hdr;
                    355:        mqtt_len_t *v;
                    356:        caddr_t pos;
                    357: 
                    358:        if (!buf || !msgID || !subqos)
                    359:                return -1;
                    360: 
                    361:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
                    362:        if (!hdr)
                    363:                return -1;
                    364:        pos = buf->msg_base + ret + 1;
                    365:        v = (mqtt_len_t*) pos;
                    366: 
                    367:        /* MessageID */
                    368:        len -= sizeof(mqtt_len_t);
                    369:        if (len < 0) {
1.1.1.1.2.1  misho     370:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     371:                return -1;
                    372:        } else {
                    373:                *msgID = ntohs(v->val);
                    374:                pos += sizeof(mqtt_len_t);
                    375:        }
                    376: 
                    377:        /* Subscribes */
                    378:        *subqos = malloc(len);
                    379:        if (!*subqos) {
                    380:                LOGERR;
                    381:                return -1;
                    382:        } else
                    383:                memcpy(*subqos, pos, len);
                    384: 
                    385:        return len;
                    386: }
                    387: 
                    388: /*
                    389:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
                    390:  *
                    391:  * @buf = Message buffer
                    392:  * @msgID = MessageID
                    393:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2  misho     394:  * return: -1 error or >-1 elements into subscr
1.1       misho     395:  */
1.1.1.1.2.2  misho     396: int
1.1       misho     397: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    398: {
                    399:        register int i;
                    400:        int len, ret;
                    401:        struct mqtthdr *hdr;
                    402:        mqtthdr_var_t *var;
                    403:        mqtt_subscr_t *subs;
                    404:        mqtt_len_t *v;
                    405:        caddr_t pos;
                    406: 
                    407:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2  misho     408:                return -1;
1.1       misho     409: 
                    410:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
                    411:        if (!hdr)
1.1.1.1.2.2  misho     412:                return -1;
1.1       misho     413:        pos = buf->msg_base + ret + 1;
                    414:        v = (mqtt_len_t*) pos;
                    415: 
                    416:        /* MessageID */
                    417:        len -= sizeof(mqtt_len_t);
                    418:        if (len < 0) {
1.1.1.1.2.1  misho     419:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     420:                return -1;
1.1       misho     421:        } else {
                    422:                *msgID = ntohs(v->val);
                    423:                pos += sizeof(mqtt_len_t);
                    424:        }
                    425: 
                    426:        subs = mqtt_subAlloc(0);
                    427:        if (!subs)
1.1.1.1.2.2  misho     428:                return -1;
1.1       misho     429:        else
                    430:                *subscr = subs;
                    431: 
                    432:        /* Subscribes */
                    433:        for (i = 0; len > 0; i++) {
                    434:                var = (mqtthdr_var_t*) pos;
                    435:                len -= MQTTHDR_VAR_SIZEOF(var);
                    436:                if (len < 0) {
                    437:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     438:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2  misho     439:                        return -1;
1.1       misho     440:                }
                    441:                subs = mqtt_subRealloc(subs, i + 1);
                    442:                if (!subs) {
                    443:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     444:                        return -1;
1.1       misho     445:                } else
                    446:                        *subscr = subs;
                    447: 
                    448:                memset(&subs[i], 0, sizeof subs[i]);
                    449:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    450:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    451:                if (!subs[i].sub_topic.msg_base) {
                    452:                        LOGERR;
                    453:                        mqtt_subFree(subscr);
1.1.1.1.2.2  misho     454:                        return -1;
1.1       misho     455:                } else
                    456:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    457:                pos += MQTTHDR_VAR_SIZEOF(var);
                    458:        }
                    459: 
1.1.1.1.2.2  misho     460:        return i;
1.1       misho     461: }
                    462: 
                    463: /*
                    464:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    465:  *
                    466:  * @buf = Message buffer
                    467:  * return: -1 error or MessageID
                    468:  */
                    469: u_short
                    470: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    471: {
                    472:        int len, ret;
                    473:        struct mqtthdr *hdr;
                    474:        mqtt_len_t *v;
                    475:        caddr_t pos;
                    476: 
                    477:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    478:        if (!hdr)
                    479:                return (u_short) -1;
                    480:        if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1  misho     481:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     482:                return (u_short) -1;
                    483:        } else {
                    484:                pos = buf->msg_base + ret + 1;
                    485:                v = (mqtt_len_t*) pos;
                    486:        }
                    487: 
                    488:        return ntohs(v->val);
                    489: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>