Annotation of mqtt/src/daemon.c, revision 1.2.2.15
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "utils.h"
4: #include "mqttd.h"
5: #include "mqttd_calls.h"
6:
7:
8: static void *startSession(sched_task_t *task);
1.2.2.10 misho 9: static pthread_attr_t attr;
1.2 misho 10:
11:
12: static inline struct tagSession *
13: initSession(int sock, ait_val_t * __restrict v)
14: {
15: struct tagSession *sess = NULL;
16: const char *str;
17:
18: ioTRACE(5);
19:
20: if (!v)
21: return NULL;
22:
23: sess = malloc(sizeof(struct tagSession));
24: if (!sess) {
1.2.2.9 misho 25: ioSYSERR(0);
1.2 misho 26: io_freeVar(v);
27: return NULL;
28: } else
29: memset(sess, 0, sizeof(struct tagSession));
30:
1.2.2.4 misho 31: pthread_mutex_init(&sess->sess_mtx, NULL);
32:
1.2.2.9 misho 33: SLIST_INIT(&sess->sess_subscr);
1.2 misho 34:
1.2.2.5 misho 35: str = cfg_getAttribute(&cfg, "mqttd", "retry");
1.2 misho 36: if (!str)
37: sess->sess_retry = DEFAULT_RETRY;
38: else
39: sess->sess_retry = strtol(str, NULL, 0);
40:
41: sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
42: if (!sess->sess_buf) {
1.2.2.9 misho 43: ioLIBERR(mqtt);
1.2 misho 44: free(sess);
45: io_freeVar(v);
46: return NULL;
47: }
48:
1.2.2.8 misho 49: /* init server actor */
1.2.2.2 misho 50: sess->sess_srv = mqtt_srv_Init(sock, sess->sess_buf);
51: if (!sess->sess_srv) {
52: ioDEBUG(3, "Error:: in srv_Init #%d - %s", mqtt_GetErrno(), mqtt_GetError());
53: mqtt_msgFree(&sess->sess_buf, 42);
54: free(sess);
55: io_freeVar(v);
56: return NULL;
1.2.2.3 misho 57: } else {
58: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_CONNECT, cmdCONNECT);
59: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBLISH, cmdPUBLISH);
60: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PUBREL, cmdPUBREL);
61: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_SUBSCRIBE, cmdSUBSCRIBE);
62: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_UNSUBSCRIBE, cmdUNSUBSCRIBE);
63: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_PINGREQ, cmdPINGREQ);
64: mqtt_srv_setCmd(sess->sess_srv, MQTT_TYPE_DISCONNECT, cmdDISCONNECT);
1.2.2.2 misho 65: }
66:
1.2 misho 67: sess->sess_sock = sock;
68: strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
69: io_freeVar(v);
70: return sess;
71: }
72:
73: static void
1.2.2.9 misho 74: finiSession(struct tagSession *sess)
1.2 misho 75: {
76: struct tagStore *store;
77:
78: ioTRACE(5);
79:
80: if (!sess)
81: return;
82:
83: if (call.FiniSessPUB)
84: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
85:
1.2.2.6 misho 86: SESS_ELEM_LOCK(sess);
1.2 misho 87:
1.2.2.9 misho 88: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
89: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
90:
91: if (store->st_subscr.sub_topic.msg_base)
92: free(store->st_subscr.sub_topic.msg_base);
93: if (store->st_subscr.sub_value.msg_base)
94: free(store->st_subscr.sub_value.msg_base);
95:
96: free(store);
97: }
1.2.2.6 misho 98: SESS_ELEM_UNLOCK(sess);
1.2.2.4 misho 99: pthread_mutex_destroy(&sess->sess_mtx);
1.2 misho 100:
101: if (sess->sess_will.msg)
102: free(sess->sess_will.msg);
103: if (sess->sess_will.topic)
104: free(sess->sess_will.topic);
105:
1.2.2.9 misho 106: if (sess->sess_sock > STDERR_FILENO)
1.2 misho 107: srv_Close(sess->sess_sock);
108:
1.2.2.2 misho 109: mqtt_srv_Fini(&sess->sess_srv);
1.2 misho 110: mqtt_msgFree(&sess->sess_buf, 42);
111:
112: free(sess);
113: }
114:
115: static void
116: stopSession(struct tagSession *sess)
117: {
118: mqtt_msg_t msg = { NULL, 0 };
119: int ret;
120:
121: ioTRACE(4);
122:
123: assert(sess);
124:
1.2.2.6 misho 125: SESS_LOCK;
1.2 misho 126: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.6 misho 127: SESS_UNLOCK;
1.2 misho 128:
129: ret = mqtt_msgDISCONNECT(&msg);
1.2.2.9 misho 130: send(sess->sess_sock, msg.msg_base, ret, MSG_NOSIGNAL);
131: free(msg.msg_base);
1.2 misho 132:
133: ioDEBUG(1, "Close socket=%d", sess->sess_sock);
1.2.2.9 misho 134: finiSession(sess);
1.2 misho 135:
136: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
137: sess->sess_addr, sess->sess_user);
138: }
139:
140: static void *
141: thrSession(struct tagSession *sess)
142: {
143: int ret, locKill = 42;
144: struct pollfd pfd;
145: struct mqtthdr *hdr;
146: ait_val_t *v;
147:
148: pthread_cleanup_push((void(*)(void*)) stopSession, sess);
149: ioTRACE(2);
150:
151: pfd.fd = sess->sess_sock;
152: pfd.events = POLLIN | POLLPRI;
153: while (!Kill && locKill) {
154: if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 ||
155: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
156: ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
157: break;
1.2.2.15! misho 158: } else if (!ret && (ret = mqtt_KeepAlive(sess->sess_sock, sess->sess_ka, 1))) {
1.2 misho 159: call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n",
160: sess->sess_cid, sess->sess_addr, sess->sess_user);
161: break;
162: }
163: /* receive & decode packet */
164: if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
165: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
166: break;
167: } else if (!ret) {
168: ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
169: break;
170: } else
171: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
172:
173: /* dispatch message type */
1.2.2.3 misho 174: if (mqtt_srv_Dispatch(sess->sess_srv, sess))
175: ioLIBERR(mqtt);
1.2 misho 176: switch (hdr->mqtt_msg.type) {
177: case MQTT_TYPE_CONNECT:
178: ioDEBUG(5, "Exec CONNECT session");
179: if ((v = io_allocVar())) {
180: AIT_SET_STR(v, sess->sess_addr);
181: if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
182: io_freeVar(v);
183: } else
184: ioLIBERR(mqtt);
1.2.2.13 misho 185:
186: SESS_LOCK;
187: TAILQ_REMOVE(&Sessions, sess, sess_node);
188: SESS_UNLOCK;
189:
190: locKill ^= locKill;
191: break;
1.2 misho 192: case MQTT_TYPE_DISCONNECT:
193: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.13 misho 194: SESS_LOCK;
195: TAILQ_REMOVE(&Sessions, sess, sess_node);
196: SESS_UNLOCK;
197:
1.2.2.14 misho 198: finiSession(sess);
1.2.2.13 misho 199: locKill ^= locKill;
1.2 misho 200: continue;
201: case MQTT_TYPE_PUBLISH:
202: ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos);
1.2.2.3 misho 203: /*
204: if (cmdPUBLISH(sess))
1.2 misho 205: locKill ^= locKill;
1.2.2.3 misho 206: */
1.2 misho 207: break;
208: case MQTT_TYPE_PUBREL:
209: break;
210: case MQTT_TYPE_SUBSCRIBE:
211: break;
212: case MQTT_TYPE_UNSUBSCRIBE:
213: break;
214: case MQTT_TYPE_PINGREQ:
1.2.2.4 misho 215: ioDEBUG(5, "Exec PINGREQ session");
1.2 misho 216: break;
1.2.2.11 misho 217: case MQTT_TYPE_PINGRESP:
218: ioDEBUG(5, "Exec PINGRESP session");
219: break;
1.2 misho 220: default:
221: ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED",
222: sess->sess_cid, hdr->mqtt_msg.type);
223: break;
224: }
225: }
226:
227: pthread_cleanup_pop(locKill);
228: pthread_exit(NULL);
1.2.2.11 misho 229: return NULL;
1.2 misho 230: }
231:
232: static void *
233: startSession(sched_task_t *task)
234: {
235: u_char basebuf[USHRT_MAX];
236: mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
237: mqtthdr_connflgs_t flg;
238: mqtthdr_connack_t cack;
239: struct tagSession *s, *sess = NULL;
240: int ret;
241:
242: ioTRACE(4);
243:
244: assert(task);
245:
246: if (!TASK_DATA(task)) {
1.2.2.8 misho 247: /* flow from accept new clients */
1.2 misho 248: sess = initSession(TASK_FD(task), TASK_ARG(task));
249: if (!sess) {
250: io_freeVar(TASK_ARG(task));
251: close(TASK_FD(task));
252: return NULL;
253: }
254:
255: /* receive & decode packet */
256: if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
257: ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.2.2.9 misho 258: finiSession(sess);
1.2 misho 259: return NULL;
260: }
261: } else {
262: sess = TASK_DATA(task);
263: buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
264: memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
265: }
266:
267: cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid,
268: sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass,
269: &sess->sess_will.topic, &sess->sess_will.msg);
270: ret = cack.retcode;
271: flg.flags = cack.reserved;
272: if (flg.reserved) {
273: ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
274: goto end;
275: } else {
276: sess->sess_clean = flg.clean_sess;
277: sess->sess_will.qos = flg.will_qos;
278: sess->sess_will.retain = flg.will_retain;
279: sess->sess_will.flag = flg.will_flg;
280: }
281:
282: /* check online table for user */
283: if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
284: ioDEBUG(0, "Login:: DENIED for username %s and password %s",
285: sess->sess_user, sess->sess_pass);
286: ret = MQTT_RETCODE_DENIED;
287: goto end;
288: } else {
289: ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
290: ret = MQTT_RETCODE_ACCEPTED;
291: }
1.2.2.12 misho 292:
1.2 misho 293: if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
294: ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
295: TAILQ_FOREACH(s, &Sessions, sess_node)
296: if (!strcmp(s->sess_cid, sess->sess_cid)) {
297: /* found stale session & disconnect it! */
298: stopSession(s);
299: break;
300: }
301: }
1.2.2.12 misho 302:
1.2 misho 303: if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr,
304: sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg,
305: sess->sess_will.qos, sess->sess_will.retain) == -1) {
306: ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
307: ret = MQTT_RETCODE_DENIED;
308: goto end;
309: } else {
310: ioDEBUG(0, "Session %s from %s and username %s is started",
311: sess->sess_cid, sess->sess_addr, sess->sess_user);
312: ret = MQTT_RETCODE_ACCEPTED;
313: }
314:
315: ret = mqtt_msgCONNACK(&msg, ret);
316: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
317: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
1.2.2.9 misho 318: finiSession(sess);
1.2 misho 319: return NULL;
320: } else {
321: ioDEBUG(5, "Sended %d bytes", ret);
322: free(msg.msg_base);
323: memset(&msg, 0, sizeof msg);
324: }
325:
326: /* Start session thread OK ... */
1.2.2.6 misho 327: SESS_LOCK;
1.2 misho 328: TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
1.2.2.12 misho 329: pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
1.2.2.6 misho 330: SESS_UNLOCK;
1.2 misho 331:
332: call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid,
333: sess->sess_addr, sess->sess_user, sess->sess_ka);
334: return NULL;
335: end: /* close client connection */
336: ret = mqtt_msgCONNACK(&msg, ret);
337: if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
338: ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
339: } else {
340: ioDEBUG(5, "Sended %d bytes", ret);
341: free(msg.msg_base);
342: memset(&msg, 0, sizeof msg);
343: }
344:
345: ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
1.2.2.9 misho 346: finiSession(sess);
1.2 misho 347: return NULL;
348: }
349:
350: static void *
1.2.2.9 misho 351: acceptClient(sched_task_t *task)
1.2 misho 352: {
1.2.2.9 misho 353: int cli;
354: io_sockaddr_t sa;
355: socklen_t sslen = sizeof sa.ss;
356: ait_val_t *v;
357: char str[STRSIZ];
1.2 misho 358:
1.2.2.9 misho 359: ioTRACE(4);
1.2 misho 360:
1.2.2.9 misho 361: assert(task);
1.2 misho 362:
1.2.2.9 misho 363: if ((cli = accept(TASK_FD(task), &sa.sa, &sslen)) == -1)
364: goto end;
365: else
366: fcntl(TASK_FD(task), F_SETFL, fcntl(TASK_FD(task), F_GETFL, 0) | O_NONBLOCK);
367:
368: v = io_allocVar();
369: if (!v) {
370: ioLIBERR(io);
371: close(cli);
372: goto end;
373: } else {
374: memset(str, 0, sizeof str);
375: snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
376: AIT_SET_STR(v, str);
377: }
378: ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
379:
380: if (!schedRead(root, startSession, v, cli, NULL, 0)) {
381: io_freeVar(v);
382: close(cli);
383: ioDEBUG(1, "Terminated client with socket=%d", cli);
384: }
385: end:
1.2.2.11 misho 386: if (!schedRead(TASK_ROOT(task), acceptClient, NULL, TASK_FD(task), NULL, 0))
387: ioLIBERR(sched);
1.2.2.9 misho 388: return NULL;
1.2 misho 389: }
390:
1.2.2.9 misho 391: /* ----------------------------------------------------------------------- */
392:
1.2 misho 393: int
394: Run(int sock)
395: {
1.2.2.9 misho 396: struct tagPub *pub;
397: struct timespec pl = { 0, 100000000 };
1.2 misho 398:
399: ioTRACE(1);
400:
1.2.2.9 misho 401: if (listen(sock, SOMAXCONN) == -1) {
1.2 misho 402: ioSYSERR(0);
403: return -1;
404: } else
1.2.2.9 misho 405: fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
1.2 misho 406:
1.2.2.9 misho 407: /* state machine - accept new connections */
408: if (!schedRead(root, acceptClient, NULL, sock, NULL, 0)) {
409: ioLIBERR(sched);
1.2 misho 410: return -1;
411: }
412:
1.2.2.10 misho 413: pthread_attr_init(&attr);
414: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
415:
1.2.2.9 misho 416: schedPolling(root, &pl, NULL);
417: schedRun(root, &Kill);
1.2 misho 418:
1.2.2.10 misho 419: pthread_attr_destroy(&attr);
420:
1.2.2.9 misho 421: /* free all undeleted elements into lists */
422: PUBS_LOCK;
423: TAILQ_FOREACH(pub, &Pubs, pub_node) {
424: TAILQ_REMOVE(&Pubs, pub, pub_node);
425:
426: AIT_FREE_VAL(&pub->pub_name);
427: if (pub->pub_packet.msg_base)
428: free(pub->pub_packet.msg_base);
1.2 misho 429: }
1.2.2.9 misho 430: PUBS_UNLOCK;
1.2 misho 431: return 0;
432: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>