--- libaitmqtt/src/Attic/read.c 2022/09/14 18:36:23 1.1 +++ libaitmqtt/src/Attic/read.c 2022/09/15 13:50:14 1.1.2.2 @@ -0,0 +1,739 @@ +/************************************************************************* +* (C) 2022 AITNET ltd - Sofia/Bulgaria - +* by Michael Pounov +* +* $Author: misho $ +* $Id: read.c,v 1.1.2.2 2022/09/15 13:50:14 misho Exp $ +* +************************************************************************** +The ELWIX and AITNET software is distributed under the following +terms: + +All of the documentation and software included in the ELWIX and AITNET +Releases is copyrighted by ELWIX - Sofia/Bulgaria + +Copyright 2004 - 2022 + by Michael Pounov . All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. All advertising materials mentioning features or use of this software + must display the following acknowledgement: +This product includes software developed by Michael Pounov +ELWIX - Embedded LightWeight unIX and its contributors. +4. Neither the name of AITNET nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. +*/ +#include "global.h" + + +/* _mqtt_readHEADER() read fixed header from MQTT message */ +static struct mqtthdr * +_mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *len, caddr_t *next) +{ + struct mqtthdr *hdr; + int bytes; + + if (!buf || !buf->msg_base || !buf->msg_len) + return NULL; + + hdr = (struct mqtthdr*) buf->msg_base; + if (hdr->mqtt_msg.type != cmd) { + mqtt_SetErr(EINVAL, "Error:: wrong command #%d should be %d", + hdr->mqtt_msg.type, cmd); + return NULL; + } + + if (len) + *len = mqtt_decodeLen(hdr->mqtt_len, &bytes); + + if (next) + *next = buf->msg_base + bytes + 1; + + return hdr; +} + + +/* + * mqtt_readCONNECT() Read elements from CONNECT message + * + * @buf = Message buffer + * @KASec = Keep Alive in seconds for current connection + * @psConnID = ConnectID + * @connLen = ConnectID length + * @psUser = Username if !=NULL + * @userLen = Username length + * @psPass = Password for Username, only if csUser is set + * @passLen = Password length + * @psWillTopic = Will Topic if !=NULL Will Flags set into message and must be e_free() + * @psWillMessage = Will Message, may be NULL if !NULL must be e_free() after use! + * return: .reserved == 1 is error or == 0 connection flags & msg ok + */ +mqtthdr_connack_t +mqtt_readCONNECT(mqtt_msg_t * __restrict buf, u_short *KASec, char * __restrict psConnID, int connLen, + char * __restrict psUser, int userLen, char * __restrict psPass, int passLen, + char ** __restrict psWillTopic, char ** __restrict psWillMessage) +{ + mqtthdr_connflgs_t flg = { MQTT_CONNFLGS_INIT }; + mqtthdr_connack_t cack = { 1, MQTT_RETCODE_DENIED }; + struct mqtthdr *hdr; + mqtthdr_var_t *var; + mqtt_len_t *ka; + int len; + caddr_t pos; + + if (!buf || !buf->msg_base || !buf->msg_len || !psConnID || !connLen) + return cack; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_CONNECT, &len, &pos); + if (!hdr) + return cack; + if (len < 12) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return cack; + } else + var = (mqtthdr_var_t*) pos; + + /* check init string & protocol */ + if (var->var_sb.sb.l == 4 && !strcmp((char*) var->var_data, MQTT_PROTO_STR)) + pos += var->var_sb.sb.l + sizeof(mqtt_len_t); + else if (var->var_sb.sb.l == 6 || strcmp((char*) var->var_data, MQTT_CONN_STR)) + pos += var->var_sb.sb.l + sizeof(mqtt_len_t); + else { + mqtt_SetErr(EINVAL, "Invalid init string %.6s(%d)", + var->var_data, var->var_sb.sb.l); + cack.retcode = MQTT_RETCODE_REFUSE_UNAVAIL; + return cack; + } + switch (*pos) { + case MQTT_PROTO_VER_3: + case MQTT_PROTO_VER_311: + case MQTT_PROTO_VER_5: + pos++; + break; + default: + mqtt_SetErr(EINVAL, "Invalid protocol version %d", *pos); + cack.retcode = MQTT_RETCODE_REFUSE_VER; + return cack; + } + flg = *(mqtthdr_connflgs_t*) pos; + pos++; + ka = (mqtt_len_t*) pos; + *KASec = ntohs(ka->val); + pos += sizeof(mqtt_len_t); + + len -= pos - (caddr_t) var; + + /* get ConnID */ + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0 || var->var_sb.sb.l >= MQTT_CONNID_MAX) { + mqtt_SetErr(EINVAL, "Unexpected EOM at Connection ID %d", len); + cack.retcode = MQTT_RETCODE_REFUSE_ID; + return cack; + } else { + memset(psConnID, 0, connLen--); + memcpy(psConnID, var->var_data, + ntohs(var->var_sb.val) > connLen ? connLen : ntohs(var->var_sb.val)); + pos += MQTTHDR_VAR_SIZEOF(var); + } + + /* get Willz */ + if (flg.will_flg) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0) { + mqtt_SetErr(EINVAL, "Unexpected EOM at Will Topic %d", len); + cack.retcode = MQTT_RETCODE_REFUSE_ID; + return cack; + } else { + if (psWillTopic) { + *psWillTopic = e_malloc(ntohs(var->var_sb.val) + 1); + if (!*psWillTopic) { + LOGERR; + cack.retcode = MQTT_RETCODE_REFUSE_UNAVAIL; + return cack; + } else + memset(*psWillTopic, 0, ntohs(var->var_sb.val) + 1); + memcpy(*psWillTopic, var->var_data, ntohs(var->var_sb.val)); + } + pos += MQTTHDR_VAR_SIZEOF(var); + } + + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0) { + mqtt_SetErr(EINVAL, "Unexpected EOM at Will Message %d", len); + e_free(psWillTopic); + cack.retcode = MQTT_RETCODE_REFUSE_ID; + return cack; + } else { + if (psWillMessage) { + *psWillMessage = e_malloc(ntohs(var->var_sb.val) + 1); + if (!*psWillMessage) { + LOGERR; + e_free(psWillTopic); + cack.retcode = MQTT_RETCODE_REFUSE_UNAVAIL; + return cack; + } else + memset(*psWillMessage, 0, ntohs(var->var_sb.val) + 1); + memcpy(*psWillMessage, var->var_data, ntohs(var->var_sb.val)); + } + pos += MQTTHDR_VAR_SIZEOF(var); + } + } + + /* get User/Pass */ + if (flg.username) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0 || var->var_sb.sb.l > 12) { + mqtt_SetErr(EINVAL, "Unexpected EOM at Username %d", len); + if (flg.will_flg) { + if (psWillTopic) + e_free(psWillTopic); + if (psWillMessage) + e_free(psWillMessage); + } + cack.retcode = MQTT_RETCODE_REFUSE_USERPASS; + return cack; + } else { + if (psUser && userLen) { + memset(psUser, 0, userLen--); + memcpy(psUser, var->var_data, + ntohs(var->var_sb.val) > userLen ? userLen : ntohs(var->var_sb.val)); + } + pos += MQTTHDR_VAR_SIZEOF(var); + } + } + if (flg.password) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0 || var->var_sb.sb.l > 12) { + mqtt_SetErr(EINVAL, "Unexpected EOM at Password %d", len); + if (flg.will_flg) { + if (psWillTopic) + e_free(psWillTopic); + if (psWillMessage) + e_free(psWillMessage); + } + cack.retcode = MQTT_RETCODE_REFUSE_USERPASS; + return cack; + } else { + if (psPass && passLen) { + memset(psPass, 0, passLen--); + memcpy(psPass, var->var_data, + ntohs(var->var_sb.val) > passLen ? passLen : ntohs(var->var_sb.val)); + } + pos += MQTTHDR_VAR_SIZEOF(var); + } + } + + flg.reserved = 0; + cack.reserved = flg.flags; + cack.retcode = MQTT_RETCODE_ACCEPTED; + return cack; +} + +/* + * mqtt_readCONNACK() Read CONNACK message + * + * @buf = Message buffer + * return: -1 error or >-1 CONNECT message return code + */ +u_char +mqtt_readCONNACK(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtthdr_connack_t *ack; + caddr_t pos; + + if (!buf || !buf->msg_base || !buf->msg_len) + return (u_char) -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_CONNACK, &len, &pos); + if (!hdr) + return (u_char) -1; + if (len < sizeof(mqtthdr_connack_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_char) -1; + } else + ack = (mqtthdr_connack_t*) pos; + + if (ack->retcode > MQTT_RETCODE_DENIED) { + mqtt_SetErr(EINVAL, "Invalid retcode %u", ack->retcode); + return (u_char) -1; + } + + return ack->retcode; +} + +/* + * mqtt_readDISCONNECT() Read DISCONNECT message + * + * @buf = Message buffer + * return: -1 error, 0 ok, >0 undefined result + */ +int +mqtt_readDISCONNECT(mqtt_msg_t * __restrict buf) +{ + int len; + + if (!_mqtt_readHEADER(buf, MQTT_TYPE_DISCONNECT, &len, NULL)) + return -1; + + return len; +} + +/* + * mqtt_readPINGREQ() Read PINGREQ message + * + * @buf = Message buffer + * return: -1 error, 0 ok, >0 undefined result + */ +int +mqtt_readPINGREQ(mqtt_msg_t * __restrict buf) +{ + int len; + + if (!_mqtt_readHEADER(buf, MQTT_TYPE_PINGREQ, &len, NULL)) + return -1; + + return len; +} + +/* + * mqtt_readPINGRESP() Read PINGRESP message + * + * @buf = Message buffer + * return: -1 error, 0 ok, >0 undefined result + */ +int +mqtt_readPINGRESP(mqtt_msg_t * __restrict buf) +{ + int len; + + if (!_mqtt_readHEADER(buf, MQTT_TYPE_PINGRESP, &len, NULL)) + return -1; + + return len; +} + +/* + * mqtt_readPUBLISH() Read PUBLISH message + * + * @buf = Message buffer + * @psTopic = Topic + * @topicLen = Topic length + * @msgID = MessageID + * @pData = Data buffer, may be NULL + * return: -1 error or !=-1 allocated data buffer length + */ +int +mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen, + u_short *msgID, void ** __restrict pData) +{ + int len; + struct mqtthdr *hdr; + mqtthdr_var_t *var; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !psTopic || !msgID) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &len, &pos); + if (!hdr) + return -1; + else + var = (mqtthdr_var_t*) pos; + + /* topic */ + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + memset(psTopic, 0, topicLen--); + memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ? + topicLen : ntohs(var->var_sb.val)); + pos += MQTTHDR_VAR_SIZEOF(var); + v = (mqtt_len_t*) pos; + } + + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + /* data */ + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else if (pData) { + if (!(*pData = e_malloc(len + 1))) { + LOGERR; + return -1; + } else + ((char*) (*pData))[len] = 0; + + memcpy(*pData, pos, len); + } + + return len; +} + +/* + * mqtt_readPUBACK() Read PUBACK message + * + * @buf = Message buffer + * return: -1 error or MessageID + */ +u_short +mqtt_readPUBACK(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &len, &pos); + if (!hdr) + return (u_short) -1; + if (len < sizeof(mqtt_len_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_short) -1; + } else + v = (mqtt_len_t*) pos; + + return ntohs(v->val); +} + +/* + * mqtt_readPUBREC() Read PUBREC message + * + * @buf = Message buffer + * return: -1 error or MessageID + */ +u_short +mqtt_readPUBREC(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &len, &pos); + if (!hdr) + return (u_short) -1; + if (len < sizeof(mqtt_len_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_short) -1; + } else + v = (mqtt_len_t*) pos; + + return ntohs(v->val); +} + +/* + * mqtt_readPUBREL() Read PUBREL message + * + * @buf = Message buffer + * return: -1 error or MessageID + */ +u_short +mqtt_readPUBREL(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &len, &pos); + if (!hdr) + return (u_short) -1; + if (len < sizeof(mqtt_len_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_short) -1; + } else + v = (mqtt_len_t*) pos; + + return ntohs(v->val); +} + +/* + * mqtt_readPUBCOMP() Read PUBCOMP message + * + * @buf = Message buffer + * return: -1 error or MessageID + */ +u_short +mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &len, &pos); + if (!hdr) + return (u_short) -1; + if (len < sizeof(mqtt_len_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_short) -1; + } else + v = (mqtt_len_t*) pos; + + return ntohs(v->val); +} + +/* + * mqtt_readSUBSCRIBE() Read SUBSCRIBE message + * + * @buf = Message buffe + * @msgID = MessageID + * @subscr = Subscriptions, must be free after use with mqtt_subFree() + * return: -1 error or >-1 elements into subscr + */ +int +mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr) +{ + register int i; + int len; + struct mqtthdr *hdr; + mqtthdr_var_t *var; + mqtt_subscr_t *subs; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !msgID || !subscr) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &len, &pos); + if (!hdr) + return -1; + else + v = (mqtt_len_t*) pos; + + /* MessageID */ + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + subs = mqtt_subAlloc(0); + if (!subs) + return -1; + else + *subscr = subs; + + /* Subscribes */ + for (i = 0; len > 0; i++) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var) + 1; + if (len < 0) { + mqtt_subFree(subscr); + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } + if (!mqtt_subRealloc(&subs, i + 1)) { + mqtt_subFree(subscr); + return -1; + } else + *subscr = subs; + + memset(&subs[i], 0, sizeof subs[i]); + subs[i].sub_topic.msg_len = ntohs(var->var_sb.val); + subs[i].sub_topic.msg_base = e_malloc(subs[i].sub_topic.msg_len + 1); + if (!subs[i].sub_topic.msg_base) { + LOGERR; + mqtt_subFree(subscr); + return -1; + } else { + memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len); + ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0; + } + pos += MQTTHDR_VAR_SIZEOF(var); + + subs[i].sub_qos = *pos; + pos++; + } + + return i; +} + +/* + * mqtt_readSUBACK() Read SUBACK message + * + * @buf = Message buffer + * @msgID = MessageID + * @subqos = Subscribes QoS, must be free after use with e_free() + * return: -1 error or >-1 readed subscribes QoS elements + */ +int +mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !msgID || !subqos) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &len, &pos); + if (!hdr) + return -1; + else + v = (mqtt_len_t*) pos; + + /* MessageID */ + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + /* Subscribes */ + *subqos = e_malloc(len); + if (!*subqos) { + LOGERR; + return -1; + } else + memcpy(*subqos, pos, len); + + return len; +} + +/* + * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message + * + * @buf = Message buffer + * @msgID = MessageID + * @subscr = Subscriptions, must be free after use with mqtt_subFree() + * return: -1 error or >-1 elements into subscr + */ +int +mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr) +{ + register int i; + int len; + struct mqtthdr *hdr; + mqtthdr_var_t *var; + mqtt_subscr_t *subs; + mqtt_len_t *v; + caddr_t pos; + + if (!buf || !msgID || !subscr) + return -1; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &len, &pos); + if (!hdr) + return -1; + else + v = (mqtt_len_t*) pos; + + /* MessageID */ + len -= sizeof(mqtt_len_t); + if (len < 0) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } else { + *msgID = ntohs(v->val); + pos += sizeof(mqtt_len_t); + } + + subs = mqtt_subAlloc(0); + if (!subs) + return -1; + else + *subscr = subs; + + /* Subscribes */ + for (i = 0; len > 0; i++) { + var = (mqtthdr_var_t*) pos; + len -= MQTTHDR_VAR_SIZEOF(var); + if (len < 0) { + mqtt_subFree(subscr); + mqtt_SetErr(EINVAL, "Short message length %d", len); + return -1; + } + if (!mqtt_subRealloc(&subs, i + 1)) { + mqtt_subFree(subscr); + return -1; + } else + *subscr = subs; + + memset(&subs[i], 0, sizeof subs[i]); + subs[i].sub_topic.msg_len = ntohs(var->var_sb.val); + subs[i].sub_topic.msg_base = e_malloc(subs[i].sub_topic.msg_len + 1); + if (!subs[i].sub_topic.msg_base) { + LOGERR; + mqtt_subFree(subscr); + return -1; + } else { + memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len); + ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0; + } + pos += MQTTHDR_VAR_SIZEOF(var); + } + + return i; +} + +/* + * mqtt_readUNSUBACK() Read UNSUBACK message + * + * @buf = Message buffer + * return: -1 error or MessageID + */ +u_short +mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf) +{ + int len; + struct mqtthdr *hdr; + mqtt_len_t *v; + caddr_t pos; + + hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &len, &pos); + if (!hdr) + return (u_short) -1; + if (len < sizeof(mqtt_len_t)) { + mqtt_SetErr(EINVAL, "Short message length %d", len); + return (u_short) -1; + } else + v = (mqtt_len_t*) pos; + + return ntohs(v->val); +}