1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline ait_val_t *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: ait_val_t *p = NULL;
11:
12: if (!(p = io_allocVar())) {
13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
14: return NULL;
15: }
16:
17: if (data && dlen > 0)
18: AIT_SET_BUF(p, data, dlen);
19:
20: return p;
21: }
22:
23: static inline void
24: freePkt(ait_val_t ** __restrict p)
25: {
26: if (!p)
27: return;
28:
29: io_freeVar(p);
30: }
31:
32: static void *
33: sendPacket(sched_task_t *task)
34: {
35: ait_val_t *p = TASK_ARG(task);
36: register int n, slen;
37: u_char *pos;
38:
39: if (!p || AIT_ISEMPTY(p)) {
40: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
41: return NULL;
42: }
43:
44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
46: if (n == -1) {
47: ioSYSERR(0);
48: break;
49: }
50: }
51:
52: freePkt(&p);
53: return NULL;
54: }
55:
56: /* --------------------------------------------------- */
57:
58: static int
59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
60: {
61: ait_val_t *p = NULL;
62: struct tagSession *s = NULL;
63: struct tagStore *st = NULL;
64: regex_t re;
65: regmatch_t match;
66: int ret;
67: char szStr[STRSIZ];
68:
69: TAILQ_FOREACH(s, &Sessions, sess_node) {
70: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
72: regerror(ret, &re, szStr, sizeof szStr);
73: regfree(&re);
74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
75: st->st_subscr.sub_topic.msg_base, szStr);
76: }
77: if (!regexec(&re, psTopic, 1, &match, 0)) {
78: /* MATCH */
79: p = mkPkt(sess->sess_buf->msg_base, datlen);
80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
81: }
82:
83: regfree(&re);
84: }
85: }
86:
87: return 0;
88: }
89:
90: static int
91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
92: {
93: ait_val_t *p = NULL;
94: struct tagSession *s = NULL;
95: struct tagStore *st = NULL;
96: regex_t re;
97: regmatch_t match;
98: int ret;
99: char szStr[STRSIZ];
100: struct mqtthdr *hdr;
101:
102: p = mkPkt(sess->sess_buf->msg_base, datlen);
103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
104:
105: /* write topic to database */
106: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
107: sess->sess_addr, hdr->mqtt_msg.retain);
108: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
109: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
110:
111: TAILQ_FOREACH(s, &Sessions, sess_node) {
112: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
113: /* check for QoS */
114: if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
115: continue;
116:
117: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
118: regerror(ret, &re, szStr, sizeof szStr);
119: regfree(&re);
120: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
121: st->st_subscr.sub_topic.msg_base, szStr);
122: }
123: if (!regexec(&re, psTopic, 1, &match, 0)) {
124: /* MATCH */
125: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)),
126: s->sess_sock, NULL, 0);
127: }
128:
129: regfree(&re);
130: }
131: }
132:
133: /* delete not retain message */
134: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
135: sess->sess_addr, 0);
136:
137: freePkt(&p);
138: return 0;
139: }
140:
141: static int
142: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
143: {
144: ait_val_t *p = NULL;
145: struct tagSession *s = NULL;
146: struct tagStore *st = NULL;
147: regex_t re;
148: regmatch_t match;
149: int ret;
150: char szStr[STRSIZ];
151: struct mqtthdr *hdr;
152:
153: p = mkPkt(sess->sess_buf->msg_base, datlen);
154: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
155:
156: /* write topic to database */
157: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
158: sess->sess_addr, hdr->mqtt_msg.retain);
159: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
160: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
161:
162: TAILQ_FOREACH(s, &Sessions, sess_node) {
163: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
164: /* check for QoS */
165: if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
166: continue;
167:
168: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
169: regerror(ret, &re, szStr, sizeof szStr);
170: regfree(&re);
171: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
172: st->st_subscr.sub_topic.msg_base, szStr);
173: }
174: if (!regexec(&re, psTopic, 1, &match, 0)) {
175: /* MATCH */
176: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)),
177: s->sess_sock, NULL, 0);
178: }
179:
180: regfree(&re);
181: }
182: }
183:
184: freePkt(&p);
185: return 0;
186: }
187:
188:
189: int
190: cmdPUBLISH(void *srv, int len, void *arg)
191: {
192: struct mqtthdr *hdr;
193: struct tagSession *sess = (struct tagSession*) arg;
194: char szTopic[STRSIZ] = { 0 };
195: int siz = 0;
196: u_short mid = 0;
197: ait_val_t *p = NULL;
198:
199: ioTRACE(2);
200:
201: if (!sess)
202: return -1;
203:
204: ioDEBUG(5, "Exec PUBLISH session");
205: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
206: if (siz == -1) {
207: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
208: return 0;
209: }
210:
211: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
212: switch (hdr->mqtt_msg.qos) {
213: case MQTT_QOS_ACK:
214: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
215: return 0;
216: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
217: if (siz == -1) {
218: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
219: mqtt_GetErrno(), mqtt_GetError());
220: return 0;
221: }
222: break;
223: case MQTT_QOS_EXACTLY:
224: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
225: return 0;
226: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
227: if (siz == -1) {
228: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
229: mqtt_GetErrno(), mqtt_GetError());
230: return 0;
231: }
232: break;
233: case MQTT_QOS_ONCE:
234: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
235: default:
236: return 0;
237: }
238:
239: p = mkPkt(sess->sess_buf->msg_base, siz);
240: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
241: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
242: return 0;
243: }
244:
245: int
246: cmdPUBREL(void *srv, int len, void *arg)
247: {
248: struct tagSession *sess = (struct tagSession*) arg;
249: int siz = 0;
250: u_short mid = 0;
251: ait_val_t *p = NULL;
252:
253: ioTRACE(2);
254:
255: if (!sess)
256: return -1;
257:
258: ioDEBUG(5, "Exec PUBREL session");
259: mid = mqtt_readPUBREL(sess->sess_buf);
260: if (mid == (u_short) -1) {
261: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: return 0;
263: }
264:
265: // TODO:: Delete from database topic
266:
267: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
268: if (siz == -1) {
269: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
270: return 0;
271: } else {
272: p = mkPkt(sess->sess_buf->msg_base, siz);
273: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
274: }
275:
276: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
277: return 0;
278: }
279:
280: int
281: cmdSUBSCRIBE(void *srv, int len, void *arg)
282: {
283: struct tagSession *sess = (struct tagSession*) arg;
284: mqtt_subscr_t *subs = NULL;
285: int siz = 0;
286: u_short mid = 0;
287: register int i;
288: struct tagStore *store;
289: char buf[BUFSIZ];
290: void *ptr;
291: ait_val_t *p = NULL;
292:
293: ioTRACE(2);
294:
295: if (!sess)
296: return -1;
297:
298: ioDEBUG(5, "Exec SUBSCRIBE session");
299: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
300: if (siz == -1) {
301: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
302: return 0;
303: }
304:
305: /* add to db */
306: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
307: /* convert topic to sql search statement */
308: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
309: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
310: continue;
311: }
312: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
313: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
314: store = io_malloc(sizeof(struct tagStore));
315: if (!store) {
316: ioSYSERR(0);
317: continue;
318: } else {
319: store->st_msgid = mid;
320: mqtt_subCopy(&store->st_subscr, &subs[i]);
321: }
322:
323: /* add to cache */
324: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
325:
326: /* convert topic to regexp */
327: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
328: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
329:
330: subs[i].sub_ret = MQTT_QOS_DENY;
331: } else {
332: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
333: if (!ptr)
334: ioSYSERR(0);
335: else {
336: store->st_subscr.sub_topic.msg_base = ptr;
337: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
338: memcpy(store->st_subscr.sub_topic.msg_base, buf,
339: store->st_subscr.sub_topic.msg_len);
340: }
341:
342: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
343: store->st_subscr.sub_topic.msg_base,
344: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
345:
346: subs[i].sub_ret = MQTT_QOS_PASS;
347: }
348: }
349: }
350:
351: /* send acknowledge */
352: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
353: if (siz == -1) {
354: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
355: goto end;
356: } else {
357: p = mkPkt(sess->sess_buf->msg_base, siz);
358: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
359: }
360:
361: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
362: end:
363: mqtt_subFree(&subs);
364: return 0;
365: }
366:
367: int
368: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
369: {
370: struct tagSession *sess = (struct tagSession*) arg;
371: mqtt_subscr_t *subs = NULL;
372: int siz = 0;
373: u_short mid = 0;
374: register int i;
375: struct tagStore *store, *tmp;
376: ait_val_t *p = NULL;
377:
378: ioTRACE(2);
379:
380: if (!sess)
381: return -1;
382:
383: ioDEBUG(5, "Exec UNSUBSCRIBE session");
384: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
385: if (siz == -1) {
386: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
387: return 0;
388: }
389:
390: /* del from db */
391: for (i = 0; i < siz; i++) {
392: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
393: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
394: store->st_subscr.sub_topic.msg_base &&
395: !strcmp(store->st_subscr.sub_topic.msg_base,
396: subs[i].sub_topic.msg_base)) {
397: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
398:
399: if (store->st_subscr.sub_topic.msg_base)
400: free(store->st_subscr.sub_topic.msg_base);
401: if (store->st_subscr.sub_value.msg_base)
402: free(store->st_subscr.sub_value.msg_base);
403: io_free(store);
404: }
405: }
406:
407: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
408: sess->sess_user, "%");
409: }
410:
411: /* send acknowledge */
412: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
413: if (siz == -1) {
414: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
415: goto end;
416: } else {
417: p = mkPkt(sess->sess_buf->msg_base, siz);
418: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
419: }
420:
421: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
422: end:
423: mqtt_subFree(&subs);
424: return 0;
425: }
426:
427: int
428: cmdPINGREQ(void *srv, int len, void *arg)
429: {
430: struct tagSession *sess = (struct tagSession*) arg;
431: int siz = 0;
432: ait_val_t *p = NULL;
433:
434: ioTRACE(2);
435:
436: if (!sess)
437: return -1;
438:
439: ioDEBUG(5, "Exec PINGREQ session");
440: siz = mqtt_msgPINGRESP(sess->sess_buf);
441: if (siz == -1) {
442: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
443: return 0;
444: } else {
445: p = mkPkt(sess->sess_buf->msg_base, siz);
446: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
447: }
448:
449: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
450: return 0;
451: }
452:
453: int
454: cmdCONNECT(void *srv, int len, void *arg)
455: {
456: struct tagStore *store;
457: struct tagSession *sess = (struct tagSession*) arg;
458:
459: ioTRACE(2);
460:
461: if (!sess)
462: return -1;
463:
464: ioDEBUG(5, "Exec CONNECT session");
465: TAILQ_REMOVE(&Sessions, sess, sess_node);
466:
467: if (sess->sess_clean) {
468: if (call.FiniSessPUB)
469: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
470: if (call.DeletePUB_subscribe)
471: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
472: if (call.WipePUB_topic)
473: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
474: }
475:
476: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
477: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
478:
479: if (store->st_subscr.sub_topic.msg_base)
480: free(store->st_subscr.sub_topic.msg_base);
481: if (store->st_subscr.sub_value.msg_base)
482: free(store->st_subscr.sub_value.msg_base);
483:
484: io_free(store);
485: }
486:
487: if (sess->sess_will.msg)
488: free(sess->sess_will.msg);
489: if (sess->sess_will.topic)
490: free(sess->sess_will.topic);
491:
492: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
493: sess->sess_addr, sess->sess_user);
494:
495: return -3; /* reconnect client */
496: }
497:
498: int
499: cmdDISCONNECT(void *srv, int len, void *arg)
500: {
501: struct tagSession *sess = (struct tagSession*) arg;
502:
503: ioTRACE(2);
504:
505: if (!sess)
506: return -1;
507:
508: ioDEBUG(5, "Exec DISCONNECT session");
509:
510: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
511: sess->sess_addr, sess->sess_user);
512:
513: return -2; /* must terminate dispatcher */
514: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>