1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline ait_val_t *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: ait_val_t *p = NULL;
11:
12: if (!(p = io_allocVar())) {
13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
14: return NULL;
15: }
16:
17: if (data && dlen > 0)
18: AIT_SET_BUF(p, data, dlen);
19:
20: return p;
21: }
22:
23: static inline void
24: freePkt(ait_val_t ** __restrict p)
25: {
26: if (!p)
27: return;
28:
29: io_freeVar(p);
30: }
31:
32: static void *
33: sendPacket(sched_task_t *task)
34: {
35: ait_val_t *p = TASK_ARG(task);
36: register int n, slen;
37: u_char *pos;
38:
39: if (!p || AIT_ISEMPTY(p)) {
40: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
41: return NULL;
42: }
43:
44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
46: if (n == -1) {
47: ioSYSERR(0);
48: break;
49: }
50: }
51:
52: freePkt(&p);
53: return NULL;
54: }
55:
56: /* --------------------------------------------------- */
57:
58: static int
59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
60: {
61: ait_val_t *p = NULL;
62: struct tagSession *s = NULL;
63: struct tagStore *st = NULL;
64: regex_t re;
65: regmatch_t match;
66: int ret;
67: char szStr[STRSIZ];
68:
69: TAILQ_FOREACH(s, &Sessions, sess_node) {
70: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
72: regerror(ret, &re, szStr, sizeof szStr);
73: regfree(&re);
74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
75: st->st_subscr.sub_topic.msg_base, szStr);
76: }
77: if (!regexec(&re, psTopic, 1, &match, 0)) {
78: /* MATCH */
79: p = mkPkt(sess->sess_buf->msg_base, datlen);
80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
81: }
82:
83: regfree(&re);
84: }
85: }
86:
87: return 0;
88: }
89:
90: static int
91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
92: {
93: ait_val_t *p = NULL;
94: struct tagSession *s = NULL;
95: struct tagStore *st = NULL;
96: regex_t re;
97: regmatch_t match;
98: int ret, flg = 0;
99: char szStr[STRSIZ];
100: struct mqtthdr *hdr;
101:
102: p = mkPkt(sess->sess_buf->msg_base, datlen);
103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
104:
105: /* write topic to database */
106: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
107: sess->sess_addr, hdr->mqtt_msg.retain);
108: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
109: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
110:
111: TAILQ_FOREACH(s, &Sessions, sess_node) {
112: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
113: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
114: regerror(ret, &re, szStr, sizeof szStr);
115: regfree(&re);
116: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
117: st->st_subscr.sub_topic.msg_base, szStr);
118: }
119: if (!regexec(&re, psTopic, 1, &match, 0)) {
120: flg = 1; /* MATCH */
121: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
122: }
123:
124: regfree(&re);
125: }
126: }
127:
128: /* if subscriber not found then free packet */
129: if (!flg)
130: freePkt(&p);
131:
132: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
133: sess->sess_addr, 0);
134: return 0;
135: }
136:
137: static int
138: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
139: {
140: return 0;
141: }
142:
143:
144: int
145: cmdPUBLISH(void *srv, int len, void *arg)
146: {
147: struct mqtthdr *hdr;
148: struct tagSession *sess = (struct tagSession*) arg;
149: char szTopic[STRSIZ] = { 0 };
150: int siz = 0;
151: u_short mid = 0;
152: ait_val_t *p = NULL;
153:
154: ioTRACE(2);
155:
156: if (!sess)
157: return -1;
158:
159: ioDEBUG(5, "Exec PUBLISH session");
160: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
161: if (siz == -1) {
162: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
163: return 0;
164: }
165:
166: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
167: switch (hdr->mqtt_msg.qos) {
168: case MQTT_QOS_ACK:
169: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
170: return 0;
171: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
172: if (siz == -1) {
173: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
174: mqtt_GetErrno(), mqtt_GetError());
175: return 0;
176: }
177: break;
178: case MQTT_QOS_EXACTLY:
179: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
180: return 0;
181: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
182: if (siz == -1) {
183: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
184: mqtt_GetErrno(), mqtt_GetError());
185: return 0;
186: }
187: break;
188: case MQTT_QOS_ONCE:
189: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
190: default:
191: return 0;
192: }
193:
194: p = mkPkt(sess->sess_buf->msg_base, siz);
195: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
196: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
197: return 0;
198: }
199:
200: int
201: cmdPUBREL(void *srv, int len, void *arg)
202: {
203: struct tagSession *sess = (struct tagSession*) arg;
204: int siz = 0;
205: u_short mid = 0;
206: ait_val_t *p = NULL;
207:
208: ioTRACE(2);
209:
210: if (!sess)
211: return -1;
212:
213: ioDEBUG(5, "Exec PUBREL session");
214: mid = mqtt_readPUBREL(sess->sess_buf);
215: if (mid == (u_short) -1) {
216: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
217: return 0;
218: }
219:
220: // TODO:: Delete from database topic
221:
222: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
223: if (siz == -1) {
224: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
225: return 0;
226: } else {
227: p = mkPkt(sess->sess_buf->msg_base, siz);
228: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
229: }
230:
231: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
232: return 0;
233: }
234:
235: int
236: cmdSUBSCRIBE(void *srv, int len, void *arg)
237: {
238: struct tagSession *sess = (struct tagSession*) arg;
239: mqtt_subscr_t *subs = NULL;
240: int siz = 0;
241: u_short mid = 0;
242: register int i;
243: struct tagStore *store;
244: char buf[BUFSIZ];
245: void *ptr;
246: ait_val_t *p = NULL;
247:
248: ioTRACE(2);
249:
250: if (!sess)
251: return -1;
252:
253: ioDEBUG(5, "Exec SUBSCRIBE session");
254: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
255: if (siz == -1) {
256: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
257: return 0;
258: }
259:
260: /* add to db */
261: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
262: /* convert topic to sql search statement */
263: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
264: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
265: continue;
266: }
267: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
268: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
269: store = io_malloc(sizeof(struct tagStore));
270: if (!store) {
271: ioSYSERR(0);
272: continue;
273: } else {
274: store->st_msgid = mid;
275: mqtt_subCopy(&store->st_subscr, &subs[i]);
276: }
277:
278: /* add to cache */
279: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
280:
281: /* convert topic to regexp */
282: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
283: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
284:
285: subs[i].sub_ret = MQTT_QOS_DENY;
286: } else {
287: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
288: if (!ptr)
289: ioSYSERR(0);
290: else {
291: store->st_subscr.sub_topic.msg_base = ptr;
292: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
293: memcpy(store->st_subscr.sub_topic.msg_base, buf,
294: store->st_subscr.sub_topic.msg_len);
295: }
296:
297: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
298: store->st_subscr.sub_topic.msg_base,
299: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
300:
301: subs[i].sub_ret = MQTT_QOS_PASS;
302: }
303: }
304: }
305:
306: /* send acknowledge */
307: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
308: if (siz == -1) {
309: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
310: goto end;
311: } else {
312: p = mkPkt(sess->sess_buf->msg_base, siz);
313: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
314: }
315:
316: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
317: end:
318: mqtt_subFree(&subs);
319: return 0;
320: }
321:
322: int
323: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
324: {
325: struct tagSession *sess = (struct tagSession*) arg;
326: mqtt_subscr_t *subs = NULL;
327: int siz = 0;
328: u_short mid = 0;
329: register int i;
330: struct tagStore *store, *tmp;
331: ait_val_t *p = NULL;
332:
333: ioTRACE(2);
334:
335: if (!sess)
336: return -1;
337:
338: ioDEBUG(5, "Exec UNSUBSCRIBE session");
339: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
340: if (siz == -1) {
341: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
342: return 0;
343: }
344:
345: /* del from db */
346: for (i = 0; i < siz; i++) {
347: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
348: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
349: store->st_subscr.sub_topic.msg_base &&
350: !strcmp(store->st_subscr.sub_topic.msg_base,
351: subs[i].sub_topic.msg_base)) {
352: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
353:
354: if (store->st_subscr.sub_topic.msg_base)
355: free(store->st_subscr.sub_topic.msg_base);
356: if (store->st_subscr.sub_value.msg_base)
357: free(store->st_subscr.sub_value.msg_base);
358: io_free(store);
359: }
360: }
361:
362: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
363: sess->sess_user, "%");
364: }
365:
366: /* send acknowledge */
367: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
368: if (siz == -1) {
369: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
370: goto end;
371: } else {
372: p = mkPkt(sess->sess_buf->msg_base, siz);
373: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
374: }
375:
376: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
377: end:
378: mqtt_subFree(&subs);
379: return 0;
380: }
381:
382: int
383: cmdPINGREQ(void *srv, int len, void *arg)
384: {
385: struct tagSession *sess = (struct tagSession*) arg;
386: int siz = 0;
387: ait_val_t *p = NULL;
388:
389: ioTRACE(2);
390:
391: if (!sess)
392: return -1;
393:
394: ioDEBUG(5, "Exec PINGREQ session");
395: siz = mqtt_msgPINGRESP(sess->sess_buf);
396: if (siz == -1) {
397: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
398: return 0;
399: } else {
400: p = mkPkt(sess->sess_buf->msg_base, siz);
401: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
402: }
403:
404: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
405: return 0;
406: }
407:
408: int
409: cmdCONNECT(void *srv, int len, void *arg)
410: {
411: struct tagStore *store;
412: struct tagSession *sess = (struct tagSession*) arg;
413:
414: ioTRACE(2);
415:
416: if (!sess)
417: return -1;
418:
419: ioDEBUG(5, "Exec CONNECT session");
420: TAILQ_REMOVE(&Sessions, sess, sess_node);
421:
422: if (sess->sess_clean) {
423: if (call.FiniSessPUB)
424: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
425: if (call.DeletePUB_subscribe)
426: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
427: if (call.WipePUB_topic)
428: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
429: }
430:
431: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
432: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
433:
434: if (store->st_subscr.sub_topic.msg_base)
435: free(store->st_subscr.sub_topic.msg_base);
436: if (store->st_subscr.sub_value.msg_base)
437: free(store->st_subscr.sub_value.msg_base);
438:
439: io_free(store);
440: }
441:
442: if (sess->sess_will.msg)
443: free(sess->sess_will.msg);
444: if (sess->sess_will.topic)
445: free(sess->sess_will.topic);
446:
447: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
448: sess->sess_addr, sess->sess_user);
449:
450: return -3; /* reconnect client */
451: }
452:
453: int
454: cmdDISCONNECT(void *srv, int len, void *arg)
455: {
456: struct tagSession *sess = (struct tagSession*) arg;
457:
458: ioTRACE(2);
459:
460: if (!sess)
461: return -1;
462:
463: ioDEBUG(5, "Exec DISCONNECT session");
464:
465: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
466: sess->sess_addr, sess->sess_user);
467:
468: return -2; /* must terminate dispatcher */
469: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>