1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline ait_val_t *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: ait_val_t *p = NULL;
11:
12: if (!(p = io_allocVar())) {
13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
14: return NULL;
15: }
16:
17: if (data && dlen > 0)
18: AIT_SET_BUF(p, data, dlen);
19:
20: return p;
21: }
22:
23: static inline void
24: freePkt(ait_val_t ** __restrict p)
25: {
26: if (!p)
27: return;
28:
29: io_freeVar(p);
30: }
31:
32: static void *
33: sendPacket(sched_task_t *task)
34: {
35: ait_val_t *p = TASK_ARG(task);
36: register int n, slen;
37: u_char *pos;
38:
39: if (!p || AIT_ISEMPTY(p)) {
40: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
41: return NULL;
42: }
43:
44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
46: if (n == -1) {
47: ioSYSERR(0);
48: break;
49: }
50: }
51:
52: freePkt(&p);
53: return NULL;
54: }
55:
56: /* --------------------------------------------------- */
57:
58: static int
59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
60: {
61: ait_val_t *p = NULL;
62: struct tagSession *s = NULL;
63: struct tagStore *st = NULL;
64: regex_t re;
65: regmatch_t match;
66: int ret;
67: char szStr[STRSIZ];
68:
69: TAILQ_FOREACH(s, &Sessions, sess_node) {
70: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
72: regerror(ret, &re, szStr, sizeof szStr);
73: regfree(&re);
74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
75: st->st_subscr.sub_topic.msg_base, szStr);
76: }
77: if (!regexec(&re, psTopic, 1, &match, 0)) {
78: /* MATCH */
79: p = mkPkt(sess->sess_buf->msg_base, datlen);
80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
81: }
82:
83: regfree(&re);
84: }
85: }
86:
87: return 0;
88: }
89:
90: static int
91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
92: {
93: ait_val_t *p = NULL;
94: struct tagSession *s = NULL;
95: struct tagStore *st = NULL;
96: regex_t re;
97: regmatch_t match;
98: int ret;
99: char szStr[STRSIZ];
100: struct mqtthdr *hdr;
101:
102: p = mkPkt(sess->sess_buf->msg_base, datlen);
103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
104:
105: /* write topic to database */
106: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
107: sess->sess_addr, hdr->mqtt_msg.retain);
108: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
109: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
110:
111: TAILQ_FOREACH(s, &Sessions, sess_node) {
112: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
113: /* check for QoS */
114: if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
115: continue;
116:
117: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
118: regerror(ret, &re, szStr, sizeof szStr);
119: regfree(&re);
120: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
121: st->st_subscr.sub_topic.msg_base, szStr);
122: }
123: if (!regexec(&re, psTopic, 1, &match, 0)) {
124: /* MATCH */
125: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)),
126: s->sess_sock, NULL, 0);
127: }
128:
129: regfree(&re);
130: }
131: }
132:
133: /* delete not retain message */
134: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
135: sess->sess_addr, 0);
136:
137: freePkt(&p);
138: return 0;
139: }
140:
141: static int
142: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
143: {
144: ait_val_t *p = NULL;
145: struct tagSession *s = NULL;
146: struct tagStore *st = NULL;
147: regex_t re;
148: regmatch_t match;
149: int ret;
150: char szStr[STRSIZ];
151: struct mqtthdr *hdr;
152:
153: p = mkPkt(sess->sess_buf->msg_base, datlen);
154: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
155:
156: /* write topic to database */
157: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
158: sess->sess_addr, hdr->mqtt_msg.retain);
159: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
160: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
161:
162: TAILQ_FOREACH(s, &Sessions, sess_node) {
163: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
164: /* check for QoS */
165: if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
166: continue;
167:
168: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
169: regerror(ret, &re, szStr, sizeof szStr);
170: regfree(&re);
171: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
172: st->st_subscr.sub_topic.msg_base, szStr);
173: }
174: if (!regexec(&re, psTopic, 1, &match, 0)) {
175: /* MATCH */
176: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)),
177: s->sess_sock, NULL, 0);
178: }
179:
180: regfree(&re);
181: }
182: }
183:
184: freePkt(&p);
185: return 0;
186: }
187:
188:
189: int
190: cmdPUBLISH(void *srv, int len, void *arg)
191: {
192: struct mqtthdr *hdr;
193: struct tagSession *sess = (struct tagSession*) arg;
194: char szTopic[STRSIZ] = { 0 };
195: int siz = 0;
196: u_short mid = 0;
197: ait_val_t *p = NULL;
198:
199: ioTRACE(2);
200:
201: if (!sess)
202: return -1;
203:
204: ioDEBUG(5, "Exec PUBLISH session");
205: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
206: if (siz == -1) {
207: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
208: return 0;
209: }
210:
211: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
212: switch (hdr->mqtt_msg.qos) {
213: case MQTT_QOS_ACK:
214: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
215: return 0;
216: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
217: if (siz == -1) {
218: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
219: mqtt_GetErrno(), mqtt_GetError());
220: return 0;
221: }
222: break;
223: case MQTT_QOS_EXACTLY:
224: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
225: return 0;
226: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
227: if (siz == -1) {
228: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
229: mqtt_GetErrno(), mqtt_GetError());
230: return 0;
231: }
232: break;
233: case MQTT_QOS_ONCE:
234: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
235: default:
236: return 0;
237: }
238:
239: p = mkPkt(sess->sess_buf->msg_base, siz);
240: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
241: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
242: return 0;
243: }
244:
245: int
246: cmdPUBREL(void *srv, int len, void *arg)
247: {
248: struct tagSession *sess = (struct tagSession*) arg;
249: int siz = 0;
250: u_short mid = 0;
251: ait_val_t *p = NULL;
252:
253: ioTRACE(2);
254:
255: if (!sess)
256: return -1;
257:
258: ioDEBUG(5, "Exec PUBREL session");
259: mid = mqtt_readPUBREL(sess->sess_buf);
260: if (mid == (u_short) -1) {
261: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: return 0;
263: }
264:
265: /* delete not retain message */
266: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
267: sess->sess_addr, 0);
268:
269: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
270: if (siz == -1) {
271: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
272: return 0;
273: }
274:
275: p = mkPkt(sess->sess_buf->msg_base, siz);
276: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
277: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
278: return 0;
279: }
280:
281: int
282: cmdSUBSCRIBE(void *srv, int len, void *arg)
283: {
284: struct tagSession *sess = (struct tagSession*) arg;
285: mqtt_subscr_t *subs = NULL;
286: int siz = 0;
287: u_short mid = 0;
288: register int i;
289: struct tagStore *store;
290: char buf[BUFSIZ];
291: void *ptr;
292: ait_val_t *p = NULL;
293:
294: ioTRACE(2);
295:
296: if (!sess)
297: return -1;
298:
299: ioDEBUG(5, "Exec SUBSCRIBE session");
300: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
301: if (siz == -1) {
302: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
303: return 0;
304: }
305:
306: /* add to db */
307: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
308: /* convert topic to sql search statement */
309: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
310: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
311: continue;
312: }
313: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
314: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
315: store = io_malloc(sizeof(struct tagStore));
316: if (!store) {
317: ioSYSERR(0);
318: continue;
319: } else {
320: store->st_msgid = mid;
321: mqtt_subCopy(&store->st_subscr, &subs[i]);
322: }
323:
324: /* add to cache */
325: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
326:
327: /* convert topic to regexp */
328: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
329: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
330:
331: subs[i].sub_ret = MQTT_QOS_DENY;
332: } else {
333: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
334: if (!ptr)
335: ioSYSERR(0);
336: else {
337: store->st_subscr.sub_topic.msg_base = ptr;
338: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
339: memcpy(store->st_subscr.sub_topic.msg_base, buf,
340: store->st_subscr.sub_topic.msg_len);
341: }
342:
343: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
344: store->st_subscr.sub_topic.msg_base,
345: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
346:
347: subs[i].sub_ret = MQTT_QOS_PASS;
348: }
349: }
350: }
351:
352: /* send acknowledge */
353: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
354: if (siz == -1) {
355: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
356: goto end;
357: } else {
358: p = mkPkt(sess->sess_buf->msg_base, siz);
359: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
360: }
361:
362: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
363: end:
364: mqtt_subFree(&subs);
365: return 0;
366: }
367:
368: int
369: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
370: {
371: struct tagSession *sess = (struct tagSession*) arg;
372: mqtt_subscr_t *subs = NULL;
373: int siz = 0;
374: u_short mid = 0;
375: register int i;
376: struct tagStore *store, *tmp;
377: ait_val_t *p = NULL;
378:
379: ioTRACE(2);
380:
381: if (!sess)
382: return -1;
383:
384: ioDEBUG(5, "Exec UNSUBSCRIBE session");
385: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
386: if (siz == -1) {
387: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
388: return 0;
389: }
390:
391: /* del from db */
392: for (i = 0; i < siz; i++) {
393: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
394: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
395: store->st_subscr.sub_topic.msg_base &&
396: !strcmp(store->st_subscr.sub_topic.msg_base,
397: subs[i].sub_topic.msg_base)) {
398: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
399:
400: if (store->st_subscr.sub_topic.msg_base)
401: free(store->st_subscr.sub_topic.msg_base);
402: if (store->st_subscr.sub_value.msg_base)
403: free(store->st_subscr.sub_value.msg_base);
404: io_free(store);
405: }
406: }
407:
408: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
409: sess->sess_user, "%");
410: }
411:
412: /* send acknowledge */
413: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
414: if (siz == -1) {
415: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
416: goto end;
417: } else {
418: p = mkPkt(sess->sess_buf->msg_base, siz);
419: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
420: }
421:
422: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
423: end:
424: mqtt_subFree(&subs);
425: return 0;
426: }
427:
428: int
429: cmdPINGREQ(void *srv, int len, void *arg)
430: {
431: struct tagSession *sess = (struct tagSession*) arg;
432: int siz = 0;
433: ait_val_t *p = NULL;
434:
435: ioTRACE(2);
436:
437: if (!sess)
438: return -1;
439:
440: ioDEBUG(5, "Exec PINGREQ session");
441: siz = mqtt_msgPINGRESP(sess->sess_buf);
442: if (siz == -1) {
443: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
444: return 0;
445: } else {
446: p = mkPkt(sess->sess_buf->msg_base, siz);
447: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
448: }
449:
450: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
451: return 0;
452: }
453:
454: int
455: cmdCONNECT(void *srv, int len, void *arg)
456: {
457: struct tagStore *store;
458: struct tagSession *sess = (struct tagSession*) arg;
459:
460: ioTRACE(2);
461:
462: if (!sess)
463: return -1;
464:
465: ioDEBUG(5, "Exec CONNECT session");
466: TAILQ_REMOVE(&Sessions, sess, sess_node);
467:
468: if (sess->sess_clean) {
469: if (call.FiniSessPUB)
470: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
471: if (call.DeletePUB_subscribe)
472: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
473: if (call.WipePUB_topic)
474: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
475: }
476:
477: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
478: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
479:
480: if (store->st_subscr.sub_topic.msg_base)
481: free(store->st_subscr.sub_topic.msg_base);
482: if (store->st_subscr.sub_value.msg_base)
483: free(store->st_subscr.sub_value.msg_base);
484:
485: io_free(store);
486: }
487:
488: if (sess->sess_will.msg)
489: free(sess->sess_will.msg);
490: if (sess->sess_will.topic)
491: free(sess->sess_will.topic);
492:
493: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
494: sess->sess_addr, sess->sess_user);
495:
496: return -3; /* reconnect client */
497: }
498:
499: int
500: cmdDISCONNECT(void *srv, int len, void *arg)
501: {
502: struct tagSession *sess = (struct tagSession*) arg;
503:
504: ioTRACE(2);
505:
506: if (!sess)
507: return -1;
508:
509: ioDEBUG(5, "Exec DISCONNECT session");
510:
511: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
512: sess->sess_addr, sess->sess_user);
513:
514: return -2; /* must terminate dispatcher */
515: }
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */