--- libaitmqtt/src/sub.c 2022/09/15 13:50:14 1.3.12.2 +++ libaitmqtt/src/sub.c 2022/09/16 04:14:03 1.3.12.4 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sub.c,v 1.3.12.2 2022/09/15 13:50:14 misho Exp $ +* $Id: sub.c,v 1.3.12.4 2022/09/16 04:14:03 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -51,13 +51,10 @@ SUCH DAMAGE. * * @Topics = MQTT subscription topics * @msgID = MessageID - * @Dup = Duplicate message - * @QOS = QoS * return: NULL error or allocated SUBSCRIBE message */ mqtt_msg_t * -mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, u_short msgID, - u_char Dup, u_char QOS) +mqtt_msgSUBSCRIBE(mqtt_subscr_t * __restrict Topics, u_short msgID) { int len, siz; u_int n, *l; @@ -65,25 +62,20 @@ mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, mqtthdr_var_t *topic; mqtt_len_t *mid; mqtt_subscr_t *t; - u_char *qos; void *data; mqtt_msg_t *msg = NULL; - if (!Topics || !*Topics) + if (!Topics) return NULL; - if (QOS > MQTT_QOS_EXACTLY) { - mqtt_SetErr(EINVAL, "Invalid QoS parameter"); - return NULL; - } - if (!msgID && QOS != MQTT_QOS_ONCE) { + if (!msgID) { mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0"); return NULL; } /* calculate message size */ len = sizeof(mqtt_len_t); /* msgid */ - for (t = *Topics; t && t->sub_topic.msg_base; t++) /* subscribes & qos */ - len += sizeof(mqtt_len_t) + t->sub_topic.msg_len; + for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes & qos */ + len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1; /* calculate header size */ siz = sizeof(struct mqtthdr); /* mqtt fixed header */ @@ -99,9 +91,7 @@ mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, /* fixed header */ hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE; - hdr->mqtt_msg.qos = QOS; - hdr->mqtt_msg.dup = Dup ? 1 : 0; - hdr->mqtt_msg.retain = 0; + hdr->mqtt_msg.qos = MQTT_QOS_ACK; l = (u_int*) hdr->mqtt_len; *l = n; data += siz; @@ -112,13 +102,13 @@ mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, data += sizeof(mqtt_len_t); /* payload with subscriptions */ - for (t = *Topics; t && t->sub_topic.msg_base; t++) { + for (t = Topics; t && t->sub_topic.msg_base; t++) { topic = (mqtthdr_var_t*) data; topic->var_sb.val = htons(t->sub_topic.msg_len); memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val)); data += MQTTHDR_VAR_SIZEOF(topic); - qos = data++; - *qos = t->sub_qos; + *((char*) data) = t->sub_qos; + data++; } return msg; @@ -132,7 +122,7 @@ mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, * return: NULL error or allocated SUBACK message */ mqtt_msg_t * -mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_short msgID) +mqtt_msgSUBACK(mqtt_subscr_t * __restrict Topics, u_short msgID) { int siz = 0; struct mqtthdr *hdr; @@ -141,7 +131,7 @@ mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s u_char *qos; mqtt_msg_t *msg = NULL; - if (!Topics || !*Topics) + if (!Topics) return NULL; if (!(msg = mqtt_msgAlloc(MQTTMSG_MAX))) @@ -157,7 +147,7 @@ mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s v->val = htons(msgID); /* QoS payload from subscriptions */ - for (t = *Topics; t && t->sub_topic.msg_base; t++, siz++) { + for (t = Topics; t && t->sub_topic.msg_base; t++, siz++) { qos = (msg->msg_base + siz); *qos = t->sub_qos; } @@ -179,7 +169,7 @@ mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s * return: NULL error or allocated UNSUBSCRIBE message */ mqtt_msg_t * -mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, u_short msgID, +mqtt_msgUNSUBSCRIBE(mqtt_subscr_t * __restrict Topics, u_short msgID, u_char Dup, u_char QOS) { int len, siz = 0; @@ -191,7 +181,7 @@ mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics void *data; mqtt_msg_t *msg = NULL; - if (!Topics || !*Topics) + if (!Topics) return NULL; if (QOS > MQTT_QOS_EXACTLY) { mqtt_SetErr(EINVAL, "Invalid QoS parameter"); @@ -204,7 +194,7 @@ mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics /* calculate message size */ len = sizeof(mqtt_len_t); /* msgid */ - for (t = *Topics; t && t->sub_topic.msg_base; t++) /* subscribes */ + for (t = Topics; t && t->sub_topic.msg_base; t++) /* subscribes */ len += sizeof(mqtt_len_t) + t->sub_topic.msg_len; /* calculate header size */ @@ -234,7 +224,7 @@ mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics data += sizeof(mqtt_len_t); /* payload with subscriptions */ - for (t = *Topics; t && t->sub_topic.msg_base; t++) { + for (t = Topics; t && t->sub_topic.msg_base; t++) { topic = (mqtthdr_var_t*) data; topic->var_sb.val = htons(t->sub_topic.msg_len); memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));