Annotation of mqtt/src/mqttd_calls.c, revision 1.3
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.3 ! misho 3: #include "utils.h"
! 4: #include "rtlm.h"
1.2 misho 5: #include "mqttd_calls.h"
6:
7:
1.3 ! misho 8: static inline ait_val_t *
! 9: mkPkt(void * __restrict data, int dlen)
! 10: {
! 11: ait_val_t *p = NULL;
! 12:
! 13: if (!(p = io_allocVar())) {
! 14: ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());
! 15: return NULL;
! 16: }
! 17:
! 18: if (data && dlen > 0)
! 19: AIT_SET_BUF(p, data, dlen);
! 20:
! 21: return p;
! 22: }
! 23:
! 24: static inline void
! 25: freePkt(ait_val_t ** __restrict p)
! 26: {
! 27: if (!p)
! 28: return;
! 29:
! 30: io_freeVar(p);
! 31: }
! 32:
! 33: static void *
! 34: sendPacket(sched_task_t *task)
! 35: {
! 36: ait_val_t *p = TASK_ARG(task);
! 37: register int n, slen;
! 38: u_char *pos;
! 39:
! 40: if (!p || AIT_ISEMPTY(p)) {
! 41: ioDEBUG(9, "Error:: invalid packet or found empty content ...");
! 42: return NULL;
! 43: }
! 44:
! 45: ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));
! 46:
! 47: for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
! 48: n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
! 49: if (n == -1) {
! 50: ioSYSERR(0);
! 51: break;
! 52: }
! 53: }
! 54:
! 55: freePkt(&p);
! 56: return NULL;
! 57: }
! 58:
! 59: static int
! 60: search4send(struct tagSession * __restrict sess, const char *topic, int datlen, char qos)
! 61: {
! 62: regex_t re;
! 63: regmatch_t match;
! 64: ait_val_t *p = NULL;
! 65: struct tagSession *s = NULL;
! 66: struct tagStore *st_, *st = NULL;
! 67: char szStr[STRSIZ];
! 68: int ret;
! 69:
! 70: assert(sess);
! 71:
! 72: TAILQ_FOREACH(s, &Sessions, sess_node) {
! 73: SLIST_FOREACH_SAFE(st, &s->sess_subscr, st_node, st_) {
! 74: /* check for QoS */
! 75: if (st->st_subscr.sub_ret >= qos) {
! 76: if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
! 77: regerror(ret, &re, szStr, sizeof szStr);
! 78: regfree(&re);
! 79: ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)
! 80: st->st_subscr.sub_topic.msg_base, szStr);
! 81: }
! 82: if (!regexec(&re, topic, 1, &match, 0)) {
! 83: /* MATCH */
! 84: p = mkPkt(sess->sess_buf->msg_base, datlen);
! 85: schedWrite(root, sendPacket, p, s->sess_sock, NULL, 0);
! 86: }
! 87:
! 88: regfree(&re);
! 89: }
! 90: }
! 91: }
! 92:
! 93: return 0;
! 94: }
! 95:
! 96: /* --------------------------------------------------- */
! 97:
! 98: void *
! 99: sendRetain(sched_task_t *task)
! 100: {
! 101: mqtt_subscr_t *subs, *s;
! 102: struct tagSession *sess;
! 103: int siz;
! 104:
! 105: ioTRACE(2);
! 106:
! 107: assert(task);
! 108:
! 109: sess = TASK_ARG(task);
! 110: assert(sess);
! 111:
! 112: if (!sess->sess_buf) {
! 113: ioDEBUG(9, "WARNING! No allocated buffer!?!\n");
! 114: return NULL;
! 115: }
! 116:
! 117: subs = call.ReadPUB_topic(&cfg, pub, "%", "%", 1);
! 118: if (!subs)
! 119: return NULL;
! 120:
! 121: for (s = subs; s && s->sub_topic.msg_base; s++) {
! 122: siz = s->sub_value.msg_len;
! 123: memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base,
! 124: MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
! 125: ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n",
! 126: siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
! 127: if (siz > 0)
! 128: search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
! 129: }
! 130:
! 131: mqtt_subFree(&subs);
! 132: return NULL;
! 133: }
! 134:
1.2 misho 135: int
1.3 ! misho 136: pubWill(struct tagSession * __restrict sess)
! 137: {
! 138: int datlen;
! 139:
! 140: ioTRACE(2);
! 141:
! 142: /* prepare will packet */
! 143: datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,
! 144: sess->sess_will.msg, sess->sess_will.msg ? strlen(sess->sess_will.msg) : 0);
! 145: if (datlen == -1)
! 146: return -1; /* error */
! 147:
! 148: return search4send(sess, sess->sess_will.topic, datlen, MQTT_QOS_ACK);
! 149: }
! 150:
! 151: static int
! 152: pubOnce(struct tagSession *sess, char * __restrict psTopic, int datlen)
! 153: {
! 154: return search4send(sess, psTopic, datlen, MQTT_QOS_ONCE);
! 155: }
! 156:
! 157: static int
! 158: pubAck(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
! 159: {
! 160: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
! 161:
! 162: /* write topic to database */
! 163: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
! 164: sess->sess_addr, hdr->mqtt_msg.retain);
! 165: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
! 166: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
! 167: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
! 168:
! 169: search4send(sess, psTopic, datlen, MQTT_QOS_ACK);
! 170:
! 171: /* delete not retain message */
! 172: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
! 173: sess->sess_addr, 0);
! 174: return 0;
! 175: }
! 176:
! 177: static int
! 178: pubExactly(struct tagSession *sess, u_short mid, char * __restrict psTopic, int datlen)
! 179: {
! 180: struct mqtthdr *hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
! 181:
! 182: /* write topic to database */
! 183: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic, sess->sess_user,
! 184: sess->sess_addr, hdr->mqtt_msg.retain);
! 185: call.WritePUB_topic(&cfg, pub, sess->sess_cid, mid, psTopic,
! 186: sess->sess_buf->msg_base, datlen, sess->sess_user, sess->sess_addr,
! 187: hdr->mqtt_msg.qos, hdr->mqtt_msg.retain);
! 188:
! 189: return search4send(sess, psTopic, datlen, MQTT_QOS_EXACTLY);
! 190: }
! 191:
! 192:
! 193: int
! 194: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 195: {
196: struct mqtthdr *hdr;
1.3 ! misho 197: struct tagSession *sess = (struct tagSession*) arg;
! 198: char szTopic[STRSIZ] = { 0 };
! 199: int siz = 0;
! 200: u_short mid = 0;
! 201: ait_val_t *p = NULL;
1.2 misho 202:
203: ioTRACE(2);
204:
205: if (!sess)
206: return -1;
207:
1.3 ! misho 208: ioDEBUG(5, "Exec PUBLISH session");
! 209: siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
! 210: if (siz == -1) {
! 211: ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 212: return 0;
! 213: }
! 214:
1.2 misho 215: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
216: switch (hdr->mqtt_msg.qos) {
217: case MQTT_QOS_ACK:
1.3 ! misho 218: if (pubAck(sess, mid, szTopic, mqtt_pktLen(hdr)))
! 219: return 0;
! 220: siz = mqtt_msgPUBACK(sess->sess_buf, mid);
! 221: if (siz == -1) {
! 222: ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",
! 223: mqtt_GetErrno(), mqtt_GetError());
! 224: return 0;
! 225: }
1.2 misho 226: break;
227: case MQTT_QOS_EXACTLY:
1.3 ! misho 228: if (pubExactly(sess, mid, szTopic, mqtt_pktLen(hdr)))
! 229: return 0;
! 230: siz = mqtt_msgPUBREC(sess->sess_buf, mid);
! 231: if (siz == -1) {
! 232: ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",
! 233: mqtt_GetErrno(), mqtt_GetError());
! 234: return 0;
! 235: }
1.2 misho 236: break;
1.3 ! misho 237: case MQTT_QOS_ONCE:
! 238: pubOnce(sess, szTopic, mqtt_pktLen(hdr));
1.2 misho 239: default:
240: return 0;
241: }
242:
1.3 ! misho 243: p = mkPkt(sess->sess_buf->msg_base, siz);
! 244: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 245: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
1.2 misho 246: return 0;
247: }
1.3 ! misho 248:
! 249: int
! 250: cmdPUBREL(void *srv, int len, void *arg)
! 251: {
! 252: struct tagSession *sess = (struct tagSession*) arg;
! 253: int siz = 0;
! 254: u_short mid = 0;
! 255: ait_val_t *p = NULL;
! 256:
! 257: ioTRACE(2);
! 258:
! 259: if (!sess)
! 260: return -1;
! 261:
! 262: ioDEBUG(5, "Exec PUBREL session");
! 263: mid = mqtt_readPUBREL(sess->sess_buf);
! 264: if (mid == (u_short) -1) {
! 265: ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 266: return 0;
! 267: }
! 268:
! 269: /* delete not retain message */
! 270: call.DeletePUB_topic(&cfg, pub, sess->sess_cid, mid, "%", sess->sess_user,
! 271: sess->sess_addr, 0);
! 272:
! 273: siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
! 274: if (siz == -1) {
! 275: ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 276: return 0;
! 277: }
! 278:
! 279: p = mkPkt(sess->sess_buf->msg_base, siz);
! 280: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 281: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
! 282: return 0;
! 283: }
! 284:
! 285: int
! 286: cmdSUBSCRIBE(void *srv, int len, void *arg)
! 287: {
! 288: struct tagSession *sess = (struct tagSession*) arg;
! 289: mqtt_subscr_t *subs = NULL;
! 290: int siz = 0;
! 291: u_short mid = 0;
! 292: register int i;
! 293: struct tagStore *store;
! 294: char buf[BUFSIZ];
! 295: void *ptr;
! 296: ait_val_t *p = NULL;
! 297:
! 298: ioTRACE(2);
! 299:
! 300: if (!sess)
! 301: return -1;
! 302:
! 303: ioDEBUG(5, "Exec SUBSCRIBE session");
! 304: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
! 305: if (siz == -1) {
! 306: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 307: return 0;
! 308: }
! 309:
! 310: /* add to db */
! 311: for (i = 0; i < siz; i++) {
! 312: store = io_malloc(sizeof(struct tagStore));
! 313: if (!store) {
! 314: ioSYSERR(0);
! 315: continue;
! 316: } else {
! 317: store->st_msgid = mid;
! 318: mqtt_subCopy(&store->st_subscr, &subs[i]);
! 319: subs[i].sub_ret = MQTT_QOS_DENY;
! 320: }
! 321:
! 322: /* add to cache */
! 323: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
! 324:
! 325: /* convert topic to regexp */
! 326: if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
! 327: ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 328: } else {
! 329: ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
! 330: if (!ptr) {
! 331: ioSYSERR(0);
! 332: continue;
! 333: } else {
! 334: store->st_subscr.sub_topic.msg_base = ptr;
! 335: store->st_subscr.sub_topic.msg_len = strlen(buf) + 1;
! 336: memcpy(store->st_subscr.sub_topic.msg_base, buf,
! 337: store->st_subscr.sub_topic.msg_len);
! 338: }
! 339:
! 340: /* store to db */
! 341: call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, buf,
! 342: sess->sess_user, sess->sess_addr, store->st_subscr.sub_ret);
! 343: /* subscribe pass */
! 344: subs[i].sub_ret = MQTT_QOS_PASS;
! 345:
! 346: call.LOG(logg, "Added [%s] SUBSCRIBE '%s'(%d) QoS=%d from %s\n", sess->sess_cid,
! 347: store->st_subscr.sub_topic.msg_base,
! 348: store->st_subscr.sub_topic.msg_len,
! 349: store->st_subscr.sub_ret, sess->sess_addr);
! 350: }
! 351: }
! 352:
! 353: /* send acknowledge */
! 354: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
! 355: if (siz == -1) {
! 356: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 357: goto end;
! 358: } else {
! 359: p = mkPkt(sess->sess_buf->msg_base, siz);
! 360: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 361: }
! 362:
! 363: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
! 364: end:
! 365: mqtt_subFree(&subs);
! 366: return 0;
! 367: }
! 368:
! 369: int
! 370: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
! 371: {
! 372: struct tagSession *sess = (struct tagSession*) arg;
! 373: mqtt_subscr_t *subs = NULL;
! 374: int siz = 0;
! 375: u_short mid = 0;
! 376: register int i;
! 377: struct tagStore *store, *tmp;
! 378: ait_val_t *p = NULL;
! 379:
! 380: ioTRACE(2);
! 381:
! 382: if (!sess)
! 383: return -1;
! 384:
! 385: ioDEBUG(5, "Exec UNSUBSCRIBE session");
! 386: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
! 387: if (siz == -1) {
! 388: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 389: return 0;
! 390: }
! 391:
! 392: /* del from db */
! 393: for (i = 0; i < siz; i++) {
! 394: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
! 395: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
! 396: store->st_subscr.sub_topic.msg_base &&
! 397: !strcmp(store->st_subscr.sub_topic.msg_base,
! 398: subs[i].sub_topic.msg_base)) {
! 399: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
! 400:
! 401: if (store->st_subscr.sub_topic.msg_base)
! 402: free(store->st_subscr.sub_topic.msg_base);
! 403: if (store->st_subscr.sub_value.msg_base)
! 404: free(store->st_subscr.sub_value.msg_base);
! 405: io_free(store);
! 406: }
! 407: }
! 408:
! 409: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base,
! 410: sess->sess_user, "%");
! 411: }
! 412:
! 413: /* send acknowledge */
! 414: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
! 415: if (siz == -1) {
! 416: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 417: goto end;
! 418: } else {
! 419: p = mkPkt(sess->sess_buf->msg_base, siz);
! 420: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 421: }
! 422:
! 423: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
! 424: end:
! 425: mqtt_subFree(&subs);
! 426: return 0;
! 427: }
! 428:
! 429: int
! 430: cmdPINGREQ(void *srv, int len, void *arg)
! 431: {
! 432: struct tagSession *sess = (struct tagSession*) arg;
! 433: int siz = 0;
! 434: ait_val_t *p = NULL;
! 435:
! 436: ioTRACE(2);
! 437:
! 438: if (!sess)
! 439: return -1;
! 440:
! 441: ioDEBUG(5, "Exec PINGREQ session");
! 442: siz = mqtt_msgPINGRESP(sess->sess_buf);
! 443: if (siz == -1) {
! 444: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 445: return 0;
! 446: } else {
! 447: p = mkPkt(sess->sess_buf->msg_base, siz);
! 448: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 449: }
! 450:
! 451: schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0);
! 452: return 0;
! 453: }
! 454:
! 455: int
! 456: cmdCONNECT(void *srv, int len, void *arg)
! 457: {
! 458: struct tagStore *store;
! 459: struct tagSession *sess = (struct tagSession*) arg;
! 460:
! 461: ioTRACE(2);
! 462:
! 463: if (!sess)
! 464: return -1;
! 465:
! 466: ioDEBUG(5, "Exec CONNECT session");
! 467: TAILQ_REMOVE(&Sessions, sess, sess_node);
! 468:
! 469: schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
! 470:
! 471: if (sess->sess_clean) {
! 472: if (call.FiniSessPUB)
! 473: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
! 474: if (call.DeletePUB_subscribe)
! 475: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
! 476: if (call.WipePUB_topic)
! 477: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
! 478: }
! 479:
! 480: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
! 481: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
! 482:
! 483: if (store->st_subscr.sub_topic.msg_base)
! 484: free(store->st_subscr.sub_topic.msg_base);
! 485: if (store->st_subscr.sub_value.msg_base)
! 486: free(store->st_subscr.sub_value.msg_base);
! 487:
! 488: io_free(store);
! 489: }
! 490:
! 491: if (sess->sess_will.flag)
! 492: pubWill(sess);
! 493:
! 494: if (sess->sess_will.msg)
! 495: free(sess->sess_will.msg);
! 496: if (sess->sess_will.topic)
! 497: free(sess->sess_will.topic);
! 498:
! 499: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
! 500: sess->sess_addr, sess->sess_user);
! 501:
! 502: return -3; /* reconnect client */
! 503: }
! 504:
! 505: int
! 506: cmdDISCONNECT(void *srv, int len, void *arg)
! 507: {
! 508: struct tagSession *sess = (struct tagSession*) arg;
! 509:
! 510: ioTRACE(2);
! 511:
! 512: if (!sess)
! 513: return -1;
! 514:
! 515: ioDEBUG(5, "Exec DISCONNECT session");
! 516:
! 517: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
! 518: sess->sess_addr, sess->sess_user);
! 519:
! 520: return -2; /* must terminate dispatcher */
! 521: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>