1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline ait_val_t *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: ait_val_t *p = NULL;
11:
12: if (!(p = io_allocVar())) {
13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
14: return NULL;
15: }
16:
17: if (data && dlen > 0)
18: AIT_SET_BUF(p, data, dlen);
19:
20: return p;
21: }
22:
23: static inline void
24: freePkt(ait_val_t ** __restrict p)
25: {
26: if (!p)
27: return;
28:
29: io_freeVar(p);
30: }
31:
32: static void *
33: sendPacket(sched_task_t *task)
34: {
35: ait_val_t *p = TASK_ARG(task);
36: register int n, slen;
37: u_char *pos;
38:
39: if (!p || AIT_ISEMPTY(p)) {
40: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
41: return NULL;
42: }
43:
44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
46: if (n == -1) {
47: ioSYSERR(0);
48: break;
49: }
50: }
51:
52: freePkt(&p);
53: return NULL;
54: }
55:
56: /* --------------------------------------------------- */
57:
58: static int
59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
60: {
61: ait_val_t *p = NULL;
62: struct tagSession *s = NULL;
63: struct tagStore *st = NULL;
64: regex_t re;
65: regmatch_t match;
66: int ret;
67: char szStr[STRSIZ];
68:
69: TAILQ_FOREACH(s, &Sessions, sess_node) {
70: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
72: regerror(ret, &re, szStr, sizeof szStr);
73: regfree(&re);
74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
75: st->st_subscr.sub_topic.msg_base, szStr);
76: }
77: if (!regexec(&re, psTopic, 1, &match, 0)) {
78: /* MATCH */
79: p = mkPkt(sess->sess_buf->msg_base, datlen);
80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
81: }
82:
83: regfree(&re);
84: }
85: }
86:
87: return 0;
88: }
89:
90: static int
91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
92: {
93: ait_val_t *p = NULL;
94: struct tagSession *s = NULL;
95: struct tagStore *st = NULL;
96: regex_t re;
97: regmatch_t match;
98: int ret, flg = 0;
99: char szStr[STRSIZ];
100: struct mqtthdr *hdr;
101:
102: p = mkPkt(sess->sess_buf->msg_base, datlen);
103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
104:
105: /* write topic to database */
106: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
107: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.retain);
108:
109: TAILQ_FOREACH(s, &Sessions, sess_node) {
110: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
111: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
112: regerror(ret, &re, szStr, sizeof szStr);
113: regfree(&re);
114: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
115: st->st_subscr.sub_topic.msg_base, szStr);
116: }
117: if (!regexec(&re, psTopic, 1, &match, 0)) {
118: flg = 1; /* MATCH */
119: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
120: }
121:
122: regfree(&re);
123: }
124: }
125:
126: /* if subscriber not found then free packet */
127: if (!flg)
128: freePkt(&p);
129:
130: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, sess->sess_addr, -1);
131: return 0;
132: }
133:
134: static int
135: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
136: {
137: return 0;
138: }
139:
140:
141: int
142: cmdPUBLISH(void *srv, int len, void *arg)
143: {
144: struct mqtthdr *hdr;
145: struct tagSession *sess = (struct tagSession*) arg;
146: char szTopic[STRSIZ] = { 0 };
147: int siz = 0;
148: u_short mid = 0;
149: ait_val_t *p = NULL;
150:
151: ioTRACE(2);
152:
153: if (!sess)
154: return -1;
155:
156: ioDEBUG(5, "Exec PUBLISH session");
157: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
158: if (siz == -1) {
159: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
160: return 0;
161: }
162:
163: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
164: switch (hdr->mqtt_msg.qos) {
165: case MQTT_QOS_ACK:
166: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
167: return 0;
168: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
169: if (siz == -1) {
170: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
171: mqtt_GetErrno(), mqtt_GetError());
172: return 0;
173: }
174: break;
175: case MQTT_QOS_EXACTLY:
176: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
177: return 0;
178: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
179: if (siz == -1) {
180: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
181: mqtt_GetErrno(), mqtt_GetError());
182: return 0;
183: }
184: break;
185: case MQTT_QOS_ONCE:
186: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
187: default:
188: return 0;
189: }
190:
191: p = mkPkt(sess->sess_buf->msg_base, siz);
192: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
193: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
194: return 0;
195: }
196:
197: int
198: cmdPUBREL(void *srv, int len, void *arg)
199: {
200: struct tagSession *sess = (struct tagSession*) arg;
201: int siz = 0;
202: u_short mid = 0;
203: ait_val_t *p = NULL;
204:
205: ioTRACE(2);
206:
207: if (!sess)
208: return -1;
209:
210: ioDEBUG(5, "Exec PUBREL session");
211: mid = mqtt_readPUBREL(sess->sess_buf);
212: if (mid == (u_short) -1) {
213: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
214: return 0;
215: }
216:
217: // TODO:: Delete from database topic
218:
219: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
220: if (siz == -1) {
221: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
222: return 0;
223: } else {
224: p = mkPkt(sess->sess_buf->msg_base, siz);
225: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
226: }
227:
228: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
229: return 0;
230: }
231:
232: int
233: cmdSUBSCRIBE(void *srv, int len, void *arg)
234: {
235: struct tagSession *sess = (struct tagSession*) arg;
236: mqtt_subscr_t *subs = NULL;
237: int siz = 0;
238: u_short mid = 0;
239: register int i;
240: struct tagStore *store;
241: char buf[BUFSIZ];
242: void *ptr;
243: ait_val_t *p = NULL;
244:
245: ioTRACE(2);
246:
247: if (!sess)
248: return -1;
249:
250: ioDEBUG(5, "Exec SUBSCRIBE session");
251: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
252: if (siz == -1) {
253: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
254: return 0;
255: }
256:
257: /* add to db */
258: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
259: /* convert topic to sql search statement */
260: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
261: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: continue;
263: }
264: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
265: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
266: store = io_malloc(sizeof(struct tagStore));
267: if (!store) {
268: ioSYSERR(0);
269: continue;
270: } else {
271: store->st_msgid = mid;
272: mqtt_subCopy(&store->st_subscr, &subs[i]);
273: }
274:
275: /* add to cache */
276: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
277:
278: /* convert topic to regexp */
279: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
280: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
281:
282: subs[i].sub_ret = MQTT_QOS_DENY;
283: } else {
284: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
285: if (!ptr)
286: ioSYSERR(0);
287: else {
288: store->st_subscr.sub_topic.msg_base = ptr;
289: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
290: memcpy(store->st_subscr.sub_topic.msg_base, buf,
291: store->st_subscr.sub_topic.msg_len);
292: }
293:
294: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
295: store->st_subscr.sub_topic.msg_base,
296: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
297:
298: subs[i].sub_ret = MQTT_QOS_PASS;
299: }
300: }
301: }
302:
303: /* send acknowledge */
304: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
305: if (siz == -1) {
306: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
307: goto end;
308: } else {
309: p = mkPkt(sess->sess_buf->msg_base, siz);
310: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
311: }
312:
313: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
314: end:
315: mqtt_subFree(&subs);
316: return 0;
317: }
318:
319: int
320: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
321: {
322: struct tagSession *sess = (struct tagSession*) arg;
323: mqtt_subscr_t *subs = NULL;
324: int siz = 0;
325: u_short mid = 0;
326: register int i;
327: struct tagStore *store, *tmp;
328: ait_val_t *p = NULL;
329:
330: ioTRACE(2);
331:
332: if (!sess)
333: return -1;
334:
335: ioDEBUG(5, "Exec UNSUBSCRIBE session");
336: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
337: if (siz == -1) {
338: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
339: return 0;
340: }
341:
342: /* del from db */
343: for (i = 0; i < siz; i++) {
344: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
345: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
346: store->st_subscr.sub_topic.msg_base &&
347: !strcmp(store->st_subscr.sub_topic.msg_base,
348: subs[i].sub_topic.msg_base)) {
349: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
350:
351: if (store->st_subscr.sub_topic.msg_base)
352: free(store->st_subscr.sub_topic.msg_base);
353: if (store->st_subscr.sub_value.msg_base)
354: free(store->st_subscr.sub_value.msg_base);
355: io_free(store);
356: }
357: }
358:
359: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
360: sess->sess_user, "%");
361: }
362:
363: /* send acknowledge */
364: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
365: if (siz == -1) {
366: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
367: goto end;
368: } else {
369: p = mkPkt(sess->sess_buf->msg_base, siz);
370: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
371: }
372:
373: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
374: end:
375: mqtt_subFree(&subs);
376: return 0;
377: }
378:
379: int
380: cmdPINGREQ(void *srv, int len, void *arg)
381: {
382: struct tagSession *sess = (struct tagSession*) arg;
383: int siz = 0;
384: ait_val_t *p = NULL;
385:
386: ioTRACE(2);
387:
388: if (!sess)
389: return -1;
390:
391: ioDEBUG(5, "Exec PINGREQ session");
392: siz = mqtt_msgPINGRESP(sess->sess_buf);
393: if (siz == -1) {
394: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
395: return 0;
396: } else {
397: p = mkPkt(sess->sess_buf->msg_base, siz);
398: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
399: }
400:
401: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
402: return 0;
403: }
404:
405: int
406: cmdCONNECT(void *srv, int len, void *arg)
407: {
408: struct tagStore *store;
409: struct tagSession *sess = (struct tagSession*) arg;
410:
411: ioTRACE(2);
412:
413: if (!sess)
414: return -1;
415:
416: ioDEBUG(5, "Exec CONNECT session");
417: TAILQ_REMOVE(&Sessions, sess, sess_node);
418:
419: if (sess->sess_clean) {
420: if (call.FiniSessPUB)
421: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
422: if (call.DeletePUB_subscribe)
423: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
424: if (call.WipePUB_topic)
425: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
426: }
427:
428: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
429: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
430:
431: if (store->st_subscr.sub_topic.msg_base)
432: free(store->st_subscr.sub_topic.msg_base);
433: if (store->st_subscr.sub_value.msg_base)
434: free(store->st_subscr.sub_value.msg_base);
435:
436: io_free(store);
437: }
438:
439: if (sess->sess_will.msg)
440: free(sess->sess_will.msg);
441: if (sess->sess_will.topic)
442: free(sess->sess_will.topic);
443:
444: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
445: sess->sess_addr, sess->sess_user);
446:
447: return -3; /* reconnect client */
448: }
449:
450: int
451: cmdDISCONNECT(void *srv, int len, void *arg)
452: {
453: struct tagSession *sess = (struct tagSession*) arg;
454:
455: ioTRACE(2);
456:
457: if (!sess)
458: return -1;
459:
460: ioDEBUG(5, "Exec DISCONNECT session");
461:
462: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
463: sess->sess_addr, sess->sess_user);
464:
465: return -2; /* must terminate dispatcher */
466: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>