Annotation of libaitmqtt/src/cliside.c, revision 1.2

1.2     ! misho       1: /*************************************************************************
        !             2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
        !             3: *  by Michael Pounov <misho@openbsd-bg.org>
        !             4: *
        !             5: * $Author: misho $
        !             6: * $Id: cliside.c,v 1.1.2.8 2012/06/19 15:55:01 misho Exp $
        !             7: *
        !             8: **************************************************************************
        !             9: The ELWIX and AITNET software is distributed under the following
        !            10: terms:
        !            11: 
        !            12: All of the documentation and software included in the ELWIX and AITNET
        !            13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
        !            14: 
        !            15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
        !            16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
        !            17: 
        !            18: Redistribution and use in source and binary forms, with or without
        !            19: modification, are permitted provided that the following conditions
        !            20: are met:
        !            21: 1. Redistributions of source code must retain the above copyright
        !            22:    notice, this list of conditions and the following disclaimer.
        !            23: 2. Redistributions in binary form must reproduce the above copyright
        !            24:    notice, this list of conditions and the following disclaimer in the
        !            25:    documentation and/or other materials provided with the distribution.
        !            26: 3. All advertising materials mentioning features or use of this software
        !            27:    must display the following acknowledgement:
        !            28: This product includes software developed by Michael Pounov <misho@elwix.org>
        !            29: ELWIX - Embedded LightWeight unIX and its contributors.
        !            30: 4. Neither the name of AITNET nor the names of its contributors
        !            31:    may be used to endorse or promote products derived from this software
        !            32:    without specific prior written permission.
        !            33: 
        !            34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
        !            35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
        !            36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
        !            37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
        !            38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
        !            39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
        !            40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
        !            41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
        !            42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
        !            43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
        !            44: SUCH DAMAGE.
        !            45: */
        !            46: #include "global.h"
        !            47: 
        !            48: 
        !            49: /*
        !            50:  * mqtt_cli_Open() - Open client connection to MQTT broker
        !            51:  *
        !            52:  * @addr = brokers address
        !            53:  * @timeout = timeout
        !            54:  * return: NULL error or !=NULL connected to broker
        !            55:  */
        !            56: mqtt_cli_t *
        !            57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
        !            58: {
        !            59:        mqtt_cli_t *cli;
        !            60: 
        !            61:        if (!addr)
        !            62:                return NULL;
        !            63: 
        !            64:        cli = malloc(sizeof(mqtt_cli_t));
        !            65:        if (!cli) {
        !            66:                LOGERR;
        !            67:                return NULL;
        !            68:        } else
        !            69:                memset(cli, 0, sizeof(mqtt_cli_t));
        !            70: 
        !            71:        cli->timeout = timeout;
        !            72:        cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
        !            73:        if (cli->sock == -1) {
        !            74:                LOGERR;
        !            75:                free(cli);
        !            76:                return NULL;
        !            77:        }
        !            78:        if (connect(cli->sock, addr, addr->sa_len) == -1) {
        !            79:                LOGERR;
        !            80:                close(cli->sock);
        !            81:                free(cli);
        !            82:                return NULL;
        !            83:        }
        !            84: 
        !            85:        cli->buf = mqtt_msgAlloc(USHRT_MAX);
        !            86:        if (!cli->buf) {
        !            87:                close(cli->sock);
        !            88:                free(cli);
        !            89:                return NULL;
        !            90:        }
        !            91: 
        !            92:        return cli;
        !            93: }
        !            94: 
        !            95: /*
        !            96:  * mqtt_cli_Close() - Close client connection
        !            97:  *
        !            98:  * @cli = connected client
        !            99:  * return: -1 error or 0 disconnected client and freed all resources
        !           100:  */
        !           101: int
        !           102: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
        !           103: {
        !           104:        int siz = 0;
        !           105: 
        !           106:        if (!cli || !*cli)
        !           107:                return -1;
        !           108: 
        !           109:        /* send disconnect */
        !           110:        siz = mqtt_msgDISCONNECT((*cli)->buf);
        !           111:        if (siz > -1) {
        !           112:                siz = send((*cli)->sock, (*cli)->buf->msg_base, siz, MSG_NOSIGNAL);
        !           113:                if (siz > -1)
        !           114:                        shutdown((*cli)->sock, SHUT_RDWR);
        !           115:        }
        !           116:        close((*cli)->sock);
        !           117: 
        !           118:        mqtt_msgFree(&(*cli)->buf, 42);
        !           119: 
        !           120:        free(*cli);
        !           121:        *cli = NULL;
        !           122:        return 0;
        !           123: }
        !           124: 
        !           125: /*
        !           126:  * mqtt_cli_Subscribe() - Subscribe to broker
        !           127:  *
        !           128:  * @cli = connected client
        !           129:  * @Topics = Topics for subscribes
        !           130:  * @msgID = Message ID
        !           131:  * @Dup = Duplicated request
        !           132:  * @QoS = Message QoS
        !           133:  * return: NULL error or !=NULL allocated array with subscribed QoS responses, 
        !           134:  *     must be free() result!
        !           135:  */
        !           136: u_char *
        !           137: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, 
        !           138:                u_short msgID, u_char Dup, u_char QoS)
        !           139: {
        !           140:        int siz = 0;
        !           141:        u_short mid = 0;
        !           142:        u_char *qoses = NULL;
        !           143: 
        !           144:        if (!cli || !Topics)
        !           145:                return NULL;
        !           146: 
        !           147:        /* send subscribe */
        !           148:        siz = mqtt_msgSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
        !           149:        if (siz == -1)
        !           150:                return NULL;
        !           151:        siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
        !           152:        if (siz == -1) {
        !           153:                LOGERR;
        !           154:                return NULL;
        !           155:        }
        !           156: 
        !           157:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
        !           158:                return NULL;
        !           159:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
        !           160:                return NULL;
        !           161: 
        !           162:        /* receive suback */
        !           163:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
        !           164:        if (siz == -1) {
        !           165:                LOGERR;
        !           166:                return NULL;
        !           167:        }
        !           168:        siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
        !           169:        if (siz == -1)
        !           170:                return NULL;
        !           171:        if (msgID != mid) {
        !           172:                free(qoses);
        !           173:                mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
        !           174:                return NULL;
        !           175:        }
        !           176: 
        !           177:        return qoses;
        !           178: }
        !           179: 
        !           180: /*
        !           181:  * mqtt_cli_Unsubscribe() - Unsubscribe from broker
        !           182:  *
        !           183:  * @cli = connected client
        !           184:  * @Topics = Topics for unsubscribes
        !           185:  * @msgID = Message ID
        !           186:  * @Dup = Duplicated request
        !           187:  * @QoS = Message QoS
        !           188:  * return: -1 error or 0 ok
        !           189:  */
        !           190: int
        !           191: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, 
        !           192:                u_short msgID, u_char Dup, u_char QoS)
        !           193: {
        !           194:        int siz = 0;
        !           195: 
        !           196:        if (!cli || !Topics)
        !           197:                return -1;
        !           198: 
        !           199:        /* send unsubscribe */
        !           200:        siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
        !           201:        if (siz == -1)
        !           202:                return -1;
        !           203:        siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
        !           204:        if (siz == -1) {
        !           205:                LOGERR;
        !           206:                return -1;
        !           207:        }
        !           208: 
        !           209:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
        !           210:                return -1;
        !           211:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
        !           212:                return -1;
        !           213: 
        !           214:        /* receive unsuback */
        !           215:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
        !           216:        if (siz == -1) {
        !           217:                LOGERR;
        !           218:                return -1;
        !           219:        }
        !           220:        siz = mqtt_readUNSUBACK(cli->buf);
        !           221:        if (siz == -1)
        !           222:                return -1;
        !           223:        if (msgID != siz) {
        !           224:                mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
        !           225:                return -1;
        !           226:        }
        !           227: 
        !           228:        return 0;
        !           229: }
        !           230: 
        !           231: /*
        !           232:  * mqtt_cli_Publish() - Publish message to broker
        !           233:  *
        !           234:  * @cli = connected client
        !           235:  * @msgID = Message ID
        !           236:  * @Dup = Duplicated request
        !           237:  * @QoS = Message QoS
        !           238:  * @Retain = Retain message
        !           239:  * @csTopic = Topic
        !           240:  * @pData = Data
        !           241:  * @datLen = Data length
        !           242:  * return: -1 error or > -1 sended bytes
        !           243:  */
        !           244: int
        !           245: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain, 
        !           246:                const char *csTopic, const void *pData, int datLen)
        !           247: {
        !           248:        int wlen = 0, siz = 0;
        !           249: 
        !           250:        if (!cli || !csTopic)
        !           251:                return -1;
        !           252: 
        !           253:        /* send publish */
        !           254:        siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen);
        !           255:        if (siz == -1)
        !           256:                return -1;
        !           257:        siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
        !           258:        if (siz == -1) {
        !           259:                LOGERR;
        !           260:                return -1;
        !           261:        } else
        !           262:                wlen = siz;
        !           263: 
        !           264:        if (QoS == MQTT_QOS_ONCE)       /* no reply */
        !           265:                goto end;
        !           266: 
        !           267:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
        !           268:                return -1;
        !           269:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
        !           270:                return -1;
        !           271: 
        !           272:        /* receive PUBxxx */
        !           273:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
        !           274:        if (siz == -1) {
        !           275:                LOGERR;
        !           276:                return -1;
        !           277:        }
        !           278: 
        !           279:        if (QoS == MQTT_QOS_ACK) {      /* reply with PUBACK */
        !           280:                siz = mqtt_readPUBACK(cli->buf);
        !           281:                if (siz == -1)
        !           282:                        return -1;
        !           283:                if (msgID != siz) {
        !           284:                        mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
        !           285:                        return -1;
        !           286:                }
        !           287:                goto end;
        !           288:        } else {                        /* reply with PUBREC */
        !           289:                siz = mqtt_readPUBREC(cli->buf);
        !           290:                if (siz == -1)
        !           291:                        return -1;
        !           292:                if (msgID != siz) {
        !           293:                        mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
        !           294:                        return -1;
        !           295:                }
        !           296:        }
        !           297: 
        !           298:        do {
        !           299:                /* send publish release QoS == 2 */
        !           300:                siz = mqtt_msgPUBREL(cli->buf, msgID);
        !           301:                if (siz == -1)
        !           302:                        return -1;
        !           303:                siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
        !           304:                if (siz == -1) {
        !           305:                        LOGERR;
        !           306:                        return -1;
        !           307:                }
        !           308: 
        !           309:                if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
        !           310:                        return -1;
        !           311:                } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
        !           312:                        if (Dup++ > 1)
        !           313:                                return -1;
        !           314:                        else
        !           315:                                continue;
        !           316:                }
        !           317: 
        !           318:                /* receive PUBCOMP */
        !           319:                siz = mqtt_readPUBCOMP(cli->buf);
        !           320:                if (siz == -1)
        !           321:                        return -1;
        !           322:                if (msgID != siz) {
        !           323:                        mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
        !           324:                        if (Dup++ > 1)
        !           325:                                return -1;
        !           326:                        else
        !           327:                                continue;
        !           328:                }
        !           329:        } while (0);
        !           330: 
        !           331: end:
        !           332:        return wlen;
        !           333: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>