1: #include "global.h"
2: #include "mqttd.h"
3:
4:
5: extern char cliCmd[], *cliStr[];
6:
7:
8: static inline struct tagSession *
9: initSession(int sock, char * __restrict addr)
10: {
11: struct tagSession *sess = NULL;
12: const char *str;
13:
14: if (!addr)
15: return NULL;
16:
17: sess = malloc(sizeof(struct tagSession));
18: if (!sess) {
19: VERB(3) syslog(LOG_ERR, "Error:: in malloc #%d - %s", errno, strerror(errno));
20: free(addr);
21: return NULL;
22: } else
23: memset(sess, 0, sizeof(struct tagSession));
24:
25: str = cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
26: if (!str)
27: sess->sess_retry = DEFAULT_RETRY;
28: else
29: sess->sess_retry = strtol(str, NULL, 0);
30:
31: sess->sess_sock = sock;
32: strlcpy(sess->sess_addr, addr, sizeof sess->sess_addr);
33: free(addr);
34:
35: sess->sess_buf = mqtt_msgAlloc(0);
36: if (!sess->sess_buf) {
37: VERB(3) syslog(LOG_ERR, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
38: free(sess);
39: return NULL;
40: }
41:
42: return sess;
43: }
44:
45: static void
46: finiSession(struct tagSession *sess)
47: {
48: register int i;
49: struct tagStore *store;
50:
51: if (!sess)
52: return;
53:
54: for (i = 0; i < MQTT_QOS_RESERVED; i++)
55: while ((store = SLIST_FIRST(&sess->sess_store[i]))) {
56: SLIST_REMOVE_HEAD(&sess->sess_store[i], st_node);
57: free(store);
58: }
59:
60: if (sess->sess_will.msg)
61: free(sess->sess_will.msg);
62: if (sess->sess_will.topic)
63: free(sess->sess_will.topic);
64:
65: if (sess->sess_sock > STDERR_FILENO) {
66: shutdown(sess->sess_sock, SHUT_RDWR);
67: close(sess->sess_sock);
68: }
69:
70: mqtt_msgFree(&sess->sess_buf, 42);
71:
72: if (sess->sess_tid)
73: pthread_cancel(sess->sess_tid);
74: free(sess);
75: }
76:
77: static void *
78: startSession(sched_task_t *task)
79: {
80: u_char basebuf[USHRT_MAX];
81: mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 };
82: mqtt_msg_t buf = { basebuf, sizeof basebuf };
83: mqtthdr_connflgs_t flg;
84: mqtthdr_connack_t cack;
85: int ret = 0;
86: struct timeval tv = { 0 };
87: struct tagSession *sess = NULL;
88:
89: FTRACE(4);
90:
91: sess = initSession(TASK_FD(task), TASK_ARG(task));
92: if (!sess)
93: return NULL;
94:
95: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
96: VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
97: goto end;
98: }
99: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
100: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
101: &sess->sess_will.topic, &sess->sess_will.msg);
102: flg.flags = cack.reserved;
103: if (flg.reserved) {
104: VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
105: goto end;
106: } else {
107: sess->sess_clean = flg.clean_sess;
108: sess->sess_will.qos = flg.will_qos;
109: sess->sess_will.retain = flg.will_retain;
110: sess->sess_will.flag = flg.will_flg;
111: }
112:
113: /* check online table for user */
114:
115: end: /* close client connection */
116: if (sess)
117: free(sess);
118: close(TASK_FD(task));
119: VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d",
120: (char*) TASK_ARG(task), (int) TASK_FD(task));
121: if (TASK_ARG(task))
122: free(TASK_ARG(task));
123: return NULL;
124: }
125:
126: /* ----------------------------------------------------------------------- */
127:
128: static void *
129: thrSched(void *arg __unused)
130: {
131: FTRACE(1);
132:
133: schedRun(root, (intptr_t*) &Kill);
134: pthread_exit(NULL);
135: }
136:
137: int
138: Run(int sock)
139: {
140: io_sockaddr_t sa;
141: socklen_t sslen = sizeof sa.ss;
142: int cli;
143: char *str = NULL, szAddr[STRSIZ] = { 0 };
144: pthread_t tid;
145:
146: FTRACE(1);
147:
148: if (pthread_create(&tid, NULL, thrSched, NULL)) {
149: syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
150: return -1;
151: } else
152: pthread_detach(tid);
153: VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
154:
155: if (listen(sock, SOMAXCONN) == -1) {
156: syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
157: pthread_cancel(tid);
158: return -1;
159: }
160:
161: while (!Kill) {
162: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
163: syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
164: continue;
165: } else
166: VERB(1) {
167: switch (sa.sa.sa_family) {
168: case AF_INET:
169: inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
170: snprintf(szAddr, sizeof szAddr, "%s:%d",
171: szAddr, ntohs(sa.sin.sin_port));
172: break;
173: case AF_INET6:
174: inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
175: snprintf(szAddr, sizeof szAddr, "%s:%d",
176: szAddr, ntohs(sa.sin6.sin6_port));
177: break;
178: case AF_LOCAL:
179: strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
180: break;
181: default:
182: close(cli);
183: syslog(LOG_ERR, "Error:: unsupported address type %d",
184: sa.sa.sa_family);
185: continue;
186: }
187: str = strdup(szAddr);
188: syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
189: }
190:
191: if (!schedRead(root, startSession, str, cli)) {
192: close(cli);
193: VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
194: if (str)
195: free(str);
196: }
197: }
198:
199: return 0;
200: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>