1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
33: freePkt(struct tagPkt * __restrict p)
34: {
35: if (!p)
36: return;
37:
38: io_freeVar(&p->pkt_data);
39: io_free(p);
40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
63: freePkt(p);
64: return NULL;
65: }
66:
67: /* --------------------------------------------------- */
68:
69: static int
70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
71: {
72: struct tagPkt *p = NULL;
73: struct tagSession *s = NULL;
74: struct tagStore *st = NULL;
75: regex_t re;
76: regmatch_t match;
77: int ret;
78: char szStr[STRSIZ];
79:
80: TAILQ_FOREACH(s, &Sessions, sess_node) {
81: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
82: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
83: regerror(ret, &re, szStr, sizeof szStr);
84: regfree(&re);
85: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
86: st->st_subscr.sub_topic.msg_base, szStr);
87: }
88: if (!regexec(&re, psTopic, 1, &match, 0)) {
89: /* MATCH */
90: p = mkPkt(sess->sess_buf->msg_base, datlen);
91: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
92: }
93:
94: regfree(&re);
95: }
96: }
97:
98: return 0;
99: }
100:
/*
 * pubAck() - Distribution hook used by cmdPUBLISH() for MQTT_QOS_ACK messages
 *
 * @sess = publishing session (ignored)
 * @psTopic = published topic (ignored)
 * @datlen = message length (ignored)
 * return: always 0
 *
 * Placeholder: no work is done here yet.
 */
static int
pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
{
	(void) sess;
	(void) psTopic;
	(void) datlen;

	return 0;
}
106:
/*
 * pubExactly() - Distribution hook used by cmdPUBLISH() for MQTT_QOS_EXACTLY
 *	messages
 *
 * @sess = publishing session (ignored)
 * @psTopic = published topic (ignored)
 * @datlen = message length (ignored)
 * return: always 0
 *
 * Placeholder: no work is done here yet.
 */
static int
pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
{
	(void) sess;
	(void) psTopic;
	(void) datlen;

	return 0;
}
112:
113:
114: int
115: cmdPUBLISH(void *srv, int len, void *arg)
116: {
117: struct mqtthdr *hdr;
118: struct tagSession *sess = (struct tagSession*) arg;
119: char szTopic[STRSIZ] = { 0 };
120: int siz = 0;
121: u_short mid = 0;
122: struct tagPkt *p = NULL;
123:
124: ioTRACE(2);
125:
126: if (!sess)
127: return -1;
128:
129: ioDEBUG(5, "Exec PUBLISH session");
130: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
131: if (siz == -1) {
132: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
133: return 0;
134: }
135:
136: /* duplicate packet for retransmit to subscribers */
137: /*
138: pubpkt = mqtt_msgDup(sess->sess_buf);
139: if (!pubpkt) {
140: ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
141: return 0;
142: } else
143: */
144:
145: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
146: switch (hdr->mqtt_msg.qos) {
147: case MQTT_QOS_ACK:
148: pubAck(sess, szTopic, siz);
149: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
150: if (siz == -1) {
151: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
152: mqtt_GetErrno(), mqtt_GetError());
153: return 0;
154: }
155: break;
156: case MQTT_QOS_EXACTLY:
157: pubExactly(sess, szTopic, siz);
158: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
159: if (siz == -1) {
160: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
161: mqtt_GetErrno(), mqtt_GetError());
162: return 0;
163: }
164: break;
165: case MQTT_QOS_ONCE:
166: pubOnce(sess, szTopic, siz);
167: default:
168: return 0;
169: }
170:
171: p = mkPkt(sess->sess_buf->msg_base, siz);
172: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
173: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
174: return 0;
175: }
176:
177: int
178: cmdPUBREL(void *srv, int len, void *arg)
179: {
180: struct tagSession *sess = (struct tagSession*) arg;
181: int siz = 0;
182: u_short mid = 0;
183: struct tagPkt *p = NULL;
184:
185: ioTRACE(2);
186:
187: if (!sess)
188: return -1;
189:
190: ioDEBUG(5, "Exec PUBREL session");
191: mid = mqtt_readPUBREL(sess->sess_buf);
192: if (mid == (u_short) -1) {
193: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
194: return 0;
195: }
196:
197: // TODO:: Delete from database topic
198:
199: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
200: if (siz == -1) {
201: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
202: return 0;
203: } else {
204: p = mkPkt(sess->sess_buf->msg_base, siz);
205: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
206: }
207:
208: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
209: return 0;
210: }
211:
212: int
213: cmdSUBSCRIBE(void *srv, int len, void *arg)
214: {
215: struct tagSession *sess = (struct tagSession*) arg;
216: mqtt_subscr_t *subs = NULL;
217: int siz = 0;
218: u_short mid = 0;
219: register int i;
220: struct tagStore *store;
221: char buf[BUFSIZ];
222: void *ptr;
223: struct tagPkt *p = NULL;
224:
225: ioTRACE(2);
226:
227: if (!sess)
228: return -1;
229:
230: ioDEBUG(5, "Exec SUBSCRIBE session");
231: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
232: if (siz == -1) {
233: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
234: return 0;
235: }
236:
237: /* add to db */
238: for (i = 0; i < siz; i++) {
239: /* convert topic to sql search statement */
240: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
241: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
242: goto end;
243: }
244: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
245: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
246: store = io_malloc(sizeof(struct tagStore));
247: if (!store) {
248: ioSYSERR(0);
249: goto end;
250: } else {
251: store->st_msgid = mid;
252: mqtt_subCopy(&store->st_subscr, &subs[i]);
253: }
254:
255: /* add to cache */
256: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
257:
258: /* convert topic to regexp */
259: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
260: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
261: goto end;
262: } else {
263: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
264: if (!ptr) {
265: ioSYSERR(0);
266: goto end;
267: } else {
268: store->st_subscr.sub_topic.msg_base = ptr;
269: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
270: memcpy(store->st_subscr.sub_topic.msg_base, buf,
271: store->st_subscr.sub_topic.msg_len);
272: }
273: }
274:
275: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
276: store->st_subscr.sub_topic.msg_base,
277: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
278:
279: subs[i].sub_ret = MQTT_QOS_PASS;
280: } else
281: subs[i].sub_ret = MQTT_QOS_DENY;
282: }
283:
284: /* send acknowledge */
285: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
286: if (siz == -1) {
287: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
288: goto end;
289: } else {
290: p = mkPkt(sess->sess_buf->msg_base, siz);
291: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
292: }
293:
294: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
295: end:
296: mqtt_subFree(&subs);
297: return 0;
298: }
299:
300: int
301: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
302: {
303: struct tagSession *sess = (struct tagSession*) arg;
304: mqtt_subscr_t *subs = NULL;
305: int siz = 0;
306: u_short mid = 0;
307: register int i;
308: struct tagStore *store, *tmp;
309: struct tagPkt *p = NULL;
310:
311: ioTRACE(2);
312:
313: if (!sess)
314: return -1;
315:
316: ioDEBUG(5, "Exec UNSUBSCRIBE session");
317: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
318: if (siz == -1) {
319: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
320: return 0;
321: }
322:
323: /* del from db */
324: for (i = 0; i < siz; i++) {
325: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
326: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
327: store->st_subscr.sub_topic.msg_base &&
328: !strcmp(store->st_subscr.sub_topic.msg_base,
329: subs[i].sub_topic.msg_base)) {
330: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
331:
332: if (store->st_subscr.sub_topic.msg_base)
333: free(store->st_subscr.sub_topic.msg_base);
334: if (store->st_subscr.sub_value.msg_base)
335: free(store->st_subscr.sub_value.msg_base);
336: io_free(store);
337: }
338: }
339:
340: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
341: sess->sess_user, "%");
342: }
343:
344: /* send acknowledge */
345: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
346: if (siz == -1) {
347: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
348: goto end;
349: } else {
350: p = mkPkt(sess->sess_buf->msg_base, siz);
351: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
352: }
353:
354: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
355: end:
356: mqtt_subFree(&subs);
357: return 0;
358: }
359:
360: int
361: cmdPINGREQ(void *srv, int len, void *arg)
362: {
363: struct tagSession *sess = (struct tagSession*) arg;
364: int siz = 0;
365: struct tagPkt *p = NULL;
366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
372: ioDEBUG(5, "Exec PINGREQ session");
373: siz = mqtt_msgPINGRESP(sess->sess_buf);
374: if (siz == -1) {
375: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
376: return 0;
377: } else {
378: p = mkPkt(sess->sess_buf->msg_base, siz);
379: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
380: }
381:
382: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
383: return 0;
384: }
385:
386: int
387: cmdCONNECT(void *srv, int len, void *arg)
388: {
389: struct tagStore *store;
390: struct tagPkt *p;
391: struct tagSession *sess = (struct tagSession*) arg;
392:
393: ioTRACE(2);
394:
395: if (!sess)
396: return -1;
397:
398: ioDEBUG(5, "Exec CONNECT session");
399: TAILQ_REMOVE(&Sessions, sess, sess_node);
400:
401: if (sess->sess_clean) {
402: if (call.FiniSessPUB)
403: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
404: if (call.DeletePUB_subscribe)
405: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
406: if (call.WipePUB_topic)
407: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
408: }
409:
410: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
411: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
412:
413: if (store->st_subscr.sub_topic.msg_base)
414: free(store->st_subscr.sub_topic.msg_base);
415: if (store->st_subscr.sub_value.msg_base)
416: free(store->st_subscr.sub_value.msg_base);
417:
418: io_free(store);
419: }
420:
421: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
422: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
423:
424: io_freeVar(&p->pkt_data);
425: io_free(p);
426: }
427:
428: if (sess->sess_will.msg)
429: free(sess->sess_will.msg);
430: if (sess->sess_will.topic)
431: free(sess->sess_will.topic);
432:
433: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
434: sess->sess_addr, sess->sess_user);
435:
436: return -3; /* reconnect client */
437: }
438:
439: int
440: cmdDISCONNECT(void *srv, int len, void *arg)
441: {
442: struct tagSession *sess = (struct tagSession*) arg;
443:
444: ioTRACE(2);
445:
446: if (!sess)
447: return -1;
448:
449: ioDEBUG(5, "Exec DISCONNECT session");
450:
451: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
452: sess->sess_addr, sess->sess_user);
453:
454: return -2; /* must terminate dispatcher */
455: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>