Annotation of mqtt/src/daemon.c, revision 1.2.2.5
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
9:
10:
11: static inline struct tagSession *
12: initSession(int sock, ait_val_t * __restrict v)
13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
17: ioTRACE(5);
18:
19: if (!v)
20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: io_freeVar(v);
26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
1.2.2.4 misho 30: pthread_mutex_init(&sess->sess_mtx, NULL);
31:
1.2 misho 32: TAILQ_INIT(&sess->sess_sndqueue);
33:
1.2.2.5 ! misho 34: str = cfg_getAttribute(&cfg, "mqttd", "retry");
1.2 misho 35: if (!str)
36: sess->sess_retry = DEFAULT_RETRY;
37: else
38: sess->sess_retry = strtol(str, NULL, 0);
39:
40: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
41: if (!sess->sess_buf) {
42: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
43: free(sess);
44: io_freeVar(v);
45: return NULL;
46: }
47:
1.2.2.2 misho 48: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
49: if (!sess->sess_srv) {
50: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
51: mqtt_msgFree(&sess->sess_buf, 42);
52: free(sess);
53: io_freeVar(v);
54: return NULL;
1.2.2.3 misho 55: } else {
56: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
57: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
58: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
59: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
60: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
61: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
62: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2 misho 63: }
64:
1.2 misho 65: sess->sess_sock = sock;
66: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
67: io_freeVar(v);
68: return sess;
69: }
70:
71: static void
72: finiSession(struct tagSession *sess, int preservSock)
73: {
74: struct tagStore *store;
75:
76: ioTRACE(5);
77:
78: if (!sess)
79: return;
80:
81: if (call.FiniSessPUB)
82: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
83:
1.2.2.4 misho 84: pthread_mutex_lock(&sess->sess_mtx);
1.2 misho 85: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
86: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
87:
1.2.2.1 misho 88: if (store->st_subscr.sub_topic.msg_base)
89: free(store->st_subscr.sub_topic.msg_base);
90: if (store->st_subscr.sub_value.msg_base)
91: free(store->st_subscr.sub_value.msg_base);
1.2 misho 92:
93: free(store);
94: }
1.2.2.4 misho 95: pthread_mutex_unlock(&sess->sess_mtx);
96: pthread_mutex_destroy(&sess->sess_mtx);
1.2 misho 97:
98: if (sess->sess_will.msg)
99: free(sess->sess_will.msg);
100: if (sess->sess_will.topic)
101: free(sess->sess_will.topic);
102:
103: if (sess->sess_sock > STDERR_FILENO && !preservSock)
104: srv_Close(sess->sess_sock);
105:
1.2.2.2 misho 106: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 107: mqtt_msgFree(&sess->sess_buf, 42);
108:
109: free(sess);
110: }
111:
112: static void
113: stopSession(struct tagSession *sess)
114: {
115: mqtt_msg_t msg = { NULL, 0 };
116: int ret;
117:
118: ioTRACE(4);
119:
120: assert(sess);
121:
122: pthread_mutex_lock(&mtx_sess);
123: TAILQ_REMOVE(&Sessions, sess, sess_node);
124: pthread_mutex_unlock(&mtx_sess);
125:
126: ret = mqtt_msgDISCONNECT(&msg);
127: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
128: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
129: else {
130: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
131: free(msg.msg_base);
132: memset(&msg, 0, sizeof msg);
133: }
134:
135: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
136: finiSession(sess, 0);
137:
138: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
139: sess->sess_addr, sess->sess_user);
140: }
141:
142: static int
143: KASession(struct tagSession *sess)
144: {
145: mqtt_msg_t msg = { NULL, 0 };
146: int ret;
147: struct pollfd pfd;
148:
149: ioTRACE(4);
150:
151: assert(sess);
152:
153: /* ping request */
154: ret = mqtt_msgPINGREQ(&msg);
155: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
156: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
157: return -1;
158: } else {
159: ioDEBUG(5, "Sended %d bytes for ping request", ret);
160: free(msg.msg_base);
161: memset(&msg, 0, sizeof msg);
162: }
163:
164: pfd.fd = sess->sess_sock;
165: pfd.events = POLLIN | POLLPRI;
166: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
167: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
168: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
169: return -1;
170: } else if (!ret) {
171: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
172: return 1;
173: }
174: /* receive & decode packet */
175: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
176: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
177: return -1;
178: }
179: if (mqtt_readPINGRESP(sess->sess_buf)) {
180: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
181: return 2;
182: }
183:
184: /* Keep Alive is OK! */
185: return 0;
186: }
187:
188: static void *
189: thrSession(struct tagSession *sess)
190: {
191: int ret, locKill = 42;
192: struct pollfd pfd;
193: struct mqtthdr *hdr;
194: ait_val_t *v;
195:
196: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
197: ioTRACE(2);
198:
199: pfd.fd = sess->sess_sock;
200: pfd.events = POLLIN | POLLPRI;
201: while (!Kill && locKill) {
202: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
203: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
204: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
205: break;
206: } else if (!ret && (ret = KASession(sess))) {
207: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
208: sess->sess_cid, sess->sess_addr, sess->sess_user);
209: break;
210: }
211: /* receive & decode packet */
212: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
213: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
214: break;
215: } else if (!ret) {
216: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
217: break;
218: } else
219: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
220:
221: /* dispatch message type */
1.2.2.3 misho 222: if (mqtt_srv_Dispatch(sess->sess_srv, sess))
223: ioLIBERR(mqtt);
1.2 misho 224: switch (hdr->mqtt_msg.type) {
225: case MQTT_TYPE_CONNECT:
226: ioDEBUG(5, "Exec CONNECT session");
227: if ((v = io_allocVar())) {
228: AIT_SET_STR(v, sess->sess_addr);
229: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
230: io_freeVar(v);
231: } else
232: ioLIBERR(mqtt);
233: locKill ^= locKill;
234: continue;
235: case MQTT_TYPE_DISCONNECT:
236: ioDEBUG(5, "Exec DISCONNECT session");
237: finiSession(sess, 0);
238: locKill ^= locKill;
239: continue;
240: case MQTT_TYPE_PUBLISH:
241: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3 misho 242: /*
243: if (cmdPUBLISH(sess))
1.2 misho 244: locKill ^= locKill;
1.2.2.3 misho 245: */
1.2 misho 246: break;
247: case MQTT_TYPE_PUBREL:
248: break;
249: case MQTT_TYPE_SUBSCRIBE:
250: break;
251: case MQTT_TYPE_UNSUBSCRIBE:
252: break;
253: case MQTT_TYPE_PINGREQ:
1.2.2.4 misho 254: ioDEBUG(5, "Exec PINGREQ session");
1.2 misho 255: break;
256: default:
257: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
258: sess->sess_cid, hdr->mqtt_msg.type);
259: break;
260: }
261: }
262:
263: pthread_cleanup_pop(locKill);
264: pthread_exit(NULL);
265: }
266:
267: static void *
268: startSession(sched_task_t *task)
269: {
270: u_char basebuf[USHRT_MAX];
271: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
272: mqtthdr_connflgs_t flg;
273: mqtthdr_connack_t cack;
274: struct tagSession *s, *sess = NULL;
275: int ret;
276: pthread_attr_t attr;
277:
278: ioTRACE(4);
279:
280: assert(task);
281:
282: if (!TASK_DATA(task)) {
283: sess = initSession(TASK_FD(task), TASK_ARG(task));
284: if (!sess) {
285: io_freeVar(TASK_ARG(task));
286: close(TASK_FD(task));
287: return NULL;
288: }
289:
290: /* receive & decode packet */
291: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
292: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
293: finiSession(sess, 0);
294: return NULL;
295: }
296: } else {
297: sess = TASK_DATA(task);
298: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
299: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
300: }
301:
302: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
303: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
304: &sess->sess_will.topic, &sess->sess_will.msg);
305: ret = cack.retcode;
306: flg.flags = cack.reserved;
307: if (flg.reserved) {
308: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
309: goto end;
310: } else {
311: sess->sess_clean = flg.clean_sess;
312: sess->sess_will.qos = flg.will_qos;
313: sess->sess_will.retain = flg.will_retain;
314: sess->sess_will.flag = flg.will_flg;
315: }
316:
317: /* check online table for user */
318: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
319: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
320: sess->sess_user, sess->sess_pass);
321: ret = MQTT_RETCODE_DENIED;
322: goto end;
323: } else {
324: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
325: ret = MQTT_RETCODE_ACCEPTED;
326: }
327: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
328: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
329: TAILQ_FOREACH(s, &Sessions, sess_node)
330: if (!strcmp(s->sess_cid, sess->sess_cid)) {
331: /* found stale session & disconnect it! */
332: stopSession(s);
333: break;
334: }
335: }
336: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
337: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
338: sess->sess_will.qos, sess->sess_will.retain) == -1) {
339: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
340: ret = MQTT_RETCODE_DENIED;
341: goto end;
342: } else {
343: ioDEBUG(0, "Session %s from %s and username %s is started",
344: sess->sess_cid, sess->sess_addr, sess->sess_user);
345: ret = MQTT_RETCODE_ACCEPTED;
346: }
347:
348: ret = mqtt_msgCONNACK(&msg, ret);
349: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
350: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
351: finiSession(sess, 0);
352: return NULL;
353: } else {
354: ioDEBUG(5, "Sended %d bytes", ret);
355: free(msg.msg_base);
356: memset(&msg, 0, sizeof msg);
357: }
358:
359: /* Start session thread OK ... */
360: pthread_attr_init(&attr);
361: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
362:
363: pthread_mutex_lock(&mtx_sess);
364: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
365: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
366: pthread_mutex_unlock(&mtx_sess);
367:
368: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
369: sess->sess_addr, sess->sess_user, sess->sess_ka);
370:
371: pthread_attr_destroy(&attr);
372: return NULL;
373: end: /* close client connection */
374: ret = mqtt_msgCONNACK(&msg, ret);
375: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
376: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
377: } else {
378: ioDEBUG(5, "Sended %d bytes", ret);
379: free(msg.msg_base);
380: memset(&msg, 0, sizeof msg);
381: }
382:
383: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
384: finiSession(sess, 0);
385: return NULL;
386: }
387:
388: /* ----------------------------------------------------------------------- */
389:
390: static void *
391: thrSched(void *arg __unused)
392: {
393: struct tagSession *sess;
394: struct timespec pl = { 0, 10000000 };
395:
396: ioTRACE(1);
397:
398: schedPolling(root, &pl, NULL);
399: schedRun(root, &Kill);
400:
401: TAILQ_FOREACH(sess, &Sessions, sess_node)
402: if (sess->sess_tid)
403: pthread_cancel(sess->sess_tid);
404: ioDEBUG(5, "EXIT from Scheduler thread !!!");
405: pthread_exit(NULL);
406: }
407:
408: int
409: Run(int sock)
410: {
411: io_sockaddr_t sa;
412: socklen_t sslen = sizeof sa.ss;
413: int cli;
414: pthread_t tid;
415: ait_val_t *v;
416: char str[STRSIZ];
417:
418: ioTRACE(1);
419:
420: if (pthread_create(&tid, NULL, thrSched, NULL)) {
421: ioSYSERR(0);
422: return -1;
423: } else
424: pthread_detach(tid);
425: ioDEBUG(2, "Run scheduler management thread");
426:
427: if (listen(sock, SOMAXCONN) == -1) {
428: ioSYSERR(0);
429: return -1;
430: }
431:
432: while (!Kill) {
433: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
434: if (!Kill)
435: continue;
436: ioSYSERR(0);
437: break;
438: }
439: v = io_allocVar();
440: if (!v) {
441: ioLIBERR(mqtt);
442: break;
443: } else {
444: memset(str, 0, sizeof str);
445: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
446: AIT_SET_STR(v, str);
447: }
448: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
449:
450: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
451: io_freeVar(v);
452: close(cli);
453: ioDEBUG(1, "Terminated client with socket=%d", cli);
454: }
455: }
456:
457: return 0;
458: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>