Annotation of libaitmqtt/src/cliside.c, revision 1.2
1.2 ! misho 1: /*************************************************************************
! 2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
! 3: * by Michael Pounov <misho@openbsd-bg.org>
! 4: *
! 5: * $Author: misho $
! 6: * $Id: cliside.c,v 1.1.2.8 2012/06/19 15:55:01 misho Exp $
! 7: *
! 8: **************************************************************************
! 9: The ELWIX and AITNET software is distributed under the following
! 10: terms:
! 11:
! 12: All of the documentation and software included in the ELWIX and AITNET
! 13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
! 14:
! 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
! 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
! 17:
! 18: Redistribution and use in source and binary forms, with or without
! 19: modification, are permitted provided that the following conditions
! 20: are met:
! 21: 1. Redistributions of source code must retain the above copyright
! 22: notice, this list of conditions and the following disclaimer.
! 23: 2. Redistributions in binary form must reproduce the above copyright
! 24: notice, this list of conditions and the following disclaimer in the
! 25: documentation and/or other materials provided with the distribution.
! 26: 3. All advertising materials mentioning features or use of this software
! 27: must display the following acknowledgement:
! 28: This product includes software developed by Michael Pounov <misho@elwix.org>
! 29: ELWIX - Embedded LightWeight unIX and its contributors.
! 30: 4. Neither the name of AITNET nor the names of its contributors
! 31: may be used to endorse or promote products derived from this software
! 32: without specific prior written permission.
! 33:
! 34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
! 35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! 36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
! 37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
! 38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! 39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
! 40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
! 41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
! 42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
! 43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
! 44: SUCH DAMAGE.
! 45: */
! 46: #include "global.h"
! 47:
! 48:
! 49: /*
! 50: * mqtt_cli_Open() - Open client connection to MQTT broker
! 51: *
! 52: * @addr = brokers address
! 53: * @timeout = timeout
! 54: * return: NULL error or !=NULL connected to broker
! 55: */
! 56: mqtt_cli_t *
! 57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
! 58: {
! 59: mqtt_cli_t *cli;
! 60:
! 61: if (!addr)
! 62: return NULL;
! 63:
! 64: cli = malloc(sizeof(mqtt_cli_t));
! 65: if (!cli) {
! 66: LOGERR;
! 67: return NULL;
! 68: } else
! 69: memset(cli, 0, sizeof(mqtt_cli_t));
! 70:
! 71: cli->timeout = timeout;
! 72: cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
! 73: if (cli->sock == -1) {
! 74: LOGERR;
! 75: free(cli);
! 76: return NULL;
! 77: }
! 78: if (connect(cli->sock, addr, addr->sa_len) == -1) {
! 79: LOGERR;
! 80: close(cli->sock);
! 81: free(cli);
! 82: return NULL;
! 83: }
! 84:
! 85: cli->buf = mqtt_msgAlloc(USHRT_MAX);
! 86: if (!cli->buf) {
! 87: close(cli->sock);
! 88: free(cli);
! 89: return NULL;
! 90: }
! 91:
! 92: return cli;
! 93: }
! 94:
! 95: /*
! 96: * mqtt_cli_Close() - Close client connection
! 97: *
! 98: * @cli = connected client
! 99: * return: -1 error or 0 disconnected client and freed all resources
! 100: */
! 101: int
! 102: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
! 103: {
! 104: int siz = 0;
! 105:
! 106: if (!cli || !*cli)
! 107: return -1;
! 108:
! 109: /* send disconnect */
! 110: siz = mqtt_msgDISCONNECT((*cli)->buf);
! 111: if (siz > -1) {
! 112: siz = send((*cli)->sock, (*cli)->buf->msg_base, siz, MSG_NOSIGNAL);
! 113: if (siz > -1)
! 114: shutdown((*cli)->sock, SHUT_RDWR);
! 115: }
! 116: close((*cli)->sock);
! 117:
! 118: mqtt_msgFree(&(*cli)->buf, 42);
! 119:
! 120: free(*cli);
! 121: *cli = NULL;
! 122: return 0;
! 123: }
! 124:
! 125: /*
! 126: * mqtt_cli_Subscribe() - Subscribe to broker
! 127: *
! 128: * @cli = connected client
! 129: * @Topics = Topics for subscribes
! 130: * @msgID = Message ID
! 131: * @Dup = Duplicated request
! 132: * @QoS = Message QoS
! 133: * return: NULL error or !=NULL allocated array with subscribed QoS responses,
! 134: * must be free() result!
! 135: */
! 136: u_char *
! 137: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics,
! 138: u_short msgID, u_char Dup, u_char QoS)
! 139: {
! 140: int siz = 0;
! 141: u_short mid = 0;
! 142: u_char *qoses = NULL;
! 143:
! 144: if (!cli || !Topics)
! 145: return NULL;
! 146:
! 147: /* send subscribe */
! 148: siz = mqtt_msgSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
! 149: if (siz == -1)
! 150: return NULL;
! 151: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
! 152: if (siz == -1) {
! 153: LOGERR;
! 154: return NULL;
! 155: }
! 156:
! 157: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
! 158: return NULL;
! 159: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
! 160: return NULL;
! 161:
! 162: /* receive suback */
! 163: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
! 164: if (siz == -1) {
! 165: LOGERR;
! 166: return NULL;
! 167: }
! 168: siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
! 169: if (siz == -1)
! 170: return NULL;
! 171: if (msgID != mid) {
! 172: free(qoses);
! 173: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
! 174: return NULL;
! 175: }
! 176:
! 177: return qoses;
! 178: }
! 179:
! 180: /*
! 181: * mqtt_cli_Unsubscribe() - Unsubscribe from broker
! 182: *
! 183: * @cli = connected client
! 184: * @Topics = Topics for unsubscribes
! 185: * @msgID = Message ID
! 186: * @Dup = Duplicated request
! 187: * @QoS = Message QoS
! 188: * return: -1 error or 0 ok
! 189: */
! 190: int
! 191: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics,
! 192: u_short msgID, u_char Dup, u_char QoS)
! 193: {
! 194: int siz = 0;
! 195:
! 196: if (!cli || !Topics)
! 197: return -1;
! 198:
! 199: /* send unsubscribe */
! 200: siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
! 201: if (siz == -1)
! 202: return -1;
! 203: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
! 204: if (siz == -1) {
! 205: LOGERR;
! 206: return -1;
! 207: }
! 208:
! 209: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
! 210: return -1;
! 211: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
! 212: return -1;
! 213:
! 214: /* receive unsuback */
! 215: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
! 216: if (siz == -1) {
! 217: LOGERR;
! 218: return -1;
! 219: }
! 220: siz = mqtt_readUNSUBACK(cli->buf);
! 221: if (siz == -1)
! 222: return -1;
! 223: if (msgID != siz) {
! 224: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
! 225: return -1;
! 226: }
! 227:
! 228: return 0;
! 229: }
! 230:
! 231: /*
! 232: * mqtt_cli_Publish() - Publish message to broker
! 233: *
! 234: * @cli = connected client
! 235: * @msgID = Message ID
! 236: * @Dup = Duplicated request
! 237: * @QoS = Message QoS
! 238: * @Retain = Retain message
! 239: * @csTopic = Topic
! 240: * @pData = Data
! 241: * @datLen = Data length
! 242: * return: -1 error or > -1 sended bytes
! 243: */
! 244: int
! 245: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain,
! 246: const char *csTopic, const void *pData, int datLen)
! 247: {
! 248: int wlen = 0, siz = 0;
! 249:
! 250: if (!cli || !csTopic)
! 251: return -1;
! 252:
! 253: /* send publish */
! 254: siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen);
! 255: if (siz == -1)
! 256: return -1;
! 257: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
! 258: if (siz == -1) {
! 259: LOGERR;
! 260: return -1;
! 261: } else
! 262: wlen = siz;
! 263:
! 264: if (QoS == MQTT_QOS_ONCE) /* no reply */
! 265: goto end;
! 266:
! 267: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
! 268: return -1;
! 269: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
! 270: return -1;
! 271:
! 272: /* receive PUBxxx */
! 273: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
! 274: if (siz == -1) {
! 275: LOGERR;
! 276: return -1;
! 277: }
! 278:
! 279: if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */
! 280: siz = mqtt_readPUBACK(cli->buf);
! 281: if (siz == -1)
! 282: return -1;
! 283: if (msgID != siz) {
! 284: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
! 285: return -1;
! 286: }
! 287: goto end;
! 288: } else { /* reply with PUBREC */
! 289: siz = mqtt_readPUBREC(cli->buf);
! 290: if (siz == -1)
! 291: return -1;
! 292: if (msgID != siz) {
! 293: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
! 294: return -1;
! 295: }
! 296: }
! 297:
! 298: do {
! 299: /* send publish release QoS == 2 */
! 300: siz = mqtt_msgPUBREL(cli->buf, msgID);
! 301: if (siz == -1)
! 302: return -1;
! 303: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
! 304: if (siz == -1) {
! 305: LOGERR;
! 306: return -1;
! 307: }
! 308:
! 309: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
! 310: return -1;
! 311: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
! 312: if (Dup++ > 1)
! 313: return -1;
! 314: else
! 315: continue;
! 316: }
! 317:
! 318: /* receive PUBCOMP */
! 319: siz = mqtt_readPUBCOMP(cli->buf);
! 320: if (siz == -1)
! 321: return -1;
! 322: if (msgID != siz) {
! 323: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
! 324: if (Dup++ > 1)
! 325: return -1;
! 326: else
! 327: continue;
! 328: }
! 329: } while (0);
! 330:
! 331: end:
! 332: return wlen;
! 333: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>