![]() ![]() | ![]() |
Added SIGINT for mqttd; check QoS in mqttd.
1: #include "global.h" 2: #include "mqttd.h" 3: #include "rtlm.h" 4: #include "mqttd_calls.h" 5: 6: 7: static inline ait_val_t * 8: mkPkt(void * __restrict data, int dlen) 9: { 10: ait_val_t *p = NULL; 11: 12: if (!(p = io_allocVar())) { 13: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError()); 14: return NULL; 15: } 16: 17: if (data && dlen > 0) 18: AIT_SET_BUF(p, data, dlen); 19: 20: return p; 21: } 22: 23: static inline void 24: freePkt(ait_val_t ** __restrict p) 25: { 26: if (!p) 27: return; 28: 29: io_freeVar(p); 30: } 31: 32: static void * 33: sendPacket(sched_task_t *task) 34: { 35: ait_val_t *p = TASK_ARG(task); 36: register int n, slen; 37: u_char *pos; 38: 39: if (!p || AIT_ISEMPTY(p)) { 40: ioDEBUG(9, "Error:: invalid packet or found empty content ..."); 41: return NULL; 42: } 43: 44: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) { 45: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL); 46: if (n == -1) { 47: ioSYSERR(0); 48: break; 49: } 50: } 51: 52: freePkt(&p); 53: return NULL; 54: } 55: 56: /* --------------------------------------------------- */ 57: 58: static int 59: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen) 60: { 61: ait_val_t *p = NULL; 62: struct tagSession *s = NULL; 63: struct tagStore *st = NULL; 64: regex_t re; 65: regmatch_t match; 66: int ret; 67: char szStr[STRSIZ]; 68: 69: TAILQ_FOREACH(s, &Sessions, sess_node) { 70: SLIST_FOREACH(st, &s->sess_subscr, st_node) { 71: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { 72: regerror(ret, &re, szStr, sizeof szStr); 73: regfree(&re); 74: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) 75: st->st_subscr.sub_topic.msg_base, szStr); 76: } 77: if (!regexec(&re, psTopic, 1, &match, 0)) { 78: /* MATCH */ 79: p = mkPkt(sess->sess_buf->msg_base, datlen); 80: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0); 81: } 82: 83: regfree(&re); 84: } 85: } 86: 87: return 0; 88: } 89: 90: 
static int 91: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen) 92: { 93: ait_val_t *p = NULL; 94: struct tagSession *s = NULL; 95: struct tagStore *st = NULL; 96: regex_t re; 97: regmatch_t match; 98: int ret; 99: char szStr[STRSIZ]; 100: struct mqtthdr *hdr; 101: 102: p = mkPkt(sess->sess_buf->msg_base, datlen); 103: hdr = (struct mqtthdr*) sess->sess_buf->msg_base; 104: 105: /* write topic to database */ 106: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 107: sess->sess_addr, hdr->mqtt_msg.retain); 108: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 109: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain); 110: 111: TAILQ_FOREACH(s, &Sessions, sess_node) { 112: SLIST_FOREACH(st, &s->sess_subscr, st_node) { 113: /* check for QoS */ 114: if (st->st_subscr.sub_ret < MQTT_QOS_ACK) 115: continue; 116: 117: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { 118: regerror(ret, &re, szStr, sizeof szStr); 119: regfree(&re); 120: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) 121: st->st_subscr.sub_topic.msg_base, szStr); 122: } 123: if (!regexec(&re, psTopic, 1, &match, 0)) { 124: /* MATCH */ 125: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 126: s->sess_sock, NULL, 0); 127: } 128: 129: regfree(&re); 130: } 131: } 132: 133: /* delete not retain message */ 134: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 135: sess->sess_addr, 0); 136: 137: freePkt(&p); 138: return 0; 139: } 140: 141: static int 142: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen) 143: { 144: ait_val_t *p = NULL; 145: struct tagSession *s = NULL; 146: struct tagStore *st = NULL; 147: regex_t re; 148: regmatch_t match; 149: int ret; 150: char szStr[STRSIZ]; 151: struct mqtthdr *hdr; 152: 153: p = mkPkt(sess->sess_buf->msg_base, datlen); 154: hdr = (struct 
mqtthdr*) sess->sess_buf->msg_base; 155: 156: /* write topic to database */ 157: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user, 158: sess->sess_addr, hdr->mqtt_msg.retain); 159: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, AIT_GET_BUF(p), AIT_LEN(p), 160: sess->sess_user, sess->sess_addr, hdr->mqtt_msg.qos, hdr->mqtt_msg.retain); 161: 162: TAILQ_FOREACH(s, &Sessions, sess_node) { 163: SLIST_FOREACH(st, &s->sess_subscr, st_node) { 164: /* check for QoS */ 165: if (st->st_subscr.sub_ret < MQTT_QOS_EXACTLY) 166: continue; 167: 168: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) { 169: regerror(ret, &re, szStr, sizeof szStr); 170: regfree(&re); 171: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*) 172: st->st_subscr.sub_topic.msg_base, szStr); 173: } 174: if (!regexec(&re, psTopic, 1, &match, 0)) { 175: /* MATCH */ 176: schedWrite(root, sendPacket, mkPkt(AIT_GET_BUF(p), AIT_LEN(p)), 177: s->sess_sock, NULL, 0); 178: } 179: 180: regfree(&re); 181: } 182: } 183: 184: freePkt(&p); 185: return 0; 186: } 187: 188: 189: int 190: cmdPUBLISH(void *srv, int len, void *arg) 191: { 192: struct mqtthdr *hdr; 193: struct tagSession *sess = (struct tagSession*) arg; 194: char szTopic[STRSIZ] = { 0 }; 195: int siz = 0; 196: u_short mid = 0; 197: ait_val_t *p = NULL; 198: 199: ioTRACE(2); 200: 201: if (!sess) 202: return -1; 203: 204: ioDEBUG(5, "Exec PUBLISH session"); 205: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL); 206: if (siz == -1) { 207: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 208: return 0; 209: } 210: 211: hdr = (struct mqtthdr*) sess->sess_buf->msg_base; 212: switch (hdr->mqtt_msg.qos) { 213: case MQTT_QOS_ACK: 214: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr))) 215: return 0; 216: siz = mqtt_msgPUBACK(sess->sess_buf, mid); 217: if (siz == -1) { 218: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s", 219: mqtt_GetErrno(), 
mqtt_GetError()); 220: return 0; 221: } 222: break; 223: case MQTT_QOS_EXACTLY: 224: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr))) 225: return 0; 226: siz = mqtt_msgPUBREC(sess->sess_buf, mid); 227: if (siz == -1) { 228: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s", 229: mqtt_GetErrno(), mqtt_GetError()); 230: return 0; 231: } 232: break; 233: case MQTT_QOS_ONCE: 234: pubOnce(sess, szTopic, mqtt_pktLen(hdr)); 235: default: 236: return 0; 237: } 238: 239: p = mkPkt(sess->sess_buf->msg_base, siz); 240: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); 241: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); 242: return 0; 243: } 244: 245: int 246: cmdPUBREL(void *srv, int len, void *arg) 247: { 248: struct tagSession *sess = (struct tagSession*) arg; 249: int siz = 0; 250: u_short mid = 0; 251: ait_val_t *p = NULL; 252: 253: ioTRACE(2); 254: 255: if (!sess) 256: return -1; 257: 258: ioDEBUG(5, "Exec PUBREL session"); 259: mid = mqtt_readPUBREL(sess->sess_buf); 260: if (mid == (u_short) -1) { 261: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 262: return 0; 263: } 264: 265: // TODO:: Delete from database topic 266: 267: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid); 268: if (siz == -1) { 269: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 270: return 0; 271: } else { 272: p = mkPkt(sess->sess_buf->msg_base, siz); 273: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); 274: } 275: 276: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); 277: return 0; 278: } 279: 280: int 281: cmdSUBSCRIBE(void *srv, int len, void *arg) 282: { 283: struct tagSession *sess = (struct tagSession*) arg; 284: mqtt_subscr_t *subs = NULL; 285: int siz = 0; 286: u_short mid = 0; 287: register int i; 288: struct tagStore *store; 289: char buf[BUFSIZ]; 290: void *ptr; 291: ait_val_t *p = NULL; 292: 293: ioTRACE(2); 294: 295: if (!sess) 296: return -1; 297: 298: ioDEBUG(5, "Exec SUBSCRIBE 
session"); 299: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs); 300: if (siz == -1) { 301: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 302: return 0; 303: } 304: 305: /* add to db */ 306: for (i = 0; i < siz; i++, subs[i].sub_ret = MQTT_QOS_DENY) { 307: /* convert topic to sql search statement */ 308: if (mqtt_sqlTopic(subs[i].sub_topic.msg_base, buf, sizeof buf) == -1) { 309: ioDEBUG(5, "Error:: in db #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 310: continue; 311: } 312: if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf, 313: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { 314: store = io_malloc(sizeof(struct tagStore)); 315: if (!store) { 316: ioSYSERR(0); 317: continue; 318: } else { 319: store->st_msgid = mid; 320: mqtt_subCopy(&store->st_subscr, &subs[i]); 321: } 322: 323: /* add to cache */ 324: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); 325: 326: /* convert topic to regexp */ 327: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) { 328: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 329: 330: subs[i].sub_ret = MQTT_QOS_DENY; 331: } else { 332: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1); 333: if (!ptr) 334: ioSYSERR(0); 335: else { 336: store->st_subscr.sub_topic.msg_base = ptr; 337: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1; 338: memcpy(store->st_subscr.sub_topic.msg_base, buf, 339: store->st_subscr.sub_topic.msg_len); 340: } 341: 342: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) from %s\n", sess->sess_cid, 343: store->st_subscr.sub_topic.msg_base, 344: store->st_subscr.sub_topic.msg_len, sess->sess_addr); 345: 346: subs[i].sub_ret = MQTT_QOS_PASS; 347: } 348: } 349: } 350: 351: /* send acknowledge */ 352: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid); 353: if (siz == -1) { 354: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 355: goto end; 356: } 
else { 357: p = mkPkt(sess->sess_buf->msg_base, siz); 358: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); 359: } 360: 361: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); 362: end: 363: mqtt_subFree(&subs); 364: return 0; 365: } 366: 367: int 368: cmdUNSUBSCRIBE(void *srv, int len, void *arg) 369: { 370: struct tagSession *sess = (struct tagSession*) arg; 371: mqtt_subscr_t *subs = NULL; 372: int siz = 0; 373: u_short mid = 0; 374: register int i; 375: struct tagStore *store, *tmp; 376: ait_val_t *p = NULL; 377: 378: ioTRACE(2); 379: 380: if (!sess) 381: return -1; 382: 383: ioDEBUG(5, "Exec UNSUBSCRIBE session"); 384: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs); 385: if (siz == -1) { 386: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 387: return 0; 388: } 389: 390: /* del from db */ 391: for (i = 0; i < siz; i++) { 392: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) { 393: if (store->st_subscr.sub_ret == subs[i].sub_ret && 394: store->st_subscr.sub_topic.msg_base && 395: !strcmp(store->st_subscr.sub_topic.msg_base, 396: subs[i].sub_topic.msg_base)) { 397: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node); 398: 399: if (store->st_subscr.sub_topic.msg_base) 400: free(store->st_subscr.sub_topic.msg_base); 401: if (store->st_subscr.sub_value.msg_base) 402: free(store->st_subscr.sub_value.msg_base); 403: io_free(store); 404: } 405: } 406: 407: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 408: sess->sess_user, "%"); 409: } 410: 411: /* send acknowledge */ 412: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid); 413: if (siz == -1) { 414: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 415: goto end; 416: } else { 417: p = mkPkt(sess->sess_buf->msg_base, siz); 418: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); 419: } 420: 421: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); 422: end: 
423: mqtt_subFree(&subs); 424: return 0; 425: } 426: 427: int 428: cmdPINGREQ(void *srv, int len, void *arg) 429: { 430: struct tagSession *sess = (struct tagSession*) arg; 431: int siz = 0; 432: ait_val_t *p = NULL; 433: 434: ioTRACE(2); 435: 436: if (!sess) 437: return -1; 438: 439: ioDEBUG(5, "Exec PINGREQ session"); 440: siz = mqtt_msgPINGRESP(sess->sess_buf); 441: if (siz == -1) { 442: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 443: return 0; 444: } else { 445: p = mkPkt(sess->sess_buf->msg_base, siz); 446: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); 447: } 448: 449: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); 450: return 0; 451: } 452: 453: int 454: cmdCONNECT(void *srv, int len, void *arg) 455: { 456: struct tagStore *store; 457: struct tagSession *sess = (struct tagSession*) arg; 458: 459: ioTRACE(2); 460: 461: if (!sess) 462: return -1; 463: 464: ioDEBUG(5, "Exec CONNECT session"); 465: TAILQ_REMOVE(&Sessions, sess, sess_node); 466: 467: if (sess->sess_clean) { 468: if (call.FiniSessPUB) 469: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); 470: if (call.DeletePUB_subscribe) 471: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); 472: if (call.WipePUB_topic) 473: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); 474: } 475: 476: while ((store = SLIST_FIRST(&sess->sess_subscr))) { 477: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); 478: 479: if (store->st_subscr.sub_topic.msg_base) 480: free(store->st_subscr.sub_topic.msg_base); 481: if (store->st_subscr.sub_value.msg_base) 482: free(store->st_subscr.sub_value.msg_base); 483: 484: io_free(store); 485: } 486: 487: if (sess->sess_will.msg) 488: free(sess->sess_will.msg); 489: if (sess->sess_will.topic) 490: free(sess->sess_will.topic); 491: 492: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 493: sess->sess_addr, sess->sess_user); 494: 495: 
return -3; /* reconnect client */ 496: } 497: 498: int 499: cmdDISCONNECT(void *srv, int len, void *arg) 500: { 501: struct tagSession *sess = (struct tagSession*) arg; 502: 503: ioTRACE(2); 504: 505: if (!sess) 506: return -1; 507: 508: ioDEBUG(5, "Exec DISCONNECT session"); 509: 510: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 511: sess->sess_addr, sess->sess_user); 512: 513: return -2; /* must terminate dispatcher */ 514: }