Diff for /libaitmqtt/src/pub.c between versions 1.1.1.1.2.1 and 1.4.4.1

version 1.1.1.1.2.1, 2012/04/07 20:56:49 version 1.4.4.1, 2022/09/14 17:37:13
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004 - 2022
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 49  SUCH DAMAGE. Line 49  SUCH DAMAGE.
 /*  /*
  * mqtt_msgPUBLISH() Create PUBLISH message   * mqtt_msgPUBLISH() Create PUBLISH message
  *   *
  * @buf = Message buffer  
  * @csTopic = Publish topic   * @csTopic = Publish topic
  * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE   * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
  * @Dup = Duplicate message   * @Dup = Duplicate message
Line 57  SUCH DAMAGE. Line 56  SUCH DAMAGE.
  * @Retain = Retain message   * @Retain = Retain message
  * @pData = Publish data into topic   * @pData = Publish data into topic
  * @datlen = Publish data length   * @datlen = Publish data length
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBLISH message
  */   */
intmqtt_msg_t *
mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID, mqtt_msgPUBLISH(const char *csTopic, u_short msgID, u_char Dup, 
                u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)                u_char QOS, u_char Retain, const void *pData, int datlen)
 {  {
        int siz = 0;        int len, siz;
         u_int n, *l;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtthdr_var_t *topic;          mqtthdr_var_t *topic;
         mqtt_len_t *mid;          mqtt_len_t *mid;
         void *data;          void *data;
           mqtt_msg_t *msg = NULL;
   
        if (!buf || !csTopic)        if (!csTopic)
                return -1;                return NULL;
         if (QOS > MQTT_QOS_EXACTLY) {          if (QOS > MQTT_QOS_EXACTLY) {
                 mqtt_SetErr(EINVAL, "Invalid QoS parameter");                  mqtt_SetErr(EINVAL, "Invalid QoS parameter");
                return -1;                return NULL;
         }          }
         if (!msgID && QOS != MQTT_QOS_ONCE) {          if (!msgID && QOS != MQTT_QOS_ONCE) {
                 mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");                  mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
                return -1;                return NULL;
         }          }
   
        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)        /* calculate message size */
                return -1;        len = sizeof(mqtt_len_t) + strlen(csTopic);     /* topic */
         len += sizeof(mqtt_len_t);                      /* msgid */
         len += datlen;                                  /* data len */
 
         /* calculate header size */
         siz = sizeof(struct mqtthdr);                   /* mqtt fixed header */
         n = mqtt_encodeLen(len);                        /* message size */
         siz += mqtt_sizeLen(n) - 1;                     /* length size */
 
         if (!(msg = mqtt_msgAlloc(siz + len)))
                 return NULL;
         else {          else {
                hdr = (struct mqtthdr *) (buf->msg_base + siz);                data = msg->msg_base;
                siz += sizeof(struct mqtthdr);                hdr = (struct mqtthdr *) data;
         }          }
   
           /* fixed header */
           hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
           hdr->mqtt_msg.qos = QOS;
           hdr->mqtt_msg.dup = Dup ? 1 : 0;
           hdr->mqtt_msg.retain = Retain ? 1 : 0;
           l = (u_int*) hdr->mqtt_len;
           *l = n;
           data += siz;
   
         /* variable header */          /* variable header */
        topic = (mqtthdr_var_t*) (buf->msg_base + siz);        topic = (mqtthdr_var_t*) data;
         topic->var_sb.val = htons(strlen(csTopic));          topic->var_sb.val = htons(strlen(csTopic));
         memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));          memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
        siz += MQTTHDR_VAR_SIZEOF(topic);        data += MQTTHDR_VAR_SIZEOF(topic);
   
        mid = (mqtt_len_t*) (buf->msg_base + siz);        mid = (mqtt_len_t*) data;
         mid->val = htons(msgID);          mid->val = htons(msgID);
        siz += sizeof(mqtt_len_t);        data += sizeof(mqtt_len_t);
   
         /* load with data */          /* load with data */
        if (pData && datlen) {        if (pData && datlen)
                data = buf->msg_base + siz; 
                 memcpy(data, pData, datlen);                  memcpy(data, pData, datlen);
                 siz += datlen;  
         }  
   
        /* fixed header */        return msg;
        MQTTHDR_MSGINIT(hdr); 
        hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH; 
        hdr->mqtt_msg.qos = QOS; 
        hdr->mqtt_msg.dup = Dup ? 1 : 0; 
        hdr->mqtt_msg.retain = Retain ? 1 : 0; 
        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr)); 
 
        mqtt_msgRealloc(buf, siz); 
        return siz; 
 }  }
   
static intstatic mqtt_msg_t *
_mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)_mqtt_msgPUB_(u_char cmd, u_short msgID)
 {  {
         int siz = 0;  
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtt_len_t *v;          mqtt_len_t *v;
           mqtt_msg_t *msg = NULL;
   
        if (!buf)        if (!(msg = mqtt_msgAlloc(sizeof(struct mqtthdr) + sizeof(mqtt_len_t))))
                return -1;                return NULL;
 
        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_len_t)) == -1) 
                return -1; 
         else {          else {
                hdr = (struct mqtthdr *) (buf->msg_base + siz);                hdr = (struct mqtthdr *) msg->msg_base;
                siz += sizeof(struct mqtthdr);                v = (mqtt_len_t*) (msg->msg_base + sizeof(struct mqtthdr));
                v = (mqtt_len_t*) (buf->msg_base + siz); 
                siz += sizeof(mqtt_len_t); 
         }          }
   
         /* fixed header */          /* fixed header */
         MQTTHDR_MSGINIT(hdr);  
         hdr->mqtt_msg.type = cmd;          hdr->mqtt_msg.type = cmd;
         *hdr->mqtt_len = sizeof(mqtt_len_t);          *hdr->mqtt_len = sizeof(mqtt_len_t);
   
         /* MessageID */          /* MessageID */
         v->val = htons(msgID);          v->val = htons(msgID);
   
        return siz;        return msg;
 }  }
   
 /*  /*
  * mqtt_msgPUBACK() Create PUBACK message   * mqtt_msgPUBACK() Create PUBACK message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBACK message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBACK(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBACK, msgID);
 }  }
   
 /*  /*
  * mqtt_msgPUBREC() Create PUBREC message   * mqtt_msgPUBREC() Create PUBREC message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBREC message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBREC(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBREC, msgID);
 }  }
   
 /*  /*
  * mqtt_msgPUBREL() Create PUBREL message   * mqtt_msgPUBREL() Create PUBREL message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBREL message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBREL(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBREL, msgID);
 }  }
   
 /*  /*
  * mqtt_msgPUBCOMP() Create PUBCOMP message   * mqtt_msgPUBCOMP() Create PUBCOMP message
  *   *
  * @buf = Message buffer  
  * @msgID = MessageID   * @msgID = MessageID
 * return: -1 error or >-1 message size for send * return: NULL error or allocated PUBCOMP message
  */   */
inline intmqtt_msg_t *
mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)mqtt_msgPUBCOMP(u_short msgID)
 {  {
        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);        return _mqtt_msgPUB_(MQTT_TYPE_PUBCOMP, msgID);
 }  }
   
   
 /* ============= decode ============ */  /* ============= decode ============ */
   
   #if 0
 /*  /*
  * mqtt_readPUBLISH() Read PUBLISH message   * mqtt_readPUBLISH() Read PUBLISH message
  *   *
Line 208  mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short m Line 207  mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short m
  * @psTopic = Topic   * @psTopic = Topic
  * @topicLen = Topic length   * @topicLen = Topic length
  * @msgID = MessageID   * @msgID = MessageID
 * @pData = Data buffer * @pData = Data buffer, may be NULL
 * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData * return: -1 error or !=-1 allocated data buffer length
 * return: NULL error or !=NULL MQTT fixed header 
  */   */
struct mqtthdr *int
 mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen,   mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen, 
                u_short *msgID, void * __restrict pData, int *datLen)                u_short *msgID, void ** __restrict pData)
 {  {
         int len, ret;          int len, ret;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
Line 222  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _ Line 220  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _
         mqtt_len_t *v;          mqtt_len_t *v;
         caddr_t pos;          caddr_t pos;
   
        if (!buf || !psTopic || !msgID || !pData)        if (!buf || !psTopic || !msgID)
                return NULL;                return -1;
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);          hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
         if (!hdr)          if (!hdr)
                return NULL;                return -1;
         pos = buf->msg_base + ret + 1;          pos = buf->msg_base + ret + 1;
         var = (mqtthdr_var_t*) pos;          var = (mqtthdr_var_t*) pos;
   
Line 235  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _ Line 233  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _
         len -= MQTTHDR_VAR_SIZEOF(var);          len -= MQTTHDR_VAR_SIZEOF(var);
         if (len < 0) {          if (len < 0) {
                 mqtt_SetErr(EINVAL, "Short message length %d", len);                  mqtt_SetErr(EINVAL, "Short message length %d", len);
                return NULL;                return -1;
         } else {          } else {
                 memset(psTopic, 0, topicLen--);                  memset(psTopic, 0, topicLen--);
                 memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ?                   memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ? 
Line 247  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _ Line 245  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _
         len -= sizeof(mqtt_len_t);          len -= sizeof(mqtt_len_t);
         if (len < 0) {          if (len < 0) {
                 mqtt_SetErr(EINVAL, "Short message length %d", len);                  mqtt_SetErr(EINVAL, "Short message length %d", len);
                return NULL;                return -1;
         } else {          } else {
                 *msgID = ntohs(v->val);                  *msgID = ntohs(v->val);
                 pos += sizeof(mqtt_len_t);                  pos += sizeof(mqtt_len_t);
Line 256  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _ Line 254  mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * _
         /* data */          /* data */
         if (len < 0) {          if (len < 0) {
                 mqtt_SetErr(EINVAL, "Short message length %d", len);                  mqtt_SetErr(EINVAL, "Short message length %d", len);
                return NULL;                return -1;
        } else {        } else if (pData) {
                if (!*datLen) {                if (!(*pData = malloc(len + 1))) {
                        if (!(pData = malloc(len))) {                        LOGERR;
                                LOGERR;                        return -1;
                                return NULL;                } else
                        } else                        ((char*) (*pData))[len] = 0;
                                *datLen = len; 
                } 
   
                memset(pData, 0, *datLen);                memcpy(*pData, pos, len);
                if (len < *datLen) 
                        *datLen = len; 
                memcpy(pData, pos, *datLen); 
         }          }
   
        return hdr;        return len;
 }  }
   
 /*  /*
Line 386  mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf) Line 379  mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
   
         return ntohs(v->val);          return ntohs(v->val);
 }  }
   #endif

Removed from v.1.1.1.1.2.1  
changed lines
  Added in v.1.4.4.1


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>