Annotation of libaitmqtt/src/cliside.c, revision 1.3.12.3

1.2       misho       1: /*************************************************************************
                      2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
                      3: *  by Michael Pounov <misho@openbsd-bg.org>
                      4: *
                      5: * $Author: misho $
1.3.12.3! misho       6: * $Id: cliside.c,v 1.3.12.2 2022/09/15 15:04:44 misho Exp $
1.2       misho       7: *
                      8: **************************************************************************
                      9: The ELWIX and AITNET software is distributed under the following
                     10: terms:
                     11: 
                     12: All of the documentation and software included in the ELWIX and AITNET
                     13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
                     14: 
1.3.12.2  misho      15: Copyright 2004 - 2022
1.2       misho      16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
                     17: 
                     18: Redistribution and use in source and binary forms, with or without
                     19: modification, are permitted provided that the following conditions
                     20: are met:
                     21: 1. Redistributions of source code must retain the above copyright
                     22:    notice, this list of conditions and the following disclaimer.
                     23: 2. Redistributions in binary form must reproduce the above copyright
                     24:    notice, this list of conditions and the following disclaimer in the
                     25:    documentation and/or other materials provided with the distribution.
                     26: 3. All advertising materials mentioning features or use of this software
                     27:    must display the following acknowledgement:
                     28: This product includes software developed by Michael Pounov <misho@elwix.org>
                     29: ELWIX - Embedded LightWeight unIX and its contributors.
                     30: 4. Neither the name of AITNET nor the names of its contributors
                     31:    may be used to endorse or promote products derived from this software
                     32:    without specific prior written permission.
                     33: 
                     34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
                     35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
                     36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
                     37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
                     38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
                     39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
                     40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
                     41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
                     42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
                     43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
                     44: SUCH DAMAGE.
                     45: */
                     46: #include "global.h"
                     47: 
                     48: 
                     49: /*
                     50:  * mqtt_cli_Open() - Open client connection to MQTT broker
                     51:  *
                     52:  * @addr = brokers address
                     53:  * @timeout = timeout
                     54:  * return: NULL error or !=NULL connected to broker
                     55:  */
                     56: mqtt_cli_t *
                     57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
                     58: {
                     59:        mqtt_cli_t *cli;
                     60: 
                     61:        if (!addr)
                     62:                return NULL;
                     63: 
1.3.12.2  misho      64:        cli = e_malloc(sizeof(mqtt_cli_t));
1.2       misho      65:        if (!cli) {
                     66:                LOGERR;
                     67:                return NULL;
                     68:        } else
                     69:                memset(cli, 0, sizeof(mqtt_cli_t));
                     70: 
                     71:        cli->timeout = timeout;
                     72:        cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
                     73:        if (cli->sock == -1) {
                     74:                LOGERR;
1.3.12.2  misho      75:                e_free(cli);
1.2       misho      76:                return NULL;
                     77:        }
1.3.12.1  misho      78: #ifndef __linux__
1.2       misho      79:        if (connect(cli->sock, addr, addr->sa_len) == -1) {
1.3.12.1  misho      80: #else
                     81:        if (connect(cli->sock, addr, addr->sa_family == AF_INET6 ? 
                     82:                                sizeof(struct sockaddr_in6) : 
                     83:                                sizeof(struct sockaddr_in)) == -1) {
                     84: #endif
1.2       misho      85:                LOGERR;
                     86:                close(cli->sock);
1.3.12.2  misho      87:                e_free(cli);
1.2       misho      88:                return NULL;
                     89:        }
                     90: 
                     91:        return cli;
                     92: }
                     93: 
                     94: /*
                     95:  * mqtt_cli_Close() - Close client connection
                     96:  *
                     97:  * @cli = connected client
                     98:  * return: -1 error or 0 disconnected client and freed all resources
                     99:  */
                    100: int
                    101: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
                    102: {
1.3.12.2  misho     103:        int siz;
1.2       misho     104: 
                    105:        if (!cli || !*cli)
                    106:                return -1;
                    107: 
                    108:        /* send disconnect */
1.3.12.2  misho     109:        (*cli)->buf = mqtt_msgDISCONNECT();
                    110:        if ((*cli)->buf) {
                    111:                siz = send((*cli)->sock, (*cli)->buf->msg_base, (*cli)->buf->msg_len, MSG_NOSIGNAL);
1.2       misho     112:                if (siz > -1)
                    113:                        shutdown((*cli)->sock, SHUT_RDWR);
                    114:        }
                    115:        close((*cli)->sock);
                    116: 
1.3.12.2  misho     117:        mqtt_msgFree(&(*cli)->buf, 0);
1.2       misho     118: 
1.3.12.2  misho     119:        e_free(*cli);
1.2       misho     120:        *cli = NULL;
                    121:        return 0;
                    122: }
                    123: 
                    124: /*
                    125:  * mqtt_cli_Subscribe() - Subscribe to broker
                    126:  *
                    127:  * @cli = connected client
                    128:  * @Topics = Topics for subscribes
                    129:  * @msgID = Message ID
                    130:  * return: NULL error or !=NULL allocated array with subscribed QoS responses, 
1.3.12.2  misho     131:  *     must be e_free() result!
1.2       misho     132:  */
                    133: u_char *
1.3.12.2  misho     134: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t ** __restrict Topics, u_short msgID)
1.2       misho     135: {
                    136:        int siz = 0;
                    137:        u_short mid = 0;
                    138:        u_char *qoses = NULL;
                    139: 
                    140:        if (!cli || !Topics)
                    141:                return NULL;
                    142: 
                    143:        /* send subscribe */
1.3.12.2  misho     144:        cli->buf = mqtt_msgSUBSCRIBE(Topics, msgID);
                    145:        if (!cli->buf)
1.2       misho     146:                return NULL;
1.3.12.2  misho     147:        siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2       misho     148:        if (siz == -1) {
                    149:                LOGERR;
                    150:                return NULL;
1.3       misho     151:        } else
1.3.12.2  misho     152:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     153: 
                    154:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                    155:                return NULL;
                    156:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                    157:                return NULL;
                    158: 
                    159:        /* receive suback */
1.3.12.3! misho     160:        cli->buf = mqtt_msgAlloc(BUFSIZ);
        !           161:        if (!cli->buf)
        !           162:                return NULL;
1.2       misho     163:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
                    164:        if (siz == -1) {
                    165:                LOGERR;
1.3.12.3! misho     166:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     167:                return NULL;
                    168:        }
                    169:        siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
1.3.12.3! misho     170:        if (siz == -1) {
        !           171:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     172:                return NULL;
1.3.12.3! misho     173:        }
1.2       misho     174:        if (msgID != mid) {
1.3.12.3! misho     175:                if (qoses)
        !           176:                        e_free(qoses);
        !           177:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     178:                mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
                    179:                return NULL;
                    180:        }
                    181: 
1.3.12.3! misho     182:        mqtt_msgFree(&cli->buf, 0);
1.2       misho     183:        return qoses;
                    184: }
                    185: 
                    186: /*
                    187:  * mqtt_cli_Unsubscribe() - Unsubscribe from broker
                    188:  *
                    189:  * @cli = connected client
                    190:  * @Topics = Topics for unsubscribes
                    191:  * @msgID = Message ID
                    192:  * @Dup = Duplicated request
                    193:  * @QoS = Message QoS
                    194:  * return: -1 error or 0 ok
                    195:  */
                    196: int
1.3.12.3! misho     197: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t ** __restrict Topics, 
1.2       misho     198:                u_short msgID, u_char Dup, u_char QoS)
                    199: {
                    200:        int siz = 0;
                    201: 
                    202:        if (!cli || !Topics)
                    203:                return -1;
                    204: 
                    205:        /* send unsubscribe */
1.3.12.3! misho     206:        cli->buf = mqtt_msgUNSUBSCRIBE(Topics, msgID, Dup, QoS);
        !           207:        if (!cli->buf)
1.2       misho     208:                return -1;
1.3.12.3! misho     209:        siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2       misho     210:        if (siz == -1) {
                    211:                LOGERR;
1.3.12.3! misho     212:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     213:                return -1;
1.3       misho     214:        } else
1.3.12.3! misho     215:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     216: 
                    217:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                    218:                return -1;
                    219:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                    220:                return -1;
                    221: 
                    222:        /* receive unsuback */
1.3.12.3! misho     223:        cli->buf = mqtt_msgAlloc(BUFSIZ);
        !           224:        if (!cli->buf)
        !           225:                return -1;
1.2       misho     226:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
                    227:        if (siz == -1) {
                    228:                LOGERR;
1.3.12.3! misho     229:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     230:                return -1;
                    231:        }
                    232:        siz = mqtt_readUNSUBACK(cli->buf);
1.3.12.3! misho     233:        if (siz == -1) {
        !           234:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     235:                return -1;
1.3.12.3! misho     236:        }
1.2       misho     237:        if (msgID != siz) {
1.3.12.3! misho     238:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     239:                mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
                    240:                return -1;
                    241:        }
                    242: 
1.3.12.3! misho     243:        mqtt_msgFree(&cli->buf, 0);
1.2       misho     244:        return 0;
                    245: }
                    246: 
                    247: /*
                    248:  * mqtt_cli_Publish() - Publish message to broker
                    249:  *
                    250:  * @cli = connected client
                    251:  * @msgID = Message ID
                    252:  * @Dup = Duplicated request
                    253:  * @QoS = Message QoS
                    254:  * @Retain = Retain message
                    255:  * @csTopic = Topic
                    256:  * @pData = Data
                    257:  * @datLen = Data length
                    258:  * return: -1 error or > -1 sended bytes
                    259:  */
                    260: int
                    261: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain, 
                    262:                const char *csTopic, const void *pData, int datLen)
                    263: {
                    264:        int wlen = 0, siz = 0;
                    265: 
                    266:        if (!cli || !csTopic)
                    267:                return -1;
                    268: 
                    269:        /* send publish */
1.3.12.3! misho     270:        cli->buf = mqtt_msgPUBLISH(csTopic, msgID, Dup, QoS, Retain, pData, datLen);
        !           271:        if (!cli->buf)
1.2       misho     272:                return -1;
1.3.12.3! misho     273:        wlen = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
        !           274:        if (wlen == -1) {
1.2       misho     275:                LOGERR;
1.3.12.3! misho     276:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     277:                return -1;
1.3.12.3! misho     278:        } else
        !           279:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     280: 
                    281:        if (QoS == MQTT_QOS_ONCE)       /* no reply */
                    282:                goto end;
                    283: 
                    284:        if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                    285:                return -1;
                    286:        } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                    287:                return -1;
                    288: 
                    289:        /* receive PUBxxx */
1.3.12.3! misho     290:        cli->buf = mqtt_msgAlloc(BUFSIZ);
        !           291:        if (!cli->buf)
        !           292:                return -1;
1.2       misho     293:        siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
                    294:        if (siz == -1) {
                    295:                LOGERR;
1.3.12.3! misho     296:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     297:                return -1;
                    298:        }
                    299: 
                    300:        if (QoS == MQTT_QOS_ACK) {      /* reply with PUBACK */
                    301:                siz = mqtt_readPUBACK(cli->buf);
1.3.12.3! misho     302:                if (siz == -1) {
        !           303:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     304:                        return -1;
1.3.12.3! misho     305:                }
1.2       misho     306:                if (msgID != siz) {
1.3.12.3! misho     307:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     308:                        mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
                    309:                        return -1;
                    310:                }
1.3.12.3! misho     311:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     312:                goto end;
                    313:        } else {                        /* reply with PUBREC */
                    314:                siz = mqtt_readPUBREC(cli->buf);
1.3.12.3! misho     315:                if (siz == -1) {
        !           316:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     317:                        return -1;
1.3.12.3! misho     318:                }
1.2       misho     319:                if (msgID != siz) {
1.3.12.3! misho     320:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     321:                        mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
                    322:                        return -1;
                    323:                }
1.3.12.3! misho     324:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     325:        }
                    326: 
                    327:        do {
                    328:                /* send publish release QoS == 2 */
1.3.12.3! misho     329:                cli->buf = mqtt_msgPUBREL(msgID);
        !           330:                if (!cli->buf)
1.2       misho     331:                        return -1;
1.3.12.3! misho     332:                siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2       misho     333:                if (siz == -1) {
                    334:                        LOGERR;
1.3.12.3! misho     335:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     336:                        return -1;
1.3       misho     337:                } else
1.3.12.3! misho     338:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     339: 
                    340:                if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                    341:                        return -1;
                    342:                } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
                    343:                        if (Dup++ > 1)
                    344:                                return -1;
                    345:                        else
                    346:                                continue;
                    347:                }
                    348: 
                    349:                /* receive PUBCOMP */
1.3.12.3! misho     350:                cli->buf = mqtt_msgAlloc(BUFSIZ);
        !           351:                if (!cli->buf)
        !           352:                        return -1;
1.3       misho     353:                siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
                    354:                if (siz == -1) {
                    355:                        LOGERR;
1.3.12.3! misho     356:                        mqtt_msgFree(&cli->buf, 0);
1.3       misho     357:                        return -1;
                    358:                }
                    359: 
1.2       misho     360:                siz = mqtt_readPUBCOMP(cli->buf);
1.3.12.3! misho     361:                if (siz == -1) {
        !           362:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     363:                        return -1;
1.3.12.3! misho     364:                }
1.2       misho     365:                if (msgID != siz) {
1.3.12.3! misho     366:                        mqtt_msgFree(&cli->buf, 0);
1.2       misho     367:                        mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
                    368:                        if (Dup++ > 1)
                    369:                                return -1;
                    370:                        else
                    371:                                continue;
                    372:                }
1.3.12.3! misho     373:                mqtt_msgFree(&cli->buf, 0);
1.2       misho     374:        } while (0);
                    375: 
                    376: end:
                    377:        return wlen;
                    378: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>