1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
33: freePkt(struct tagPkt * __restrict p)
34: {
35: if (!p)
36: return;
37:
38: io_freeVar(&p->pkt_data);
39: io_free(p);
40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
63: freePkt(p);
64: return NULL;
65: }
66:
67: /* --------------------------------------------------- */
68:
69: static int
70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
71: {
72: struct tagPkt *p = NULL;
73: struct tagSession *s = NULL;
74: struct tagStore *st = NULL;
75: regex_t re;
76: regmatch_t match;
77: int ret;
78: char szStr[STRSIZ];
79:
80: TAILQ_FOREACH(s, &Sessions, sess_node) {
81: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
82: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
83: regerror(ret, &re, szStr, sizeof szStr);
84: regfree(&re);
85: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
86: st->st_subscr.sub_topic.msg_base, szStr);
87: }
88: if (!regexec(&re, psTopic, 1, &match, 0)) {
89: /* MATCH */
90: ioDEBUG(1, "+++ dlen=%d\n", datlen);
91: p = mkPkt(sess->sess_buf->msg_base, datlen);
92: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
93: }
94:
95: regfree(&re);
96: }
97: }
98:
99: return 0;
100: }
101:
/*
 * pubAck() - Placeholder invoked by cmdPUBLISH for MQTT_QOS_ACK messages
 *
 * Performs no work yet; every argument is ignored.
 * return: always 0
 */
static int
pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
{
	(void) sess;
	(void) psTopic;
	(void) datlen;

	return 0;
}
107:
/*
 * pubExactly() - Placeholder invoked by cmdPUBLISH for MQTT_QOS_EXACTLY messages
 *
 * Performs no work yet; every argument is ignored.
 * return: always 0
 */
static int
pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
{
	(void) sess;
	(void) psTopic;
	(void) datlen;

	return 0;
}
113:
114:
115: int
116: cmdPUBLISH(void *srv, int len, void *arg)
117: {
118: struct mqtthdr *hdr;
119: struct tagSession *sess = (struct tagSession*) arg;
120: char szTopic[STRSIZ] = { 0 };
121: int siz = 0;
122: u_short mid = 0;
123: struct tagPkt *p = NULL;
124:
125: ioTRACE(2);
126:
127: if (!sess)
128: return -1;
129:
130: ioDEBUG(5, "Exec PUBLISH session");
131: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
132: if (siz == -1) {
133: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
134: return 0;
135: }
136:
137: /* duplicate packet for retransmit to subscribers */
138: /*
139: pubpkt = mqtt_msgDup(sess->sess_buf);
140: if (!pubpkt) {
141: ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
142: return 0;
143: } else
144: */
145:
146: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
147: switch (hdr->mqtt_msg.qos) {
148: case MQTT_QOS_ACK:
149: pubAck(sess, szTopic, mqtt_pktLen(hdr));
150: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
151: if (siz == -1) {
152: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
153: mqtt_GetErrno(), mqtt_GetError());
154: return 0;
155: }
156: break;
157: case MQTT_QOS_EXACTLY:
158: pubExactly(sess, szTopic, mqtt_pktLen(hdr));
159: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
160: if (siz == -1) {
161: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
162: mqtt_GetErrno(), mqtt_GetError());
163: return 0;
164: }
165: break;
166: case MQTT_QOS_ONCE:
167: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
168: default:
169: return 0;
170: }
171:
172: p = mkPkt(sess->sess_buf->msg_base, siz);
173: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
174: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
175: return 0;
176: }
177:
178: int
179: cmdPUBREL(void *srv, int len, void *arg)
180: {
181: struct tagSession *sess = (struct tagSession*) arg;
182: int siz = 0;
183: u_short mid = 0;
184: struct tagPkt *p = NULL;
185:
186: ioTRACE(2);
187:
188: if (!sess)
189: return -1;
190:
191: ioDEBUG(5, "Exec PUBREL session");
192: mid = mqtt_readPUBREL(sess->sess_buf);
193: if (mid == (u_short) -1) {
194: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
195: return 0;
196: }
197:
198: // TODO:: Delete from database topic
199:
200: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
201: if (siz == -1) {
202: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
203: return 0;
204: } else {
205: p = mkPkt(sess->sess_buf->msg_base, siz);
206: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
207: }
208:
209: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
210: return 0;
211: }
212:
213: int
214: cmdSUBSCRIBE(void *srv, int len, void *arg)
215: {
216: struct tagSession *sess = (struct tagSession*) arg;
217: mqtt_subscr_t *subs = NULL;
218: int siz = 0;
219: u_short mid = 0;
220: register int i;
221: struct tagStore *store;
222: char buf[BUFSIZ];
223: void *ptr;
224: struct tagPkt *p = NULL;
225:
226: ioTRACE(2);
227:
228: if (!sess)
229: return -1;
230:
231: ioDEBUG(5, "Exec SUBSCRIBE session");
232: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
233: if (siz == -1) {
234: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
235: return 0;
236: }
237:
238: /* add to db */
239: for (i = 0; i < siz; i++) {
240: /* convert topic to sql search statement */
241: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
242: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
243: goto end;
244: }
245: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
246: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
247: store = io_malloc(sizeof(struct tagStore));
248: if (!store) {
249: ioSYSERR(0);
250: goto end;
251: } else {
252: store->st_msgid = mid;
253: mqtt_subCopy(&store->st_subscr, &subs[i]);
254: }
255:
256: /* add to cache */
257: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
258:
259: /* convert topic to regexp */
260: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
261: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: goto end;
263: } else {
264: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
265: if (!ptr) {
266: ioSYSERR(0);
267: goto end;
268: } else {
269: store->st_subscr.sub_topic.msg_base = ptr;
270: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
271: memcpy(store->st_subscr.sub_topic.msg_base, buf,
272: store->st_subscr.sub_topic.msg_len);
273: }
274: }
275:
276: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
277: store->st_subscr.sub_topic.msg_base,
278: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
279:
280: subs[i].sub_ret = MQTT_QOS_PASS;
281: } else
282: subs[i].sub_ret = MQTT_QOS_DENY;
283: }
284:
285: /* send acknowledge */
286: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
287: if (siz == -1) {
288: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
289: goto end;
290: } else {
291: p = mkPkt(sess->sess_buf->msg_base, siz);
292: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
293: }
294:
295: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
296: end:
297: mqtt_subFree(&subs);
298: return 0;
299: }
300:
301: int
302: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
303: {
304: struct tagSession *sess = (struct tagSession*) arg;
305: mqtt_subscr_t *subs = NULL;
306: int siz = 0;
307: u_short mid = 0;
308: register int i;
309: struct tagStore *store, *tmp;
310: struct tagPkt *p = NULL;
311:
312: ioTRACE(2);
313:
314: if (!sess)
315: return -1;
316:
317: ioDEBUG(5, "Exec UNSUBSCRIBE session");
318: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
319: if (siz == -1) {
320: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
321: return 0;
322: }
323:
324: /* del from db */
325: for (i = 0; i < siz; i++) {
326: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
327: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
328: store->st_subscr.sub_topic.msg_base &&
329: !strcmp(store->st_subscr.sub_topic.msg_base,
330: subs[i].sub_topic.msg_base)) {
331: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
332:
333: if (store->st_subscr.sub_topic.msg_base)
334: free(store->st_subscr.sub_topic.msg_base);
335: if (store->st_subscr.sub_value.msg_base)
336: free(store->st_subscr.sub_value.msg_base);
337: io_free(store);
338: }
339: }
340:
341: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
342: sess->sess_user, "%");
343: }
344:
345: /* send acknowledge */
346: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
347: if (siz == -1) {
348: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
349: goto end;
350: } else {
351: p = mkPkt(sess->sess_buf->msg_base, siz);
352: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
353: }
354:
355: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
356: end:
357: mqtt_subFree(&subs);
358: return 0;
359: }
360:
361: int
362: cmdPINGREQ(void *srv, int len, void *arg)
363: {
364: struct tagSession *sess = (struct tagSession*) arg;
365: int siz = 0;
366: struct tagPkt *p = NULL;
367:
368: ioTRACE(2);
369:
370: if (!sess)
371: return -1;
372:
373: ioDEBUG(5, "Exec PINGREQ session");
374: siz = mqtt_msgPINGRESP(sess->sess_buf);
375: if (siz == -1) {
376: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
377: return 0;
378: } else {
379: p = mkPkt(sess->sess_buf->msg_base, siz);
380: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
381: }
382:
383: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
384: return 0;
385: }
386:
387: int
388: cmdCONNECT(void *srv, int len, void *arg)
389: {
390: struct tagStore *store;
391: struct tagPkt *p;
392: struct tagSession *sess = (struct tagSession*) arg;
393:
394: ioTRACE(2);
395:
396: if (!sess)
397: return -1;
398:
399: ioDEBUG(5, "Exec CONNECT session");
400: TAILQ_REMOVE(&Sessions, sess, sess_node);
401:
402: if (sess->sess_clean) {
403: if (call.FiniSessPUB)
404: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
405: if (call.DeletePUB_subscribe)
406: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
407: if (call.WipePUB_topic)
408: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
409: }
410:
411: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
412: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
413:
414: if (store->st_subscr.sub_topic.msg_base)
415: free(store->st_subscr.sub_topic.msg_base);
416: if (store->st_subscr.sub_value.msg_base)
417: free(store->st_subscr.sub_value.msg_base);
418:
419: io_free(store);
420: }
421:
422: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
423: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
424:
425: io_freeVar(&p->pkt_data);
426: io_free(p);
427: }
428:
429: if (sess->sess_will.msg)
430: free(sess->sess_will.msg);
431: if (sess->sess_will.topic)
432: free(sess->sess_will.topic);
433:
434: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
435: sess->sess_addr, sess->sess_user);
436:
437: return -3; /* reconnect client */
438: }
439:
440: int
441: cmdDISCONNECT(void *srv, int len, void *arg)
442: {
443: struct tagSession *sess = (struct tagSession*) arg;
444:
445: ioTRACE(2);
446:
447: if (!sess)
448: return -1;
449:
450: ioDEBUG(5, "Exec DISCONNECT session");
451:
452: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
453: sess->sess_addr, sess->sess_user);
454:
455: return -2; /* must terminate dispatcher */
456: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>