Annotation of mqtt/src/daemon.c, revision 1.1.2.13

1.1.2.1   misho       1: #include "global.h"
1.1.2.11  misho       2: #include "mqttd.h"
1.1.2.1   misho       3: 
                      4: 
1.1.2.7   misho       5: extern char cliCmd[], *cliStr[];
                      6: 
                      7: 
1.1.2.5   misho       8: static void *
                      9: startSession(sched_task_t *task)
1.1.2.4   misho      10: {
1.1.2.11  misho      11:        u_char basebuf[USHRT_MAX];
1.1.2.6   misho      12:        mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 };
1.1.2.12  misho      13:        mqtt_msg_t buf = { basebuf, sizeof basebuf };
1.1.2.10  misho      14:        mqtthdr_connflgs_t flg;
1.1.2.13! misho      15:        mqtthdr_connack_t cack;
1.1.2.9   misho      16:        int ret = 0;
1.1.2.10  misho      17:        struct timeval tv = { 0 };
                     18:        u_short ka;
1.1.2.12  misho      19:        struct tagSession *sess = NULL;
1.1.2.6   misho      20: 
                     21:        FTRACE(4);
                     22: 
1.1.2.12  misho      23:        sess = malloc(sizeof(struct tagSession));
                     24:        if (!sess) {
                     25:                VERB(3) syslog(LOG_ERR, "Error:: in malloc #%d - %s", errno, strerror(errno));
1.1.2.6   misho      26:                goto end;
                     27:        }
                     28: 
1.1.2.12  misho      29:        if (recv(TASK_FD(task), buf.msg_base, buf.msg_len, 0) == -1) {
1.1.2.8   misho      30:                VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", (int) TASK_FD(task), 
1.1.2.6   misho      31:                                errno, strerror(errno));
                     32:                goto end;
1.1.2.9   misho      33:        }
1.1.2.13! misho      34:        cack = mqtt_readCONNECT(&buf, &ka, sess->sess_cid, sizeof sess->sess_cid, 
1.1.2.12  misho      35:                        sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass, 
1.1.2.13! misho      36:                        &sess->sess_will.topic, &sess->sess_will.msg);
        !            37:        if (cack.reserved) {
        !            38:                VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.1.2.10  misho      39:                goto end;
1.1.2.12  misho      40:        }
1.1.2.6   misho      41: 
1.1.2.8   misho      42:        /* check online table for user */
1.1.2.6   misho      43: 
                     44: end:   /* close client connection */
1.1.2.12  misho      45:        if (sess)
                     46:                free(sess);
1.1.2.5   misho      47:        close(TASK_FD(task));
                     48:        VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d", 
                     49:                        (char*) TASK_ARG(task), (int) TASK_FD(task));
                     50:        if (TASK_ARG(task))
                     51:                free(TASK_ARG(task));
1.1.2.6   misho      52:        return NULL;
1.1.2.4   misho      53: }
                     54: 
                     55: /* ----------------------------------------------------------------------- */
                     56: 
1.1.2.5   misho      57: static void *
                     58: thrSched(void *arg __unused)
                     59: {
                     60:        FTRACE(1);
                     61: 
                     62:        schedRun(root, (intptr_t*) &Kill);
                     63:        pthread_exit(NULL);
                     64: }
                     65: 
1.1.2.2   misho      66: int
1.1.2.3   misho      67: Run(int sock)
1.1.2.2   misho      68: {
1.1.2.4   misho      69:        io_sockaddr_t sa;
                     70:        socklen_t sslen = sizeof sa.ss;
                     71:        int cli;
1.1.2.5   misho      72:        char *str = NULL, szAddr[STRSIZ] = { 0 };
                     73:        pthread_t tid;
1.1.2.4   misho      74: 
1.1.2.2   misho      75:        FTRACE(1);
                     76: 
1.1.2.5   misho      77:        if (pthread_create(&tid, NULL, thrSched, NULL)) {
                     78:                syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
                     79:                return -1;
                     80:        } else
                     81:                pthread_detach(tid);
                     82:        VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
                     83: 
1.1.2.3   misho      84:        if (listen(sock, SOMAXCONN) == -1) {
1.1.2.4   misho      85:                syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
1.1.2.5   misho      86:                pthread_cancel(tid);
1.1.2.3   misho      87:                return -1;
                     88:        }
                     89: 
1.1.2.4   misho      90:        while (!Kill) {
                     91:                if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
                     92:                        syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
                     93:                        continue;
                     94:                } else
                     95:                        VERB(1) {
                     96:                                switch (sa.sa.sa_family) {
                     97:                                        case AF_INET:
                     98:                                                inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
                     99:                                                snprintf(szAddr, sizeof szAddr, "%s:%d", 
                    100:                                                                szAddr, ntohs(sa.sin.sin_port));
                    101:                                                break;
                    102:                                        case AF_INET6:
                    103:                                                inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
                    104:                                                snprintf(szAddr, sizeof szAddr, "%s:%d", 
                    105:                                                                szAddr, ntohs(sa.sin6.sin6_port));
                    106:                                                break;
                    107:                                        case AF_LOCAL:
                    108:                                                strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
                    109:                                                break;
                    110:                                        default:
                    111:                                                close(cli);
                    112:                                                syslog(LOG_ERR, "Error:: unsupported address type %d", 
                    113:                                                                sa.sa.sa_family);
                    114:                                                continue;
                    115:                                }
1.1.2.5   misho     116:                                str = strdup(szAddr);
                    117:                                syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
1.1.2.4   misho     118:                        }
                    119: 
1.1.2.5   misho     120:                if (!schedRead(root, startSession, str, cli)) {
1.1.2.4   misho     121:                        close(cli);
                    122:                        VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
1.1.2.5   misho     123:                        if (str)
                    124:                                free(str);
1.1.2.4   misho     125:                }
                    126:        }
                    127: 
1.1.2.2   misho     128:        return 0;
                    129: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>