Annotation of mqtt/src/daemon.c, revision 1.2
1.2 ! misho 1: #include "global.h"
! 2: #include "rtlm.h"
! 3: #include "utils.h"
! 4: #include "mqttd.h"
! 5: #include "mqttd_calls.h"
! 6:
! 7:
! 8: static void *startSession(sched_task_t *task);
! 9:
! 10:
! 11: static inline struct tagSession *
! 12: initSession(int sock, ait_val_t * __restrict v)
! 13: {
! 14: struct tagSession *sess = NULL;
! 15: const char *str;
! 16:
! 17: ioTRACE(5);
! 18:
! 19: if (!v)
! 20: return NULL;
! 21:
! 22: sess = malloc(sizeof(struct tagSession));
! 23: if (!sess) {
! 24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
! 25: io_freeVar(v);
! 26: return NULL;
! 27: } else
! 28: memset(sess, 0, sizeof(struct tagSession));
! 29:
! 30: TAILQ_INIT(&sess->sess_sndqueue);
! 31:
! 32: str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
! 33: if (!str)
! 34: sess->sess_retry = DEFAULT_RETRY;
! 35: else
! 36: sess->sess_retry = strtol(str, NULL, 0);
! 37:
! 38: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
! 39: if (!sess->sess_buf) {
! 40: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 41: free(sess);
! 42: io_freeVar(v);
! 43: return NULL;
! 44: }
! 45:
! 46: sess->sess_sock = sock;
! 47: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
! 48: io_freeVar(v);
! 49: return sess;
! 50: }
! 51:
! 52: static void
! 53: finiSession(struct tagSession *sess, int preservSock)
! 54: {
! 55: struct tagStore *store;
! 56:
! 57: ioTRACE(5);
! 58:
! 59: if (!sess)
! 60: return;
! 61:
! 62: if (call.FiniSessPUB)
! 63: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
! 64:
! 65: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
! 66: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
! 67:
! 68: if (store->st_subscr.sub_topic._base)
! 69: free(store->st_subscr.sub_topic._base);
! 70: if (store->st_subscr.sub_value._base)
! 71: free(store->st_subscr.sub_value._base);
! 72:
! 73: free(store);
! 74: }
! 75:
! 76: if (sess->sess_will.msg)
! 77: free(sess->sess_will.msg);
! 78: if (sess->sess_will.topic)
! 79: free(sess->sess_will.topic);
! 80:
! 81: if (sess->sess_sock > STDERR_FILENO && !preservSock)
! 82: srv_Close(sess->sess_sock);
! 83:
! 84: mqtt_msgFree(&sess->sess_buf, 42);
! 85:
! 86: free(sess);
! 87: }
! 88:
! 89: static void
! 90: stopSession(struct tagSession *sess)
! 91: {
! 92: mqtt_msg_t msg = { NULL, 0 };
! 93: int ret;
! 94:
! 95: ioTRACE(4);
! 96:
! 97: assert(sess);
! 98:
! 99: pthread_mutex_lock(&mtx_sess);
! 100: TAILQ_REMOVE(&Sessions, sess, sess_node);
! 101: pthread_mutex_unlock(&mtx_sess);
! 102:
! 103: ret = mqtt_msgDISCONNECT(&msg);
! 104: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
! 105: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 106: else {
! 107: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
! 108: free(msg.msg_base);
! 109: memset(&msg, 0, sizeof msg);
! 110: }
! 111:
! 112: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
! 113: finiSession(sess, 0);
! 114:
! 115: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
! 116: sess->sess_addr, sess->sess_user);
! 117: }
! 118:
! 119: static int
! 120: KASession(struct tagSession *sess)
! 121: {
! 122: mqtt_msg_t msg = { NULL, 0 };
! 123: int ret;
! 124: struct pollfd pfd;
! 125:
! 126: ioTRACE(4);
! 127:
! 128: assert(sess);
! 129:
! 130: /* ping request */
! 131: ret = mqtt_msgPINGREQ(&msg);
! 132: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
! 133: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 134: return -1;
! 135: } else {
! 136: ioDEBUG(5, "Sended %d bytes for ping request", ret);
! 137: free(msg.msg_base);
! 138: memset(&msg, 0, sizeof msg);
! 139: }
! 140:
! 141: pfd.fd = sess->sess_sock;
! 142: pfd.events = POLLIN | POLLPRI;
! 143: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
! 144: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
! 145: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 146: return -1;
! 147: } else if (!ret) {
! 148: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
! 149: return 1;
! 150: }
! 151: /* receive & decode packet */
! 152: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
! 153: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 154: return -1;
! 155: }
! 156: if (mqtt_readPINGRESP(sess->sess_buf)) {
! 157: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
! 158: return 2;
! 159: }
! 160:
! 161: /* Keep Alive is OK! */
! 162: return 0;
! 163: }
! 164:
! 165: static void *
! 166: thrSession(struct tagSession *sess)
! 167: {
! 168: mqtt_msg_t msg = { NULL, 0 };
! 169: int ret, locKill = 42;
! 170: struct pollfd pfd;
! 171: struct mqtthdr *hdr;
! 172: ait_val_t *v;
! 173: struct tagStore *store;
! 174:
! 175: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
! 176: ioTRACE(2);
! 177:
! 178: pfd.fd = sess->sess_sock;
! 179: pfd.events = POLLIN | POLLPRI;
! 180: while (!Kill && locKill) {
! 181: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
! 182: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
! 183: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 184: break;
! 185: } else if (!ret && (ret = KASession(sess))) {
! 186: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
! 187: sess->sess_cid, sess->sess_addr, sess->sess_user);
! 188: break;
! 189: }
! 190: /* receive & decode packet */
! 191: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
! 192: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 193: break;
! 194: } else if (!ret) {
! 195: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
! 196: break;
! 197: } else
! 198: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
! 199:
! 200: /* dispatch message type */
! 201: switch (hdr->mqtt_msg.type) {
! 202: case MQTT_TYPE_CONNECT:
! 203: ioDEBUG(5, "Exec CONNECT session");
! 204: pthread_mutex_lock(&mtx_sess);
! 205: TAILQ_REMOVE(&Sessions, sess, sess_node);
! 206: pthread_mutex_unlock(&mtx_sess);
! 207:
! 208: if (call.FiniSessPUB)
! 209: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
! 210:
! 211: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
! 212: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
! 213:
! 214: if (store->st_subscr.sub_topic._base)
! 215: free(store->st_subscr.sub_topic._base);
! 216: if (store->st_subscr.sub_value._base)
! 217: free(store->st_subscr.sub_value._base);
! 218:
! 219: free(store);
! 220: }
! 221:
! 222: if (sess->sess_will.msg)
! 223: free(sess->sess_will.msg);
! 224: if (sess->sess_will.topic)
! 225: free(sess->sess_will.topic);
! 226:
! 227: if ((v = io_allocVar())) {
! 228: AIT_SET_STR(v, sess->sess_addr);
! 229: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
! 230: io_freeVar(v);
! 231: } else
! 232: ioLIBERR(mqtt);
! 233:
! 234: locKill ^= locKill;
! 235:
! 236: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
! 237: sess->sess_addr, sess->sess_user);
! 238: continue;
! 239: case MQTT_TYPE_DISCONNECT:
! 240: ioDEBUG(5, "Exec DISCONNECT session");
! 241: pthread_mutex_lock(&mtx_sess);
! 242: TAILQ_REMOVE(&Sessions, sess, sess_node);
! 243: pthread_mutex_unlock(&mtx_sess);
! 244:
! 245: finiSession(sess, 0);
! 246: locKill ^= locKill;
! 247:
! 248: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
! 249: sess->sess_addr, sess->sess_user);
! 250: continue;
! 251: case MQTT_TYPE_PUBLISH:
! 252: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
! 253: if (Publish(sess))
! 254: locKill ^= locKill;
! 255: break;
! 256: case MQTT_TYPE_PUBREL:
! 257: break;
! 258: case MQTT_TYPE_SUBSCRIBE:
! 259: break;
! 260: case MQTT_TYPE_UNSUBSCRIBE:
! 261: break;
! 262: case MQTT_TYPE_PINGREQ:
! 263: break;
! 264: default:
! 265: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
! 266: sess->sess_cid, hdr->mqtt_msg.type);
! 267: break;
! 268: }
! 269: }
! 270:
! 271: pthread_cleanup_pop(locKill);
! 272: pthread_exit(NULL);
! 273: }
! 274:
! 275: static void *
! 276: startSession(sched_task_t *task)
! 277: {
! 278: u_char basebuf[USHRT_MAX];
! 279: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
! 280: mqtthdr_connflgs_t flg;
! 281: mqtthdr_connack_t cack;
! 282: struct tagSession *s, *sess = NULL;
! 283: int ret;
! 284: pthread_attr_t attr;
! 285:
! 286: ioTRACE(4);
! 287:
! 288: assert(task);
! 289:
! 290: if (!TASK_DATA(task)) {
! 291: sess = initSession(TASK_FD(task), TASK_ARG(task));
! 292: if (!sess) {
! 293: io_freeVar(TASK_ARG(task));
! 294: close(TASK_FD(task));
! 295: return NULL;
! 296: }
! 297:
! 298: /* receive & decode packet */
! 299: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
! 300: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 301: finiSession(sess, 0);
! 302: return NULL;
! 303: }
! 304: } else {
! 305: sess = TASK_DATA(task);
! 306: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
! 307: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
! 308: }
! 309:
! 310: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
! 311: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
! 312: &sess->sess_will.topic, &sess->sess_will.msg);
! 313: ret = cack.retcode;
! 314: flg.flags = cack.reserved;
! 315: if (flg.reserved) {
! 316: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 317: goto end;
! 318: } else {
! 319: sess->sess_clean = flg.clean_sess;
! 320: sess->sess_will.qos = flg.will_qos;
! 321: sess->sess_will.retain = flg.will_retain;
! 322: sess->sess_will.flag = flg.will_flg;
! 323: }
! 324:
! 325: /* check online table for user */
! 326: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
! 327: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
! 328: sess->sess_user, sess->sess_pass);
! 329: ret = MQTT_RETCODE_DENIED;
! 330: goto end;
! 331: } else {
! 332: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
! 333: ret = MQTT_RETCODE_ACCEPTED;
! 334: }
! 335: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
! 336: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
! 337: TAILQ_FOREACH(s, &Sessions, sess_node)
! 338: if (!strcmp(s->sess_cid, sess->sess_cid)) {
! 339: /* found stale session & disconnect it! */
! 340: stopSession(s);
! 341: break;
! 342: }
! 343: }
! 344: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
! 345: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
! 346: sess->sess_will.qos, sess->sess_will.retain) == -1) {
! 347: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
! 348: ret = MQTT_RETCODE_DENIED;
! 349: goto end;
! 350: } else {
! 351: ioDEBUG(0, "Session %s from %s and username %s is started",
! 352: sess->sess_cid, sess->sess_addr, sess->sess_user);
! 353: ret = MQTT_RETCODE_ACCEPTED;
! 354: }
! 355:
! 356: ret = mqtt_msgCONNACK(&msg, ret);
! 357: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
! 358: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 359: finiSession(sess, 0);
! 360: return NULL;
! 361: } else {
! 362: ioDEBUG(5, "Sended %d bytes", ret);
! 363: free(msg.msg_base);
! 364: memset(&msg, 0, sizeof msg);
! 365: }
! 366:
! 367: /* Start session thread OK ... */
! 368: pthread_attr_init(&attr);
! 369: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
! 370:
! 371: pthread_mutex_lock(&mtx_sess);
! 372: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
! 373: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
! 374: pthread_mutex_unlock(&mtx_sess);
! 375:
! 376: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
! 377: sess->sess_addr, sess->sess_user, sess->sess_ka);
! 378:
! 379: pthread_attr_destroy(&attr);
! 380: return NULL;
! 381: end: /* close client connection */
! 382: ret = mqtt_msgCONNACK(&msg, ret);
! 383: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
! 384: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
! 385: } else {
! 386: ioDEBUG(5, "Sended %d bytes", ret);
! 387: free(msg.msg_base);
! 388: memset(&msg, 0, sizeof msg);
! 389: }
! 390:
! 391: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
! 392: finiSession(sess, 0);
! 393: return NULL;
! 394: }
! 395:
! 396: /* ----------------------------------------------------------------------- */
! 397:
! 398: static void *
! 399: thrSched(void *arg __unused)
! 400: {
! 401: struct tagSession *sess;
! 402: struct timespec pl = { 0, 10000000 };
! 403:
! 404: ioTRACE(1);
! 405:
! 406: schedPolling(root, &pl, NULL);
! 407: schedRun(root, &Kill);
! 408:
! 409: TAILQ_FOREACH(sess, &Sessions, sess_node)
! 410: if (sess->sess_tid)
! 411: pthread_cancel(sess->sess_tid);
! 412: ioDEBUG(5, "EXIT from Scheduler thread !!!");
! 413: pthread_exit(NULL);
! 414: }
! 415:
! 416: int
! 417: Run(int sock)
! 418: {
! 419: io_sockaddr_t sa;
! 420: socklen_t sslen = sizeof sa.ss;
! 421: int cli;
! 422: pthread_t tid;
! 423: ait_val_t *v;
! 424: char str[STRSIZ];
! 425:
! 426: ioTRACE(1);
! 427:
! 428: if (pthread_create(&tid, NULL, thrSched, NULL)) {
! 429: ioSYSERR(0);
! 430: return -1;
! 431: } else
! 432: pthread_detach(tid);
! 433: ioDEBUG(2, "Run scheduler management thread");
! 434:
! 435: if (listen(sock, SOMAXCONN) == -1) {
! 436: ioSYSERR(0);
! 437: return -1;
! 438: }
! 439:
! 440: while (!Kill) {
! 441: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
! 442: if (!Kill)
! 443: continue;
! 444: ioSYSERR(0);
! 445: break;
! 446: }
! 447: v = io_allocVar();
! 448: if (!v) {
! 449: ioLIBERR(mqtt);
! 450: break;
! 451: } else {
! 452: memset(str, 0, sizeof str);
! 453: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
! 454: AIT_SET_STR(v, str);
! 455: }
! 456: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
! 457:
! 458: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
! 459: io_freeVar(v);
! 460: close(cli);
! 461: ioDEBUG(1, "Terminated client with socket=%d", cli);
! 462: }
! 463: }
! 464:
! 465: return 0;
! 466: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>