File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / Attic / sub.c
Revision 1.2: download - view: text, annotated - select for diffs - revision graph
Fri Jan 27 15:05:38 2012 UTC (12 years, 5 months ago) by misho
Branches: MAIN
CVS tags: mqtt1_1, HEAD
added new files

#include "global.h"


/* ------------------------------------------------------------------- */

/*
 * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
 *
 * @buf = Message buffer
 * @Topics = MQTT subscription topics
 * @msgID = MessageID
 * @Dup = Duplicate message
 * @QOS = QoS
 * return: -1 error or >-1 message size for send
 */
int
mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
		u_short msgID, u_char Dup, u_char QOS)
{
	int siz = 0;
	struct mqtthdr *hdr;
	mqtthdr_var_t *topic;
	mqtt_v_t *mid;
	mqtt_subscr_t *t;
	u_char *qos;

	if (!buf || !Topics)
		return -1;
	if (QOS > MQTT_QOS_EXACTLY) {
		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
		return -1;
	}
	if (!msgID && QOS != MQTT_QOS_ONCE) {
		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
		return -1;
	}

	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
		return -1;
	else {
		hdr = (struct mqtthdr *) (buf->msg_base + siz);
		siz += sizeof(struct mqtthdr);
	}

	/* variable header */
	mid = (mqtt_v_t*) (buf->msg_base + siz);
	mid->val = htons(msgID);
	siz += sizeof(mqtt_v_t);

	/* payload with subscriptions */
	for (t = Topics; t && t->sub_topic._base; t++) {
		topic = (mqtthdr_var_t*) (buf->msg_base + siz);
		topic->var_sb.val = htons(t->sub_topic._size);
		memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
		siz += MQTTHDR_VAR_SIZEOF(topic);
		qos = (buf->msg_base + siz);
		*qos = t->sub_ret;
		siz++;
	}

	/* fixed header */
	MQTTHDR_MSGINIT(hdr);
	hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
	hdr->mqtt_msg.qos = QOS;
	hdr->mqtt_msg.dup = Dup ? 1 : 0;
	hdr->mqtt_msg.retain = 0;
	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));

	mqtt_msgRealloc(buf, siz);
	return siz;
}

/*
 * mqtt_msgSUBACK() Create SUBACK message
 *
 * @buf = Message buffer
 * @Topics = MQTT subscription topics
 * @msgID = MessageID
 * return: -1 error or >-1 message size for send
 */
int
mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
{
	int siz = 0;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	mqtt_subscr_t *t;
	u_char *qos;

	if (!buf || !Topics)
		return -1;

	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
		return -1;
	else {
		hdr = (struct mqtthdr *) (buf->msg_base + siz);
		siz += sizeof(struct mqtthdr);
		v = (mqtt_v_t*) (buf->msg_base + siz);
		siz += sizeof(mqtt_v_t);
	}

	/* MessageID */
	v->val = htons(msgID);

	/* QoS payload from subscriptions */
	for (t = Topics; t && t->sub_topic._base; t++) {
		qos = (buf->msg_base + siz);
		*qos = t->sub_ret;
		siz++;
	}

	/* fixed header */
	MQTTHDR_MSGINIT(hdr);
	hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));

	mqtt_msgRealloc(buf, siz);
	return siz;
}

/*
 * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
 *
 * @buf = Message buffer
 * @Topics = MQTT subscription topics
 * @msgID = MessageID
 * @Dup = Duplicate message
 * @QOS = QoS
 * return: -1 error or >-1 message size for send
 */
int
mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
		u_short msgID, u_char Dup, u_char QOS)
{
	int siz = 0;
	struct mqtthdr *hdr;
	mqtthdr_var_t *topic;
	mqtt_v_t *mid;
	mqtt_subscr_t *t;

	if (!buf || !Topics)
		return -1;
	if (QOS > MQTT_QOS_EXACTLY) {
		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
		return -1;
	}
	if (!msgID && QOS != MQTT_QOS_ONCE) {
		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
		return -1;
	}

	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
		return -1;
	else {
		hdr = (struct mqtthdr *) (buf->msg_base + siz);
		siz += sizeof(struct mqtthdr);
	}

	/* variable header */
	mid = (mqtt_v_t*) (buf->msg_base + siz);
	mid->val = htons(msgID);
	siz += sizeof(mqtt_v_t);

	/* payload with subscriptions */
	for (t = Topics; t && t->sub_topic._base; t++) {
		topic = (mqtthdr_var_t*) (buf->msg_base + siz);
		topic->var_sb.val = htons(t->sub_topic._size);
		memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
		siz += MQTTHDR_VAR_SIZEOF(topic);
	}

	/* fixed header */
	MQTTHDR_MSGINIT(hdr);
	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
	hdr->mqtt_msg.qos = QOS;
	hdr->mqtt_msg.dup = Dup ? 1 : 0;
	hdr->mqtt_msg.retain = 0;
	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));

	mqtt_msgRealloc(buf, siz);
	return siz;
}

/*
 * mqtt_msgUNSUBACK() Create UNSUBACK message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * return: -1 error or >-1 message size for send
 */
int
mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
{
	int siz = 0;
	struct mqtthdr *hdr;
	mqtt_v_t *v;

	if (!buf)
		return -1;

	if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
		return -1;
	else {
		hdr = (struct mqtthdr *) (buf->msg_base + siz);
		siz += sizeof(struct mqtthdr);
		v = (mqtt_v_t*) (buf->msg_base + siz);
		siz += sizeof(mqtt_v_t);
	}

	/* fixed header */
	MQTTHDR_MSGINIT(hdr);
	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
	*hdr->mqtt_len = sizeof(mqtt_v_t);

	/* MessageID */
	v->val = htons(msgID);

	return siz;
}


/* ============= decode ============ */

/*
 * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * @subscr = Subscriptions, must be free after use with mqtt_subFree()
 * return: NULL error or !=NULL MQTT fixed header
 */
struct mqtthdr *
mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
{
	register int i;
	int len, ret;
	struct mqtthdr *hdr;
	mqtthdr_var_t *var;
	mqtt_subscr_t *subs;
	mqtt_v_t *v;
	caddr_t pos;

	if (!buf || !msgID || !subscr)
		return NULL;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
	if (!hdr)
		return NULL;
	pos = buf->msg_base + ret + 1;
	v = (mqtt_v_t*) pos;

	/* MessageID */
	len -= sizeof(mqtt_v_t);
	if (len < 0) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return NULL;
	} else {
		*msgID = ntohs(v->val);
		pos += sizeof(mqtt_v_t);
	}

	subs = mqtt_subAlloc(0);
	if (!subs)
		return NULL;
	else
		*subscr = subs;

	/* Subscribes */
	for (i = 0; len > 0; i++) {
		var = (mqtthdr_var_t*) pos;
		len -= MQTTHDR_VAR_SIZEOF(var) + 1;
		if (len < 0) {
			mqtt_subFree(subscr);
			mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
			return NULL;
		}
		subs = mqtt_subRealloc(subs, i + 1);
		if (!subs) {
			mqtt_subFree(subscr);
			return NULL;
		} else
			*subscr = subs;

		memset(&subs[i], 0, sizeof subs[i]);
		subs[i].sub_topic._size = ntohs(var->var_sb.val);
		subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
		if (!subs[i].sub_topic._base) {
			LOGERR;
			mqtt_subFree(subscr);
			return NULL;
		} else
			memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
		pos += MQTTHDR_VAR_SIZEOF(var);

		subs[i].sub_ret = *pos;
		pos++;
	}

	return hdr;
}

/*
 * mqtt_readSUBACK() Read SUBACK message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * @subqos = Subscribes QoS, must be free after use with free()
 * return: -1 error or >-1 readed subscribes QoS elements
 */
int
mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	caddr_t pos;

	if (!buf || !msgID || !subqos)
		return -1;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
	if (!hdr)
		return -1;
	pos = buf->msg_base + ret + 1;
	v = (mqtt_v_t*) pos;

	/* MessageID */
	len -= sizeof(mqtt_v_t);
	if (len < 0) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return -1;
	} else {
		*msgID = ntohs(v->val);
		pos += sizeof(mqtt_v_t);
	}

	/* Subscribes */
	*subqos = malloc(len);
	if (!*subqos) {
		LOGERR;
		return -1;
	} else
		memcpy(*subqos, pos, len);

	return len;
}

/*
 * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * @subscr = Subscriptions, must be free after use with mqtt_subFree()
 * return: NULL error or !=NULL MQTT fixed header
 */
struct mqtthdr *
mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
{
	register int i;
	int len, ret;
	struct mqtthdr *hdr;
	mqtthdr_var_t *var;
	mqtt_subscr_t *subs;
	mqtt_v_t *v;
	caddr_t pos;

	if (!buf || !msgID || !subscr)
		return NULL;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
	if (!hdr)
		return NULL;
	pos = buf->msg_base + ret + 1;
	v = (mqtt_v_t*) pos;

	/* MessageID */
	len -= sizeof(mqtt_v_t);
	if (len < 0) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return NULL;
	} else {
		*msgID = ntohs(v->val);
		pos += sizeof(mqtt_v_t);
	}

	subs = mqtt_subAlloc(0);
	if (!subs)
		return NULL;
	else
		*subscr = subs;

	/* Subscribes */
	for (i = 0; len > 0; i++) {
		var = (mqtthdr_var_t*) pos;
		len -= MQTTHDR_VAR_SIZEOF(var);
		if (len < 0) {
			mqtt_subFree(subscr);
			mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
			return NULL;
		}
		subs = mqtt_subRealloc(subs, i + 1);
		if (!subs) {
			mqtt_subFree(subscr);
			return NULL;
		} else
			*subscr = subs;

		memset(&subs[i], 0, sizeof subs[i]);
		subs[i].sub_topic._size = ntohs(var->var_sb.val);
		subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
		if (!subs[i].sub_topic._base) {
			LOGERR;
			mqtt_subFree(subscr);
			return NULL;
		} else
			memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
		pos += MQTTHDR_VAR_SIZEOF(var);
	}

	return hdr;
}

/*
 * mqtt_readUNSUBACK() Read UNSUBACK message
 *
 * @buf = Message buffer
 * return: -1 error or MessageID
 */
u_short
mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	caddr_t pos;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
	if (!hdr)
		return (u_short) -1;
	if (len < sizeof(mqtt_v_t)) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return (u_short) -1;
	} else {
		pos = buf->msg_base + ret + 1;
		v = (mqtt_v_t*) pos;
	}

	return ntohs(v->val);
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>