Annotation of libaitmqtt/src/sub.c, revision 1.1.1.1.2.2

1.1       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.1.1.1.2.2! misho       6: * $Id: sub.c,v 1.1.1.1.2.1 2012/04/07 20:54:57 misho Exp $
1.1       misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.1.1.1.2.1  misho      15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /*
                     50:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
                     51:  *
                     52:  * @buf = Message buffer
                     53:  * @Topics = MQTT subscription topics
                     54:  * @msgID = MessageID
                     55:  * @Dup = Duplicate message
                     56:  * @QOS = QoS
                     57:  * return: -1 error or >-1 message size for send
                     58:  */
                     59: int
                     60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                     61:                u_short msgID, u_char Dup, u_char QOS)
                     62: {
                     63:        int siz = 0;
                     64:        struct mqtthdr *hdr;
                     65:        mqtthdr_var_t *topic;
                     66:        mqtt_len_t *mid;
                     67:        mqtt_subscr_t *t;
                     68:        u_char *qos;
                     69: 
                     70:        if (!buf || !Topics)
                     71:                return -1;
                     72:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho      73:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho      74:                return -1;
                     75:        }
                     76:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho      77:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho      78:                return -1;
                     79:        }
                     80: 
                     81:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     82:                return -1;
                     83:        else {
                     84:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     85:                siz += sizeof(struct mqtthdr);
                     86:        }
                     87: 
                     88:        /* variable header */
                     89:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                     90:        mid->val = htons(msgID);
                     91:        siz += sizeof(mqtt_len_t);
                     92: 
                     93:        /* payload with subscriptions */
                     94:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                     95:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     96:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                     97:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                     98:                siz += MQTTHDR_VAR_SIZEOF(topic);
                     99:                qos = (buf->msg_base + siz);
                    100:                *qos = t->sub_ret;
                    101:                siz++;
                    102:        }
                    103: 
                    104:        /* fixed header */
                    105:        MQTTHDR_MSGINIT(hdr);
                    106:        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
                    107:        hdr->mqtt_msg.qos = QOS;
                    108:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    109:        hdr->mqtt_msg.retain = 0;
                    110:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    111: 
                    112:        mqtt_msgRealloc(buf, siz);
                    113:        return siz;
                    114: }
                    115: 
                    116: /*
                    117:  * mqtt_msgSUBACK() Create SUBACK message
                    118:  *
                    119:  * @buf = Message buffer
                    120:  * @Topics = MQTT subscription topics
                    121:  * @msgID = MessageID
                    122:  * return: -1 error or >-1 message size for send
                    123:  */
                    124: int
                    125: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
                    126: {
                    127:        int siz = 0;
                    128:        struct mqtthdr *hdr;
                    129:        mqtt_len_t *v;
                    130:        mqtt_subscr_t *t;
                    131:        u_char *qos;
                    132: 
                    133:        if (!buf || !Topics)
                    134:                return -1;
                    135: 
                    136:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    137:                return -1;
                    138:        else {
                    139:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    140:                siz += sizeof(struct mqtthdr);
                    141:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    142:                siz += sizeof(mqtt_len_t);
                    143:        }
                    144: 
                    145:        /* MessageID */
                    146:        v->val = htons(msgID);
                    147: 
                    148:        /* QoS payload from subscriptions */
                    149:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    150:                qos = (buf->msg_base + siz);
                    151:                *qos = t->sub_ret;
                    152:                siz++;
                    153:        }
                    154: 
                    155:        /* fixed header */
                    156:        MQTTHDR_MSGINIT(hdr);
                    157:        hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
                    158:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    159: 
                    160:        mqtt_msgRealloc(buf, siz);
                    161:        return siz;
                    162: }
                    163: 
                    164: /*
                    165:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
                    166:  *
                    167:  * @buf = Message buffer
                    168:  * @Topics = MQTT subscription topics
                    169:  * @msgID = MessageID
                    170:  * @Dup = Duplicate message
                    171:  * @QOS = QoS
                    172:  * return: -1 error or >-1 message size for send
                    173:  */
                    174: int
                    175: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                    176:                u_short msgID, u_char Dup, u_char QOS)
                    177: {
                    178:        int siz = 0;
                    179:        struct mqtthdr *hdr;
                    180:        mqtthdr_var_t *topic;
                    181:        mqtt_len_t *mid;
                    182:        mqtt_subscr_t *t;
                    183: 
                    184:        if (!buf || !Topics)
                    185:                return -1;
                    186:        if (QOS > MQTT_QOS_EXACTLY) {
1.1.1.1.2.1  misho     187:                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
1.1       misho     188:                return -1;
                    189:        }
                    190:        if (!msgID && QOS != MQTT_QOS_ONCE) {
1.1.1.1.2.1  misho     191:                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
1.1       misho     192:                return -1;
                    193:        }
                    194: 
                    195:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                    196:                return -1;
                    197:        else {
                    198:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    199:                siz += sizeof(struct mqtthdr);
                    200:        }
                    201: 
                    202:        /* variable header */
                    203:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                    204:        mid->val = htons(msgID);
                    205:        siz += sizeof(mqtt_len_t);
                    206: 
                    207:        /* payload with subscriptions */
                    208:        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                    209:                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                    210:                topic->var_sb.val = htons(t->sub_topic.msg_len);
                    211:                memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                    212:                siz += MQTTHDR_VAR_SIZEOF(topic);
                    213:        }
                    214: 
                    215:        /* fixed header */
                    216:        MQTTHDR_MSGINIT(hdr);
                    217:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
                    218:        hdr->mqtt_msg.qos = QOS;
                    219:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    220:        hdr->mqtt_msg.retain = 0;
                    221:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    222: 
                    223:        mqtt_msgRealloc(buf, siz);
                    224:        return siz;
                    225: }
                    226: 
                    227: /*
                    228:  * mqtt_msgUNSUBACK() Create UNSUBACK message
                    229:  *
                    230:  * @buf = Message buffer
                    231:  * @msgID = MessageID
                    232:  * return: -1 error or >-1 message size for send
                    233:  */
                    234: int
                    235: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    236: {
                    237:        int siz = 0;
                    238:        struct mqtthdr *hdr;
                    239:        mqtt_len_t *v;
                    240: 
                    241:        if (!buf)
                    242:                return -1;
                    243: 
                    244:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
                    245:                return -1;
                    246:        else {
                    247:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    248:                siz += sizeof(struct mqtthdr);
                    249:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    250:                siz += sizeof(mqtt_len_t);
                    251:        }
                    252: 
                    253:        /* fixed header */
                    254:        MQTTHDR_MSGINIT(hdr);
                    255:        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
                    256:        *hdr->mqtt_len = sizeof(mqtt_len_t);
                    257: 
                    258:        /* MessageID */
                    259:        v->val = htons(msgID);
                    260: 
                    261:        return siz;
                    262: }
                    263: 
                    264: 
                    265: /* ============= decode ============ */
                    266: 
                    267: /*
                    268:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
                    269:  *
                    270:  * @buf = Message buffer
                    271:  * @msgID = MessageID
                    272:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2! misho     273:  * return: -1 error or >-1 elements into subscr
1.1       misho     274:  */
1.1.1.1.2.2! misho     275: int
1.1       misho     276: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    277: {
                    278:        register int i;
                    279:        int len, ret;
                    280:        struct mqtthdr *hdr;
                    281:        mqtthdr_var_t *var;
                    282:        mqtt_subscr_t *subs;
                    283:        mqtt_len_t *v;
                    284:        caddr_t pos;
                    285: 
                    286:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2! misho     287:                return -1;
1.1       misho     288: 
                    289:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
                    290:        if (!hdr)
1.1.1.1.2.2! misho     291:                return -1;
1.1       misho     292:        pos = buf->msg_base + ret + 1;
                    293:        v = (mqtt_len_t*) pos;
                    294: 
                    295:        /* MessageID */
                    296:        len -= sizeof(mqtt_len_t);
                    297:        if (len < 0) {
1.1.1.1.2.1  misho     298:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2! misho     299:                return -1;
1.1       misho     300:        } else {
                    301:                *msgID = ntohs(v->val);
                    302:                pos += sizeof(mqtt_len_t);
                    303:        }
                    304: 
                    305:        subs = mqtt_subAlloc(0);
                    306:        if (!subs)
1.1.1.1.2.2! misho     307:                return -1;
1.1       misho     308:        else
                    309:                *subscr = subs;
                    310: 
                    311:        /* Subscribes */
                    312:        for (i = 0; len > 0; i++) {
                    313:                var = (mqtthdr_var_t*) pos;
                    314:                len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                    315:                if (len < 0) {
                    316:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     317:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2! misho     318:                        return -1;
1.1       misho     319:                }
                    320:                subs = mqtt_subRealloc(subs, i + 1);
                    321:                if (!subs) {
                    322:                        mqtt_subFree(subscr);
1.1.1.1.2.2! misho     323:                        return -1;
1.1       misho     324:                } else
                    325:                        *subscr = subs;
                    326: 
                    327:                memset(&subs[i], 0, sizeof subs[i]);
                    328:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    329:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    330:                if (!subs[i].sub_topic.msg_base) {
                    331:                        LOGERR;
                    332:                        mqtt_subFree(subscr);
1.1.1.1.2.2! misho     333:                        return -1;
1.1       misho     334:                } else
                    335:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    336:                pos += MQTTHDR_VAR_SIZEOF(var);
                    337: 
                    338:                subs[i].sub_ret = *pos;
                    339:                pos++;
                    340:        }
                    341: 
1.1.1.1.2.2! misho     342:        return i;
1.1       misho     343: }
                    344: 
                    345: /*
                    346:  * mqtt_readSUBACK() Read SUBACK message
                    347:  *
                    348:  * @buf = Message buffer
                    349:  * @msgID = MessageID
                    350:  * @subqos = Subscribes QoS, must be free after use with free()
                    351:  * return: -1 error or >-1 readed subscribes QoS elements
                    352:  */
                    353: int
                    354: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
                    355: {
                    356:        int len, ret;
                    357:        struct mqtthdr *hdr;
                    358:        mqtt_len_t *v;
                    359:        caddr_t pos;
                    360: 
                    361:        if (!buf || !msgID || !subqos)
                    362:                return -1;
                    363: 
                    364:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
                    365:        if (!hdr)
                    366:                return -1;
                    367:        pos = buf->msg_base + ret + 1;
                    368:        v = (mqtt_len_t*) pos;
                    369: 
                    370:        /* MessageID */
                    371:        len -= sizeof(mqtt_len_t);
                    372:        if (len < 0) {
1.1.1.1.2.1  misho     373:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     374:                return -1;
                    375:        } else {
                    376:                *msgID = ntohs(v->val);
                    377:                pos += sizeof(mqtt_len_t);
                    378:        }
                    379: 
                    380:        /* Subscribes */
                    381:        *subqos = malloc(len);
                    382:        if (!*subqos) {
                    383:                LOGERR;
                    384:                return -1;
                    385:        } else
                    386:                memcpy(*subqos, pos, len);
                    387: 
                    388:        return len;
                    389: }
                    390: 
                    391: /*
                    392:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
                    393:  *
                    394:  * @buf = Message buffer
                    395:  * @msgID = MessageID
                    396:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
1.1.1.1.2.2! misho     397:  * return: -1 error or >-1 elements into subscr
1.1       misho     398:  */
1.1.1.1.2.2! misho     399: int
1.1       misho     400: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
                    401: {
                    402:        register int i;
                    403:        int len, ret;
                    404:        struct mqtthdr *hdr;
                    405:        mqtthdr_var_t *var;
                    406:        mqtt_subscr_t *subs;
                    407:        mqtt_len_t *v;
                    408:        caddr_t pos;
                    409: 
                    410:        if (!buf || !msgID || !subscr)
1.1.1.1.2.2! misho     411:                return -1;
1.1       misho     412: 
                    413:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
                    414:        if (!hdr)
1.1.1.1.2.2! misho     415:                return -1;
1.1       misho     416:        pos = buf->msg_base + ret + 1;
                    417:        v = (mqtt_len_t*) pos;
                    418: 
                    419:        /* MessageID */
                    420:        len -= sizeof(mqtt_len_t);
                    421:        if (len < 0) {
1.1.1.1.2.1  misho     422:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2! misho     423:                return -1;
1.1       misho     424:        } else {
                    425:                *msgID = ntohs(v->val);
                    426:                pos += sizeof(mqtt_len_t);
                    427:        }
                    428: 
                    429:        subs = mqtt_subAlloc(0);
                    430:        if (!subs)
1.1.1.1.2.2! misho     431:                return -1;
1.1       misho     432:        else
                    433:                *subscr = subs;
                    434: 
                    435:        /* Subscribes */
                    436:        for (i = 0; len > 0; i++) {
                    437:                var = (mqtthdr_var_t*) pos;
                    438:                len -= MQTTHDR_VAR_SIZEOF(var);
                    439:                if (len < 0) {
                    440:                        mqtt_subFree(subscr);
1.1.1.1.2.1  misho     441:                        mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1.1.1.2.2! misho     442:                        return -1;
1.1       misho     443:                }
                    444:                subs = mqtt_subRealloc(subs, i + 1);
                    445:                if (!subs) {
                    446:                        mqtt_subFree(subscr);
1.1.1.1.2.2! misho     447:                        return -1;
1.1       misho     448:                } else
                    449:                        *subscr = subs;
                    450: 
                    451:                memset(&subs[i], 0, sizeof subs[i]);
                    452:                subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                    453:                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
                    454:                if (!subs[i].sub_topic.msg_base) {
                    455:                        LOGERR;
                    456:                        mqtt_subFree(subscr);
1.1.1.1.2.2! misho     457:                        return -1;
1.1       misho     458:                } else
                    459:                        memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                    460:                pos += MQTTHDR_VAR_SIZEOF(var);
                    461:        }
                    462: 
1.1.1.1.2.2! misho     463:        return i;
1.1       misho     464: }
                    465: 
                    466: /*
                    467:  * mqtt_readUNSUBACK() Read UNSUBACK message
                    468:  *
                    469:  * @buf = Message buffer
                    470:  * return: -1 error or MessageID
                    471:  */
                    472: u_short
                    473: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
                    474: {
                    475:        int len, ret;
                    476:        struct mqtthdr *hdr;
                    477:        mqtt_len_t *v;
                    478:        caddr_t pos;
                    479: 
                    480:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
                    481:        if (!hdr)
                    482:                return (u_short) -1;
                    483:        if (len < sizeof(mqtt_len_t)) {
1.1.1.1.2.1  misho     484:                mqtt_SetErr(EINVAL, "Short message length %d", len);
1.1       misho     485:                return (u_short) -1;
                    486:        } else {
                    487:                pos = buf->msg_base + ret + 1;
                    488:                v = (mqtt_len_t*) pos;
                    489:        }
                    490: 
                    491:        return ntohs(v->val);
                    492: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>