Diff for /libaitmqtt/src/sub.c between versions 1.3.12.1 and 1.3.12.4

version 1.3.12.1, 2022/09/14 21:44:21 version 1.3.12.4, 2022/09/16 04:14:03
Line 51  SUCH DAMAGE. Line 51  SUCH DAMAGE.
  *   *
  * @Topics = MQTT subscription topics   * @Topics = MQTT subscription topics
  * @msgID = MessageID   * @msgID = MessageID
  * @Dup = Duplicate message  
  * @QOS = QoS  
  * return: NULL error or allocated SUBSCRIBE message   * return: NULL error or allocated SUBSCRIBE message
  */   */
 mqtt_msg_t *  mqtt_msg_t *
mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, u_short msgIDmqtt_msgSUBSCRIBE(mqtt_subscr_t * __restrict Topics, u_short msgID)
                u_char Dup, u_char QOS) 
 {  {
         int len, siz;          int len, siz;
         u_int n, *l;          u_int n, *l;
Line 65  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics,  Line 62  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, 
         mqtthdr_var_t *topic;          mqtthdr_var_t *topic;
         mqtt_len_t *mid;          mqtt_len_t *mid;
         mqtt_subscr_t *t;          mqtt_subscr_t *t;
         u_char *qos;  
         void *data;          void *data;
         mqtt_msg_t *msg = NULL;          mqtt_msg_t *msg = NULL;
   
        if (!Topics || !*Topics)        if (!Topics)
                 return NULL;                  return NULL;
        if (QOS > MQTT_QOS_EXACTLY) {        if (!msgID) {
                mqtt_SetErr(EINVAL, "Invalid QoS parameter"); 
                return NULL; 
        } 
        if (!msgID && QOS != MQTT_QOS_ONCE) { 
                 mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");                  mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
                 return NULL;                  return NULL;
         }          }
   
         /* calculate message size */          /* calculate message size */
         len = sizeof(mqtt_len_t);                               /* msgid */          len = sizeof(mqtt_len_t);                               /* msgid */
        for (t = *Topics; t && t->sub_topic.msg_base; t++)        /* subscribes & qos */        for (t = Topics; t && t->sub_topic.msg_base; t++)        /* subscribes & qos */
                len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;                len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
   
         /* calculate header size */          /* calculate header size */
         siz = sizeof(struct mqtthdr);                           /* mqtt fixed header */          siz = sizeof(struct mqtthdr);                           /* mqtt fixed header */
Line 99  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics,  Line 91  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, 
   
         /* fixed header */          /* fixed header */
         hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;          hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
        hdr->mqtt_msg.qos = QOS;        hdr->mqtt_msg.qos = MQTT_QOS_ACK;
        hdr->mqtt_msg.dup = Dup ? 1 : 0; 
        hdr->mqtt_msg.retain = 0; 
         l = (u_int*) hdr->mqtt_len;          l = (u_int*) hdr->mqtt_len;
         *l = n;          *l = n;
         data += siz;          data += siz;
Line 112  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics,  Line 102  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, 
         data += sizeof(mqtt_len_t);          data += sizeof(mqtt_len_t);
   
         /* payload with subscriptions */          /* payload with subscriptions */
        for (t = *Topics; t && t->sub_topic.msg_base; t++) {        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                 topic = (mqtthdr_var_t*) data;                  topic = (mqtthdr_var_t*) data;
                 topic->var_sb.val = htons(t->sub_topic.msg_len);                  topic->var_sb.val = htons(t->sub_topic.msg_len);
                 memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));                  memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                 data += MQTTHDR_VAR_SIZEOF(topic);                  data += MQTTHDR_VAR_SIZEOF(topic);
                qos = data++;                *((char*) data) = t->sub_qos;
                *qos = t->sub_ret;                data++;
         }          }
   
         return msg;          return msg;
Line 132  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics,  Line 122  mqtt_msgSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, 
  * return: NULL error or allocated SUBACK message   * return: NULL error or allocated SUBACK message
  */   */
 mqtt_msg_t *  mqtt_msg_t *
mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_short msgID)mqtt_msgSUBACK(mqtt_subscr_t * __restrict Topics, u_short msgID)
 {  {
         int siz = 0;          int siz = 0;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
Line 141  mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s Line 131  mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s
         u_char *qos;          u_char *qos;
         mqtt_msg_t *msg = NULL;          mqtt_msg_t *msg = NULL;
   
        if (!Topics || !*Topics)        if (!Topics)
                 return NULL;                  return NULL;
   
         if (!(msg = mqtt_msgAlloc(MQTTMSG_MAX)))          if (!(msg = mqtt_msgAlloc(MQTTMSG_MAX)))
Line 157  mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s Line 147  mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s
         v->val = htons(msgID);          v->val = htons(msgID);
   
         /* QoS payload from subscriptions */          /* QoS payload from subscriptions */
        for (t = *Topics; t && t->sub_topic.msg_base; t++, siz++) {        for (t = Topics; t && t->sub_topic.msg_base; t++, siz++) {
                 qos = (msg->msg_base + siz);                  qos = (msg->msg_base + siz);
                *qos = t->sub_ret;                *qos = t->sub_qos;
         }          }
   
         /* fixed header */          /* fixed header */
Line 179  mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s Line 169  mqtt_msgSUBACK(mqtt_subscr_t ** __restrict Topics, u_s
  * return: NULL error or allocated UNSUBSCRIBE message   * return: NULL error or allocated UNSUBSCRIBE message
  */   */
 mqtt_msg_t *  mqtt_msg_t *
mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics, u_short msgID, mqtt_msgUNSUBSCRIBE(mqtt_subscr_t * __restrict Topics, u_short msgID, 
                 u_char Dup, u_char QOS)                  u_char Dup, u_char QOS)
 {  {
         int len, siz = 0;          int len, siz = 0;
Line 191  mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics Line 181  mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics
         void *data;          void *data;
         mqtt_msg_t *msg = NULL;          mqtt_msg_t *msg = NULL;
   
        if (!Topics || !*Topics)        if (!Topics)
                 return NULL;                  return NULL;
         if (QOS > MQTT_QOS_EXACTLY) {          if (QOS > MQTT_QOS_EXACTLY) {
                 mqtt_SetErr(EINVAL, "Invalid QoS parameter");                  mqtt_SetErr(EINVAL, "Invalid QoS parameter");
Line 204  mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics Line 194  mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics
   
         /* calculate message size */          /* calculate message size */
         len = sizeof(mqtt_len_t);                               /* msgid */          len = sizeof(mqtt_len_t);                               /* msgid */
        for (t = *Topics; t && t->sub_topic.msg_base; t++)        /* subscribes */        for (t = Topics; t && t->sub_topic.msg_base; t++)        /* subscribes */
                 len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;                  len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
   
         /* calculate header size */          /* calculate header size */
Line 234  mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics Line 224  mqtt_msgUNSUBSCRIBE(mqtt_subscr_t ** __restrict Topics
         data += sizeof(mqtt_len_t);          data += sizeof(mqtt_len_t);
   
         /* payload with subscriptions */          /* payload with subscriptions */
        for (t = *Topics; t && t->sub_topic.msg_base; t++) {        for (t = Topics; t && t->sub_topic.msg_base; t++) {
                 topic = (mqtthdr_var_t*) data;                  topic = (mqtthdr_var_t*) data;
                 topic->var_sb.val = htons(t->sub_topic.msg_len);                  topic->var_sb.val = htons(t->sub_topic.msg_len);
                 memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));                  memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
Line 273  mqtt_msgUNSUBACK(u_short msgID) Line 263  mqtt_msgUNSUBACK(u_short msgID)
   
         return msg;          return msg;
 }  }
   
   
 /* ============= decode ============ */  
   
 #if 0  
 /*  
  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message  
  *  
  * @buf = Message buffer  
  * @msgID = MessageID  
  * @subscr = Subscriptions, must be free after use with mqtt_subFree()  
  * return: -1 error or >-1 elements into subscr  
  */  
 int  
 mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)  
 {  
         register int i;  
         int len, ret;  
         struct mqtthdr *hdr;  
         mqtthdr_var_t *var;  
         mqtt_subscr_t *subs;  
         mqtt_len_t *v;  
         caddr_t pos;  
   
         if (!buf || !msgID || !subscr)  
                 return -1;  
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);  
         if (!hdr)  
                 return -1;  
         pos = buf->msg_base + ret + 1;  
         v = (mqtt_len_t*) pos;  
   
         /* MessageID */  
         len -= sizeof(mqtt_len_t);  
         if (len < 0) {  
                 mqtt_SetErr(EINVAL, "Short message length %d", len);  
                 return -1;  
         } else {  
                 *msgID = ntohs(v->val);  
                 pos += sizeof(mqtt_len_t);  
         }  
   
         subs = mqtt_subAlloc(0);  
         if (!subs)  
                 return -1;  
         else  
                 *subscr = subs;  
   
         /* Subscribes */  
         for (i = 0; len > 0; i++) {  
                 var = (mqtthdr_var_t*) pos;  
                 len -= MQTTHDR_VAR_SIZEOF(var) + 1;  
                 if (len < 0) {  
                         mqtt_subFree(subscr);  
                         mqtt_SetErr(EINVAL, "Short message length %d", len);  
                         return -1;  
                 }  
                 if (!mqtt_subRealloc(&subs, i + 1)) {  
                         mqtt_subFree(subscr);  
                         return -1;  
                 } else  
                         *subscr = subs;  
   
                 memset(&subs[i], 0, sizeof subs[i]);  
                 subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);  
                 subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);  
                 if (!subs[i].sub_topic.msg_base) {  
                         LOGERR;  
                         mqtt_subFree(subscr);  
                         return -1;  
                 } else {  
                         memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);  
                         ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;  
                 }  
                 pos += MQTTHDR_VAR_SIZEOF(var);  
   
                 subs[i].sub_ret = *pos;  
                 pos++;  
         }  
   
         return i;  
 }  
   
 /*  
  * mqtt_readSUBACK() Read SUBACK message  
  *  
  * @buf = Message buffer  
  * @msgID = MessageID  
  * @subqos = Subscribes QoS, must be free after use with free()  
  * return: -1 error or >-1 readed subscribes QoS elements  
  */  
 int  
 mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)  
 {  
         int len, ret;  
         struct mqtthdr *hdr;  
         mqtt_len_t *v;  
         caddr_t pos;  
   
         if (!buf || !msgID || !subqos)  
                 return -1;  
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);  
         if (!hdr)  
                 return -1;  
         pos = buf->msg_base + ret + 1;  
         v = (mqtt_len_t*) pos;  
   
         /* MessageID */  
         len -= sizeof(mqtt_len_t);  
         if (len < 0) {  
                 mqtt_SetErr(EINVAL, "Short message length %d", len);  
                 return -1;  
         } else {  
                 *msgID = ntohs(v->val);  
                 pos += sizeof(mqtt_len_t);  
         }  
   
         /* Subscribes */  
         *subqos = malloc(len);  
         if (!*subqos) {  
                 LOGERR;  
                 return -1;  
         } else  
                 memcpy(*subqos, pos, len);  
   
         return len;  
 }  
   
 /*  
  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message  
  *  
  * @buf = Message buffer  
  * @msgID = MessageID  
  * @subscr = Subscriptions, must be free after use with mqtt_subFree()  
  * return: -1 error or >-1 elements into subscr  
  */  
 int  
 mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)  
 {  
         register int i;  
         int len, ret;  
         struct mqtthdr *hdr;  
         mqtthdr_var_t *var;  
         mqtt_subscr_t *subs;  
         mqtt_len_t *v;  
         caddr_t pos;  
   
         if (!buf || !msgID || !subscr)  
                 return -1;  
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);  
         if (!hdr)  
                 return -1;  
         pos = buf->msg_base + ret + 1;  
         v = (mqtt_len_t*) pos;  
   
         /* MessageID */  
         len -= sizeof(mqtt_len_t);  
         if (len < 0) {  
                 mqtt_SetErr(EINVAL, "Short message length %d", len);  
                 return -1;  
         } else {  
                 *msgID = ntohs(v->val);  
                 pos += sizeof(mqtt_len_t);  
         }  
   
         subs = mqtt_subAlloc(0);  
         if (!subs)  
                 return -1;  
         else  
                 *subscr = subs;  
   
         /* Subscribes */  
         for (i = 0; len > 0; i++) {  
                 var = (mqtthdr_var_t*) pos;  
                 len -= MQTTHDR_VAR_SIZEOF(var);  
                 if (len < 0) {  
                         mqtt_subFree(subscr);  
                         mqtt_SetErr(EINVAL, "Short message length %d", len);  
                         return -1;  
                 }  
                 if (!mqtt_subRealloc(&subs, i + 1)) {  
                         mqtt_subFree(subscr);  
                         return -1;  
                 } else  
                         *subscr = subs;  
   
                 memset(&subs[i], 0, sizeof subs[i]);  
                 subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);  
                 subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);  
                 if (!subs[i].sub_topic.msg_base) {  
                         LOGERR;  
                         mqtt_subFree(subscr);  
                         return -1;  
                 } else {  
                         memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);  
                         ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;  
                 }  
                 pos += MQTTHDR_VAR_SIZEOF(var);  
         }  
   
         return i;  
 }  
   
 /*  
  * mqtt_readUNSUBACK() Read UNSUBACK message  
  *  
  * @buf = Message buffer  
  * return: -1 error or MessageID  
  */  
 u_short  
 mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)  
 {  
         int len, ret;  
         struct mqtthdr *hdr;  
         mqtt_len_t *v;  
         caddr_t pos;  
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);  
         if (!hdr)  
                 return (u_short) -1;  
         if (len < sizeof(mqtt_len_t)) {  
                 mqtt_SetErr(EINVAL, "Short message length %d", len);  
                 return (u_short) -1;  
         } else {  
                 pos = buf->msg_base + ret + 1;  
                 v = (mqtt_len_t*) pos;  
         }  
   
         return ntohs(v->val);  
 }  
 #endif  

Removed from v.1.3.12.1  
changed lines
  Added in v.1.3.12.4


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>