Diff for /mqtt/src/Attic/sub.c between versions 1.1.2.4 and 1.1.2.10

version 1.1.2.4, 2011/11/21 16:28:05 version 1.1.2.10, 2011/12/06 10:33:37
Line 3 Line 3
   
 /* ------------------------------------------------------------------- */  /* ------------------------------------------------------------------- */
   
   /*
    * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
    *
    * @buf = Message buffer
    * @Topics = MQTT subscription topics
    * @msgID = MessageID
    * @Dup = Duplicate message
    * @QOS = QoS
    * return: -1 error or >-1 message size for send
    */
 int  int
mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID, mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                u_char Dup, u_char QOS)                u_short msgID, u_char Dup, u_char QOS)
 {  {
         int siz = 0;          int siz = 0;
         struct mqtthdr *hdr;          struct mqtthdr *hdr;
         mqtthdr_var_t *topic;          mqtthdr_var_t *topic;
         mqtt_v_t *mid;          mqtt_v_t *mid;
           mqtt_subscr_t *t;
           u_char *qos;
   
        if (!buf || !csTopic)        if (!buf || !Topics)
                 return -1;                  return -1;
         if (QOS > MQTT_QOS_EXACTLY) {          if (QOS > MQTT_QOS_EXACTLY) {
                 mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");                  mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
Line 31  mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, const c Line 43  mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, const c
         }          }
   
         /* variable header */          /* variable header */
   
         mid = (mqtt_v_t*) (buf->msg_base + siz);          mid = (mqtt_v_t*) (buf->msg_base + siz);
         mid->val = htons(msgID);          mid->val = htons(msgID);
         siz += sizeof(mqtt_v_t);          siz += sizeof(mqtt_v_t);
   
        topic = (mqtthdr_var_t*) (buf->msg_base + siz);        /* payload with subscriptions */
        topic->var_sb.val = htons(strlen(csTopic));        for (t = Topics; t && t->sub_topic._base; t++) {
        memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));                topic = (mqtthdr_var_t*) (buf->msg_base + siz);
        siz += MQTTHDR_VAR_SIZEOF(topic);                topic->var_sb.val = htons(t->sub_topic._size);
                 memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
                 siz += MQTTHDR_VAR_SIZEOF(topic);
                 qos = (buf->msg_base + siz);
                 *qos = t->sub_ret;
                 siz++;
         }
   
         /* fixed header */          /* fixed header */
        MQTTHDR_MSGINIT(hdr);
         hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;          hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
         hdr->mqtt_msg.qos = QOS;          hdr->mqtt_msg.qos = QOS;
         hdr->mqtt_msg.dup = Dup ? 1 : 0;          hdr->mqtt_msg.dup = Dup ? 1 : 0;
Line 51  mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, const c Line 68  mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, const c
   
         mqtt_msgRealloc(buf, siz);          mqtt_msgRealloc(buf, siz);
         return siz;          return siz;
   }
   
   /*
    * mqtt_msgSUBACK() Create SUBACK message
    *
    * @buf = Message buffer
    * @Topics = MQTT subscription topics
    * @msgID = MessageID
    * return: -1 error or >-1 message size for send
    */
   int
   mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
   {
           int siz = 0;
           struct mqtthdr *hdr;
           mqtt_v_t *v;
           mqtt_subscr_t *t;
           u_char *qos;
   
           if (!buf || !Topics)
                   return -1;
   
           if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                   return -1;
           else {
                   hdr = (struct mqtthdr *) (buf->msg_base + siz);
                   siz += sizeof(struct mqtthdr);
                   v = (mqtt_v_t*) (buf->msg_base + siz);
                   siz += sizeof(mqtt_v_t);
           }
   
           /* MessageID */
           v->val = htons(msgID);
   
           /* QoS payload from subscriptions */
           for (t = Topics; t && t->sub_topic._base; t++) {
                   qos = (buf->msg_base + siz);
                   *qos = t->sub_ret;
                   siz++;
           }
   
           /* fixed header */
           MQTTHDR_MSGINIT(hdr);
           hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
           *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
   
           mqtt_msgRealloc(buf, siz);
           return siz;
   }
   
   /*
    * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
    *
    * @buf = Message buffer
    * @Topics = MQTT subscription topics
    * @msgID = MessageID
    * @Dup = Duplicate message
    * @QOS = QoS
    * return: -1 error or >-1 message size for send
    */
   int
   mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
                   u_short msgID, u_char Dup, u_char QOS)
   {
           int siz = 0;
           struct mqtthdr *hdr;
           mqtthdr_var_t *topic;
           mqtt_v_t *mid;
           mqtt_subscr_t *t;
   
           if (!buf || !Topics)
                   return -1;
           if (QOS > MQTT_QOS_EXACTLY) {
                   mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                   return -1;
           }
           if (!msgID && QOS != MQTT_QOS_ONCE) {
                   mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                   return -1;
           }
   
           if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                   return -1;
           else {
                   hdr = (struct mqtthdr *) (buf->msg_base + siz);
                   siz += sizeof(struct mqtthdr);
           }
   
           /* variable header */
           mid = (mqtt_v_t*) (buf->msg_base + siz);
           mid->val = htons(msgID);
           siz += sizeof(mqtt_v_t);
   
           /* payload with subscriptions */
           for (t = Topics; t && t->sub_topic._base; t++) {
                   topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                   topic->var_sb.val = htons(t->sub_topic._size);
                   memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
                   siz += MQTTHDR_VAR_SIZEOF(topic);
           }
   
           /* fixed header */
           MQTTHDR_MSGINIT(hdr);
           hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
           hdr->mqtt_msg.qos = QOS;
           hdr->mqtt_msg.dup = Dup ? 1 : 0;
           hdr->mqtt_msg.retain = 0;
           *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
   
           mqtt_msgRealloc(buf, siz);
           return siz;
   }
   
   /*
    * mqtt_msgUNSUBACK() Create UNSUBACK message
    *
    * @buf = Message buffer
    * @msgID = MessageID
    * return: -1 error or >-1 message size for send
    */
   int
   mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
   {
           int siz = 0;
           struct mqtthdr *hdr;
           mqtt_v_t *v;
   
           if (!buf)
                   return -1;
   
           if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
                   return -1;
           else {
                   hdr = (struct mqtthdr *) (buf->msg_base + siz);
                   siz += sizeof(struct mqtthdr);
                   v = (mqtt_v_t*) (buf->msg_base + siz);
                   siz += sizeof(mqtt_v_t);
           }
   
           /* fixed header */
           MQTTHDR_MSGINIT(hdr);
           hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
           *hdr->mqtt_len = sizeof(mqtt_v_t);
   
           /* MessageID */
           v->val = htons(msgID);
   
           return siz;
   }
   
   
   /* ============= decode ============ */
   
   /*
    * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
    *
    * @buf = Message buffer
    * @msgID = MessageID
    * @subscr = Subscriptions, must be free after use with mqtt_subFree()
    * return: NULL error or !=NULL MQTT fixed header
    */
   struct mqtthdr *
   mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
   {
           register int i;
           int len, ret;
           struct mqtthdr *hdr;
           mqtthdr_var_t *var;
           mqtt_subscr_t *subs;
           mqtt_v_t *v;
           caddr_t pos;
   
           if (!buf || !msgID || !subscr)
                   return NULL;
   
           hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
           if (!hdr)
                   return NULL;
           pos = buf->msg_base + ret + 1;
           v = (mqtt_v_t*) pos;
   
           /* MessageID */
           len -= sizeof(mqtt_v_t);
           if (len < 0) {
                   mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                   return NULL;
           } else {
                   *msgID = ntohs(v->val);
                   pos += sizeof(mqtt_v_t);
           }
   
           subs = mqtt_subAlloc(0);
           if (!subs)
                   return NULL;
           else
                   *subscr = subs;
   
           /* Subscribes */
           for (i = 0; len > 0; i++) {
                   var = (mqtthdr_var_t*) pos;
                   len -= MQTTHDR_VAR_SIZEOF(var) + 1;
                   if (len < 0) {
                           mqtt_subFree(subscr);
                           mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                           return NULL;
                   }
                   subs = mqtt_subRealloc(subs, i + 1);
                   if (!subs) {
                           mqtt_subFree(subscr);
                           return NULL;
                   } else
                           *subscr = subs;
   
                   memset(&subs[i], 0, sizeof subs[i]);
                   subs[i].sub_topic._size = ntohs(var->var_sb.val);
                   subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
                   if (!subs[i].sub_topic._base) {
                           LOGERR;
                           mqtt_subFree(subscr);
                           return NULL;
                   } else
                           memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
                   pos += MQTTHDR_VAR_SIZEOF(var);
   
                   subs[i].sub_ret = *pos;
                   pos++;
           }
   
           return hdr;
   }
   
   /*
    * mqtt_readUNSUBACK() Read UNSUBACK message
    *
    * @buf = Message buffer
    * return: -1 error or MessageID
    */
   u_short
   mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
   {
           int len, ret;
           struct mqtthdr *hdr;
           mqtt_v_t *v;
           caddr_t pos;
   
           hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
           if (!hdr)
                   return (u_short) -1;
           if (len < sizeof(mqtt_v_t)) {
                   mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                   return (u_short) -1;
           } else {
                   pos = buf->msg_base + ret + 1;
                   v = (mqtt_v_t*) pos;
           }
   
           return ntohs(v->val);
 }  }

Removed from v.1.1.2.4  
changed lines
  Added in v.1.1.2.10


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>