1: #include "global.h"
2: #include "mqttd.h"
3:
4:
5: extern char cliCmd[], *cliStr[];
6:
7:
8: static void *
9: startSession(sched_task_t *task)
10: {
11: u_char basebuf[USHRT_MAX];
12: mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 };
13: mqtt_msg_t buf = { basebuf, sizeof basebuf };
14: mqtthdr_connflgs_t flg;
15: int ret = 0;
16: struct timeval tv = { 0 };
17: u_short ka;
18: struct tagSession *sess = NULL;
19:
20: FTRACE(4);
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: VERB(3) syslog(LOG_ERR, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: goto end;
26: }
27:
28: if (recv(TASK_FD(task), buf.msg_base, buf.msg_len, 0) == -1) {
29: VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", (int) TASK_FD(task),
30: errno, strerror(errno));
31: goto end;
32: }
33: flg = mqtt_readCONNECT(&buf, &ka, sess->sess_cid, sizeof sess->sess_cid,
34: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
35: sess->sess_will.topic, sess->sess_will.msg);
36: if (flg.reserved) {
37: VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s",
38: mqtt_GetErrno(), mqtt_GetError());
39: goto end;
40: }
41:
42: /* check online table for user */
43:
44: end: /* close client connection */
45: if (sess)
46: free(sess);
47: close(TASK_FD(task));
48: VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d",
49: (char*) TASK_ARG(task), (int) TASK_FD(task));
50: if (TASK_ARG(task))
51: free(TASK_ARG(task));
52: return NULL;
53: }
54:
55: /* ----------------------------------------------------------------------- */
56:
57: static void *
58: thrSched(void *arg __unused)
59: {
60: FTRACE(1);
61:
62: schedRun(root, (intptr_t*) &Kill);
63: pthread_exit(NULL);
64: }
65:
66: int
67: Run(int sock)
68: {
69: io_sockaddr_t sa;
70: socklen_t sslen = sizeof sa.ss;
71: int cli;
72: char *str = NULL, szAddr[STRSIZ] = { 0 };
73: pthread_t tid;
74:
75: FTRACE(1);
76:
77: if (pthread_create(&tid, NULL, thrSched, NULL)) {
78: syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
79: return -1;
80: } else
81: pthread_detach(tid);
82: VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
83:
84: if (listen(sock, SOMAXCONN) == -1) {
85: syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
86: pthread_cancel(tid);
87: return -1;
88: }
89:
90: while (!Kill) {
91: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
92: syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
93: continue;
94: } else
95: VERB(1) {
96: switch (sa.sa.sa_family) {
97: case AF_INET:
98: inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
99: snprintf(szAddr, sizeof szAddr, "%s:%d",
100: szAddr, ntohs(sa.sin.sin_port));
101: break;
102: case AF_INET6:
103: inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
104: snprintf(szAddr, sizeof szAddr, "%s:%d",
105: szAddr, ntohs(sa.sin6.sin6_port));
106: break;
107: case AF_LOCAL:
108: strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
109: break;
110: default:
111: close(cli);
112: syslog(LOG_ERR, "Error:: unsupported address type %d",
113: sa.sa.sa_family);
114: continue;
115: }
116: str = strdup(szAddr);
117: syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
118: }
119:
120: if (!schedRead(root, startSession, str, cli)) {
121: close(cli);
122: VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
123: if (str)
124: free(str);
125: }
126: }
127:
128: return 0;
129: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>