1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
33: freePkt(struct tagPkt * __restrict p)
34: {
35: if (!p)
36: return;
37:
38: io_freeVar(&p->pkt_data);
39: io_free(p);
40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
63: freePkt(p);
64: return NULL;
65: }
66:
67: /* --------------------------------------------------- */
68:
69: static int
70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
71: {
72: struct tagPkt *p = NULL;
73: struct tagSession *s = NULL;
74: struct tagStore *st = NULL;
75: regex_t re;
76: regmatch_t match;
77: int ret;
78: char szStr[STRSIZ];
79:
80: TAILQ_FOREACH(s, &Sessions, sess_node) {
81: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
82: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
83: regerror(ret, &re, szStr, sizeof szStr);
84: regfree(&re);
85: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
86: st->st_subscr.sub_topic.msg_base, szStr);
87: }
88: if (!regexec(&re, psTopic, 1, &match, 0)) {
89: /* MATCH */
90: ioDEBUG(1, "+++ dlen=%d\n", datlen);
91: p = mkPkt(sess->sess_buf->msg_base, datlen);
92: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
93: }
94:
95: regfree(&re);
96: }
97: }
98:
99: return 0;
100: }
101:
/*
 * pubAck() - Placeholder for publishing with acknowledged delivery
 *
 * @sess = publishing session (unused)
 * @psTopic = topic of the published message (unused)
 * @datlen = message length (unused)
 * return: always 0
 */
static int
pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
{
	/* not implemented yet, all arguments are ignored */
	(void) sess;
	(void) psTopic;
	(void) datlen;

	return 0;
}
107:
/*
 * pubExactly() - Placeholder for publishing with exactly-once delivery
 *
 * @sess = publishing session (unused)
 * @psTopic = topic of the published message (unused)
 * @datlen = message length (unused)
 * return: always 0
 */
static int
pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
{
	/* not implemented yet, all arguments are ignored */
	(void) sess;
	(void) psTopic;
	(void) datlen;

	return 0;
}
113:
114:
115: int
116: cmdPUBLISH(void *srv, int len, void *arg)
117: {
118: struct mqtthdr *hdr;
119: struct tagSession *sess = (struct tagSession*) arg;
120: char szTopic[STRSIZ] = { 0 };
121: int siz = 0;
122: u_short mid = 0;
123: struct tagPkt *p = NULL;
124:
125: ioTRACE(2);
126:
127: if (!sess)
128: return -1;
129:
130: ioDEBUG(5, "Exec PUBLISH session");
131: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
132: if (siz == -1) {
133: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
134: return 0;
135: }
136:
137: /* duplicate packet for retransmit to subscribers */
138: /*
139: pubpkt = mqtt_msgDup(sess->sess_buf);
140: if (!pubpkt) {
141: ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
142: return 0;
143: } else
144: */
145:
146: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
147: switch (hdr->mqtt_msg.qos) {
148: case MQTT_QOS_ACK:
149: pubAck(sess, szTopic, mqtt_pktLen(hdr));
150: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
151: if (siz == -1) {
152: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
153: mqtt_GetErrno(), mqtt_GetError());
154: return 0;
155: }
156: break;
157: case MQTT_QOS_EXACTLY:
158: pubExactly(sess, szTopic, mqtt_pktLen(hdr));
159: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
160: if (siz == -1) {
161: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
162: mqtt_GetErrno(), mqtt_GetError());
163: return 0;
164: }
165: break;
166: case MQTT_QOS_ONCE:
167: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
168: default:
169: return 0;
170: }
171:
172: p = mkPkt(sess->sess_buf->msg_base, siz);
173: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
174: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
175: return 0;
176: }
177:
178: int
179: cmdPUBREL(void *srv, int len, void *arg)
180: {
181: struct tagSession *sess = (struct tagSession*) arg;
182: int siz = 0;
183: u_short mid = 0;
184: struct tagPkt *p = NULL;
185:
186: ioTRACE(2);
187:
188: if (!sess)
189: return -1;
190:
191: ioDEBUG(5, "Exec PUBREL session");
192: mid = mqtt_readPUBREL(sess->sess_buf);
193: if (mid == (u_short) -1) {
194: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
195: return 0;
196: }
197:
198: // TODO:: Delete from database topic
199:
200: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
201: if (siz == -1) {
202: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
203: return 0;
204: } else {
205: p = mkPkt(sess->sess_buf->msg_base, siz);
206: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
207: }
208:
209: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
210: return 0;
211: }
212:
213: int
214: cmdSUBSCRIBE(void *srv, int len, void *arg)
215: {
216: struct tagSession *sess = (struct tagSession*) arg;
217: mqtt_subscr_t *subs = NULL;
218: int siz = 0;
219: u_short mid = 0;
220: register int i;
221: struct tagStore *store;
222: char buf[BUFSIZ];
223: void *ptr;
224: struct tagPkt *p = NULL;
225:
226: ioTRACE(2);
227:
228: if (!sess)
229: return -1;
230:
231: ioDEBUG(5, "Exec SUBSCRIBE session");
232: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
233: if (siz == -1) {
234: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
235: return 0;
236: }
237:
238: /* add to db */
239: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
240: /* convert topic to sql search statement */
241: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
242: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
243: continue;
244: }
245: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
246: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
247: store = io_malloc(sizeof(struct tagStore));
248: if (!store) {
249: ioSYSERR(0);
250: continue;
251: } else {
252: store->st_msgid = mid;
253: mqtt_subCopy(&store->st_subscr, &subs[i]);
254: }
255:
256: /* add to cache */
257: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
258:
259: /* convert topic to regexp */
260: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
261: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262:
263: subs[i].sub_ret = MQTT_QOS_DENY;
264: } else {
265: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
266: if (!ptr)
267: ioSYSERR(0);
268: else {
269: store->st_subscr.sub_topic.msg_base = ptr;
270: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
271: memcpy(store->st_subscr.sub_topic.msg_base, buf,
272: store->st_subscr.sub_topic.msg_len);
273: }
274:
275: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
276: store->st_subscr.sub_topic.msg_base,
277: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
278:
279: subs[i].sub_ret = MQTT_QOS_PASS;
280: }
281: }
282: }
283:
284: /* send acknowledge */
285: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
286: if (siz == -1) {
287: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
288: goto end;
289: } else {
290: p = mkPkt(sess->sess_buf->msg_base, siz);
291: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
292: }
293:
294: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
295: end:
296: mqtt_subFree(&subs);
297: return 0;
298: }
299:
300: int
301: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
302: {
303: struct tagSession *sess = (struct tagSession*) arg;
304: mqtt_subscr_t *subs = NULL;
305: int siz = 0;
306: u_short mid = 0;
307: register int i;
308: struct tagStore *store, *tmp;
309: struct tagPkt *p = NULL;
310:
311: ioTRACE(2);
312:
313: if (!sess)
314: return -1;
315:
316: ioDEBUG(5, "Exec UNSUBSCRIBE session");
317: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
318: if (siz == -1) {
319: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
320: return 0;
321: }
322:
323: /* del from db */
324: for (i = 0; i < siz; i++) {
325: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
326: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
327: store->st_subscr.sub_topic.msg_base &&
328: !strcmp(store->st_subscr.sub_topic.msg_base,
329: subs[i].sub_topic.msg_base)) {
330: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
331:
332: if (store->st_subscr.sub_topic.msg_base)
333: free(store->st_subscr.sub_topic.msg_base);
334: if (store->st_subscr.sub_value.msg_base)
335: free(store->st_subscr.sub_value.msg_base);
336: io_free(store);
337: }
338: }
339:
340: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
341: sess->sess_user, "%");
342: }
343:
344: /* send acknowledge */
345: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
346: if (siz == -1) {
347: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
348: goto end;
349: } else {
350: p = mkPkt(sess->sess_buf->msg_base, siz);
351: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
352: }
353:
354: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
355: end:
356: mqtt_subFree(&subs);
357: return 0;
358: }
359:
360: int
361: cmdPINGREQ(void *srv, int len, void *arg)
362: {
363: struct tagSession *sess = (struct tagSession*) arg;
364: int siz = 0;
365: struct tagPkt *p = NULL;
366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
372: ioDEBUG(5, "Exec PINGREQ session");
373: siz = mqtt_msgPINGRESP(sess->sess_buf);
374: if (siz == -1) {
375: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
376: return 0;
377: } else {
378: p = mkPkt(sess->sess_buf->msg_base, siz);
379: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
380: }
381:
382: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
383: return 0;
384: }
385:
386: int
387: cmdCONNECT(void *srv, int len, void *arg)
388: {
389: struct tagStore *store;
390: struct tagPkt *p;
391: struct tagSession *sess = (struct tagSession*) arg;
392:
393: ioTRACE(2);
394:
395: if (!sess)
396: return -1;
397:
398: ioDEBUG(5, "Exec CONNECT session");
399: TAILQ_REMOVE(&Sessions, sess, sess_node);
400:
401: if (sess->sess_clean) {
402: if (call.FiniSessPUB)
403: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
404: if (call.DeletePUB_subscribe)
405: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
406: if (call.WipePUB_topic)
407: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
408: }
409:
410: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
411: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
412:
413: if (store->st_subscr.sub_topic.msg_base)
414: free(store->st_subscr.sub_topic.msg_base);
415: if (store->st_subscr.sub_value.msg_base)
416: free(store->st_subscr.sub_value.msg_base);
417:
418: io_free(store);
419: }
420:
421: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
422: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
423:
424: io_freeVar(&p->pkt_data);
425: io_free(p);
426: }
427:
428: if (sess->sess_will.msg)
429: free(sess->sess_will.msg);
430: if (sess->sess_will.topic)
431: free(sess->sess_will.topic);
432:
433: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
434: sess->sess_addr, sess->sess_user);
435:
436: return -3; /* reconnect client */
437: }
438:
439: int
440: cmdDISCONNECT(void *srv, int len, void *arg)
441: {
442: struct tagSession *sess = (struct tagSession*) arg;
443:
444: ioTRACE(2);
445:
446: if (!sess)
447: return -1;
448:
449: ioDEBUG(5, "Exec DISCONNECT session");
450:
451: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
452: sess->sess_addr, sess->sess_user);
453:
454: return -2; /* must terminate dispatcher */
455: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>