Annotation of libaitmqtt/src/cliside.c, revision 1.1.2.6

1.1.2.2   misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.1.2.6 ! misho       6: * $Id: cliside.c,v 1.1.2.5 2012/05/08 11:29:56 misho Exp $
1.1.2.2   misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
                     15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
                     16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
1.1.2.1   misho      46: #include "global.h"
                     47: 
                     48: 
1.1.2.3   misho      49: /*
                     50:  * mqtt_cli_Open() - Open client connection to MQTT broker
                     51:  *
                     52:  * @addr = brokers address
1.1.2.4   misho      53:  * @timeout = timeout
1.1.2.3   misho      54:  * return: NULL error or !=NULL connected to broker
                     55:  */
                     56: mqtt_cli_t *
1.1.2.4   misho      57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
1.1.2.3   misho      58: {
                     59:        mqtt_cli_t *cli;
                     60: 
                     61:        if (!addr)
                     62:                return NULL;
                     63: 
                     64:        cli = malloc(sizeof(mqtt_cli_t));
                     65:        if (!cli) {
                     66:                LOGERR;
                     67:                return NULL;
                     68:        } else
                     69:                memset(cli, 0, sizeof(mqtt_cli_t));
                     70: 
1.1.2.4   misho      71:        cli->timeout = timeout;
1.1.2.3   misho      72:        cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
                     73:        if (cli->sock == -1) {
                     74:                LOGERR;
                     75:                free(cli);
                     76:                return NULL;
                     77:        }
                     78:        if (connect(cli->sock, addr, addr->sa_len) == -1) {
                     79:                LOGERR;
                     80:                close(cli->sock);
                     81:                free(cli);
                     82:                return NULL;
                     83:        }
                     84: 
                     85:        cli->buf = mqtt_msgAlloc(USHRT_MAX);
                     86:        if (!cli->buf) {
                     87:                close(cli->sock);
                     88:                free(cli);
                     89:                return NULL;
                     90:        }
                     91: 
                     92:        return cli;
                     93: }
                     94: 
                     95: /*
                     96:  * mqtt_cli_Close() - Close client connection
                     97:  *
                     98:  * @cli = connected client
                     99:  * return: -1 error or 0 disconnected client and freed all resources
                    100:  */
                    101: int
                    102: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
                    103: {
                    104:        int siz = 0;
                    105: 
                    106:        if (!cli || !*cli)
                    107:                return -1;
                    108: 
                    109:        /* send disconnect */
                    110:        siz = mqtt_msgDISCONNECT((*cli)->buf);
                    111:        if (siz > -1) {
                    112:                siz = send((*cli)->sock, (*cli)->buf->msg_base, siz, MSG_NOSIGNAL);
                    113:                if (siz > -1)
                    114:                        shutdown((*cli)->sock, SHUT_RDWR);
                    115:        }
                    116:        close((*cli)->sock);
                    117: 
                    118:        mqtt_msgFree(&(*cli)->buf, 42);
                    119: 
                    120:        free(*cli);
                    121:        *cli = NULL;
                    122:        return 0;
                    123: }
1.1.2.4   misho     124: 
                    125: /*
                    126:  * mqtt_cli_Subscribe() - Subscribe to broker
                    127:  *
                    128:  * @cli = connected client
                    129:  * @Topics = Topics for subscribes
                    130:  * @msgID = Message ID
                    131:  * @Dup = Duplicated request
                    132:  * @QoS = Message QoS
                    133:  * return: NULL error or !=NULL allocated array with subscribed QoS responses, 
                    134:  *     must be free() result!
                    135:  */
                    136: u_char *
                    137: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, 
                    138:                u_short msgID, u_char Dup, u_char QoS)
                    139: {
                    140:        int siz = 0;
                    141:        u_short mid = 0;
                    142:        u_char *qoses = NULL;
                    143: 
1.1.2.5   misho     144:        if (!cli || !Topics)
1.1.2.4   misho     145:                return NULL;
                    146: 
                    147:        /* send subscribe */
                    148:        siz = mqtt_msgSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
                    149:        if (siz == -1)
                    150:                return NULL;
                    151:        siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
                    152:        if (siz == -1) {
                    153:                LOGERR;
                    154:                return NULL;
                    155:        }
                    156: 
                    157:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                    158:                return NULL;
                    159:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                    160:                return NULL;
                    161: 
                    162:        /* receive suback */
                    163:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
                    164:        if (siz == -1) {
                    165:                LOGERR;
                    166:                return NULL;
                    167:        }
                    168:        siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
                    169:        if (siz == -1)
                    170:                return NULL;
                    171:        if (msgID != mid) {
                    172:                free(qoses);
                    173:                mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, mid);
                    174:                return NULL;
                    175:        }
                    176: 
                    177:        return qoses;
                    178: }
1.1.2.5   misho     179: 
                    180: /*
                    181:  * mqtt_cli_Unsubscribe() - Unsubscribe from broker
                    182:  *
                    183:  * @cli = connected client
                    184:  * @Topics = Topics for unsubscribes
                    185:  * @msgID = Message ID
                    186:  * @Dup = Duplicated request
                    187:  * @QoS = Message QoS
                    188:  * return: -1 error or 0 ok
                    189:  */
                    190: int
                    191: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, 
                    192:                u_short msgID, u_char Dup, u_char QoS)
                    193: {
                    194:        int siz = 0;
                    195: 
                    196:        if (!cli || !Topics)
                    197:                return -1;
                    198: 
                    199:        /* send unsubscribe */
                    200:        siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
                    201:        if (siz == -1)
                    202:                return -1;
                    203:        siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
                    204:        if (siz == -1) {
                    205:                LOGERR;
                    206:                return -1;
                    207:        }
                    208: 
                    209:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                    210:                return -1;
                    211:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                    212:                return -1;
                    213: 
                    214:        /* receive unsuback */
                    215:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
                    216:        if (siz == -1) {
                    217:                LOGERR;
                    218:                return -1;
                    219:        }
                    220:        siz = mqtt_readUNSUBACK(cli->buf);
                    221:        if (siz == -1)
                    222:                return -1;
                    223:        if (msgID != siz) {
1.1.2.6 ! misho     224:                mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
        !           225:                return -1;
        !           226:        }
        !           227: 
        !           228:        return 0;
        !           229: }
        !           230: 
        !           231: /*
        !           232:  * mqtt_cli_Publish() - Publish message to broker
        !           233:  *
        !           234:  * @cli = connected client
        !           235:  * @msgID = Message ID
        !           236:  * @Dup = Duplicated request
        !           237:  * @QoS = Message QoS
        !           238:  * @Retain = Retain message
        !           239:  * @csTopic = Topic
        !           240:  * @pData = Data
        !           241:  * @datLen = Data length
        !           242:  * return: -1 error or 0 ok
        !           243:  */
        !           244: int
        !           245: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain, 
        !           246:                const char *csTopic, const void *pData, int datLen)
        !           247: {
        !           248:        int siz = 0;
        !           249: 
        !           250:        if (!cli || !csTopic)
        !           251:                return -1;
        !           252: 
        !           253:        /* send publish */
        !           254:        siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen);
        !           255:        if (siz == -1)
        !           256:                return -1;
        !           257:        siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
        !           258:        if (siz == -1) {
        !           259:                LOGERR;
1.1.2.5   misho     260:                return -1;
                    261:        }
                    262: 
1.1.2.6 ! misho     263:        if (QoS == MQTT_QOS_ONCE)       /* no reply */
        !           264:                goto end;
        !           265: 
        !           266:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
        !           267:                return -1;
        !           268:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
        !           269:                return -1;
        !           270: 
        !           271:        /* receive PUBxxx */
        !           272:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
        !           273:        if (siz == -1) {
        !           274:                LOGERR;
        !           275:                return -1;
        !           276:        }
        !           277: 
        !           278:        if (QoS == MQTT_QOS_ACK) {      /* reply with PUBACK */
        !           279:                siz = mqtt_readPUBACK(cli->buf);
        !           280:                if (siz == -1)
        !           281:                        return -1;
        !           282:                if (msgID != siz) {
        !           283:                        mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
        !           284:                        return -1;
        !           285:                }
        !           286:                goto end;
        !           287:        } else {                        /* reply with PUBREC */
        !           288:                siz = mqtt_readPUBREC(cli->buf);
        !           289:                if (siz == -1)
        !           290:                        return -1;
        !           291:                if (msgID != siz) {
        !           292:                        mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
        !           293:                        return -1;
        !           294:                }
        !           295:        }
        !           296: 
        !           297:        do {
        !           298:                /* send publish release QoS == 2 */
        !           299:                siz = mqtt_msgPUBREL(cli->buf, msgID);
        !           300:                if (siz == -1)
        !           301:                        return -1;
        !           302:                siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
        !           303:                if (siz == -1) {
        !           304:                        LOGERR;
        !           305:                        return -1;
        !           306:                }
        !           307: 
        !           308:                if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
        !           309:                        return -1;
        !           310:                } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
        !           311:                        if (Dup++ > 1)
        !           312:                                return -1;
        !           313:                        else
        !           314:                                continue;
        !           315:                }
        !           316: 
        !           317:                /* receive PUBCOMP */
        !           318:                siz = mqtt_readPUBCOMP(cli->buf);
        !           319:                if (siz == -1)
        !           320:                        return -1;
        !           321:                if (msgID != siz) {
        !           322:                        mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
        !           323:                        if (Dup++ > 1)
        !           324:                                return -1;
        !           325:                        else
        !           326:                                continue;
        !           327:                }
        !           328:        } while (0);
        !           329: 
        !           330: end:
1.1.2.5   misho     331:        return 0;
                    332: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>