Annotation of libaitmqtt/src/cliside.c, revision 1.3.12.3
1.2 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3.12.3! misho 6: * $Id: cliside.c,v 1.3.12.2 2022/09/15 15:04:44 misho Exp $
1.2 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.3.12.2 misho 15: Copyright 2004 - 2022
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_cli_Open() - Open client connection to MQTT broker
51: *
52: * @addr = brokers address
53: * @timeout = timeout
54: * return: NULL error or !=NULL connected to broker
55: */
56: mqtt_cli_t *
57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
58: {
59: mqtt_cli_t *cli;
60:
61: if (!addr)
62: return NULL;
63:
1.3.12.2 misho 64: cli = e_malloc(sizeof(mqtt_cli_t));
1.2 misho 65: if (!cli) {
66: LOGERR;
67: return NULL;
68: } else
69: memset(cli, 0, sizeof(mqtt_cli_t));
70:
71: cli->timeout = timeout;
72: cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
73: if (cli->sock == -1) {
74: LOGERR;
1.3.12.2 misho 75: e_free(cli);
1.2 misho 76: return NULL;
77: }
1.3.12.1 misho 78: #ifndef __linux__
1.2 misho 79: if (connect(cli->sock, addr, addr->sa_len) == -1) {
1.3.12.1 misho 80: #else
81: if (connect(cli->sock, addr, addr->sa_family == AF_INET6 ?
82: sizeof(struct sockaddr_in6) :
83: sizeof(struct sockaddr_in)) == -1) {
84: #endif
1.2 misho 85: LOGERR;
86: close(cli->sock);
1.3.12.2 misho 87: e_free(cli);
1.2 misho 88: return NULL;
89: }
90:
91: return cli;
92: }
93:
94: /*
95: * mqtt_cli_Close() - Close client connection
96: *
97: * @cli = connected client
98: * return: -1 error or 0 disconnected client and freed all resources
99: */
100: int
101: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
102: {
1.3.12.2 misho 103: int siz;
1.2 misho 104:
105: if (!cli || !*cli)
106: return -1;
107:
108: /* send disconnect */
1.3.12.2 misho 109: (*cli)->buf = mqtt_msgDISCONNECT();
110: if ((*cli)->buf) {
111: siz = send((*cli)->sock, (*cli)->buf->msg_base, (*cli)->buf->msg_len, MSG_NOSIGNAL);
1.2 misho 112: if (siz > -1)
113: shutdown((*cli)->sock, SHUT_RDWR);
114: }
115: close((*cli)->sock);
116:
1.3.12.2 misho 117: mqtt_msgFree(&(*cli)->buf, 0);
1.2 misho 118:
1.3.12.2 misho 119: e_free(*cli);
1.2 misho 120: *cli = NULL;
121: return 0;
122: }
123:
124: /*
125: * mqtt_cli_Subscribe() - Subscribe to broker
126: *
127: * @cli = connected client
128: * @Topics = Topics for subscribes
129: * @msgID = Message ID
130: * return: NULL error or !=NULL allocated array with subscribed QoS responses,
1.3.12.2 misho 131: * must be e_free() result!
1.2 misho 132: */
133: u_char *
1.3.12.2 misho 134: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t ** __restrict Topics, u_short msgID)
1.2 misho 135: {
136: int siz = 0;
137: u_short mid = 0;
138: u_char *qoses = NULL;
139:
140: if (!cli || !Topics)
141: return NULL;
142:
143: /* send subscribe */
1.3.12.2 misho 144: cli->buf = mqtt_msgSUBSCRIBE(Topics, msgID);
145: if (!cli->buf)
1.2 misho 146: return NULL;
1.3.12.2 misho 147: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2 misho 148: if (siz == -1) {
149: LOGERR;
150: return NULL;
1.3 misho 151: } else
1.3.12.2 misho 152: mqtt_msgFree(&cli->buf, 0);
1.2 misho 153:
154: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
155: return NULL;
156: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
157: return NULL;
158:
159: /* receive suback */
1.3.12.3! misho 160: cli->buf = mqtt_msgAlloc(BUFSIZ);
! 161: if (!cli->buf)
! 162: return NULL;
1.2 misho 163: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
164: if (siz == -1) {
165: LOGERR;
1.3.12.3! misho 166: mqtt_msgFree(&cli->buf, 0);
1.2 misho 167: return NULL;
168: }
169: siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
1.3.12.3! misho 170: if (siz == -1) {
! 171: mqtt_msgFree(&cli->buf, 0);
1.2 misho 172: return NULL;
1.3.12.3! misho 173: }
1.2 misho 174: if (msgID != mid) {
1.3.12.3! misho 175: if (qoses)
! 176: e_free(qoses);
! 177: mqtt_msgFree(&cli->buf, 0);
1.2 misho 178: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
179: return NULL;
180: }
181:
1.3.12.3! misho 182: mqtt_msgFree(&cli->buf, 0);
1.2 misho 183: return qoses;
184: }
185:
186: /*
187: * mqtt_cli_Unsubscribe() - Unsubscribe from broker
188: *
189: * @cli = connected client
190: * @Topics = Topics for unsubscribes
191: * @msgID = Message ID
192: * @Dup = Duplicated request
193: * @QoS = Message QoS
194: * return: -1 error or 0 ok
195: */
196: int
1.3.12.3! misho 197: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t ** __restrict Topics,
1.2 misho 198: u_short msgID, u_char Dup, u_char QoS)
199: {
200: int siz = 0;
201:
202: if (!cli || !Topics)
203: return -1;
204:
205: /* send unsubscribe */
1.3.12.3! misho 206: cli->buf = mqtt_msgUNSUBSCRIBE(Topics, msgID, Dup, QoS);
! 207: if (!cli->buf)
1.2 misho 208: return -1;
1.3.12.3! misho 209: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2 misho 210: if (siz == -1) {
211: LOGERR;
1.3.12.3! misho 212: mqtt_msgFree(&cli->buf, 0);
1.2 misho 213: return -1;
1.3 misho 214: } else
1.3.12.3! misho 215: mqtt_msgFree(&cli->buf, 0);
1.2 misho 216:
217: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
218: return -1;
219: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
220: return -1;
221:
222: /* receive unsuback */
1.3.12.3! misho 223: cli->buf = mqtt_msgAlloc(BUFSIZ);
! 224: if (!cli->buf)
! 225: return -1;
1.2 misho 226: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
227: if (siz == -1) {
228: LOGERR;
1.3.12.3! misho 229: mqtt_msgFree(&cli->buf, 0);
1.2 misho 230: return -1;
231: }
232: siz = mqtt_readUNSUBACK(cli->buf);
1.3.12.3! misho 233: if (siz == -1) {
! 234: mqtt_msgFree(&cli->buf, 0);
1.2 misho 235: return -1;
1.3.12.3! misho 236: }
1.2 misho 237: if (msgID != siz) {
1.3.12.3! misho 238: mqtt_msgFree(&cli->buf, 0);
1.2 misho 239: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
240: return -1;
241: }
242:
1.3.12.3! misho 243: mqtt_msgFree(&cli->buf, 0);
1.2 misho 244: return 0;
245: }
246:
247: /*
248: * mqtt_cli_Publish() - Publish message to broker
249: *
250: * @cli = connected client
251: * @msgID = Message ID
252: * @Dup = Duplicated request
253: * @QoS = Message QoS
254: * @Retain = Retain message
255: * @csTopic = Topic
256: * @pData = Data
257: * @datLen = Data length
258: * return: -1 error or > -1 sended bytes
259: */
260: int
261: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain,
262: const char *csTopic, const void *pData, int datLen)
263: {
264: int wlen = 0, siz = 0;
265:
266: if (!cli || !csTopic)
267: return -1;
268:
269: /* send publish */
1.3.12.3! misho 270: cli->buf = mqtt_msgPUBLISH(csTopic, msgID, Dup, QoS, Retain, pData, datLen);
! 271: if (!cli->buf)
1.2 misho 272: return -1;
1.3.12.3! misho 273: wlen = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
! 274: if (wlen == -1) {
1.2 misho 275: LOGERR;
1.3.12.3! misho 276: mqtt_msgFree(&cli->buf, 0);
1.2 misho 277: return -1;
1.3.12.3! misho 278: } else
! 279: mqtt_msgFree(&cli->buf, 0);
1.2 misho 280:
281: if (QoS == MQTT_QOS_ONCE) /* no reply */
282: goto end;
283:
284: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
285: return -1;
286: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
287: return -1;
288:
289: /* receive PUBxxx */
1.3.12.3! misho 290: cli->buf = mqtt_msgAlloc(BUFSIZ);
! 291: if (!cli->buf)
! 292: return -1;
1.2 misho 293: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
294: if (siz == -1) {
295: LOGERR;
1.3.12.3! misho 296: mqtt_msgFree(&cli->buf, 0);
1.2 misho 297: return -1;
298: }
299:
300: if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */
301: siz = mqtt_readPUBACK(cli->buf);
1.3.12.3! misho 302: if (siz == -1) {
! 303: mqtt_msgFree(&cli->buf, 0);
1.2 misho 304: return -1;
1.3.12.3! misho 305: }
1.2 misho 306: if (msgID != siz) {
1.3.12.3! misho 307: mqtt_msgFree(&cli->buf, 0);
1.2 misho 308: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
309: return -1;
310: }
1.3.12.3! misho 311: mqtt_msgFree(&cli->buf, 0);
1.2 misho 312: goto end;
313: } else { /* reply with PUBREC */
314: siz = mqtt_readPUBREC(cli->buf);
1.3.12.3! misho 315: if (siz == -1) {
! 316: mqtt_msgFree(&cli->buf, 0);
1.2 misho 317: return -1;
1.3.12.3! misho 318: }
1.2 misho 319: if (msgID != siz) {
1.3.12.3! misho 320: mqtt_msgFree(&cli->buf, 0);
1.2 misho 321: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
322: return -1;
323: }
1.3.12.3! misho 324: mqtt_msgFree(&cli->buf, 0);
1.2 misho 325: }
326:
327: do {
328: /* send publish release QoS == 2 */
1.3.12.3! misho 329: cli->buf = mqtt_msgPUBREL(msgID);
! 330: if (!cli->buf)
1.2 misho 331: return -1;
1.3.12.3! misho 332: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2 misho 333: if (siz == -1) {
334: LOGERR;
1.3.12.3! misho 335: mqtt_msgFree(&cli->buf, 0);
1.2 misho 336: return -1;
1.3 misho 337: } else
1.3.12.3! misho 338: mqtt_msgFree(&cli->buf, 0);
1.2 misho 339:
340: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
341: return -1;
342: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
343: if (Dup++ > 1)
344: return -1;
345: else
346: continue;
347: }
348:
349: /* receive PUBCOMP */
1.3.12.3! misho 350: cli->buf = mqtt_msgAlloc(BUFSIZ);
! 351: if (!cli->buf)
! 352: return -1;
1.3 misho 353: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
354: if (siz == -1) {
355: LOGERR;
1.3.12.3! misho 356: mqtt_msgFree(&cli->buf, 0);
1.3 misho 357: return -1;
358: }
359:
1.2 misho 360: siz = mqtt_readPUBCOMP(cli->buf);
1.3.12.3! misho 361: if (siz == -1) {
! 362: mqtt_msgFree(&cli->buf, 0);
1.2 misho 363: return -1;
1.3.12.3! misho 364: }
1.2 misho 365: if (msgID != siz) {
1.3.12.3! misho 366: mqtt_msgFree(&cli->buf, 0);
1.2 misho 367: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
368: if (Dup++ > 1)
369: return -1;
370: else
371: continue;
372: }
1.3.12.3! misho 373: mqtt_msgFree(&cli->buf, 0);
1.2 misho 374: } while (0);
375:
376: end:
377: return wlen;
378: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>