Annotation of mqtt/src/daemon.c, revision 1.2.2.3
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
9:
10:
11: static inline struct tagSession *
12: initSession(int sock, ait_val_t * __restrict v)
13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
17: ioTRACE(5);
18:
19: if (!v)
20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: io_freeVar(v);
26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
30: TAILQ_INIT(&sess->sess_sndqueue);
31:
32: str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
33: if (!str)
34: sess->sess_retry = DEFAULT_RETRY;
35: else
36: sess->sess_retry = strtol(str, NULL, 0);
37:
38: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
39: if (!sess->sess_buf) {
40: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
41: free(sess);
42: io_freeVar(v);
43: return NULL;
44: }
45:
1.2.2.2 misho 46: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
47: if (!sess->sess_srv) {
48: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
49: mqtt_msgFree(&sess->sess_buf, 42);
50: free(sess);
51: io_freeVar(v);
52: return NULL;
1.2.2.3 ! misho 53: } else {
! 54: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
! 55: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
! 56: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
! 57: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
! 58: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
! 59: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
! 60: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2 misho 61: }
62:
1.2 misho 63: sess->sess_sock = sock;
64: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
65: io_freeVar(v);
66: return sess;
67: }
68:
69: static void
70: finiSession(struct tagSession *sess, int preservSock)
71: {
72: struct tagStore *store;
73:
74: ioTRACE(5);
75:
76: if (!sess)
77: return;
78:
79: if (call.FiniSessPUB)
80: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
81:
82: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
83: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
84:
1.2.2.1 misho 85: if (store->st_subscr.sub_topic.msg_base)
86: free(store->st_subscr.sub_topic.msg_base);
87: if (store->st_subscr.sub_value.msg_base)
88: free(store->st_subscr.sub_value.msg_base);
1.2 misho 89:
90: free(store);
91: }
92:
93: if (sess->sess_will.msg)
94: free(sess->sess_will.msg);
95: if (sess->sess_will.topic)
96: free(sess->sess_will.topic);
97:
98: if (sess->sess_sock > STDERR_FILENO && !preservSock)
99: srv_Close(sess->sess_sock);
100:
1.2.2.2 misho 101: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 102: mqtt_msgFree(&sess->sess_buf, 42);
103:
104: free(sess);
105: }
106:
107: static void
108: stopSession(struct tagSession *sess)
109: {
110: mqtt_msg_t msg = { NULL, 0 };
111: int ret;
112:
113: ioTRACE(4);
114:
115: assert(sess);
116:
117: pthread_mutex_lock(&mtx_sess);
118: TAILQ_REMOVE(&Sessions, sess, sess_node);
119: pthread_mutex_unlock(&mtx_sess);
120:
121: ret = mqtt_msgDISCONNECT(&msg);
122: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
123: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
124: else {
125: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
126: free(msg.msg_base);
127: memset(&msg, 0, sizeof msg);
128: }
129:
130: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
131: finiSession(sess, 0);
132:
133: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
134: sess->sess_addr, sess->sess_user);
135: }
136:
137: static int
138: KASession(struct tagSession *sess)
139: {
140: mqtt_msg_t msg = { NULL, 0 };
141: int ret;
142: struct pollfd pfd;
143:
144: ioTRACE(4);
145:
146: assert(sess);
147:
148: /* ping request */
149: ret = mqtt_msgPINGREQ(&msg);
150: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
151: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
152: return -1;
153: } else {
154: ioDEBUG(5, "Sended %d bytes for ping request", ret);
155: free(msg.msg_base);
156: memset(&msg, 0, sizeof msg);
157: }
158:
159: pfd.fd = sess->sess_sock;
160: pfd.events = POLLIN | POLLPRI;
161: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
162: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
163: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
164: return -1;
165: } else if (!ret) {
166: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
167: return 1;
168: }
169: /* receive & decode packet */
170: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
171: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
172: return -1;
173: }
174: if (mqtt_readPINGRESP(sess->sess_buf)) {
175: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
176: return 2;
177: }
178:
179: /* Keep Alive is OK! */
180: return 0;
181: }
182:
183: static void *
184: thrSession(struct tagSession *sess)
185: {
186: int ret, locKill = 42;
187: struct pollfd pfd;
188: struct mqtthdr *hdr;
189: ait_val_t *v;
190:
191: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
192: ioTRACE(2);
193:
194: pfd.fd = sess->sess_sock;
195: pfd.events = POLLIN | POLLPRI;
196: while (!Kill && locKill) {
197: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
198: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
199: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
200: break;
201: } else if (!ret && (ret = KASession(sess))) {
202: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
203: sess->sess_cid, sess->sess_addr, sess->sess_user);
204: break;
205: }
206: /* receive & decode packet */
207: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
208: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
209: break;
210: } else if (!ret) {
211: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
212: break;
213: } else
214: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
215:
216: /* dispatch message type */
1.2.2.3 ! misho 217: if (mqtt_srv_Dispatch(sess->sess_srv, sess))
! 218: ioLIBERR(mqtt);
1.2 misho 219: switch (hdr->mqtt_msg.type) {
220: case MQTT_TYPE_CONNECT:
221: ioDEBUG(5, "Exec CONNECT session");
222: if ((v = io_allocVar())) {
223: AIT_SET_STR(v, sess->sess_addr);
224: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
225: io_freeVar(v);
226: } else
227: ioLIBERR(mqtt);
228: locKill ^= locKill;
229: continue;
230: case MQTT_TYPE_DISCONNECT:
231: ioDEBUG(5, "Exec DISCONNECT session");
232: finiSession(sess, 0);
233: locKill ^= locKill;
234: continue;
235: case MQTT_TYPE_PUBLISH:
236: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3 ! misho 237: /*
! 238: if (cmdPUBLISH(sess))
1.2 misho 239: locKill ^= locKill;
1.2.2.3 ! misho 240: */
1.2 misho 241: break;
242: case MQTT_TYPE_PUBREL:
243: break;
244: case MQTT_TYPE_SUBSCRIBE:
245: break;
246: case MQTT_TYPE_UNSUBSCRIBE:
247: break;
248: case MQTT_TYPE_PINGREQ:
249: break;
250: default:
251: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
252: sess->sess_cid, hdr->mqtt_msg.type);
253: break;
254: }
255: }
256:
257: pthread_cleanup_pop(locKill);
258: pthread_exit(NULL);
259: }
260:
261: static void *
262: startSession(sched_task_t *task)
263: {
264: u_char basebuf[USHRT_MAX];
265: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
266: mqtthdr_connflgs_t flg;
267: mqtthdr_connack_t cack;
268: struct tagSession *s, *sess = NULL;
269: int ret;
270: pthread_attr_t attr;
271:
272: ioTRACE(4);
273:
274: assert(task);
275:
276: if (!TASK_DATA(task)) {
277: sess = initSession(TASK_FD(task), TASK_ARG(task));
278: if (!sess) {
279: io_freeVar(TASK_ARG(task));
280: close(TASK_FD(task));
281: return NULL;
282: }
283:
284: /* receive & decode packet */
285: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
286: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
287: finiSession(sess, 0);
288: return NULL;
289: }
290: } else {
291: sess = TASK_DATA(task);
292: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
293: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
294: }
295:
296: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
297: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
298: &sess->sess_will.topic, &sess->sess_will.msg);
299: ret = cack.retcode;
300: flg.flags = cack.reserved;
301: if (flg.reserved) {
302: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
303: goto end;
304: } else {
305: sess->sess_clean = flg.clean_sess;
306: sess->sess_will.qos = flg.will_qos;
307: sess->sess_will.retain = flg.will_retain;
308: sess->sess_will.flag = flg.will_flg;
309: }
310:
311: /* check online table for user */
312: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
313: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
314: sess->sess_user, sess->sess_pass);
315: ret = MQTT_RETCODE_DENIED;
316: goto end;
317: } else {
318: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
319: ret = MQTT_RETCODE_ACCEPTED;
320: }
321: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
322: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
323: TAILQ_FOREACH(s, &Sessions, sess_node)
324: if (!strcmp(s->sess_cid, sess->sess_cid)) {
325: /* found stale session & disconnect it! */
326: stopSession(s);
327: break;
328: }
329: }
330: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
331: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
332: sess->sess_will.qos, sess->sess_will.retain) == -1) {
333: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
334: ret = MQTT_RETCODE_DENIED;
335: goto end;
336: } else {
337: ioDEBUG(0, "Session %s from %s and username %s is started",
338: sess->sess_cid, sess->sess_addr, sess->sess_user);
339: ret = MQTT_RETCODE_ACCEPTED;
340: }
341:
342: ret = mqtt_msgCONNACK(&msg, ret);
343: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
344: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
345: finiSession(sess, 0);
346: return NULL;
347: } else {
348: ioDEBUG(5, "Sended %d bytes", ret);
349: free(msg.msg_base);
350: memset(&msg, 0, sizeof msg);
351: }
352:
353: /* Start session thread OK ... */
354: pthread_attr_init(&attr);
355: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
356:
357: pthread_mutex_lock(&mtx_sess);
358: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
359: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
360: pthread_mutex_unlock(&mtx_sess);
361:
362: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
363: sess->sess_addr, sess->sess_user, sess->sess_ka);
364:
365: pthread_attr_destroy(&attr);
366: return NULL;
367: end: /* close client connection */
368: ret = mqtt_msgCONNACK(&msg, ret);
369: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
370: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
371: } else {
372: ioDEBUG(5, "Sended %d bytes", ret);
373: free(msg.msg_base);
374: memset(&msg, 0, sizeof msg);
375: }
376:
377: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
378: finiSession(sess, 0);
379: return NULL;
380: }
381:
382: /* ----------------------------------------------------------------------- */
383:
384: static void *
385: thrSched(void *arg __unused)
386: {
387: struct tagSession *sess;
388: struct timespec pl = { 0, 10000000 };
389:
390: ioTRACE(1);
391:
392: schedPolling(root, &pl, NULL);
393: schedRun(root, &Kill);
394:
395: TAILQ_FOREACH(sess, &Sessions, sess_node)
396: if (sess->sess_tid)
397: pthread_cancel(sess->sess_tid);
398: ioDEBUG(5, "EXIT from Scheduler thread !!!");
399: pthread_exit(NULL);
400: }
401:
402: int
403: Run(int sock)
404: {
405: io_sockaddr_t sa;
406: socklen_t sslen = sizeof sa.ss;
407: int cli;
408: pthread_t tid;
409: ait_val_t *v;
410: char str[STRSIZ];
411:
412: ioTRACE(1);
413:
414: if (pthread_create(&tid, NULL, thrSched, NULL)) {
415: ioSYSERR(0);
416: return -1;
417: } else
418: pthread_detach(tid);
419: ioDEBUG(2, "Run scheduler management thread");
420:
421: if (listen(sock, SOMAXCONN) == -1) {
422: ioSYSERR(0);
423: return -1;
424: }
425:
426: while (!Kill) {
427: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
428: if (!Kill)
429: continue;
430: ioSYSERR(0);
431: break;
432: }
433: v = io_allocVar();
434: if (!v) {
435: ioLIBERR(mqtt);
436: break;
437: } else {
438: memset(str, 0, sizeof str);
439: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
440: AIT_SET_STR(v, str);
441: }
442: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
443:
444: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
445: io_freeVar(v);
446: close(cli);
447: ioDEBUG(1, "Terminated client with socket=%d", cli);
448: }
449: }
450:
451: return 0;
452: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>