Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.26
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
1.2.2.26! misho 7: static inline ait_val_t *
1.2.2.19 misho 8: mkPkt(void * __restrict data, int dlen)
9: {
1.2.2.26! misho 10: ait_val_t *p = NULL;
1.2.2.19 misho 11:
1.2.2.26! misho 12: if (!(p = io_allocVar())) {
1.2.2.19 misho 13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
14: return NULL;
15: }
16:
17: if (data && dlen > 0)
1.2.2.26! misho 18: AIT_SET_BUF(p, data, dlen);
1.2.2.19 misho 19:
20: return p;
21: }
22:
23: static inline void
1.2.2.26! misho 24: freePkt(ait_val_t ** __restrict p)
1.2.2.19 misho 25: {
1.2.2.20 misho 26: if (!p)
1.2.2.19 misho 27: return;
28:
1.2.2.26! misho 29: io_freeVar(p);
1.2.2.19 misho 30: }
31:
32: static void *
33: sendPacket(sched_task_t *task)
34: {
1.2.2.26! misho 35: ait_val_t *p = TASK_ARG(task);
1.2.2.19 misho 36: register int n, slen;
37: u_char *pos;
38:
1.2.2.26! misho 39: if (!p || AIT_ISEMPTY(p)) {
1.2.2.19 misho 40: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
41: return NULL;
42: }
43:
1.2.2.26! misho 44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19 misho 45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
46: if (n == -1) {
47: ioSYSERR(0);
48: break;
49: }
50: }
51:
1.2.2.26! misho 52: freePkt(&p);
1.2.2.19 misho 53: return NULL;
54: }
55:
1.2.2.22 misho 56: /* --------------------------------------------------- */
1.2.2.19 misho 57:
1.2.2.17 misho 58: static int
1.2.2.22 misho 59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 60: {
1.2.2.26! misho 61: ait_val_t *p = NULL;
1.2.2.22 misho 62: struct tagSession *s = NULL;
63: struct tagStore *st = NULL;
64: regex_t re;
65: regmatch_t match;
66: int ret;
67: char szStr[STRSIZ];
68:
69: TAILQ_FOREACH(s, &Sessions, sess_node) {
70: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
72: regerror(ret, &re, szStr, sizeof szStr);
73: regfree(&re);
74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
75: st->st_subscr.sub_topic.msg_base, szStr);
76: }
77: if (!regexec(&re, psTopic, 1, &match, 0)) {
78: /* MATCH */
79: p = mkPkt(sess->sess_buf->msg_base, datlen);
1.2.2.23 misho 80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
1.2.2.22 misho 81: }
82:
83: regfree(&re);
84: }
85: }
86:
1.2.2.17 misho 87: return 0;
88: }
89:
90: static int
1.2.2.26! misho 91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17 misho 92: {
1.2.2.26! misho 93: ait_val_t *p = NULL;
! 94: struct tagSession *s = NULL;
! 95: struct tagStore *st = NULL;
! 96: regex_t re;
! 97: regmatch_t match;
! 98: int ret, flg = 0;
! 99: char szStr[STRSIZ];
! 100: struct mqtthdr *hdr;
! 101:
! 102: p = mkPkt(sess->sess_buf->msg_base, datlen);
! 103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
! 104:
! 105: /* write topic to database */
! 106: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p),
! 107: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.retain);
! 108:
! 109: TAILQ_FOREACH(s, &Sessions, sess_node) {
! 110: SLIST_FOREACH(st, &s->sess_subscr, st_node) {
! 111: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
! 112: regerror(ret, &re, szStr, sizeof szStr);
! 113: regfree(&re);
! 114: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
! 115: st->st_subscr.sub_topic.msg_base, szStr);
! 116: }
! 117: if (!regexec(&re, psTopic, 1, &match, 0)) {
! 118: flg = 1; /* MATCH */
! 119: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
! 120: }
! 121:
! 122: regfree(&re);
! 123: }
! 124: }
! 125:
! 126: /* if subscriber not found then free packet */
! 127: if (!flg)
! 128: freePkt(&p);
! 129:
! 130: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, sess->sess_addr, -1);
1.2.2.17 misho 131: return 0;
132: }
133:
134: static int
1.2.2.26! misho 135: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17 misho 136: {
137: return 0;
138: }
139:
140:
1.2 misho 141: int
1.2.2.9 misho 142: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 143: {
144: struct mqtthdr *hdr;
1.2.2.1 misho 145: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 146: char szTopic[STRSIZ] = { 0 };
147: int siz = 0;
148: u_short mid = 0;
1.2.2.26! misho 149: ait_val_t *p = NULL;
1.2 misho 150:
151: ioTRACE(2);
152:
153: if (!sess)
154: return -1;
155:
1.2.2.17 misho 156: ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22 misho 157: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17 misho 158: if (siz == -1) {
159: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
160: return 0;
161: }
162:
1.2 misho 163: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
164: switch (hdr->mqtt_msg.qos) {
165: case MQTT_QOS_ACK:
1.2.2.26! misho 166: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
! 167: return 0;
1.2.2.17 misho 168: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
169: if (siz == -1) {
170: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
171: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 172: return 0;
1.2.2.17 misho 173: }
1.2 misho 174: break;
175: case MQTT_QOS_EXACTLY:
1.2.2.26! misho 176: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
! 177: return 0;
1.2.2.17 misho 178: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
179: if (siz == -1) {
180: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
181: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 182: return 0;
1.2.2.17 misho 183: }
1.2 misho 184: break;
1.2.2.17 misho 185: case MQTT_QOS_ONCE:
1.2.2.23 misho 186: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 187: default:
1.2.2.22 misho 188: return 0;
1.2 misho 189: }
190:
1.2.2.22 misho 191: p = mkPkt(sess->sess_buf->msg_base, siz);
192: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
193: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 194: return 0;
195: }
1.2.2.1 misho 196:
197: int
1.2.2.9 misho 198: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 199: {
200: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 201: int siz = 0;
202: u_short mid = 0;
1.2.2.26! misho 203: ait_val_t *p = NULL;
1.2.2.1 misho 204:
205: ioTRACE(2);
206:
207: if (!sess)
208: return -1;
209:
1.2.2.17 misho 210: ioDEBUG(5, "Exec PUBREL session");
211: mid = mqtt_readPUBREL(sess->sess_buf);
212: if (mid == (u_short) -1) {
213: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
214: return 0;
215: }
216:
217: // TODO:: Delete from database topic
218:
219: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
220: if (siz == -1) {
221: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
222: return 0;
1.2.2.22 misho 223: } else {
224: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.17 misho 225: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
226: }
1.2.2.1 misho 227:
1.2.2.22 misho 228: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 229: return 0;
230: }
231:
232: int
1.2.2.9 misho 233: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 234: {
235: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 236: mqtt_subscr_t *subs = NULL;
237: int siz = 0;
238: u_short mid = 0;
239: register int i;
240: struct tagStore *store;
1.2.2.17 misho 241: char buf[BUFSIZ];
242: void *ptr;
1.2.2.26! misho 243: ait_val_t *p = NULL;
1.2.2.1 misho 244:
245: ioTRACE(2);
246:
247: if (!sess)
248: return -1;
249:
1.2.2.7 misho 250: ioDEBUG(5, "Exec SUBSCRIBE session");
251: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
252: if (siz == -1) {
253: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
254: return 0;
255: }
256:
257: /* add to db */
1.2.2.25 misho 258: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) {
1.2.2.17 misho 259: /* convert topic to sql search statement */
260: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) {
261: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.25 misho 262: continue;
1.2.2.17 misho 263: }
264: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
1.2.2.8 misho 265: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.18 misho 266: store = io_malloc(sizeof(struct tagStore));
1.2.2.7 misho 267: if (!store) {
268: ioSYSERR(0);
1.2.2.25 misho 269: continue;
1.2.2.7 misho 270: } else {
271: store->st_msgid = mid;
1.2.2.8 misho 272: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 273: }
274:
275: /* add to cache */
276: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
277:
1.2.2.17 misho 278: /* convert topic to regexp */
1.2.2.24 misho 279: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
1.2.2.17 misho 280: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.2.2.24 misho 281:
282: subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17 misho 283: } else {
284: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
1.2.2.25 misho 285: if (!ptr)
1.2.2.17 misho 286: ioSYSERR(0);
1.2.2.25 misho 287: else {
1.2.2.17 misho 288: store->st_subscr.sub_topic.msg_base = ptr;
289: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
290: memcpy(store->st_subscr.sub_topic.msg_base, buf,
291: store->st_subscr.sub_topic.msg_len);
292: }
293:
1.2.2.24 misho 294: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid,
295: store->st_subscr.sub_topic.msg_base,
296: store->st_subscr.sub_topic.msg_len, sess->sess_addr);
1.2.2.17 misho 297:
1.2.2.24 misho 298: subs[i].sub_ret = MQTT_QOS_PASS;
299: }
1.2.2.25 misho 300: }
1.2.2.7 misho 301: }
1.2.2.1 misho 302:
1.2.2.7 misho 303: /* send acknowledge */
304: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
305: if (siz == -1) {
306: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
307: goto end;
1.2.2.21 misho 308: } else {
309: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 310: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
311: }
1.2.2.21 misho 312:
313: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7 misho 314: end:
315: mqtt_subFree(&subs);
1.2.2.1 misho 316: return 0;
317: }
318:
319: int
1.2.2.9 misho 320: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 321: {
322: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 323: mqtt_subscr_t *subs = NULL;
324: int siz = 0;
325: u_short mid = 0;
326: register int i;
327: struct tagStore *store, *tmp;
1.2.2.26! misho 328: ait_val_t *p = NULL;
1.2.2.1 misho 329:
330: ioTRACE(2);
331:
332: if (!sess)
333: return -1;
334:
1.2.2.13 misho 335: ioDEBUG(5, "Exec UNSUBSCRIBE session");
336: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
337: if (siz == -1) {
338: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
339: return 0;
340: }
341:
342: /* del from db */
343: for (i = 0; i < siz; i++) {
344: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
345: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
346: store->st_subscr.sub_topic.msg_base &&
347: !strcmp(store->st_subscr.sub_topic.msg_base,
348: subs[i].sub_topic.msg_base)) {
349: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 350:
351: if (store->st_subscr.sub_topic.msg_base)
352: free(store->st_subscr.sub_topic.msg_base);
353: if (store->st_subscr.sub_value.msg_base)
354: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 355: io_free(store);
1.2.2.13 misho 356: }
357: }
358:
1.2.2.15 misho 359: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
360: sess->sess_user, "%");
1.2.2.13 misho 361: }
362:
363: /* send acknowledge */
364: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
365: if (siz == -1) {
366: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
367: goto end;
1.2.2.21 misho 368: } else {
369: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13 misho 370: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
371: }
1.2.2.21 misho 372:
373: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13 misho 374: end:
375: mqtt_subFree(&subs);
1.2.2.1 misho 376: return 0;
377: }
378:
379: int
1.2.2.9 misho 380: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 381: {
382: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 383: int siz = 0;
1.2.2.26! misho 384: ait_val_t *p = NULL;
1.2.2.1 misho 385:
386: ioTRACE(2);
387:
388: if (!sess)
389: return -1;
390:
1.2.2.7 misho 391: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 392: siz = mqtt_msgPINGRESP(sess->sess_buf);
393: if (siz == -1) {
394: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
395: return 0;
1.2.2.8 misho 396: } else {
1.2.2.19 misho 397: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 398: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
399: }
1.2.2.1 misho 400:
1.2.2.19 misho 401: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 402: return 0;
403: }
404:
405: int
1.2.2.9 misho 406: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 407: {
408: struct tagStore *store;
409: struct tagSession *sess = (struct tagSession*) arg;
410:
411: ioTRACE(2);
412:
413: if (!sess)
414: return -1;
415:
1.2.2.6 misho 416: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 417: TAILQ_REMOVE(&Sessions, sess, sess_node);
418:
1.2.2.16 misho 419: if (sess->sess_clean) {
420: if (call.FiniSessPUB)
421: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
422: if (call.DeletePUB_subscribe)
423: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
424: if (call.WipePUB_topic)
425: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
426: }
1.2.2.1 misho 427:
1.2.2.6 misho 428: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
429: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 430:
1.2.2.6 misho 431: if (store->st_subscr.sub_topic.msg_base)
432: free(store->st_subscr.sub_topic.msg_base);
433: if (store->st_subscr.sub_value.msg_base)
434: free(store->st_subscr.sub_value.msg_base);
435:
1.2.2.18 misho 436: io_free(store);
1.2.2.6 misho 437: }
1.2.2.1 misho 438:
439: if (sess->sess_will.msg)
440: free(sess->sess_will.msg);
441: if (sess->sess_will.topic)
442: free(sess->sess_will.topic);
443:
444: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
445: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 446:
1.2.2.12 misho 447: return -3; /* reconnect client */
1.2.2.1 misho 448: }
449:
450: int
1.2.2.9 misho 451: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 452: {
453: struct tagSession *sess = (struct tagSession*) arg;
454:
455: ioTRACE(2);
456:
457: if (!sess)
458: return -1;
459:
1.2.2.5 misho 460: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 461:
1.2.2.1 misho 462: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
463: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 464:
1.2.2.10 misho 465: return -2; /* must terminate dispatcher */
1.2.2.1 misho 466: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>