Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.27
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
1.2.2.26 misho 7: static inline ait_val_t *
1.2.2.19 misho 8: mkPkt(void * __restrict data, int dlen)
9: {
1.2.2.26 misho 10: ait_val_t *p = NULL;
1.2.2.19 misho 11:
1.2.2.26 misho 12: if (!(p = io_allocVar())) {
1.2.2.19 misho 13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
14: return NULL;
15: }
16:
17: if (data && dlen > 0)
1.2.2.26 misho 18: AIT_SET_BUF(p, data, dlen);
1.2.2.19 misho 19:
20: return p;
21: }
22:
23: static inline void
1.2.2.26 misho 24: freePkt(ait_val_t ** __restrict p)
1.2.2.19 misho 25: {
1.2.2.20 misho 26: if (!p)
1.2.2.19 misho 27: return;
28:
1.2.2.26 misho 29: io_freeVar(p);
1.2.2.19 misho 30: }
31:
32: static void *
33: sendPacket(sched_task_t *task)
34: {
1.2.2.26 misho 35: ait_val_t *p = TASK_ARG(task);
1.2.2.19 misho 36: register int n, slen;
37: u_char *pos;
38:
1.2.2.26 misho 39: if (!p || AIT_ISEMPTY(p)) {
1.2.2.19 misho 40: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
41: return NULL;
42: }
43:
1.2.2.26 misho 44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19 misho 45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
46: if (n == -1) {
47: ioSYSERR(0);
48: break;
49: }
50: }
51:
1.2.2.26 misho 52: freePkt(&p);
1.2.2.19 misho 53: return NULL;
54: }
55:
1.2.2.22 misho 56: /* --------------------------------------------------- */
1.2.2.19 misho 57:
1.2.2.17 misho 58: static int
1.2.2.22 misho 59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 60: {
1.2.2.26 misho 61: ait_val_t *p = NULL;
1.2.2.22 misho 62: struct tagSession *s = NULL;
63: struct tagStore *st = NULL;
64: regex_t re;
65: regmatch_t match;
66: int ret;
67: char szStr[STRSIZ];
68:
69: TAILQ_FOREACH(s, &Sessions, sess_node) {
70: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
72: regerror(ret, &re, szStr, sizeof szStr);
73: regfree(&re);
74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
75: st->st_subscr.sub_topic.msg_base, szStr);
76: }
77: if (!regexec(&re, psTopic, 1, &match, 0)) {
78: /* MATCH */
79: p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23 misho 80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22 misho 81: }
82:
83: regfree(&re);
84: }
85: }
86:
1.2.2.17 misho 87: return 0;
88: }
89:
90: static int
1.2.2.26 misho 91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17 misho 92: {
1.2.2.26 misho 93: ait_val_t *p = NULL;
94: struct tagSession *s = NULL;
95: struct tagStore *st = NULL;
96: regex_t re;
97: regmatch_t match;
98: int ret, flg = 0;
99: char szStr[STRSIZ];
100: struct mqtthdr *hdr;
101:
102: p = mkPkt(sess->sess_buf->msg_base, datlen);
103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
104:
105: /* write topic to database */
1.2.2.27! misho 106: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
! 107: sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.26 misho 108: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
1.2.2.27! misho 109: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26 misho 110:
111: TAILQ_FOREACH(s, &Sessions, sess_node) {
112: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
113: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
114: regerror(ret, &re, szStr, sizeof szStr);
115: regfree(&re);
116: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
117: st->st_subscr.sub_topic.msg_base, szStr);
118: }
119: if (!regexec(&re, psTopic, 1, &match, 0)) {
120: flg = 1; /* MATCH */
121: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
122: }
123:
124: regfree(&re);
125: }
126: }
127:
128: /* if subscriber not found then free packet */
129: if (!flg)
130: freePkt(&p);
131:
1.2.2.27! misho 132: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
! 133: sess->sess_addr, 0);
1.2.2.17 misho 134: return 0;
135: }
136:
137: static int
1.2.2.26 misho 138: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17 misho 139: {
140: return 0;
141: }
142:
143:
1.2 misho 144: int
1.2.2.9 misho 145: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 146: {
147: struct mqtthdr *hdr;
1.2.2.1 misho 148: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 149: char szTopic[STRSIZ] = { 0 };
150: int siz = 0;
151: u_short mid = 0;
1.2.2.26 misho 152: ait_val_t *p = NULL;
1.2 misho 153:
154: ioTRACE(2);
155:
156: if (!sess)
157: return -1;
158:
1.2.2.17 misho 159: ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22 misho 160: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17 misho 161: if (siz == -1) {
162: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
163: return 0;
164: }
165:
1.2 misho 166: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
167: switch (hdr->mqtt_msg.qos) {
168: case MQTT_QOS_ACK:
1.2.2.26 misho 169: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
170: return 0;
1.2.2.17 misho 171: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
172: if (siz == -1) {
173: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
174: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 175: return 0;
1.2.2.17 misho 176: }
1.2 misho 177: break;
178: case MQTT_QOS_EXACTLY:
1.2.2.26 misho 179: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
180: return 0;
1.2.2.17 misho 181: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
182: if (siz == -1) {
183: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
184: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 185: return 0;
1.2.2.17 misho 186: }
1.2 misho 187: break;
1.2.2.17 misho 188: case MQTT_QOS_ONCE:
1.2.2.23 misho 189: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 190: default:
1.2.2.22 misho 191: return 0;
1.2 misho 192: }
193:
1.2.2.22 misho 194: p = mkPkt(sess->sess_buf->msg_base, siz);
195: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
196: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 197: return 0;
198: }
1.2.2.1 misho 199:
200: int
1.2.2.9 misho 201: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 202: {
203: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 204: int siz = 0;
205: u_short mid = 0;
1.2.2.26 misho 206: ait_val_t *p = NULL;
1.2.2.1 misho 207:
208: ioTRACE(2);
209:
210: if (!sess)
211: return -1;
212:
1.2.2.17 misho 213: ioDEBUG(5, "Exec PUBREL session");
214: mid = mqtt_readPUBREL(sess->sess_buf);
215: if (mid == (u_short) -1) {
216: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
217: return 0;
218: }
219:
220: // TODO:: Delete from database topic
221:
222: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
223: if (siz == -1) {
224: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
225: return 0;
1.2.2.22 misho 226: } else {
227: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17 misho 228: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
229: }
1.2.2.1 misho 230:
1.2.2.22 misho 231: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 232: return 0;
233: }
234:
235: int
1.2.2.9 misho 236: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 237: {
238: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 239: mqtt_subscr_t *subs = NULL;
240: int siz = 0;
241: u_short mid = 0;
242: register int i;
243: struct tagStore *store;
1.2.2.17 misho 244: char buf[BUFSIZ];
245: void *ptr;
1.2.2.26 misho 246: ait_val_t *p = NULL;
1.2.2.1 misho 247:
248: ioTRACE(2);
249:
250: if (!sess)
251: return -1;
252:
1.2.2.7 misho 253: ioDEBUG(5, "Exec SUBSCRIBE session");
254: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
255: if (siz == -1) {
256: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
257: return 0;
258: }
259:
260: /* add to db */
1.2.2.25 misho 261: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17 misho 262: /* convert topic to sql search statement */
263: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
264: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25 misho 265: continue;
1.2.2.17 misho 266: }
267: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 268: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18 misho 269: store = io_malloc(sizeof(struct tagStore));
1.2.2.7 misho 270: if (!store) {
271: ioSYSERR(0);
1.2.2.25 misho 272: continue;
1.2.2.7 misho 273: } else {
274: store->st_msgid = mid;
1.2.2.8 misho 275: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 276: }
277:
278: /* add to cache */
279: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
280:
1.2.2.17 misho 281: /* convert topic to regexp */
1.2.2.24 misho 282: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17 misho 283: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24 misho 284:
285: subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17 misho 286: } else {
287: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25 misho 288: if (!ptr)
1.2.2.17 misho 289: ioSYSERR(0);
1.2.2.25 misho 290: else {
1.2.2.17 misho 291: store->st_subscr.sub_topic.msg_base = ptr;
292: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
293: memcpy(store->st_subscr.sub_topic.msg_base, buf,
294: store->st_subscr.sub_topic.msg_len);
295: }
296:
1.2.2.24 misho 297: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
298: store->st_subscr.sub_topic.msg_base,
299: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17 misho 300:
1.2.2.24 misho 301: subs[i].sub_ret = MQTT_QOS_PASS;
302: }
1.2.2.25 misho 303: }
1.2.2.7 misho 304: }
1.2.2.1 misho 305:
1.2.2.7 misho 306: /* send acknowledge */
307: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
308: if (siz == -1) {
309: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
310: goto end;
1.2.2.21 misho 311: } else {
312: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 313: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
314: }
1.2.2.21 misho 315:
316: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7 misho 317: end:
318: mqtt_subFree(&subs);
1.2.2.1 misho 319: return 0;
320: }
321:
322: int
1.2.2.9 misho 323: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 324: {
325: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 326: mqtt_subscr_t *subs = NULL;
327: int siz = 0;
328: u_short mid = 0;
329: register int i;
330: struct tagStore *store, *tmp;
1.2.2.26 misho 331: ait_val_t *p = NULL;
1.2.2.1 misho 332:
333: ioTRACE(2);
334:
335: if (!sess)
336: return -1;
337:
1.2.2.13 misho 338: ioDEBUG(5, "Exec UNSUBSCRIBE session");
339: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
340: if (siz == -1) {
341: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
342: return 0;
343: }
344:
345: /* del from db */
346: for (i = 0; i < siz; i++) {
347: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
348: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
349: store->st_subscr.sub_topic.msg_base &&
350: !strcmp(store->st_subscr.sub_topic.msg_base,
351: subs[i].sub_topic.msg_base)) {
352: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 353:
354: if (store->st_subscr.sub_topic.msg_base)
355: free(store->st_subscr.sub_topic.msg_base);
356: if (store->st_subscr.sub_value.msg_base)
357: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 358: io_free(store);
1.2.2.13 misho 359: }
360: }
361:
1.2.2.15 misho 362: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
363: sess->sess_user, "%");
1.2.2.13 misho 364: }
365:
366: /* send acknowledge */
367: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
368: if (siz == -1) {
369: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
370: goto end;
1.2.2.21 misho 371: } else {
372: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13 misho 373: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
374: }
1.2.2.21 misho 375:
376: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13 misho 377: end:
378: mqtt_subFree(&subs);
1.2.2.1 misho 379: return 0;
380: }
381:
382: int
1.2.2.9 misho 383: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 384: {
385: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 386: int siz = 0;
1.2.2.26 misho 387: ait_val_t *p = NULL;
1.2.2.1 misho 388:
389: ioTRACE(2);
390:
391: if (!sess)
392: return -1;
393:
1.2.2.7 misho 394: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 395: siz = mqtt_msgPINGRESP(sess->sess_buf);
396: if (siz == -1) {
397: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
398: return 0;
1.2.2.8 misho 399: } else {
1.2.2.19 misho 400: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 401: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
402: }
1.2.2.1 misho 403:
1.2.2.19 misho 404: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 405: return 0;
406: }
407:
408: int
1.2.2.9 misho 409: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 410: {
411: struct tagStore *store;
412: struct tagSession *sess = (struct tagSession*) arg;
413:
414: ioTRACE(2);
415:
416: if (!sess)
417: return -1;
418:
1.2.2.6 misho 419: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 420: TAILQ_REMOVE(&Sessions, sess, sess_node);
421:
1.2.2.16 misho 422: if (sess->sess_clean) {
423: if (call.FiniSessPUB)
424: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
425: if (call.DeletePUB_subscribe)
426: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
427: if (call.WipePUB_topic)
428: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
429: }
1.2.2.1 misho 430:
1.2.2.6 misho 431: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
432: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 433:
1.2.2.6 misho 434: if (store->st_subscr.sub_topic.msg_base)
435: free(store->st_subscr.sub_topic.msg_base);
436: if (store->st_subscr.sub_value.msg_base)
437: free(store->st_subscr.sub_value.msg_base);
438:
1.2.2.18 misho 439: io_free(store);
1.2.2.6 misho 440: }
1.2.2.1 misho 441:
442: if (sess->sess_will.msg)
443: free(sess->sess_will.msg);
444: if (sess->sess_will.topic)
445: free(sess->sess_will.topic);
446:
447: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
448: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 449:
1.2.2.12 misho 450: return -3; /* reconnect client */
1.2.2.1 misho 451: }
452:
453: int
1.2.2.9 misho 454: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 455: {
456: struct tagSession *sess = (struct tagSession*) arg;
457:
458: ioTRACE(2);
459:
460: if (!sess)
461: return -1;
462:
1.2.2.5 misho 463: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 464:
1.2.2.1 misho 465: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
466: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 467:
1.2.2.10 misho 468: return -2; /* must terminate dispatcher */
1.2.2.1 misho 469: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>