1: #include "global.h"
2: #include "mqttd.h"
3: #include "utils.h"
4: #include "rtlm.h"
5: #include "mqttd_calls.h"
6:
7:
8: static inline ait_val_t *
9: mkPkt(void * __restrict data, int dlen)
10: {
11: ait_val_t *p = NULL;
12:
13: if (!(p = io_allocVar())) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: }
17:
18: if (data && dlen > 0)
19: AIT_SET_BUF(p, data, dlen);
20:
21: return p;
22: }
23:
24: static inline void
25: freePkt(ait_val_t ** __restrict p)
26: {
27: if (!p)
28: return;
29:
30: io_freeVar(p);
31: }
32:
33: static void *
34: sendPacket(sched_task_t *task)
35: {
36: ait_val_t *p = TASK_ARG(task);
37: register int n, slen;
38: u_char *pos;
39:
40: if (!p || AIT_ISEMPTY(p)) {
41: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
42: return NULL;
43: }
44:
45: ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), TASK_FD(task));
46:
47: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
48: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
49: if (n == -1) {
50: ioSYSERR(0);
51: break;
52: }
53: }
54:
55: freePkt(&p);
56: return NULL;
57: }
58:
59: static int
60: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
61: {
62: regex_t re;
63: regmatch_t match;
64: ait_val_t *p = NULL;
65: struct tagSession *s = NULL;
66: struct tagStore *st_, *st = NULL;
67: char szStr[STRSIZ];
68: int ret;
69:
70: assert(sess);
71:
72: TAILQ_FOREACH(s, &Sessions, sess_node) {
73: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
74: /* check for QoS */
75: if (st->st_subscr.sub_ret >= qos) {
76: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
77: regerror(ret, &re, szStr, sizeof szStr);
78: regfree(&re);
79: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
80: st->st_subscr.sub_topic.msg_base, szStr);
81: }
82: if (!regexec(&re, topic, 1, &match, 0)) {
83: /* MATCH */
84: p = mkPkt(sess->sess_buf->msg_base, datlen);
85: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
86: }
87:
88: regfree(&re);
89: }
90: }
91: }
92:
93: return 0;
94: }
95:
96: /* --------------------------------------------------- */
97:
98: int
99: pubWill(struct tagSession * __restrict sess)
100: {
101: int datlen;
102:
103: ioTRACE(2);
104:
105: /* prepare will packet */
106: datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,
107: sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
108: if (datlen == -1)
109: return -1; /* error */
110:
111: return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
112: }
113:
114: static int
115: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
116: {
117: return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
118: }
119:
120: static int
121: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
122: {
123: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
124:
125: /* write topic to database */
126: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
127: sess->sess_addr, hdr->mqtt_msg.retain);
128: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
129: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
130: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
131:
132: search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
133:
134: /* delete not retain message */
135: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
136: sess->sess_addr, 0);
137: return 0;
138: }
139:
140: static int
141: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
142: {
143: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
144:
145: /* write topic to database */
146: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
147: sess->sess_addr, hdr->mqtt_msg.retain);
148: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
149: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
150: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
151:
152: return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
153: }
154:
155:
156: int
157: cmdPUBLISH(void *srv, int len, void *arg)
158: {
159: struct mqtthdr *hdr;
160: struct tagSession *sess = (struct tagSession*) arg;
161: char szTopic[STRSIZ] = { 0 };
162: int siz = 0;
163: u_short mid = 0;
164: ait_val_t *p = NULL;
165:
166: ioTRACE(2);
167:
168: if (!sess)
169: return -1;
170:
171: ioDEBUG(5, "Exec PUBLISH session");
172: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
173: if (siz == -1) {
174: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
175: return 0;
176: }
177:
178: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
179: switch (hdr->mqtt_msg.qos) {
180: case MQTT_QOS_ACK:
181: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
182: return 0;
183: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
184: if (siz == -1) {
185: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
186: mqtt_GetErrno(), mqtt_GetError());
187: return 0;
188: }
189: break;
190: case MQTT_QOS_EXACTLY:
191: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
192: return 0;
193: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
194: if (siz == -1) {
195: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
196: mqtt_GetErrno(), mqtt_GetError());
197: return 0;
198: }
199: break;
200: case MQTT_QOS_ONCE:
201: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
202: default:
203: return 0;
204: }
205:
206: p = mkPkt(sess->sess_buf->msg_base, siz);
207: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
208: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
209: return 0;
210: }
211:
212: int
213: cmdPUBREL(void *srv, int len, void *arg)
214: {
215: struct tagSession *sess = (struct tagSession*) arg;
216: int siz = 0;
217: u_short mid = 0;
218: ait_val_t *p = NULL;
219:
220: ioTRACE(2);
221:
222: if (!sess)
223: return -1;
224:
225: ioDEBUG(5, "Exec PUBREL session");
226: mid = mqtt_readPUBREL(sess->sess_buf);
227: if (mid == (u_short) -1) {
228: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
229: return 0;
230: }
231:
232: /* delete not retain message */
233: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
234: sess->sess_addr, 0);
235:
236: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
237: if (siz == -1) {
238: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
239: return 0;
240: }
241:
242: p = mkPkt(sess->sess_buf->msg_base, siz);
243: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
244: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
245: return 0;
246: }
247:
248: int
249: cmdSUBSCRIBE(void *srv, int len, void *arg)
250: {
251: struct tagSession *sess = (struct tagSession*) arg;
252: mqtt_subscr_t *subs = NULL;
253: int siz = 0;
254: u_short mid = 0;
255: register int i;
256: struct tagStore *store;
257: char buf[BUFSIZ];
258: void *ptr;
259: ait_val_t *p = NULL;
260:
261: ioTRACE(2);
262:
263: if (!sess)
264: return -1;
265:
266: ioDEBUG(5, "Exec SUBSCRIBE session");
267: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
268: if (siz == -1) {
269: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
270: return 0;
271: }
272:
273: /* add to db */
274: for (i = 0; i < siz; i++) {
275: store = io_malloc(sizeof(struct tagStore));
276: if (!store) {
277: ioSYSERR(0);
278: continue;
279: } else {
280: store->st_msgid = mid;
281: mqtt_subCopy(&store->st_subscr, &subs[i]);
282: subs[i].sub_ret = MQTT_QOS_DENY;
283: }
284:
285: /* add to cache */
286: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
287:
288: /* convert topic to regexp */
289: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
290: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
291: } else {
292: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
293: if (!ptr) {
294: ioSYSERR(0);
295: continue;
296: } else {
297: store->st_subscr.sub_topic.msg_base = ptr;
298: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
299: memcpy(store->st_subscr.sub_topic.msg_base, buf,
300: store->st_subscr.sub_topic.msg_len);
301: }
302:
303: /* store to db */
304: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
305: sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
306: /* subscribe pass */
307: subs[i].sub_ret = MQTT_QOS_PASS;
308:
309: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid,
310: store->st_subscr.sub_topic.msg_base,
311: store->st_subscr.sub_topic.msg_len,
312: store->st_subscr.sub_ret, sess->sess_addr);
313: }
314: }
315:
316: /* send acknowledge */
317: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
318: if (siz == -1) {
319: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
320: goto end;
321: } else {
322: p = mkPkt(sess->sess_buf->msg_base, siz);
323: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
324: }
325:
326: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
327: end:
328: mqtt_subFree(&subs);
329: return 0;
330: }
331:
332: int
333: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
334: {
335: struct tagSession *sess = (struct tagSession*) arg;
336: mqtt_subscr_t *subs = NULL;
337: int siz = 0;
338: u_short mid = 0;
339: register int i;
340: struct tagStore *store, *tmp;
341: ait_val_t *p = NULL;
342:
343: ioTRACE(2);
344:
345: if (!sess)
346: return -1;
347:
348: ioDEBUG(5, "Exec UNSUBSCRIBE session");
349: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
350: if (siz == -1) {
351: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
352: return 0;
353: }
354:
355: /* del from db */
356: for (i = 0; i < siz; i++) {
357: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
358: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
359: store->st_subscr.sub_topic.msg_base &&
360: !strcmp(store->st_subscr.sub_topic.msg_base,
361: subs[i].sub_topic.msg_base)) {
362: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
363:
364: if (store->st_subscr.sub_topic.msg_base)
365: free(store->st_subscr.sub_topic.msg_base);
366: if (store->st_subscr.sub_value.msg_base)
367: free(store->st_subscr.sub_value.msg_base);
368: io_free(store);
369: }
370: }
371:
372: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
373: sess->sess_user, "%");
374: }
375:
376: /* send acknowledge */
377: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
378: if (siz == -1) {
379: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
380: goto end;
381: } else {
382: p = mkPkt(sess->sess_buf->msg_base, siz);
383: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
384: }
385:
386: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
387: end:
388: mqtt_subFree(&subs);
389: return 0;
390: }
391:
392: int
393: cmdPINGREQ(void *srv, int len, void *arg)
394: {
395: struct tagSession *sess = (struct tagSession*) arg;
396: int siz = 0;
397: ait_val_t *p = NULL;
398:
399: ioTRACE(2);
400:
401: if (!sess)
402: return -1;
403:
404: ioDEBUG(5, "Exec PINGREQ session");
405: siz = mqtt_msgPINGRESP(sess->sess_buf);
406: if (siz == -1) {
407: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
408: return 0;
409: } else {
410: p = mkPkt(sess->sess_buf->msg_base, siz);
411: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
412: }
413:
414: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
415: return 0;
416: }
417:
418: int
419: cmdCONNECT(void *srv, int len, void *arg)
420: {
421: struct tagStore *store;
422: struct tagSession *sess = (struct tagSession*) arg;
423:
424: ioTRACE(2);
425:
426: if (!sess)
427: return -1;
428:
429: ioDEBUG(5, "Exec CONNECT session");
430: TAILQ_REMOVE(&Sessions, sess, sess_node);
431:
432: if (sess->sess_clean) {
433: if (call.FiniSessPUB)
434: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
435: if (call.DeletePUB_subscribe)
436: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
437: if (call.WipePUB_topic)
438: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
439: }
440:
441: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
442: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
443:
444: if (store->st_subscr.sub_topic.msg_base)
445: free(store->st_subscr.sub_topic.msg_base);
446: if (store->st_subscr.sub_value.msg_base)
447: free(store->st_subscr.sub_value.msg_base);
448:
449: io_free(store);
450: }
451:
452: if (sess->sess_will.flag)
453: pubWill(sess);
454:
455: if (sess->sess_will.msg)
456: free(sess->sess_will.msg);
457: if (sess->sess_will.topic)
458: free(sess->sess_will.topic);
459:
460: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
461: sess->sess_addr, sess->sess_user);
462:
463: return -3; /* reconnect client */
464: }
465:
466: int
467: cmdDISCONNECT(void *srv, int len, void *arg)
468: {
469: struct tagSession *sess = (struct tagSession*) arg;
470:
471: ioTRACE(2);
472:
473: if (!sess)
474: return -1;
475:
476: ioDEBUG(5, "Exec DISCONNECT session");
477:
478: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
479: sess->sess_addr, sess->sess_user);
480:
481: return -2; /* must terminate dispatcher */
482: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>