Annotation of mqtt/src/mqttd_calls.c, revision 1.3.2.1
1.3.2.1 ! misho 1: /*************************************************************************
! 2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
! 3: * by Michael Pounov <misho@openbsd-bg.org>
! 4: *
! 5: * $Author: misho $
! 6: * $Id: global.h,v 1.4 2012/07/03 08:57:04 misho Exp $
! 7: *
! 8: **************************************************************************
! 9: The ELWIX and AITNET software is distributed under the following
! 10: terms:
! 11:
! 12: All of the documentation and software included in the ELWIX and AITNET
! 13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
! 14:
! 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
! 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
! 17:
! 18: Redistribution and use in source and binary forms, with or without
! 19: modification, are permitted provided that the following conditions
! 20: are met:
! 21: 1. Redistributions of source code must retain the above copyright
! 22: notice, this list of conditions and the following disclaimer.
! 23: 2. Redistributions in binary form must reproduce the above copyright
! 24: notice, this list of conditions and the following disclaimer in the
! 25: documentation and/or other materials provided with the distribution.
! 26: 3. All advertising materials mentioning features or use of this software
! 27: must display the following acknowledgement:
! 28: This product includes software developed by Michael Pounov <misho@elwix.org>
! 29: ELWIX - Embedded LightWeight unIX and its contributors.
! 30: 4. Neither the name of AITNET nor the names of its contributors
! 31: may be used to endorse or promote products derived from this software
! 32: without specific prior written permission.
! 33:
! 34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
! 35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! 36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
! 37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
! 38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! 39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
! 40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
! 41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
! 42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
! 43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
! 44: SUCH DAMAGE.
! 45: */
1.2 misho 46: #include "global.h"
47: #include "mqttd.h"
1.3 misho 48: #include "utils.h"
49: #include "rtlm.h"
1.2 misho 50: #include "mqttd_calls.h"
51:
52:
1.3 misho 53: static inline ait_val_t *
54: mkPkt(void * __restrict data, int dlen)
55: {
56: ait_val_t *p = NULL;
57:
58: if (!(p = io_allocVar())) {
59: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
60: return NULL;
61: }
62:
63: if (data && dlen > 0)
64: AIT_SET_BUF(p, data, dlen);
65:
66: return p;
67: }
68:
69: static inline void
70: freePkt(ait_val_t ** __restrict p)
71: {
72: if (!p)
73: return;
74:
75: io_freeVar(p);
76: }
77:
78: static void *
79: sendPacket(sched_task_t *task)
80: {
81: ait_val_t *p = TASK_ARG(task);
82: register int n, slen;
83: u_char *pos;
84:
85: if (!p || AIT_ISEMPTY(p)) {
86: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
87: return NULL;
88: }
89:
90: ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));
91:
92: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
93: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
94: if (n == -1) {
95: ioSYSERR(0);
96: break;
97: }
98: }
99:
100: freePkt(&p);
101: return NULL;
102: }
103:
104: static int
105: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
106: {
107: regex_t re;
108: regmatch_t match;
109: ait_val_t *p = NULL;
110: struct tagSession *s = NULL;
111: struct tagStore *st_, *st = NULL;
112: char szStr[STRSIZ];
113: int ret;
114:
115: assert(sess);
116:
117: TAILQ_FOREACH(s, &Sessions, sess_node) {
118: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
119: /* check for QoS */
120: if (st->st_subscr.sub_ret >= qos) {
121: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
122: regerror(ret, &re, szStr, sizeof szStr);
123: regfree(&re);
124: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
125: st->st_subscr.sub_topic.msg_base, szStr);
126: }
127: if (!regexec(&re, topic, 1, &match, 0)) {
128: /* MATCH */
129: p = mkPkt(sess->sess_buf->msg_base, datlen);
130: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
131: }
132:
133: regfree(&re);
134: }
135: }
136: }
137:
138: return 0;
139: }
140:
141: /* --------------------------------------------------- */
142:
143: void *
144: sendRetain(sched_task_t *task)
145: {
146: mqtt_subscr_t *subs, *s;
147: struct tagSession *sess;
148: int siz;
149:
150: ioTRACE(2);
151:
152: assert(task);
153:
154: sess = TASK_ARG(task);
155: assert(sess);
156:
157: if (!sess->sess_buf) {
158: ioDEBUG(9, "WARNING! No allocated buffer!?!\n");
159: return NULL;
160: }
161:
162: subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
163: if (!subs)
164: return NULL;
165:
166: for (s = subs; s && s->sub_topic.msg_base; s++) {
167: siz = s->sub_value.msg_len;
168: memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base,
169: MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
170: ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n",
171: siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
172: if (siz > 0)
173: search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
174: }
175:
176: mqtt_subFree(&subs);
177: return NULL;
178: }
179:
1.2 misho 180: int
1.3 misho 181: pubWill(struct tagSession * __restrict sess)
182: {
183: int datlen;
184:
185: ioTRACE(2);
186:
187: /* prepare will packet */
188: datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,
189: sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
190: if (datlen == -1)
191: return -1; /* error */
192:
193: return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
194: }
195:
196: static int
197: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
198: {
199: return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
200: }
201:
202: static int
203: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
204: {
205: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
206:
207: /* write topic to database */
208: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
209: sess->sess_addr, hdr->mqtt_msg.retain);
210: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
211: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
212: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
213:
214: search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
215:
216: /* delete not retain message */
217: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
218: sess->sess_addr, 0);
219: return 0;
220: }
221:
222: static int
223: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
224: {
225: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
226:
227: /* write topic to database */
228: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
229: sess->sess_addr, hdr->mqtt_msg.retain);
230: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
231: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
232: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
233:
234: return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
235: }
236:
237:
238: int
239: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 240: {
241: struct mqtthdr *hdr;
1.3 misho 242: struct tagSession *sess = (struct tagSession*) arg;
243: char szTopic[STRSIZ] = { 0 };
244: int siz = 0;
245: u_short mid = 0;
246: ait_val_t *p = NULL;
1.2 misho 247:
248: ioTRACE(2);
249:
250: if (!sess)
251: return -1;
252:
1.3 misho 253: ioDEBUG(5, "Exec PUBLISH session");
254: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
255: if (siz == -1) {
256: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
257: return 0;
258: }
259:
1.2 misho 260: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
261: switch (hdr->mqtt_msg.qos) {
262: case MQTT_QOS_ACK:
1.3 misho 263: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
264: return 0;
265: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
266: if (siz == -1) {
267: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
268: mqtt_GetErrno(), mqtt_GetError());
269: return 0;
270: }
1.2 misho 271: break;
272: case MQTT_QOS_EXACTLY:
1.3 misho 273: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
274: return 0;
275: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
276: if (siz == -1) {
277: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
278: mqtt_GetErrno(), mqtt_GetError());
279: return 0;
280: }
1.2 misho 281: break;
1.3 misho 282: case MQTT_QOS_ONCE:
283: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 284: default:
285: return 0;
286: }
287:
1.3 misho 288: p = mkPkt(sess->sess_buf->msg_base, siz);
289: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
290: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 291: return 0;
292: }
1.3 misho 293:
294: int
295: cmdPUBREL(void *srv, int len, void *arg)
296: {
297: struct tagSession *sess = (struct tagSession*) arg;
298: int siz = 0;
299: u_short mid = 0;
300: ait_val_t *p = NULL;
301:
302: ioTRACE(2);
303:
304: if (!sess)
305: return -1;
306:
307: ioDEBUG(5, "Exec PUBREL session");
308: mid = mqtt_readPUBREL(sess->sess_buf);
309: if (mid == (u_short) -1) {
310: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
311: return 0;
312: }
313:
314: /* delete not retain message */
315: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
316: sess->sess_addr, 0);
317:
318: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
319: if (siz == -1) {
320: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
321: return 0;
322: }
323:
324: p = mkPkt(sess->sess_buf->msg_base, siz);
325: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
326: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
327: return 0;
328: }
329:
330: int
331: cmdSUBSCRIBE(void *srv, int len, void *arg)
332: {
333: struct tagSession *sess = (struct tagSession*) arg;
334: mqtt_subscr_t *subs = NULL;
335: int siz = 0;
336: u_short mid = 0;
337: register int i;
338: struct tagStore *store;
339: char buf[BUFSIZ];
340: void *ptr;
341: ait_val_t *p = NULL;
342:
343: ioTRACE(2);
344:
345: if (!sess)
346: return -1;
347:
348: ioDEBUG(5, "Exec SUBSCRIBE session");
349: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
350: if (siz == -1) {
351: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
352: return 0;
353: }
354:
355: /* add to db */
356: for (i = 0; i < siz; i++) {
357: store = io_malloc(sizeof(struct tagStore));
358: if (!store) {
359: ioSYSERR(0);
360: continue;
361: } else {
362: store->st_msgid = mid;
363: mqtt_subCopy(&store->st_subscr, &subs[i]);
364: subs[i].sub_ret = MQTT_QOS_DENY;
365: }
366:
367: /* add to cache */
368: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
369:
370: /* convert topic to regexp */
371: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
372: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
373: } else {
374: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
375: if (!ptr) {
376: ioSYSERR(0);
377: continue;
378: } else {
379: store->st_subscr.sub_topic.msg_base = ptr;
380: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
381: memcpy(store->st_subscr.sub_topic.msg_base, buf,
382: store->st_subscr.sub_topic.msg_len);
383: }
384:
385: /* store to db */
386: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
387: sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
388: /* subscribe pass */
389: subs[i].sub_ret = MQTT_QOS_PASS;
390:
391: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid,
392: store->st_subscr.sub_topic.msg_base,
393: store->st_subscr.sub_topic.msg_len,
394: store->st_subscr.sub_ret, sess->sess_addr);
395: }
396: }
397:
398: /* send acknowledge */
399: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
400: if (siz == -1) {
401: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
402: goto end;
403: } else {
404: p = mkPkt(sess->sess_buf->msg_base, siz);
405: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
406: }
407:
408: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
409: end:
410: mqtt_subFree(&subs);
411: return 0;
412: }
413:
414: int
415: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
416: {
417: struct tagSession *sess = (struct tagSession*) arg;
418: mqtt_subscr_t *subs = NULL;
419: int siz = 0;
420: u_short mid = 0;
421: register int i;
422: struct tagStore *store, *tmp;
423: ait_val_t *p = NULL;
424:
425: ioTRACE(2);
426:
427: if (!sess)
428: return -1;
429:
430: ioDEBUG(5, "Exec UNSUBSCRIBE session");
431: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
432: if (siz == -1) {
433: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
434: return 0;
435: }
436:
437: /* del from db */
438: for (i = 0; i < siz; i++) {
439: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
440: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
441: store->st_subscr.sub_topic.msg_base &&
442: !strcmp(store->st_subscr.sub_topic.msg_base,
443: subs[i].sub_topic.msg_base)) {
444: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
445:
446: if (store->st_subscr.sub_topic.msg_base)
447: free(store->st_subscr.sub_topic.msg_base);
448: if (store->st_subscr.sub_value.msg_base)
449: free(store->st_subscr.sub_value.msg_base);
450: io_free(store);
451: }
452: }
453:
454: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
455: sess->sess_user, "%");
456: }
457:
458: /* send acknowledge */
459: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
460: if (siz == -1) {
461: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
462: goto end;
463: } else {
464: p = mkPkt(sess->sess_buf->msg_base, siz);
465: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
466: }
467:
468: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
469: end:
470: mqtt_subFree(&subs);
471: return 0;
472: }
473:
474: int
475: cmdPINGREQ(void *srv, int len, void *arg)
476: {
477: struct tagSession *sess = (struct tagSession*) arg;
478: int siz = 0;
479: ait_val_t *p = NULL;
480:
481: ioTRACE(2);
482:
483: if (!sess)
484: return -1;
485:
486: ioDEBUG(5, "Exec PINGREQ session");
487: siz = mqtt_msgPINGRESP(sess->sess_buf);
488: if (siz == -1) {
489: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
490: return 0;
491: } else {
492: p = mkPkt(sess->sess_buf->msg_base, siz);
493: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
494: }
495:
496: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
497: return 0;
498: }
499:
500: int
501: cmdCONNECT(void *srv, int len, void *arg)
502: {
503: struct tagStore *store;
504: struct tagSession *sess = (struct tagSession*) arg;
505:
506: ioTRACE(2);
507:
508: if (!sess)
509: return -1;
510:
511: ioDEBUG(5, "Exec CONNECT session");
512: TAILQ_REMOVE(&Sessions, sess, sess_node);
513:
514: schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
515:
516: if (sess->sess_clean) {
517: if (call.FiniSessPUB)
518: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
519: if (call.DeletePUB_subscribe)
520: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
521: if (call.WipePUB_topic)
522: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
523: }
524:
525: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
526: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
527:
528: if (store->st_subscr.sub_topic.msg_base)
529: free(store->st_subscr.sub_topic.msg_base);
530: if (store->st_subscr.sub_value.msg_base)
531: free(store->st_subscr.sub_value.msg_base);
532:
533: io_free(store);
534: }
535:
536: if (sess->sess_will.flag)
537: pubWill(sess);
538:
539: if (sess->sess_will.msg)
540: free(sess->sess_will.msg);
541: if (sess->sess_will.topic)
542: free(sess->sess_will.topic);
543:
544: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
545: sess->sess_addr, sess->sess_user);
546:
547: return -3; /* reconnect client */
548: }
549:
550: int
551: cmdDISCONNECT(void *srv, int len, void *arg)
552: {
553: struct tagSession *sess = (struct tagSession*) arg;
554:
555: ioTRACE(2);
556:
557: if (!sess)
558: return -1;
559:
560: ioDEBUG(5, "Exec DISCONNECT session");
561:
562: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
563: sess->sess_addr, sess->sess_user);
564:
565: return -2; /* must terminate dispatcher */
566: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>