Annotation of libaitmqtt/src/cliside.c, revision 1.3.12.2
1.2 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3.12.2! misho 6: * $Id: cliside.c,v 1.3.12.1 2016/09/14 15:52:36 misho Exp $
1.2 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.3.12.2! misho 15: Copyright 2004 - 2022
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_cli_Open() - Open client connection to MQTT broker
51: *
52: * @addr = brokers address
53: * @timeout = timeout
54: * return: NULL error or !=NULL connected to broker
55: */
56: mqtt_cli_t *
57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
58: {
59: mqtt_cli_t *cli;
60:
61: if (!addr)
62: return NULL;
63:
1.3.12.2! misho 64: cli = e_malloc(sizeof(mqtt_cli_t));
1.2 misho 65: if (!cli) {
66: LOGERR;
67: return NULL;
68: } else
69: memset(cli, 0, sizeof(mqtt_cli_t));
70:
71: cli->timeout = timeout;
72: cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
73: if (cli->sock == -1) {
74: LOGERR;
1.3.12.2! misho 75: e_free(cli);
1.2 misho 76: return NULL;
77: }
1.3.12.1 misho 78: #ifndef __linux__
1.2 misho 79: if (connect(cli->sock, addr, addr->sa_len) == -1) {
1.3.12.1 misho 80: #else
81: if (connect(cli->sock, addr, addr->sa_family == AF_INET6 ?
82: sizeof(struct sockaddr_in6) :
83: sizeof(struct sockaddr_in)) == -1) {
84: #endif
1.2 misho 85: LOGERR;
86: close(cli->sock);
1.3.12.2! misho 87: e_free(cli);
1.2 misho 88: return NULL;
89: }
90:
91: return cli;
92: }
93:
94: /*
95: * mqtt_cli_Close() - Close client connection
96: *
97: * @cli = connected client
98: * return: -1 error or 0 disconnected client and freed all resources
99: */
100: int
101: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
102: {
1.3.12.2! misho 103: int siz;
1.2 misho 104:
105: if (!cli || !*cli)
106: return -1;
107:
108: /* send disconnect */
1.3.12.2! misho 109: (*cli)->buf = mqtt_msgDISCONNECT();
! 110: if ((*cli)->buf) {
! 111: siz = send((*cli)->sock, (*cli)->buf->msg_base, (*cli)->buf->msg_len, MSG_NOSIGNAL);
1.2 misho 112: if (siz > -1)
113: shutdown((*cli)->sock, SHUT_RDWR);
114: }
115: close((*cli)->sock);
116:
1.3.12.2! misho 117: mqtt_msgFree(&(*cli)->buf, 0);
1.2 misho 118:
1.3.12.2! misho 119: e_free(*cli);
1.2 misho 120: *cli = NULL;
121: return 0;
122: }
123:
124: /*
125: * mqtt_cli_Subscribe() - Subscribe to broker
126: *
127: * @cli = connected client
128: * @Topics = Topics for subscribes
129: * @msgID = Message ID
130: * return: NULL error or !=NULL allocated array with subscribed QoS responses,
1.3.12.2! misho 131: * must be e_free() result!
1.2 misho 132: */
133: u_char *
1.3.12.2! misho 134: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t ** __restrict Topics, u_short msgID)
1.2 misho 135: {
136: int siz = 0;
137: u_short mid = 0;
138: u_char *qoses = NULL;
139:
140: if (!cli || !Topics)
141: return NULL;
142:
143: /* send subscribe */
1.3.12.2! misho 144: cli->buf = mqtt_msgSUBSCRIBE(Topics, msgID);
! 145: if (!cli->buf)
1.2 misho 146: return NULL;
1.3.12.2! misho 147: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
1.2 misho 148: if (siz == -1) {
149: LOGERR;
150: return NULL;
1.3 misho 151: } else
1.3.12.2! misho 152: mqtt_msgFree(&cli->buf, 0);
1.2 misho 153:
154: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
155: return NULL;
156: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
157: return NULL;
158:
159: /* receive suback */
160: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
161: if (siz == -1) {
162: LOGERR;
163: return NULL;
164: }
165: siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
166: if (siz == -1)
167: return NULL;
168: if (msgID != mid) {
169: free(qoses);
170: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
171: return NULL;
172: }
173:
174: return qoses;
175: }
176:
1.3.12.2! misho 177: #if 0
1.2 misho 178: /*
179: * mqtt_cli_Unsubscribe() - Unsubscribe from broker
180: *
181: * @cli = connected client
182: * @Topics = Topics for unsubscribes
183: * @msgID = Message ID
184: * @Dup = Duplicated request
185: * @QoS = Message QoS
186: * return: -1 error or 0 ok
 *
 * Sends an UNSUBSCRIBE request built into cli->buf, waits for input on the
 * socket and checks that the message ID read from the UNSUBACK reply equals
 * @msgID.
 *
 * NOTE(review): this function is inside the "#if 0" region of the file, so
 * it is not compiled. It passes cli->buf to mqtt_msgUNSUBSCRIBE() and uses
 * the return value as a byte count; presumably this targets an older message
 * API where cli->buf is a preallocated buffer - confirm before re-enabling.
187: */
188: int
189: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics,
190: u_short msgID, u_char Dup, u_char QoS)
191: {
192: int siz = 0;
193:
194: if (!cli || !Topics)
195: return -1;
196:
197: /* send unsubscribe */
198: siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
199: if (siz == -1)
200: return -1;
/* only the siz bytes reported by the builder are transmitted */
201: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
202: if (siz == -1) {
203: LOGERR;
204: return -1;
1.3 misho 205: } else
/* wipe the whole buffer; the reply is received into it below */
206: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
1.2 misho 207:
/* a non-zero wait result leads to a mqtt_KeepAlive() call; its failure aborts */
208: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
209: return -1;
210: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
211: return -1;
212:
213: /* receive unsuback */
/* NOTE(review): a recv() result of 0 is not treated as an error here */
214: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
215: if (siz == -1) {
216: LOGERR;
217: return -1;
218: }
/* the value returned by mqtt_readUNSUBACK() is compared with msgID below */
219: siz = mqtt_readUNSUBACK(cli->buf);
220: if (siz == -1)
221: return -1;
222: if (msgID != siz) {
223: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
224: return -1;
225: }
226:
227: return 0;
228: }
229:
230: /*
231: * mqtt_cli_Publish() - Publish message to broker
232: *
233: * @cli = connected client
234: * @msgID = Message ID
235: * @Dup = Duplicated request
236: * @QoS = Message QoS
237: * @Retain = Retain message
238: * @csTopic = Topic
239: * @pData = Data
240: * @datLen = Data length
241: * return: -1 error or > -1 sent bytes
 *
 * The returned count is the result of the first send() (the PUBLISH
 * message). MQTT_QOS_ONCE returns right after that send; MQTT_QOS_ACK
 * expects a PUBACK; any other QoS value expects a PUBREC and is followed
 * by a PUBREL / PUBCOMP exchange. Every reply must carry @msgID.
 *
 * NOTE(review): this function is inside the "#if 0" region of the file, so
 * it is not compiled. It passes cli->buf to the mqtt_msgPUBLISH() and
 * mqtt_msgPUBREL() builders and uses their return value as a byte count;
 * presumably this targets an older message API - confirm before re-enabling.
242: */
243: int
244: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain,
245: const char *csTopic, const void *pData, int datLen)
246: {
247: int wlen = 0, siz = 0;
248:
249: if (!cli || !csTopic)
250: return -1;
251:
252: /* send publish */
253: siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen);
254: if (siz == -1)
255: return -1;
256: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
257: if (siz == -1) {
258: LOGERR;
259: return -1;
1.3 misho 260: } else {
/* remember the sent size as the return value, then wipe the buffer for the reply */
1.2 misho 261: wlen = siz;
1.3 misho 262: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
263: }
1.2 misho 264:
265: if (QoS == MQTT_QOS_ONCE) /* no reply */
266: goto end;
267:
/* a non-zero wait result leads to a mqtt_KeepAlive() call; its failure aborts */
268: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
269: return -1;
270: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
271: return -1;
272:
273: /* receive PUBxxx */
274: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
275: if (siz == -1) {
276: LOGERR;
277: return -1;
278: }
279:
280: if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */
281: siz = mqtt_readPUBACK(cli->buf);
282: if (siz == -1)
283: return -1;
284: if (msgID != siz) {
285: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
286: return -1;
287: }
288: goto end;
289: } else { /* reply with PUBREC */
/* reached for every QoS value that is neither MQTT_QOS_ONCE nor MQTT_QOS_ACK */
290: siz = mqtt_readPUBREC(cli->buf);
291: if (siz == -1)
292: return -1;
293: if (msgID != siz) {
294: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
295: return -1;
296: }
297: }
298:
/*
 * NOTE(review): the loop condition is the constant 0, so the body runs
 * once. A "continue" in a do/while jumps to the condition test, so both
 * "continue" statements below leave the loop instead of repeating the
 * PUBREL step, and the function then returns wlen. If a retry was
 * intended, this needs a real loop - confirm the intent.
 */
299: do {
300: /* send publish release QoS == 2 */
301: siz = mqtt_msgPUBREL(cli->buf, msgID);
302: if (siz == -1)
303: return -1;
304: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
305: if (siz == -1) {
306: LOGERR;
307: return -1;
1.3 misho 308: } else
309: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
1.2 misho 310:
311: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
312: return -1;
313: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
/* Dup is tested before it is incremented: -1 only when it was already above 1 */
314: if (Dup++ > 1)
315: return -1;
316: else
317: continue;
318: }
319:
320: /* receive PUBCOMP */
1.3 misho 321: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
322: if (siz == -1) {
323: LOGERR;
324: return -1;
325: }
326:
1.2 misho 327: siz = mqtt_readPUBCOMP(cli->buf);
328: if (siz == -1)
329: return -1;
330: if (msgID != siz) {
/* the error is recorded, but with Dup <= 1 the function still ends up returning wlen */
331: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
332: if (Dup++ > 1)
333: return -1;
334: else
335: continue;
336: }
337: } while (0);
338:
339: end:
340: return wlen;
341: }
1.3.12.2! misho 342: #endif
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>