Diff for /libaitmqtt/src/sub.c between versions 1.1 and 1.3

version 1.1, 2012/01/26 13:07:33 version 1.3, 2012/06/28 11:06:17
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
 /* ------------------------------------------------------------------- */  
   
 /*  /*
  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message   * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
  *   *
Line 62  int Line 60  int
 mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,   mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                 u_short msgID, u_char Dup, u_char QOS)                  u_short msgID, u_char Dup, u_char QOS)
 {  {
        int siz = 0;        int len, siz = 0;
         u_int n, *l;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtthdr_var_t *topic;          mqtthdr_var_t *topic;
         mqtt_len_t *mid;          mqtt_len_t *mid;
         mqtt_subscr_t *t;          mqtt_subscr_t *t;
         u_char *qos;          u_char *qos;
           void *data;
   
         if (!buf || !Topics)          if (!buf || !Topics)
                 return -1;                  return -1;
         if (QOS > MQTT_QOS_EXACTLY) {          if (QOS > MQTT_QOS_EXACTLY) {
                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
                 return -1;                  return -1;
         }          }
         if (!msgID && QOS != MQTT_QOS_ONCE) {          if (!msgID && QOS != MQTT_QOS_ONCE) {
                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
                 return -1;                  return -1;
         }          }
   
        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)        /* calculate message size */
         len = sizeof(mqtt_len_t);                               /* msgid */
         for (t = Topics; t && t->sub_topic.msg_base; t++)       /* subscribes & qos */
                 len += sizeof(mqtt_len_t) + t->sub_topic.msg_len + 1;
 
         /* calculate header size */
         siz = sizeof(struct mqtthdr);                           /* mqtt fixed header */
         n = mqtt_encodeLen(len);                                /* message size */
         siz += mqtt_sizeLen(n) - 1;                             /* length size */
 
         if (mqtt_msgRealloc(buf, siz + len) == -1)
                 return -1;                  return -1;
         else {          else {
                hdr = (struct mqtthdr *) (buf->msg_base + siz);                data = buf->msg_base;
                siz += sizeof(struct mqtthdr);                hdr = (struct mqtthdr *) data;
         }          }
   
           /* fixed header */
           MQTTHDR_MSGINIT(hdr);
           hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
           hdr->mqtt_msg.qos = QOS;
           hdr->mqtt_msg.dup = Dup ? 1 : 0;
           hdr->mqtt_msg.retain = 0;
           l = (u_int*) hdr->mqtt_len;
           *l = n;
           data += siz;
   
         /* variable header */          /* variable header */
        mid = (mqtt_len_t*) (buf->msg_base + siz);        mid = (mqtt_len_t*) data;
         mid->val = htons(msgID);          mid->val = htons(msgID);
        siz += sizeof(mqtt_len_t);        data += sizeof(mqtt_len_t);
   
         /* payload with subscriptions */          /* payload with subscriptions */
         for (t = Topics; t && t->sub_topic.msg_base; t++) {          for (t = Topics; t && t->sub_topic.msg_base; t++) {
                topic = (mqtthdr_var_t*) (buf->msg_base + siz);                topic = (mqtthdr_var_t*) data;
                 topic->var_sb.val = htons(t->sub_topic.msg_len);                  topic->var_sb.val = htons(t->sub_topic.msg_len);
                 memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));                  memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                siz += MQTTHDR_VAR_SIZEOF(topic);                data += MQTTHDR_VAR_SIZEOF(topic);
                qos = (buf->msg_base + siz);                qos = data++;
                 *qos = t->sub_ret;                  *qos = t->sub_ret;
                 siz++;  
         }          }
   
        /* fixed header */        return siz + len;
        MQTTHDR_MSGINIT(hdr); 
        hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE; 
        hdr->mqtt_msg.qos = QOS; 
        hdr->mqtt_msg.dup = Dup ? 1 : 0; 
        hdr->mqtt_msg.retain = 0; 
        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr)); 
 
        mqtt_msgRealloc(buf, siz); 
        return siz; 
 }  }
   
 /*  /*
Line 159  mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subsc Line 169  mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subsc
         hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;          hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
         *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));          *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
   
         mqtt_msgRealloc(buf, siz);  
         return siz;          return siz;
 }  }
   
Line 177  int Line 186  int
 mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics,   mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                 u_short msgID, u_char Dup, u_char QOS)                  u_short msgID, u_char Dup, u_char QOS)
 {  {
        int siz = 0;        int len, siz = 0;
         u_int n, *l;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtthdr_var_t *topic;          mqtthdr_var_t *topic;
         mqtt_len_t *mid;          mqtt_len_t *mid;
         mqtt_subscr_t *t;          mqtt_subscr_t *t;
           void *data;
   
         if (!buf || !Topics)          if (!buf || !Topics)
                 return -1;                  return -1;
         if (QOS > MQTT_QOS_EXACTLY) {          if (QOS > MQTT_QOS_EXACTLY) {
                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");                mqtt_SetErr(EINVAL, "Invalid QoS parameter");
                 return -1;                  return -1;
         }          }
         if (!msgID && QOS != MQTT_QOS_ONCE) {          if (!msgID && QOS != MQTT_QOS_ONCE) {
                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");                mqtt_SetErr(EINVAL, "Invalid MessageID parameter must be >0");
                 return -1;                  return -1;
         }          }
   
        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)        /* calculate message size */
         len = sizeof(mqtt_len_t);                               /* msgid */
         for (t = Topics; t && t->sub_topic.msg_base; t++)       /* subscribes */
                 len += sizeof(mqtt_len_t) + t->sub_topic.msg_len;
 
         /* calculate header size */
         siz = sizeof(struct mqtthdr);                           /* mqtt fixed header */
         n = mqtt_encodeLen(len);                                /* message size */
         siz += mqtt_sizeLen(n) - 1;                             /* length size */
 
         if (mqtt_msgRealloc(buf, siz + len) == -1)
                 return -1;                  return -1;
         else {          else {
                hdr = (struct mqtthdr *) (buf->msg_base + siz);                data = buf->msg_base;
                siz += sizeof(struct mqtthdr);                hdr = (struct mqtthdr *) data;
         }          }
   
           /* fixed header */
           MQTTHDR_MSGINIT(hdr);
           hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
           hdr->mqtt_msg.qos = QOS;
           hdr->mqtt_msg.dup = Dup ? 1 : 0;
           hdr->mqtt_msg.retain = 0;
           l = (u_int*) hdr->mqtt_len;
           *l = n;
           data += siz;
   
         /* variable header */          /* variable header */
         mid = (mqtt_len_t*) (buf->msg_base + siz);          mid = (mqtt_len_t*) (buf->msg_base + siz);
         mid->val = htons(msgID);          mid->val = htons(msgID);
        siz += sizeof(mqtt_len_t);        data += sizeof(mqtt_len_t);
   
         /* payload with subscriptions */          /* payload with subscriptions */
         for (t = Topics; t && t->sub_topic.msg_base; t++) {          for (t = Topics; t && t->sub_topic.msg_base; t++) {
                topic = (mqtthdr_var_t*) (buf->msg_base + siz);                topic = (mqtthdr_var_t*) data;
                 topic->var_sb.val = htons(t->sub_topic.msg_len);                  topic->var_sb.val = htons(t->sub_topic.msg_len);
                 memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));                  memcpy(topic->var_data, t->sub_topic.msg_base, ntohs(topic->var_sb.val));
                siz += MQTTHDR_VAR_SIZEOF(topic);                data += MQTTHDR_VAR_SIZEOF(topic);
         }          }
   
        /* fixed header */        return siz + len;
        MQTTHDR_MSGINIT(hdr); 
        hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE; 
        hdr->mqtt_msg.qos = QOS; 
        hdr->mqtt_msg.dup = Dup ? 1 : 0; 
        hdr->mqtt_msg.retain = 0; 
        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr)); 
 
        mqtt_msgRealloc(buf, siz); 
        return siz; 
 }  }
   
 /*  /*
Line 272  mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short  Line 294  mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short 
  * @buf = Message buffer   * @buf = Message buffer
  * @msgID = MessageID   * @msgID = MessageID
  * @subscr = Subscriptions, must be free after use with mqtt_subFree()   * @subscr = Subscriptions, must be free after use with mqtt_subFree()
 * return: NULL error or !=NULL MQTT fixed header * return: -1 error or >-1 elements into subscr
  */   */
struct mqtthdr *int
 mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
 {  {
         register int i;          register int i;
Line 286  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_shor Line 308  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_shor
         caddr_t pos;          caddr_t pos;
   
         if (!buf || !msgID || !subscr)          if (!buf || !msgID || !subscr)
                return NULL;                return -1;
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);          hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
         if (!hdr)          if (!hdr)
                return NULL;                return -1;
         pos = buf->msg_base + ret + 1;          pos = buf->msg_base + ret + 1;
         v = (mqtt_len_t*) pos;          v = (mqtt_len_t*) pos;
   
         /* MessageID */          /* MessageID */
         len -= sizeof(mqtt_len_t);          len -= sizeof(mqtt_len_t);
         if (len < 0) {          if (len < 0) {
                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);                mqtt_SetErr(EINVAL, "Short message length %d", len);
                return NULL;                return -1;
         } else {          } else {
                 *msgID = ntohs(v->val);                  *msgID = ntohs(v->val);
                 pos += sizeof(mqtt_len_t);                  pos += sizeof(mqtt_len_t);
Line 306  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_shor Line 328  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_shor
   
         subs = mqtt_subAlloc(0);          subs = mqtt_subAlloc(0);
         if (!subs)          if (!subs)
                return NULL;                return -1;
         else          else
                 *subscr = subs;                  *subscr = subs;
   
Line 316  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_shor Line 338  mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_shor
                 len -= MQTTHDR_VAR_SIZEOF(var) + 1;                  len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                 if (len < 0) {                  if (len < 0) {
                         mqtt_subFree(subscr);                          mqtt_subFree(subscr);
                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);                        mqtt_SetErr(EINVAL, "Short message length %d", len);
                        return NULL;                        return -1;
                 }                  }
                subs = mqtt_subRealloc(subs, i + 1);                if (!mqtt_subRealloc(&subs, i + 1)) {
                if (!subs) { 
                         mqtt_subFree(subscr);                          mqtt_subFree(subscr);
                        return NULL;                        return -1;
                 } else                  } else
                         *subscr = subs;                          *subscr = subs;
   
                 memset(&subs[i], 0, sizeof subs[i]);                  memset(&subs[i], 0, sizeof subs[i]);
                 subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);                  subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
                 if (!subs[i].sub_topic.msg_base) {                  if (!subs[i].sub_topic.msg_base) {
                         LOGERR;                          LOGERR;
                         mqtt_subFree(subscr);                          mqtt_subFree(subscr);
                        return NULL;                        return -1;
                } else                } else {
                         memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);                          memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                           ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
                   }
                 pos += MQTTHDR_VAR_SIZEOF(var);                  pos += MQTTHDR_VAR_SIZEOF(var);
   
                 subs[i].sub_ret = *pos;                  subs[i].sub_ret = *pos;
                 pos++;                  pos++;
         }          }
   
        return hdr;        return i;
 }  }
   
 /*  /*
Line 372  mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short * Line 395  mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *
         /* MessageID */          /* MessageID */
         len -= sizeof(mqtt_len_t);          len -= sizeof(mqtt_len_t);
         if (len < 0) {          if (len < 0) {
                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);                mqtt_SetErr(EINVAL, "Short message length %d", len);
                 return -1;                  return -1;
         } else {          } else {
                 *msgID = ntohs(v->val);                  *msgID = ntohs(v->val);
Line 396  mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short * Line 419  mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *
  * @buf = Message buffer   * @buf = Message buffer
  * @msgID = MessageID   * @msgID = MessageID
  * @subscr = Subscriptions, must be free after use with mqtt_subFree()   * @subscr = Subscriptions, must be free after use with mqtt_subFree()
 * return: NULL error or !=NULL MQTT fixed header * return: -1 error or >-1 elements into subscr
  */   */
struct mqtthdr *int
 mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
 {  {
         register int i;          register int i;
Line 410  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_sh Line 433  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_sh
         caddr_t pos;          caddr_t pos;
   
         if (!buf || !msgID || !subscr)          if (!buf || !msgID || !subscr)
                return NULL;                return -1;
   
         hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);          hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
         if (!hdr)          if (!hdr)
                return NULL;                return -1;
         pos = buf->msg_base + ret + 1;          pos = buf->msg_base + ret + 1;
         v = (mqtt_len_t*) pos;          v = (mqtt_len_t*) pos;
   
         /* MessageID */          /* MessageID */
         len -= sizeof(mqtt_len_t);          len -= sizeof(mqtt_len_t);
         if (len < 0) {          if (len < 0) {
                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);                mqtt_SetErr(EINVAL, "Short message length %d", len);
                return NULL;                return -1;
         } else {          } else {
                 *msgID = ntohs(v->val);                  *msgID = ntohs(v->val);
                 pos += sizeof(mqtt_len_t);                  pos += sizeof(mqtt_len_t);
Line 430  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_sh Line 453  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_sh
   
         subs = mqtt_subAlloc(0);          subs = mqtt_subAlloc(0);
         if (!subs)          if (!subs)
                return NULL;                return -1;
         else          else
                 *subscr = subs;                  *subscr = subs;
   
Line 440  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_sh Line 463  mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_sh
                 len -= MQTTHDR_VAR_SIZEOF(var);                  len -= MQTTHDR_VAR_SIZEOF(var);
                 if (len < 0) {                  if (len < 0) {
                         mqtt_subFree(subscr);                          mqtt_subFree(subscr);
                        mqtt_SetErr(EINVAL, "Error:: short message length %d", len);                        mqtt_SetErr(EINVAL, "Short message length %d", len);
                        return NULL;                        return -1;
                 }                  }
                subs = mqtt_subRealloc(subs, i + 1);                if (!mqtt_subRealloc(&subs, i + 1)) {
                if (!subs) { 
                         mqtt_subFree(subscr);                          mqtt_subFree(subscr);
                        return NULL;                        return -1;
                 } else                  } else
                         *subscr = subs;                          *subscr = subs;
   
                 memset(&subs[i], 0, sizeof subs[i]);                  memset(&subs[i], 0, sizeof subs[i]);
                 subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);                  subs[i].sub_topic.msg_len = ntohs(var->var_sb.val);
                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len);                subs[i].sub_topic.msg_base = malloc(subs[i].sub_topic.msg_len + 1);
                 if (!subs[i].sub_topic.msg_base) {                  if (!subs[i].sub_topic.msg_base) {
                         LOGERR;                          LOGERR;
                         mqtt_subFree(subscr);                          mqtt_subFree(subscr);
                        return NULL;                        return -1;
                } else                } else {
                         memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);                          memcpy(subs[i].sub_topic.msg_base, var->var_data, subs[i].sub_topic.msg_len);
                           ((char*) subs[i].sub_topic.msg_base)[subs[i].sub_topic.msg_len] = 0;
                   }
                 pos += MQTTHDR_VAR_SIZEOF(var);                  pos += MQTTHDR_VAR_SIZEOF(var);
         }          }
   
        return hdr;        return i;
 }  }
   
 /*  /*
Line 483  mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf) Line 507  mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
         if (!hdr)          if (!hdr)
                 return (u_short) -1;                  return (u_short) -1;
         if (len < sizeof(mqtt_len_t)) {          if (len < sizeof(mqtt_len_t)) {
                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);                mqtt_SetErr(EINVAL, "Short message length %d", len);
                 return (u_short) -1;                  return (u_short) -1;
         } else {          } else {
                 pos = buf->msg_base + ret + 1;                  pos = buf->msg_base + ret + 1;

Removed from v.1.1  
changed lines
  Added in v.1.3


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>