File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / Attic / pub.c
Revision 1.1.2.9: download - view: text, annotated - select for diffs - revision graph
Tue Dec 6 10:33:37 2011 UTC (12 years, 9 months ago) by misho
Branches: mqtt1_0
add read for subscribe message

#include "global.h"


/* ------------------------------------------------------------------- */

/*
 * mqtt_msgPUBLISH() Create PUBLISH message
 *
 * @buf = Message buffer
 * @csTopic = Publish topic
 * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
 * @Dup = Duplicate message
 * @QOS = QoS
 * @Retain = Retain message
 * @pData = Publish data into topic
 * @datlen = Publish data length
 * return: -1 error or >-1 message size for send
 */
int
mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID, 
		u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
{
	int siz = 0;
	struct mqtthdr *hdr;
	mqtthdr_var_t *topic;
	mqtt_v_t *mid;
	void *data;

	if (!buf || !csTopic)
		return -1;
	if (QOS > MQTT_QOS_EXACTLY) {
		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
		return -1;
	}
	if (!msgID && QOS != MQTT_QOS_ONCE) {
		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
		return -1;
	}

	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
		return -1;
	else {
		hdr = (struct mqtthdr *) (buf->msg_base + siz);
		siz += sizeof(struct mqtthdr);
	}

	/* variable header */
	topic = (mqtthdr_var_t*) (buf->msg_base + siz);
	topic->var_sb.val = htons(strlen(csTopic));
	memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
	siz += MQTTHDR_VAR_SIZEOF(topic);

	mid = (mqtt_v_t*) (buf->msg_base + siz);
	mid->val = htons(msgID);
	siz += sizeof(mqtt_v_t);

	/* load with data */
	if (pData && datlen) {
		data = buf->msg_base + siz;
		memcpy(data, pData, datlen);
		siz += datlen;
	}

	/* fixed header */
	MQTTHDR_MSGINIT(hdr);
	hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
	hdr->mqtt_msg.qos = QOS;
	hdr->mqtt_msg.dup = Dup ? 1 : 0;
	hdr->mqtt_msg.retain = Retain ? 1 : 0;
	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));

	mqtt_msgRealloc(buf, siz);
	return siz;
}

static int
_mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
{
	int siz = 0;
	struct mqtthdr *hdr;
	mqtt_v_t *v;

	if (!buf)
		return -1;

	if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
		return -1;
	else {
		hdr = (struct mqtthdr *) (buf->msg_base + siz);
		siz += sizeof(struct mqtthdr);
		v = (mqtt_v_t*) (buf->msg_base + siz);
		siz += sizeof(mqtt_v_t);
	}

	/* fixed header */
	MQTTHDR_MSGINIT(hdr);
	hdr->mqtt_msg.type = cmd;
	*hdr->mqtt_len = sizeof(mqtt_v_t);

	/* MessageID */
	v->val = htons(msgID);

	return siz;
}

/*
 * mqtt_msgPUBACK() Create PUBACK message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * return: -1 error or >-1 message size for send
 */
inline int
mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
{
	return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
}

/*
 * mqtt_msgPUBREC() Create PUBREC message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * return: -1 error or >-1 message size for send
 */
inline int
mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
{
	return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
}

/*
 * mqtt_msgPUBREL() Create PUBREL message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * return: -1 error or >-1 message size for send
 */
inline int
mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
{
	return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
}

/*
 * mqtt_msgPUBCOMP() Create PUBCOMP message
 *
 * @buf = Message buffer
 * @msgID = MessageID
 * return: -1 error or >-1 message size for send
 */
inline int
mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
{
	return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
}


/* ============= decode ============ */

/*
 * mqtt_readPUBLISH() Read PUBLISH message
 *
 * @buf = Message buffer
 * @psTopic = Topic
 * @topicLen = Topic length
 * @msgID = MessageID
 * @pData = Data buffer
 * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
 * return: NULL error or !=NULL MQTT fixed header
 */
struct mqtthdr *
mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen, 
		u_short *msgID, void * __restrict pData, int *datLen)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtthdr_var_t *var;
	mqtt_v_t *v;
	caddr_t pos;

	if (!buf || !psTopic || !msgID || !pData)
		return NULL;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
	if (!hdr)
		return NULL;
	pos = buf->msg_base + ret + 1;
	var = (mqtthdr_var_t*) pos;

	/* topic */
	len -= MQTTHDR_VAR_SIZEOF(var);
	if (len < 0) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return NULL;
	} else {
		memset(psTopic, 0, topicLen--);
		memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ? 
				topicLen : ntohs(var->var_sb.val));
		pos += MQTTHDR_VAR_SIZEOF(var);
		v = (mqtt_v_t*) pos;
	}

	len -= sizeof(mqtt_v_t);
	if (len < 0) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return NULL;
	} else {
		*msgID = ntohs(v->val);
		pos += sizeof(mqtt_v_t);
	}

	/* data */
	if (len < 0) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return NULL;
	} else {
		if (!*datLen) {
			if (!(pData = malloc(len))) {
				LOGERR;
				return NULL;
			} else
				*datLen = len;
		}

		memset(pData, 0, *datLen);
		if (len < *datLen)
			*datLen = len;
		memcpy(pData, pos, *datLen);
	}

	return hdr;
}

/*
 * mqtt_readPUBACK() Read PUBACK message
 *
 * @buf = Message buffer
 * return: -1 error or MessageID
 */
u_short
mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	caddr_t pos;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
	if (!hdr)
		return (u_short) -1;
	if (len < sizeof(mqtt_v_t)) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return (u_short) -1;
	} else {
		pos = buf->msg_base + ret + 1;
		v = (mqtt_v_t*) pos;
	}

	return ntohs(v->val);
}

/*
 * mqtt_readPUBREC() Read PUBREC message
 *
 * @buf = Message buffer
 * return: -1 error or MessageID
 */
u_short
mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	caddr_t pos;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
	if (!hdr)
		return (u_short) -1;
	if (len < sizeof(mqtt_v_t)) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return (u_short) -1;
	} else {
		pos = buf->msg_base + ret + 1;
		v = (mqtt_v_t*) pos;
	}

	return ntohs(v->val);
}

/*
 * mqtt_readPUBREL() Read PUBREL message
 *
 * @buf = Message buffer
 * return: -1 error or MessageID
 */
u_short
mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	caddr_t pos;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
	if (!hdr)
		return (u_short) -1;
	if (len < sizeof(mqtt_v_t)) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return (u_short) -1;
	} else {
		pos = buf->msg_base + ret + 1;
		v = (mqtt_v_t*) pos;
	}

	return ntohs(v->val);
}

/*
 * mqtt_readPUBCOMP() Read PUBCOMP message
 *
 * @buf = Message buffer
 * return: -1 error or MessageID
 */
u_short
mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
{
	int len, ret;
	struct mqtthdr *hdr;
	mqtt_v_t *v;
	caddr_t pos;

	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
	if (!hdr)
		return (u_short) -1;
	if (len < sizeof(mqtt_v_t)) {
		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
		return (u_short) -1;
	} else {
		pos = buf->msg_base + ret + 1;
		v = (mqtt_v_t*) pos;
	}

	return ntohs(v->val);
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>