File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / daemon.c
Revision 1.1.2.31: download - view: text, annotated - select for diffs - revision graph
Tue Jan 24 16:28:28 2012 UTC (12 years, 5 months ago) by misho
Branches: mqtt1_0
finish heavy part of management
reconnect connected users

    1: #include "global.h"
    2: #include "rtlm.h"
    3: #include "utils.h"
    4: #include "mqttd.h"
    5: 
    6: 
    7: static void *startSession(sched_task_t *task);
    8: 
    9: 
   10: static inline struct tagSession *
   11: initSession(int sock, ait_val_t * __restrict v)
   12: {
   13: 	struct tagSession *sess = NULL;
   14: 	const char *str;
   15: 
   16: 	ioTRACE(5);
   17: 
   18: 	if (!v)
   19: 		return NULL;
   20: 
   21: 	sess = malloc(sizeof(struct tagSession));
   22: 	if (!sess) {
   23: 		ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
   24: 		io_freeVar(v);
   25: 		return NULL;
   26: 	} else
   27: 		memset(sess, 0, sizeof(struct tagSession));
   28: 
   29: 	str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
   30: 	if (!str)
   31: 		sess->sess_retry = DEFAULT_RETRY;
   32: 	else
   33: 		sess->sess_retry = strtol(str, NULL, 0);
   34: 
   35: 	sess->sess_buf = mqtt_msgAlloc(USHRT_MAX);
   36: 	if (!sess->sess_buf) {
   37: 		ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
   38: 		free(sess);
   39: 		io_freeVar(v);
   40: 		return NULL;
   41: 	}
   42: 
   43: 	sess->sess_sock = sock;
   44: 	strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
   45: 	io_freeVar(v);
   46: 	return sess;
   47: }
   48: 
   49: static void
   50: finiSession(struct tagSession *sess, int preservSock)
   51: {
   52: 	struct tagStore *store;
   53: 
   54: 	ioTRACE(5);
   55: 
   56: 	if (!sess)
   57: 		return;
   58: 
   59: 	if (call.FiniSessPUB)
   60: 		call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
   61: 
   62: 	while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {
   63: 		SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);
   64: 		free(store);
   65: 	}
   66: 
   67: 	if (sess->sess_will.msg)
   68: 		free(sess->sess_will.msg);
   69: 	if (sess->sess_will.topic)
   70: 		free(sess->sess_will.topic);
   71: 
   72: 	if (sess->sess_sock > STDERR_FILENO && !preservSock)
   73: 		srv_Close(sess->sess_sock);
   74: 
   75: 	mqtt_msgFree(&sess->sess_buf, 42);
   76: 
   77: 	free(sess);
   78: }
   79: 
   80: static void
   81: stopSession(struct tagSession *sess)
   82: {
   83: 	mqtt_msg_t msg = { NULL, 0 };
   84: 	int ret;
   85: 
   86: 	ioTRACE(4);
   87: 
   88: 	assert(sess);
   89: 
   90: 	pthread_mutex_lock(&mtx_sess);
   91: 	TAILQ_REMOVE(&Sessions, sess, sess_node);
   92: 	pthread_mutex_unlock(&mtx_sess);
   93: 
   94: 	ret = mqtt_msgDISCONNECT(&msg);
   95: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
   96: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
   97: 	else {
   98: 		ioDEBUG(5, "Sended %d bytes for disconnect", ret);
   99: 		free(msg.msg_base);
  100: 		memset(&msg, 0, sizeof msg);
  101: 	}
  102: 
  103: 	ioDEBUG(1, "Close socket=%d", sess->sess_sock);
  104: 	finiSession(sess, 0);
  105: 
  106: 	call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
  107: 			sess->sess_addr, sess->sess_user);
  108: }
  109: 
  110: static int
  111: KASession(struct tagSession *sess)
  112: {
  113: 	mqtt_msg_t msg = { NULL, 0 };
  114: 	int ret;
  115: 	struct pollfd pfd;
  116: 
  117: 	ioTRACE(4);
  118: 
  119: 	assert(sess);
  120: 
  121: 	/* ping request */
  122: 	ret = mqtt_msgPINGREQ(&msg);
  123: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  124: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  125: 		return -1;
  126: 	} else {
  127: 		ioDEBUG(5, "Sended %d bytes for ping request", ret);
  128: 		free(msg.msg_base);
  129: 		memset(&msg, 0, sizeof msg);
  130: 	}
  131: 
  132: 	pfd.fd = sess->sess_sock;
  133: 	pfd.events = POLLIN | POLLPRI;
  134: 	if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 || 
  135: 			pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
  136: 		ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  137: 		return -1;
  138: 	} else if (!ret) {
  139: 		ioDEBUG(5, "Warning:: Session is abandoned ... must be disconnect!");
  140: 		return 1;
  141: 	}
  142: 	/* receive & decode packet */
  143: 	if (recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0) == -1) {
  144: 		ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  145: 		return -1;
  146: 	}
  147: 	if (mqtt_readPINGRESP(sess->sess_buf)) {
  148: 		ioDEBUG(5, "Warning:: Session is broken, not hear ping response ... must be disconnect!");
  149: 		return 2;
  150: 	}
  151: 
  152: 	/* Keep Alive is OK! */
  153: 	return 0;
  154: }
  155: 
  156: static void *
  157: thrSession(struct tagSession *sess)
  158: {
  159: 	mqtt_msg_t msg = { NULL, 0 };
  160: 	int ret, locKill = 42;
  161: 	struct pollfd pfd;
  162: 	struct mqtthdr *hdr;
  163: 	ait_val_t *v;
  164: 	struct tagStore *store;
  165: 
  166: 	pthread_cleanup_push((void(*)(void*)) stopSession, sess);
  167: 	ioTRACE(2);
  168: 
  169: 	pfd.fd = sess->sess_sock;
  170: 	pfd.events = POLLIN | POLLPRI;
  171: 	while (!Kill && locKill) {
  172: 		if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 || 
  173: 				pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
  174: 			ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  175: 			break;
  176: 		} else if (!ret && (ret = KASession(sess))) {
  177: 			call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n", 
  178: 					sess->sess_cid, sess->sess_addr, sess->sess_user);
  179: 			break;
  180: 		}
  181: 		/* receive & decode packet */
  182: 		if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) {
  183: 			ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  184: 			break;
  185: 		} else if (!ret) {
  186: 			ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
  187: 			break;
  188: 		} else
  189: 			hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
  190: 
  191: 		/* dispatch message type */
  192: 		switch (hdr->mqtt_msg.type) {
  193: 			case MQTT_TYPE_CONNECT:
  194: 				ioDEBUG(5, "Exec CONNECT session");
  195: 				pthread_mutex_lock(&mtx_sess);
  196: 				TAILQ_REMOVE(&Sessions, sess, sess_node);
  197: 				pthread_mutex_unlock(&mtx_sess);
  198: 
  199: 				if (call.FiniSessPUB)
  200: 					call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
  201: 
  202: 				while ((store = SLIST_FIRST(&sess->sess_sndqueue))) {
  203: 					SLIST_REMOVE_HEAD(&sess->sess_sndqueue, st_node);
  204: 					free(store);
  205: 				}
  206: 
  207: 				if (sess->sess_will.msg)
  208: 					free(sess->sess_will.msg);
  209: 				if (sess->sess_will.topic)
  210: 					free(sess->sess_will.topic);
  211: 
  212: 				if ((v = io_allocVar())) {
  213: 					AIT_SET_STR(v, sess->sess_addr);
  214: 					if (!schedEvent(root, startSession, v, (u_long) sess->sess_sock, sess, ret))
  215: 						io_freeVar(v);
  216: 				} else
  217: 					ioLIBERR(mqtt);
  218: 
  219: 				locKill ^= locKill;
  220: 
  221: 				call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
  222: 						sess->sess_addr, sess->sess_user);
  223: 				continue;
  224: 			case MQTT_TYPE_DISCONNECT:
  225: 				ioDEBUG(5, "Exec DISCONNECT session");
  226: 				pthread_mutex_lock(&mtx_sess);
  227: 				TAILQ_REMOVE(&Sessions, sess, sess_node);
  228: 				pthread_mutex_unlock(&mtx_sess);
  229: 
  230: 				finiSession(sess, 0);
  231: 				locKill ^= locKill;
  232: 
  233: 				call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
  234: 						sess->sess_addr, sess->sess_user);
  235: 				continue;
  236: 			case MQTT_TYPE_PUBLISH:
  237: 				ioDEBUG(5, "Work in progress ...");
  238: 				break;
  239: 			case MQTT_TYPE_PUBREL:
  240: 				break;
  241: 			case MQTT_TYPE_SUBSCRIBE:
  242: 				break;
  243: 			case MQTT_TYPE_UNSUBSCRIBE:
  244: 				break;
  245: 			case MQTT_TYPE_PINGREQ:
  246: 				break;
  247: 			default:
  248: 				ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
  249: 						sess->sess_cid, hdr->mqtt_msg.type);
  250: 				break;
  251: 		}
  252: 	}
  253: 
  254: 	pthread_cleanup_pop(locKill);
  255: 	pthread_exit(NULL);
  256: }
  257: 
  258: static void *
  259: startSession(sched_task_t *task)
  260: {
  261: 	u_char basebuf[USHRT_MAX];
  262: 	mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
  263: 	mqtthdr_connflgs_t flg;
  264: 	mqtthdr_connack_t cack;
  265: 	struct tagSession *s, *sess = NULL;
  266: 	int ret;
  267: 	pthread_attr_t attr;
  268: 
  269: 	ioTRACE(4);
  270: 
  271: 	assert(task);
  272: 
  273: 	if (!TASK_DATA(task)) {
  274: 		sess = initSession(TASK_FD(task), TASK_ARG(task));
  275: 		if (!sess) {
  276: 			io_freeVar(TASK_ARG(task));
  277: 			close(TASK_FD(task));
  278: 			return NULL;
  279: 		}
  280: 
  281: 		/* receive & decode packet */
  282: 		if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
  283: 			ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  284: 			finiSession(sess, 0);
  285: 			return NULL;
  286: 		}
  287: 	} else {
  288: 		sess = TASK_DATA(task);
  289: 		buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task);
  290: 		memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len);
  291: 	}
  292: 
  293: 	cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid, 
  294: 			sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass, 
  295: 			&sess->sess_will.topic, &sess->sess_will.msg);
  296: 	ret = cack.retcode;
  297: 	flg.flags = cack.reserved;
  298: 	if (flg.reserved) {
  299: 		ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
  300: 		goto end;
  301: 	} else {
  302: 		sess->sess_clean = flg.clean_sess;
  303: 		sess->sess_will.qos = flg.will_qos;
  304: 		sess->sess_will.retain = flg.will_retain;
  305: 		sess->sess_will.flag = flg.will_flg;
  306: 	}
  307: 
  308: 	/* check online table for user */
  309: 	if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
  310: 		ioDEBUG(0, "Login:: DENIED for username %s and password %s", 
  311: 				sess->sess_user, sess->sess_pass);
  312: 		ret = MQTT_RETCODE_DENIED;
  313: 		goto end;
  314: 	} else {
  315: 		ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
  316: 		ret = MQTT_RETCODE_ACCEPTED;
  317: 	}
  318: 	if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
  319: 		ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
  320: 		TAILQ_FOREACH(s, &Sessions, sess_node)
  321: 			if (!strcmp(s->sess_cid, sess->sess_cid)) {
  322: 				/* found stale session & disconnect it! */
  323: 				stopSession(s);
  324: 				break;
  325: 			}
  326: 	}
  327: 	if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, 
  328: 				sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, 
  329: 				sess->sess_will.qos, sess->sess_will.retain) == -1) {
  330: 		ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
  331: 		ret = MQTT_RETCODE_DENIED;
  332: 		goto end;
  333: 	} else {
  334: 		ioDEBUG(0, "Session %s from %s and username %s is started", 
  335: 				sess->sess_cid, sess->sess_addr, sess->sess_user);
  336: 		ret = MQTT_RETCODE_ACCEPTED;
  337: 	}
  338: 
  339: 	ret = mqtt_msgCONNACK(&msg, ret);
  340: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  341: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  342: 		finiSession(sess, 0);
  343: 		return NULL;
  344: 	} else {
  345: 		ioDEBUG(5, "Sended %d bytes", ret);
  346: 		free(msg.msg_base);
  347: 		memset(&msg, 0, sizeof msg);
  348: 	}
  349: 
  350: 	/* Start session thread OK ... */
  351: 	pthread_attr_init(&attr);
  352: 	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  353: 
  354: 	pthread_mutex_lock(&mtx_sess);
  355: 	TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
  356: 	pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
  357: 	pthread_mutex_unlock(&mtx_sess);
  358: 
  359: 	call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, 
  360: 			sess->sess_addr, sess->sess_user, sess->sess_ka);
  361: 
  362: 	pthread_attr_destroy(&attr);
  363: 	return NULL;
  364: end:	/* close client connection */
  365: 	ret = mqtt_msgCONNACK(&msg, ret);
  366: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  367: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  368: 	} else {
  369: 		ioDEBUG(5, "Sended %d bytes", ret);
  370: 		free(msg.msg_base);
  371: 		memset(&msg, 0, sizeof msg);
  372: 	}
  373: 
  374: 	ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
  375: 	finiSession(sess, 0);
  376: 	return NULL;
  377: }
  378: 
  379: /* ----------------------------------------------------------------------- */
  380: 
  381: static void *
  382: thrSched(void *arg __unused)
  383: {
  384: 	struct tagSession *sess;
  385: 	struct timespec pl = { 0, 10000000 };
  386: 
  387: 	ioTRACE(1);
  388: 
  389: 	schedPolling(root, &pl, NULL);
  390: 	schedRun(root, &Kill);
  391: 
  392: 	TAILQ_FOREACH(sess, &Sessions, sess_node)
  393: 		if (sess->sess_tid)
  394: 			pthread_cancel(sess->sess_tid);
  395: 	ioDEBUG(5, "EXIT from Scheduler thread !!!");
  396: 	pthread_exit(NULL);
  397: }
  398: 
  399: int
  400: Run(int sock)
  401: {
  402: 	io_sockaddr_t sa;
  403: 	socklen_t sslen = sizeof sa.ss;
  404: 	int cli;
  405: 	pthread_t tid;
  406: 	ait_val_t *v;
  407: 	char str[STRSIZ];
  408: 
  409: 	ioTRACE(1);
  410: 
  411: 	if (pthread_create(&tid, NULL, thrSched, NULL)) {
  412: 		ioSYSERR(0);
  413: 		return -1;
  414: 	} else
  415: 		pthread_detach(tid);
  416: 	ioDEBUG(2, "Run scheduler management thread");
  417: 
  418: 	if (listen(sock, SOMAXCONN) == -1) {
  419: 		ioSYSERR(0);
  420: 		return -1;
  421: 	}
  422: 
  423: 	while (!Kill) {
  424: 		if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
  425: 			if (!Kill)
  426: 				continue;
  427: 			ioSYSERR(0);
  428: 			break;
  429: 		}
  430: 		v = io_allocVar();
  431: 		if (!v) {
  432: 			ioLIBERR(mqtt);
  433: 			break;
  434: 		} else {
  435: 			memset(str, 0, sizeof str);
  436: 			snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
  437: 			AIT_SET_STR(v, str);
  438: 		}
  439: 		ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
  440: 
  441: 		if (!schedRead(root, startSession, v, cli, NULL, 0)) {
  442: 			io_freeVar(v);
  443: 			close(cli);
  444: 			ioDEBUG(1, "Terminated client with socket=%d", cli);
  445: 		}
  446: 	}
  447: 
  448: 	return 0;
  449: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>