Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.33
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.32 misho 3: #include "utils.h"
1.2.2.1 misho 4: #include "rtlm.h"
1.2 misho 5: #include "mqttd_calls.h"
6:
7:
1.2.2.26 misho 8: static inline ait_val_t *
1.2.2.19 misho 9: mkPkt(void * __restrict data, int dlen)
10: {
1.2.2.26 misho 11: ait_val_t *p = NULL;
1.2.2.19 misho 12:
1.2.2.26 misho 13: if (!(p = io_allocVar())) {
1.2.2.19 misho 14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
15: return NULL;
16: }
17:
18: if (data && dlen > 0)
1.2.2.26 misho 19: AIT_SET_BUF(p, data, dlen);
1.2.2.19 misho 20:
21: return p;
22: }
23:
24: static inline void
1.2.2.26 misho 25: freePkt(ait_val_t ** __restrict p)
1.2.2.19 misho 26: {
1.2.2.20 misho 27: if (!p)
1.2.2.19 misho 28: return;
29:
1.2.2.26 misho 30: io_freeVar(p);
1.2.2.19 misho 31: }
32:
33: static void *
34: sendPacket(sched_task_t *task)
35: {
1.2.2.26 misho 36: ait_val_t *p = TASK_ARG(task);
1.2.2.19 misho 37: register int n, slen;
38: u_char *pos;
39:
1.2.2.26 misho 40: if (!p || AIT_ISEMPTY(p)) {
1.2.2.19 misho 41: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
42: return NULL;
43: }
44:
1.2.2.33! misho 45: ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), TASK_FD(task));
! 46:
1.2.2.26 misho 47: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
1.2.2.19 misho 48: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
49: if (n == -1) {
50: ioSYSERR(0);
51: break;
52: }
53: }
54:
1.2.2.26 misho 55: freePkt(&p);
1.2.2.19 misho 56: return NULL;
57: }
58:
1.2.2.17 misho 59: static int
1.2.2.33! misho 60: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
1.2.2.17 misho 61: {
1.2.2.33! misho 62: regex_t re;
! 63: regmatch_t match;
1.2.2.26 misho 64: ait_val_t *p = NULL;
1.2.2.22 misho 65: struct tagSession *s = NULL;
1.2.2.30 misho 66: struct tagStore *st_, *st = NULL;
1.2.2.22 misho 67: char szStr[STRSIZ];
1.2.2.33! misho 68: int ret;
1.2.2.22 misho 69:
1.2.2.33! misho 70: assert(sess);
1.2.2.30 misho 71:
1.2.2.22 misho 72: TAILQ_FOREACH(s, &Sessions, sess_node) {
1.2.2.30 misho 73: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
1.2.2.33! misho 74: /* check for QoS */
! 75: if (st->st_subscr.sub_ret >= qos) {
! 76: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
! 77: regerror(ret, &re, szStr, sizeof szStr);
! 78: regfree(&re);
! 79: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
! 80: st->st_subscr.sub_topic.msg_base, szStr);
! 81: }
! 82: if (!regexec(&re, topic, 1, &match, 0)) {
! 83: /* MATCH */
! 84: p = mkPkt(sess->sess_buf->msg_base, datlen);
! 85: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
! 86: }
! 87:
1.2.2.22 misho 88: regfree(&re);
89: }
90: }
91: }
92:
1.2.2.17 misho 93: return 0;
94: }
95:
1.2.2.33! misho 96: /* --------------------------------------------------- */
! 97:
! 98: int
! 99: pubWill(struct tagSession * __restrict sess)
! 100: {
! 101: int datlen;
! 102:
! 103: ioTRACE(2);
! 104:
! 105: /* prepare will packet */
! 106: datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,
! 107: sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
! 108: if (datlen == -1)
! 109: return -1; /* error */
! 110:
! 111: return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
! 112: }
! 113:
1.2.2.17 misho 114: static int
1.2.2.33! misho 115: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
1.2.2.17 misho 116: {
1.2.2.33! misho 117: return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
! 118: }
1.2.2.26 misho 119:
1.2.2.33! misho 120: static int
! 121: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
! 122: {
! 123: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.2.2.26 misho 124:
125: /* write topic to database */
1.2.2.27 misho 126: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
127: sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.33! misho 128: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
! 129: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
! 130: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.26 misho 131:
1.2.2.33! misho 132: search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
1.2.2.26 misho 133:
1.2.2.28 misho 134: /* delete not retain message */
1.2.2.27 misho 135: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
136: sess->sess_addr, 0);
1.2.2.17 misho 137: return 0;
138: }
139:
140: static int
1.2.2.26 misho 141: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
1.2.2.17 misho 142: {
1.2.2.33! misho 143: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.2.2.28 misho 144:
145: /* write topic to database */
146: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
147: sess->sess_addr, hdr->mqtt_msg.retain);
1.2.2.33! misho 148: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
! 149: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
! 150: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
1.2.2.28 misho 151:
1.2.2.33! misho 152: return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
1.2.2.17 misho 153: }
154:
155:
1.2 misho 156: int
1.2.2.9 misho 157: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 158: {
159: struct mqtthdr *hdr;
1.2.2.1 misho 160: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 161: char szTopic[STRSIZ] = { 0 };
162: int siz = 0;
163: u_short mid = 0;
1.2.2.26 misho 164: ait_val_t *p = NULL;
1.2 misho 165:
166: ioTRACE(2);
167:
168: if (!sess)
169: return -1;
170:
1.2.2.17 misho 171: ioDEBUG(5, "Exec PUBLISH session");
1.2.2.22 misho 172: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
1.2.2.17 misho 173: if (siz == -1) {
174: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
175: return 0;
176: }
177:
1.2 misho 178: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
179: switch (hdr->mqtt_msg.qos) {
180: case MQTT_QOS_ACK:
1.2.2.26 misho 181: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
182: return 0;
1.2.2.17 misho 183: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
184: if (siz == -1) {
185: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
186: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 187: return 0;
1.2.2.17 misho 188: }
1.2 misho 189: break;
190: case MQTT_QOS_EXACTLY:
1.2.2.26 misho 191: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
192: return 0;
1.2.2.17 misho 193: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
194: if (siz == -1) {
195: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
196: mqtt_GetErrno(), mqtt_GetError());
1.2.2.22 misho 197: return 0;
1.2.2.17 misho 198: }
1.2 misho 199: break;
1.2.2.17 misho 200: case MQTT_QOS_ONCE:
1.2.2.23 misho 201: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 202: default:
1.2.2.22 misho 203: return 0;
1.2 misho 204: }
205:
1.2.2.22 misho 206: p = mkPkt(sess->sess_buf->msg_base, siz);
207: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
208: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 209: return 0;
210: }
1.2.2.1 misho 211:
212: int
1.2.2.9 misho 213: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 214: {
215: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.17 misho 216: int siz = 0;
217: u_short mid = 0;
1.2.2.26 misho 218: ait_val_t *p = NULL;
1.2.2.1 misho 219:
220: ioTRACE(2);
221:
222: if (!sess)
223: return -1;
224:
1.2.2.17 misho 225: ioDEBUG(5, "Exec PUBREL session");
226: mid = mqtt_readPUBREL(sess->sess_buf);
227: if (mid == (u_short) -1) {
228: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
229: return 0;
230: }
231:
1.2.2.29 misho 232: /* delete not retain message */
233: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
234: sess->sess_addr, 0);
1.2.2.17 misho 235:
236: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
237: if (siz == -1) {
238: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
239: return 0;
240: }
1.2.2.1 misho 241:
1.2.2.29 misho 242: p = mkPkt(sess->sess_buf->msg_base, siz);
243: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
1.2.2.22 misho 244: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 245: return 0;
246: }
247:
248: int
1.2.2.9 misho 249: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 250: {
251: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 252: mqtt_subscr_t *subs = NULL;
253: int siz = 0;
254: u_short mid = 0;
255: register int i;
256: struct tagStore *store;
1.2.2.17 misho 257: char buf[BUFSIZ];
258: void *ptr;
1.2.2.26 misho 259: ait_val_t *p = NULL;
1.2.2.1 misho 260:
261: ioTRACE(2);
262:
263: if (!sess)
264: return -1;
265:
1.2.2.7 misho 266: ioDEBUG(5, "Exec SUBSCRIBE session");
267: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
268: if (siz == -1) {
269: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
270: return 0;
271: }
272:
273: /* add to db */
1.2.2.33! misho 274: for (i = 0; i < siz; i++) {
1.2.2.31 misho 275: store = io_malloc(sizeof(struct tagStore));
276: if (!store) {
277: ioSYSERR(0);
1.2.2.25 misho 278: continue;
1.2.2.31 misho 279: } else {
280: store->st_msgid = mid;
281: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.33! misho 282: subs[i].sub_ret = MQTT_QOS_DENY;
1.2.2.17 misho 283: }
1.2.2.31 misho 284:
285: /* add to cache */
286: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
287:
288: /* convert topic to regexp */
289: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
290: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
291: } else {
292: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
293: if (!ptr) {
1.2.2.7 misho 294: ioSYSERR(0);
1.2.2.25 misho 295: continue;
1.2.2.7 misho 296: } else {
1.2.2.31 misho 297: store->st_subscr.sub_topic.msg_base = ptr;
298: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
299: memcpy(store->st_subscr.sub_topic.msg_base, buf,
300: store->st_subscr.sub_topic.msg_len);
1.2.2.7 misho 301: }
302:
1.2.2.33! misho 303: /* store to db */
! 304: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
! 305: sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
! 306: /* subscribe pass */
1.2.2.31 misho 307: subs[i].sub_ret = MQTT_QOS_PASS;
308:
1.2.2.33! misho 309: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid,
! 310: store->st_subscr.sub_topic.msg_base,
! 311: store->st_subscr.sub_topic.msg_len,
! 312: store->st_subscr.sub_ret, sess->sess_addr);
! 313: }
1.2.2.7 misho 314: }
1.2.2.1 misho 315:
1.2.2.7 misho 316: /* send acknowledge */
317: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
318: if (siz == -1) {
319: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
320: goto end;
1.2.2.21 misho 321: } else {
322: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 323: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
324: }
1.2.2.21 misho 325:
326: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.7 misho 327: end:
328: mqtt_subFree(&subs);
1.2.2.1 misho 329: return 0;
330: }
331:
332: int
1.2.2.9 misho 333: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 334: {
335: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13 misho 336: mqtt_subscr_t *subs = NULL;
337: int siz = 0;
338: u_short mid = 0;
339: register int i;
340: struct tagStore *store, *tmp;
1.2.2.26 misho 341: ait_val_t *p = NULL;
1.2.2.1 misho 342:
343: ioTRACE(2);
344:
345: if (!sess)
346: return -1;
347:
1.2.2.13 misho 348: ioDEBUG(5, "Exec UNSUBSCRIBE session");
349: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
350: if (siz == -1) {
351: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
352: return 0;
353: }
354:
355: /* del from db */
356: for (i = 0; i < siz; i++) {
357: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
358: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
359: store->st_subscr.sub_topic.msg_base &&
360: !strcmp(store->st_subscr.sub_topic.msg_base,
361: subs[i].sub_topic.msg_base)) {
362: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
1.2.2.14 misho 363:
364: if (store->st_subscr.sub_topic.msg_base)
365: free(store->st_subscr.sub_topic.msg_base);
366: if (store->st_subscr.sub_value.msg_base)
367: free(store->st_subscr.sub_value.msg_base);
1.2.2.18 misho 368: io_free(store);
1.2.2.13 misho 369: }
370: }
371:
1.2.2.15 misho 372: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
373: sess->sess_user, "%");
1.2.2.13 misho 374: }
375:
376: /* send acknowledge */
377: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
378: if (siz == -1) {
379: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
380: goto end;
1.2.2.21 misho 381: } else {
382: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.13 misho 383: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
384: }
1.2.2.21 misho 385:
386: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.13 misho 387: end:
388: mqtt_subFree(&subs);
1.2.2.1 misho 389: return 0;
390: }
391:
392: int
1.2.2.9 misho 393: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 394: {
395: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 396: int siz = 0;
1.2.2.26 misho 397: ait_val_t *p = NULL;
1.2.2.1 misho 398:
399: ioTRACE(2);
400:
401: if (!sess)
402: return -1;
403:
1.2.2.7 misho 404: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 405: siz = mqtt_msgPINGRESP(sess->sess_buf);
406: if (siz == -1) {
407: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
408: return 0;
1.2.2.8 misho 409: } else {
1.2.2.19 misho 410: p = mkPkt(sess->sess_buf->msg_base, siz);
1.2.2.8 misho 411: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
412: }
1.2.2.1 misho 413:
1.2.2.19 misho 414: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2.2.1 misho 415: return 0;
416: }
417:
418: int
1.2.2.9 misho 419: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 420: {
421: struct tagStore *store;
422: struct tagSession *sess = (struct tagSession*) arg;
423:
424: ioTRACE(2);
425:
426: if (!sess)
427: return -1;
428:
1.2.2.6 misho 429: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 430: TAILQ_REMOVE(&Sessions, sess, sess_node);
431:
1.2.2.16 misho 432: if (sess->sess_clean) {
433: if (call.FiniSessPUB)
434: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
435: if (call.DeletePUB_subscribe)
436: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
437: if (call.WipePUB_topic)
438: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
439: }
1.2.2.1 misho 440:
1.2.2.6 misho 441: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
442: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 443:
1.2.2.6 misho 444: if (store->st_subscr.sub_topic.msg_base)
445: free(store->st_subscr.sub_topic.msg_base);
446: if (store->st_subscr.sub_value.msg_base)
447: free(store->st_subscr.sub_value.msg_base);
448:
1.2.2.18 misho 449: io_free(store);
1.2.2.6 misho 450: }
1.2.2.1 misho 451:
1.2.2.32 misho 452: if (sess->sess_will.flag)
1.2.2.33! misho 453: pubWill(sess);
1.2.2.32 misho 454:
1.2.2.1 misho 455: if (sess->sess_will.msg)
456: free(sess->sess_will.msg);
457: if (sess->sess_will.topic)
458: free(sess->sess_will.topic);
459:
460: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
461: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 462:
1.2.2.12 misho 463: return -3; /* reconnect client */
1.2.2.1 misho 464: }
465:
466: int
1.2.2.9 misho 467: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 468: {
469: struct tagSession *sess = (struct tagSession*) arg;
470:
471: ioTRACE(2);
472:
473: if (!sess)
474: return -1;
475:
1.2.2.5 misho 476: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 477:
1.2.2.1 misho 478: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
479: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 480:
1.2.2.10 misho 481: return -2; /* must terminate dispatcher */
1.2.2.1 misho 482: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>