1: #include "global.h"
2: #include "mqttd.h"
3: #include "utils.h"
4: #include "rtlm.h"
5: #include "mqttd_calls.h"
6:
7:
8: static inline ait_val_t *
9: mkPkt(void * __restrict data, int dlen)
10: {
11: ait_val_t *p = NULL;
12:
13: if (!(p = io_allocVar())) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: }
17:
18: if (data && dlen > 0)
19: AIT_SET_BUF(p, data, dlen);
20:
21: return p;
22: }
23:
24: static inline void
25: freePkt(ait_val_t ** __restrict p)
26: {
27: if (!p)
28: return;
29:
30: io_freeVar(p);
31: }
32:
33: static void *
34: sendPacket(sched_task_t *task)
35: {
36: ait_val_t *p = TASK_ARG(task);
37: register int n, slen;
38: u_char *pos;
39:
40: if (!p || AIT_ISEMPTY(p)) {
41: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
42: return NULL;
43: }
44:
45: ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), TASK_FD(task));
46:
47: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
48: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
49: if (n == -1) {
50: ioSYSERR(0);
51: break;
52: }
53: }
54:
55: freePkt(&p);
56: return NULL;
57: }
58:
59: static int
60: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
61: {
62: regex_t re;
63: regmatch_t match;
64: ait_val_t *p = NULL;
65: struct tagSession *s = NULL;
66: struct tagStore *st_, *st = NULL;
67: char szStr[STRSIZ];
68: int ret;
69:
70: assert(sess);
71:
72: TAILQ_FOREACH(s, &Sessions, sess_node) {
73: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
74: /* check for QoS */
75: if (st->st_subscr.sub_ret >= qos) {
76: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
77: regerror(ret, &re, szStr, sizeof szStr);
78: regfree(&re);
79: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
80: st->st_subscr.sub_topic.msg_base, szStr);
81: }
82: if (!regexec(&re, topic, 1, &match, 0)) {
83: /* MATCH */
84: p = mkPkt(sess->sess_buf->msg_base, datlen);
85: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
86: }
87:
88: regfree(&re);
89: }
90: }
91: }
92:
93: return 0;
94: }
95:
96: /* --------------------------------------------------- */
97:
98: void *
99: sendRetain(sched_task_t *task)
100: {
101: mqtt_subscr_t *subs, *s;
102: struct tagSession *sess;
103: int siz;
104:
105: ioTRACE(2);
106:
107: assert(task);
108:
109: sess = TASK_ARG(task);
110: assert(sess);
111:
112: if (!sess->sess_buf) {
113: ioDEBUG(9, "WARNING! No allocated buffer!?!\n");
114: return NULL;
115: }
116:
117: subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
118: if (!subs)
119: return NULL;
120:
121: for (s = subs; s && s->sub_topic.msg_base; s++) {
122: siz = s->sub_value.msg_len;
123: memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base,
124: MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
125: ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n",
126: siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
127: if (siz > 0)
128: search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
129: }
130:
131: return NULL;
132: }
133:
134: int
135: pubWill(struct tagSession * __restrict sess)
136: {
137: int datlen;
138:
139: ioTRACE(2);
140:
141: /* prepare will packet */
142: datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,
143: sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
144: if (datlen == -1)
145: return -1; /* error */
146:
147: return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
148: }
149:
150: static int
151: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
152: {
153: return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
154: }
155:
156: static int
157: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
158: {
159: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
160:
161: /* write topic to database */
162: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
163: sess->sess_addr, hdr->mqtt_msg.retain);
164: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
165: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
166: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
167:
168: search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
169:
170: /* delete not retain message */
171: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
172: sess->sess_addr, 0);
173: return 0;
174: }
175:
176: static int
177: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
178: {
179: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
180:
181: /* write topic to database */
182: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
183: sess->sess_addr, hdr->mqtt_msg.retain);
184: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
185: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
186: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
187:
188: return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
189: }
190:
191:
192: int
193: cmdPUBLISH(void *srv, int len, void *arg)
194: {
195: struct mqtthdr *hdr;
196: struct tagSession *sess = (struct tagSession*) arg;
197: char szTopic[STRSIZ] = { 0 };
198: int siz = 0;
199: u_short mid = 0;
200: ait_val_t *p = NULL;
201:
202: ioTRACE(2);
203:
204: if (!sess)
205: return -1;
206:
207: ioDEBUG(5, "Exec PUBLISH session");
208: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
209: if (siz == -1) {
210: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
211: return 0;
212: }
213:
214: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
215: switch (hdr->mqtt_msg.qos) {
216: case MQTT_QOS_ACK:
217: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
218: return 0;
219: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
220: if (siz == -1) {
221: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
222: mqtt_GetErrno(), mqtt_GetError());
223: return 0;
224: }
225: break;
226: case MQTT_QOS_EXACTLY:
227: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
228: return 0;
229: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
230: if (siz == -1) {
231: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
232: mqtt_GetErrno(), mqtt_GetError());
233: return 0;
234: }
235: break;
236: case MQTT_QOS_ONCE:
237: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
238: default:
239: return 0;
240: }
241:
242: p = mkPkt(sess->sess_buf->msg_base, siz);
243: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
244: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
245: return 0;
246: }
247:
248: int
249: cmdPUBREL(void *srv, int len, void *arg)
250: {
251: struct tagSession *sess = (struct tagSession*) arg;
252: int siz = 0;
253: u_short mid = 0;
254: ait_val_t *p = NULL;
255:
256: ioTRACE(2);
257:
258: if (!sess)
259: return -1;
260:
261: ioDEBUG(5, "Exec PUBREL session");
262: mid = mqtt_readPUBREL(sess->sess_buf);
263: if (mid == (u_short) -1) {
264: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
265: return 0;
266: }
267:
268: /* delete not retain message */
269: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
270: sess->sess_addr, 0);
271:
272: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
273: if (siz == -1) {
274: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
275: return 0;
276: }
277:
278: p = mkPkt(sess->sess_buf->msg_base, siz);
279: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
280: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
281: return 0;
282: }
283:
284: int
285: cmdSUBSCRIBE(void *srv, int len, void *arg)
286: {
287: struct tagSession *sess = (struct tagSession*) arg;
288: mqtt_subscr_t *subs = NULL;
289: int siz = 0;
290: u_short mid = 0;
291: register int i;
292: struct tagStore *store;
293: char buf[BUFSIZ];
294: void *ptr;
295: ait_val_t *p = NULL;
296:
297: ioTRACE(2);
298:
299: if (!sess)
300: return -1;
301:
302: ioDEBUG(5, "Exec SUBSCRIBE session");
303: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
304: if (siz == -1) {
305: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
306: return 0;
307: }
308:
309: /* add to db */
310: for (i = 0; i < siz; i++) {
311: store = io_malloc(sizeof(struct tagStore));
312: if (!store) {
313: ioSYSERR(0);
314: continue;
315: } else {
316: store->st_msgid = mid;
317: mqtt_subCopy(&store->st_subscr, &subs[i]);
318: subs[i].sub_ret = MQTT_QOS_DENY;
319: }
320:
321: /* add to cache */
322: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
323:
324: /* convert topic to regexp */
325: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
326: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
327: } else {
328: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
329: if (!ptr) {
330: ioSYSERR(0);
331: continue;
332: } else {
333: store->st_subscr.sub_topic.msg_base = ptr;
334: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
335: memcpy(store->st_subscr.sub_topic.msg_base, buf,
336: store->st_subscr.sub_topic.msg_len);
337: }
338:
339: /* store to db */
340: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
341: sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
342: /* subscribe pass */
343: subs[i].sub_ret = MQTT_QOS_PASS;
344:
345: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid,
346: store->st_subscr.sub_topic.msg_base,
347: store->st_subscr.sub_topic.msg_len,
348: store->st_subscr.sub_ret, sess->sess_addr);
349: }
350: }
351:
352: /* send acknowledge */
353: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
354: if (siz == -1) {
355: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
356: goto end;
357: } else {
358: p = mkPkt(sess->sess_buf->msg_base, siz);
359: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
360: }
361:
362: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
363: end:
364: mqtt_subFree(&subs);
365: return 0;
366: }
367:
368: int
369: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
370: {
371: struct tagSession *sess = (struct tagSession*) arg;
372: mqtt_subscr_t *subs = NULL;
373: int siz = 0;
374: u_short mid = 0;
375: register int i;
376: struct tagStore *store, *tmp;
377: ait_val_t *p = NULL;
378:
379: ioTRACE(2);
380:
381: if (!sess)
382: return -1;
383:
384: ioDEBUG(5, "Exec UNSUBSCRIBE session");
385: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
386: if (siz == -1) {
387: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
388: return 0;
389: }
390:
391: /* del from db */
392: for (i = 0; i < siz; i++) {
393: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
394: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
395: store->st_subscr.sub_topic.msg_base &&
396: !strcmp(store->st_subscr.sub_topic.msg_base,
397: subs[i].sub_topic.msg_base)) {
398: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
399:
400: if (store->st_subscr.sub_topic.msg_base)
401: free(store->st_subscr.sub_topic.msg_base);
402: if (store->st_subscr.sub_value.msg_base)
403: free(store->st_subscr.sub_value.msg_base);
404: io_free(store);
405: }
406: }
407:
408: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
409: sess->sess_user, "%");
410: }
411:
412: /* send acknowledge */
413: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
414: if (siz == -1) {
415: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
416: goto end;
417: } else {
418: p = mkPkt(sess->sess_buf->msg_base, siz);
419: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
420: }
421:
422: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
423: end:
424: mqtt_subFree(&subs);
425: return 0;
426: }
427:
428: int
429: cmdPINGREQ(void *srv, int len, void *arg)
430: {
431: struct tagSession *sess = (struct tagSession*) arg;
432: int siz = 0;
433: ait_val_t *p = NULL;
434:
435: ioTRACE(2);
436:
437: if (!sess)
438: return -1;
439:
440: ioDEBUG(5, "Exec PINGREQ session");
441: siz = mqtt_msgPINGRESP(sess->sess_buf);
442: if (siz == -1) {
443: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
444: return 0;
445: } else {
446: p = mkPkt(sess->sess_buf->msg_base, siz);
447: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
448: }
449:
450: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
451: return 0;
452: }
453:
454: int
455: cmdCONNECT(void *srv, int len, void *arg)
456: {
457: struct tagStore *store;
458: struct tagSession *sess = (struct tagSession*) arg;
459:
460: ioTRACE(2);
461:
462: if (!sess)
463: return -1;
464:
465: ioDEBUG(5, "Exec CONNECT session");
466: TAILQ_REMOVE(&Sessions, sess, sess_node);
467:
468: schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
469:
470: if (sess->sess_clean) {
471: if (call.FiniSessPUB)
472: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
473: if (call.DeletePUB_subscribe)
474: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
475: if (call.WipePUB_topic)
476: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
477: }
478:
479: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
480: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
481:
482: if (store->st_subscr.sub_topic.msg_base)
483: free(store->st_subscr.sub_topic.msg_base);
484: if (store->st_subscr.sub_value.msg_base)
485: free(store->st_subscr.sub_value.msg_base);
486:
487: io_free(store);
488: }
489:
490: if (sess->sess_will.flag)
491: pubWill(sess);
492:
493: if (sess->sess_will.msg)
494: free(sess->sess_will.msg);
495: if (sess->sess_will.topic)
496: free(sess->sess_will.topic);
497:
498: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
499: sess->sess_addr, sess->sess_user);
500:
501: return -3; /* reconnect client */
502: }
503:
504: int
505: cmdDISCONNECT(void *srv, int len, void *arg)
506: {
507: struct tagSession *sess = (struct tagSession*) arg;
508:
509: ioTRACE(2);
510:
511: if (!sess)
512: return -1;
513:
514: ioDEBUG(5, "Exec DISCONNECT session");
515:
516: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
517: sess->sess_addr, sess->sess_user);
518:
519: return -2; /* must terminate dispatcher */
520: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>