Annotation of libaitmqtt/src/aitmqtt.c, revision 1.1.1.1.2.12
1.1 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.1.1.1.2.12! misho 6: * $Id: aitmqtt.c,v 1.1.1.1.2.11 2012/04/27 16:17:11 misho Exp $
1.1 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.1.1.1.2.3 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.1 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: #pragma GCC visibility push(hidden)
50:
51: int mqtt_Errno;	/* error code of the last operation; written by mqtt_SetErr(), returned by mqtt_GetErrno() */
52: char mqtt_Error[STRSIZ];	/* error text of the last operation; formatted by mqtt_SetErr(), returned by mqtt_GetError() */
53:
54: #pragma GCC visibility pop
55:
56: // mqtt_GetErrno() Get error code of last operation
57: inline int
58: mqtt_GetErrno()
59: {
60: return mqtt_Errno;
61: }
62:
63: // mqtt_GetError() Get error text of last operation
64: inline const char *
65: mqtt_GetError()
66: {
67: return mqtt_Error;
68: }
69:
70: // mqtt_SetErr() Set error to variables for internal use!!!
71: inline void
72: mqtt_SetErr(int eno, char *estr, ...)
73: {
74: va_list lst;
75:
76: mqtt_Errno = eno;
77: memset(mqtt_Error, 0, sizeof mqtt_Error);
78: va_start(lst, estr);
79: vsnprintf(mqtt_Error, sizeof mqtt_Error, estr, lst);
80: va_end(lst);
81: }
82:
83: #pragma GCC visibility push(hidden)
1.1.1.1.2.4 misho 84: /* _mqtt_readHEADER() read fixed header from MQTT message */
1.1 misho 85: inline struct mqtthdr *
86: _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len)
87: {
88: struct mqtthdr *hdr;
89:
90: if (!buf || !buf->msg_base || !buf->msg_len)
91: return NULL;
92:
93: hdr = (struct mqtthdr*) buf->msg_base;
94: if (hdr->mqtt_msg.type != cmd) {
95: mqtt_SetErr(EINVAL, "Error:: wrong command #%d should be %d",
96: hdr->mqtt_msg.type, cmd);
97: return NULL;
98: }
99:
100: *len = mqtt_decodeLen(hdr->mqtt_len, bytes);
101: return hdr;
102: }
103: #pragma GCC visibility pop
104:
105:
106: /*
107: * mqtt_msgFree() Free MQTT message
108: *
109: * @msg = Message buffer
110: * @all = !=0 Destroy entire message, if MQTT Message allocated with mqtt_msgAlloc()
111: * return: none
112: */
113: inline void
114: mqtt_msgFree(mqtt_msg_t ** __restrict msg, int all)
115: {
116: if (msg && *msg) {
117: if ((*msg)->msg_base) {
118: free((*msg)->msg_base);
119: (*msg)->msg_base = NULL;
120: }
121: if (all) {
122: free(*msg);
123: *msg = NULL;
124: } else
125: (*msg)->msg_len ^= (*msg)->msg_len;
126: }
127: }
128:
129: /*
130: * mqtt_msgAlloc() Allocate memory for MQTT Message
131: *
132: * @len = >0 Allocate buffer with length
133: * return: NULL error or Message, after use must call mqtt_msgFree() with all!=0
134: */
135: inline mqtt_msg_t *
136: mqtt_msgAlloc(u_short len)
137: {
138: mqtt_msg_t *m = NULL;
139:
140: m = malloc(sizeof(mqtt_msg_t));
141: if (!m) {
142: LOGERR;
143: return NULL;
144: } else
145: memset(m, 0, sizeof(mqtt_msg_t));
146:
147: if (len) {
148: m->msg_len = len;
149: m->msg_base = malloc(m->msg_len);
150: if (!m->msg_base) {
151: LOGERR;
152: free(m);
153: return NULL;
154: } else
155: memset(m->msg_base, 0, m->msg_len);
156: }
157:
158: return m;
159: }
160:
161: /*
162: * mqtt_msgRealloc() Reallocate MQTT message buffer
163: *
164: * @msg = MQTT message
165: * @len = new length
166: * return: -1 error or >-1 old buffer length
167: */
168: inline int
169: mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short len)
170: {
171: void *p = NULL;
172: int ret = 0;
173:
174: if (!msg)
175: return -1;
176:
1.1.1.1.2.5 misho 177: if (len <= msg->msg_len)
1.1 misho 178: return len;
179:
180: p = realloc(msg->msg_base, len);
181: if (!p) {
182: LOGERR;
183: return -1;
184: }
185:
186: ret = msg->msg_len;
187: msg->msg_len = len;
188: msg->msg_base = p;
189:
190: return ret;
191: }
192:
/*
 * mqtt_encodeLen() Encode number to MQTT length field
 *
 * Every output byte holds 7 bits of the number, least significant group
 * first; bit 0x80 marks that another byte follows. The bytes are stored
 * into the returned value in memory order.
 *
 * @num = number for encode, at most 268435455
 * return: (u_int) -1 error or the encoded length field
 */
inline u_int
mqtt_encodeLen(u_int num)
{
	u_int enc = 0;
	u_char *out = (u_char*) &enc;
	u_char digit;
	u_int idx = 0;

	if (num > 268435455)
		return (u_int) -1;

	while (idx < sizeof enc && num > 0) {
		digit = num & 0x7f;
		num >>= 7;
		if (num)
			digit |= 0x80;	/* more groups follow */

		out[idx++] = digit;
	}

	return enc;
}
219:
/*
 * mqtt_decodeLen() Decode length from MQTT packet
 *
 * Reads at most sizeof(u_int) bytes. Each byte contributes its low 7 bits,
 * least significant group first, and bit 0x80 marks that another byte
 * follows. A field whose last permitted byte still carries the continuation
 * bit is malformed and is rejected.
 *
 * @len = length from MQTT header
 * @n = sizeof bytes, if !=NULL; on a malformed field the number of bytes
 *	that were examined
 * return: (u_int) -1 error, otherwise length of message
 */
inline u_int
mqtt_decodeLen(void * __restrict len, int * __restrict n)
{
	u_int i, dig, mul;
	u_int ret = 0;
	u_char *p = (u_char*) len;

	if (!len)
		return (u_int) -1;

	for (mul = 1, i = 0; i < sizeof ret; i++, mul *= 0x80) {
		dig = p[i];
		ret += (dig & 0x7f) * mul;

		if (!(dig & 0x80)) {
			/* terminating byte found, i + 1 bytes were used */
			if (n)
				*n = (int) i + 1;
			return ret;
		}
	}

	/* every examined byte asked for one more: not a valid length field */
	if (n)
		*n = (int) i;
	return (u_int) -1;
}
249:
/*
 * mqtt_sizeLen Return sizeof len field
 *
 * The bytes of @len are inspected in memory order, which is the layout
 * mqtt_encodeLen() produces. The field ends at the first byte without the
 * continuation bit 0x80. If every byte has that bit set there is no end,
 * so the value is not a valid length field.
 *
 * @len = encoded length
 * return: -1 error, >-1 sizeof len in bytes
 */
inline char
mqtt_sizeLen(u_int len)
{
	const u_char *p = (const u_char*) &len;
	u_int i;

	for (i = 0; i < sizeof len; i++)
		if (!(p[i] & 0x80))
			return (char) (i + 1);

	/* no terminating byte inside the field */
	return -1;
}
271:
272: /*
1.1.1.1.2.7 misho 273: * mqtt_str2subs Create MQTT subscribe variable from string(s)
1.1 misho 274: *
1.1.1.1.2.7 misho 275: * @csStr = null terminated string array
276: * @strnum = copy at most number of strings elements
1.1 misho 277: * @qoses = QoS elements applied to subscribe variable,
278: * count of elements must be equal with csStr elements
279: * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()
280: */
281: inline mqtt_subscr_t *
1.1.1.1.2.7 misho 282: mqtt_str2subs(const char **csStr, u_short strnum, u_char *qoses)
1.1 misho 283: {
284: mqtt_subscr_t *v;
285: register int i, items;
286: const char **strs;
287:
288: if (!csStr)
289: return NULL;
1.1.1.1.2.8 misho 290:
291: for (items = 0, strs = csStr;
292: (!strnum || (strnum && items < strnum)) && *strs;
293: items++, strs++);
1.1 misho 294:
295: if (!(v = malloc((items + 1) * sizeof(mqtt_subscr_t)))) {
296: LOGERR;
297: return NULL;
298: } else
299: memset(v, 0, (items + 1) * sizeof(mqtt_subscr_t));
300:
301: for (i = 0; i < items; i++) {
302: v[i].sub_topic.msg_len = strlen(csStr[i]);
303: v[i].sub_topic.msg_base = (u_char*) strdup(csStr[i]);
304: if (qoses && qoses[i] < MQTT_QOS_RESERVED)
305: v[i].sub_ret = qoses[i];
306: }
307:
308: return v;
309: }
310:
311: /*
312: * mqtt_subFree() Free array from subscribe variables
313: *
314: * @subs = Subscribe variables
315: * return: none
316: */
317: inline void
318: mqtt_subFree(mqtt_subscr_t ** __restrict subs)
319: {
320: mqtt_subscr_t *v;
321:
322: if (!subs)
323: return;
324:
325: for (v = *subs; v->sub_topic.msg_base; v++) {
326: free(v->sub_topic.msg_base);
327: v->sub_topic.msg_base = NULL;
328: v->sub_topic.msg_len = 0;
329:
330: if (v->sub_value.msg_base) {
331: free(v->sub_value.msg_base);
332: v->sub_value.msg_base = NULL;
333: v->sub_value.msg_len = 0;
334: }
335: }
336:
337: free(*subs);
338: *subs = NULL;
339: }
340:
341: /*
342: * mqtt_subAlloc() Create array from subscribe variables
343: *
344: * @num = Number of elements
345: * return: NULL error or subscribe array, after use must call mqtt_subFree()
346: */
347: inline mqtt_subscr_t *
348: mqtt_subAlloc(u_short num)
349: {
350: mqtt_subscr_t *s = NULL;
351:
352: s = malloc((num + 1) * sizeof(mqtt_subscr_t));
353: if (!s) {
354: LOGERR;
355: return NULL;
356: } else
357: memset(s, 0, (num + 1) * sizeof(mqtt_subscr_t));
358:
359: return s;
360: }
361:
362: /*
363: * mqtt_subRealloc() Reallocate array from subscribe variables
364: *
365: * @subs = Subscribe array
366: * @num = Number of elements
367: * return: NULL error or subscribe array, after use must call mqtt_subFree()
368: */
369: inline mqtt_subscr_t *
1.1.1.1.2.9 misho 370: mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_short num)
1.1 misho 371: {
372: mqtt_subscr_t *s = NULL;
373:
1.1.1.1.2.9 misho 374: if (!subs)
375: return NULL;
376:
377: s = realloc(*subs, (num + 1) * sizeof(mqtt_subscr_t));
1.1 misho 378: if (!s) {
379: LOGERR;
380: return NULL;
1.1.1.1.2.9 misho 381: } else {
382: memset(s + num, 0, sizeof(mqtt_subscr_t));
383: *subs = s;
1.1 misho 384: }
385:
1.1.1.1.2.9 misho 386: return *subs;
1.1 misho 387: }
1.1.1.1.2.1 misho 388:
389: /*
1.1.1.1.2.10 misho 390: * mqtt_subCopy() - Copy subscription structure to another one
391: *
392: * @dst = destination subscription
393: * @src = source subscription
394: * return: =NULL error or !=NULL successful copied a structure
395: */
396: inline mqtt_subscr_t *
397: mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subscr_t * __restrict src)
398: {
399: if (!dst || !src)
400: return NULL;
401:
402: if (src->sub_topic.msg_base) {
1.1.1.1.2.11 misho 403: dst->sub_topic.msg_base = malloc(src->sub_topic.msg_len + 1);
1.1.1.1.2.10 misho 404: if (!dst->sub_topic.msg_base) {
405: LOGERR;
406: memset(dst, 0, sizeof(mqtt_subscr_t));
407: return NULL;
408: } else {
409: dst->sub_topic.msg_len = src->sub_topic.msg_len;
1.1.1.1.2.11 misho 410: ((char*) dst->sub_topic.msg_base)[dst->sub_topic.msg_len] = 0;
1.1.1.1.2.10 misho 411: memcpy(dst->sub_topic.msg_base, src->sub_topic.msg_base,
412: dst->sub_topic.msg_len);
413: }
1.1.1.1.2.12! misho 414: } else {
! 415: if (dst->sub_topic.msg_base)
! 416: free(dst->sub_topic.msg_base);
! 417: dst->sub_topic.msg_base = NULL;
! 418: dst->sub_topic.msg_len = 0;
1.1.1.1.2.10 misho 419: }
420: if (src->sub_value.msg_base) {
1.1.1.1.2.11 misho 421: dst->sub_value.msg_base = malloc(src->sub_value.msg_len + 1);
1.1.1.1.2.10 misho 422: if (!dst->sub_value.msg_base) {
423: LOGERR;
424: if (dst->sub_topic.msg_base)
425: free(dst->sub_topic.msg_base);
426: memset(dst, 0, sizeof(mqtt_subscr_t));
427: return NULL;
428: } else {
429: dst->sub_value.msg_len = src->sub_value.msg_len;
1.1.1.1.2.11 misho 430: ((char*) dst->sub_value.msg_base)[dst->sub_value.msg_len] = 0;
1.1.1.1.2.10 misho 431: memcpy(dst->sub_value.msg_base, src->sub_value.msg_base,
432: dst->sub_value.msg_len);
433: }
1.1.1.1.2.12! misho 434: } else {
! 435: if (dst->sub_value.msg_base)
! 436: free(dst->sub_value.msg_base);
! 437: dst->sub_value.msg_base = NULL;
! 438: dst->sub_value.msg_len = 0;
1.1.1.1.2.10 misho 439: }
440:
441: dst->sub_ret = src->sub_ret;
442: return dst;
443: }
444:
445:
446: /*
1.1.1.1.2.1 misho 447: * mqtt_expandTopic() - Expanding topic to regular expression
448: *
449: * @csInput = Input topic
450: * @psRegEx = Output to regular expression
451: * @regexLen = Length of psRegEx
452: * @BOL = Begin of Line, if =0 not added
453: * @EOL = End of Line, if =0 not appended
454: * return: -1 error, 0 nothing expanded or >0 expanded bytes
455: */
456: int
457: mqtt_expandTopic(const char *csInput, char * __restrict psRegEx, int regexLen, u_char BOL, u_char EOL)
458: {
459: int ret = 0;
460: register int i;
461: char *pos, *s;
462: const char reROM[] = "[](){}^$\\-|?.+*";
463:
464: if (!csInput || !psRegEx || regexLen < 1)
465: return -1;
466: else
467: memset(psRegEx, 0, regexLen);
468:
469: /* check # */
470: for (i = 0, pos = (char*) csInput; *pos && i < 2; pos++)
471: if (*pos == '#')
472: i++;
473: if (i == 2) {
474: mqtt_SetErr(EINVAL, "Syntax error, multiple occurrences of #..#");
475: return -1;
476: }
477: if (i == 1 && (pos = strrchr(csInput, '#')))
478: if ((pos != csInput && *(pos - 1) != '/') || *(pos + 1)) {
479: mqtt_SetErr(EINVAL, "Syntax error, bad format of #");
480: return -1;
481: }
482: /* check + */
483: for (pos = (char*) csInput; *pos && (pos = strchr(pos, '+')); pos++)
484: if ((pos != csInput && *(pos - 1) != '/') || (*(pos + 1) && *(pos + 1) != '/')) {
485: mqtt_SetErr(EINVAL, "Syntax error, bad format of +");
486: return -1;
487: }
488:
489: /* BUILD REGEX */
490: s = psRegEx;
491: if (BOL) {
492: *s++ = '^';
493: ret++;
494: }
495: for (pos = (char*) csInput; s < psRegEx + regexLen && *pos; s++, pos++) {
496: if (*pos == '#') {
497: strlcat(s, ".*", regexLen - (s - psRegEx));
498: s++;
499: ret++;
500: break;
501: }
502: if (*pos == '+') {
503: if (*(pos + 1)) {
504: strlcat(s, ".*", regexLen - (s - psRegEx));
505: s++;
506: ret++;
507: continue;
508: } else {
509: strlcat(s, ".*/", regexLen - (s - psRegEx));
510: ret += 2;
511: break;
512: }
513: }
514: for (i = 0; i < sizeof reROM - 1; i++)
515: if (*pos == reROM[i] && regexLen - (s - psRegEx) - 1 > 0) {
516: *s++ = '\\';
517: ret++;
518: break;
519: }
520:
521: *s = *pos;
522: }
523: if (EOL) {
524: strlcat(psRegEx, "$", regexLen);
525: ret++;
526: }
527:
528: return ret;
529: }
1.1.1.1.2.2 misho 530:
/*
 * mqtt_sqlTopic() - Expanding topic to SQL search string
 *
 * '#' and '+' become '%' ("%/" when '+' is the last character), every other
 * character is copied as it is. The output is always zero terminated; if
 * psSQL is too small the string is cut short without an error.
 *
 * @csInput = Input topic
 * @psSQL = Output to SQL search string
 * @sqlLen = Length of psSQL
 * return: -1 error, >=0 changed bytes
 */
int
mqtt_sqlTopic(const char *csInput, char * __restrict psSQL, int sqlLen)
{
	int ret = 0;
	register int i;
	const char *pos;
	char *s, *end;

	if (!csInput || !psSQL || sqlLen < 1)
		return -1;
	else
		memset(psSQL, 0, sqlLen);

	/* check # */
	for (i = 0, pos = csInput; *pos && i < 2; pos++)
		if (*pos == '#')
			i++;
	if (i == 2) {
		mqtt_SetErr(EINVAL, "Syntax error, multiple occurrences of #..#");
		return -1;
	}
	/* a single # must follow '/' (or start the topic) and be the last character */
	if (i == 1 && (pos = strrchr(csInput, '#')))
		if ((pos != csInput && *(pos - 1) != '/') || *(pos + 1)) {
			mqtt_SetErr(EINVAL, "Syntax error, bad format of #");
			return -1;
		}
	/* check +, it must fill a whole level: "/+/", leading "+/" or trailing "/+" */
	for (pos = csInput; *pos && (pos = strchr(pos, '+')); pos++)
		if ((pos != csInput && *(pos - 1) != '/') || (*(pos + 1) && *(pos + 1) != '/')) {
			mqtt_SetErr(EINVAL, "Syntax error, bad format of +");
			return -1;
		}

	/* BUILD SEARCH STRING */
	/* direct stores stay below end, so the last byte always keeps the terminator */
	end = psSQL + sqlLen - 1;
	for (pos = csInput, s = psSQL; s < end && *pos; s++, pos++) {
		if (*pos == '#') {
			*s = '%';
			ret++;
			break;
		}
		if (*pos == '+') {
			if (*(pos + 1)) {
				*s = '%';
				ret++;
				continue;
			} else {
				strlcat(s, "%/", (size_t) (end - s) + 1);
				ret += 2;
				break;
			}
		}

		*s = *pos;
	}

	return ret;
}
1.1.1.1.2.6 misho 605:
606:
607: /*
608: * mqtt_KeepAlive() - Keep Alive check routine
609: *
610: * @sock = connected socket
611: * @ka = keep alive timeout
612: * @tries = tries for receive correct ping response, usually ==1
613: * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session
614: */
615: int
616: mqtt_KeepAlive(int sock, u_short ka, u_char tries)
617: {
618: int ret = 0;
619: struct pollfd pfd;
620: mqtt_msg_t msg = { NULL, 0 };
621:
622: if (sock < 3)
623: return -1; /* error */
624:
625: pfd.fd = sock;
626: pfd.events = POLLOUT;
627: if ((ret = poll(&pfd, 1, ka * 1000)) == -1 ||
628: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
629: LOGERR;
630: return -1; /* error */
631: } else if (!ret)
632: return 1; /* session is abandoned ... must be disconnect! */
633: /* ping request */
634: if ((ret = mqtt_msgPINGREQ(&msg)) == -1)
635: return -1; /* error */
636: if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
637: LOGERR;
638: goto end;
639: }
640:
641: pfd.events = POLLIN | POLLPRI;
642: while (tries--) {
643: if ((ret = poll(&pfd, 1, ka * 1000)) == -1 ||
644: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
645: LOGERR;
646: break;
647: } else if (!ret) {
648: ret = 1; /* session is abandoned ... must be disconnect! */
649: continue;
650: }
651: /* receive & decode packet */
652: if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {
653: LOGERR;
654: break;
655: }
656: if (!mqtt_readPINGRESP(&msg)) {
657: ret = 0; /* Host is alive */
658: break;
659: } else
660: ret = 2; /* Session is broken ... must be disconnect! */
661: }
662: end:
663: free(msg.msg_base);
664: return ret;
665: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>