1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: mqttd_calls.c,v 1.5 2017/10/08 22:49:25 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47: #include "mqttd.h"
48: #include "utils.h"
49: #include "rtlm.h"
50: #include "mqttd_calls.h"
51:
52:
53: static inline ait_val_t *
54: mkPkt(void * __restrict data, int dlen)
55: {
56: ait_val_t *p = NULL;
57:
58: if (!(p = ait_allocVar())) {
59: EVERBOSE(7, "Error:: in send packet prepare #%d - %s",
60: elwix_GetErrno(), elwix_GetError());
61: return NULL;
62: }
63:
64: if (data && dlen > 0)
65: AIT_SET_BUF(p, data, dlen);
66:
67: return p;
68: }
69:
70: static inline void
71: freePkt(ait_val_t ** __restrict p)
72: {
73: if (!p)
74: return;
75:
76: ait_freeVar(p);
77: }
78:
79: static void *
80: sendPacket(sched_task_t *task)
81: {
82: ait_val_t *p = TASK_ARG(task);
83: register int n, slen;
84: u_char *pos;
85:
86: if (!p || AIT_ISEMPTY(p)) {
87: EVERBOSE(9, "Error:: invalid packet or found empty content ...");
88: return NULL;
89: }
90:
91: EVERBOSE(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));
92:
93: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
94: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
95: if (n == -1) {
96: ESYSERR(0);
97: break;
98: }
99: }
100:
101: freePkt(&p);
102: return NULL;
103: }
104:
105: static int
106: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
107: {
108: regex_t re;
109: regmatch_t match;
110: ait_val_t *p = NULL;
111: struct tagSession *s = NULL;
112: struct tagStore *st_, *st = NULL;
113: char szStr[STRSIZ];
114: int ret;
115:
116: assert(sess);
117:
118: TAILQ_FOREACH(s, &Sessions, sess_node) {
119: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
120: /* check for QoS */
121: if (st->st_subscr.sub_ret >= qos) {
122: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
123: regerror(ret, &re, szStr, sizeof szStr);
124: regfree(&re);
125: EVERBOSE(3, "Error:: regcomp(%s) %s\n", (char*)
126: st->st_subscr.sub_topic.msg_base, szStr);
127: }
128: if (!regexec(&re, topic, 1, &match, 0)) {
129: /* MATCH */
130: p = mkPkt(sess->sess_buf->msg_base, datlen);
131: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
132: }
133:
134: regfree(&re);
135: }
136: }
137: }
138:
139: return 0;
140: }
141:
142: /* --------------------------------------------------- */
143:
144: void *
145: sendRetain(sched_task_t *task)
146: {
147: mqtt_subscr_t *subs, *s;
148: struct tagSession *sess;
149: int siz;
150:
151: ETRACE();
152:
153: assert(task);
154:
155: sess = TASK_ARG(task);
156: assert(sess);
157:
158: if (!sess->sess_buf) {
159: EVERBOSE(9, "WARNING! No allocated buffer!?!\n");
160: return NULL;
161: }
162:
163: subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
164: if (!subs)
165: return NULL;
166:
167: for (s = subs; s && s->sub_topic.msg_base; s++) {
168: siz = s->sub_value.msg_len;
169: memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base,
170: MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
171: EVERBOSE(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n",
172: siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
173: if (siz > 0)
174: search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
175: }
176:
177: mqtt_subFree(&subs);
178: return NULL;
179: }
180:
181: int
182: pubWill(struct tagSession * __restrict sess)
183: {
184: int datlen;
185:
186: ETRACE();
187:
188: /* prepare will packet */
189: datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,
190: sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
191: if (datlen == -1)
192: return -1; /* error */
193:
194: return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
195: }
196:
197: static int
198: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
199: {
200: return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
201: }
202:
203: static int
204: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
205: {
206: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
207:
208: /* write topic to database */
209: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
210: sess->sess_addr, hdr->mqtt_msg.retain);
211: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
212: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
213: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
214:
215: search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
216:
217: /* delete not retain message */
218: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
219: sess->sess_addr, 0);
220: return 0;
221: }
222:
223: static int
224: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
225: {
226: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
227:
228: /* write topic to database */
229: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
230: sess->sess_addr, hdr->mqtt_msg.retain);
231: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
232: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
233: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
234:
235: return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
236: }
237:
238:
239: int
240: cmdPUBLISH(void *srv, int len, void *arg)
241: {
242: struct mqtthdr *hdr;
243: struct tagSession *sess = (struct tagSession*) arg;
244: char szTopic[STRSIZ] = { 0 };
245: int siz = 0;
246: u_short mid = 0;
247: ait_val_t *p = NULL;
248:
249: ETRACE();
250:
251: if (!sess)
252: return -1;
253:
254: EVERBOSE(5, "Exec PUBLISH session");
255: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
256: if (siz == -1) {
257: EVERBOSE(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
258: return 0;
259: }
260:
261: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
262: switch (hdr->mqtt_msg.qos) {
263: case MQTT_QOS_ACK:
264: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
265: return 0;
266: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
267: if (siz == -1) {
268: EVERBOSE(5, "Error:: in msgPUBACK #%d - %s",
269: mqtt_GetErrno(), mqtt_GetError());
270: return 0;
271: }
272: break;
273: case MQTT_QOS_EXACTLY:
274: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
275: return 0;
276: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
277: if (siz == -1) {
278: EVERBOSE(5, "Error:: in msgPUBREC #%d - %s",
279: mqtt_GetErrno(), mqtt_GetError());
280: return 0;
281: }
282: break;
283: case MQTT_QOS_ONCE:
284: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
285: default:
286: return 0;
287: }
288:
289: p = mkPkt(sess->sess_buf->msg_base, siz);
290: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
291: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
292: return 0;
293: }
294:
295: int
296: cmdPUBREL(void *srv, int len, void *arg)
297: {
298: struct tagSession *sess = (struct tagSession*) arg;
299: int siz = 0;
300: u_short mid = 0;
301: ait_val_t *p = NULL;
302:
303: ETRACE();
304:
305: if (!sess)
306: return -1;
307:
308: EVERBOSE(5, "Exec PUBREL session");
309: mid = mqtt_readPUBREL(sess->sess_buf);
310: if (mid == (u_short) -1) {
311: EVERBOSE(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
312: return 0;
313: }
314:
315: /* delete not retain message */
316: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
317: sess->sess_addr, 0);
318:
319: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
320: if (siz == -1) {
321: EVERBOSE(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
322: return 0;
323: }
324:
325: p = mkPkt(sess->sess_buf->msg_base, siz);
326: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
327: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
328: return 0;
329: }
330:
331: int
332: cmdSUBSCRIBE(void *srv, int len, void *arg)
333: {
334: struct tagSession *sess = (struct tagSession*) arg;
335: mqtt_subscr_t *subs = NULL;
336: int siz = 0;
337: u_short mid = 0;
338: register int i;
339: struct tagStore *store;
340: char buf[BUFSIZ];
341: void *ptr;
342: ait_val_t *p = NULL;
343:
344: ETRACE();
345:
346: if (!sess)
347: return -1;
348:
349: EVERBOSE(5, "Exec SUBSCRIBE session");
350: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
351: if (siz == -1) {
352: EVERBOSE(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
353: return 0;
354: }
355:
356: /* add to db */
357: for (i = 0; i < siz; i++) {
358: store = e_malloc(sizeof(struct tagStore));
359: if (!store) {
360: ELIBERR(elwix);
361: continue;
362: } else {
363: store->st_msgid = mid;
364: mqtt_subCopy(&store->st_subscr, &subs[i]);
365: subs[i].sub_ret = MQTT_QOS_DENY;
366: }
367:
368: /* add to cache */
369: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
370:
371: /* convert topic to regexp */
372: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
373: EVERBOSE(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
374: } else {
375: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
376: if (!ptr) {
377: ESYSERR(0);
378: continue;
379: } else {
380: store->st_subscr.sub_topic.msg_base = ptr;
381: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
382: memcpy(store->st_subscr.sub_topic.msg_base, buf,
383: store->st_subscr.sub_topic.msg_len);
384: }
385:
386: /* store to db */
387: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
388: sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
389: /* subscribe pass */
390: subs[i].sub_ret = MQTT_QOS_PASS;
391:
392: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid,
393: store->st_subscr.sub_topic.msg_base,
394: store->st_subscr.sub_topic.msg_len,
395: store->st_subscr.sub_ret, sess->sess_addr);
396: }
397: }
398:
399: /* send acknowledge */
400: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
401: if (siz == -1) {
402: EVERBOSE(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
403: goto end;
404: } else {
405: p = mkPkt(sess->sess_buf->msg_base, siz);
406: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
407: }
408:
409: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
410: end:
411: mqtt_subFree(&subs);
412: return 0;
413: }
414:
415: int
416: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
417: {
418: struct tagSession *sess = (struct tagSession*) arg;
419: mqtt_subscr_t *subs = NULL;
420: int siz = 0;
421: u_short mid = 0;
422: register int i;
423: struct tagStore *store, *tmp;
424: ait_val_t *p = NULL;
425:
426: ETRACE();
427:
428: if (!sess)
429: return -1;
430:
431: EVERBOSE(5, "Exec UNSUBSCRIBE session");
432: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
433: if (siz == -1) {
434: EVERBOSE(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
435: return 0;
436: }
437:
438: /* del from db */
439: for (i = 0; i < siz; i++) {
440: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
441: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
442: store->st_subscr.sub_topic.msg_base &&
443: !strcmp(store->st_subscr.sub_topic.msg_base,
444: subs[i].sub_topic.msg_base)) {
445: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
446:
447: if (store->st_subscr.sub_topic.msg_base)
448: free(store->st_subscr.sub_topic.msg_base);
449: if (store->st_subscr.sub_value.msg_base)
450: free(store->st_subscr.sub_value.msg_base);
451: e_free(store);
452: }
453: }
454:
455: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
456: sess->sess_user, "%");
457: }
458:
459: /* send acknowledge */
460: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
461: if (siz == -1) {
462: EVERBOSE(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
463: goto end;
464: } else {
465: p = mkPkt(sess->sess_buf->msg_base, siz);
466: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
467: }
468:
469: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
470: end:
471: mqtt_subFree(&subs);
472: return 0;
473: }
474:
475: int
476: cmdPINGREQ(void *srv, int len, void *arg)
477: {
478: struct tagSession *sess = (struct tagSession*) arg;
479: int siz = 0;
480: ait_val_t *p = NULL;
481:
482: ETRACE();
483:
484: if (!sess)
485: return -1;
486:
487: EVERBOSE(5, "Exec PINGREQ session");
488: siz = mqtt_msgPINGRESP(sess->sess_buf);
489: if (siz == -1) {
490: EVERBOSE(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
491: return 0;
492: } else {
493: p = mkPkt(sess->sess_buf->msg_base, siz);
494: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
495: }
496:
497: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
498: return 0;
499: }
500:
501: int
502: cmdCONNECT(void *srv, int len, void *arg)
503: {
504: struct tagStore *store;
505: struct tagSession *sess = (struct tagSession*) arg;
506:
507: ETRACE();
508:
509: if (!sess)
510: return -1;
511:
512: EVERBOSE(5, "Exec CONNECT session");
513: TAILQ_REMOVE(&Sessions, sess, sess_node);
514:
515: schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
516:
517: if (sess->sess_clean) {
518: if (call.FiniSessPUB)
519: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
520: if (call.DeletePUB_subscribe)
521: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
522: if (call.WipePUB_topic)
523: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
524: }
525:
526: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
527: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
528:
529: if (store->st_subscr.sub_topic.msg_base)
530: free(store->st_subscr.sub_topic.msg_base);
531: if (store->st_subscr.sub_value.msg_base)
532: free(store->st_subscr.sub_value.msg_base);
533:
534: e_free(store);
535: }
536:
537: if (sess->sess_will.flag)
538: pubWill(sess);
539:
540: if (sess->sess_will.msg)
541: free(sess->sess_will.msg);
542: if (sess->sess_will.topic)
543: free(sess->sess_will.topic);
544:
545: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
546: sess->sess_addr, sess->sess_user);
547:
548: return -3; /* reconnect client */
549: }
550:
551: int
552: cmdDISCONNECT(void *srv, int len, void *arg)
553: {
554: struct tagSession *sess = (struct tagSession*) arg;
555:
556: ETRACE();
557:
558: if (!sess)
559: return -1;
560:
561: EVERBOSE(5, "Exec DISCONNECT session");
562:
563: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
564: sess->sess_addr, sess->sess_user);
565:
566: return -2; /* must terminate dispatcher */
567: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>