Annotation of mqtt/src/daemon.c, revision 1.2.2.8
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
9:
10:
11: static inline struct tagSession *
12: initSession(int sock, ait_val_t * __restrict v)
13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
17: ioTRACE(5);
18:
19: if (!v)
20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
25: io_freeVar(v);
26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
1.2.2.4 misho 30: pthread_mutex_init(&sess->sess_mtx, NULL);
31:
1.2.2.6 misho 32: SLIST_INIT(&sess->sess_txque[MQTT_QOS_ONCE]);
33: SLIST_INIT(&sess->sess_txque[MQTT_QOS_ACK]);
34: SLIST_INIT(&sess->sess_txque[MQTT_QOS_EXACTLY]);
1.2 misho 35:
1.2.2.5 misho 36: str = cfg_getAttribute(&cfg, "mqttd", "retry");
1.2 misho 37: if (!str)
38: sess->sess_retry = DEFAULT_RETRY;
39: else
40: sess->sess_retry = strtol(str, NULL, 0);
41:
42: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
43: if (!sess->sess_buf) {
44: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
45: free(sess);
46: io_freeVar(v);
47: return NULL;
48: }
49:
1.2.2.8 ! misho 50: /* init server actor */
1.2.2.2 misho 51: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
52: if (!sess->sess_srv) {
53: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
54: mqtt_msgFree(&sess->sess_buf, 42);
55: free(sess);
56: io_freeVar(v);
57: return NULL;
1.2.2.3 misho 58: } else {
59: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
60: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
61: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
62: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
63: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
64: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
65: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2 misho 66: }
67:
1.2 misho 68: sess->sess_sock = sock;
69: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
70: io_freeVar(v);
71: return sess;
72: }
73:
74: static void
75: finiSession(struct tagSession *sess, int preservSock)
76: {
77: struct tagStore *store;
1.2.2.6 misho 78: register int i;
1.2 misho 79:
80: ioTRACE(5);
81:
82: if (!sess)
83: return;
84:
85: if (call.FiniSessPUB)
86: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
87:
1.2.2.6 misho 88: SESS_ELEM_LOCK(sess);
89: for (i = 0; i < MQTT_QOS_RESERVED; i++)
90: while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
91: SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
92:
93: if (store->st_subscr.sub_topic.msg_base)
94: free(store->st_subscr.sub_topic.msg_base);
95: if (store->st_subscr.sub_value.msg_base)
96: free(store->st_subscr.sub_value.msg_base);
1.2 misho 97:
1.2.2.6 misho 98: free(store);
99: }
100: SESS_ELEM_UNLOCK(sess);
1.2.2.4 misho 101: pthread_mutex_destroy(&sess->sess_mtx);
1.2 misho 102:
103: if (sess->sess_will.msg)
104: free(sess->sess_will.msg);
105: if (sess->sess_will.topic)
106: free(sess->sess_will.topic);
107:
108: if (sess->sess_sock > STDERR_FILENO && !preservSock)
109: srv_Close(sess->sess_sock);
110:
1.2.2.2 misho 111: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 112: mqtt_msgFree(&sess->sess_buf, 42);
113:
114: free(sess);
115: }
116:
117: static void
118: stopSession(struct tagSession *sess)
119: {
120: mqtt_msg_t msg = { NULL, 0 };
121: int ret;
122:
123: ioTRACE(4);
124:
125: assert(sess);
126:
1.2.2.6 misho 127: SESS_LOCK;
1.2 misho 128: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.6 misho 129: SESS_UNLOCK;
1.2 misho 130:
131: ret = mqtt_msgDISCONNECT(&msg);
132: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
133: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
134: else {
135: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
136: free(msg.msg_base);
137: memset(&msg, 0, sizeof msg);
138: }
139:
140: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
141: finiSession(sess, 0);
142:
143: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
144: sess->sess_addr, sess->sess_user);
145: }
146:
147: static int
148: KASession(struct tagSession *sess)
149: {
150: mqtt_msg_t msg = { NULL, 0 };
151: int ret;
152: struct pollfd pfd;
153:
154: ioTRACE(4);
155:
156: assert(sess);
157:
158: /* ping request */
159: ret = mqtt_msgPINGREQ(&msg);
160: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
161: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
162: return -1;
163: } else {
164: ioDEBUG(5, "Sended %d bytes for ping request", ret);
165: free(msg.msg_base);
166: memset(&msg, 0, sizeof msg);
167: }
168:
169: pfd.fd = sess->sess_sock;
170: pfd.events = POLLIN | POLLPRI;
171: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
172: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
173: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
174: return -1;
175: } else if (!ret) {
176: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
177: return 1;
178: }
179: /* receive & decode packet */
180: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
181: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
182: return -1;
183: }
184: if (mqtt_readPINGRESP(sess->sess_buf)) {
185: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
186: return 2;
187: }
188:
189: /* Keep Alive is OK! */
190: return 0;
191: }
192:
193: static void *
194: thrSession(struct tagSession *sess)
195: {
196: int ret, locKill = 42;
197: struct pollfd pfd;
198: struct mqtthdr *hdr;
199: ait_val_t *v;
200:
201: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
202: ioTRACE(2);
203:
204: pfd.fd = sess->sess_sock;
205: pfd.events = POLLIN | POLLPRI;
206: while (!Kill && locKill) {
207: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
208: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
209: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
210: break;
211: } else if (!ret && (ret = KASession(sess))) {
212: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
213: sess->sess_cid, sess->sess_addr, sess->sess_user);
214: break;
215: }
216: /* receive & decode packet */
217: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
218: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
219: break;
220: } else if (!ret) {
221: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
222: break;
223: } else
224: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
225:
226: /* dispatch message type */
1.2.2.3 misho 227: if (mqtt_srv_Dispatch(sess->sess_srv, sess))
228: ioLIBERR(mqtt);
1.2 misho 229: switch (hdr->mqtt_msg.type) {
230: case MQTT_TYPE_CONNECT:
231: ioDEBUG(5, "Exec CONNECT session");
232: if ((v = io_allocVar())) {
233: AIT_SET_STR(v, sess->sess_addr);
234: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
235: io_freeVar(v);
236: } else
237: ioLIBERR(mqtt);
238: locKill ^= locKill;
239: continue;
240: case MQTT_TYPE_DISCONNECT:
241: ioDEBUG(5, "Exec DISCONNECT session");
242: finiSession(sess, 0);
243: locKill ^= locKill;
244: continue;
245: case MQTT_TYPE_PUBLISH:
246: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3 misho 247: /*
248: if (cmdPUBLISH(sess))
1.2 misho 249: locKill ^= locKill;
1.2.2.3 misho 250: */
1.2 misho 251: break;
252: case MQTT_TYPE_PUBREL:
253: break;
254: case MQTT_TYPE_SUBSCRIBE:
255: break;
256: case MQTT_TYPE_UNSUBSCRIBE:
257: break;
258: case MQTT_TYPE_PINGREQ:
1.2.2.4 misho 259: ioDEBUG(5, "Exec PINGREQ session");
1.2 misho 260: break;
261: default:
262: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
263: sess->sess_cid, hdr->mqtt_msg.type);
264: break;
265: }
266: }
267:
268: pthread_cleanup_pop(locKill);
269: pthread_exit(NULL);
270: }
271:
272: static void *
273: startSession(sched_task_t *task)
274: {
275: u_char basebuf[USHRT_MAX];
276: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
277: mqtthdr_connflgs_t flg;
278: mqtthdr_connack_t cack;
279: struct tagSession *s, *sess = NULL;
280: int ret;
281: pthread_attr_t attr;
282:
283: ioTRACE(4);
284:
285: assert(task);
286:
287: if (!TASK_DATA(task)) {
1.2.2.8 ! misho 288: /* flow from accept new clients */
1.2 misho 289: sess = initSession(TASK_FD(task), TASK_ARG(task));
290: if (!sess) {
291: io_freeVar(TASK_ARG(task));
292: close(TASK_FD(task));
293: return NULL;
294: }
295:
296: /* receive & decode packet */
297: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
298: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
299: finiSession(sess, 0);
300: return NULL;
301: }
302: } else {
303: sess = TASK_DATA(task);
304: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
305: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
306: }
307:
308: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
309: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
310: &sess->sess_will.topic, &sess->sess_will.msg);
311: ret = cack.retcode;
312: flg.flags = cack.reserved;
313: if (flg.reserved) {
314: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
315: goto end;
316: } else {
317: sess->sess_clean = flg.clean_sess;
318: sess->sess_will.qos = flg.will_qos;
319: sess->sess_will.retain = flg.will_retain;
320: sess->sess_will.flag = flg.will_flg;
321: }
322:
323: /* check online table for user */
324: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
325: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
326: sess->sess_user, sess->sess_pass);
327: ret = MQTT_RETCODE_DENIED;
328: goto end;
329: } else {
330: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
331: ret = MQTT_RETCODE_ACCEPTED;
332: }
333: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
334: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
335: TAILQ_FOREACH(s, &Sessions, sess_node)
336: if (!strcmp(s->sess_cid, sess->sess_cid)) {
337: /* found stale session & disconnect it! */
338: stopSession(s);
339: break;
340: }
341: }
342: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
343: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
344: sess->sess_will.qos, sess->sess_will.retain) == -1) {
345: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
346: ret = MQTT_RETCODE_DENIED;
347: goto end;
348: } else {
349: ioDEBUG(0, "Session %s from %s and username %s is started",
350: sess->sess_cid, sess->sess_addr, sess->sess_user);
351: ret = MQTT_RETCODE_ACCEPTED;
352: }
353:
354: ret = mqtt_msgCONNACK(&msg, ret);
355: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
356: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
357: finiSession(sess, 0);
358: return NULL;
359: } else {
360: ioDEBUG(5, "Sended %d bytes", ret);
361: free(msg.msg_base);
362: memset(&msg, 0, sizeof msg);
363: }
364:
365: /* Start session thread OK ... */
366: pthread_attr_init(&attr);
367: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
368:
1.2.2.6 misho 369: SESS_LOCK;
1.2 misho 370: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
371: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
1.2.2.6 misho 372: SESS_UNLOCK;
1.2 misho 373:
374: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
375: sess->sess_addr, sess->sess_user, sess->sess_ka);
376:
377: pthread_attr_destroy(&attr);
378: return NULL;
379: end: /* close client connection */
380: ret = mqtt_msgCONNACK(&msg, ret);
381: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
382: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
383: } else {
384: ioDEBUG(5, "Sended %d bytes", ret);
385: free(msg.msg_base);
386: memset(&msg, 0, sizeof msg);
387: }
388:
389: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
390: finiSession(sess, 0);
391: return NULL;
392: }
393:
394: /* ----------------------------------------------------------------------- */
395:
396: static void *
397: thrSched(void *arg __unused)
398: {
399: struct tagSession *sess;
1.2.2.8 ! misho 400: struct timespec pl = { 0, 100000000 };
1.2 misho 401:
402: ioTRACE(1);
403:
1.2.2.8 ! misho 404: /* start scheduler loop */
1.2 misho 405: schedPolling(root, &pl, NULL);
406: schedRun(root, &Kill);
407:
408: TAILQ_FOREACH(sess, &Sessions, sess_node)
409: if (sess->sess_tid)
410: pthread_cancel(sess->sess_tid);
411: ioDEBUG(5, "EXIT from Scheduler thread !!!");
412: pthread_exit(NULL);
413: }
414:
415: int
416: Run(int sock)
417: {
418: io_sockaddr_t sa;
419: socklen_t sslen = sizeof sa.ss;
420: int cli;
421: pthread_t tid;
422: ait_val_t *v;
423: char str[STRSIZ];
424:
425: ioTRACE(1);
426:
1.2.2.7 misho 427: /* start alternative thread for scheduler */
1.2 misho 428: if (pthread_create(&tid, NULL, thrSched, NULL)) {
429: ioSYSERR(0);
430: return -1;
431: } else
432: pthread_detach(tid);
433: ioDEBUG(2, "Run scheduler management thread");
434:
435: if (listen(sock, SOMAXCONN) == -1) {
436: ioSYSERR(0);
437: return -1;
438: }
439:
1.2.2.8 ! misho 440: /* state machine - accept new connections */
1.2 misho 441: while (!Kill) {
442: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
443: if (!Kill)
444: continue;
445: ioSYSERR(0);
446: break;
447: }
448: v = io_allocVar();
449: if (!v) {
1.2.2.7 misho 450: ioLIBERR(io);
1.2 misho 451: break;
452: } else {
453: memset(str, 0, sizeof str);
454: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
1.2.2.7 misho 455: AIT_FREE_VAL(v);
1.2 misho 456: AIT_SET_STR(v, str);
457: }
458: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
459:
460: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
461: io_freeVar(v);
462: close(cli);
463: ioDEBUG(1, "Terminated client with socket=%d", cli);
464: }
465: }
466:
467: return 0;
468: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>