Annotation of mqtt/src/daemon.c, revision 1.1.2.32
1.1.2.1 misho 1: #include "global.h"
1.1.2.16 misho 2: #include "rtlm.h"
1.1.2.24 misho 3: #include "utils.h"
1.1.2.11 misho 4: #include "mqttd.h"
1.1.2.32! misho 5: #include "mqttd_calls.h"
1.1.2.1 misho 6:
7:
1.1.2.30 misho 8: static void *startSession(sched_task_t *task);
9:
10:
1.1.2.14 misho 11: static inline struct tagSession *
1.1.2.18 misho 12: initSession(int sock, ait_val_t * __restrict v)
1.1.2.14 misho 13: {
14: struct tagSession *sess = NULL;
15: const char *str;
16:
1.1.2.20 misho 17: ioTRACE(5);
1.1.2.15 misho 18:
1.1.2.18 misho 19: if (!v)
1.1.2.14 misho 20: return NULL;
21:
22: sess = malloc(sizeof(struct tagSession));
23: if (!sess) {
1.1.2.20 misho 24: ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
1.1.2.18 misho 25: io_freeVar(v);
1.1.2.14 misho 26: return NULL;
27: } else
28: memset(sess, 0, sizeof(struct tagSession));
29:
1.1.2.32! misho 30: TAILQ_INIT(&sess->sess_sndqueue);
! 31:
1.1.2.17 misho 32: str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
1.1.2.14 misho 33: if (!str)
34: sess->sess_retry = DEFAULT_RETRY;
35: else
36: sess->sess_retry = strtol(str, NULL, 0);
37:
1.1.2.30 misho 38: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
1.1.2.14 misho 39: if (!sess->sess_buf) {
1.1.2.20 misho 40: ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.1.2.14 misho 41: free(sess);
1.1.2.18 misho 42: io_freeVar(v);
1.1.2.14 misho 43: return NULL;
44: }
45:
1.1.2.15 misho 46: sess->sess_sock = sock;
1.1.2.19 misho 47: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
1.1.2.18 misho 48: io_freeVar(v);
1.1.2.14 misho 49: return sess;
50: }
51:
52: static void
1.1.2.30 misho 53: finiSession(struct tagSession *sess, int preservSock)
1.1.2.14 misho 54: {
55: struct tagStore *store;
56:
1.1.2.20 misho 57: ioTRACE(5);
1.1.2.15 misho 58:
1.1.2.14 misho 59: if (!sess)
60: return;
61:
1.1.2.30 misho 62: if (call.FiniSessPUB)
63: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
64:
1.1.2.32! misho 65: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
! 66: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
! 67:
! 68: if (store->st_subscr.sub_topic._base)
! 69: free(store->st_subscr.sub_topic._base);
! 70: if (store->st_subscr.sub_value._base)
! 71: free(store->st_subscr.sub_value._base);
! 72:
1.1.2.30 misho 73: free(store);
74: }
1.1.2.14 misho 75:
76: if (sess->sess_will.msg)
77: free(sess->sess_will.msg);
78: if (sess->sess_will.topic)
79: free(sess->sess_will.topic);
80:
1.1.2.30 misho 81: if (sess->sess_sock > STDERR_FILENO && !preservSock)
1.1.2.24 misho 82: srv_Close(sess->sess_sock);
1.1.2.14 misho 83:
84: mqtt_msgFree(&sess->sess_buf, 42);
85:
86: free(sess);
87: }
88:
1.1.2.24 misho 89: static void
90: stopSession(struct tagSession *sess)
1.1.2.15 misho 91: {
1.1.2.24 misho 92: mqtt_msg_t msg = { NULL, 0 };
93: int ret;
94:
1.1.2.26 misho 95: ioTRACE(4);
96:
97: assert(sess);
98:
1.1.2.24 misho 99: pthread_mutex_lock(&mtx_sess);
100: TAILQ_REMOVE(&Sessions, sess, sess_node);
101: pthread_mutex_unlock(&mtx_sess);
102:
103: ret = mqtt_msgDISCONNECT(&msg);
104: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
105: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
106: else {
107: ioDEBUG(5, "Sended %d bytes for disconnect", ret);
108: free(msg.msg_base);
1.1.2.27 misho 109: memset(&msg, 0, sizeof msg);
1.1.2.18 misho 110: }
111:
1.1.2.24 misho 112: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
1.1.2.30 misho 113: finiSession(sess, 0);
1.1.2.27 misho 114:
1.1.2.28 misho 115: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
1.1.2.27 misho 116: sess->sess_addr, sess->sess_user);
1.1.2.24 misho 117: }
118:
1.1.2.26 misho 119: static int
120: KASession(struct tagSession *sess)
121: {
1.1.2.30 misho 122: mqtt_msg_t msg = { NULL, 0 };
1.1.2.26 misho 123: int ret;
124: struct pollfd pfd;
125:
126: ioTRACE(4);
127:
128: assert(sess);
129:
130: /* ping request */
131: ret = mqtt_msgPINGREQ(&msg);
132: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
133: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
134: return -1;
135: } else {
136: ioDEBUG(5, "Sended %d bytes for ping request", ret);
137: free(msg.msg_base);
138: memset(&msg, 0, sizeof msg);
139: }
140:
141: pfd.fd = sess->sess_sock;
142: pfd.events = POLLIN | POLLPRI;
1.1.2.30 misho 143: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
1.1.2.28 misho 144: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
1.1.2.26 misho 145: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
146: return -1;
147: } else if (!ret) {
1.1.2.30 misho 148: ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
1.1.2.26 misho 149: return 1;
150: }
151: /* receive & decode packet */
1.1.2.30 misho 152: if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
1.1.2.26 misho 153: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
154: return -1;
155: }
1.1.2.30 misho 156: if (mqtt_readPINGRESP(sess->sess_buf)) {
157: ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
1.1.2.26 misho 158: return 2;
159: }
160:
161: /* Keep Alive is OK! */
162: return 0;
163: }
164:
1.1.2.24 misho 165: static void *
166: thrSession(struct tagSession *sess)
167: {
1.1.2.30 misho 168: mqtt_msg_t msg = { NULL, 0 };
169: int ret, locKill = 42;
1.1.2.28 misho 170: struct pollfd pfd;
171: struct mqtthdr *hdr;
1.1.2.30 misho 172: ait_val_t *v;
173: struct tagStore *store;
1.1.2.18 misho 174:
1.1.2.28 misho 175: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
1.1.2.20 misho 176: ioTRACE(2);
1.1.2.15 misho 177:
1.1.2.28 misho 178: pfd.fd = sess->sess_sock;
179: pfd.events = POLLIN | POLLPRI;
1.1.2.30 misho 180: while (!Kill && locKill) {
181: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
1.1.2.28 misho 182: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
183: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
184: break;
185: } else if (!ret && (ret = KASession(sess))) {
186: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
187: sess->sess_cid, sess->sess_addr, sess->sess_user);
188: break;
189: }
190: /* receive & decode packet */
1.1.2.30 misho 191: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
1.1.2.28 misho 192: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
193: break;
194: } else if (!ret) {
195: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
196: break;
197: } else
1.1.2.30 misho 198: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
1.1.2.28 misho 199:
1.1.2.29 misho 200: /* dispatch message type */
1.1.2.28 misho 201: switch (hdr->mqtt_msg.type) {
1.1.2.30 misho 202: case MQTT_TYPE_CONNECT:
203: ioDEBUG(5, "Exec CONNECT session");
204: pthread_mutex_lock(&mtx_sess);
205: TAILQ_REMOVE(&Sessions, sess, sess_node);
206: pthread_mutex_unlock(&mtx_sess);
207:
208: if (call.FiniSessPUB)
209: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
210:
1.1.2.32! misho 211: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
! 212: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
! 213:
! 214: if (store->st_subscr.sub_topic._base)
! 215: free(store->st_subscr.sub_topic._base);
! 216: if (store->st_subscr.sub_value._base)
! 217: free(store->st_subscr.sub_value._base);
! 218:
1.1.2.30 misho 219: free(store);
220: }
221:
222: if (sess->sess_will.msg)
223: free(sess->sess_will.msg);
224: if (sess->sess_will.topic)
225: free(sess->sess_will.topic);
226:
227: if ((v = io_allocVar())) {
228: AIT_SET_STR(v, sess->sess_addr);
229: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
230: io_freeVar(v);
231: } else
232: ioLIBERR(mqtt);
233:
234: locKill ^= locKill;
1.1.2.31 misho 235:
236: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
237: sess->sess_addr, sess->sess_user);
1.1.2.30 misho 238: continue;
239: case MQTT_TYPE_DISCONNECT:
240: ioDEBUG(5, "Exec DISCONNECT session");
241: pthread_mutex_lock(&mtx_sess);
242: TAILQ_REMOVE(&Sessions, sess, sess_node);
243: pthread_mutex_unlock(&mtx_sess);
244:
245: finiSession(sess, 0);
246: locKill ^= locKill;
1.1.2.31 misho 247:
248: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
249: sess->sess_addr, sess->sess_user);
1.1.2.30 misho 250: continue;
251: case MQTT_TYPE_PUBLISH:
1.1.2.32! misho 252: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
! 253: if (Publish(sess))
! 254: locKill ^= locKill;
1.1.2.30 misho 255: break;
256: case MQTT_TYPE_PUBREL:
257: break;
258: case MQTT_TYPE_SUBSCRIBE:
259: break;
260: case MQTT_TYPE_UNSUBSCRIBE:
261: break;
262: case MQTT_TYPE_PINGREQ:
263: break;
1.1.2.28 misho 264: default:
265: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
266: sess->sess_cid, hdr->mqtt_msg.type);
267: break;
268: }
269: }
270:
1.1.2.30 misho 271: pthread_cleanup_pop(locKill);
1.1.2.15 misho 272: pthread_exit(NULL);
273: }
274:
275: static void *
1.1.2.5 misho 276: startSession(sched_task_t *task)
1.1.2.4 misho 277: {
1.1.2.11 misho 278: u_char basebuf[USHRT_MAX];
1.1.2.18 misho 279: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
1.1.2.10 misho 280: mqtthdr_connflgs_t flg;
1.1.2.13 misho 281: mqtthdr_connack_t cack;
1.1.2.24 misho 282: struct tagSession *s, *sess = NULL;
1.1.2.18 misho 283: int ret;
284: pthread_attr_t attr;
1.1.2.6 misho 285:
1.1.2.20 misho 286: ioTRACE(4);
1.1.2.6 misho 287:
1.1.2.26 misho 288: assert(task);
289:
1.1.2.30 misho 290: if (!TASK_DATA(task)) {
291: sess = initSession(TASK_FD(task), TASK_ARG(task));
292: if (!sess) {
293: io_freeVar(TASK_ARG(task));
294: close(TASK_FD(task));
295: return NULL;
296: }
1.1.2.6 misho 297:
1.1.2.30 misho 298: /* receive & decode packet */
299: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
300: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
301: finiSession(sess, 0);
302: return NULL;
303: }
304: } else {
305: sess = TASK_DATA(task);
306: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
307: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
1.1.2.9 misho 308: }
1.1.2.30 misho 309:
1.1.2.14 misho 310: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
1.1.2.12 misho 311: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
1.1.2.13 misho 312: &sess->sess_will.topic, &sess->sess_will.msg);
1.1.2.18 misho 313: ret = cack.retcode;
1.1.2.14 misho 314: flg.flags = cack.reserved;
315: if (flg.reserved) {
1.1.2.20 misho 316: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
1.1.2.10 misho 317: goto end;
1.1.2.14 misho 318: } else {
319: sess->sess_clean = flg.clean_sess;
320: sess->sess_will.qos = flg.will_qos;
321: sess->sess_will.retain = flg.will_retain;
322: sess->sess_will.flag = flg.will_flg;
1.1.2.12 misho 323: }
1.1.2.6 misho 324:
1.1.2.8 misho 325: /* check online table for user */
1.1.2.18 misho 326: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
1.1.2.20 misho 327: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
1.1.2.16 misho 328: sess->sess_user, sess->sess_pass);
1.1.2.21 misho 329: ret = MQTT_RETCODE_DENIED;
1.1.2.16 misho 330: goto end;
1.1.2.21 misho 331: } else {
1.1.2.24 misho 332: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
1.1.2.21 misho 333: ret = MQTT_RETCODE_ACCEPTED;
334: }
1.1.2.23 misho 335: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
1.1.2.24 misho 336: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
337: TAILQ_FOREACH(s, &Sessions, sess_node)
338: if (!strcmp(s->sess_cid, sess->sess_cid)) {
1.1.2.25 misho 339: /* found stale session & disconnect it! */
1.1.2.24 misho 340: stopSession(s);
341: break;
342: }
1.1.2.23 misho 343: }
344: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
345: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
346: sess->sess_will.qos, sess->sess_will.retain) == -1) {
347: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
348: ret = MQTT_RETCODE_DENIED;
349: goto end;
350: } else {
351: ioDEBUG(0, "Session %s from %s and username %s is started",
352: sess->sess_cid, sess->sess_addr, sess->sess_user);
353: ret = MQTT_RETCODE_ACCEPTED;
354: }
1.1.2.6 misho 355:
1.1.2.18 misho 356: ret = mqtt_msgCONNACK(&msg, ret);
357: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
1.1.2.20 misho 358: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.1.2.30 misho 359: finiSession(sess, 0);
1.1.2.18 misho 360: return NULL;
361: } else {
1.1.2.20 misho 362: ioDEBUG(5, "Sended %d bytes", ret);
1.1.2.19 misho 363: free(msg.msg_base);
1.1.2.27 misho 364: memset(&msg, 0, sizeof msg);
1.1.2.18 misho 365: }
366:
1.1.2.15 misho 367: /* Start session thread OK ... */
1.1.2.18 misho 368: pthread_attr_init(&attr);
369: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
370:
371: pthread_mutex_lock(&mtx_sess);
1.1.2.15 misho 372: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
1.1.2.18 misho 373: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
374: pthread_mutex_unlock(&mtx_sess);
375:
1.1.2.30 misho 376: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
377: sess->sess_addr, sess->sess_user, sess->sess_ka);
1.1.2.18 misho 378:
379: pthread_attr_destroy(&attr);
1.1.2.15 misho 380: return NULL;
1.1.2.6 misho 381: end: /* close client connection */
1.1.2.18 misho 382: ret = mqtt_msgCONNACK(&msg, ret);
383: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
1.1.2.20 misho 384: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.1.2.18 misho 385: } else {
1.1.2.20 misho 386: ioDEBUG(5, "Sended %d bytes", ret);
1.1.2.19 misho 387: free(msg.msg_base);
1.1.2.27 misho 388: memset(&msg, 0, sizeof msg);
1.1.2.18 misho 389: }
390:
1.1.2.20 misho 391: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
1.1.2.30 misho 392: finiSession(sess, 0);
1.1.2.6 misho 393: return NULL;
1.1.2.4 misho 394: }
395:
396: /* ----------------------------------------------------------------------- */
397:
1.1.2.5 misho 398: static void *
399: thrSched(void *arg __unused)
400: {
1.1.2.18 misho 401: struct tagSession *sess;
1.1.2.31 misho 402: struct timespec pl = { 0, 10000000 };
1.1.2.5 misho 403:
1.1.2.20 misho 404: ioTRACE(1);
405:
1.1.2.31 misho 406: schedPolling(root, &pl, NULL);
1.1.2.20 misho 407: schedRun(root, &Kill);
1.1.2.18 misho 408:
409: TAILQ_FOREACH(sess, &Sessions, sess_node)
410: if (sess->sess_tid)
411: pthread_cancel(sess->sess_tid);
1.1.2.30 misho 412: ioDEBUG(5, "EXIT from Scheduler thread !!!");
1.1.2.5 misho 413: pthread_exit(NULL);
414: }
415:
1.1.2.2 misho 416: int
1.1.2.3 misho 417: Run(int sock)
1.1.2.2 misho 418: {
1.1.2.4 misho 419: io_sockaddr_t sa;
420: socklen_t sslen = sizeof sa.ss;
421: int cli;
1.1.2.5 misho 422: pthread_t tid;
1.1.2.18 misho 423: ait_val_t *v;
1.1.2.23 misho 424: char str[STRSIZ];
1.1.2.4 misho 425:
1.1.2.20 misho 426: ioTRACE(1);
1.1.2.2 misho 427:
1.1.2.5 misho 428: if (pthread_create(&tid, NULL, thrSched, NULL)) {
1.1.2.30 misho 429: ioSYSERR(0);
1.1.2.5 misho 430: return -1;
431: } else
432: pthread_detach(tid);
1.1.2.20 misho 433: ioDEBUG(2, "Run scheduler management thread");
1.1.2.5 misho 434:
1.1.2.24 misho 435: if (listen(sock, SOMAXCONN) == -1) {
1.1.2.30 misho 436: ioSYSERR(0);
1.1.2.3 misho 437: return -1;
438: }
439:
1.1.2.4 misho 440: while (!Kill) {
441: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
1.1.2.20 misho 442: if (!Kill)
443: continue;
1.1.2.30 misho 444: ioSYSERR(0);
1.1.2.20 misho 445: break;
1.1.2.18 misho 446: }
447: v = io_allocVar();
448: if (!v) {
449: ioLIBERR(mqtt);
450: break;
1.1.2.23 misho 451: } else {
452: memset(str, 0, sizeof str);
453: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
454: AIT_SET_STR(v, str);
455: }
456: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
1.1.2.4 misho 457:
1.1.2.30 misho 458: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
1.1.2.18 misho 459: io_freeVar(v);
1.1.2.4 misho 460: close(cli);
1.1.2.20 misho 461: ioDEBUG(1, "Terminated client with socket=%d", cli);
1.1.2.4 misho 462: }
463: }
464:
1.1.2.2 misho 465: return 0;
466: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>