Annotation of mqtt/src/daemon.c, revision 1.2.2.2
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
9:
10:
11: static inline struct tagSession *
12: initSession(int sock, ait_val_t * __restrict v)
13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
17: ioTRACE(5);
18:
19: if (!v)
20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: io_freeVar(v);
26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
30: TAILQ_INIT(&sess->sess_sndqueue);
31:
32: str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
33: if (!str)
34: sess->sess_retry = DEFAULT_RETRY;
35: else
36: sess->sess_retry = strtol(str, NULL, 0);
37:
38: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
39: if (!sess->sess_buf) {
40: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
41: free(sess);
42: io_freeVar(v);
43: return NULL;
44: }
45:
1.2.2.2 ! misho 46: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
! 47: if (!sess->sess_srv) {
! 48: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 49: mqtt_msgFree(&sess->sess_buf, 42);
! 50: free(sess);
! 51: io_freeVar(v);
! 52: return NULL;
! 53: }
! 54:
1.2 misho 55: sess->sess_sock = sock;
56: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
57: io_freeVar(v);
58: return sess;
59: }
60:
61: static void
62: finiSession(struct tagSession *sess, int preservSock)
63: {
64: struct tagStore *store;
65:
66: ioTRACE(5);
67:
68: if (!sess)
69: return;
70:
71: if (call.FiniSessPUB)
72: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
73:
74: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
75: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
76:
1.2.2.1 misho 77: if (store->st_subscr.sub_topic.msg_base)
78: free(store->st_subscr.sub_topic.msg_base);
79: if (store->st_subscr.sub_value.msg_base)
80: free(store->st_subscr.sub_value.msg_base);
1.2 misho 81:
82: free(store);
83: }
84:
85: if (sess->sess_will.msg)
86: free(sess->sess_will.msg);
87: if (sess->sess_will.topic)
88: free(sess->sess_will.topic);
89:
90: if (sess->sess_sock > STDERR_FILENO && !preservSock)
91: srv_Close(sess->sess_sock);
92:
1.2.2.2 ! misho 93: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 94: mqtt_msgFree(&sess->sess_buf, 42);
95:
96: free(sess);
97: }
98:
99: static void
100: stopSession(struct tagSession *sess)
101: {
102: mqtt_msg_t msg = { NULL, 0 };
103: int ret;
104:
105: ioTRACE(4);
106:
107: assert(sess);
108:
109: pthread_mutex_lock(&mtx_sess);
110: TAILQ_REMOVE(&Sessions, sess, sess_node);
111: pthread_mutex_unlock(&mtx_sess);
112:
113: ret = mqtt_msgDISCONNECT(&msg);
114: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
115: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
116: else {
117: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
118: free(msg.msg_base);
119: memset(&msg, 0, sizeof msg);
120: }
121:
122: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
123: finiSession(sess, 0);
124:
125: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
126: sess->sess_addr, sess->sess_user);
127: }
128:
129: static int
130: KASession(struct tagSession *sess)
131: {
132: mqtt_msg_t msg = { NULL, 0 };
133: int ret;
134: struct pollfd pfd;
135:
136: ioTRACE(4);
137:
138: assert(sess);
139:
140: /* ping request */
141: ret = mqtt_msgPINGREQ(&msg);
142: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
143: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
144: return -1;
145: } else {
146: ioDEBUG(5, "Sended %d bytes for ping request", ret);
147: free(msg.msg_base);
148: memset(&msg, 0, sizeof msg);
149: }
150:
151: pfd.fd = sess->sess_sock;
152: pfd.events = POLLIN | POLLPRI;
153: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
154: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
155: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
156: return -1;
157: } else if (!ret) {
158: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
159: return 1;
160: }
161: /* receive & decode packet */
162: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
163: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
164: return -1;
165: }
166: if (mqtt_readPINGRESP(sess->sess_buf)) {
167: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
168: return 2;
169: }
170:
171: /* Keep Alive is OK! */
172: return 0;
173: }
174:
175: static void *
176: thrSession(struct tagSession *sess)
177: {
178: int ret, locKill = 42;
179: struct pollfd pfd;
180: struct mqtthdr *hdr;
181: ait_val_t *v;
182: struct tagStore *store;
183:
184: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
185: ioTRACE(2);
186:
187: pfd.fd = sess->sess_sock;
188: pfd.events = POLLIN | POLLPRI;
189: while (!Kill && locKill) {
190: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
191: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
192: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
193: break;
194: } else if (!ret && (ret = KASession(sess))) {
195: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
196: sess->sess_cid, sess->sess_addr, sess->sess_user);
197: break;
198: }
199: /* receive & decode packet */
200: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
201: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
202: break;
203: } else if (!ret) {
204: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
205: break;
206: } else
207: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
208:
209: /* dispatch message type */
210: switch (hdr->mqtt_msg.type) {
211: case MQTT_TYPE_CONNECT:
212: ioDEBUG(5, "Exec CONNECT session");
213: pthread_mutex_lock(&mtx_sess);
214: TAILQ_REMOVE(&Sessions, sess, sess_node);
215: pthread_mutex_unlock(&mtx_sess);
216:
217: if (call.FiniSessPUB)
218: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
219:
220: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
221: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
222:
1.2.2.1 misho 223: if (store->st_subscr.sub_topic.msg_base)
224: free(store->st_subscr.sub_topic.msg_base);
225: if (store->st_subscr.sub_value.msg_base)
226: free(store->st_subscr.sub_value.msg_base);
1.2 misho 227:
228: free(store);
229: }
230:
231: if (sess->sess_will.msg)
232: free(sess->sess_will.msg);
233: if (sess->sess_will.topic)
234: free(sess->sess_will.topic);
235:
236: if ((v = io_allocVar())) {
237: AIT_SET_STR(v, sess->sess_addr);
238: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
239: io_freeVar(v);
240: } else
241: ioLIBERR(mqtt);
242:
243: locKill ^= locKill;
244:
245: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
246: sess->sess_addr, sess->sess_user);
247: continue;
248: case MQTT_TYPE_DISCONNECT:
249: ioDEBUG(5, "Exec DISCONNECT session");
250: pthread_mutex_lock(&mtx_sess);
251: TAILQ_REMOVE(&Sessions, sess, sess_node);
252: pthread_mutex_unlock(&mtx_sess);
253:
254: finiSession(sess, 0);
255: locKill ^= locKill;
256:
257: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
258: sess->sess_addr, sess->sess_user);
259: continue;
260: case MQTT_TYPE_PUBLISH:
261: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
262: if (Publish(sess))
263: locKill ^= locKill;
264: break;
265: case MQTT_TYPE_PUBREL:
266: break;
267: case MQTT_TYPE_SUBSCRIBE:
268: break;
269: case MQTT_TYPE_UNSUBSCRIBE:
270: break;
271: case MQTT_TYPE_PINGREQ:
272: break;
273: default:
274: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
275: sess->sess_cid, hdr->mqtt_msg.type);
276: break;
277: }
278: }
279:
280: pthread_cleanup_pop(locKill);
281: pthread_exit(NULL);
282: }
283:
284: static void *
285: startSession(sched_task_t *task)
286: {
287: u_char basebuf[USHRT_MAX];
288: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
289: mqtthdr_connflgs_t flg;
290: mqtthdr_connack_t cack;
291: struct tagSession *s, *sess = NULL;
292: int ret;
293: pthread_attr_t attr;
294:
295: ioTRACE(4);
296:
297: assert(task);
298:
299: if (!TASK_DATA(task)) {
300: sess = initSession(TASK_FD(task), TASK_ARG(task));
301: if (!sess) {
302: io_freeVar(TASK_ARG(task));
303: close(TASK_FD(task));
304: return NULL;
305: }
306:
307: /* receive & decode packet */
308: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
309: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
310: finiSession(sess, 0);
311: return NULL;
312: }
313: } else {
314: sess = TASK_DATA(task);
315: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
316: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
317: }
318:
319: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
320: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
321: &sess->sess_will.topic, &sess->sess_will.msg);
322: ret = cack.retcode;
323: flg.flags = cack.reserved;
324: if (flg.reserved) {
325: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
326: goto end;
327: } else {
328: sess->sess_clean = flg.clean_sess;
329: sess->sess_will.qos = flg.will_qos;
330: sess->sess_will.retain = flg.will_retain;
331: sess->sess_will.flag = flg.will_flg;
332: }
333:
334: /* check online table for user */
335: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
336: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
337: sess->sess_user, sess->sess_pass);
338: ret = MQTT_RETCODE_DENIED;
339: goto end;
340: } else {
341: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
342: ret = MQTT_RETCODE_ACCEPTED;
343: }
344: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
345: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
346: TAILQ_FOREACH(s, &Sessions, sess_node)
347: if (!strcmp(s->sess_cid, sess->sess_cid)) {
348: /* found stale session & disconnect it! */
349: stopSession(s);
350: break;
351: }
352: }
353: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
354: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
355: sess->sess_will.qos, sess->sess_will.retain) == -1) {
356: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
357: ret = MQTT_RETCODE_DENIED;
358: goto end;
359: } else {
360: ioDEBUG(0, "Session %s from %s and username %s is started",
361: sess->sess_cid, sess->sess_addr, sess->sess_user);
362: ret = MQTT_RETCODE_ACCEPTED;
363: }
364:
365: ret = mqtt_msgCONNACK(&msg, ret);
366: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
367: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
368: finiSession(sess, 0);
369: return NULL;
370: } else {
371: ioDEBUG(5, "Sended %d bytes", ret);
372: free(msg.msg_base);
373: memset(&msg, 0, sizeof msg);
374: }
375:
376: /* Start session thread OK ... */
377: pthread_attr_init(&attr);
378: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
379:
380: pthread_mutex_lock(&mtx_sess);
381: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
382: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
383: pthread_mutex_unlock(&mtx_sess);
384:
385: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
386: sess->sess_addr, sess->sess_user, sess->sess_ka);
387:
388: pthread_attr_destroy(&attr);
389: return NULL;
390: end: /* close client connection */
391: ret = mqtt_msgCONNACK(&msg, ret);
392: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
393: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
394: } else {
395: ioDEBUG(5, "Sended %d bytes", ret);
396: free(msg.msg_base);
397: memset(&msg, 0, sizeof msg);
398: }
399:
400: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
401: finiSession(sess, 0);
402: return NULL;
403: }
404:
405: /* ----------------------------------------------------------------------- */
406:
407: static void *
408: thrSched(void *arg __unused)
409: {
410: struct tagSession *sess;
411: struct timespec pl = { 0, 10000000 };
412:
413: ioTRACE(1);
414:
415: schedPolling(root, &pl, NULL);
416: schedRun(root, &Kill);
417:
418: TAILQ_FOREACH(sess, &Sessions, sess_node)
419: if (sess->sess_tid)
420: pthread_cancel(sess->sess_tid);
421: ioDEBUG(5, "EXIT from Scheduler thread !!!");
422: pthread_exit(NULL);
423: }
424:
425: int
426: Run(int sock)
427: {
428: io_sockaddr_t sa;
429: socklen_t sslen = sizeof sa.ss;
430: int cli;
431: pthread_t tid;
432: ait_val_t *v;
433: char str[STRSIZ];
434:
435: ioTRACE(1);
436:
437: if (pthread_create(&tid, NULL, thrSched, NULL)) {
438: ioSYSERR(0);
439: return -1;
440: } else
441: pthread_detach(tid);
442: ioDEBUG(2, "Run scheduler management thread");
443:
444: if (listen(sock, SOMAXCONN) == -1) {
445: ioSYSERR(0);
446: return -1;
447: }
448:
449: while (!Kill) {
450: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
451: if (!Kill)
452: continue;
453: ioSYSERR(0);
454: break;
455: }
456: v = io_allocVar();
457: if (!v) {
458: ioLIBERR(mqtt);
459: break;
460: } else {
461: memset(str, 0, sizeof str);
462: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
463: AIT_SET_STR(v, str);
464: }
465: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
466:
467: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
468: io_freeVar(v);
469: close(cli);
470: ioDEBUG(1, "Terminated client with socket=%d", cli);
471: }
472: }
473:
474: return 0;
475: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>