Annotation of mqtt/src/daemon.c, revision 1.2.2.9

1.2       misho       1: #include "global.h"
                      2: #include "rtlm.h"
                      3: #include "utils.h"
                      4: #include "mqttd.h"
                      5: #include "mqttd_calls.h"
                      6: 
                      7: 
                      8: static void *startSession(sched_task_t *task);
                      9: 
                     10: 
                     11: static inline struct tagSession *
                     12: initSession(int sock, ait_val_t * __restrict v)
                     13: {
                     14:        struct tagSession *sess = NULL;
                     15:        const char *str;
                     16: 
                     17:        ioTRACE(5);
                     18: 
                     19:        if (!v)
                     20:                return NULL;
                     21: 
                     22:        sess = malloc(sizeof(struct tagSession));
                     23:        if (!sess) {
1.2.2.9 ! misho      24:                ioSYSERR(0);
1.2       misho      25:                io_freeVar(v);
                     26:                return NULL;
                     27:        } else
                     28:                memset(sess, 0, sizeof(struct tagSession));
                     29: 
1.2.2.4   misho      30:        pthread_mutex_init(&sess->sess_mtx, NULL);
                     31: 
1.2.2.9 ! misho      32:        SLIST_INIT(&sess->sess_subscr);
1.2       misho      33: 
1.2.2.5   misho      34:        str = cfg_getAttribute(&cfg, "mqttd", "retry");
1.2       misho      35:        if (!str)
                     36:                sess->sess_retry = DEFAULT_RETRY;
                     37:        else
                     38:                sess->sess_retry = strtol(str, NULL, 0);
                     39: 
1.2.2.9 ! misho      40:        if (!(sess->sess_root = schedBegin())) {
        !            41:                ioLIBERR(sched);
        !            42:                free(sess);
        !            43:                io_freeVar(v);
        !            44:                return NULL;
        !            45:        }
        !            46: 
1.2       misho      47:        sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
                     48:        if (!sess->sess_buf) {
1.2.2.9 ! misho      49:                ioLIBERR(mqtt);
        !            50:                schedEnd(&sess->sess_root);
1.2       misho      51:                free(sess);
                     52:                io_freeVar(v);
                     53:                return NULL;
                     54:        }
                     55: 
1.2.2.8   misho      56:        /* init server actor */
1.2.2.2   misho      57:        sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
                     58:        if (!sess->sess_srv) {
                     59:                ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                     60:                mqtt_msgFree(&sess->sess_buf, 42);
1.2.2.9 ! misho      61:                schedEnd(&sess->sess_root);
1.2.2.2   misho      62:                free(sess);
                     63:                io_freeVar(v);
                     64:                return NULL;
1.2.2.3   misho      65:        } else {
                     66:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
                     67:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
                     68:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
                     69:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
                     70:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
                     71:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
                     72:                mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2   misho      73:        }
                     74: 
1.2       misho      75:        sess->sess_sock = sock;
                     76:        strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
                     77:        io_freeVar(v);
                     78:        return sess;
                     79: }
                     80: 
                     81: static void
1.2.2.9 ! misho      82: finiSession(struct tagSession *sess)
1.2       misho      83: {
                     84:        struct tagStore *store;
                     85: 
                     86:        ioTRACE(5);
                     87: 
                     88:        if (!sess)
                     89:                return;
                     90: 
                     91:        if (call.FiniSessPUB)
                     92:                call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
                     93: 
1.2.2.6   misho      94:        SESS_ELEM_LOCK(sess);
1.2       misho      95: 
1.2.2.9 ! misho      96:        while ((store = SLIST_FIRST(&sess->sess_subscr))) {
        !            97:                SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
        !            98: 
        !            99:                if (store->st_subscr.sub_topic.msg_base)
        !           100:                        free(store->st_subscr.sub_topic.msg_base);
        !           101:                if (store->st_subscr.sub_value.msg_base)
        !           102:                        free(store->st_subscr.sub_value.msg_base);
        !           103: 
        !           104:                free(store);
        !           105:        }
1.2.2.6   misho     106:        SESS_ELEM_UNLOCK(sess);
1.2.2.4   misho     107:        pthread_mutex_destroy(&sess->sess_mtx);
1.2       misho     108: 
1.2.2.9 ! misho     109:        schedEnd(&sess->sess_root);
        !           110: 
1.2       misho     111:        if (sess->sess_will.msg)
                    112:                free(sess->sess_will.msg);
                    113:        if (sess->sess_will.topic)
                    114:                free(sess->sess_will.topic);
                    115: 
1.2.2.9 ! misho     116:        if (sess->sess_sock > STDERR_FILENO)
1.2       misho     117:                srv_Close(sess->sess_sock);
                    118: 
1.2.2.2   misho     119:        mqtt_srv_Fini(&sess->sess_srv);
1.2       misho     120:        mqtt_msgFree(&sess->sess_buf, 42);
                    121: 
                    122:        free(sess);
                    123: }
                    124: 
                    125: static void
                    126: stopSession(struct tagSession *sess)
                    127: {
                    128:        mqtt_msg_t msg = { NULL, 0 };
                    129:        int ret;
                    130: 
                    131:        ioTRACE(4);
                    132: 
                    133:        assert(sess);
                    134: 
1.2.2.6   misho     135:        SESS_LOCK;
1.2       misho     136:        TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.6   misho     137:        SESS_UNLOCK;
1.2       misho     138: 
                    139:        ret = mqtt_msgDISCONNECT(&msg);
1.2.2.9 ! misho     140:        send(sess->sess_sock, msg.msg_base, ret, MSG_NOSIGNAL);
        !           141:        free(msg.msg_base);
1.2       misho     142: 
                    143:        ioDEBUG(1, "Close socket=%d", sess->sess_sock);
1.2.2.9 ! misho     144:        finiSession(sess);
1.2       misho     145: 
                    146:        call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                    147:                        sess->sess_addr, sess->sess_user);
                    148: }
                    149: 
                    150: static int
                    151: KASession(struct tagSession *sess)
                    152: {
                    153:        mqtt_msg_t msg = { NULL, 0 };
                    154:        int ret;
                    155:        struct pollfd pfd;
                    156: 
                    157:        ioTRACE(4);
                    158: 
                    159:        assert(sess);
                    160: 
                    161:        /* ping request */
                    162:        ret = mqtt_msgPINGREQ(&msg);
1.2.2.9 ! misho     163:        if ((ret = send(sess->sess_sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
1.2       misho     164:                ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                    165:                return -1;
                    166:        } else {
                    167:                ioDEBUG(5, "Sended %d bytes for ping request", ret);
                    168:                free(msg.msg_base);
                    169:                memset(&msg, 0, sizeof msg);
                    170:        }
                    171: 
                    172:        pfd.fd = sess->sess_sock;
                    173:        pfd.events = POLLIN | POLLPRI;
                    174:        if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 || 
                    175:                        pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    176:                ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                    177:                return -1;
                    178:        } else if (!ret) {
                    179:                ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
                    180:                return 1;
                    181:        }
                    182:        /* receive & decode packet */
                    183:        if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
                    184:                ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                    185:                return -1;
                    186:        }
                    187:        if (mqtt_readPINGRESP(sess->sess_buf)) {
                    188:                ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
                    189:                return 2;
                    190:        }
                    191: 
                    192:        /* Keep Alive is OK! */
                    193:        return 0;
                    194: }
                    195: 
                    196: static void *
                    197: thrSession(struct tagSession *sess)
                    198: {
                    199:        int ret, locKill = 42;
                    200:        struct pollfd pfd;
                    201:        struct mqtthdr *hdr;
                    202:        ait_val_t *v;
                    203: 
                    204:        pthread_cleanup_push((void(*)(void*)) stopSession, sess);
                    205:        ioTRACE(2);
                    206: 
                    207:        pfd.fd = sess->sess_sock;
                    208:        pfd.events = POLLIN | POLLPRI;
                    209:        while (!Kill && locKill) {
                    210:                if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 || 
                    211:                                pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                    212:                        ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                    213:                        break;
                    214:                } else if (!ret && (ret = KASession(sess))) {
                    215:                        call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n", 
                    216:                                        sess->sess_cid, sess->sess_addr, sess->sess_user);
                    217:                        break;
                    218:                }
                    219:                /* receive & decode packet */
                    220:                if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
                    221:                        ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                    222:                        break;
                    223:                } else if (!ret) {
                    224:                        ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
                    225:                        break;
                    226:                } else
                    227:                        hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
                    228: 
                    229:                /* dispatch message type */
1.2.2.3   misho     230:                if (mqtt_srv_Dispatch(sess->sess_srv, sess))
                    231:                        ioLIBERR(mqtt);
1.2       misho     232:                switch (hdr->mqtt_msg.type) {
                    233:                        case MQTT_TYPE_CONNECT:
                    234:                                ioDEBUG(5, "Exec CONNECT session");
                    235:                                if ((v = io_allocVar())) {
                    236:                                        AIT_SET_STR(v, sess->sess_addr);
                    237:                                        if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
                    238:                                                io_freeVar(v);
                    239:                                } else
                    240:                                        ioLIBERR(mqtt);
                    241:                                locKill ^= locKill;
                    242:                                continue;
                    243:                        case MQTT_TYPE_DISCONNECT:
                    244:                                ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.9 ! misho     245:                                finiSession(sess);
1.2       misho     246:                                locKill ^= locKill;
                    247:                                continue;
                    248:                        case MQTT_TYPE_PUBLISH:
                    249:                                ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3   misho     250:                                /*
                    251:                                if (cmdPUBLISH(sess))
1.2       misho     252:                                        locKill ^= locKill;
1.2.2.3   misho     253:                                        */
1.2       misho     254:                                break;
                    255:                        case MQTT_TYPE_PUBREL:
                    256:                                break;
                    257:                        case MQTT_TYPE_SUBSCRIBE:
                    258:                                break;
                    259:                        case MQTT_TYPE_UNSUBSCRIBE:
                    260:                                break;
                    261:                        case MQTT_TYPE_PINGREQ:
1.2.2.4   misho     262:                                ioDEBUG(5, "Exec PINGREQ session");
1.2       misho     263:                                break;
                    264:                        default:
                    265:                                ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
                    266:                                                sess->sess_cid, hdr->mqtt_msg.type);
                    267:                                break;
                    268:                }
                    269:        }
                    270: 
                    271:        pthread_cleanup_pop(locKill);
                    272:        pthread_exit(NULL);
                    273: }
                    274: 
                    275: static void *
                    276: startSession(sched_task_t *task)
                    277: {
                    278:        u_char basebuf[USHRT_MAX];
                    279:        mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
                    280:        mqtthdr_connflgs_t flg;
                    281:        mqtthdr_connack_t cack;
                    282:        struct tagSession *s, *sess = NULL;
                    283:        int ret;
                    284:        pthread_attr_t attr;
                    285: 
                    286:        ioTRACE(4);
                    287: 
                    288:        assert(task);
                    289: 
                    290:        if (!TASK_DATA(task)) {
1.2.2.8   misho     291:                /* flow from accept new clients */
1.2       misho     292:                sess = initSession(TASK_FD(task), TASK_ARG(task));
                    293:                if (!sess) {
                    294:                        io_freeVar(TASK_ARG(task));
                    295:                        close(TASK_FD(task));
                    296:                        return NULL;
                    297:                }
                    298: 
                    299:                /* receive & decode packet */
                    300:                if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
                    301:                        ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.2.2.9 ! misho     302:                        finiSession(sess);
1.2       misho     303:                        return NULL;
                    304:                }
                    305:        } else {
                    306:                sess = TASK_DATA(task);
                    307:                buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
                    308:                memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
                    309:        }
                    310: 
                    311:        cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid, 
                    312:                        sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass, 
                    313:                        &sess->sess_will.topic, &sess->sess_will.msg);
                    314:        ret = cack.retcode;
                    315:        flg.flags = cack.reserved;
                    316:        if (flg.reserved) {
                    317:                ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                    318:                goto end;
                    319:        } else {
                    320:                sess->sess_clean = flg.clean_sess;
                    321:                sess->sess_will.qos = flg.will_qos;
                    322:                sess->sess_will.retain = flg.will_retain;
                    323:                sess->sess_will.flag = flg.will_flg;
                    324:        }
                    325: 
                    326:        /* check online table for user */
                    327:        if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
                    328:                ioDEBUG(0, "Login:: DENIED for username %s and password %s", 
                    329:                                sess->sess_user, sess->sess_pass);
                    330:                ret = MQTT_RETCODE_DENIED;
                    331:                goto end;
                    332:        } else {
                    333:                ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
                    334:                ret = MQTT_RETCODE_ACCEPTED;
                    335:        }
                    336:        if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
                    337:                ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
                    338:                TAILQ_FOREACH(s, &Sessions, sess_node)
                    339:                        if (!strcmp(s->sess_cid, sess->sess_cid)) {
                    340:                                /* found stale session & disconnect it! */
                    341:                                stopSession(s);
                    342:                                break;
                    343:                        }
                    344:        }
                    345:        if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, 
                    346:                                sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, 
                    347:                                sess->sess_will.qos, sess->sess_will.retain) == -1) {
                    348:                ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
                    349:                ret = MQTT_RETCODE_DENIED;
                    350:                goto end;
                    351:        } else {
                    352:                ioDEBUG(0, "Session %s from %s and username %s is started", 
                    353:                                sess->sess_cid, sess->sess_addr, sess->sess_user);
                    354:                ret = MQTT_RETCODE_ACCEPTED;
                    355:        }
                    356: 
                    357:        ret = mqtt_msgCONNACK(&msg, ret);
                    358:        if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
                    359:                ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.2.2.9 ! misho     360:                finiSession(sess);
1.2       misho     361:                return NULL;
                    362:        } else {
                    363:                ioDEBUG(5, "Sended %d bytes", ret);
                    364:                free(msg.msg_base);
                    365:                memset(&msg, 0, sizeof msg);
                    366:        }
                    367: 
                    368:        /* Start session thread OK ... */
                    369:        pthread_attr_init(&attr);
                    370:        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
                    371: 
1.2.2.6   misho     372:        SESS_LOCK;
1.2       misho     373:        TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
                    374:        pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
1.2.2.6   misho     375:        SESS_UNLOCK;
1.2       misho     376: 
                    377:        call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
                    378:                        sess->sess_addr, sess->sess_user, sess->sess_ka);
                    379: 
                    380:        pthread_attr_destroy(&attr);
                    381:        return NULL;
                    382: end:   /* close client connection */
                    383:        ret = mqtt_msgCONNACK(&msg, ret);
                    384:        if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
                    385:                ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
                    386:        } else {
                    387:                ioDEBUG(5, "Sended %d bytes", ret);
                    388:                free(msg.msg_base);
                    389:                memset(&msg, 0, sizeof msg);
                    390:        }
                    391: 
                    392:        ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
1.2.2.9 ! misho     393:        finiSession(sess);
1.2       misho     394:        return NULL;
                    395: }
                    396: 
                    397: static void *
1.2.2.9 ! misho     398: acceptClient(sched_task_t *task)
1.2       misho     399: {
1.2.2.9 ! misho     400:        int cli;
        !           401:        io_sockaddr_t sa;
        !           402:        socklen_t sslen = sizeof sa.ss;
        !           403:        ait_val_t *v;
        !           404:        char str[STRSIZ];
1.2       misho     405: 
1.2.2.9 ! misho     406:        ioTRACE(4);
1.2       misho     407: 
1.2.2.9 ! misho     408:        assert(task);
1.2       misho     409: 
1.2.2.9 ! misho     410:        if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)
        !           411:                goto end;
        !           412:        else
        !           413:                fcntl(TASK_FD(task), F_SETFL, fcntl(TASK_FD(task), F_GETFL, 0) | O_NONBLOCK);
        !           414: 
        !           415:        v = io_allocVar();
        !           416:        if (!v) {
        !           417:                ioLIBERR(io);
        !           418:                close(cli);
        !           419:                goto end;
        !           420:        } else {
        !           421:                memset(str, 0, sizeof str);
        !           422:                snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
        !           423:                AIT_SET_STR(v, str);
        !           424:        }
        !           425:        ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
        !           426: 
        !           427:        if (!schedRead(root, startSession, v, cli, NULL, 0)) {
        !           428:                io_freeVar(v);
        !           429:                close(cli);
        !           430:                ioDEBUG(1, "Terminated client with socket=%d", cli);
        !           431:        }
        !           432: end:
        !           433:        schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0);
        !           434:        return NULL;
1.2       misho     435: }
                    436: 
1.2.2.9 ! misho     437: /* ----------------------------------------------------------------------- */
        !           438: 
1.2       misho     439: int
                    440: Run(int sock)
                    441: {
1.2.2.9 ! misho     442:        struct tagPub *pub;
        !           443:        struct timespec pl = { 0, 100000000 };
1.2       misho     444: 
                    445:        ioTRACE(1);
                    446: 
1.2.2.9 ! misho     447:        if (listen(sock, SOMAXCONN) == -1) {
1.2       misho     448:                ioSYSERR(0);
                    449:                return -1;
                    450:        } else
1.2.2.9 ! misho     451:                fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
1.2       misho     452: 
1.2.2.9 ! misho     453:        /* state machine - accept new connections */
        !           454:        if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {
        !           455:                ioLIBERR(sched);
1.2       misho     456:                return -1;
                    457:        }
                    458: 
1.2.2.9 ! misho     459:        schedPolling(root, &pl, NULL);
        !           460:        schedRun(root, &Kill);
1.2       misho     461: 
1.2.2.9 ! misho     462:        /* free all undeleted elements into lists */
        !           463:        PUBS_LOCK;
        !           464:        TAILQ_FOREACH(pub, &Pubs, pub_node) {
        !           465:                TAILQ_REMOVE(&Pubs, pub, pub_node);
        !           466: 
        !           467:                AIT_FREE_VAL(&pub->pub_name);
        !           468:                if (pub->pub_packet.msg_base)
        !           469:                        free(pub->pub_packet.msg_base);
1.2       misho     470:        }
1.2.2.9 ! misho     471:        PUBS_UNLOCK;
1.2       misho     472:        return 0;
                    473: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>