File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / mqttd_calls.c
Revision 1.2.2.16: download - view: text, annotated - select for diffs - revision graph
Tue May 8 14:36:10 2012 UTC (12 years, 1 month ago) by misho
Branches: mqtt1_1
added clean session support

    1: #include "global.h"
    2: #include "mqttd.h"
    3: #include "rtlm.h"
    4: #include "mqttd_calls.h"
    5: 
    6: 
    7: int
    8: cmdPUBLISH(void *srv, int len, void *arg)
    9: {
   10: 	struct mqtthdr *hdr;
   11: 	struct tagSession *sess = (struct tagSession*) arg;
   12: 
   13: 	ioTRACE(2);
   14: 
   15: 	if (!sess)
   16: 		return -1;
   17: 
   18: 	hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
   19: 	switch (hdr->mqtt_msg.qos) {
   20: 		case MQTT_QOS_ONCE:
   21: 			break;
   22: 		case MQTT_QOS_ACK:
   23: 			break;
   24: 		case MQTT_QOS_EXACTLY:
   25: 			break;
   26: 		default:
   27: 			ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request", 
   28: 					hdr->mqtt_msg.qos);
   29: 			return 0;
   30: 	}
   31: 
   32: 	return 0;
   33: }
   34: 
   35: int
   36: cmdPUBREL(void *srv, int len, void *arg)
   37: {
   38: 	struct mqtthdr *hdr;
   39: 	struct tagSession *sess = (struct tagSession*) arg;
   40: 
   41: 	ioTRACE(2);
   42: 
   43: 	if (!sess)
   44: 		return -1;
   45: 
   46: 	hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
   47: 
   48: 	return 0;
   49: }
   50: 
   51: int
   52: cmdSUBSCRIBE(void *srv, int len, void *arg)
   53: {
   54: 	struct tagSession *sess = (struct tagSession*) arg;
   55: 	mqtt_subscr_t *subs = NULL;
   56: 	int siz = 0;
   57: 	u_short mid = 0;
   58: 	register int i;
   59: 	struct tagStore *store;
   60: 
   61: 	ioTRACE(2);
   62: 
   63: 	if (!sess)
   64: 		return -1;
   65: 
   66: 	ioDEBUG(5, "Exec SUBSCRIBE session");
   67: 	siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
   68: 	if (siz == -1) {
   69: 		ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
   70: 		return 0;
   71: 	}
   72: 
   73: 	/* add to db */
   74: 	for (i = 0; i < siz; i++) {
   75: 		if (call.WritePUB_subscribe(&cfg, pub, sess->sess_cid, mid, subs[i].sub_topic.msg_base, 
   76: 				sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
   77: 			store = malloc(sizeof(struct tagStore));
   78: 			if (!store) {
   79: 				ioSYSERR(0);
   80: 				goto end;
   81: 			} else {
   82: 				store->st_msgid = mid;
   83: 				mqtt_subCopy(&store->st_subscr, &subs[i]);
   84: 			}
   85: 
   86: 			/* add to cache */
   87: 			SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
   88: 
   89: 			subs[i].sub_ret = MQTT_QOS_PASS;
   90: 		} else
   91: 			subs[i].sub_ret = MQTT_QOS_DENY;
   92: 	}
   93: 
   94: 	/* send acknowledge */
   95: 	siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
   96: 	if (siz == -1) {
   97: 		ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
   98: 		goto end;
   99: 	}
  100: 	if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
  101: 		ioSYSERR(0);
  102: 	else {
  103: 		ioDEBUG(5, "Sended %d bytes.", siz);
  104: 		memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
  105: 	}
  106: end:
  107: 	mqtt_subFree(&subs);
  108: 	return 0;
  109: }
  110: 
  111: int
  112: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
  113: {
  114: 	struct tagSession *sess = (struct tagSession*) arg;
  115: 	mqtt_subscr_t *subs = NULL;
  116: 	int siz = 0;
  117: 	u_short mid = 0;
  118: 	register int i;
  119: 	struct tagStore *store, *tmp;
  120: 
  121: 	ioTRACE(2);
  122: 
  123: 	if (!sess)
  124: 		return -1;
  125: 
  126: 	ioDEBUG(5, "Exec UNSUBSCRIBE session");
  127: 	siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
  128: 	if (siz == -1) {
  129: 		ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
  130: 		return 0;
  131: 	}
  132: 
  133: 	/* del from db */
  134: 	for (i = 0; i < siz; i++) {
  135: 		SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
  136: 			if (store->st_subscr.sub_ret == subs[i].sub_ret && 
  137: 					store->st_subscr.sub_topic.msg_base && 
  138: 					!strcmp(store->st_subscr.sub_topic.msg_base, 
  139: 						subs[i].sub_topic.msg_base)) {
  140: 				SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
  141: 
  142: 				if (store->st_subscr.sub_topic.msg_base)
  143: 					free(store->st_subscr.sub_topic.msg_base);
  144: 				if (store->st_subscr.sub_value.msg_base)
  145: 					free(store->st_subscr.sub_value.msg_base);
  146: 				free(store);
  147: 			}
  148: 		}
  149: 
  150: 		call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, subs[i].sub_topic.msg_base, 
  151: 				sess->sess_user, "%");
  152: 	}
  153: 
  154: 	/* send acknowledge */
  155: 	siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
  156: 	if (siz == -1) {
  157: 		ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
  158: 		goto end;
  159: 	}
  160: 	if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
  161: 		ioSYSERR(0);
  162: 	else {
  163: 		ioDEBUG(5, "Sended %d bytes.", siz);
  164: 		memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
  165: 	}
  166: end:
  167: 	mqtt_subFree(&subs);
  168: 	return 0;
  169: }
  170: 
  171: int
  172: cmdPINGREQ(void *srv, int len, void *arg)
  173: {
  174: 	struct tagSession *sess = (struct tagSession*) arg;
  175: 	int siz = 0;
  176: 
  177: 	ioTRACE(2);
  178: 
  179: 	if (!sess)
  180: 		return -1;
  181: 
  182: 	ioDEBUG(5, "Exec PINGREQ session");
  183: 	siz = mqtt_msgPINGRESP(sess->sess_buf);
  184: 	if (siz == -1) {
  185: 		ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
  186: 		return 0;
  187: 	}
  188: 	if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
  189: 		ioSYSERR(0);
  190: 		return 0;
  191: 	} else {
  192: 		ioDEBUG(5, "Sended %d bytes.", siz);
  193: 		memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
  194: 	}
  195: 
  196: 	return 0;
  197: }
  198: 
  199: int
  200: cmdCONNECT(void *srv, int len, void *arg)
  201: {
  202: 	struct tagStore *store;
  203: 	struct tagSession *sess = (struct tagSession*) arg;
  204: 
  205: 	ioTRACE(2);
  206: 
  207: 	if (!sess)
  208: 		return -1;
  209: 
  210: 	ioDEBUG(5, "Exec CONNECT session");
  211: 	TAILQ_REMOVE(&Sessions, sess, sess_node);
  212: 
  213: 	if (sess->sess_clean) {
  214: 		if (call.FiniSessPUB)
  215: 			call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
  216: 		if (call.DeletePUB_subscribe)
  217: 			call.DeletePUB_subscribe(&cfg, pub, sess->sess_cid, "%", sess->sess_user, "%");
  218: 		if (call.WipePUB_topic)
  219: 			call.WipePUB_topic(&cfg, pub, sess->sess_cid, sess->sess_user, -1);
  220: 	}
  221: 
  222: 	while ((store = SLIST_FIRST(&sess->sess_subscr))) {
  223: 		SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
  224: 
  225: 		if (store->st_subscr.sub_topic.msg_base)
  226: 			free(store->st_subscr.sub_topic.msg_base);
  227: 		if (store->st_subscr.sub_value.msg_base)
  228: 			free(store->st_subscr.sub_value.msg_base);
  229: 
  230: 		free(store);
  231: 	}
  232: 
  233: 	if (sess->sess_will.msg)
  234: 		free(sess->sess_will.msg);
  235: 	if (sess->sess_will.topic)
  236: 		free(sess->sess_will.topic);
  237: 
  238: 	call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
  239: 			sess->sess_addr, sess->sess_user);
  240: 
  241: 	return -3;	/* reconnect client */
  242: }
  243: 
  244: int
  245: cmdDISCONNECT(void *srv, int len, void *arg)
  246: {
  247: 	struct tagSession *sess = (struct tagSession*) arg;
  248: 
  249: 	ioTRACE(2);
  250: 
  251: 	if (!sess)
  252: 		return -1;
  253: 
  254: 	ioDEBUG(5, "Exec DISCONNECT session");
  255: 
  256: 	call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
  257: 			sess->sess_addr, sess->sess_user);
  258: 
  259: 	return -2;	/* must terminate dispatcher */
  260: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>