Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1

1.1       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
                      6: * $Id: aitsched.c,v 1.5 2012/01/24 21:59:47 misho Exp $
                      7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
                     15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
                     16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /* ------------------------------------------------------------------- */
                     50: 
                     51: /*
                     52:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                     53:  *
                     54:  * @buf = Message buffer
                     55:  * @Topics = MQTT subscription topics
                     56:  * @msgID = MessageID
                     57:  * @Dup = Duplicate message
                     58:  * @QOS = QoS
                     59:  * return: -1 error or >-1 message size for send
                     60:  */
                     61: int
                     62: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     63:                u_short msgID, u_char Dup, u_char QOS)
                     64: {
                     65:        int siz = 0;
                     66:        struct mqtthdr *hdr;
                     67:        mqtthdr_var_t *topic;
                     68:        mqtt_len_t *mid;
                     69:        mqtt_subscr_t *t;
                     70:        u_char *qos;
                     71: 
                     72:        if (!buf || !Topics)
                     73:                return -1;
                     74:        if (QOS > MQTT_QOS_EXACTLY) {
                     75:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                     76:                return -1;
                     77:        }
                     78:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                     79:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                     80:                return -1;
                     81:        }
                     82: 
                     83:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     84:                return -1;
                     85:        else {
                     86:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     87:                siz += sizeof(struct mqtthdr);
                     88:        }
                     89: 
                     90:        /* variable header */
                     91:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                     92:        mid->val = htons(msgID);
                     93:        siz += sizeof(mqtt_len_t);
                     94: 
                     95:        /* payload with subscriptions */
                     96:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                     97:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     98:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                     99:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                    100:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    101:                qos = (buf->msg_base + siz);
                    102:                *qos = t->sub_ret;
                    103:                siz++;
                    104:        }
                    105: 
                    106:        /* fixed header */
                    107:        MQTTHDR_MSGINIT(hdr);
                    108:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
                    109:        hdr->mqtt_msg.qos = QOS;
                    110:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    111:        hdr->mqtt_msg.retain = 0;
                    112:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    113: 
                    114:        mqtt_msgRealloc(buf, siz);
                    115:        return siz;
                    116: }
                    117: 
                    118: /*
                    119:  * mqtt_msgSUBACK() Create SUBACK message
                    120:  *
                    121:  * @buf = Message buffer
                    122:  * @Topics = MQTT subscription topics
                    123:  * @msgID = MessageID
                    124:  * return: -1 error or >-1 message size for send
                    125:  */
                    126: int
                    127: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                    128: {
                    129:        int siz = 0;
                    130:        struct mqtthdr *hdr;
                    131:        mqtt_len_t *v;
                    132:        mqtt_subscr_t *t;
                    133:        u_char *qos;
                    134: 
                    135:        if (!buf || !Topics)
                    136:                return -1;
                    137: 
                    138:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    139:                return -1;
                    140:        else {
                    141:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    142:                siz += sizeof(struct mqtthdr);
                    143:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    144:                siz += sizeof(mqtt_len_t);
                    145:        }
                    146: 
                    147:        /* MessageID */
                    148:        v->val = htons(msgID);
                    149: 
                    150:        /* QoS payload from subscriptions */
                    151:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    152:                qos = (buf->msg_base + siz);
                    153:                *qos = t->sub_ret;
                    154:                siz++;
                    155:        }
                    156: 
                    157:        /* fixed header */
                    158:        MQTTHDR_MSGINIT(hdr);
                    159:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    160:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    161: 
                    162:        mqtt_msgRealloc(buf, siz);
                    163:        return siz;
                    164: }
                    165: 
                    166: /*
                    167:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    168:  *
                    169:  * @buf = Message buffer
                    170:  * @Topics = MQTT subscription topics
                    171:  * @msgID = MessageID
                    172:  * @Dup = Duplicate message
                    173:  * @QOS = QoS
                    174:  * return: -1 error or >-1 message size for send
                    175:  */
                    176: int
                    177: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    178:                u_short msgID, u_char Dup, u_char QOS)
                    179: {
                    180:        int siz = 0;
                    181:        struct mqtthdr *hdr;
                    182:        mqtthdr_var_t *topic;
                    183:        mqtt_len_t *mid;
                    184:        mqtt_subscr_t *t;
                    185: 
                    186:        if (!buf || !Topics)
                    187:                return -1;
                    188:        if (QOS > MQTT_QOS_EXACTLY) {
                    189:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                    190:                return -1;
                    191:        }
                    192:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                    193:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                    194:                return -1;
                    195:        }
                    196: 
                    197:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    198:                return -1;
                    199:        else {
                    200:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    201:                siz += sizeof(struct mqtthdr);
                    202:        }
                    203: 
                    204:        /* variable header */
                    205:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                    206:        mid->val = htons(msgID);
                    207:        siz += sizeof(mqtt_len_t);
                    208: 
                    209:        /* payload with subscriptions */
                    210:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    211:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                    212:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                    213:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                    214:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    215:        }
                    216: 
                    217:        /* fixed header */
                    218:        MQTTHDR_MSGINIT(hdr);
                    219:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    220:        hdr->mqtt_msg.qos = QOS;
                    221:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    222:        hdr->mqtt_msg.retain = 0;
                    223:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    224: 
                    225:        mqtt_msgRealloc(buf, siz);
                    226:        return siz;
                    227: }
                    228: 
                    229: /*
                    230:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    231:  *
                    232:  * @buf = Message buffer
                    233:  * @msgID = MessageID
                    234:  * return: -1 error or >-1 message size for send
                    235:  */
                    236: int
                    237: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    238: {
                    239:        int siz = 0;
                    240:        struct mqtthdr *hdr;
                    241:        mqtt_len_t *v;
                    242: 
                    243:        if (!buf)
                    244:                return -1;
                    245: 
                    246:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
                    247:                return -1;
                    248:        else {
                    249:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    250:                siz += sizeof(struct mqtthdr);
                    251:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    252:                siz += sizeof(mqtt_len_t);
                    253:        }
                    254: 
                    255:        /* fixed header */
                    256:        MQTTHDR_MSGINIT(hdr);
                    257:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    258:        *hdr->mqtt_len = sizeof(mqtt_len_t);
                    259: 
                    260:        /* MessageID */
                    261:        v->val = htons(msgID);
                    262: 
                    263:        return siz;
                    264: }
                    265: 
                    266: 
                    267: /* ============= decode ============ */
                    268: 
                    269: /*
                    270:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
                    271:  *
                    272:  * @buf = Message buffer
                    273:  * @msgID = MessageID
                    274:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
                    275:  * return: NULL error or !=NULL MQTT fixed header
                    276:  */
                    277: struct mqtthdr *
                    278: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    279: {
                    280:        register int i;
                    281:        int len, ret;
                    282:        struct mqtthdr *hdr;
                    283:        mqtthdr_var_t *var;
                    284:        mqtt_subscr_t *subs;
                    285:        mqtt_len_t *v;
                    286:        caddr_t pos;
                    287: 
                    288:        if (!buf || !msgID || !subscr)
                    289:                return NULL;
                    290: 
                    291:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
                    292:        if (!hdr)
                    293:                return NULL;
                    294:        pos = buf->msg_base + ret + 1;
                    295:        v = (mqtt_len_t*) pos;
                    296: 
                    297:        /* MessageID */
                    298:        len -= sizeof(mqtt_len_t);
                    299:        if (len < 0) {
                    300:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    301:                return NULL;
                    302:        } else {
                    303:                *msgID = ntohs(v->val);
                    304:                pos += sizeof(mqtt_len_t);
                    305:        }
                    306: 
                    307:        subs = mqtt_subAlloc(0);
                    308:        if (!subs)
                    309:                return NULL;
                    310:        else
                    311:                *subscr = subs;
                    312: 
                    313:        /* Subscribes */
                    314:        for (i = 0; len > 0; i++) {
                    315:                var = (mqtthdr_var_t*) pos;
                    316:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                    317:                if (len < 0) {
                    318:                        mqtt_subFree(subscr);
                    319:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    320:                        return NULL;
                    321:                }
                    322:                subs = mqtt_subRealloc(subs, i + 1);
                    323:                if (!subs) {
                    324:                        mqtt_subFree(subscr);
                    325:                        return NULL;
                    326:                } else
                    327:                        *subscr = subs;
                    328: 
                    329:                memset(&subs[i], 0, sizeof subs[i]);
                    330:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    331:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    332:                if (!subs[i].sub_topic.msg_base) {
                    333:                        LOGERR;
                    334:                        mqtt_subFree(subscr);
                    335:                        return NULL;
                    336:                } else
                    337:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    338:                pos += MQTTHDR_VAR_SIZEOF(var);
                    339: 
                    340:                subs[i].sub_ret = *pos;
                    341:                pos++;
                    342:        }
                    343: 
                    344:        return hdr;
                    345: }
                    346: 
                    347: /*
                    348:  * mqtt_readSUBACK() Read SUBACK message
                    349:  *
                    350:  * @buf = Message buffer
                    351:  * @msgID = MessageID
                    352:  * @subqos = Subscribes QoS, must be free after use with free()
                    353:  * return: -1 error or >-1 readed subscribes QoS elements
                    354:  */
                    355: int
                    356: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
                    357: {
                    358:        int len, ret;
                    359:        struct mqtthdr *hdr;
                    360:        mqtt_len_t *v;
                    361:        caddr_t pos;
                    362: 
                    363:        if (!buf || !msgID || !subqos)
                    364:                return -1;
                    365: 
                    366:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
                    367:        if (!hdr)
                    368:                return -1;
                    369:        pos = buf->msg_base + ret + 1;
                    370:        v = (mqtt_len_t*) pos;
                    371: 
                    372:        /* MessageID */
                    373:        len -= sizeof(mqtt_len_t);
                    374:        if (len < 0) {
                    375:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    376:                return -1;
                    377:        } else {
                    378:                *msgID = ntohs(v->val);
                    379:                pos += sizeof(mqtt_len_t);
                    380:        }
                    381: 
                    382:        /* Subscribes */
                    383:        *subqos = malloc(len);
                    384:        if (!*subqos) {
                    385:                LOGERR;
                    386:                return -1;
                    387:        } else
                    388:                memcpy(*subqos, pos, len);
                    389: 
                    390:        return len;
                    391: }
                    392: 
                    393: /*
                    394:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
                    395:  *
                    396:  * @buf = Message buffer
                    397:  * @msgID = MessageID
                    398:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
                    399:  * return: NULL error or !=NULL MQTT fixed header
                    400:  */
                    401: struct mqtthdr *
                    402: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    403: {
                    404:        register int i;
                    405:        int len, ret;
                    406:        struct mqtthdr *hdr;
                    407:        mqtthdr_var_t *var;
                    408:        mqtt_subscr_t *subs;
                    409:        mqtt_len_t *v;
                    410:        caddr_t pos;
                    411: 
                    412:        if (!buf || !msgID || !subscr)
                    413:                return NULL;
                    414: 
                    415:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
                    416:        if (!hdr)
                    417:                return NULL;
                    418:        pos = buf->msg_base + ret + 1;
                    419:        v = (mqtt_len_t*) pos;
                    420: 
                    421:        /* MessageID */
                    422:        len -= sizeof(mqtt_len_t);
                    423:        if (len < 0) {
                    424:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    425:                return NULL;
                    426:        } else {
                    427:                *msgID = ntohs(v->val);
                    428:                pos += sizeof(mqtt_len_t);
                    429:        }
                    430: 
                    431:        subs = mqtt_subAlloc(0);
                    432:        if (!subs)
                    433:                return NULL;
                    434:        else
                    435:                *subscr = subs;
                    436: 
                    437:        /* Subscribes */
                    438:        for (i = 0; len > 0; i++) {
                    439:                var = (mqtthdr_var_t*) pos;
                    440:                len -= MQTTHDR_VAR_SIZEOF(var);
                    441:                if (len < 0) {
                    442:                        mqtt_subFree(subscr);
                    443:                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    444:                        return NULL;
                    445:                }
                    446:                subs = mqtt_subRealloc(subs, i + 1);
                    447:                if (!subs) {
                    448:                        mqtt_subFree(subscr);
                    449:                        return NULL;
                    450:                } else
                    451:                        *subscr = subs;
                    452: 
                    453:                memset(&subs[i], 0, sizeof subs[i]);
                    454:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    455:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    456:                if (!subs[i].sub_topic.msg_base) {
                    457:                        LOGERR;
                    458:                        mqtt_subFree(subscr);
                    459:                        return NULL;
                    460:                } else
                    461:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    462:                pos += MQTTHDR_VAR_SIZEOF(var);
                    463:        }
                    464: 
                    465:        return hdr;
                    466: }
                    467: 
                    468: /*
                    469:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    470:  *
                    471:  * @buf = Message buffer
                    472:  * return: -1 error or MessageID
                    473:  */
                    474: u_short
                    475: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    476: {
                    477:        int len, ret;
                    478:        struct mqtthdr *hdr;
                    479:        mqtt_len_t *v;
                    480:        caddr_t pos;
                    481: 
                    482:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    483:        if (!hdr)
                    484:                return (u_short) -1;
                    485:        if (len < sizeof(mqtt_len_t)) {
                    486:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    487:                return (u_short) -1;
                    488:        } else {
                    489:                pos = buf->msg_base + ret + 1;
                    490:                v = (mqtt_len_t*) pos;
                    491:        }
                    492: 
                    493:        return ntohs(v->val);
                    494: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>