Annotation of mqtt/src/daemon.c, revision 1.1.2.17
1.1.2.1 misho 1: #include "global.h"
1.1.2.16 misho 2: #include "rtlm.h"
1.1.2.11 misho 3: #include "mqttd.h"
1.1.2.1 misho 4:
5:
1.1.2.14 misho 6: static inline struct tagSession *
7: initSession(int sock, char * __restrict addr)
8: {
9: struct tagSession *sess = NULL;
10: const char *str;
11:
1.1.2.15 misho 12: FTRACE(5);
13:
1.1.2.14 misho 14: if (!addr)
15: return NULL;
16:
17: sess = malloc(sizeof(struct tagSession));
18: if (!sess) {
19: VERB(3) syslog(LOG_ERR, "Error:: in malloc #%d - %s", errno, strerror(errno));
20: return NULL;
21: } else
22: memset(sess, 0, sizeof(struct tagSession));
23:
1.1.2.17! misho 24: str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
1.1.2.14 misho 25: if (!str)
26: sess->sess_retry = DEFAULT_RETRY;
27: else
28: sess->sess_retry = strtol(str, NULL, 0);
29:
30: sess->sess_buf = mqtt_msgAlloc(0);
31: if (!sess->sess_buf) {
32: VERB(3) syslog(LOG_ERR, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
33: free(sess);
34: return NULL;
35: }
36:
1.1.2.15 misho 37: sess->sess_sock = sock;
38: strlcpy(sess->sess_addr, addr, sizeof sess->sess_addr);
39: free(addr);
1.1.2.14 misho 40: return sess;
41: }
42:
43: static void
44: finiSession(struct tagSession *sess)
45: {
46: register int i;
47: struct tagStore *store;
48:
1.1.2.15 misho 49: FTRACE(5);
50:
1.1.2.14 misho 51: if (!sess)
52: return;
53:
54: for (i = 0; i < MQTT_QOS_RESERVED; i++)
55: while ((store = SLIST_FIRST(&sess->sess_store[i]))) {
56: SLIST_REMOVE_HEAD(&sess->sess_store[i], st_node);
57: free(store);
58: }
59:
60: if (sess->sess_will.msg)
61: free(sess->sess_will.msg);
62: if (sess->sess_will.topic)
63: free(sess->sess_will.topic);
64:
65: if (sess->sess_sock > STDERR_FILENO) {
66: shutdown(sess->sess_sock, SHUT_RDWR);
67: close(sess->sess_sock);
68: }
69:
70: mqtt_msgFree(&sess->sess_buf, 42);
71:
72: if (sess->sess_tid)
73: pthread_cancel(sess->sess_tid);
74: free(sess);
75: }
76:
1.1.2.5 misho 77: static void *
1.1.2.15 misho 78: thrSession(struct tagSession *sess)
79: {
80: FTRACE(2);
81:
82: TAILQ_REMOVE(&Sessions, sess, sess_node);
83: pthread_exit(NULL);
84: return NULL;
85: }
86:
87: static void *
1.1.2.5 misho 88: startSession(sched_task_t *task)
1.1.2.4 misho 89: {
1.1.2.11 misho 90: u_char basebuf[USHRT_MAX];
1.1.2.12 misho 91: mqtt_msg_t buf = { basebuf, sizeof basebuf };
1.1.2.10 misho 92: mqtthdr_connflgs_t flg;
1.1.2.13 misho 93: mqtthdr_connack_t cack;
1.1.2.12 misho 94: struct tagSession *sess = NULL;
1.1.2.6 misho 95:
96: FTRACE(4);
97:
1.1.2.14 misho 98: sess = initSession(TASK_FD(task), TASK_ARG(task));
1.1.2.15 misho 99: if (!sess) {
100: if (TASK_ARG(task))
101: free(TASK_ARG(task));
102: close(TASK_FD(task));
1.1.2.14 misho 103: return NULL;
1.1.2.15 misho 104: }
1.1.2.6 misho 105:
1.1.2.14 misho 106: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
107: VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.1.2.6 misho 108: goto end;
1.1.2.9 misho 109: }
1.1.2.14 misho 110: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
1.1.2.12 misho 111: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
1.1.2.13 misho 112: &sess->sess_will.topic, &sess->sess_will.msg);
1.1.2.14 misho 113: flg.flags = cack.reserved;
114: if (flg.reserved) {
1.1.2.13 misho 115: VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.1.2.10 misho 116: goto end;
1.1.2.14 misho 117: } else {
118: sess->sess_clean = flg.clean_sess;
119: sess->sess_will.qos = flg.will_qos;
120: sess->sess_will.retain = flg.will_retain;
121: sess->sess_will.flag = flg.will_flg;
1.1.2.12 misho 122: }
1.1.2.6 misho 123:
1.1.2.8 misho 124: /* check online table for user */
1.1.2.16 misho 125: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass)) {
126: VERB(1) syslog(LOG_WARNING, "Login:: DENIED for username %s and password %s",
127: sess->sess_user, sess->sess_pass);
128: goto end;
129: } else
130: VERB(1) syslog(LOG_WARNING, "Login:: ALLOWED for username %s ...", sess->sess_user);
1.1.2.6 misho 131:
1.1.2.15 misho 132: /* Start session thread OK ... */
133: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
134: pthread_create(&sess->sess_tid, NULL, (void*(*)(void*)) thrSession, sess);
135: pthread_detach(sess->sess_tid);
136: VERB(1) syslog(LOG_DEBUG, "ConnID=%s(%s) for %s login OK!",
137: sess->sess_cid, sess->sess_user, sess->sess_addr);
138: return NULL;
1.1.2.6 misho 139: end: /* close client connection */
1.1.2.5 misho 140: VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d",
141: (char*) TASK_ARG(task), (int) TASK_FD(task));
1.1.2.15 misho 142: finiSession(sess);
1.1.2.6 misho 143: return NULL;
1.1.2.4 misho 144: }
145:
146: /* ----------------------------------------------------------------------- */
147:
1.1.2.5 misho 148: static void *
149: thrSched(void *arg __unused)
150: {
151: FTRACE(1);
152:
153: schedRun(root, (intptr_t*) &Kill);
154: pthread_exit(NULL);
155: }
156:
1.1.2.2 misho 157: int
1.1.2.3 misho 158: Run(int sock)
1.1.2.2 misho 159: {
1.1.2.4 misho 160: io_sockaddr_t sa;
161: socklen_t sslen = sizeof sa.ss;
162: int cli;
1.1.2.5 misho 163: char *str = NULL, szAddr[STRSIZ] = { 0 };
164: pthread_t tid;
1.1.2.4 misho 165:
1.1.2.2 misho 166: FTRACE(1);
167:
1.1.2.5 misho 168: if (pthread_create(&tid, NULL, thrSched, NULL)) {
169: syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
170: return -1;
171: } else
172: pthread_detach(tid);
173: VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
174:
1.1.2.3 misho 175: if (listen(sock, SOMAXCONN) == -1) {
1.1.2.4 misho 176: syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
1.1.2.5 misho 177: pthread_cancel(tid);
1.1.2.3 misho 178: return -1;
179: }
180:
1.1.2.4 misho 181: while (!Kill) {
182: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
183: syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
184: continue;
185: } else
186: VERB(1) {
187: switch (sa.sa.sa_family) {
188: case AF_INET:
189: inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
190: snprintf(szAddr, sizeof szAddr, "%s:%d",
191: szAddr, ntohs(sa.sin.sin_port));
192: break;
193: case AF_INET6:
194: inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
195: snprintf(szAddr, sizeof szAddr, "%s:%d",
196: szAddr, ntohs(sa.sin6.sin6_port));
197: break;
198: case AF_LOCAL:
199: strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
200: break;
201: default:
202: close(cli);
203: syslog(LOG_ERR, "Error:: unsupported address type %d",
204: sa.sa.sa_family);
205: continue;
206: }
1.1.2.5 misho 207: str = strdup(szAddr);
208: syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
1.1.2.4 misho 209: }
210:
1.1.2.5 misho 211: if (!schedRead(root, startSession, str, cli)) {
1.1.2.4 misho 212: close(cli);
213: VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
1.1.2.5 misho 214: if (str)
215: free(str);
1.1.2.4 misho 216: }
217: }
218:
1.1.2.2 misho 219: return 0;
220: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>