1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: cliside.c,v 1.3.12.4 2022/09/16 04:18:17 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004 - 2022
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_cli_Open() - Open client connection to MQTT broker
51: *
52: * @addr = brokers address
53: * @timeout = timeout
54: * return: NULL error or !=NULL connected to broker
55: */
56: mqtt_cli_t *
57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
58: {
59: mqtt_cli_t *cli;
60:
61: if (!addr)
62: return NULL;
63:
64: cli = e_malloc(sizeof(mqtt_cli_t));
65: if (!cli) {
66: LOGERR;
67: return NULL;
68: } else
69: memset(cli, 0, sizeof(mqtt_cli_t));
70:
71: cli->timeout = timeout;
72: cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
73: if (cli->sock == -1) {
74: LOGERR;
75: e_free(cli);
76: return NULL;
77: }
78: #ifndef __linux__
79: if (connect(cli->sock, addr, addr->sa_len) == -1) {
80: #else
81: if (connect(cli->sock, addr, addr->sa_family == AF_INET6 ?
82: sizeof(struct sockaddr_in6) :
83: sizeof(struct sockaddr_in)) == -1) {
84: #endif
85: LOGERR;
86: close(cli->sock);
87: e_free(cli);
88: return NULL;
89: }
90:
91: return cli;
92: }
93:
94: /*
95: * mqtt_cli_Close() - Close client connection
96: *
97: * @cli = connected client
98: * return: -1 error or 0 disconnected client and freed all resources
99: */
100: int
101: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
102: {
103: int siz;
104:
105: if (!cli || !*cli)
106: return -1;
107:
108: /* send disconnect */
109: (*cli)->buf = mqtt_msgDISCONNECT();
110: if ((*cli)->buf) {
111: siz = send((*cli)->sock, (*cli)->buf->msg_base, (*cli)->buf->msg_len, MSG_NOSIGNAL);
112: if (siz > -1)
113: shutdown((*cli)->sock, SHUT_RDWR);
114: }
115: close((*cli)->sock);
116:
117: mqtt_msgFree(&(*cli)->buf, 0);
118:
119: e_free(*cli);
120: *cli = NULL;
121: return 0;
122: }
123:
124: /*
125: * mqtt_cli_Subscribe() - Subscribe to broker
126: *
127: * @cli = connected client
128: * @Topics = Topics for subscribes
129: * @msgID = Message ID
130: * return: NULL error or !=NULL allocated array with subscribed QoS responses,
131: * must be e_free() result!
132: */
133: u_char *
134: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, u_short msgID)
135: {
136: int siz = 0;
137: u_short mid = 0;
138: u_char *qoses = NULL;
139:
140: if (!cli || !Topics)
141: return NULL;
142:
143: /* send subscribe */
144: cli->buf = mqtt_msgSUBSCRIBE(Topics, msgID);
145: if (!cli->buf)
146: return NULL;
147: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
148: if (siz == -1) {
149: LOGERR;
150: return NULL;
151: } else
152: mqtt_msgFree(&cli->buf, 0);
153:
154: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
155: return NULL;
156: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
157: return NULL;
158:
159: /* receive suback */
160: cli->buf = mqtt_msgAlloc(BUFSIZ);
161: if (!cli->buf)
162: return NULL;
163: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
164: if (siz == -1) {
165: LOGERR;
166: mqtt_msgFree(&cli->buf, 0);
167: return NULL;
168: }
169: siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
170: if (siz == -1) {
171: mqtt_msgFree(&cli->buf, 0);
172: return NULL;
173: }
174: if (msgID != mid) {
175: if (qoses)
176: e_free(qoses);
177: mqtt_msgFree(&cli->buf, 0);
178: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
179: return NULL;
180: }
181:
182: mqtt_msgFree(&cli->buf, 0);
183: return qoses;
184: }
185:
186: /*
187: * mqtt_cli_Unsubscribe() - Unsubscribe from broker
188: *
189: * @cli = connected client
190: * @Topics = Topics for unsubscribes
191: * @msgID = Message ID
192: * @Dup = Duplicated request
193: * @QoS = Message QoS
194: * return: -1 error or 0 ok
195: */
196: int
197: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics,
198: u_short msgID, u_char Dup, u_char QoS)
199: {
200: int siz = 0;
201:
202: if (!cli || !Topics)
203: return -1;
204:
205: /* send unsubscribe */
206: cli->buf = mqtt_msgUNSUBSCRIBE(Topics, msgID, Dup, QoS);
207: if (!cli->buf)
208: return -1;
209: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
210: if (siz == -1) {
211: LOGERR;
212: mqtt_msgFree(&cli->buf, 0);
213: return -1;
214: } else
215: mqtt_msgFree(&cli->buf, 0);
216:
217: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
218: return -1;
219: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
220: return -1;
221:
222: /* receive unsuback */
223: cli->buf = mqtt_msgAlloc(BUFSIZ);
224: if (!cli->buf)
225: return -1;
226: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
227: if (siz == -1) {
228: LOGERR;
229: mqtt_msgFree(&cli->buf, 0);
230: return -1;
231: }
232: siz = mqtt_readUNSUBACK(cli->buf);
233: if (siz == -1) {
234: mqtt_msgFree(&cli->buf, 0);
235: return -1;
236: }
237: if (msgID != siz) {
238: mqtt_msgFree(&cli->buf, 0);
239: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
240: return -1;
241: }
242:
243: mqtt_msgFree(&cli->buf, 0);
244: return 0;
245: }
246:
247: /*
248: * mqtt_cli_Publish() - Publish message to broker
249: *
250: * @cli = connected client
251: * @msgID = Message ID
252: * @Dup = Duplicated request
253: * @QoS = Message QoS
254: * @Retain = Retain message
255: * @csTopic = Topic
256: * @pData = Data
257: * @datLen = Data length
258: * return: -1 error or > -1 sended bytes
259: */
260: int
261: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain,
262: const char *csTopic, const void *pData, int datLen)
263: {
264: int wlen = 0, siz = 0;
265:
266: if (!cli || !csTopic)
267: return -1;
268:
269: /* send publish */
270: cli->buf = mqtt_msgPUBLISH(csTopic, msgID, Dup, QoS, Retain, pData, datLen);
271: if (!cli->buf)
272: return -1;
273: wlen = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
274: if (wlen == -1) {
275: LOGERR;
276: mqtt_msgFree(&cli->buf, 0);
277: return -1;
278: } else
279: mqtt_msgFree(&cli->buf, 0);
280:
281: if (QoS == MQTT_QOS_ONCE) /* no reply */
282: goto end;
283:
284: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
285: return -1;
286: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
287: return -1;
288:
289: /* receive PUBxxx */
290: cli->buf = mqtt_msgAlloc(BUFSIZ);
291: if (!cli->buf)
292: return -1;
293: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
294: if (siz == -1) {
295: LOGERR;
296: mqtt_msgFree(&cli->buf, 0);
297: return -1;
298: }
299:
300: if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */
301: siz = mqtt_readPUBACK(cli->buf);
302: if (siz == -1) {
303: mqtt_msgFree(&cli->buf, 0);
304: return -1;
305: }
306: if (msgID != siz) {
307: mqtt_msgFree(&cli->buf, 0);
308: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
309: return -1;
310: }
311: mqtt_msgFree(&cli->buf, 0);
312: goto end;
313: } else { /* reply with PUBREC */
314: siz = mqtt_readPUBREC(cli->buf);
315: if (siz == -1) {
316: mqtt_msgFree(&cli->buf, 0);
317: return -1;
318: }
319: if (msgID != siz) {
320: mqtt_msgFree(&cli->buf, 0);
321: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
322: return -1;
323: }
324: mqtt_msgFree(&cli->buf, 0);
325: }
326:
327: do {
328: /* send publish release QoS == 2 */
329: cli->buf = mqtt_msgPUBREL(msgID);
330: if (!cli->buf)
331: return -1;
332: siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL);
333: if (siz == -1) {
334: LOGERR;
335: mqtt_msgFree(&cli->buf, 0);
336: return -1;
337: } else
338: mqtt_msgFree(&cli->buf, 0);
339:
340: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
341: return -1;
342: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
343: if (Dup++ > 1)
344: return -1;
345: else
346: continue;
347: }
348:
349: /* receive PUBCOMP */
350: cli->buf = mqtt_msgAlloc(BUFSIZ);
351: if (!cli->buf)
352: return -1;
353: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
354: if (siz == -1) {
355: LOGERR;
356: mqtt_msgFree(&cli->buf, 0);
357: return -1;
358: }
359:
360: siz = mqtt_readPUBCOMP(cli->buf);
361: if (siz == -1) {
362: mqtt_msgFree(&cli->buf, 0);
363: return -1;
364: }
365: if (msgID != siz) {
366: mqtt_msgFree(&cli->buf, 0);
367: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
368: if (Dup++ > 1)
369: return -1;
370: else
371: continue;
372: }
373: mqtt_msgFree(&cli->buf, 0);
374: } while (0);
375:
376: end:
377: return wlen;
378: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>