Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.25
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
1.2.2.19 misho 7: static inline struct tagPkt *
8: mkPkt(void * __restrict data, int dlen)
9: {
10: struct tagPkt *p = NULL;
11:
12: p = io_malloc(sizeof(struct tagPkt));
13: if (!p) {
14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: } else
17: memset(p, 0, sizeof(struct tagPkt));
18:
19: p->pkt_data = io_allocVar();
20: if (!p->pkt_data) {
21: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
22: io_free(p);
23: return NULL;
24: }
25:
26: if (data && dlen > 0)
27: AIT_SET_BUF(p->pkt_data, data, dlen);
28:
29: return p;
30: }
31:
32: static inline void
1.2.2.20 misho 33: freePkt(struct tagPkt * __restrict p)
1.2.2.19 misho 34: {
1.2.2.20 misho 35: if (!p)
1.2.2.19 misho 36: return;
37:
1.2.2.20 misho 38: io_freeVar(&p->pkt_data);
39: io_free(p);
1.2.2.19 misho 40: }
41:
42: static void *
43: sendPacket(sched_task_t *task)
44: {
45: struct tagPkt *p = TASK_ARG(task);
46: register int n, slen;
47: u_char *pos;
48:
49: if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) {
50: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
51: return NULL;
52: }
53:
54: for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0;
55: slen -= n, pos += n) {
56: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
57: if (n == -1) {
58: ioSYSERR(0);
59: break;
60: }
61: }
62:
1.2.2.20 misho 63: freePkt(p);
1.2.2.19 misho 64: return NULL;
65: }
66:
1.2.2.22 misho 67: /* --------------------------------------------------- */
1.2.2.19 misho 68:
1.2.2.17 misho 69: static int
1.2.2.22 misho 70: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 71: {
1.2.2.22 misho 72: struct tagPkt *p = NULL;
73: struct tagSession *s = NULL;
74: struct tagStore *st = NULL;
75: regex_t re;
76: regmatch_t match;
77: int ret;
78: char szStr[STRSIZ];
79:
80: TAILQ_FOREACH(s, &Sessions, sess_node) {
81: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
82: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
83: regerror(ret, &re, szStr, sizeof szStr);
84: regfree(&re);
85: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
86: st->st_subscr.sub_topic.msg_base, szStr);
87: }
88: if (!regexec(&re, psTopic, 1, &match, 0)) {
89: /* MATCH */
1.2.2.23 misho 90: ioDEBUG(1, "+++ dlen=%d\n", datlen);
1.2.2.22 misho 91: p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23 misho 92: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22 misho 93: }
94:
95: regfree(&re);
96: }
97: }
98:
1.2.2.17 misho 99: return 0;
100: }
101:
102: static int
1.2.2.22 misho 103: pubAck(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 104: {
105: return 0;
106: }
107:
108: static int
1.2.2.22 misho 109: pubExactly(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 110: {
111: return 0;
112: }
113:
114:
1.2 misho 115: int
1.2.2.9 misho 116: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 117: {
118: struct mqtthdr *hdr;
1.2.2.1 misho 119: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 120: char szTopic[STRSIZ] = { 0 };
121: int siz = 0;
122: u_short mid = 0;
1.2.2.22 misho 123: struct tagPkt *p = NULL;
1.2 misho 124:
125: ioTRACE(2);
126:
127: if (!sess)
128: return -1;
129:
1.2.2.17 misho 130: ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22 misho 131: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17 misho 132: if (siz == -1) {
133: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
134: return 0;
135: }
136:
1.2.2.22 misho 137: /* duplicate packet for retransmit to subscribers */
138: /*
139: pubpkt = mqtt_msgDup(sess->sess_buf);
140: if (!pubpkt) {
141: ioDEBUG(5, "Error:: in duplicate packet #%d - %s", mqtt_GetErrno(), mqtt_GetError());
142: return 0;
143: } else
144: */
145:
1.2 misho 146: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
147: switch (hdr->mqtt_msg.qos) {
148: case MQTT_QOS_ACK:
1.2.2.23 misho 149: pubAck(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17 misho 150: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
151: if (siz == -1) {
152: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
153: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 154: return 0;
1.2.2.17 misho 155: }
1.2 misho 156: break;
157: case MQTT_QOS_EXACTLY:
1.2.2.23 misho 158: pubExactly(sess, szTopic, mqtt_pktLen(hdr));
1.2.2.17 misho 159: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
160: if (siz == -1) {
161: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
162: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 163: return 0;
1.2.2.17 misho 164: }
1.2 misho 165: break;
1.2.2.17 misho 166: case MQTT_QOS_ONCE:
1.2.2.23 misho 167: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 168: default:
1.2.2.22 misho 169: return 0;
1.2 misho 170: }
171:
1.2.2.22 misho 172: p = mkPkt(sess->sess_buf->msg_base, siz);
173: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
174: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 175: return 0;
176: }
1.2.2.1 misho 177:
178: int
1.2.2.9 misho 179: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 180: {
181: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 182: int siz = 0;
183: u_short mid = 0;
1.2.2.22 misho 184: struct tagPkt *p = NULL;
1.2.2.1 misho 185:
186: ioTRACE(2);
187:
188: if (!sess)
189: return -1;
190:
1.2.2.17 misho 191: ioDEBUG(5, "Exec PUBREL session");
192: mid = mqtt_readPUBREL(sess->sess_buf);
193: if (mid == (u_short) -1) {
194: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
195: return 0;
196: }
197:
198: // TODO:: Delete from database topic
199:
200: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
201: if (siz == -1) {
202: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
203: return 0;
1.2.2.22 misho 204: } else {
205: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17 misho 206: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
207: }
1.2.2.1 misho 208:
1.2.2.22 misho 209: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 210: return 0;
211: }
212:
213: int
1.2.2.9 misho 214: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 215: {
216: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 217: mqtt_subscr_t *subs = NULL;
218: int siz = 0;
219: u_short mid = 0;
220: register int i;
221: struct tagStore *store;
1.2.2.17 misho 222: char buf[BUFSIZ];
223: void *ptr;
1.2.2.21 misho 224: struct tagPkt *p = NULL;
1.2.2.1 misho 225:
226: ioTRACE(2);
227:
228: if (!sess)
229: return -1;
230:
1.2.2.7 misho 231: ioDEBUG(5, "Exec SUBSCRIBE session");
232: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
233: if (siz == -1) {
234: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
235: return 0;
236: }
237:
238: /* add to db */
1.2.2.25! misho 239: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17 misho 240: /* convert topic to sql search statement */
241: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
242: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25! misho 243: continue;
1.2.2.17 misho 244: }
245: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 246: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18 misho 247: store = io_malloc(sizeof(struct tagStore));
1.2.2.7 misho 248: if (!store) {
249: ioSYSERR(0);
1.2.2.25! misho 250: continue;
1.2.2.7 misho 251: } else {
252: store->st_msgid = mid;
1.2.2.8 misho 253: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 254: }
255:
256: /* add to cache */
257: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
258:
1.2.2.17 misho 259: /* convert topic to regexp */
1.2.2.24 misho 260: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17 misho 261: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24 misho 262:
263: subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17 misho 264: } else {
265: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25! misho 266: if (!ptr)
1.2.2.17 misho 267: ioSYSERR(0);
1.2.2.25! misho 268: else {
1.2.2.17 misho 269: store->st_subscr.sub_topic.msg_base = ptr;
270: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
271: memcpy(store->st_subscr.sub_topic.msg_base, buf,
272: store->st_subscr.sub_topic.msg_len);
273: }
274:
1.2.2.24 misho 275: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
276: store->st_subscr.sub_topic.msg_base,
277: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17 misho 278:
1.2.2.24 misho 279: subs[i].sub_ret = MQTT_QOS_PASS;
280: }
1.2.2.25! misho 281: }
1.2.2.7 misho 282: }
1.2.2.1 misho 283:
1.2.2.7 misho 284: /* send acknowledge */
285: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
286: if (siz == -1) {
287: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
288: goto end;
1.2.2.21 misho 289: } else {
290: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 291: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
292: }
1.2.2.21 misho 293:
294: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7 misho 295: end:
296: mqtt_subFree(&subs);
1.2.2.1 misho 297: return 0;
298: }
299:
300: int
1.2.2.9 misho 301: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 302: {
303: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 304: mqtt_subscr_t *subs = NULL;
305: int siz = 0;
306: u_short mid = 0;
307: register int i;
308: struct tagStore *store, *tmp;
1.2.2.21 misho 309: struct tagPkt *p = NULL;
1.2.2.1 misho 310:
311: ioTRACE(2);
312:
313: if (!sess)
314: return -1;
315:
1.2.2.13 misho 316: ioDEBUG(5, "Exec UNSUBSCRIBE session");
317: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
318: if (siz == -1) {
319: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
320: return 0;
321: }
322:
323: /* del from db */
324: for (i = 0; i < siz; i++) {
325: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
326: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
327: store->st_subscr.sub_topic.msg_base &&
328: !strcmp(store->st_subscr.sub_topic.msg_base,
329: subs[i].sub_topic.msg_base)) {
330: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 331:
332: if (store->st_subscr.sub_topic.msg_base)
333: free(store->st_subscr.sub_topic.msg_base);
334: if (store->st_subscr.sub_value.msg_base)
335: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 336: io_free(store);
1.2.2.13 misho 337: }
338: }
339:
1.2.2.15 misho 340: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
341: sess->sess_user, "%");
1.2.2.13 misho 342: }
343:
344: /* send acknowledge */
345: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
346: if (siz == -1) {
347: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
348: goto end;
1.2.2.21 misho 349: } else {
350: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13 misho 351: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
352: }
1.2.2.21 misho 353:
354: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13 misho 355: end:
356: mqtt_subFree(&subs);
1.2.2.1 misho 357: return 0;
358: }
359:
360: int
1.2.2.9 misho 361: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 362: {
363: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 364: int siz = 0;
1.2.2.19 misho 365: struct tagPkt *p = NULL;
1.2.2.1 misho 366:
367: ioTRACE(2);
368:
369: if (!sess)
370: return -1;
371:
1.2.2.7 misho 372: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 373: siz = mqtt_msgPINGRESP(sess->sess_buf);
374: if (siz == -1) {
375: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
376: return 0;
1.2.2.8 misho 377: } else {
1.2.2.19 misho 378: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 379: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
380: }
1.2.2.1 misho 381:
1.2.2.19 misho 382: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 383: return 0;
384: }
385:
386: int
1.2.2.9 misho 387: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 388: {
389: struct tagStore *store;
1.2.2.19 misho 390: struct tagPkt *p;
1.2.2.1 misho 391: struct tagSession *sess = (struct tagSession*) arg;
392:
393: ioTRACE(2);
394:
395: if (!sess)
396: return -1;
397:
1.2.2.6 misho 398: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 399: TAILQ_REMOVE(&Sessions, sess, sess_node);
400:
1.2.2.16 misho 401: if (sess->sess_clean) {
402: if (call.FiniSessPUB)
403: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
404: if (call.DeletePUB_subscribe)
405: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
406: if (call.WipePUB_topic)
407: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
408: }
1.2.2.1 misho 409:
1.2.2.6 misho 410: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
411: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 412:
1.2.2.6 misho 413: if (store->st_subscr.sub_topic.msg_base)
414: free(store->st_subscr.sub_topic.msg_base);
415: if (store->st_subscr.sub_value.msg_base)
416: free(store->st_subscr.sub_value.msg_base);
417:
1.2.2.18 misho 418: io_free(store);
1.2.2.6 misho 419: }
1.2.2.1 misho 420:
1.2.2.19 misho 421: while ((p = SLIST_FIRST(&sess->sess_sndpkt))) {
422: SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node);
423:
424: io_freeVar(&p->pkt_data);
425: io_free(p);
426: }
427:
1.2.2.1 misho 428: if (sess->sess_will.msg)
429: free(sess->sess_will.msg);
430: if (sess->sess_will.topic)
431: free(sess->sess_will.topic);
432:
433: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
434: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 435:
1.2.2.12 misho 436: return -3; /* reconnect client */
1.2.2.1 misho 437: }
438:
439: int
1.2.2.9 misho 440: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 441: {
442: struct tagSession *sess = (struct tagSession*) arg;
443:
444: ioTRACE(2);
445:
446: if (!sess)
447: return -1;
448:
1.2.2.5 misho 449: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 450:
1.2.2.1 misho 451: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
452: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 453:
1.2.2.10 misho 454: return -2; /* must terminate dispatcher */
1.2.2.1 misho 455: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>