File:  [ELWIX - Embedded LightWeight unIX -] / libaitmqtt / src / sub.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Thu Jan 26 13:07:33 2012 UTC (12 years, 5 months ago) by misho
Branches: misho
CVS tags: start, mqtt1_1, MQTT1_0
MQTT library

    1: /*************************************************************************
    2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
    3: *  by Michael Pounov <misho@openbsd-bg.org>
    4: *
    5: * $Author: misho $
    6: * $Id: sub.c,v 1.1.1.1 2012/01/26 13:07:33 misho Exp $
    7: *
    8: **************************************************************************
    9: The ELWIX and AITNET software is distributed under the following
   10: terms:
   11: 
   12: All of the documentation and software included in the ELWIX and AITNET
   13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   14: 
   15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011
   16: 	by Michael Pounov <misho@elwix.org>.  All rights reserved.
   17: 
   18: Redistribution and use in source and binary forms, with or without
   19: modification, are permitted provided that the following conditions
   20: are met:
   21: 1. Redistributions of source code must retain the above copyright
   22:    notice, this list of conditions and the following disclaimer.
   23: 2. Redistributions in binary form must reproduce the above copyright
   24:    notice, this list of conditions and the following disclaimer in the
   25:    documentation and/or other materials provided with the distribution.
   26: 3. All advertising materials mentioning features or use of this software
   27:    must display the following acknowledgement:
   28: This product includes software developed by Michael Pounov <misho@elwix.org>
   29: ELWIX - Embedded LightWeight unIX and its contributors.
   30: 4. Neither the name of AITNET nor the names of its contributors
   31:    may be used to endorse or promote products derived from this software
   32:    without specific prior written permission.
   33: 
   34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
   35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
   36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
   37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
   38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
   39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
   40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
   41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
   43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   44: SUCH DAMAGE.
   45: */
   46: #include "global.h"
   47: 
   48: 
   49: /* ------------------------------------------------------------------- */
   50: 
   51: /*
   52:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
   53:  *
   54:  * @buf = Message buffer
   55:  * @Topics = MQTT subscription topics
   56:  * @msgID = MessageID
   57:  * @Dup = Duplicate message
   58:  * @QOS = QoS
   59:  * return: -1 error or >-1 message size for send
   60:  */
   61: int
   62: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
   63: 		u_short msgID, u_char Dup, u_char QOS)
   64: {
   65: 	int siz = 0;
   66: 	struct mqtthdr *hdr;
   67: 	mqtthdr_var_t *topic;
   68: 	mqtt_len_t *mid;
   69: 	mqtt_subscr_t *t;
   70: 	u_char *qos;
   71: 
   72: 	if (!buf || !Topics)
   73: 		return -1;
   74: 	if (QOS > MQTT_QOS_EXACTLY) {
   75: 		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
   76: 		return -1;
   77: 	}
   78: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
   79: 		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
   80: 		return -1;
   81: 	}
   82: 
   83: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
   84: 		return -1;
   85: 	else {
   86: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
   87: 		siz += sizeof(struct mqtthdr);
   88: 	}
   89: 
   90: 	/* variable header */
   91: 	mid = (mqtt_len_t*) (buf->msg_base + siz);
   92: 	mid->val = htons(msgID);
   93: 	siz += sizeof(mqtt_len_t);
   94: 
   95: 	/* payload with subscriptions */
   96: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
   97: 		topic = (mqtthdr_var_t*) (buf->msg_base + siz);
   98: 		topic->var_sb.val = htons(t->sub_topic.msg_len);
   99: 		memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
  100: 		siz += MQTTHDR_VAR_SIZEOF(topic);
  101: 		qos = (buf->msg_base + siz);
  102: 		*qos = t->sub_ret;
  103: 		siz++;
  104: 	}
  105: 
  106: 	/* fixed header */
  107: 	MQTTHDR_MSGINIT(hdr);
  108: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
  109: 	hdr->mqtt_msg.qos = QOS;
  110: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  111: 	hdr->mqtt_msg.retain = 0;
  112: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  113: 
  114: 	mqtt_msgRealloc(buf, siz);
  115: 	return siz;
  116: }
  117: 
  118: /*
  119:  * mqtt_msgSUBACK() Create SUBACK message
  120:  *
  121:  * @buf = Message buffer
  122:  * @Topics = MQTT subscription topics
  123:  * @msgID = MessageID
  124:  * return: -1 error or >-1 message size for send
  125:  */
  126: int
  127: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
  128: {
  129: 	int siz = 0;
  130: 	struct mqtthdr *hdr;
  131: 	mqtt_len_t *v;
  132: 	mqtt_subscr_t *t;
  133: 	u_char *qos;
  134: 
  135: 	if (!buf || !Topics)
  136: 		return -1;
  137: 
  138: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
  139: 		return -1;
  140: 	else {
  141: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  142: 		siz += sizeof(struct mqtthdr);
  143: 		v = (mqtt_len_t*) (buf->msg_base + siz);
  144: 		siz += sizeof(mqtt_len_t);
  145: 	}
  146: 
  147: 	/* MessageID */
  148: 	v->val = htons(msgID);
  149: 
  150: 	/* QoS payload from subscriptions */
  151: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  152: 		qos = (buf->msg_base + siz);
  153: 		*qos = t->sub_ret;
  154: 		siz++;
  155: 	}
  156: 
  157: 	/* fixed header */
  158: 	MQTTHDR_MSGINIT(hdr);
  159: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
  160: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  161: 
  162: 	mqtt_msgRealloc(buf, siz);
  163: 	return siz;
  164: }
  165: 
  166: /*
  167:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
  168:  *
  169:  * @buf = Message buffer
  170:  * @Topics = MQTT subscription topics
  171:  * @msgID = MessageID
  172:  * @Dup = Duplicate message
  173:  * @QOS = QoS
  174:  * return: -1 error or >-1 message size for send
  175:  */
  176: int
  177: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
  178: 		u_short msgID, u_char Dup, u_char QOS)
  179: {
  180: 	int siz = 0;
  181: 	struct mqtthdr *hdr;
  182: 	mqtthdr_var_t *topic;
  183: 	mqtt_len_t *mid;
  184: 	mqtt_subscr_t *t;
  185: 
  186: 	if (!buf || !Topics)
  187: 		return -1;
  188: 	if (QOS > MQTT_QOS_EXACTLY) {
  189: 		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
  190: 		return -1;
  191: 	}
  192: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
  193: 		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
  194: 		return -1;
  195: 	}
  196: 
  197: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
  198: 		return -1;
  199: 	else {
  200: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  201: 		siz += sizeof(struct mqtthdr);
  202: 	}
  203: 
  204: 	/* variable header */
  205: 	mid = (mqtt_len_t*) (buf->msg_base + siz);
  206: 	mid->val = htons(msgID);
  207: 	siz += sizeof(mqtt_len_t);
  208: 
  209: 	/* payload with subscriptions */
  210: 	for (t = Topics; t && t->sub_topic.msg_base; t++) {
  211: 		topic = (mqtthdr_var_t*) (buf->msg_base + siz);
  212: 		topic->var_sb.val = htons(t->sub_topic.msg_len);
  213: 		memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
  214: 		siz += MQTTHDR_VAR_SIZEOF(topic);
  215: 	}
  216: 
  217: 	/* fixed header */
  218: 	MQTTHDR_MSGINIT(hdr);
  219: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
  220: 	hdr->mqtt_msg.qos = QOS;
  221: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  222: 	hdr->mqtt_msg.retain = 0;
  223: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  224: 
  225: 	mqtt_msgRealloc(buf, siz);
  226: 	return siz;
  227: }
  228: 
  229: /*
  230:  * mqtt_msgUNSUBACK() Create UNSUBACK message
  231:  *
  232:  * @buf = Message buffer
  233:  * @msgID = MessageID
  234:  * return: -1 error or >-1 message size for send
  235:  */
  236: int
  237: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
  238: {
  239: 	int siz = 0;
  240: 	struct mqtthdr *hdr;
  241: 	mqtt_len_t *v;
  242: 
  243: 	if (!buf)
  244: 		return -1;
  245: 
  246: 	if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1)
  247: 		return -1;
  248: 	else {
  249: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  250: 		siz += sizeof(struct mqtthdr);
  251: 		v = (mqtt_len_t*) (buf->msg_base + siz);
  252: 		siz += sizeof(mqtt_len_t);
  253: 	}
  254: 
  255: 	/* fixed header */
  256: 	MQTTHDR_MSGINIT(hdr);
  257: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
  258: 	*hdr->mqtt_len = sizeof(mqtt_len_t);
  259: 
  260: 	/* MessageID */
  261: 	v->val = htons(msgID);
  262: 
  263: 	return siz;
  264: }
  265: 
  266: 
  267: /* ============= decode ============ */
  268: 
  269: /*
  270:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
  271:  *
  272:  * @buf = Message buffer
  273:  * @msgID = MessageID
  274:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  275:  * return: NULL error or !=NULL MQTT fixed header
  276:  */
  277: struct mqtthdr *
  278: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  279: {
  280: 	register int i;
  281: 	int len, ret;
  282: 	struct mqtthdr *hdr;
  283: 	mqtthdr_var_t *var;
  284: 	mqtt_subscr_t *subs;
  285: 	mqtt_len_t *v;
  286: 	caddr_t pos;
  287: 
  288: 	if (!buf || !msgID || !subscr)
  289: 		return NULL;
  290: 
  291: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
  292: 	if (!hdr)
  293: 		return NULL;
  294: 	pos = buf->msg_base + ret + 1;
  295: 	v = (mqtt_len_t*) pos;
  296: 
  297: 	/* MessageID */
  298: 	len -= sizeof(mqtt_len_t);
  299: 	if (len < 0) {
  300: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  301: 		return NULL;
  302: 	} else {
  303: 		*msgID = ntohs(v->val);
  304: 		pos += sizeof(mqtt_len_t);
  305: 	}
  306: 
  307: 	subs = mqtt_subAlloc(0);
  308: 	if (!subs)
  309: 		return NULL;
  310: 	else
  311: 		*subscr = subs;
  312: 
  313: 	/* Subscribes */
  314: 	for (i = 0; len > 0; i++) {
  315: 		var = (mqtthdr_var_t*) pos;
  316: 		len -= MQTTHDR_VAR_SIZEOF(var) + 1;
  317: 		if (len < 0) {
  318: 			mqtt_subFree(subscr);
  319: 			mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  320: 			return NULL;
  321: 		}
  322: 		subs = mqtt_subRealloc(subs, i + 1);
  323: 		if (!subs) {
  324: 			mqtt_subFree(subscr);
  325: 			return NULL;
  326: 		} else
  327: 			*subscr = subs;
  328: 
  329: 		memset(&subs[i], 0, sizeof subs[i]);
  330: 		subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
  331: 		subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
  332: 		if (!subs[i].sub_topic.msg_base) {
  333: 			LOGERR;
  334: 			mqtt_subFree(subscr);
  335: 			return NULL;
  336: 		} else
  337: 			memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
  338: 		pos += MQTTHDR_VAR_SIZEOF(var);
  339: 
  340: 		subs[i].sub_ret = *pos;
  341: 		pos++;
  342: 	}
  343: 
  344: 	return hdr;
  345: }
  346: 
  347: /*
  348:  * mqtt_readSUBACK() Read SUBACK message
  349:  *
  350:  * @buf = Message buffer
  351:  * @msgID = MessageID
  352:  * @subqos = Subscribes QoS, must be free after use with free()
  353:  * return: -1 error or >-1 readed subscribes QoS elements
  354:  */
  355: int
  356: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
  357: {
  358: 	int len, ret;
  359: 	struct mqtthdr *hdr;
  360: 	mqtt_len_t *v;
  361: 	caddr_t pos;
  362: 
  363: 	if (!buf || !msgID || !subqos)
  364: 		return -1;
  365: 
  366: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
  367: 	if (!hdr)
  368: 		return -1;
  369: 	pos = buf->msg_base + ret + 1;
  370: 	v = (mqtt_len_t*) pos;
  371: 
  372: 	/* MessageID */
  373: 	len -= sizeof(mqtt_len_t);
  374: 	if (len < 0) {
  375: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  376: 		return -1;
  377: 	} else {
  378: 		*msgID = ntohs(v->val);
  379: 		pos += sizeof(mqtt_len_t);
  380: 	}
  381: 
  382: 	/* Subscribes */
  383: 	*subqos = malloc(len);
  384: 	if (!*subqos) {
  385: 		LOGERR;
  386: 		return -1;
  387: 	} else
  388: 		memcpy(*subqos, pos, len);
  389: 
  390: 	return len;
  391: }
  392: 
  393: /*
  394:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
  395:  *
  396:  * @buf = Message buffer
  397:  * @msgID = MessageID
  398:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  399:  * return: NULL error or !=NULL MQTT fixed header
  400:  */
  401: struct mqtthdr *
  402: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  403: {
  404: 	register int i;
  405: 	int len, ret;
  406: 	struct mqtthdr *hdr;
  407: 	mqtthdr_var_t *var;
  408: 	mqtt_subscr_t *subs;
  409: 	mqtt_len_t *v;
  410: 	caddr_t pos;
  411: 
  412: 	if (!buf || !msgID || !subscr)
  413: 		return NULL;
  414: 
  415: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
  416: 	if (!hdr)
  417: 		return NULL;
  418: 	pos = buf->msg_base + ret + 1;
  419: 	v = (mqtt_len_t*) pos;
  420: 
  421: 	/* MessageID */
  422: 	len -= sizeof(mqtt_len_t);
  423: 	if (len < 0) {
  424: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  425: 		return NULL;
  426: 	} else {
  427: 		*msgID = ntohs(v->val);
  428: 		pos += sizeof(mqtt_len_t);
  429: 	}
  430: 
  431: 	subs = mqtt_subAlloc(0);
  432: 	if (!subs)
  433: 		return NULL;
  434: 	else
  435: 		*subscr = subs;
  436: 
  437: 	/* Subscribes */
  438: 	for (i = 0; len > 0; i++) {
  439: 		var = (mqtthdr_var_t*) pos;
  440: 		len -= MQTTHDR_VAR_SIZEOF(var);
  441: 		if (len < 0) {
  442: 			mqtt_subFree(subscr);
  443: 			mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  444: 			return NULL;
  445: 		}
  446: 		subs = mqtt_subRealloc(subs, i + 1);
  447: 		if (!subs) {
  448: 			mqtt_subFree(subscr);
  449: 			return NULL;
  450: 		} else
  451: 			*subscr = subs;
  452: 
  453: 		memset(&subs[i], 0, sizeof subs[i]);
  454: 		subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
  455: 		subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);
  456: 		if (!subs[i].sub_topic.msg_base) {
  457: 			LOGERR;
  458: 			mqtt_subFree(subscr);
  459: 			return NULL;
  460: 		} else
  461: 			memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
  462: 		pos += MQTTHDR_VAR_SIZEOF(var);
  463: 	}
  464: 
  465: 	return hdr;
  466: }
  467: 
  468: /*
  469:  * mqtt_readUNSUBACK() Read UNSUBACK message
  470:  *
  471:  * @buf = Message buffer
  472:  * return: -1 error or MessageID
  473:  */
  474: u_short
  475: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
  476: {
  477: 	int len, ret;
  478: 	struct mqtthdr *hdr;
  479: 	mqtt_len_t *v;
  480: 	caddr_t pos;
  481: 
  482: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
  483: 	if (!hdr)
  484: 		return (u_short) -1;
  485: 	if (len < sizeof(mqtt_len_t)) {
  486: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  487: 		return (u_short) -1;
  488: 	} else {
  489: 		pos = buf->msg_base + ret + 1;
  490: 		v = (mqtt_len_t*) pos;
  491: 	}
  492: 
  493: 	return ntohs(v->val);
  494: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>