Annotation of libaitmqtt/src/cliside.c, revision 1.3
1.2 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.3 ! misho 6: * $Id: cliside.c,v 1.2.2.1 2012/06/28 09:01:42 misho Exp $
1.2 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * mqtt_cli_Open() - Open client connection to MQTT broker
51: *
52: * @addr = brokers address
53: * @timeout = timeout
54: * return: NULL error or !=NULL connected to broker
55: */
56: mqtt_cli_t *
57: mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
58: {
59: mqtt_cli_t *cli;
60:
61: if (!addr)
62: return NULL;
63:
64: cli = malloc(sizeof(mqtt_cli_t));
65: if (!cli) {
66: LOGERR;
67: return NULL;
68: } else
69: memset(cli, 0, sizeof(mqtt_cli_t));
70:
71: cli->timeout = timeout;
72: cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
73: if (cli->sock == -1) {
74: LOGERR;
75: free(cli);
76: return NULL;
77: }
78: if (connect(cli->sock, addr, addr->sa_len) == -1) {
79: LOGERR;
80: close(cli->sock);
81: free(cli);
82: return NULL;
83: }
84:
85: cli->buf = mqtt_msgAlloc(USHRT_MAX);
86: if (!cli->buf) {
87: close(cli->sock);
88: free(cli);
89: return NULL;
90: }
91:
92: return cli;
93: }
94:
95: /*
96: * mqtt_cli_Close() - Close client connection
97: *
98: * @cli = connected client
99: * return: -1 error or 0 disconnected client and freed all resources
100: */
101: int
102: mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
103: {
104: int siz = 0;
105:
106: if (!cli || !*cli)
107: return -1;
108:
109: /* send disconnect */
110: siz = mqtt_msgDISCONNECT((*cli)->buf);
111: if (siz > -1) {
112: siz = send((*cli)->sock, (*cli)->buf->msg_base, siz, MSG_NOSIGNAL);
113: if (siz > -1)
114: shutdown((*cli)->sock, SHUT_RDWR);
115: }
116: close((*cli)->sock);
117:
118: mqtt_msgFree(&(*cli)->buf, 42);
119:
120: free(*cli);
121: *cli = NULL;
122: return 0;
123: }
124:
125: /*
126: * mqtt_cli_Subscribe() - Subscribe to broker
127: *
128: * @cli = connected client
129: * @Topics = Topics for subscribes
130: * @msgID = Message ID
131: * @Dup = Duplicated request
132: * @QoS = Message QoS
133: * return: NULL error or !=NULL allocated array with subscribed QoS responses,
134: * must be free() result!
135: */
136: u_char *
137: mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics,
138: u_short msgID, u_char Dup, u_char QoS)
139: {
140: int siz = 0;
141: u_short mid = 0;
142: u_char *qoses = NULL;
143:
144: if (!cli || !Topics)
145: return NULL;
146:
147: /* send subscribe */
148: siz = mqtt_msgSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
149: if (siz == -1)
150: return NULL;
151: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
152: if (siz == -1) {
153: LOGERR;
154: return NULL;
1.3 ! misho 155: } else
! 156: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
1.2 misho 157:
158: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
159: return NULL;
160: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
161: return NULL;
162:
163: /* receive suback */
164: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
165: if (siz == -1) {
166: LOGERR;
167: return NULL;
168: }
169: siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
170: if (siz == -1)
171: return NULL;
172: if (msgID != mid) {
173: free(qoses);
174: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid);
175: return NULL;
176: }
177:
178: return qoses;
179: }
180:
181: /*
182: * mqtt_cli_Unsubscribe() - Unsubscribe from broker
183: *
184: * @cli = connected client
185: * @Topics = Topics for unsubscribes
186: * @msgID = Message ID
187: * @Dup = Duplicated request
188: * @QoS = Message QoS
189: * return: -1 error or 0 ok
190: */
191: int
192: mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics,
193: u_short msgID, u_char Dup, u_char QoS)
194: {
195: int siz = 0;
196:
197: if (!cli || !Topics)
198: return -1;
199:
200: /* send unsubscribe */
201: siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
202: if (siz == -1)
203: return -1;
204: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
205: if (siz == -1) {
206: LOGERR;
207: return -1;
1.3 ! misho 208: } else
! 209: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
1.2 misho 210:
211: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
212: return -1;
213: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
214: return -1;
215:
216: /* receive unsuback */
217: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
218: if (siz == -1) {
219: LOGERR;
220: return -1;
221: }
222: siz = mqtt_readUNSUBACK(cli->buf);
223: if (siz == -1)
224: return -1;
225: if (msgID != siz) {
226: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
227: return -1;
228: }
229:
230: return 0;
231: }
232:
233: /*
234: * mqtt_cli_Publish() - Publish message to broker
235: *
236: * @cli = connected client
237: * @msgID = Message ID
238: * @Dup = Duplicated request
239: * @QoS = Message QoS
240: * @Retain = Retain message
241: * @csTopic = Topic
242: * @pData = Data
243: * @datLen = Data length
244: * return: -1 error or > -1 sended bytes
245: */
246: int
247: mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain,
248: const char *csTopic, const void *pData, int datLen)
249: {
250: int wlen = 0, siz = 0;
251:
252: if (!cli || !csTopic)
253: return -1;
254:
255: /* send publish */
256: siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen);
257: if (siz == -1)
258: return -1;
259: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
260: if (siz == -1) {
261: LOGERR;
262: return -1;
1.3 ! misho 263: } else {
1.2 misho 264: wlen = siz;
1.3 ! misho 265: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
! 266: }
1.2 misho 267:
268: if (QoS == MQTT_QOS_ONCE) /* no reply */
269: goto end;
270:
271: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
272: return -1;
273: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
274: return -1;
275:
276: /* receive PUBxxx */
277: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
278: if (siz == -1) {
279: LOGERR;
280: return -1;
281: }
282:
283: if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */
284: siz = mqtt_readPUBACK(cli->buf);
285: if (siz == -1)
286: return -1;
287: if (msgID != siz) {
288: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
289: return -1;
290: }
291: goto end;
292: } else { /* reply with PUBREC */
293: siz = mqtt_readPUBREC(cli->buf);
294: if (siz == -1)
295: return -1;
296: if (msgID != siz) {
297: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
298: return -1;
299: }
300: }
301:
302: do {
303: /* send publish release QoS == 2 */
304: siz = mqtt_msgPUBREL(cli->buf, msgID);
305: if (siz == -1)
306: return -1;
307: siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
308: if (siz == -1) {
309: LOGERR;
310: return -1;
1.3 ! misho 311: } else
! 312: memset(cli->buf->msg_base, 0, cli->buf->msg_len);
1.2 misho 313:
314: if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
315: return -1;
316: } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
317: if (Dup++ > 1)
318: return -1;
319: else
320: continue;
321: }
322:
323: /* receive PUBCOMP */
1.3 ! misho 324: siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
! 325: if (siz == -1) {
! 326: LOGERR;
! 327: return -1;
! 328: }
! 329:
1.2 misho 330: siz = mqtt_readPUBCOMP(cli->buf);
331: if (siz == -1)
332: return -1;
333: if (msgID != siz) {
334: mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz);
335: if (Dup++ > 1)
336: return -1;
337: else
338: continue;
339: }
340: } while (0);
341:
342: end:
343: return wlen;
344: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>