#include "global.h"
/* ------------------------------------------------------------------- */
/*
* mqtt_msgPUBLISH() Create PUBLISH message
*
* @buf = Message buffer
* @csTopic = Publish topic
* @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
* @Dup = Duplicate message
* @QOS = QoS
* @Retain = Retain message
* @pData = Publish data into topic
* @datlen = Publish data length
* return: -1 error or >-1 message size for send
*/
int
mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID,
u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
{
int siz = 0;
struct mqtthdr *hdr;
mqtthdr_var_t *topic;
mqtt_v_t *mid;
void *data;
if (!buf || !csTopic)
return -1;
if (QOS > MQTT_QOS_EXACTLY) {
mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
return -1;
}
if (!msgID && QOS != MQTT_QOS_ONCE) {
mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
return -1;
}
if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
return -1;
else {
hdr = (struct mqtthdr *) (buf->msg_base + siz);
siz += sizeof(struct mqtthdr);
}
/* variable header */
topic = (mqtthdr_var_t*) (buf->msg_base + siz);
topic->var_sb.val = htons(strlen(csTopic));
memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
siz += MQTTHDR_VAR_SIZEOF(topic);
mid = (mqtt_v_t*) (buf->msg_base + siz);
mid->val = htons(msgID);
siz += sizeof(mqtt_v_t);
/* load with data */
if (pData && datlen) {
data = buf->msg_base + siz;
memcpy(data, pData, datlen);
siz += datlen;
}
/* fixed header */
MQTTHDR_MSGINIT(hdr);
hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
hdr->mqtt_msg.qos = QOS;
hdr->mqtt_msg.dup = Dup ? 1 : 0;
hdr->mqtt_msg.retain = Retain ? 1 : 0;
*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
mqtt_msgRealloc(buf, siz);
return siz;
}
static int
_mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
{
int siz = 0;
struct mqtthdr *hdr;
mqtt_v_t *v;
if (!buf)
return -1;
if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
return -1;
else {
hdr = (struct mqtthdr *) (buf->msg_base + siz);
siz += sizeof(struct mqtthdr);
v = (mqtt_v_t*) (buf->msg_base + siz);
siz += sizeof(mqtt_v_t);
}
/* fixed header */
MQTTHDR_MSGINIT(hdr);
hdr->mqtt_msg.type = cmd;
*hdr->mqtt_len = sizeof(mqtt_v_t);
/* MessageID */
v->val = htons(msgID);
return siz;
}
/*
* mqtt_msgPUBACK() Create PUBACK message
*
* @buf = Message buffer
* @msgID = MessageID
* return: -1 error or >-1 message size for send
*/
inline int
mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
{
return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
}
/*
* mqtt_msgPUBREC() Create PUBREC message
*
* @buf = Message buffer
* @msgID = MessageID
* return: -1 error or >-1 message size for send
*/
inline int
mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
{
return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
}
/*
* mqtt_msgPUBREL() Create PUBREL message
*
* @buf = Message buffer
* @msgID = MessageID
* return: -1 error or >-1 message size for send
*/
inline int
mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
{
return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
}
/*
* mqtt_msgPUBCOMP() Create PUBCOMP message
*
* @buf = Message buffer
* @msgID = MessageID
* return: -1 error or >-1 message size for send
*/
inline int
mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
{
return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
}
/* ============= decode ============ */
/*
* mqtt_readPUBLISH() Read PUBLISH message
*
* @buf = Message buffer
* @psTopic = Topic
* @topicLen = Topic length
* @msgID = MessageID
* @pData = Data buffer
* @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
* return: NULL error or !=NULL MQTT fixed header
*/
struct mqtthdr *
mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,
u_short *msgID, void * __restrict pData, int *datLen)
{
int len, ret;
struct mqtthdr *hdr;
mqtthdr_var_t *var;
mqtt_v_t *v;
caddr_t pos;
if (!buf || !psTopic || !msgID || !pData)
return NULL;
hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
if (!hdr)
return NULL;
pos = buf->msg_base + ret + 1;
var = (mqtthdr_var_t*) pos;
/* topic */
len -= MQTTHDR_VAR_SIZEOF(var);
if (len < 0) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return NULL;
} else {
memset(psTopic, 0, topicLen--);
memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?
topicLen : ntohs(var->var_sb.val));
pos += MQTTHDR_VAR_SIZEOF(var);
v = (mqtt_v_t*) pos;
}
len -= sizeof(mqtt_v_t);
if (len < 0) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return NULL;
} else {
*msgID = ntohs(v->val);
pos += sizeof(mqtt_v_t);
}
/* data */
if (len < 0) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return NULL;
} else {
if (!*datLen) {
if (!(pData = malloc(len))) {
LOGERR;
return NULL;
} else
*datLen = len;
}
memset(pData, 0, *datLen);
if (len < *datLen)
*datLen = len;
memcpy(pData, pos, *datLen);
}
return hdr;
}
/*
* mqtt_readPUBACK() Read PUBACK message
*
* @buf = Message buffer
* return: -1 error or MessageID
*/
u_short
mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
{
int len, ret;
struct mqtthdr *hdr;
mqtt_v_t *v;
caddr_t pos;
hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
if (!hdr)
return (u_short) -1;
if (len < sizeof(mqtt_v_t)) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return (u_short) -1;
} else {
pos = buf->msg_base + ret + 1;
v = (mqtt_v_t*) pos;
}
return ntohs(v->val);
}
/*
* mqtt_readPUBREC() Read PUBREC message
*
* @buf = Message buffer
* return: -1 error or MessageID
*/
u_short
mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
{
int len, ret;
struct mqtthdr *hdr;
mqtt_v_t *v;
caddr_t pos;
hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
if (!hdr)
return (u_short) -1;
if (len < sizeof(mqtt_v_t)) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return (u_short) -1;
} else {
pos = buf->msg_base + ret + 1;
v = (mqtt_v_t*) pos;
}
return ntohs(v->val);
}
/*
* mqtt_readPUBREL() Read PUBREL message
*
* @buf = Message buffer
* return: -1 error or MessageID
*/
u_short
mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
{
int len, ret;
struct mqtthdr *hdr;
mqtt_v_t *v;
caddr_t pos;
hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
if (!hdr)
return (u_short) -1;
if (len < sizeof(mqtt_v_t)) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return (u_short) -1;
} else {
pos = buf->msg_base + ret + 1;
v = (mqtt_v_t*) pos;
}
return ntohs(v->val);
}
/*
* mqtt_readPUBCOMP() Read PUBCOMP message
*
* @buf = Message buffer
* return: -1 error or MessageID
*/
u_short
mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
{
int len, ret;
struct mqtthdr *hdr;
mqtt_v_t *v;
caddr_t pos;
hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
if (!hdr)
return (u_short) -1;
if (len < sizeof(mqtt_v_t)) {
mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
return (u_short) -1;
} else {
pos = buf->msg_base + ret + 1;
v = (mqtt_v_t*) pos;
}
return ntohs(v->val);
}
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>