--- libaitmqtt/src/Attic/read.c 2022/09/14 18:36:23 1.1.2.1 +++ libaitmqtt/src/Attic/read.c 2022/09/15 13:50:14 1.1.2.2 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: read.c,v 1.1.2.1 2022/09/14 18:36:23 misho Exp $ +* $Id: read.c,v 1.1.2.2 2022/09/15 13:50:14 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -500,6 +500,233 @@ mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf) caddr_t pos; hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &len, &pos); + if (!hdr) + return (u_short) -1; + if (len < sizeof(mqtt_len_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_short) -1; + } else + v = (mqtt_len_t*) pos; + + return ntohs(v->val); +} + +/* + * mqtt_readSUBSCRIBE() Read SUBSCRIBE message + * + * @buf = Message buffe + * @msgID = MessageID + * @subscr = Subscriptions, must be free after use with mqtt_subFree() + * return: -1 error or >-1 elements into subscr + */ +int +mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr) +{ + register int i; + int len; + struct mqtthdr *hdr; + mqtthdr_var_t *var; + mqtt_subscr_t *subs; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !msgID || !subscr) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &len, &pos); + if (!hdr) + return -1; + else + v = (mqtt_len_t*) pos; + + /* MessageID */ + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + subs = mqtt_subAlloc(0); + if (!subs) + return -1; + else + *subscr = subs; + + /* Subscribes */ + for (i = 0; len > 0; i++) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var) + 1; + if (len < 0) { + mqtt_subFree(subscr); + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } + if (!mqtt_subRealloc(&subs, i + 1)) { + mqtt_subFree(subscr); + return -1; + } else + *subscr = subs; + + memset(&subs[i], 0, sizeof subs[i]); + subs[i].sub_topic.msg_len = ntohs(var->var_sb.val); + subs[i].sub_topic.msg_base = e_malloc(subs[i].sub_topic.msg_len + 1); + if (!subs[i].sub_topic.msg_base) { + LOGERR; + mqtt_subFree(subscr); + return -1; + } else { + memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len); + ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0; + } + pos += MQTTHDR_VAR_SIZEOF(var); + + subs[i].sub_qos = *pos; + pos++; + } + + return i; +} + +/* + * mqtt_readSUBACK() Read SUBACK message + * + * @buf = Message buffer + * @msgID = MessageID + * @subqos = Subscribes QoS, must be free after use with e_free() + * return: -1 error or >-1 readed subscribes QoS elements + */ +int +mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !msgID || !subqos) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &len, &pos); + if (!hdr) + return -1; + else + v = (mqtt_len_t*) pos; + + /* MessageID */ + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + /* Subscribes */ + *subqos = e_malloc(len); + if (!*subqos) { + LOGERR; + return -1; + } else + memcpy(*subqos, pos, len); + + return len; +} + +/* + * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message + * + * @buf = Message buffer + * @msgID = MessageID + * @subscr = Subscriptions, must be free after use with mqtt_subFree() + * return: -1 error or >-1 elements into subscr + */ +int +mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr) +{ + register int i; + int len; + struct mqtthdr *hdr; + mqtthdr_var_t *var; + mqtt_subscr_t *subs; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !msgID || !subscr) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &len, &pos); + if (!hdr) + return -1; + else + v = (mqtt_len_t*) pos; + + /* MessageID */ + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + subs = mqtt_subAlloc(0); + if (!subs) + return -1; + else + *subscr = subs; + + /* Subscribes */ + for (i = 0; len > 0; i++) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0) { + mqtt_subFree(subscr); + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } + if (!mqtt_subRealloc(&subs, i + 1)) { + mqtt_subFree(subscr); + return -1; + } else + *subscr = subs; + + memset(&subs[i], 0, sizeof subs[i]); + subs[i].sub_topic.msg_len = ntohs(var->var_sb.val); + subs[i].sub_topic.msg_base = e_malloc(subs[i].sub_topic.msg_len + 1); + if (!subs[i].sub_topic.msg_base) { + LOGERR; + mqtt_subFree(subscr); + return -1; + } else { + memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len); + ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0; + } + pos += MQTTHDR_VAR_SIZEOF(var); + } + + return i; +} + +/* + * mqtt_readUNSUBACK() Read UNSUBACK message + * + * @buf = Message buffer + * return: -1 error or MessageID + */ +u_short +mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &len, &pos); if (!hdr) return (u_short) -1; if (len < sizeof(mqtt_len_t)) {