Annotation of mqtt/src/daemon.c, revision 1.1.2.11
1.1.2.1 misho 1: #include "global.h"
1.1.2.11! misho 2: #include "mqttd.h"
1.1.2.1 misho 3:
4:
1.1.2.7 misho 5: extern char cliCmd[], *cliStr[];
6:
7:
1.1.2.5 misho 8: static void *
9: startSession(sched_task_t *task)
1.1.2.4 misho 10: {
1.1.2.11! misho 11: u_char basebuf[USHRT_MAX];
1.1.2.6 misho 12: mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 };
1.1.2.11! misho 13: // mqtt_msg_t *buf = { basebuf, sizeof basebuf };
1.1.2.10 misho 14: mqtthdr_connflgs_t flg;
1.1.2.9 misho 15: int ret = 0;
1.1.2.10 misho 16: struct timeval tv = { 0 };
17: u_short ka;
1.1.2.6 misho 18:
19: FTRACE(4);
20:
1.1.2.8 misho 21: /*
1.1.2.6 misho 22: buf = mqtt_msgAlloc(USHRT_MAX);
23: if (!buf) {
24: syslog(LOG_ERR, "Error:: allocate message buf (%d) #%d - %s", (int) TASK_FD(task),
25: mqtt_GetErrno(), mqtt_GetError());
26: goto end;
27: }
1.1.2.8 misho 28: */
1.1.2.6 misho 29:
1.1.2.8 misho 30: if (recv(TASK_FD(task), basebuf, sizeof basebuf, 0) == -1) {
31: VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", (int) TASK_FD(task),
1.1.2.6 misho 32: errno, strerror(errno));
33: goto end;
1.1.2.9 misho 34: }
1.1.2.8 misho 35:
1.1.2.11! misho 36: /*
1.1.2.10 misho 37: flg = mqtt_readCONNECT(buf, &ka, );
38: if (flg.reserved) {
39: VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s",
40: mqtt_GetErrno(), mqtt_GetError());
41: goto end;
1.1.2.11! misho 42: } */
1.1.2.10 misho 43:
1.1.2.8 misho 44: /*
1.1.2.7 misho 45: for (ret = i = 0; cliCmd[i] && !(ret = (hdr->mqtt_msg.type == cliCmd[i])); i++);
46: if (!ret) {
1.1.2.8 misho 47: VERB(2) syslog(LOG_ERR, "Error:: wrong command type #%d %s",
1.1.2.7 misho 48: hdr->mqtt_msg.type, cliStr[i]);
49: goto end;
50: }
1.1.2.8 misho 51: */
1.1.2.6 misho 52:
1.1.2.8 misho 53: /* check online table for user */
54: // ChkSessPUB(&cfg, );
1.1.2.6 misho 55:
1.1.2.10 misho 56: // ret = mqttDispatcher(cbs, buf);
1.1.2.9 misho 57: // mqtt_msgFree(&buf, 42);
1.1.2.6 misho 58:
1.1.2.10 misho 59: /*
1.1.2.6 misho 60: switch (ret) {
61: case -1:
62: syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
63: break;
64: default:
65: return NULL;
66: }
1.1.2.10 misho 67: */
1.1.2.6 misho 68: end: /* close client connection */
1.1.2.5 misho 69: close(TASK_FD(task));
70: VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d",
71: (char*) TASK_ARG(task), (int) TASK_FD(task));
72: if (TASK_ARG(task))
73: free(TASK_ARG(task));
1.1.2.6 misho 74: return NULL;
1.1.2.4 misho 75: }
76:
77: /* ----------------------------------------------------------------------- */
78:
1.1.2.5 misho 79: static void *
80: thrSched(void *arg __unused)
81: {
82: FTRACE(1);
83:
84: schedRun(root, (intptr_t*) &Kill);
85: pthread_exit(NULL);
86: }
87:
1.1.2.2 misho 88: int
1.1.2.3 misho 89: Run(int sock)
1.1.2.2 misho 90: {
1.1.2.4 misho 91: io_sockaddr_t sa;
92: socklen_t sslen = sizeof sa.ss;
93: int cli;
1.1.2.5 misho 94: char *str = NULL, szAddr[STRSIZ] = { 0 };
95: pthread_t tid;
1.1.2.4 misho 96:
1.1.2.2 misho 97: FTRACE(1);
98:
1.1.2.5 misho 99: if (pthread_create(&tid, NULL, thrSched, NULL)) {
100: syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
101: return -1;
102: } else
103: pthread_detach(tid);
104: VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
105:
1.1.2.3 misho 106: if (listen(sock, SOMAXCONN) == -1) {
1.1.2.4 misho 107: syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
1.1.2.5 misho 108: pthread_cancel(tid);
1.1.2.3 misho 109: return -1;
110: }
111:
1.1.2.4 misho 112: while (!Kill) {
113: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
114: syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
115: continue;
116: } else
117: VERB(1) {
118: switch (sa.sa.sa_family) {
119: case AF_INET:
120: inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
121: snprintf(szAddr, sizeof szAddr, "%s:%d",
122: szAddr, ntohs(sa.sin.sin_port));
123: break;
124: case AF_INET6:
125: inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
126: snprintf(szAddr, sizeof szAddr, "%s:%d",
127: szAddr, ntohs(sa.sin6.sin6_port));
128: break;
129: case AF_LOCAL:
130: strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
131: break;
132: default:
133: close(cli);
134: syslog(LOG_ERR, "Error:: unsupported address type %d",
135: sa.sa.sa_family);
136: continue;
137: }
1.1.2.5 misho 138: str = strdup(szAddr);
139: syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
1.1.2.4 misho 140: }
141:
1.1.2.5 misho 142: if (!schedRead(root, startSession, str, cli)) {
1.1.2.4 misho 143: close(cli);
144: VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
1.1.2.5 misho 145: if (str)
146: free(str);
1.1.2.4 misho 147: }
148: }
149:
1.1.2.2 misho 150: return 0;
151: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>