Annotation of mqtt/src/daemon.c, revision 1.2.2.6
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
9:
10:
11: static inline struct tagSession *
12: initSession(int sock, ait_val_t * __restrict v)
13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
17: ioTRACE(5);
18:
19: if (!v)
20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: io_freeVar(v);
26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
1.2.2.4 misho 30: pthread_mutex_init(&sess->sess_mtx, NULL);
31:
1.2.2.6 ! misho 32: SLIST_INIT(&sess->sess_txque[MQTT_QOS_ONCE]);
! 33: SLIST_INIT(&sess->sess_txque[MQTT_QOS_ACK]);
! 34: SLIST_INIT(&sess->sess_txque[MQTT_QOS_EXACTLY]);
1.2 misho 35:
1.2.2.5 misho 36: str = cfg_getAttribute(&cfg, "mqttd", "retry");
1.2 misho 37: if (!str)
38: sess->sess_retry = DEFAULT_RETRY;
39: else
40: sess->sess_retry = strtol(str, NULL, 0);
41:
42: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
43: if (!sess->sess_buf) {
44: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
45: free(sess);
46: io_freeVar(v);
47: return NULL;
48: }
49:
1.2.2.2 misho 50: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
51: if (!sess->sess_srv) {
52: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
53: mqtt_msgFree(&sess->sess_buf, 42);
54: free(sess);
55: io_freeVar(v);
56: return NULL;
1.2.2.3 misho 57: } else {
58: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
59: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
60: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
61: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
62: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
63: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
64: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2 misho 65: }
66:
1.2 misho 67: sess->sess_sock = sock;
68: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
69: io_freeVar(v);
70: return sess;
71: }
72:
73: static void
74: finiSession(struct tagSession *sess, int preservSock)
75: {
76: struct tagStore *store;
1.2.2.6 ! misho 77: register int i;
1.2 misho 78:
79: ioTRACE(5);
80:
81: if (!sess)
82: return;
83:
84: if (call.FiniSessPUB)
85: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
86:
1.2.2.6 ! misho 87: SESS_ELEM_LOCK(sess);
! 88: for (i = 0; i < MQTT_QOS_RESERVED; i++)
! 89: while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
! 90: SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
! 91:
! 92: if (store->st_subscr.sub_topic.msg_base)
! 93: free(store->st_subscr.sub_topic.msg_base);
! 94: if (store->st_subscr.sub_value.msg_base)
! 95: free(store->st_subscr.sub_value.msg_base);
1.2 misho 96:
1.2.2.6 ! misho 97: free(store);
! 98: }
! 99: SESS_ELEM_UNLOCK(sess);
1.2.2.4 misho 100: pthread_mutex_destroy(&sess->sess_mtx);
1.2 misho 101:
102: if (sess->sess_will.msg)
103: free(sess->sess_will.msg);
104: if (sess->sess_will.topic)
105: free(sess->sess_will.topic);
106:
107: if (sess->sess_sock > STDERR_FILENO && !preservSock)
108: srv_Close(sess->sess_sock);
109:
1.2.2.2 misho 110: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 111: mqtt_msgFree(&sess->sess_buf, 42);
112:
113: free(sess);
114: }
115:
116: static void
117: stopSession(struct tagSession *sess)
118: {
119: mqtt_msg_t msg = { NULL, 0 };
120: int ret;
121:
122: ioTRACE(4);
123:
124: assert(sess);
125:
1.2.2.6 ! misho 126: SESS_LOCK;
1.2 misho 127: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.6 ! misho 128: SESS_UNLOCK;
1.2 misho 129:
130: ret = mqtt_msgDISCONNECT(&msg);
131: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
132: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
133: else {
134: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
135: free(msg.msg_base);
136: memset(&msg, 0, sizeof msg);
137: }
138:
139: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
140: finiSession(sess, 0);
141:
142: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
143: sess->sess_addr, sess->sess_user);
144: }
145:
146: static int
147: KASession(struct tagSession *sess)
148: {
149: mqtt_msg_t msg = { NULL, 0 };
150: int ret;
151: struct pollfd pfd;
152:
153: ioTRACE(4);
154:
155: assert(sess);
156:
157: /* ping request */
158: ret = mqtt_msgPINGREQ(&msg);
159: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
160: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
161: return -1;
162: } else {
163: ioDEBUG(5, "Sended %d bytes for ping request", ret);
164: free(msg.msg_base);
165: memset(&msg, 0, sizeof msg);
166: }
167:
168: pfd.fd = sess->sess_sock;
169: pfd.events = POLLIN | POLLPRI;
170: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
171: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
172: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
173: return -1;
174: } else if (!ret) {
175: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
176: return 1;
177: }
178: /* receive & decode packet */
179: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
180: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
181: return -1;
182: }
183: if (mqtt_readPINGRESP(sess->sess_buf)) {
184: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
185: return 2;
186: }
187:
188: /* Keep Alive is OK! */
189: return 0;
190: }
191:
192: static void *
193: thrSession(struct tagSession *sess)
194: {
195: int ret, locKill = 42;
196: struct pollfd pfd;
197: struct mqtthdr *hdr;
198: ait_val_t *v;
199:
200: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
201: ioTRACE(2);
202:
203: pfd.fd = sess->sess_sock;
204: pfd.events = POLLIN | POLLPRI;
205: while (!Kill && locKill) {
206: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
207: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
208: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
209: break;
210: } else if (!ret && (ret = KASession(sess))) {
211: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
212: sess->sess_cid, sess->sess_addr, sess->sess_user);
213: break;
214: }
215: /* receive & decode packet */
216: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
217: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
218: break;
219: } else if (!ret) {
220: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
221: break;
222: } else
223: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
224:
225: /* dispatch message type */
1.2.2.3 misho 226: if (mqtt_srv_Dispatch(sess->sess_srv, sess))
227: ioLIBERR(mqtt);
1.2 misho 228: switch (hdr->mqtt_msg.type) {
229: case MQTT_TYPE_CONNECT:
230: ioDEBUG(5, "Exec CONNECT session");
231: if ((v = io_allocVar())) {
232: AIT_SET_STR(v, sess->sess_addr);
233: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
234: io_freeVar(v);
235: } else
236: ioLIBERR(mqtt);
237: locKill ^= locKill;
238: continue;
239: case MQTT_TYPE_DISCONNECT:
240: ioDEBUG(5, "Exec DISCONNECT session");
241: finiSession(sess, 0);
242: locKill ^= locKill;
243: continue;
244: case MQTT_TYPE_PUBLISH:
245: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3 misho 246: /*
247: if (cmdPUBLISH(sess))
1.2 misho 248: locKill ^= locKill;
1.2.2.3 misho 249: */
1.2 misho 250: break;
251: case MQTT_TYPE_PUBREL:
252: break;
253: case MQTT_TYPE_SUBSCRIBE:
254: break;
255: case MQTT_TYPE_UNSUBSCRIBE:
256: break;
257: case MQTT_TYPE_PINGREQ:
1.2.2.4 misho 258: ioDEBUG(5, "Exec PINGREQ session");
1.2 misho 259: break;
260: default:
261: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
262: sess->sess_cid, hdr->mqtt_msg.type);
263: break;
264: }
265: }
266:
267: pthread_cleanup_pop(locKill);
268: pthread_exit(NULL);
269: }
270:
271: static void *
272: startSession(sched_task_t *task)
273: {
274: u_char basebuf[USHRT_MAX];
275: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
276: mqtthdr_connflgs_t flg;
277: mqtthdr_connack_t cack;
278: struct tagSession *s, *sess = NULL;
279: int ret;
280: pthread_attr_t attr;
281:
282: ioTRACE(4);
283:
284: assert(task);
285:
286: if (!TASK_DATA(task)) {
287: sess = initSession(TASK_FD(task), TASK_ARG(task));
288: if (!sess) {
289: io_freeVar(TASK_ARG(task));
290: close(TASK_FD(task));
291: return NULL;
292: }
293:
294: /* receive & decode packet */
295: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
296: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
297: finiSession(sess, 0);
298: return NULL;
299: }
300: } else {
301: sess = TASK_DATA(task);
302: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
303: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
304: }
305:
306: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
307: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
308: &sess->sess_will.topic, &sess->sess_will.msg);
309: ret = cack.retcode;
310: flg.flags = cack.reserved;
311: if (flg.reserved) {
312: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
313: goto end;
314: } else {
315: sess->sess_clean = flg.clean_sess;
316: sess->sess_will.qos = flg.will_qos;
317: sess->sess_will.retain = flg.will_retain;
318: sess->sess_will.flag = flg.will_flg;
319: }
320:
321: /* check online table for user */
322: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
323: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
324: sess->sess_user, sess->sess_pass);
325: ret = MQTT_RETCODE_DENIED;
326: goto end;
327: } else {
328: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
329: ret = MQTT_RETCODE_ACCEPTED;
330: }
331: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
332: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
333: TAILQ_FOREACH(s, &Sessions, sess_node)
334: if (!strcmp(s->sess_cid, sess->sess_cid)) {
335: /* found stale session & disconnect it! */
336: stopSession(s);
337: break;
338: }
339: }
340: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
341: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
342: sess->sess_will.qos, sess->sess_will.retain) == -1) {
343: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
344: ret = MQTT_RETCODE_DENIED;
345: goto end;
346: } else {
347: ioDEBUG(0, "Session %s from %s and username %s is started",
348: sess->sess_cid, sess->sess_addr, sess->sess_user);
349: ret = MQTT_RETCODE_ACCEPTED;
350: }
351:
352: ret = mqtt_msgCONNACK(&msg, ret);
353: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
354: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
355: finiSession(sess, 0);
356: return NULL;
357: } else {
358: ioDEBUG(5, "Sended %d bytes", ret);
359: free(msg.msg_base);
360: memset(&msg, 0, sizeof msg);
361: }
362:
363: /* Start session thread OK ... */
364: pthread_attr_init(&attr);
365: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
366:
1.2.2.6 ! misho 367: SESS_LOCK;
1.2 misho 368: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
369: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
1.2.2.6 ! misho 370: SESS_UNLOCK;
1.2 misho 371:
372: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
373: sess->sess_addr, sess->sess_user, sess->sess_ka);
374:
375: pthread_attr_destroy(&attr);
376: return NULL;
377: end: /* close client connection */
378: ret = mqtt_msgCONNACK(&msg, ret);
379: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
380: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
381: } else {
382: ioDEBUG(5, "Sended %d bytes", ret);
383: free(msg.msg_base);
384: memset(&msg, 0, sizeof msg);
385: }
386:
387: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
388: finiSession(sess, 0);
389: return NULL;
390: }
391:
392: /* ----------------------------------------------------------------------- */
393:
394: static void *
395: thrSched(void *arg __unused)
396: {
397: struct tagSession *sess;
398: struct timespec pl = { 0, 10000000 };
399:
400: ioTRACE(1);
401:
402: schedPolling(root, &pl, NULL);
403: schedRun(root, &Kill);
404:
405: TAILQ_FOREACH(sess, &Sessions, sess_node)
406: if (sess->sess_tid)
407: pthread_cancel(sess->sess_tid);
408: ioDEBUG(5, "EXIT from Scheduler thread !!!");
409: pthread_exit(NULL);
410: }
411:
412: int
413: Run(int sock)
414: {
415: io_sockaddr_t sa;
416: socklen_t sslen = sizeof sa.ss;
417: int cli;
418: pthread_t tid;
419: ait_val_t *v;
420: char str[STRSIZ];
421:
422: ioTRACE(1);
423:
424: if (pthread_create(&tid, NULL, thrSched, NULL)) {
425: ioSYSERR(0);
426: return -1;
427: } else
428: pthread_detach(tid);
429: ioDEBUG(2, "Run scheduler management thread");
430:
431: if (listen(sock, SOMAXCONN) == -1) {
432: ioSYSERR(0);
433: return -1;
434: }
435:
436: while (!Kill) {
437: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
438: if (!Kill)
439: continue;
440: ioSYSERR(0);
441: break;
442: }
443: v = io_allocVar();
444: if (!v) {
445: ioLIBERR(mqtt);
446: break;
447: } else {
448: memset(str, 0, sizeof str);
449: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
450: AIT_SET_STR(v, str);
451: }
452: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
453:
454: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
455: io_freeVar(v);
456: close(cli);
457: ioDEBUG(1, "Terminated client with socket=%d", cli);
458: }
459: }
460:
461: return 0;
462: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>