![]() ![]() | ![]() |
Change the server to use the new MQTT library API calls.
1: #include "global.h" 2: #include "rtlm.h" 3: #include "utils.h" 4: #include "mqttd.h" 5: #include "mqttd_calls.h" 6: 7: 8: static inline struct tagSession * 9: initSession(int sock, ait_val_t * __restrict v) 10: { 11: struct tagSession *sess = NULL; 12: const char *str; 13: 14: ioTRACE(5); 15: 16: if (!v) 17: return NULL; 18: 19: sess = io_malloc(sizeof(struct tagSession)); 20: if (!sess) { 21: ioSYSERR(0); 22: return NULL; 23: } else 24: memset(sess, 0, sizeof(struct tagSession)); 25: 26: SLIST_INIT(&sess->sess_subscr); 27: 28: str = cfg_getAttribute(&cfg, "mqttd", "retry"); 29: if (!str) 30: sess->sess_retry = DEFAULT_RETRY; 31: else 32: sess->sess_retry = strtol(str, NULL, 0); 33: 34: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX); 35: if (!sess->sess_buf) { 36: ioLIBERR(mqtt); 37: io_free(sess); 38: return NULL; 39: } 40: 41: /* init server actor */ 42: sess->sess_srv = mqtt_srv_cliInit(sock, sess->sess_buf, sess->sess_ka, 1); 43: if (!sess->sess_srv) { 44: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 45: mqtt_msgFree(&sess->sess_buf, 42); 46: io_free(sess); 47: return NULL; 48: } else { 49: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT); 50: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH); 51: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL); 52: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE); 53: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE); 54: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ); 55: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT); 56: } 57: 58: sess->sess_sock = sock; 59: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr); 60: return sess; 61: } 62: 63: static void 64: finiSession(struct tagSession *sess) 65: { 66: struct tagStore *store; 67: 68: ioTRACE(5); 69: 70: if (!sess) 71: return; 72: 73: if (sess->sess_clean) { 74: if (call.FiniSessPUB) 75: 
call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); 76: if (call.DeletePUB_subscribe) 77: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); 78: if (call.WipePUB_topic) 79: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); 80: } 81: 82: while ((store = SLIST_FIRST(&sess->sess_subscr))) { 83: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); 84: 85: if (store->st_subscr.sub_topic.msg_base) 86: free(store->st_subscr.sub_topic.msg_base); 87: if (store->st_subscr.sub_value.msg_base) 88: free(store->st_subscr.sub_value.msg_base); 89: 90: io_free(store); 91: } 92: 93: if (sess->sess_will.msg) 94: free(sess->sess_will.msg); 95: if (sess->sess_will.topic) 96: free(sess->sess_will.topic); 97: 98: if (sess->sess_sock > STDERR_FILENO) 99: srv_Close(sess->sess_sock); 100: 101: mqtt_srv_cliFini(&sess->sess_srv); 102: mqtt_msgFree(&sess->sess_buf, 42); 103: 104: io_free(sess); 105: } 106: 107: static void * 108: leaveClient(sched_task_t *task) 109: { 110: struct tagSession *sess; 111: int ret; 112: 113: ioTRACE(4); 114: 115: assert(task); 116: 117: sess = TASK_ARG(task); 118: assert(sess); 119: 120: TAILQ_REMOVE(&Sessions, sess, sess_node); 121: 122: ret = mqtt_msgDISCONNECT(sess->sess_buf); 123: send(TASK_FD(task), sess->sess_buf->msg_base, ret, MSG_NOSIGNAL); 124: 125: ioDEBUG(1, "Close socket=%ld", (long) TASK_FD(task)); 126: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 127: sess->sess_addr, sess->sess_user); 128: 129: finiSession(sess); 130: return NULL; 131: } 132: 133: static void * 134: dispatchSession(sched_task_t *task) 135: { 136: int ret, len = 0; 137: struct tagSession *sess; 138: 139: ioTRACE(2); 140: 141: assert(task); 142: 143: sess = TASK_ARG(task); 144: assert(sess); 145: 146: /* receive & decode packet */ 147: if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { 148: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, 
strerror(errno)); 149: TAILQ_REMOVE(&Sessions, sess, sess_node); 150: finiSession(sess); 151: return NULL; 152: } else if (!ret) { 153: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); 154: TAILQ_REMOVE(&Sessions, sess, sess_node); 155: finiSession(sess); 156: return NULL; 157: } 158: 159: do { 160: /* dispatch message type */ 161: if ((len = mqtt_srv_cliDispatch(sess->sess_srv, ret, sess)) < 0) { 162: if (len == -1) { 163: ioLIBERR(mqtt); 164: finiSession(sess); 165: } else if (len == -2) { 166: TAILQ_REMOVE(&Sessions, sess, sess_node); 167: finiSession(sess); 168: } else if (len == -3) 169: schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); 170: } else 171: ret -= len; 172: } while (len > 0 && ret > 0); 173: 174: if (len >= 0 && !schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) { 175: ioLIBERR(sched); 176: TAILQ_REMOVE(&Sessions, sess, sess_node); 177: finiSession(sess); 178: } 179: return NULL; 180: } 181: 182: void * 183: startSession(sched_task_t *task) 184: { 185: u_char basebuf[USHRT_MAX]; 186: mqtt_msg_t buf = { basebuf, sizeof basebuf }; 187: mqtthdr_connflgs_t flg; 188: mqtthdr_connack_t cack; 189: ait_val_t *v; 190: struct tagSession *s, *sess = NULL; 191: int ret, wlen; 192: 193: ioTRACE(4); 194: 195: assert(task); 196: 197: if (!TASK_DATA(task)) { 198: v = TASK_ARG(task); 199: /* flow from accept new clients */ 200: sess = initSession(TASK_FD(task), v); 201: io_freeVar(&v); 202: if (!sess) { 203: close(TASK_FD(task)); 204: return NULL; 205: } 206: 207: /* receive & decode packet */ 208: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) { 209: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); 210: finiSession(sess); 211: return NULL; 212: } 213: } else { 214: sess = TASK_DATA(task); 215: assert(sess); 216: 217: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? 
sizeof basebuf : TASK_DATLEN(task); 218: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len); 219: } 220: 221: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid, 222: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass, 223: &sess->sess_will.topic, &sess->sess_will.msg); 224: ret = cack.retcode; 225: flg.flags = cack.reserved; 226: if (flg.reserved) { 227: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError()); 228: goto end; 229: } else { 230: sess->sess_clean = flg.clean_sess; 231: sess->sess_will.qos = flg.will_qos; 232: sess->sess_will.retain = flg.will_retain; 233: sess->sess_will.flag = flg.will_flg; 234: 235: sess->sess_srv->timeout = sess->sess_ka; 236: } 237: 238: /* check online table for user */ 239: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) { 240: ioDEBUG(0, "Login:: DENIED for username %s and password %s", 241: sess->sess_user, sess->sess_pass); 242: ret = MQTT_RETCODE_DENIED; 243: goto end; 244: } else { 245: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user); 246: ret = MQTT_RETCODE_ACCEPTED; 247: } 248: 249: /* db management */ 250: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) { 251: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid); 252: TAILQ_FOREACH(s, &Sessions, sess_node) 253: if (!strcmp(s->sess_cid, sess->sess_cid)) { 254: /* found stale session & disconnect it! 
*/ 255: schedCancelby(root, taskMAX, CRITERIA_FD, (void*) s->sess_sock, NULL); 256: schedWrite(root, leaveClient, s, s->sess_sock, NULL, 0); 257: break; 258: } 259: } 260: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, 261: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, 262: sess->sess_will.qos, sess->sess_will.retain) == -1) { 263: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user); 264: ret = MQTT_RETCODE_DENIED; 265: goto end; 266: } else { 267: ioDEBUG(0, "Session %s from %s and username %s is started", 268: sess->sess_cid, sess->sess_addr, sess->sess_user); 269: ret = MQTT_RETCODE_ACCEPTED; 270: } 271: /* clean/load session if requested */ 272: if (sess->sess_clean) { 273: if (call.DeletePUB_subscribe) 274: call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%"); 275: if (call.WipePUB_topic) 276: call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1); 277: } else { 278: // TODO: read_sql subs and prepare publish 279: } 280: 281: /* Start session task OK ... 
*/ 282: if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) { 283: ioLIBERR(sched); 284: ret = MQTT_RETCODE_DENIED; 285: } else 286: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); 287: 288: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 289: sess->sess_addr, sess->sess_user, sess->sess_ka); 290: end: /* close client connection */ 291: wlen = mqtt_msgCONNACK(sess->sess_buf, ret); 292: if ((wlen = send(TASK_FD(task), sess->sess_buf->msg_base, wlen, 0)) == -1) { 293: ioDEBUG(3, "Error:: send(%ld) #%d - %s", (long) TASK_FD(task), errno, strerror(errno)); 294: } else 295: ioDEBUG(5, "Sended %d bytes", wlen); 296: 297: if (ret != MQTT_RETCODE_ACCEPTED) { 298: ioDEBUG(1, "Close client %s with socket=%ld", sess->sess_addr, (long) TASK_FD(task)); 299: finiSession(sess); 300: } 301: return NULL; 302: } 303: 304: static void * 305: acceptClient(sched_task_t *task) 306: { 307: int cli; 308: io_sockaddr_t sa; 309: socklen_t sslen = sizeof sa.ss; 310: ait_val_t *v; 311: char str[STRSIZ]; 312: 313: ioTRACE(4); 314: 315: assert(task); 316: 317: if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1) 318: goto end; 319: 320: v = io_allocVar(); 321: if (!v) { 322: ioLIBERR(io); 323: close(cli); 324: goto end; 325: } else { 326: memset(str, 0, sizeof str); 327: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa)); 328: AIT_FREE_VAL(v); 329: AIT_SET_STR(v, str); 330: } 331: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v)); 332: 333: if (!schedRead(root, startSession, v, cli, NULL, 0)) { 334: io_freeVar(&v); 335: close(cli); 336: ioDEBUG(1, "Terminated client with socket=%d", cli); 337: } 338: end: 339: if (!schedRead(root, acceptClient, NULL, TASK_FD(task), NULL, 0)) 340: ioLIBERR(sched); 341: return NULL; 342: } 343: 344: /* ----------------------------------------------------------------------- */ 345: 346: int 347: Run(int sock) 348: { 349: struct tagSession *sess; 350: struct 
timespec pl = { 0, 100000000 }; 351: 352: ioTRACE(1); 353: 354: if (mqtt_srv_Listen(sock, 0, 1) == -1) { 355: ioLIBERR(mqtt); 356: return -1; 357: } 358: 359: /* state machine - accept new connections */ 360: if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) { 361: ioLIBERR(sched); 362: return -1; 363: } 364: 365: schedPolling(root, &pl, NULL); 366: schedRun(root, &Kill); 367: 368: /* free all undeleted elements into lists */ 369: TAILQ_FOREACH(sess, &Sessions, sess_node) { 370: TAILQ_REMOVE(&Sessions, sess, sess_node); 371: 372: finiSession(sess); 373: } 374: return 0; 375: }