Annotation of mqtt/src/daemon.c, revision 1.2.2.16
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
/* Forward declaration: thrSession() schedules startSession() before its definition further down. */
8: static void *startSession(sched_task_t *task);
/* Thread attributes used for every session thread; initialised (detached state) and destroyed in Run(). */
1.2.2.10 misho 9: static pthread_attr_t attr;
1.2 misho 10:
11:
12: static inline struct tagSession *
13: initSession(int sock, ait_val_t * __restrict v)
14: {
15: struct tagSession *sess = NULL;
16: const char *str;
17:
18: ioTRACE(5);
19:
20: if (!v)
21: return NULL;
22:
23: sess = malloc(sizeof(struct tagSession));
24: if (!sess) {
1.2.2.9 misho 25: ioSYSERR(0);
1.2 misho 26: io_freeVar(v);
27: return NULL;
28: } else
29: memset(sess, 0, sizeof(struct tagSession));
30:
1.2.2.4 misho 31: pthread_mutex_init(&sess->sess_mtx, NULL);
32:
1.2.2.9 misho 33: SLIST_INIT(&sess->sess_subscr);
1.2 misho 34:
1.2.2.5 misho 35: str = cfg_getAttribute(&cfg, "mqttd", "retry");
1.2 misho 36: if (!str)
37: sess->sess_retry = DEFAULT_RETRY;
38: else
39: sess->sess_retry = strtol(str, NULL, 0);
40:
41: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
42: if (!sess->sess_buf) {
1.2.2.9 misho 43: ioLIBERR(mqtt);
1.2 misho 44: free(sess);
45: io_freeVar(v);
46: return NULL;
47: }
48:
1.2.2.8 misho 49: /* init server actor */
1.2.2.2 misho 50: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
51: if (!sess->sess_srv) {
52: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
53: mqtt_msgFree(&sess->sess_buf, 42);
54: free(sess);
55: io_freeVar(v);
56: return NULL;
1.2.2.3 misho 57: } else {
58: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
59: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
60: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
61: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
62: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
63: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
64: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2 misho 65: }
66:
1.2 misho 67: sess->sess_sock = sock;
68: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
69: io_freeVar(v);
70: return sess;
71: }
72:
73: static void
1.2.2.9 misho 74: finiSession(struct tagSession *sess)
1.2 misho 75: {
76: struct tagStore *store;
77:
78: ioTRACE(5);
79:
80: if (!sess)
81: return;
82:
83: if (call.FiniSessPUB)
84: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
85:
1.2.2.6 misho 86: SESS_ELEM_LOCK(sess);
1.2 misho 87:
1.2.2.9 misho 88: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
89: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
90:
91: if (store->st_subscr.sub_topic.msg_base)
92: free(store->st_subscr.sub_topic.msg_base);
93: if (store->st_subscr.sub_value.msg_base)
94: free(store->st_subscr.sub_value.msg_base);
95:
96: free(store);
97: }
1.2.2.6 misho 98: SESS_ELEM_UNLOCK(sess);
1.2.2.4 misho 99: pthread_mutex_destroy(&sess->sess_mtx);
1.2 misho 100:
101: if (sess->sess_will.msg)
102: free(sess->sess_will.msg);
103: if (sess->sess_will.topic)
104: free(sess->sess_will.topic);
105:
1.2.2.9 misho 106: if (sess->sess_sock > STDERR_FILENO)
1.2 misho 107: srv_Close(sess->sess_sock);
108:
1.2.2.2 misho 109: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 110: mqtt_msgFree(&sess->sess_buf, 42);
111:
112: free(sess);
113: }
114:
115: static void
116: stopSession(struct tagSession *sess)
117: {
118: mqtt_msg_t msg = { NULL, 0 };
119: int ret;
120:
121: ioTRACE(4);
122:
123: assert(sess);
124:
1.2.2.6 misho 125: SESS_LOCK;
1.2 misho 126: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.6 misho 127: SESS_UNLOCK;
1.2 misho 128:
129: ret = mqtt_msgDISCONNECT(&msg);
1.2.2.9 misho 130: send(sess->sess_sock, msg.msg_base, ret, MSG_NOSIGNAL);
131: free(msg.msg_base);
1.2 misho 132:
133: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
1.2.2.9 misho 134: finiSession(sess);
1.2 misho 135:
136: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
137: sess->sess_addr, sess->sess_user);
138: }
139:
140: static void *
141: thrSession(struct tagSession *sess)
142: {
143: int ret, locKill = 42;
144: struct pollfd pfd;
145: struct mqtthdr *hdr;
146: ait_val_t *v;
147:
148: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
149: ioTRACE(2);
150:
151: pfd.fd = sess->sess_sock;
152: pfd.events = POLLIN | POLLPRI;
153: while (!Kill && locKill) {
154: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
155: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
156: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
157: break;
1.2.2.15 misho 158: } else if (!ret && (ret = mqtt_KeepAlive(sess->sess_sock, sess->sess_ka, 1))) {
1.2 misho 159: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
160: sess->sess_cid, sess->sess_addr, sess->sess_user);
161: break;
162: }
163: /* receive & decode packet */
164: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
165: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
166: break;
167: } else if (!ret) {
168: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
169: break;
170: } else
171: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
172:
173: /* dispatch message type */
1.2.2.3 misho 174: if (mqtt_srv_Dispatch(sess->sess_srv, sess))
175: ioLIBERR(mqtt);
1.2 misho 176: switch (hdr->mqtt_msg.type) {
177: case MQTT_TYPE_CONNECT:
178: ioDEBUG(5, "Exec CONNECT session");
179: if ((v = io_allocVar())) {
180: AIT_SET_STR(v, sess->sess_addr);
181: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
182: io_freeVar(v);
183: } else
184: ioLIBERR(mqtt);
1.2.2.13 misho 185:
186: SESS_LOCK;
187: TAILQ_REMOVE(&Sessions, sess, sess_node);
188: SESS_UNLOCK;
189:
190: locKill ^= locKill;
191: break;
1.2 misho 192: case MQTT_TYPE_DISCONNECT:
1.2.2.16! misho 193: /*
1.2 misho 194: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.13 misho 195: SESS_LOCK;
196: TAILQ_REMOVE(&Sessions, sess, sess_node);
197: SESS_UNLOCK;
1.2.2.16! misho 198: */
1.2.2.13 misho 199:
1.2.2.14 misho 200: finiSession(sess);
1.2.2.13 misho 201: locKill ^= locKill;
1.2 misho 202: continue;
203: case MQTT_TYPE_PUBLISH:
204: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3 misho 205: /*
206: if (cmdPUBLISH(sess))
1.2 misho 207: locKill ^= locKill;
1.2.2.3 misho 208: */
1.2 misho 209: break;
210: case MQTT_TYPE_PUBREL:
211: break;
212: case MQTT_TYPE_SUBSCRIBE:
213: break;
214: case MQTT_TYPE_UNSUBSCRIBE:
215: break;
216: case MQTT_TYPE_PINGREQ:
1.2.2.4 misho 217: ioDEBUG(5, "Exec PINGREQ session");
1.2 misho 218: break;
1.2.2.11 misho 219: case MQTT_TYPE_PINGRESP:
220: ioDEBUG(5, "Exec PINGRESP session");
221: break;
1.2 misho 222: default:
223: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
224: sess->sess_cid, hdr->mqtt_msg.type);
225: break;
226: }
227: }
228:
229: pthread_cleanup_pop(locKill);
230: pthread_exit(NULL);
1.2.2.11 misho 231: return NULL;
1.2 misho 232: }
233:
234: static void *
235: startSession(sched_task_t *task)
236: {
237: u_char basebuf[USHRT_MAX];
238: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
239: mqtthdr_connflgs_t flg;
240: mqtthdr_connack_t cack;
241: struct tagSession *s, *sess = NULL;
242: int ret;
243:
244: ioTRACE(4);
245:
246: assert(task);
247:
248: if (!TASK_DATA(task)) {
1.2.2.8 misho 249: /* flow from accept new clients */
1.2 misho 250: sess = initSession(TASK_FD(task), TASK_ARG(task));
251: if (!sess) {
252: io_freeVar(TASK_ARG(task));
253: close(TASK_FD(task));
254: return NULL;
255: }
256:
257: /* receive & decode packet */
258: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
259: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.2.2.9 misho 260: finiSession(sess);
1.2 misho 261: return NULL;
262: }
263: } else {
264: sess = TASK_DATA(task);
265: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
266: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
267: }
268:
269: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
270: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
271: &sess->sess_will.topic, &sess->sess_will.msg);
272: ret = cack.retcode;
273: flg.flags = cack.reserved;
274: if (flg.reserved) {
275: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
276: goto end;
277: } else {
278: sess->sess_clean = flg.clean_sess;
279: sess->sess_will.qos = flg.will_qos;
280: sess->sess_will.retain = flg.will_retain;
281: sess->sess_will.flag = flg.will_flg;
282: }
283:
284: /* check online table for user */
285: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
286: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
287: sess->sess_user, sess->sess_pass);
288: ret = MQTT_RETCODE_DENIED;
289: goto end;
290: } else {
291: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
292: ret = MQTT_RETCODE_ACCEPTED;
293: }
1.2.2.12 misho 294:
1.2 misho 295: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
296: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
297: TAILQ_FOREACH(s, &Sessions, sess_node)
298: if (!strcmp(s->sess_cid, sess->sess_cid)) {
299: /* found stale session & disconnect it! */
300: stopSession(s);
301: break;
302: }
303: }
1.2.2.12 misho 304:
1.2 misho 305: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
306: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
307: sess->sess_will.qos, sess->sess_will.retain) == -1) {
308: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
309: ret = MQTT_RETCODE_DENIED;
310: goto end;
311: } else {
312: ioDEBUG(0, "Session %s from %s and username %s is started",
313: sess->sess_cid, sess->sess_addr, sess->sess_user);
314: ret = MQTT_RETCODE_ACCEPTED;
315: }
316:
317: ret = mqtt_msgCONNACK(&msg, ret);
318: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
319: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.2.2.9 misho 320: finiSession(sess);
1.2 misho 321: return NULL;
322: } else {
323: ioDEBUG(5, "Sended %d bytes", ret);
324: free(msg.msg_base);
325: memset(&msg, 0, sizeof msg);
326: }
327:
328: /* Start session thread OK ... */
1.2.2.6 misho 329: SESS_LOCK;
1.2 misho 330: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
1.2.2.12 misho 331: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
1.2.2.6 misho 332: SESS_UNLOCK;
1.2 misho 333:
334: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
335: sess->sess_addr, sess->sess_user, sess->sess_ka);
336: return NULL;
337: end: /* close client connection */
338: ret = mqtt_msgCONNACK(&msg, ret);
339: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
340: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
341: } else {
342: ioDEBUG(5, "Sended %d bytes", ret);
343: free(msg.msg_base);
344: memset(&msg, 0, sizeof msg);
345: }
346:
347: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
1.2.2.9 misho 348: finiSession(sess);
1.2 misho 349: return NULL;
350: }
351:
352: static void *
1.2.2.9 misho 353: acceptClient(sched_task_t *task)
1.2 misho 354: {
1.2.2.9 misho 355: int cli;
356: io_sockaddr_t sa;
357: socklen_t sslen = sizeof sa.ss;
358: ait_val_t *v;
359: char str[STRSIZ];
1.2 misho 360:
1.2.2.9 misho 361: ioTRACE(4);
1.2 misho 362:
1.2.2.9 misho 363: assert(task);
1.2 misho 364:
1.2.2.9 misho 365: if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)
366: goto end;
367: else
368: fcntl(TASK_FD(task), F_SETFL, fcntl(TASK_FD(task), F_GETFL, 0) | O_NONBLOCK);
369:
370: v = io_allocVar();
371: if (!v) {
372: ioLIBERR(io);
373: close(cli);
374: goto end;
375: } else {
376: memset(str, 0, sizeof str);
377: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
378: AIT_SET_STR(v, str);
379: }
380: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
381:
382: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
383: io_freeVar(v);
384: close(cli);
385: ioDEBUG(1, "Terminated client with socket=%d", cli);
386: }
387: end:
1.2.2.11 misho 388: if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0))
389: ioLIBERR(sched);
1.2.2.9 misho 390: return NULL;
1.2 misho 391: }
392:
1.2.2.9 misho 393: /* ----------------------------------------------------------------------- */
394:
1.2 misho 395: int
396: Run(int sock)
397: {
1.2.2.9 misho 398: struct tagPub *pub;
399: struct timespec pl = { 0, 100000000 };
1.2 misho 400:
401: ioTRACE(1);
402:
1.2.2.9 misho 403: if (listen(sock, SOMAXCONN) == -1) {
1.2 misho 404: ioSYSERR(0);
405: return -1;
406: } else
1.2.2.9 misho 407: fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
1.2 misho 408:
1.2.2.9 misho 409: /* state machine - accept new connections */
410: if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {
411: ioLIBERR(sched);
1.2 misho 412: return -1;
413: }
414:
1.2.2.10 misho 415: pthread_attr_init(&attr);
416: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
417:
1.2.2.9 misho 418: schedPolling(root, &pl, NULL);
419: schedRun(root, &Kill);
1.2 misho 420:
1.2.2.10 misho 421: pthread_attr_destroy(&attr);
422:
1.2.2.9 misho 423: /* free all undeleted elements into lists */
424: PUBS_LOCK;
425: TAILQ_FOREACH(pub, &Pubs, pub_node) {
426: TAILQ_REMOVE(&Pubs, pub, pub_node);
427:
428: AIT_FREE_VAL(&pub->pub_name);
429: if (pub->pub_packet.msg_base)
430: free(pub->pub_packet.msg_base);
1.2 misho 431: }
1.2.2.9 misho 432: PUBS_UNLOCK;
1.2 misho 433: return 0;
434: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>