Diff for /libaitmqtt/src/pub.c between versions 1.1.1.1.2.4 and 1.4.4.2

version 1.1.1.1.2.4, 2012/04/27 15:49:07 version 1.4.4.2, 2022/09/14 18:36:23
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004 - 2022
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 49  SUCH DAMAGE. Line 49  SUCH DAMAGE.
 /*  /*
  * mqtt_msgPUBLISH() Create PUBLISH message   * mqtt_msgPUBLISH() Create PUBLISH message
  *   *
  * @buf = Message buffer  
  * @csTopic = Publish topic   * @csTopic = Publish topic
  * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE   * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
  * @Dup = Duplicate message   * @Dup = Duplicate message
Line 57  SUCH DAMAGE. Line 56  SUCH DAMAGE.
  * @Retain = Retain message   * @Retain = Retain message
  * @pData = Publish data into topic   * @pData = Publish data into topic
  * @datlen = Publish data length   * @datlen = Publish data length
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBLISH message
  */   */
intmqtt_msg_t *
mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID, mqtt_msgPUBLISH(const char *csTopic, u_short msgID, u_char Dup, 
                u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)                u_char QOS, u_char Retain, const void *pData, int datlen)
 {  {
        int siz = 0;        int len, siz;
         u_int n, *l;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtthdr_var_t *topic;          mqtthdr_var_t *topic;
         mqtt_len_t *mid;          mqtt_len_t *mid;
         void *data;          void *data;
           mqtt_msg_t *msg = NULL;
   
        if (!buf || !csTopic)        if (!csTopic)
                return -1;                return NULL;
         if (QOS > MQTT_QOS_EXACTLY) {          if (QOS > MQTT_QOS_EXACTLY) {
                 mqtt_SetErr(EINVAL, "Invalid QoS parameter");                  mqtt_SetErr(EINVAL, "Invalid QoS parameter");
                return -1;                return NULL;
         }          }
         if (!msgID && QOS != MQTT_QOS_ONCE) {          if (!msgID && QOS != MQTT_QOS_ONCE) {
                 mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");                  mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
                return -1;                return NULL;
         }          }
   
        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)        /* calculate message size */
                return -1;        len = sizeof(mqtt_len_t) + strlen(csTopic);     /* topic */
         len += sizeof(mqtt_len_t);                      /* msgid */
         len += datlen;                                  /* data len */
 
         /* calculate header size */
         siz = sizeof(struct mqtthdr);                   /* mqtt fixed header */
         n = mqtt_encodeLen(len);                        /* message size */
         siz += mqtt_sizeLen(n) - 1;                     /* length size */
 
         if (!(msg = mqtt_msgAlloc(siz + len)))
                 return NULL;
         else {          else {
                hdr = (struct mqtthdr *) (buf->msg_base + siz);                data = msg->msg_base;
                siz += sizeof(struct mqtthdr);                hdr = (struct mqtthdr *) data;
         }          }
   
           /* fixed header */
           hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
           hdr->mqtt_msg.qos = QOS;
           hdr->mqtt_msg.dup = Dup ? 1 : 0;
           hdr->mqtt_msg.retain = Retain ? 1 : 0;
           l = (u_int*) hdr->mqtt_len;
           *l = n;
           data += siz;
   
         /* variable header */          /* variable header */
        topic = (mqtthdr_var_t*) (buf->msg_base + siz);        topic = (mqtthdr_var_t*) data;
         topic->var_sb.val = htons(strlen(csTopic));          topic->var_sb.val = htons(strlen(csTopic));
         memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));          memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
        siz += MQTTHDR_VAR_SIZEOF(topic);        data += MQTTHDR_VAR_SIZEOF(topic);
   
        mid = (mqtt_len_t*) (buf->msg_base + siz);        mid = (mqtt_len_t*) data;
         mid->val = htons(msgID);          mid->val = htons(msgID);
        siz += sizeof(mqtt_len_t);        data += sizeof(mqtt_len_t);
   
         /* load with data */          /* load with data */
        if (pData && datlen) {        if (pData && datlen)
                data = buf->msg_base + siz; 
                 memcpy(data, pData, datlen);                  memcpy(data, pData, datlen);
                 siz += datlen;  
         }  
   
        /* fixed header */        return msg;
        MQTTHDR_MSGINIT(hdr); 
        hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH; 
        hdr->mqtt_msg.qos = QOS; 
        hdr->mqtt_msg.dup = Dup ? 1 : 0; 
        hdr->mqtt_msg.retain = Retain ? 1 : 0; 
        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr)); 
 
        return siz; 
 }  }
   
static intstatic mqtt_msg_t *
_mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)_mqtt_msgPUB_(u_char cmd, u_short msgID)
 {  {
         int siz = 0;  
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtt_len_t *v;          mqtt_len_t *v;
           mqtt_msg_t *msg = NULL;
   
        if (!buf)        if (!(msg = mqtt_msgAlloc(sizeof(struct mqtthdr) + sizeof(mqtt_len_t))))
                return -1;                return NULL;
 
        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1) 
                return -1; 
         else {          else {
                hdr = (struct mqtthdr *) (buf->msg_base + siz);                hdr = (struct mqtthdr *) msg->msg_base;
                siz += sizeof(struct mqtthdr);                v = (mqtt_len_t*) (msg->msg_base + sizeof(struct mqtthdr));
                v = (mqtt_len_t*) (buf->msg_base + siz); 
                siz += sizeof(mqtt_len_t); 
         }          }
   
         /* fixed header */          /* fixed header */
         MQTTHDR_MSGINIT(hdr);  
         hdr->mqtt_msg.type = cmd;          hdr->mqtt_msg.type = cmd;
         *hdr->mqtt_len = sizeof(mqtt_len_t);          *hdr->mqtt_len = sizeof(mqtt_len_t);
   
         /* MessageID */          /* MessageID */
         v->val = htons(msgID);          v->val = htons(msgID);
   
        return siz;        return msg;
 }  }
   
 /*  /*
  * mqtt_msgPUBACK() Create PUBACK message   * mqtt_msgPUBACK() Create PUBACK message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBACK message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBACK(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBACK, msgID);
 }  }
   
 /*  /*
  * mqtt_msgPUBREC() Create PUBREC message   * mqtt_msgPUBREC() Create PUBREC message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBREC message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBREC(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBREC, msgID);
 }  }
   
 /*  /*
  * mqtt_msgPUBREL() Create PUBREL message   * mqtt_msgPUBREL() Create PUBREL message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBREL message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBREL(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBREL, msgID);
 }  }
   
 /*  /*
  * mqtt_msgPUBCOMP() Create PUBCOMP message   * mqtt_msgPUBCOMP() Create PUBCOMP message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBCOMP message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBCOMP(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBCOMP, msgID);
} 
 
 
/* ============= decode ============ */ 
 
/* 
 * mqtt_readPUBLISH() Read PUBLISH message 
 * 
 * @buf = Message buffer 
 * @psTopic = Topic 
 * @topicLen = Topic length 
 * @msgID = MessageID 
 * @pData = Data buffer 
 * return: -1 error or !=-1 allocated data buffer length 
 */ 
int 
mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,  
                u_short *msgID, void ** __restrict pData) 
{ 
        int len, ret; 
        struct mqtthdr *hdr; 
        mqtthdr_var_t *var; 
        mqtt_len_t *v; 
        caddr_t pos; 
 
        if (!buf || !psTopic || !msgID || !pData) 
                return -1; 
 
        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len); 
        if (!hdr) 
                return -1; 
        pos = buf->msg_base + ret + 1; 
        var = (mqtthdr_var_t*) pos; 
 
        /* topic */ 
        len -= MQTTHDR_VAR_SIZEOF(var); 
        if (len < 0) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return -1; 
        } else { 
                memset(psTopic, 0, topicLen--); 
                memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?  
                                topicLen : ntohs(var->var_sb.val)); 
                pos += MQTTHDR_VAR_SIZEOF(var); 
                v = (mqtt_len_t*) pos; 
        } 
 
        len -= sizeof(mqtt_len_t); 
        if (len < 0) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return -1; 
        } else { 
                *msgID = ntohs(v->val); 
                pos += sizeof(mqtt_len_t); 
        } 
 
        /* data */ 
        if (len < 0) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return -1; 
        } else { 
                if (!(*pData = malloc(len + 1))) { 
                        LOGERR; 
                        return -1; 
                } else 
                        ((char*) (*pData))[len] = 0; 
 
                memcpy(*pData, pos, len); 
        } 
 
        return len; 
} 
 
/* 
 * mqtt_readPUBACK() Read PUBACK message 
 * 
 * @buf = Message buffer 
 * return: -1 error or MessageID 
 */ 
u_short 
mqtt_readPUBACK(mqtt_msg_t * __restrict buf) 
{ 
        int len, ret; 
        struct mqtthdr *hdr; 
        mqtt_len_t *v; 
        caddr_t pos; 
 
        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len); 
        if (!hdr) 
                return (u_short) -1; 
        if (len < sizeof(mqtt_len_t)) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return (u_short) -1; 
        } else { 
                pos = buf->msg_base + ret + 1; 
                v = (mqtt_len_t*) pos; 
        } 
 
        return ntohs(v->val); 
} 
 
/* 
 * mqtt_readPUBREC() Read PUBREC message 
 * 
 * @buf = Message buffer 
 * return: -1 error or MessageID 
 */ 
u_short 
mqtt_readPUBREC(mqtt_msg_t * __restrict buf) 
{ 
        int len, ret; 
        struct mqtthdr *hdr; 
        mqtt_len_t *v; 
        caddr_t pos; 
 
        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len); 
        if (!hdr) 
                return (u_short) -1; 
        if (len < sizeof(mqtt_len_t)) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return (u_short) -1; 
        } else { 
                pos = buf->msg_base + ret + 1; 
                v = (mqtt_len_t*) pos; 
        } 
 
        return ntohs(v->val); 
} 
 
/* 
 * mqtt_readPUBREL() Read PUBREL message 
 * 
 * @buf = Message buffer 
 * return: -1 error or MessageID 
 */ 
u_short 
mqtt_readPUBREL(mqtt_msg_t * __restrict buf) 
{ 
        int len, ret; 
        struct mqtthdr *hdr; 
        mqtt_len_t *v; 
        caddr_t pos; 
 
        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len); 
        if (!hdr) 
                return (u_short) -1; 
        if (len < sizeof(mqtt_len_t)) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return (u_short) -1; 
        } else { 
                pos = buf->msg_base + ret + 1; 
                v = (mqtt_len_t*) pos; 
        } 
 
        return ntohs(v->val); 
} 
 
/* 
 * mqtt_readPUBCOMP() Read PUBCOMP message 
 * 
 * @buf = Message buffer 
 * return: -1 error or MessageID 
 */ 
u_short 
mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf) 
{ 
        int len, ret; 
        struct mqtthdr *hdr; 
        mqtt_len_t *v; 
        caddr_t pos; 
 
        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len); 
        if (!hdr) 
                return (u_short) -1; 
        if (len < sizeof(mqtt_len_t)) { 
                mqtt_SetErr(EINVAL, "Short message length %d", len); 
                return (u_short) -1; 
        } else { 
                pos = buf->msg_base + ret + 1; 
                v = (mqtt_len_t*) pos; 
        } 
 
        return ntohs(v->val); 
 }  }

Removed from v.1.1.1.1.2.4  
changed lines
  Added in v.1.4.4.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>