File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / daemon.c
Revision 1.1.2.19: download - view: text, annotated - select for diffs - revision graph
Wed Dec 14 13:15:35 2011 UTC (12 years, 6 months ago) by misho
Branches: mqtt1_0
fix typecast

    1: #include "global.h"
    2: #include "rtlm.h"
    3: #include "mqttd.h"
    4: 
    5: 
    6: static inline struct tagSession *
    7: initSession(int sock, ait_val_t * __restrict v)
    8: {
    9: 	struct tagSession *sess = NULL;
   10: 	const char *str;
   11: 
   12: 	FTRACE(5);
   13: 
   14: 	if (!v)
   15: 		return NULL;
   16: 
   17: 	sess = malloc(sizeof(struct tagSession));
   18: 	if (!sess) {
   19: 		VERB(3) syslog(LOG_ERR, "Error:: in malloc #%d - %s", errno, strerror(errno));
   20: 		io_freeVar(v);
   21: 		return NULL;
   22: 	} else
   23: 		memset(sess, 0, sizeof(struct tagSession));
   24: 
   25: 	str = (const char*) cfg_GetAttribute(&cfg, CFG("mqttd"), CFG("retry"));
   26: 	if (!str)
   27: 		sess->sess_retry = DEFAULT_RETRY;
   28: 	else
   29: 		sess->sess_retry = strtol(str, NULL, 0);
   30: 
   31: 	sess->sess_buf = mqtt_msgAlloc(0);
   32: 	if (!sess->sess_buf) {
   33: 		VERB(3) syslog(LOG_ERR, "Error:: in msgAlloc #%d - %s", mqtt_GetErrno(), mqtt_GetError());
   34: 		free(sess);
   35: 		io_freeVar(v);
   36: 		return NULL;
   37: 	}
   38: 
   39: 	sess->sess_sock = sock;
   40: 	strlcpy(sess->sess_addr, (char*) AIT_GET_STR(v), sizeof sess->sess_addr);
   41: 	io_freeVar(v);
   42: 	return sess;
   43: }
   44: 
   45: static void
   46: finiSession(struct tagSession *sess)
   47: {
   48: 	register int i;
   49: 	struct tagStore *store;
   50: 
   51: 	FTRACE(5);
   52: 
   53: 	if (!sess)
   54: 		return;
   55: 
   56: 	for (i = 0; i < MQTT_QOS_RESERVED; i++)
   57: 		while ((store = SLIST_FIRST(&sess->sess_store[i]))) {
   58: 			SLIST_REMOVE_HEAD(&sess->sess_store[i], st_node);
   59: 			free(store);
   60: 		}
   61: 
   62: 	if (sess->sess_will.msg)
   63: 		free(sess->sess_will.msg);
   64: 	if (sess->sess_will.topic)
   65: 		free(sess->sess_will.topic);
   66: 
   67: 	if (sess->sess_sock > STDERR_FILENO) {
   68: 		shutdown(sess->sess_sock, SHUT_RDWR);
   69: 		close(sess->sess_sock);
   70: 	}
   71: 
   72: 	mqtt_msgFree(&sess->sess_buf, 42);
   73: 
   74: 	free(sess);
   75: }
   76: 
   77: static void *
   78: thrSession(struct tagSession *sess)
   79: {
   80: 	void thrClean(struct tagSession *sess)
   81: 	{
   82: 		pthread_mutex_lock(&mtx_sess);
   83: 		TAILQ_REMOVE(&Sessions, sess, sess_node);
   84: 		pthread_mutex_unlock(&mtx_sess);
   85: 		VERB(1) syslog(LOG_DEBUG, "Close socket=%d", sess->sess_sock);
   86: 		finiSession(sess);
   87: 	}
   88: 
   89: 	pthread_cleanup_push((void(*)(void*)) thrClean, sess);
   90: 
   91: 	FTRACE(2);
   92: 
   93: 	pthread_cleanup_pop(42);
   94: 	pthread_exit(NULL);
   95: }
   96: 
   97: static void *
   98: startSession(sched_task_t *task)
   99: {
  100: 	u_char basebuf[USHRT_MAX];
  101: 	mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf };
  102: 	mqtthdr_connflgs_t flg;
  103: 	mqtthdr_connack_t cack;
  104: 	struct tagSession *sess = NULL;
  105: 	int ret;
  106: 	pthread_attr_t attr;
  107: 
  108: 	FTRACE(4);
  109: 
  110: 	sess = initSession(TASK_FD(task), TASK_ARG(task));
  111: 	if (!sess) {
  112: 		io_freeVar(TASK_ARG(task));
  113: 		close(TASK_FD(task));
  114: 		return NULL;
  115: 	}
  116: 
  117: 	/* receive & decode packet */
  118: 	if (recv(sess->sess_sock, buf.msg_base, buf.msg_len, 0) == -1) {
  119: 		VERB(3) syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  120: 		finiSession(sess);
  121: 		return NULL;
  122: 	}
  123: 	cack = mqtt_readCONNECT(&buf, &sess->sess_ka, sess->sess_cid, sizeof sess->sess_cid, 
  124: 			sess->sess_user, sizeof sess->sess_user, sess->sess_pass, sizeof sess->sess_pass, 
  125: 			&sess->sess_will.topic, &sess->sess_will.msg);
  126: 	ret = cack.retcode;
  127: 	flg.flags = cack.reserved;
  128: 	if (flg.reserved) {
  129: 		VERB(3) syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
  130: 		goto end;
  131: 	} else {
  132: 		sess->sess_clean = flg.clean_sess;
  133: 		sess->sess_will.qos = flg.will_qos;
  134: 		sess->sess_will.retain = flg.will_retain;
  135: 		sess->sess_will.flag = flg.will_flg;
  136: 	}
  137: 
  138: 	/* check online table for user */
  139: 	if (call.LoginACC(&cfg, acc, sess->sess_user, sess->sess_pass) < 1) {
  140: 		VERB(1) syslog(LOG_WARNING, "Login:: DENIED for username %s and password %s", 
  141: 				sess->sess_user, sess->sess_pass);
  142: 		goto end;
  143: 	} else
  144: 		VERB(1) syslog(LOG_WARNING, "Login:: ALLOWED for username %s ...", sess->sess_user);
  145: 
  146: 	ret = mqtt_msgCONNACK(&msg, ret);
  147: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  148: 		VERB(3) syslog(LOG_ERR, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  149: 		finiSession(sess);
  150: 		return NULL;
  151: 	} else {
  152: 		VERB(5) ioLOGGER(LOG_DEBUG, "Sended %d bytes", ret);
  153: 		free(msg.msg_base);
  154: 		msg.msg_len = 0;
  155: 	}
  156: 
  157: 	/* Start session thread OK ... */
  158: 	pthread_attr_init(&attr);
  159: 	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
  160: 
  161: 	pthread_mutex_lock(&mtx_sess);
  162: 	TAILQ_INSERT_TAIL(&Sessions, sess, sess_node);
  163: 	pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess);
  164: 	pthread_mutex_unlock(&mtx_sess);
  165: 
  166: 	VERB(1) ioLOGGER(LOG_DEBUG, "ConnID=%s(%s) for %s login OK!", 
  167: 			sess->sess_cid, sess->sess_user, sess->sess_addr);
  168: 
  169: 	pthread_attr_destroy(&attr);
  170: 	return NULL;
  171: end:	/* close client connection */
  172: 	ret = mqtt_msgCONNACK(&msg, ret);
  173: 	if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) {
  174: 		VERB(3) syslog(LOG_ERR, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno));
  175: 	} else {
  176: 		VERB(5) ioLOGGER(LOG_DEBUG, "Sended %d bytes", ret);
  177: 		free(msg.msg_base);
  178: 		msg.msg_len = 0;
  179: 	}
  180: 
  181: 	VERB(1) ioLOGGER(LOG_DEBUG, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock);
  182: 	finiSession(sess);
  183: 	return NULL;
  184: }
  185: 
  186: /* ----------------------------------------------------------------------- */
  187: 
  188: static void *
  189: thrSched(void *arg __unused)
  190: {
  191: 	FTRACE(1);
  192: 	struct tagSession *sess;
  193: 
  194: 	schedRun(root, (intptr_t*) &Kill);
  195: 
  196: 	TAILQ_FOREACH(sess, &Sessions, sess_node)
  197: 		if (sess->sess_tid)
  198: 			pthread_cancel(sess->sess_tid);
  199: 	pthread_exit(NULL);
  200: }
  201: 
  202: int
  203: Run(int sock)
  204: {
  205: 	io_sockaddr_t sa;
  206: 	socklen_t sslen = sizeof sa.ss;
  207: 	int cli;
  208: 	pthread_t tid;
  209: 	ait_val_t *v;
  210: 
  211: 	FTRACE(1);
  212: 
  213: 	if (pthread_create(&tid, NULL, thrSched, NULL)) {
  214: 		syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
  215: 		return -1;
  216: 	} else
  217: 		pthread_detach(tid);
  218: 	VERB(2) ioLOGGER(LOG_DEBUG, "Run scheduler management thread");
  219: 
  220: 	if (listen(sock, SOMAXCONN) == -1) {
  221: 		syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
  222: 		pthread_cancel(tid);
  223: 		return -1;
  224: 	}
  225: 
  226: 	while (!Kill) {
  227: 		if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
  228: 			syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
  229: 			continue;
  230: 		}
  231: 		v = io_allocVar();
  232: 		if (!v) {
  233: 			ioLIBERR(mqtt);
  234: 			break;
  235: 		} else
  236: 			io_n2addr(&sa, v);
  237: 		VERB(1) ioLOGGER(LOG_DEBUG, "Connected client with socket=%d from %s:%d", cli, 
  238: 				AIT_GET_STR(v), io_n2port(&sa));
  239: 
  240: 		if (!schedRead(root, startSession, v, cli)) {
  241: 			io_freeVar(v);
  242: 			close(cli);
  243: 			VERB(1) ioLOGGER(LOG_DEBUG, "Terminated client with socket=%d", cli);
  244: 		}
  245: 	}
  246: 
  247: 	return 0;
  248: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>