1: #include "global.h"
2: #include "mqttd.h"
3:
4:
5: extern char cliCmd[], *cliStr[];
6:
7:
8: static void *
9: startSession(sched_task_t *task)
10: {
11: u_char basebuf[USHRT_MAX];
12: mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 };
13: // mqtt_msg_t *buf = { basebuf, sizeof basebuf };
14: mqtthdr_connflgs_t flg;
15: int ret = 0;
16: struct timeval tv = { 0 };
17: u_short ka;
18:
19: FTRACE(4);
20:
21: /*
22: buf = mqtt_msgAlloc(USHRT_MAX);
23: if (!buf) {
24: syslog(LOG_ERR, "Error:: allocate message buf (%d) #%d - %s", (int) TASK_FD(task),
25: mqtt_GetErrno(), mqtt_GetError());
26: goto end;
27: }
28: */
29:
30: if (recv(TASK_FD(task), basebuf, sizeof basebuf, 0) == -1) {
31: VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", (int) TASK_FD(task),
32: errno, strerror(errno));
33: goto end;
34: }
35:
36: /*
37: flg = mqtt_readCONNECT(buf, &ka, );
38: if (flg.reserved) {
39: VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s",
40: mqtt_GetErrno(), mqtt_GetError());
41: goto end;
42: } */
43:
44: /*
45: for (ret = i = 0; cliCmd[i] && !(ret = (hdr->mqtt_msg.type == cliCmd[i])); i++);
46: if (!ret) {
47: VERB(2) syslog(LOG_ERR, "Error:: wrong command type #%d %s",
48: hdr->mqtt_msg.type, cliStr[i]);
49: goto end;
50: }
51: */
52:
53: /* check online table for user */
54: // ChkSessPUB(&cfg, );
55:
56: // ret = mqttDispatcher(cbs, buf);
57: // mqtt_msgFree(&buf, 42);
58:
59: /*
60: switch (ret) {
61: case -1:
62: syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
63: break;
64: default:
65: return NULL;
66: }
67: */
68: end: /* close client connection */
69: close(TASK_FD(task));
70: VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d",
71: (char*) TASK_ARG(task), (int) TASK_FD(task));
72: if (TASK_ARG(task))
73: free(TASK_ARG(task));
74: return NULL;
75: }
76:
77: /* ----------------------------------------------------------------------- */
78:
79: static void *
80: thrSched(void *arg __unused)
81: {
82: FTRACE(1);
83:
84: schedRun(root, (intptr_t*) &Kill);
85: pthread_exit(NULL);
86: }
87:
88: int
89: Run(int sock)
90: {
91: io_sockaddr_t sa;
92: socklen_t sslen = sizeof sa.ss;
93: int cli;
94: char *str = NULL, szAddr[STRSIZ] = { 0 };
95: pthread_t tid;
96:
97: FTRACE(1);
98:
99: if (pthread_create(&tid, NULL, thrSched, NULL)) {
100: syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
101: return -1;
102: } else
103: pthread_detach(tid);
104: VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
105:
106: if (listen(sock, SOMAXCONN) == -1) {
107: syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
108: pthread_cancel(tid);
109: return -1;
110: }
111:
112: while (!Kill) {
113: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
114: syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
115: continue;
116: } else
117: VERB(1) {
118: switch (sa.sa.sa_family) {
119: case AF_INET:
120: inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
121: snprintf(szAddr, sizeof szAddr, "%s:%d",
122: szAddr, ntohs(sa.sin.sin_port));
123: break;
124: case AF_INET6:
125: inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
126: snprintf(szAddr, sizeof szAddr, "%s:%d",
127: szAddr, ntohs(sa.sin6.sin6_port));
128: break;
129: case AF_LOCAL:
130: strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
131: break;
132: default:
133: close(cli);
134: syslog(LOG_ERR, "Error:: unsupported address type %d",
135: sa.sa.sa_family);
136: continue;
137: }
138: str = strdup(szAddr);
139: syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
140: }
141:
142: if (!schedRead(root, startSession, str, cli)) {
143: close(cli);
144: VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
145: if (str)
146: free(str);
147: }
148: }
149:
150: return 0;
151: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>