File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / daemon.c
Revision 1.1.2.28: download - view: text, annotated - select for diffs - revision graph
Fri Dec 16 13:34:41 2011 UTC (12 years, 6 months ago) by misho
Branches: mqtt1_0
add base thread state machine

    1: #include "global.h"
    2: #include "rtlm.h"
    3: #include "utils.h"
    4: #include "mqttd.h"
    5: 
    6: 
    7: static inline struct tagSession *
    8: initSession(int sock, ait_val_t * __restrict v)
    9: {
   10: 	struct tagSession *sess = NULL;
   11: 	const char *str;
   12: 
   13: 	ioTRACE(5);
   14: 
   15: 	if (!v)
   16: 		return NULL;
   17: 
   18: 	sess = malloc(sizeof(struct tagSession));
   19: 	if (!sess) {
   20: 		ioDEBUG(3, "Error:: in malloc #%d - %s", errno, strerror(errno));
   21: 		io_freeVar(v);
   22: 		return NULL;
   23: 	} else
   24: 		memset(sess, 0, sizeof(struct tagSession));
   25: 
   26: 	str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
   27: 	if (!str)
   28: 		sess->sess_retry = DEFAULT_RETRY;
   29: 	else
   30: 		sess->sess_retry = strtol(str, NULL, 0);
   31: 
   32: 	sess->sess_buf = mqtt_msgAlloc(0);
   33: 	if (!sess->sess_buf) {
   34: 		ioDEBUG(3, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
   35: 		free(sess);
   36: 		io_freeVar(v);
   37: 		return NULL;
   38: 	}
   39: 
   40: 	sess->sess_sock = sock;
   41: 	strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
   42: 	io_freeVar(v);
   43: 	return sess;
   44: }
   45: 
   46: static void
   47: finiSession(struct tagSession *sess)
   48: {
   49: 	register int i;
   50: 	struct tagStore *store;
   51: 
   52: 	ioTRACE(5);
   53: 
   54: 	if (!sess)
   55: 		return;
   56: 
   57: 	for (i = 0; i < MQTT_QOS_RESERVED; i++)
   58: 		while ((store = SLIST_FIRST(&sess->sess_store[i]))) {
   59: 			SLIST_REMOVE_HEAD(&sess->sess_store[i], st_node);
   60: 			free(store);
   61: 		}
   62: 
   63: 	if (sess->sess_will.msg)
   64: 		free(sess->sess_will.msg);
   65: 	if (sess->sess_will.topic)
   66: 		free(sess->sess_will.topic);
   67: 
   68: 	if (sess->sess_sock > STDERR_FILENO)
   69: 		srv_Close(sess->sess_sock);
   70: 
   71: 	mqtt_msgFree(&sess->sess_buf, 42);
   72: 
   73: 	free(sess);
   74: }
   75: 
   76: static void
   77: stopSession(struct tagSession *sess)
   78: {
   79: 	mqtt_msg_t msg = { NULL, 0 };
   80: 	int ret;
   81: 
   82: 	ioTRACE(4);
   83: 
   84: 	assert(sess);
   85: 
   86: 	pthread_mutex_lock(&mtx_sess);
   87: 	TAILQ_REMOVE(&Sessions, sess, sess_node);
   88: 	pthread_mutex_unlock(&mtx_sess);
   89: 
   90: 	ret = mqtt_msgDISCONNECT(&msg);
   91: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1)
   92: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
   93: 	else {
   94: 		ioDEBUG(5, "Sended %d bytes for disconnect", ret);
   95: 		free(msg.msg_base);
   96: 		memset(&msg, 0, sizeof msg);
   97: 	}
   98: 
   99: 	ioDEBUG(1, "Close socket=%d", sess->sess_sock);
  100: 	finiSession(sess);
  101: 
  102: 	call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
  103: 			sess->sess_addr, sess->sess_user);
  104: }
  105: 
  106: static int
  107: KASession(struct tagSession *sess)
  108: {
  109: 	u_char basebuf[USHRT_MAX];
  110: 	mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
  111: 	int ret;
  112: 	struct pollfd pfd;
  113: 
  114: 	ioTRACE(4);
  115: 
  116: 	assert(sess);
  117: 
  118: 	/* ping request */
  119: 	ret = mqtt_msgPINGREQ(&msg);
  120: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  121: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  122: 		return -1;
  123: 	} else {
  124: 		ioDEBUG(5, "Sended %d bytes for ping request", ret);
  125: 		free(msg.msg_base);
  126: 		memset(&msg, 0, sizeof msg);
  127: 	}
  128: 
  129: 	pfd.fd = sess->sess_sock;
  130: 	pfd.events = POLLIN | POLLPRI;
  131: 	if ((ret = poll(&pfd, 1, sess->sess_ka)) == -1 || 
  132: 			pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
  133: 		ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  134: 		return -1;
  135: 	} else if (!ret) {
  136: 		/* problem!!! session is abandoned ... must be disconnect! */
  137: 		return 1;
  138: 	}
  139: 	/* receive & decode packet */
  140: 	if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
  141: 		ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  142: 		return -1;
  143: 	}
  144: 	if (mqtt_readPINGRESP(&buf)) {
  145: 		/* problem!!! session is broken, not hear ping response ... must be disconnect! */
  146: 		return 2;
  147: 	}
  148: 
  149: 	/* Keep Alive is OK! */
  150: 	return 0;
  151: }
  152: 
  153: static void *
  154: thrSession(struct tagSession *sess)
  155: {
  156: 	u_char basebuf[USHRT_MAX];
  157: 	mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
  158: 	int ret;
  159: 	struct pollfd pfd;
  160: 	struct mqtthdr *hdr;
  161: 
  162: 	pthread_cleanup_push((void(*)(void*)) stopSession, sess);
  163: 	ioTRACE(2);
  164: 
  165: 	pfd.fd = sess->sess_sock;
  166: 	pfd.events = POLLIN | POLLPRI;
  167: 	while (!Kill) {
  168: 		if ((ret = poll(&pfd, 1, sess->sess_ka)) == -1 || 
  169: 				pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
  170: 			ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  171: 			break;
  172: 		} else if (!ret && (ret = KASession(sess))) {
  173: 			call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n", 
  174: 					sess->sess_cid, sess->sess_addr, sess->sess_user);
  175: 			break;
  176: 		}
  177: 		/* receive & decode packet */
  178: 		if ((ret = recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0)) == -1) {
  179: 			ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  180: 			break;
  181: 		} else if (!ret) {
  182: 			ioDEBUG(4, "Session %s EOF received.", sess->sess_cid);
  183: 			break;
  184: 		} else
  185: 			hdr = (struct mqtthdr*) buf.msg_base;
  186: 
  187: 		switch (hdr->mqtt_msg.type) {
  188: 			default:
  189: 				ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", 
  190: 						sess->sess_cid, hdr->mqtt_msg.type);
  191: 				break;
  192: 		}
  193: 	}
  194: 
  195: 	pthread_cleanup_pop(42);
  196: 	pthread_exit(NULL);
  197: }
  198: 
  199: static void *
  200: startSession(sched_task_t *task)
  201: {
  202: 	u_char basebuf[USHRT_MAX];
  203: 	mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
  204: 	mqtthdr_connflgs_t flg;
  205: 	mqtthdr_connack_t cack;
  206: 	struct tagSession *s, *sess = NULL;
  207: 	int ret;
  208: 	pthread_attr_t attr;
  209: 
  210: 	ioTRACE(4);
  211: 
  212: 	assert(task);
  213: 
  214: 	sess = initSession(TASK_FD(task), TASK_ARG(task));
  215: 	if (!sess) {
  216: 		io_freeVar(TASK_ARG(task));
  217: 		close(TASK_FD(task));
  218: 		return NULL;
  219: 	}
  220: 
  221: 	/* receive & decode packet */
  222: 	if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
  223: 		ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  224: 		finiSession(sess);
  225: 		return NULL;
  226: 	}
  227: 	cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid, 
  228: 			sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass, 
  229: 			&sess->sess_will.topic, &sess->sess_will.msg);
  230: 	ret = cack.retcode;
  231: 	flg.flags = cack.reserved;
  232: 	if (flg.reserved) {
  233: 		ioDEBUG(3, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
  234: 		goto end;
  235: 	} else {
  236: 		sess->sess_clean = flg.clean_sess;
  237: 		sess->sess_will.qos = flg.will_qos;
  238: 		sess->sess_will.retain = flg.will_retain;
  239: 		sess->sess_will.flag = flg.will_flg;
  240: 	}
  241: 
  242: 	/* check online table for user */
  243: 	if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
  244: 		ioDEBUG(0, "Login:: DENIED for username %s and password %s", 
  245: 				sess->sess_user, sess->sess_pass);
  246: 		ret = MQTT_RETCODE_DENIED;
  247: 		goto end;
  248: 	} else {
  249: 		ioDEBUG(1, "Login:: ALLOWED for username %s ...", sess->sess_user);
  250: 		ret = MQTT_RETCODE_ACCEPTED;
  251: 	}
  252: 	if (call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%") > 0) {
  253: 		ioDEBUG(2, "Old session %s should be disconnect!", sess->sess_cid);
  254: 		TAILQ_FOREACH(s, &Sessions, sess_node)
  255: 			if (!strcmp(s->sess_cid, sess->sess_cid)) {
  256: 				/* found stale session & disconnect it! */
  257: 				stopSession(s);
  258: 				break;
  259: 			}
  260: 	}
  261: 	if (call.InitSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, sess->sess_addr, 
  262: 				sess->sess_will.flag, sess->sess_will.topic, sess->sess_will.msg, 
  263: 				sess->sess_will.qos, sess->sess_will.retain) == -1) {
  264: 		ioDEBUG(0, "Session %s DENIED for username %s", sess->sess_cid, sess->sess_user);
  265: 		ret = MQTT_RETCODE_DENIED;
  266: 		goto end;
  267: 	} else {
  268: 		ioDEBUG(0, "Session %s from %s and username %s is started", 
  269: 				sess->sess_cid, sess->sess_addr, sess->sess_user);
  270: 		ret = MQTT_RETCODE_ACCEPTED;
  271: 	}
  272: 
  273: 	ret = mqtt_msgCONNACK(&msg, ret);
  274: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  275: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  276: 		finiSession(sess);
  277: 		return NULL;
  278: 	} else {
  279: 		ioDEBUG(5, "Sended %d bytes", ret);
  280: 		free(msg.msg_base);
  281: 		memset(&msg, 0, sizeof msg);
  282: 	}
  283: 
  284: 	/* Start session thread OK ... */
  285: 	pthread_attr_init(&attr);
  286: 	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  287: 
  288: 	pthread_mutex_lock(&mtx_sess);
  289: 	TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
  290: 	pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
  291: 	pthread_mutex_unlock(&mtx_sess);
  292: 
  293: 	call.LOG(logg, "Session %s started from %s for user %s OK!\n", sess->sess_cid, 
  294: 			sess->sess_addr, sess->sess_user);
  295: 
  296: 	pthread_attr_destroy(&attr);
  297: 	return NULL;
  298: end:	/* close client connection */
  299: 	ret = mqtt_msgCONNACK(&msg, ret);
  300: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  301: 		ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  302: 	} else {
  303: 		ioDEBUG(5, "Sended %d bytes", ret);
  304: 		free(msg.msg_base);
  305: 		memset(&msg, 0, sizeof msg);
  306: 	}
  307: 
  308: 	ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
  309: 	finiSession(sess);
  310: 	return NULL;
  311: }
  312: 
  313: /* ----------------------------------------------------------------------- */
  314: 
  315: static void *
  316: thrSched(void *arg __unused)
  317: {
  318: 	struct tagSession *sess;
  319: 
  320: 	ioTRACE(1);
  321: 
  322: 	schedRun(root, &Kill);
  323: 
  324: 	TAILQ_FOREACH(sess, &Sessions, sess_node)
  325: 		if (sess->sess_tid)
  326: 			pthread_cancel(sess->sess_tid);
  327: 	pthread_exit(NULL);
  328: }
  329: 
  330: int
  331: Run(int sock)
  332: {
  333: 	io_sockaddr_t sa;
  334: 	socklen_t sslen = sizeof sa.ss;
  335: 	int cli;
  336: 	pthread_t tid;
  337: 	ait_val_t *v;
  338: 	char str[STRSIZ];
  339: 
  340: 	ioTRACE(1);
  341: 
  342: 	if (pthread_create(&tid, NULL, thrSched, NULL)) {
  343: 		ioLOGERR(0);
  344: 		return -1;
  345: 	} else
  346: 		pthread_detach(tid);
  347: 	ioDEBUG(2, "Run scheduler management thread");
  348: 
  349: 	if (listen(sock, SOMAXCONN) == -1) {
  350: 		ioLOGERR(0);
  351: 		return -1;
  352: 	}
  353: 
  354: 	while (!Kill) {
  355: 		if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
  356: 			if (!Kill)
  357: 				continue;
  358: 			ioLOGERR(0);
  359: 			break;
  360: 		}
  361: 		v = io_allocVar();
  362: 		if (!v) {
  363: 			ioLIBERR(mqtt);
  364: 			break;
  365: 		} else {
  366: 			memset(str, 0, sizeof str);
  367: 			snprintf(str, sizeof str, "%s:%hu", io_n2addr(&sa, v), io_n2port(&sa));
  368: 			AIT_SET_STR(v, str);
  369: 		}
  370: 		ioDEBUG(1, "Connected client with socket=%d from %s", cli, AIT_GET_STR(v));
  371: 
  372: 		if (!schedRead(root, startSession, v, cli)) {
  373: 			io_freeVar(v);
  374: 			close(cli);
  375: 			ioDEBUG(1, "Terminated client with socket=%d", cli);
  376: 		}
  377: 	}
  378: 
  379: 	return 0;
  380: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>