Annotation of mqtt/src/daemon.c, revision 1.2.2.1
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
9:
10:
11: static inline struct tagSession *
12: initSession(int sock, ait_val_t * __restrict v)
13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
17: ioTRACE(5);
18:
19: if (!v)
20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: io_freeVar(v);
26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
30: TAILQ_INIT(&sess->sess_sndqueue);
31:
32: str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
33: if (!str)
34: sess->sess_retry = DEFAULT_RETRY;
35: else
36: sess->sess_retry = strtol(str, NULL, 0);
37:
38: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
39: if (!sess->sess_buf) {
40: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
41: free(sess);
42: io_freeVar(v);
43: return NULL;
44: }
45:
46: sess->sess_sock = sock;
47: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
48: io_freeVar(v);
49: return sess;
50: }
51:
52: static void
53: finiSession(struct tagSession *sess, int preservSock)
54: {
55: struct tagStore *store;
56:
57: ioTRACE(5);
58:
59: if (!sess)
60: return;
61:
62: if (call.FiniSessPUB)
63: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
64:
65: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
66: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
67:
1.2.2.1 ! misho 68: if (store->st_subscr.sub_topic.msg_base)
! 69: free(store->st_subscr.sub_topic.msg_base);
! 70: if (store->st_subscr.sub_value.msg_base)
! 71: free(store->st_subscr.sub_value.msg_base);
1.2 misho 72:
73: free(store);
74: }
75:
76: if (sess->sess_will.msg)
77: free(sess->sess_will.msg);
78: if (sess->sess_will.topic)
79: free(sess->sess_will.topic);
80:
81: if (sess->sess_sock > STDERR_FILENO && !preservSock)
82: srv_Close(sess->sess_sock);
83:
84: mqtt_msgFree(&sess->sess_buf, 42);
85:
86: free(sess);
87: }
88:
89: static void
90: stopSession(struct tagSession *sess)
91: {
92: mqtt_msg_t msg = { NULL, 0 };
93: int ret;
94:
95: ioTRACE(4);
96:
97: assert(sess);
98:
99: pthread_mutex_lock(&mtx_sess);
100: TAILQ_REMOVE(&Sessions, sess, sess_node);
101: pthread_mutex_unlock(&mtx_sess);
102:
103: ret = mqtt_msgDISCONNECT(&msg);
104: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
105: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
106: else {
107: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
108: free(msg.msg_base);
109: memset(&msg, 0, sizeof msg);
110: }
111:
112: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
113: finiSession(sess, 0);
114:
115: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
116: sess->sess_addr, sess->sess_user);
117: }
118:
119: static int
120: KASession(struct tagSession *sess)
121: {
122: mqtt_msg_t msg = { NULL, 0 };
123: int ret;
124: struct pollfd pfd;
125:
126: ioTRACE(4);
127:
128: assert(sess);
129:
130: /* ping request */
131: ret = mqtt_msgPINGREQ(&msg);
132: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
133: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
134: return -1;
135: } else {
136: ioDEBUG(5, "Sended %d bytes for ping request", ret);
137: free(msg.msg_base);
138: memset(&msg, 0, sizeof msg);
139: }
140:
141: pfd.fd = sess->sess_sock;
142: pfd.events = POLLIN | POLLPRI;
143: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
144: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
145: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
146: return -1;
147: } else if (!ret) {
148: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
149: return 1;
150: }
151: /* receive & decode packet */
152: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
153: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
154: return -1;
155: }
156: if (mqtt_readPINGRESP(sess->sess_buf)) {
157: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
158: return 2;
159: }
160:
161: /* Keep Alive is OK! */
162: return 0;
163: }
164:
165: static void *
166: thrSession(struct tagSession *sess)
167: {
168: mqtt_msg_t msg = { NULL, 0 };
169: int ret, locKill = 42;
170: struct pollfd pfd;
171: struct mqtthdr *hdr;
172: ait_val_t *v;
173: struct tagStore *store;
174:
175: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
176: ioTRACE(2);
177:
178: pfd.fd = sess->sess_sock;
179: pfd.events = POLLIN | POLLPRI;
180: while (!Kill && locKill) {
181: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
182: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
183: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
184: break;
185: } else if (!ret && (ret = KASession(sess))) {
186: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
187: sess->sess_cid, sess->sess_addr, sess->sess_user);
188: break;
189: }
190: /* receive & decode packet */
191: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
192: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
193: break;
194: } else if (!ret) {
195: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
196: break;
197: } else
198: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
199:
200: /* dispatch message type */
201: switch (hdr->mqtt_msg.type) {
202: case MQTT_TYPE_CONNECT:
203: ioDEBUG(5, "Exec CONNECT session");
204: pthread_mutex_lock(&mtx_sess);
205: TAILQ_REMOVE(&Sessions, sess, sess_node);
206: pthread_mutex_unlock(&mtx_sess);
207:
208: if (call.FiniSessPUB)
209: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
210:
211: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
212: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
213:
1.2.2.1 ! misho 214: if (store->st_subscr.sub_topic.msg_base)
! 215: free(store->st_subscr.sub_topic.msg_base);
! 216: if (store->st_subscr.sub_value.msg_base)
! 217: free(store->st_subscr.sub_value.msg_base);
1.2 misho 218:
219: free(store);
220: }
221:
222: if (sess->sess_will.msg)
223: free(sess->sess_will.msg);
224: if (sess->sess_will.topic)
225: free(sess->sess_will.topic);
226:
227: if ((v = io_allocVar())) {
228: AIT_SET_STR(v, sess->sess_addr);
229: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
230: io_freeVar(v);
231: } else
232: ioLIBERR(mqtt);
233:
234: locKill ^= locKill;
235:
236: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
237: sess->sess_addr, sess->sess_user);
238: continue;
239: case MQTT_TYPE_DISCONNECT:
240: ioDEBUG(5, "Exec DISCONNECT session");
241: pthread_mutex_lock(&mtx_sess);
242: TAILQ_REMOVE(&Sessions, sess, sess_node);
243: pthread_mutex_unlock(&mtx_sess);
244:
245: finiSession(sess, 0);
246: locKill ^= locKill;
247:
248: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
249: sess->sess_addr, sess->sess_user);
250: continue;
251: case MQTT_TYPE_PUBLISH:
252: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
253: if (Publish(sess))
254: locKill ^= locKill;
255: break;
256: case MQTT_TYPE_PUBREL:
257: break;
258: case MQTT_TYPE_SUBSCRIBE:
259: break;
260: case MQTT_TYPE_UNSUBSCRIBE:
261: break;
262: case MQTT_TYPE_PINGREQ:
263: break;
264: default:
265: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
266: sess->sess_cid, hdr->mqtt_msg.type);
267: break;
268: }
269: }
270:
271: pthread_cleanup_pop(locKill);
272: pthread_exit(NULL);
273: }
274:
275: static void *
276: startSession(sched_task_t *task)
277: {
278: u_char basebuf[USHRT_MAX];
279: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
280: mqtthdr_connflgs_t flg;
281: mqtthdr_connack_t cack;
282: struct tagSession *s, *sess = NULL;
283: int ret;
284: pthread_attr_t attr;
285:
286: ioTRACE(4);
287:
288: assert(task);
289:
290: if (!TASK_DATA(task)) {
291: sess = initSession(TASK_FD(task), TASK_ARG(task));
292: if (!sess) {
293: io_freeVar(TASK_ARG(task));
294: close(TASK_FD(task));
295: return NULL;
296: }
297:
298: /* receive & decode packet */
299: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
300: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
301: finiSession(sess, 0);
302: return NULL;
303: }
304: } else {
305: sess = TASK_DATA(task);
306: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
307: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
308: }
309:
310: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
311: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
312: &sess->sess_will.topic, &sess->sess_will.msg);
313: ret = cack.retcode;
314: flg.flags = cack.reserved;
315: if (flg.reserved) {
316: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
317: goto end;
318: } else {
319: sess->sess_clean = flg.clean_sess;
320: sess->sess_will.qos = flg.will_qos;
321: sess->sess_will.retain = flg.will_retain;
322: sess->sess_will.flag = flg.will_flg;
323: }
324:
325: /* check online table for user */
326: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
327: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
328: sess->sess_user, sess->sess_pass);
329: ret = MQTT_RETCODE_DENIED;
330: goto end;
331: } else {
332: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
333: ret = MQTT_RETCODE_ACCEPTED;
334: }
335: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
336: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
337: TAILQ_FOREACH(s, &Sessions, sess_node)
338: if (!strcmp(s->sess_cid, sess->sess_cid)) {
339: /* found stale session & disconnect it! */
340: stopSession(s);
341: break;
342: }
343: }
344: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
345: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
346: sess->sess_will.qos, sess->sess_will.retain) == -1) {
347: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
348: ret = MQTT_RETCODE_DENIED;
349: goto end;
350: } else {
351: ioDEBUG(0, "Session %s from %s and username %s is started",
352: sess->sess_cid, sess->sess_addr, sess->sess_user);
353: ret = MQTT_RETCODE_ACCEPTED;
354: }
355:
356: ret = mqtt_msgCONNACK(&msg, ret);
357: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
358: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
359: finiSession(sess, 0);
360: return NULL;
361: } else {
362: ioDEBUG(5, "Sended %d bytes", ret);
363: free(msg.msg_base);
364: memset(&msg, 0, sizeof msg);
365: }
366:
367: /* Start session thread OK ... */
368: pthread_attr_init(&attr);
369: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
370:
371: pthread_mutex_lock(&mtx_sess);
372: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
373: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
374: pthread_mutex_unlock(&mtx_sess);
375:
376: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
377: sess->sess_addr, sess->sess_user, sess->sess_ka);
378:
379: pthread_attr_destroy(&attr);
380: return NULL;
381: end: /* close client connection */
382: ret = mqtt_msgCONNACK(&msg, ret);
383: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
384: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
385: } else {
386: ioDEBUG(5, "Sended %d bytes", ret);
387: free(msg.msg_base);
388: memset(&msg, 0, sizeof msg);
389: }
390:
391: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
392: finiSession(sess, 0);
393: return NULL;
394: }
395:
396: /* ----------------------------------------------------------------------- */
397:
398: static void *
399: thrSched(void *arg __unused)
400: {
401: struct tagSession *sess;
402: struct timespec pl = { 0, 10000000 };
403:
404: ioTRACE(1);
405:
406: schedPolling(root, &pl, NULL);
407: schedRun(root, &Kill);
408:
409: TAILQ_FOREACH(sess, &Sessions, sess_node)
410: if (sess->sess_tid)
411: pthread_cancel(sess->sess_tid);
412: ioDEBUG(5, "EXIT from Scheduler thread !!!");
413: pthread_exit(NULL);
414: }
415:
416: int
417: Run(int sock)
418: {
419: io_sockaddr_t sa;
420: socklen_t sslen = sizeof sa.ss;
421: int cli;
422: pthread_t tid;
423: ait_val_t *v;
424: char str[STRSIZ];
425:
426: ioTRACE(1);
427:
428: if (pthread_create(&tid, NULL, thrSched, NULL)) {
429: ioSYSERR(0);
430: return -1;
431: } else
432: pthread_detach(tid);
433: ioDEBUG(2, "Run scheduler management thread");
434:
435: if (listen(sock, SOMAXCONN) == -1) {
436: ioSYSERR(0);
437: return -1;
438: }
439:
440: while (!Kill) {
441: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
442: if (!Kill)
443: continue;
444: ioSYSERR(0);
445: break;
446: }
447: v = io_allocVar();
448: if (!v) {
449: ioLIBERR(mqtt);
450: break;
451: } else {
452: memset(str, 0, sizeof str);
453: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
454: AIT_SET_STR(v, str);
455: }
456: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
457:
458: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
459: io_freeVar(v);
460: close(cli);
461: ioDEBUG(1, "Terminated client with socket=%d", cli);
462: }
463: }
464:
465: return 0;
466: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>