1: #include "global.h"
2: #include "mqttd.h"
3: #include "utils.h"
4: #include "rtlm.h"
5: #include "mqttd_calls.h"
6:
7:
8: static inline ait_val_t *
9: mkPkt(void * __restrict data, int dlen)
10: {
11: ait_val_t *p = NULL;
12:
13: if (!(p = io_allocVar())) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: }
17:
18: if (data && dlen > 0)
19: AIT_SET_BUF(p, data, dlen);
20:
21: return p;
22: }
23:
24: static inline void
25: freePkt(ait_val_t ** __restrict p)
26: {
27: if (!p)
28: return;
29:
30: io_freeVar(p);
31: }
32:
33: static void *
34: sendPacket(sched_task_t *task)
35: {
36: ait_val_t *p = TASK_ARG(task);
37: register int n, slen;
38: u_char *pos;
39:
40: if (!p || AIT_ISEMPTY(p)) {
41: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
42: return NULL;
43: }
44:
45: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
46: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
47: if (n == -1) {
48: ioSYSERR(0);
49: break;
50: }
51: }
52:
53: freePkt(&p);
54: return NULL;
55: }
56:
57: /* --------------------------------------------------- */
58:
59: static int
60: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
61: {
62: ait_val_t *p = NULL;
63: struct tagSession *s = NULL;
64: struct tagStore *st_, *st = NULL;
65: regex_t re;
66: regmatch_t match;
67: int ret;
68: char szStr[STRSIZ];
69:
70:
71: TAILQ_FOREACH(s, &Sessions, sess_node) {
72: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
73: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
74: regerror(ret, &re, szStr, sizeof szStr);
75: regfree(&re);
76: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
77: st->st_subscr.sub_topic.msg_base, szStr);
78: }
79: if (!regexec(&re, psTopic, 1, &match, 0)) {
80: /* MATCH */
81: p = mkPkt(sess->sess_buf->msg_base, datlen);
82: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
83: }
84:
85: regfree(&re);
86: }
87: }
88:
89: return 0;
90: }
91:
92: static int
93: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
94: {
95: ait_val_t *p = NULL;
96: struct tagSession *s = NULL;
97: struct tagStore *st_, *st = NULL;
98: regex_t re;
99: regmatch_t match;
100: int ret;
101: char szStr[STRSIZ];
102: struct mqtthdr *hdr;
103:
104: p = mkPkt(sess->sess_buf->msg_base, datlen);
105: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
106:
107: /* write topic to database */
108: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
109: sess->sess_addr, hdr->mqtt_msg.retain);
110: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
111: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
112:
113: TAILQ_FOREACH(s, &Sessions, sess_node) {
114: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
115: /* check for QoS */
116: if (st->st_subscr.sub_ret < MQTT_QOS_ACK)
117: continue;
118:
119: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
120: regerror(ret, &re, szStr, sizeof szStr);
121: regfree(&re);
122: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
123: st->st_subscr.sub_topic.msg_base, szStr);
124: }
125: if (!regexec(&re, psTopic, 1, &match, 0)) {
126: /* MATCH */
127: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)),
128: s->sess_sock, NULL, 0);
129: }
130:
131: regfree(&re);
132: }
133: }
134:
135: /* delete not retain message */
136: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
137: sess->sess_addr, 0);
138:
139: freePkt(&p);
140: return 0;
141: }
142:
143: static int
144: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
145: {
146: ait_val_t *p = NULL;
147: struct tagSession *s = NULL;
148: struct tagStore *st_, *st = NULL;
149: regex_t re;
150: regmatch_t match;
151: int ret;
152: char szStr[STRSIZ];
153: struct mqtthdr *hdr;
154:
155: p = mkPkt(sess->sess_buf->msg_base, datlen);
156: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
157:
158: /* write topic to database */
159: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
160: sess->sess_addr, hdr->mqtt_msg.retain);
161: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
162: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
163:
164: TAILQ_FOREACH(s, &Sessions, sess_node) {
165: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
166: /* check for QoS */
167: if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY)
168: continue;
169:
170: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
171: regerror(ret, &re, szStr, sizeof szStr);
172: regfree(&re);
173: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
174: st->st_subscr.sub_topic.msg_base, szStr);
175: }
176: if (!regexec(&re, psTopic, 1, &match, 0)) {
177: /* MATCH */
178: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)),
179: s->sess_sock, NULL, 0);
180: }
181:
182: regfree(&re);
183: }
184: }
185:
186: freePkt(&p);
187: return 0;
188: }
189:
190:
191: int
192: cmdPUBLISH(void *srv, int len, void *arg)
193: {
194: struct mqtthdr *hdr;
195: struct tagSession *sess = (struct tagSession*) arg;
196: char szTopic[STRSIZ] = { 0 };
197: int siz = 0;
198: u_short mid = 0;
199: ait_val_t *p = NULL;
200:
201: ioTRACE(2);
202:
203: if (!sess)
204: return -1;
205:
206: ioDEBUG(5, "Exec PUBLISH session");
207: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
208: if (siz == -1) {
209: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
210: return 0;
211: }
212:
213: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
214: switch (hdr->mqtt_msg.qos) {
215: case MQTT_QOS_ACK:
216: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
217: return 0;
218: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
219: if (siz == -1) {
220: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
221: mqtt_GetErrno(), mqtt_GetError());
222: return 0;
223: }
224: break;
225: case MQTT_QOS_EXACTLY:
226: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
227: return 0;
228: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
229: if (siz == -1) {
230: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
231: mqtt_GetErrno(), mqtt_GetError());
232: return 0;
233: }
234: break;
235: case MQTT_QOS_ONCE:
236: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
237: default:
238: return 0;
239: }
240:
241: p = mkPkt(sess->sess_buf->msg_base, siz);
242: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
243: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
244: return 0;
245: }
246:
247: int
248: cmdPUBREL(void *srv, int len, void *arg)
249: {
250: struct tagSession *sess = (struct tagSession*) arg;
251: int siz = 0;
252: u_short mid = 0;
253: ait_val_t *p = NULL;
254:
255: ioTRACE(2);
256:
257: if (!sess)
258: return -1;
259:
260: ioDEBUG(5, "Exec PUBREL session");
261: mid = mqtt_readPUBREL(sess->sess_buf);
262: if (mid == (u_short) -1) {
263: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
264: return 0;
265: }
266:
267: /* delete not retain message */
268: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
269: sess->sess_addr, 0);
270:
271: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
272: if (siz == -1) {
273: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
274: return 0;
275: }
276:
277: p = mkPkt(sess->sess_buf->msg_base, siz);
278: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
279: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
280: return 0;
281: }
282:
283: int
284: cmdSUBSCRIBE(void *srv, int len, void *arg)
285: {
286: struct tagSession *sess = (struct tagSession*) arg;
287: mqtt_subscr_t *subs = NULL;
288: int siz = 0;
289: u_short mid = 0;
290: register int i;
291: struct tagStore *store;
292: char buf[BUFSIZ];
293: void *ptr;
294: ait_val_t *p = NULL;
295:
296: ioTRACE(2);
297:
298: if (!sess)
299: return -1;
300:
301: ioDEBUG(5, "Exec SUBSCRIBE session");
302: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
303: if (siz == -1) {
304: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
305: return 0;
306: }
307:
308: /* add to db */
309: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
310: store = io_malloc(sizeof(struct tagStore));
311: if (!store) {
312: ioSYSERR(0);
313: continue;
314: } else {
315: store->st_msgid = mid;
316: mqtt_subCopy(&store->st_subscr, &subs[i]);
317: }
318:
319: /* add to cache */
320: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
321:
322: /* convert topic to regexp */
323: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
324: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
325:
326: subs[i].sub_ret = MQTT_QOS_DENY;
327: } else {
328: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
329: if (!ptr) {
330: ioSYSERR(0);
331: continue;
332: } else {
333: store->st_subscr.sub_topic.msg_base = ptr;
334: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
335: memcpy(store->st_subscr.sub_topic.msg_base, buf,
336: store->st_subscr.sub_topic.msg_len);
337: }
338:
339: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
340: store->st_subscr.sub_topic.msg_base,
341: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
342:
343: subs[i].sub_ret = MQTT_QOS_PASS;
344: }
345:
346: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
347: sess->sess_user, sess->sess_addr, subs[i].sub_ret);
348: }
349:
350: /* send acknowledge */
351: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
352: if (siz == -1) {
353: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
354: goto end;
355: } else {
356: p = mkPkt(sess->sess_buf->msg_base, siz);
357: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
358: }
359:
360: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
361: end:
362: mqtt_subFree(&subs);
363: return 0;
364: }
365:
366: int
367: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
368: {
369: struct tagSession *sess = (struct tagSession*) arg;
370: mqtt_subscr_t *subs = NULL;
371: int siz = 0;
372: u_short mid = 0;
373: register int i;
374: struct tagStore *store, *tmp;
375: ait_val_t *p = NULL;
376:
377: ioTRACE(2);
378:
379: if (!sess)
380: return -1;
381:
382: ioDEBUG(5, "Exec UNSUBSCRIBE session");
383: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
384: if (siz == -1) {
385: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
386: return 0;
387: }
388:
389: /* del from db */
390: for (i = 0; i < siz; i++) {
391: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
392: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
393: store->st_subscr.sub_topic.msg_base &&
394: !strcmp(store->st_subscr.sub_topic.msg_base,
395: subs[i].sub_topic.msg_base)) {
396: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
397:
398: if (store->st_subscr.sub_topic.msg_base)
399: free(store->st_subscr.sub_topic.msg_base);
400: if (store->st_subscr.sub_value.msg_base)
401: free(store->st_subscr.sub_value.msg_base);
402: io_free(store);
403: }
404: }
405:
406: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
407: sess->sess_user, "%");
408: }
409:
410: /* send acknowledge */
411: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
412: if (siz == -1) {
413: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
414: goto end;
415: } else {
416: p = mkPkt(sess->sess_buf->msg_base, siz);
417: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
418: }
419:
420: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
421: end:
422: mqtt_subFree(&subs);
423: return 0;
424: }
425:
426: int
427: cmdPINGREQ(void *srv, int len, void *arg)
428: {
429: struct tagSession *sess = (struct tagSession*) arg;
430: int siz = 0;
431: ait_val_t *p = NULL;
432:
433: ioTRACE(2);
434:
435: if (!sess)
436: return -1;
437:
438: ioDEBUG(5, "Exec PINGREQ session");
439: siz = mqtt_msgPINGRESP(sess->sess_buf);
440: if (siz == -1) {
441: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
442: return 0;
443: } else {
444: p = mkPkt(sess->sess_buf->msg_base, siz);
445: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
446: }
447:
448: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
449: return 0;
450: }
451:
452: int
453: cmdCONNECT(void *srv, int len, void *arg)
454: {
455: struct tagStore *store;
456: struct tagSession *sess = (struct tagSession*) arg;
457:
458: ioTRACE(2);
459:
460: if (!sess)
461: return -1;
462:
463: ioDEBUG(5, "Exec CONNECT session");
464: TAILQ_REMOVE(&Sessions, sess, sess_node);
465:
466: if (sess->sess_clean) {
467: if (call.FiniSessPUB)
468: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
469: if (call.DeletePUB_subscribe)
470: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
471: if (call.WipePUB_topic)
472: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
473: }
474:
475: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
476: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
477:
478: if (store->st_subscr.sub_topic.msg_base)
479: free(store->st_subscr.sub_topic.msg_base);
480: if (store->st_subscr.sub_value.msg_base)
481: free(store->st_subscr.sub_value.msg_base);
482:
483: io_free(store);
484: }
485:
486: if (sess->sess_will.flag)
487: srv_Will(sess);
488:
489: if (sess->sess_will.msg)
490: free(sess->sess_will.msg);
491: if (sess->sess_will.topic)
492: free(sess->sess_will.topic);
493:
494: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
495: sess->sess_addr, sess->sess_user);
496:
497: return -3; /* reconnect client */
498: }
499:
500: int
501: cmdDISCONNECT(void *srv, int len, void *arg)
502: {
503: struct tagSession *sess = (struct tagSession*) arg;
504:
505: ioTRACE(2);
506:
507: if (!sess)
508: return -1;
509:
510: ioDEBUG(5, "Exec DISCONNECT session");
511:
512: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
513: sess->sess_addr, sess->sess_user);
514:
515: return -2; /* must terminate dispatcher */
516: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>