File:  [ELWIX - Embedded LightWeight unIX -] / libaitmqtt / src / sub.c
Revision 1.2: download - view: text, annotated - select for diffs - revision graph
Wed Jun 20 15:02:24 2012 UTC (12 years ago) by misho
Branches: MAIN
CVS tags: mqtt1_2, MQTT1_1, HEAD
version 1.1

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: sub.c,v 1.2 2012/06/20 15:02:24 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: /*
   50:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
   51:  *
   52:  * @buf = Message buffer
   53:  * @Topics = MQTT subscription topics
   54:  * @msgID = MessageID
   55:  * @Dup = Duplicate message
   56:  * @QOS = QoS
   57:  * return: -1 error or >-1 message size for send
   58:  */
   59: int
   60: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
   61: 		u_short msgID, u_char Dup, u_char QOS)
   62: {
   63: 	int len, siz = 0;
   64: 	u_int n;
   65: 	struct mqtthdr *hdr;
   66: 	mqtthdr_var_t *topic;
   67: 	mqtt_len_t *mid;
   68: 	mqtt_subscr_t *t;
   69: 	u_char *qos;
   70: 	void *data;
   71: 
   72: 	if (!buf || !Topics)
   73: 		return -1;
   74: 	if (QOS > MQTT_QOS_EXACTLY) {
   75: 		mqtt_SetErr(EINVAL, "Invalid QoS parameter");
   76: 		return -1;
   77: 	}
   78: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
   79: 		mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
   80: 		return -1;
   81: 	}
   82: 
   83: 	/* calculate message size */
   84: 	len = sizeof(mqtt_len_t);				/* msgid */
   85: 	for (t = Topics; t && t->sub_topic.msg_base; t++)	/* subscribes & qos */
   86: 		len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
   87: 
   88: 	/* calculate header size */
   89: 	siz = sizeof(struct mqtthdr);				/* mqtt fixed header */
   90: 	n = mqtt_encodeLen(len);				/* message size */
   91: 	siz += mqtt_sizeLen(n) - 1;				/* length size */
   92: 
   93: 	if (mqtt_msgRealloc(buf, siz + len) == -1)
   94: 		return -1;
   95: 	else {
   96: 		data = buf->msg_base;
   97: 		hdr = (struct mqtthdr *) data;
   98: 	}
   99: 
  100: 	/* fixed header */
  101: 	MQTTHDR_MSGINIT(hdr);
  102: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
  103: 	hdr->mqtt_msg.qos = QOS;
  104: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  105: 	hdr->mqtt_msg.retain = 0;
  106: 	*(u_int*) hdr->mqtt_len = n;
  107: 	data += siz;
  108: 
  109: 	/* variable header */
  110: 	mid = (mqtt_len_t*) data;
  111: 	mid->val = htons(msgID);
  112: 	data += sizeof(mqtt_len_t);
  113: 
  114: 	/* payload with subscriptions */
  115: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  116: 		topic = (mqtthdr_var_t*) data;
  117: 		topic->var_sb.val = htons(t->sub_topic.msg_len);
  118: 		memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
  119: 		data += MQTTHDR_VAR_SIZEOF(topic);
  120: 		qos = data++;
  121: 		*qos = t->sub_ret;
  122: 	}
  123: 
  124: 	return siz;
  125: }
  126: 
  127: /*
  128:  * mqtt_msgSUBACK() Create SUBACK message
  129:  *
  130:  * @buf = Message buffer
  131:  * @Topics = MQTT subscription topics
  132:  * @msgID = MessageID
  133:  * return: -1 error or >-1 message size for send
  134:  */
  135: int
  136: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
  137: {
  138: 	int siz = 0;
  139: 	struct mqtthdr *hdr;
  140: 	mqtt_len_t *v;
  141: 	mqtt_subscr_t *t;
  142: 	u_char *qos;
  143: 
  144: 	if (!buf || !Topics)
  145: 		return -1;
  146: 
  147: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
  148: 		return -1;
  149: 	else {
  150: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  151: 		siz += sizeof(struct mqtthdr);
  152: 		v = (mqtt_len_t*) (buf->msg_base + siz);
  153: 		siz += sizeof(mqtt_len_t);
  154: 	}
  155: 
  156: 	/* MessageID */
  157: 	v->val = htons(msgID);
  158: 
  159: 	/* QoS payload from subscriptions */
  160: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  161: 		qos = (buf->msg_base + siz);
  162: 		*qos = t->sub_ret;
  163: 		siz++;
  164: 	}
  165: 
  166: 	/* fixed header */
  167: 	MQTTHDR_MSGINIT(hdr);
  168: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
  169: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  170: 
  171: 	return siz;
  172: }
  173: 
  174: /*
  175:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
  176:  *
  177:  * @buf = Message buffer
  178:  * @Topics = MQTT subscription topics
  179:  * @msgID = MessageID
  180:  * @Dup = Duplicate message
  181:  * @QOS = QoS
  182:  * return: -1 error or >-1 message size for send
  183:  */
  184: int
  185: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
  186: 		u_short msgID, u_char Dup, u_char QOS)
  187: {
  188: 	int len, siz = 0;
  189: 	u_int n;
  190: 	struct mqtthdr *hdr;
  191: 	mqtthdr_var_t *topic;
  192: 	mqtt_len_t *mid;
  193: 	mqtt_subscr_t *t;
  194: 	void *data;
  195: 
  196: 	if (!buf || !Topics)
  197: 		return -1;
  198: 	if (QOS > MQTT_QOS_EXACTLY) {
  199: 		mqtt_SetErr(EINVAL, "Invalid QoS parameter");
  200: 		return -1;
  201: 	}
  202: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
  203: 		mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
  204: 		return -1;
  205: 	}
  206: 
  207: 	/* calculate message size */
  208: 	len = sizeof(mqtt_len_t);				/* msgid */
  209: 	for (t = Topics; t && t->sub_topic.msg_base; t++)	/* subscribes */
  210: 		len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
  211: 
  212: 	/* calculate header size */
  213: 	siz = sizeof(struct mqtthdr);				/* mqtt fixed header */
  214: 	n = mqtt_encodeLen(len);				/* message size */
  215: 	siz += mqtt_sizeLen(n) - 1;				/* length size */
  216: 
  217: 	if (mqtt_msgRealloc(buf, siz + len) == -1)
  218: 		return -1;
  219: 	else {
  220: 		data = buf->msg_base;
  221: 		hdr = (struct mqtthdr *) data;
  222: 	}
  223: 
  224: 	/* fixed header */
  225: 	MQTTHDR_MSGINIT(hdr);
  226: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
  227: 	hdr->mqtt_msg.qos = QOS;
  228: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  229: 	hdr->mqtt_msg.retain = 0;
  230: 	*(u_int*) hdr->mqtt_len = n;
  231: 	data += siz;
  232: 
  233: 	/* variable header */
  234: 	mid = (mqtt_len_t*) (buf->msg_base + siz);
  235: 	mid->val = htons(msgID);
  236: 	data += sizeof(mqtt_len_t);
  237: 
  238: 	/* payload with subscriptions */
  239: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  240: 		topic = (mqtthdr_var_t*) data;
  241: 		topic->var_sb.val = htons(t->sub_topic.msg_len);
  242: 		memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
  243: 		data += MQTTHDR_VAR_SIZEOF(topic);
  244: 	}
  245: 
  246: 	return siz;
  247: }
  248: 
  249: /*
  250:  * mqtt_msgUNSUBACK() Create UNSUBACK message
  251:  *
  252:  * @buf = Message buffer
  253:  * @msgID = MessageID
  254:  * return: -1 error or >-1 message size for send
  255:  */
  256: int
  257: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
  258: {
  259: 	int siz = 0;
  260: 	struct mqtthdr *hdr;
  261: 	mqtt_len_t *v;
  262: 
  263: 	if (!buf)
  264: 		return -1;
  265: 
  266: 	if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
  267: 		return -1;
  268: 	else {
  269: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  270: 		siz += sizeof(struct mqtthdr);
  271: 		v = (mqtt_len_t*) (buf->msg_base + siz);
  272: 		siz += sizeof(mqtt_len_t);
  273: 	}
  274: 
  275: 	/* fixed header */
  276: 	MQTTHDR_MSGINIT(hdr);
  277: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
  278: 	*hdr->mqtt_len = sizeof(mqtt_len_t);
  279: 
  280: 	/* MessageID */
  281: 	v->val = htons(msgID);
  282: 
  283: 	return siz;
  284: }
  285: 
  286: 
  287: /* ============= decode ============ */
  288: 
  289: /*
  290:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
  291:  *
  292:  * @buf = Message buffer
  293:  * @msgID = MessageID
  294:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  295:  * return: -1 error or >-1 elements into subscr
  296:  */
  297: int
  298: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  299: {
  300: 	register int i;
  301: 	int len, ret;
  302: 	struct mqtthdr *hdr;
  303: 	mqtthdr_var_t *var;
  304: 	mqtt_subscr_t *subs;
  305: 	mqtt_len_t *v;
  306: 	caddr_t pos;
  307: 
  308: 	if (!buf || !msgID || !subscr)
  309: 		return -1;
  310: 
  311: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
  312: 	if (!hdr)
  313: 		return -1;
  314: 	pos = buf->msg_base + ret + 1;
  315: 	v = (mqtt_len_t*) pos;
  316: 
  317: 	/* MessageID */
  318: 	len -= sizeof(mqtt_len_t);
  319: 	if (len < 0) {
  320: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  321: 		return -1;
  322: 	} else {
  323: 		*msgID = ntohs(v->val);
  324: 		pos += sizeof(mqtt_len_t);
  325: 	}
  326: 
  327: 	subs = mqtt_subAlloc(0);
  328: 	if (!subs)
  329: 		return -1;
  330: 	else
  331: 		*subscr = subs;
  332: 
  333: 	/* Subscribes */
  334: 	for (i = 0; len > 0; i++) {
  335: 		var = (mqtthdr_var_t*) pos;
  336: 		len -= MQTTHDR_VAR_SIZEOF(var) + 1;
  337: 		if (len < 0) {
  338: 			mqtt_subFree(subscr);
  339: 			mqtt_SetErr(EINVAL, "Short message length %d", len);
  340: 			return -1;
  341: 		}
  342: 		if (!mqtt_subRealloc(&subs, i + 1)) {
  343: 			mqtt_subFree(subscr);
  344: 			return -1;
  345: 		} else
  346: 			*subscr = subs;
  347: 
  348: 		memset(&subs[i], 0, sizeof subs[i]);
  349: 		subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
  350: 		subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
  351: 		if (!subs[i].sub_topic.msg_base) {
  352: 			LOGERR;
  353: 			mqtt_subFree(subscr);
  354: 			return -1;
  355: 		} else {
  356: 			memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
  357: 			((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
  358: 		}
  359: 		pos += MQTTHDR_VAR_SIZEOF(var);
  360: 
  361: 		subs[i].sub_ret = *pos;
  362: 		pos++;
  363: 	}
  364: 
  365: 	return i;
  366: }
  367: 
  368: /*
  369:  * mqtt_readSUBACK() Read SUBACK message
  370:  *
  371:  * @buf = Message buffer
  372:  * @msgID = MessageID
  373:  * @subqos = Subscribes QoS, must be free after use with free()
  374:  * return: -1 error or >-1 readed subscribes QoS elements
  375:  */
  376: int
  377: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
  378: {
  379: 	int len, ret;
  380: 	struct mqtthdr *hdr;
  381: 	mqtt_len_t *v;
  382: 	caddr_t pos;
  383: 
  384: 	if (!buf || !msgID || !subqos)
  385: 		return -1;
  386: 
  387: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
  388: 	if (!hdr)
  389: 		return -1;
  390: 	pos = buf->msg_base + ret + 1;
  391: 	v = (mqtt_len_t*) pos;
  392: 
  393: 	/* MessageID */
  394: 	len -= sizeof(mqtt_len_t);
  395: 	if (len < 0) {
  396: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  397: 		return -1;
  398: 	} else {
  399: 		*msgID = ntohs(v->val);
  400: 		pos += sizeof(mqtt_len_t);
  401: 	}
  402: 
  403: 	/* Subscribes */
  404: 	*subqos = malloc(len);
  405: 	if (!*subqos) {
  406: 		LOGERR;
  407: 		return -1;
  408: 	} else
  409: 		memcpy(*subqos, pos, len);
  410: 
  411: 	return len;
  412: }
  413: 
  414: /*
  415:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
  416:  *
  417:  * @buf = Message buffer
  418:  * @msgID = MessageID
  419:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  420:  * return: -1 error or >-1 elements into subscr
  421:  */
  422: int
  423: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  424: {
  425: 	register int i;
  426: 	int len, ret;
  427: 	struct mqtthdr *hdr;
  428: 	mqtthdr_var_t *var;
  429: 	mqtt_subscr_t *subs;
  430: 	mqtt_len_t *v;
  431: 	caddr_t pos;
  432: 
  433: 	if (!buf || !msgID || !subscr)
  434: 		return -1;
  435: 
  436: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
  437: 	if (!hdr)
  438: 		return -1;
  439: 	pos = buf->msg_base + ret + 1;
  440: 	v = (mqtt_len_t*) pos;
  441: 
  442: 	/* MessageID */
  443: 	len -= sizeof(mqtt_len_t);
  444: 	if (len < 0) {
  445: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  446: 		return -1;
  447: 	} else {
  448: 		*msgID = ntohs(v->val);
  449: 		pos += sizeof(mqtt_len_t);
  450: 	}
  451: 
  452: 	subs = mqtt_subAlloc(0);
  453: 	if (!subs)
  454: 		return -1;
  455: 	else
  456: 		*subscr = subs;
  457: 
  458: 	/* Subscribes */
  459: 	for (i = 0; len > 0; i++) {
  460: 		var = (mqtthdr_var_t*) pos;
  461: 		len -= MQTTHDR_VAR_SIZEOF(var);
  462: 		if (len < 0) {
  463: 			mqtt_subFree(subscr);
  464: 			mqtt_SetErr(EINVAL, "Short message length %d", len);
  465: 			return -1;
  466: 		}
  467: 		if (!mqtt_subRealloc(&subs, i + 1)) {
  468: 			mqtt_subFree(subscr);
  469: 			return -1;
  470: 		} else
  471: 			*subscr = subs;
  472: 
  473: 		memset(&subs[i], 0, sizeof subs[i]);
  474: 		subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
  475: 		subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
  476: 		if (!subs[i].sub_topic.msg_base) {
  477: 			LOGERR;
  478: 			mqtt_subFree(subscr);
  479: 			return -1;
  480: 		} else {
  481: 			memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
  482: 			((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
  483: 		}
  484: 		pos += MQTTHDR_VAR_SIZEOF(var);
  485: 	}
  486: 
  487: 	return i;
  488: }
  489: 
  490: /*
  491:  * mqtt_readUNSUBACK() Read UNSUBACK message
  492:  *
  493:  * @buf = Message buffer
  494:  * return: -1 error or MessageID
  495:  */
  496: u_short
  497: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
  498: {
  499: 	int len, ret;
  500: 	struct mqtthdr *hdr;
  501: 	mqtt_len_t *v;
  502: 	caddr_t pos;
  503: 
  504: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
  505: 	if (!hdr)
  506: 		return (u_short) -1;
  507: 	if (len < sizeof(mqtt_len_t)) {
  508: 		mqtt_SetErr(EINVAL, "Short message length %d", len);
  509: 		return (u_short) -1;
  510: 	} else {
  511: 		pos = buf->msg_base + ret + 1;
  512: 		v = (mqtt_len_t*) pos;
  513: 	}
  514: 
  515: 	return ntohs(v->val);
  516: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>