File:  [ELWIX - Embedded LightWeight unIX -] / libaitmqtt / src / sub.c
Revision 1.3: download - view: text, annotated - select for diffs - revision graph
Thu Jun 28 11:06:17 2012 UTC (11 years, 11 months ago) by misho
Branches: MAIN
CVS tags: mqtt1_8, mqtt1_7, mqtt1_6, mqtt1_5, mqtt1_4, mqtt1_3, MQTT1_7, MQTT1_6, MQTT1_5, MQTT1_4, MQTT1_3, MQTT1_2, HEAD
ver 1.2

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: sub.c,v 1.3 2012/06/28 11:06:17 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: /*
   50:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
   51:  *
   52:  * @buf = Message buffer
   53:  * @Topics = MQTT subscription topics
   54:  * @msgID = MessageID
   55:  * @Dup = Duplicate message
   56:  * @QOS = QoS
   57:  * return: -1 error or >-1 message size for send
   58:  */
   59: int
   60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
   61: 		u_short msgID, u_char Dup, u_char QOS)
   62: {
   63: 	int len, siz = 0;
   64: 	u_int n, *l;
   65: 	struct mqtthdr *hdr;
   66: 	mqtthdr_var_t *topic;
   67: 	mqtt_len_t *mid;
   68: 	mqtt_subscr_t *t;
   69: 	u_char *qos;
   70: 	void *data;
   71: 
   72: 	if (!buf || !Topics)
   73: 		return -1;
   74: 	if (QOS > MQTT_QOS_EXACTLY) {
   75: 		mqtt_SetErr(EINVAL, "Invalid QoS parameter");
   76: 		return -1;
   77: 	}
   78: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
   79: 		mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
   80: 		return -1;
   81: 	}
   82: 
   83: 	/* calculate message size */
   84: 	len = sizeof(mqtt_len_t);				/* msgid */
   85: 	for (t = Topics; t && t->sub_topic.msg_base; t++)	/* subscribes & qos */
   86: 		len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
   87: 
   88: 	/* calculate header size */
   89: 	siz = sizeof(struct mqtthdr);				/* mqtt fixed header */
   90: 	n = mqtt_encodeLen(len);				/* message size */
   91: 	siz += mqtt_sizeLen(n) - 1;				/* length size */
   92: 
   93: 	if (mqtt_msgRealloc(buf, siz + len) == -1)
   94: 		return -1;
   95: 	else {
   96: 		data = buf->msg_base;
   97: 		hdr = (struct mqtthdr *) data;
   98: 	}
   99: 
  100: 	/* fixed header */
  101: 	MQTTHDR_MSGINIT(hdr);
  102: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
  103: 	hdr->mqtt_msg.qos = QOS;
  104: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  105: 	hdr->mqtt_msg.retain = 0;
  106: 	l = (u_int*) hdr->mqtt_len;
  107: 	*l = n;
  108: 	data += siz;
  109: 
  110: 	/* variable header */
  111: 	mid = (mqtt_len_t*) data;
  112: 	mid->val = htons(msgID);
  113: 	data += sizeof(mqtt_len_t);
  114: 
  115: 	/* payload with subscriptions */
  116: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  117: 		topic = (mqtthdr_var_t*) data;
  118: 		topic->var_sb.val = htons(t->sub_topic.msg_len);
  119: 		memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
  120: 		data += MQTTHDR_VAR_SIZEOF(topic);
  121: 		qos = data++;
  122: 		*qos = t->sub_ret;
  123: 	}
  124: 
  125: 	return siz + len;
  126: }
  127: 
  128: /*
  129:  * mqtt_msgSUBACK() Create SUBACK message
  130:  *
  131:  * @buf = Message buffer
  132:  * @Topics = MQTT subscription topics
  133:  * @msgID = MessageID
  134:  * return: -1 error or >-1 message size for send
  135:  */
  136: int
  137: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
  138: {
  139: 	int siz = 0;
  140: 	struct mqtthdr *hdr;
  141: 	mqtt_len_t *v;
  142: 	mqtt_subscr_t *t;
  143: 	u_char *qos;
  144: 
  145: 	if (!buf || !Topics)
  146: 		return -1;
  147: 
  148: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
  149: 		return -1;
  150: 	else {
  151: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  152: 		siz += sizeof(struct mqtthdr);
  153: 		v = (mqtt_len_t*) (buf->msg_base + siz);
  154: 		siz += sizeof(mqtt_len_t);
  155: 	}
  156: 
  157: 	/* MessageID */
  158: 	v->val = htons(msgID);
  159: 
  160: 	/* QoS payload from subscriptions */
  161: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  162: 		qos = (buf->msg_base + siz);
  163: 		*qos = t->sub_ret;
  164: 		siz++;
  165: 	}
  166: 
  167: 	/* fixed header */
  168: 	MQTTHDR_MSGINIT(hdr);
  169: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
  170: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  171: 
  172: 	return siz;
  173: }
  174: 
  175: /*
  176:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
  177:  *
  178:  * @buf = Message buffer
  179:  * @Topics = MQTT subscription topics
  180:  * @msgID = MessageID
  181:  * @Dup = Duplicate message
  182:  * @QOS = QoS
  183:  * return: -1 error or >-1 message size for send
  184:  */
  185: int
  186: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
  187: 		u_short msgID, u_char Dup, u_char QOS)
  188: {
  189: 	int len, siz = 0;
  190: 	u_int n, *l;
  191: 	struct mqtthdr *hdr;
  192: 	mqtthdr_var_t *topic;
  193: 	mqtt_len_t *mid;
  194: 	mqtt_subscr_t *t;
  195: 	void *data;
  196: 
  197: 	if (!buf || !Topics)
  198: 		return -1;
  199: 	if (QOS > MQTT_QOS_EXACTLY) {
  200: 		mqtt_SetErr(EINVAL, "Invalid QoS parameter");
  201: 		return -1;
  202: 	}
  203: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
  204: 		mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
  205: 		return -1;
  206: 	}
  207: 
  208: 	/* calculate message size */
  209: 	len = sizeof(mqtt_len_t);				/* msgid */
  210: 	for (t = Topics; t && t->sub_topic.msg_base; t++)	/* subscribes */
  211: 		len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
  212: 
  213: 	/* calculate header size */
  214: 	siz = sizeof(struct mqtthdr);				/* mqtt fixed header */
  215: 	n = mqtt_encodeLen(len);				/* message size */
  216: 	siz += mqtt_sizeLen(n) - 1;				/* length size */
  217: 
  218: 	if (mqtt_msgRealloc(buf, siz + len) == -1)
  219: 		return -1;
  220: 	else {
  221: 		data = buf->msg_base;
  222: 		hdr = (struct mqtthdr *) data;
  223: 	}
  224: 
  225: 	/* fixed header */
  226: 	MQTTHDR_MSGINIT(hdr);
  227: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
  228: 	hdr->mqtt_msg.qos = QOS;
  229: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  230: 	hdr->mqtt_msg.retain = 0;
  231: 	l = (u_int*) hdr->mqtt_len;
  232: 	*l = n;
  233: 	data += siz;
  234: 
  235: 	/* variable header */
  236: 	mid = (mqtt_len_t*) (buf->msg_base + siz);
  237: 	mid->val = htons(msgID);
  238: 	data += sizeof(mqtt_len_t);
  239: 
  240: 	/* payload with subscriptions */
  241: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  242: 		topic = (mqtthdr_var_t*) data;
  243: 		topic->var_sb.val = htons(t->sub_topic.msg_len);
  244: 		memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
  245: 		data += MQTTHDR_VAR_SIZEOF(topic);
  246: 	}
  247: 
  248: 	return siz + len;
  249: }
  250: 
  251: /*
  252:  * mqtt_msgUNSUBACK() Create UNSUBACK message
  253:  *
  254:  * @buf = Message buffer
  255:  * @msgID = MessageID
  256:  * return: -1 error or >-1 message size for send
  257:  */
  258: int
  259: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
  260: {
  261: 	int siz = 0;
  262: 	struct mqtthdr *hdr;
  263: 	mqtt_len_t *v;
  264: 
  265: 	if (!buf)
  266: 		return -1;
  267: 
  268: 	if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
  269: 		return -1;
  270: 	else {
  271: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  272: 		siz += sizeof(struct mqtthdr);
  273: 		v = (mqtt_len_t*) (buf->msg_base + siz);
  274: 		siz += sizeof(mqtt_len_t);
  275: 	}
  276: 
  277: 	/* fixed header */
  278: 	MQTTHDR_MSGINIT(hdr);
  279: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
  280: 	*hdr->mqtt_len = sizeof(mqtt_len_t);
  281: 
  282: 	/* MessageID */
  283: 	v->val = htons(msgID);
  284: 
  285: 	return siz;
  286: }
  287: 
  288: 
  289: /* ============= decode ============ */
  290: 
  291: /*
  292:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
  293:  *
  294:  * @buf = Message buffer
  295:  * @msgID = MessageID
  296:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  297:  * return: -1 error or >-1 elements into subscr
  298:  */
  299: int
  300: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  301: {
  302: 	register int i;
  303: 	int len, ret;
  304: 	struct mqtthdr *hdr;
  305: 	mqtthdr_var_t *var;
  306: 	mqtt_subscr_t *subs;
  307: 	mqtt_len_t *v;
  308: 	caddr_t pos;
  309: 
  310: 	if (!buf || !msgID || !subscr)
  311: 		return -1;
  312: 
  313: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
  314: 	if (!hdr)
  315: 		return -1;
  316: 	pos = buf->msg_base + ret + 1;
  317: 	v = (mqtt_len_t*) pos;
  318: 
  319: 	/* MessageID */
  320: 	len -= sizeof(mqtt_len_t);
  321: 	if (len < 0) {
  322: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  323: 		return -1;
  324: 	} else {
  325: 		*msgID = ntohs(v->val);
  326: 		pos += sizeof(mqtt_len_t);
  327: 	}
  328: 
  329: 	subs = mqtt_subAlloc(0);
  330: 	if (!subs)
  331: 		return -1;
  332: 	else
  333: 		*subscr = subs;
  334: 
  335: 	/* Subscribes */
  336: 	for (i = 0; len > 0; i++) {
  337: 		var = (mqtthdr_var_t*) pos;
  338: 		len -= MQTTHDR_VAR_SIZEOF(var) + 1;
  339: 		if (len < 0) {
  340: 			mqtt_subFree(subscr);
  341: 			mqtt_SetErr(EINVAL, "Short message length %d", len);
  342: 			return -1;
  343: 		}
  344: 		if (!mqtt_subRealloc(&subs, i + 1)) {
  345: 			mqtt_subFree(subscr);
  346: 			return -1;
  347: 		} else
  348: 			*subscr = subs;
  349: 
  350: 		memset(&subs[i], 0, sizeof subs[i]);
  351: 		subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
  352: 		subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
  353: 		if (!subs[i].sub_topic.msg_base) {
  354: 			LOGERR;
  355: 			mqtt_subFree(subscr);
  356: 			return -1;
  357: 		} else {
  358: 			memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
  359: 			((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
  360: 		}
  361: 		pos += MQTTHDR_VAR_SIZEOF(var);
  362: 
  363: 		subs[i].sub_ret = *pos;
  364: 		pos++;
  365: 	}
  366: 
  367: 	return i;
  368: }
  369: 
  370: /*
  371:  * mqtt_readSUBACK() Read SUBACK message
  372:  *
  373:  * @buf = Message buffer
  374:  * @msgID = MessageID
  375:  * @subqos = Subscribes QoS, must be free after use with free()
  376:  * return: -1 error or >-1 readed subscribes QoS elements
  377:  */
  378: int
  379: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
  380: {
  381: 	int len, ret;
  382: 	struct mqtthdr *hdr;
  383: 	mqtt_len_t *v;
  384: 	caddr_t pos;
  385: 
  386: 	if (!buf || !msgID || !subqos)
  387: 		return -1;
  388: 
  389: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
  390: 	if (!hdr)
  391: 		return -1;
  392: 	pos = buf->msg_base + ret + 1;
  393: 	v = (mqtt_len_t*) pos;
  394: 
  395: 	/* MessageID */
  396: 	len -= sizeof(mqtt_len_t);
  397: 	if (len < 0) {
  398: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  399: 		return -1;
  400: 	} else {
  401: 		*msgID = ntohs(v->val);
  402: 		pos += sizeof(mqtt_len_t);
  403: 	}
  404: 
  405: 	/* Subscribes */
  406: 	*subqos = malloc(len);
  407: 	if (!*subqos) {
  408: 		LOGERR;
  409: 		return -1;
  410: 	} else
  411: 		memcpy(*subqos, pos, len);
  412: 
  413: 	return len;
  414: }
  415: 
  416: /*
  417:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
  418:  *
  419:  * @buf = Message buffer
  420:  * @msgID = MessageID
  421:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  422:  * return: -1 error or >-1 elements into subscr
  423:  */
  424: int
  425: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  426: {
  427: 	register int i;
  428: 	int len, ret;
  429: 	struct mqtthdr *hdr;
  430: 	mqtthdr_var_t *var;
  431: 	mqtt_subscr_t *subs;
  432: 	mqtt_len_t *v;
  433: 	caddr_t pos;
  434: 
  435: 	if (!buf || !msgID || !subscr)
  436: 		return -1;
  437: 
  438: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
  439: 	if (!hdr)
  440: 		return -1;
  441: 	pos = buf->msg_base + ret + 1;
  442: 	v = (mqtt_len_t*) pos;
  443: 
  444: 	/* MessageID */
  445: 	len -= sizeof(mqtt_len_t);
  446: 	if (len < 0) {
  447: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  448: 		return -1;
  449: 	} else {
  450: 		*msgID = ntohs(v->val);
  451: 		pos += sizeof(mqtt_len_t);
  452: 	}
  453: 
  454: 	subs = mqtt_subAlloc(0);
  455: 	if (!subs)
  456: 		return -1;
  457: 	else
  458: 		*subscr = subs;
  459: 
  460: 	/* Subscribes */
  461: 	for (i = 0; len > 0; i++) {
  462: 		var = (mqtthdr_var_t*) pos;
  463: 		len -= MQTTHDR_VAR_SIZEOF(var);
  464: 		if (len < 0) {
  465: 			mqtt_subFree(subscr);
  466: 			mqtt_SetErr(EINVAL, "Short message length %d", len);
  467: 			return -1;
  468: 		}
  469: 		if (!mqtt_subRealloc(&subs, i + 1)) {
  470: 			mqtt_subFree(subscr);
  471: 			return -1;
  472: 		} else
  473: 			*subscr = subs;
  474: 
  475: 		memset(&subs[i], 0, sizeof subs[i]);
  476: 		subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
  477: 		subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
  478: 		if (!subs[i].sub_topic.msg_base) {
  479: 			LOGERR;
  480: 			mqtt_subFree(subscr);
  481: 			return -1;
  482: 		} else {
  483: 			memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
  484: 			((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
  485: 		}
  486: 		pos += MQTTHDR_VAR_SIZEOF(var);
  487: 	}
  488: 
  489: 	return i;
  490: }
  491: 
  492: /*
  493:  * mqtt_readUNSUBACK() Read UNSUBACK message
  494:  *
  495:  * @buf = Message buffer
  496:  * return: -1 error or MessageID
  497:  */
  498: u_short
  499: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
  500: {
  501: 	int len, ret;
  502: 	struct mqtthdr *hdr;
  503: 	mqtt_len_t *v;
  504: 	caddr_t pos;
  505: 
  506: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
  507: 	if (!hdr)
  508: 		return (u_short) -1;
  509: 	if (len < sizeof(mqtt_len_t)) {
  510: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  511: 		return (u_short) -1;
  512: 	} else {
  513: 		pos = buf->msg_base + ret + 1;
  514: 		v = (mqtt_len_t*) pos;
  515: 	}
  516: 
  517: 	return ntohs(v->val);
  518: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>