Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.23
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
1.2.2.19 misho 7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
1.2.2.20 misho 33: freePkt(struct tagPkt * __restrict p)
1.2.2.19 misho 34: {
1.2.2.20 misho 35: if (!p)
1.2.2.19 misho 36: return;
37:
1.2.2.20 misho 38: io_freeVar(&p->pkt_data);
39: io_free(p);
1.2.2.19 misho 40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
1.2.2.20 misho 63: freePkt(p);
1.2.2.19 misho 64: return NULL;
65: }
66:
1.2.2.22 misho 67: /* --------------------------------------------------- */
1.2.2.19 misho 68:
1.2.2.17 misho 69: static int
1.2.2.22 misho 70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 71: {
1.2.2.22 misho 72: struct tagPkt *p = NULL;
73: struct tagSession *s = NULL;
74: struct tagStore *st = NULL;
75: regex_t re;
76: regmatch_t match;
77: int ret;
78: char szStr[STRSIZ];
79:
80: TAILQ_FOREACH(s, &Sessions, sess_node) {
81: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
82: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
83: regerror(ret, &re, szStr, sizeof szStr);
84: regfree(&re);
85: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
86: st->st_subscr.sub_topic.msg_base, szStr);
87: }
88: if (!regexec(&re, psTopic, 1, &match, 0)) {
89: /* MATCH */
1.2.2.23! misho 90: ioDEBUG(1, "+++ dlen=%d\n", datlen);
1.2.2.22 misho 91: p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23! misho 92: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22 misho 93: }
94:
95: regfree(&re);
96: }
97: }
98:
1.2.2.17 misho 99: return 0;
100: }
101:
102: static int
1.2.2.22 misho 103: pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 104: {
105: return 0;
106: }
107:
108: static int
1.2.2.22 misho 109: pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 110: {
111: return 0;
112: }
113:
114:
1.2 misho 115: int
1.2.2.9 misho 116: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 117: {
118: struct mqtthdr *hdr;
1.2.2.1 misho 119: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 120: char szTopic[STRSIZ] = { 0 };
121: int siz = 0;
122: u_short mid = 0;
1.2.2.22 misho 123: struct tagPkt *p = NULL;
1.2 misho 124:
125: ioTRACE(2);
126:
127: if (!sess)
128: return -1;
129:
1.2.2.17 misho 130: ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22 misho 131: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17 misho 132: if (siz == -1) {
133: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
134: return 0;
135: }
136:
1.2.2.22 misho 137: /* duplicate packet for retransmit to subscribers */
138: /*
139: pubpkt = mqtt_msgDup(sess->sess_buf);
140: if (!pubpkt) {
141: ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
142: return 0;
143: } else
144: */
145:
1.2 misho 146: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
147: switch (hdr->mqtt_msg.qos) {
148: case MQTT_QOS_ACK:
1.2.2.23! misho 149: pubAck(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17 misho 150: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
151: if (siz == -1) {
152: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
153: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 154: return 0;
1.2.2.17 misho 155: }
1.2 misho 156: break;
157: case MQTT_QOS_EXACTLY:
1.2.2.23! misho 158: pubExactly(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17 misho 159: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
160: if (siz == -1) {
161: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
162: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 163: return 0;
1.2.2.17 misho 164: }
1.2 misho 165: break;
1.2.2.17 misho 166: case MQTT_QOS_ONCE:
1.2.2.23! misho 167: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 168: default:
1.2.2.22 misho 169: return 0;
1.2 misho 170: }
171:
1.2.2.22 misho 172: p = mkPkt(sess->sess_buf->msg_base, siz);
173: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
174: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 175: return 0;
176: }
1.2.2.1 misho 177:
178: int
1.2.2.9 misho 179: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 180: {
181: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 182: int siz = 0;
183: u_short mid = 0;
1.2.2.22 misho 184: struct tagPkt *p = NULL;
1.2.2.1 misho 185:
186: ioTRACE(2);
187:
188: if (!sess)
189: return -1;
190:
1.2.2.17 misho 191: ioDEBUG(5, "Exec PUBREL session");
192: mid = mqtt_readPUBREL(sess->sess_buf);
193: if (mid == (u_short) -1) {
194: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
195: return 0;
196: }
197:
198: // TODO:: Delete from database topic
199:
200: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
201: if (siz == -1) {
202: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
203: return 0;
1.2.2.22 misho 204: } else {
205: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17 misho 206: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
207: }
1.2.2.1 misho 208:
1.2.2.22 misho 209: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 210: return 0;
211: }
212:
213: int
1.2.2.9 misho 214: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 215: {
216: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 217: mqtt_subscr_t *subs = NULL;
218: int siz = 0;
219: u_short mid = 0;
220: register int i;
221: struct tagStore *store;
1.2.2.17 misho 222: char buf[BUFSIZ];
223: void *ptr;
1.2.2.21 misho 224: struct tagPkt *p = NULL;
1.2.2.1 misho 225:
226: ioTRACE(2);
227:
228: if (!sess)
229: return -1;
230:
1.2.2.7 misho 231: ioDEBUG(5, "Exec SUBSCRIBE session");
232: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
233: if (siz == -1) {
234: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
235: return 0;
236: }
237:
238: /* add to db */
239: for (i = 0; i < siz; i++) {
1.2.2.17 misho 240: /* convert topic to sql search statement */
241: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
242: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
243: goto end;
244: }
245: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 246: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18 misho 247: store = io_malloc(sizeof(struct tagStore));
1.2.2.7 misho 248: if (!store) {
249: ioSYSERR(0);
250: goto end;
251: } else {
252: store->st_msgid = mid;
1.2.2.8 misho 253: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 254: }
255:
256: /* add to cache */
257: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
258:
1.2.2.17 misho 259: /* convert topic to regexp */
260: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 0) == -1) {
261: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
262: goto end;
263: } else {
264: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
265: if (!ptr) {
266: ioSYSERR(0);
267: goto end;
268: } else {
269: store->st_subscr.sub_topic.msg_base = ptr;
270: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
271: memcpy(store->st_subscr.sub_topic.msg_base, buf,
272: store->st_subscr.sub_topic.msg_len);
273: }
274: }
275:
276: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
277: store->st_subscr.sub_topic.msg_base,
278: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
279:
1.2.2.7 misho 280: subs[i].sub_ret = MQTT_QOS_PASS;
281: } else
282: subs[i].sub_ret = MQTT_QOS_DENY;
283: }
1.2.2.1 misho 284:
1.2.2.7 misho 285: /* send acknowledge */
286: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
287: if (siz == -1) {
288: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
289: goto end;
1.2.2.21 misho 290: } else {
291: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 292: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
293: }
1.2.2.21 misho 294:
295: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7 misho 296: end:
297: mqtt_subFree(&subs);
1.2.2.1 misho 298: return 0;
299: }
300:
301: int
1.2.2.9 misho 302: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 303: {
304: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 305: mqtt_subscr_t *subs = NULL;
306: int siz = 0;
307: u_short mid = 0;
308: register int i;
309: struct tagStore *store, *tmp;
1.2.2.21 misho 310: struct tagPkt *p = NULL;
1.2.2.1 misho 311:
312: ioTRACE(2);
313:
314: if (!sess)
315: return -1;
316:
1.2.2.13 misho 317: ioDEBUG(5, "Exec UNSUBSCRIBE session");
318: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
319: if (siz == -1) {
320: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
321: return 0;
322: }
323:
324: /* del from db */
325: for (i = 0; i < siz; i++) {
326: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
327: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
328: store->st_subscr.sub_topic.msg_base &&
329: !strcmp(store->st_subscr.sub_topic.msg_base,
330: subs[i].sub_topic.msg_base)) {
331: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 332:
333: if (store->st_subscr.sub_topic.msg_base)
334: free(store->st_subscr.sub_topic.msg_base);
335: if (store->st_subscr.sub_value.msg_base)
336: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 337: io_free(store);
1.2.2.13 misho 338: }
339: }
340:
1.2.2.15 misho 341: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
342: sess->sess_user, "%");
1.2.2.13 misho 343: }
344:
345: /* send acknowledge */
346: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
347: if (siz == -1) {
348: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
349: goto end;
1.2.2.21 misho 350: } else {
351: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13 misho 352: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
353: }
1.2.2.21 misho 354:
355: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13 misho 356: end:
357: mqtt_subFree(&subs);
1.2.2.1 misho 358: return 0;
359: }
360:
361: int
1.2.2.9 misho 362: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 363: {
364: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 365: int siz = 0;
1.2.2.19 misho 366: struct tagPkt *p = NULL;
1.2.2.1 misho 367:
368: ioTRACE(2);
369:
370: if (!sess)
371: return -1;
372:
1.2.2.7 misho 373: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 374: siz = mqtt_msgPINGRESP(sess->sess_buf);
375: if (siz == -1) {
376: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
377: return 0;
1.2.2.8 misho 378: } else {
1.2.2.19 misho 379: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 380: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
381: }
1.2.2.1 misho 382:
1.2.2.19 misho 383: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 384: return 0;
385: }
386:
387: int
1.2.2.9 misho 388: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 389: {
390: struct tagStore *store;
1.2.2.19 misho 391: struct tagPkt *p;
1.2.2.1 misho 392: struct tagSession *sess = (struct tagSession*) arg;
393:
394: ioTRACE(2);
395:
396: if (!sess)
397: return -1;
398:
1.2.2.6 misho 399: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 400: TAILQ_REMOVE(&Sessions, sess, sess_node);
401:
1.2.2.16 misho 402: if (sess->sess_clean) {
403: if (call.FiniSessPUB)
404: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
405: if (call.DeletePUB_subscribe)
406: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
407: if (call.WipePUB_topic)
408: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
409: }
1.2.2.1 misho 410:
1.2.2.6 misho 411: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
412: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 413:
1.2.2.6 misho 414: if (store->st_subscr.sub_topic.msg_base)
415: free(store->st_subscr.sub_topic.msg_base);
416: if (store->st_subscr.sub_value.msg_base)
417: free(store->st_subscr.sub_value.msg_base);
418:
1.2.2.18 misho 419: io_free(store);
1.2.2.6 misho 420: }
1.2.2.1 misho 421:
1.2.2.19 misho 422: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
423: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
424:
425: io_freeVar(&p->pkt_data);
426: io_free(p);
427: }
428:
1.2.2.1 misho 429: if (sess->sess_will.msg)
430: free(sess->sess_will.msg);
431: if (sess->sess_will.topic)
432: free(sess->sess_will.topic);
433:
434: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
435: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 436:
1.2.2.12 misho 437: return -3; /* reconnect client */
1.2.2.1 misho 438: }
439:
440: int
1.2.2.9 misho 441: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 442: {
443: struct tagSession *sess = (struct tagSession*) arg;
444:
445: ioTRACE(2);
446:
447: if (!sess)
448: return -1;
449:
1.2.2.5 misho 450: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 451:
1.2.2.1 misho 452: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
453: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 454:
1.2.2.10 misho 455: return -2; /* must terminate dispatcher */
1.2.2.1 misho 456: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>