Annotation of libaitmqtt/src/pub.c, revision 1.1.1.1

1.1       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
                      6: * $Id: aitsched.c,v 1.5 2012/01/24 21:59:47 misho Exp $
                      7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
                     15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
                     16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /* ------------------------------------------------------------------- */
                     50: 
                     51: /*
                     52:  * mqtt_msgPUBLISH() Create PUBLISH message
                     53:  *
                     54:  * @buf = Message buffer
                     55:  * @csTopic = Publish topic
                     56:  * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
                     57:  * @Dup = Duplicate message
                     58:  * @QOS = QoS
                     59:  * @Retain = Retain message
                     60:  * @pData = Publish data into topic
                     61:  * @datlen = Publish data length
                     62:  * return: -1 error or >-1 message size for send
                     63:  */
                     64: int
                     65: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID, 
                     66:                u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
                     67: {
                     68:        int siz = 0;
                     69:        struct mqtthdr *hdr;
                     70:        mqtthdr_var_t *topic;
                     71:        mqtt_len_t *mid;
                     72:        void *data;
                     73: 
                     74:        if (!buf || !csTopic)
                     75:                return -1;
                     76:        if (QOS > MQTT_QOS_EXACTLY) {
                     77:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                     78:                return -1;
                     79:        }
                     80:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                     81:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                     82:                return -1;
                     83:        }
                     84: 
                     85:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     86:                return -1;
                     87:        else {
                     88:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     89:                siz += sizeof(struct mqtthdr);
                     90:        }
                     91: 
                     92:        /* variable header */
                     93:        topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     94:        topic->var_sb.val = htons(strlen(csTopic));
                     95:        memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
                     96:        siz += MQTTHDR_VAR_SIZEOF(topic);
                     97: 
                     98:        mid = (mqtt_len_t*) (buf->msg_base + siz);
                     99:        mid->val = htons(msgID);
                    100:        siz += sizeof(mqtt_len_t);
                    101: 
                    102:        /* load with data */
                    103:        if (pData && datlen) {
                    104:                data = buf->msg_base + siz;
                    105:                memcpy(data, pData, datlen);
                    106:                siz += datlen;
                    107:        }
                    108: 
                    109:        /* fixed header */
                    110:        MQTTHDR_MSGINIT(hdr);
                    111:        hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
                    112:        hdr->mqtt_msg.qos = QOS;
                    113:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
                    114:        hdr->mqtt_msg.retain = Retain ? 1 : 0;
                    115:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                    116: 
                    117:        mqtt_msgRealloc(buf, siz);
                    118:        return siz;
                    119: }
                    120: 
                    121: static int
                    122: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
                    123: {
                    124:        int siz = 0;
                    125:        struct mqtthdr *hdr;
                    126:        mqtt_len_t *v;
                    127: 
                    128:        if (!buf)
                    129:                return -1;
                    130: 
                    131:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
                    132:                return -1;
                    133:        else {
                    134:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                    135:                siz += sizeof(struct mqtthdr);
                    136:                v = (mqtt_len_t*) (buf->msg_base + siz);
                    137:                siz += sizeof(mqtt_len_t);
                    138:        }
                    139: 
                    140:        /* fixed header */
                    141:        MQTTHDR_MSGINIT(hdr);
                    142:        hdr->mqtt_msg.type = cmd;
                    143:        *hdr->mqtt_len = sizeof(mqtt_len_t);
                    144: 
                    145:        /* MessageID */
                    146:        v->val = htons(msgID);
                    147: 
                    148:        return siz;
                    149: }
                    150: 
                    151: /*
                    152:  * mqtt_msgPUBACK() Create PUBACK message
                    153:  *
                    154:  * @buf = Message buffer
                    155:  * @msgID = MessageID
                    156:  * return: -1 error or >-1 message size for send
                    157:  */
                    158: inline int
                    159: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    160: {
                    161:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
                    162: }
                    163: 
                    164: /*
                    165:  * mqtt_msgPUBREC() Create PUBREC message
                    166:  *
                    167:  * @buf = Message buffer
                    168:  * @msgID = MessageID
                    169:  * return: -1 error or >-1 message size for send
                    170:  */
                    171: inline int
                    172: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
                    173: {
                    174:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
                    175: }
                    176: 
                    177: /*
                    178:  * mqtt_msgPUBREL() Create PUBREL message
                    179:  *
                    180:  * @buf = Message buffer
                    181:  * @msgID = MessageID
                    182:  * return: -1 error or >-1 message size for send
                    183:  */
                    184: inline int
                    185: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
                    186: {
                    187:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
                    188: }
                    189: 
                    190: /*
                    191:  * mqtt_msgPUBCOMP() Create PUBCOMP message
                    192:  *
                    193:  * @buf = Message buffer
                    194:  * @msgID = MessageID
                    195:  * return: -1 error or >-1 message size for send
                    196:  */
                    197: inline int
                    198: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
                    199: {
                    200:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
                    201: }
                    202: 
                    203: 
                    204: /* ============= decode ============ */
                    205: 
                    206: /*
                    207:  * mqtt_readPUBLISH() Read PUBLISH message
                    208:  *
                    209:  * @buf = Message buffer
                    210:  * @psTopic = Topic
                    211:  * @topicLen = Topic length
                    212:  * @msgID = MessageID
                    213:  * @pData = Data buffer
                    214:  * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
                    215:  * return: NULL error or !=NULL MQTT fixed header
                    216:  */
                    217: struct mqtthdr *
                    218: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen, 
                    219:                u_short *msgID, void * __restrict pData, int *datLen)
                    220: {
                    221:        int len, ret;
                    222:        struct mqtthdr *hdr;
                    223:        mqtthdr_var_t *var;
                    224:        mqtt_len_t *v;
                    225:        caddr_t pos;
                    226: 
                    227:        if (!buf || !psTopic || !msgID || !pData)
                    228:                return NULL;
                    229: 
                    230:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
                    231:        if (!hdr)
                    232:                return NULL;
                    233:        pos = buf->msg_base + ret + 1;
                    234:        var = (mqtthdr_var_t*) pos;
                    235: 
                    236:        /* topic */
                    237:        len -= MQTTHDR_VAR_SIZEOF(var);
                    238:        if (len < 0) {
                    239:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    240:                return NULL;
                    241:        } else {
                    242:                memset(psTopic, 0, topicLen--);
                    243:                memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ? 
                    244:                                topicLen : ntohs(var->var_sb.val));
                    245:                pos += MQTTHDR_VAR_SIZEOF(var);
                    246:                v = (mqtt_len_t*) pos;
                    247:        }
                    248: 
                    249:        len -= sizeof(mqtt_len_t);
                    250:        if (len < 0) {
                    251:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    252:                return NULL;
                    253:        } else {
                    254:                *msgID = ntohs(v->val);
                    255:                pos += sizeof(mqtt_len_t);
                    256:        }
                    257: 
                    258:        /* data */
                    259:        if (len < 0) {
                    260:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    261:                return NULL;
                    262:        } else {
                    263:                if (!*datLen) {
                    264:                        if (!(pData = malloc(len))) {
                    265:                                LOGERR;
                    266:                                return NULL;
                    267:                        } else
                    268:                                *datLen = len;
                    269:                }
                    270: 
                    271:                memset(pData, 0, *datLen);
                    272:                if (len < *datLen)
                    273:                        *datLen = len;
                    274:                memcpy(pData, pos, *datLen);
                    275:        }
                    276: 
                    277:        return hdr;
                    278: }
                    279: 
                    280: /*
                    281:  * mqtt_readPUBACK() Read PUBACK message
                    282:  *
                    283:  * @buf = Message buffer
                    284:  * return: -1 error or MessageID
                    285:  */
                    286: u_short
                    287: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
                    288: {
                    289:        int len, ret;
                    290:        struct mqtthdr *hdr;
                    291:        mqtt_len_t *v;
                    292:        caddr_t pos;
                    293: 
                    294:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
                    295:        if (!hdr)
                    296:                return (u_short) -1;
                    297:        if (len < sizeof(mqtt_len_t)) {
                    298:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    299:                return (u_short) -1;
                    300:        } else {
                    301:                pos = buf->msg_base + ret + 1;
                    302:                v = (mqtt_len_t*) pos;
                    303:        }
                    304: 
                    305:        return ntohs(v->val);
                    306: }
                    307: 
                    308: /*
                    309:  * mqtt_readPUBREC() Read PUBREC message
                    310:  *
                    311:  * @buf = Message buffer
                    312:  * return: -1 error or MessageID
                    313:  */
                    314: u_short
                    315: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
                    316: {
                    317:        int len, ret;
                    318:        struct mqtthdr *hdr;
                    319:        mqtt_len_t *v;
                    320:        caddr_t pos;
                    321: 
                    322:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
                    323:        if (!hdr)
                    324:                return (u_short) -1;
                    325:        if (len < sizeof(mqtt_len_t)) {
                    326:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    327:                return (u_short) -1;
                    328:        } else {
                    329:                pos = buf->msg_base + ret + 1;
                    330:                v = (mqtt_len_t*) pos;
                    331:        }
                    332: 
                    333:        return ntohs(v->val);
                    334: }
                    335: 
                    336: /*
                    337:  * mqtt_readPUBREL() Read PUBREL message
                    338:  *
                    339:  * @buf = Message buffer
                    340:  * return: -1 error or MessageID
                    341:  */
                    342: u_short
                    343: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
                    344: {
                    345:        int len, ret;
                    346:        struct mqtthdr *hdr;
                    347:        mqtt_len_t *v;
                    348:        caddr_t pos;
                    349: 
                    350:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
                    351:        if (!hdr)
                    352:                return (u_short) -1;
                    353:        if (len < sizeof(mqtt_len_t)) {
                    354:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    355:                return (u_short) -1;
                    356:        } else {
                    357:                pos = buf->msg_base + ret + 1;
                    358:                v = (mqtt_len_t*) pos;
                    359:        }
                    360: 
                    361:        return ntohs(v->val);
                    362: }
                    363: 
                    364: /*
                    365:  * mqtt_readPUBCOMP() Read PUBCOMP message
                    366:  *
                    367:  * @buf = Message buffer
                    368:  * return: -1 error or MessageID
                    369:  */
                    370: u_short
                    371: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
                    372: {
                    373:        int len, ret;
                    374:        struct mqtthdr *hdr;
                    375:        mqtt_len_t *v;
                    376:        caddr_t pos;
                    377: 
                    378:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
                    379:        if (!hdr)
                    380:                return (u_short) -1;
                    381:        if (len < sizeof(mqtt_len_t)) {
                    382:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    383:                return (u_short) -1;
                    384:        } else {
                    385:                pos = buf->msg_base + ret + 1;
                    386:                v = (mqtt_len_t*) pos;
                    387:        }
                    388: 
                    389:        return ntohs(v->val);
                    390: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>