Diff for /libaitmqtt/src/aitmqtt.c between versions 1.1.1.1.2.1 and 1.1.1.1.2.11

version 1.1.1.1.2.1, 2012/01/26 14:44:33 version 1.1.1.1.2.11, 2012/04/27 16:17:11
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 53  char mqtt_Error[STRSIZ]; Line 53  char mqtt_Error[STRSIZ];
   
 #pragma GCC visibility pop  #pragma GCC visibility pop
   
 //  
 // Error maintenance functions ...  
 //  
   
 // mqtt_GetErrno() Get error code of last operation  // mqtt_GetErrno() Get error code of last operation
 inline int  inline int
 mqtt_GetErrno()  mqtt_GetErrno()
Line 85  mqtt_SetErr(int eno, char *estr, ...) Line 81  mqtt_SetErr(int eno, char *estr, ...)
 }  }
   
 #pragma GCC visibility push(hidden)  #pragma GCC visibility push(hidden)
// _mqtt_readHEADER() read fixed header from MQTT message/* _mqtt_readHEADER() read fixed header from MQTT message */
 inline struct mqtthdr *  inline struct mqtthdr *
 _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len)  _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len)
 {  {
Line 106  _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char c Line 102  _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char c
 }  }
 #pragma GCC visibility pop  #pragma GCC visibility pop
   
 // ----------------------------------------------------------  
   
 /*  /*
  * mqtt_msgFree() Free MQTT message   * mqtt_msgFree() Free MQTT message
Line 179  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l Line 174  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l
         if (!msg)          if (!msg)
                 return -1;                  return -1;
   
        if (len == msg->msg_len)        if (len <= msg->msg_len)
                 return len;                  return len;
   
         p = realloc(msg->msg_base, len);          p = realloc(msg->msg_base, len);
Line 275  mqtt_sizeLen(u_int len) Line 270  mqtt_sizeLen(u_int len)
 }  }
   
 /*  /*
 * mqtt_str2sub Create MQTT subscribe variable from string(s) * mqtt_str2subs Create MQTT subscribe variable from string(s)
  *   *
 * @csStr = strings * @csStr = null terminated string array
 * @strnum = number of strings elements * @strnum = copy at most number of strings elements
  * @qoses = QoS elements applied to subscribe variable,    * @qoses = QoS elements applied to subscribe variable, 
  *              count of elements must be equal with csStr elements   *              count of elements must be equal with csStr elements
  * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()   * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()
  */   */
 inline mqtt_subscr_t *  inline mqtt_subscr_t *
mqtt_str2sub(const char **csStr, u_short strnum, u_char *qoses)mqtt_str2subs(const char **csStr, u_short strnum, u_char *qoses)
 {  {
         mqtt_subscr_t *v;          mqtt_subscr_t *v;
         register int i, items;          register int i, items;
Line 292  mqtt_str2sub(const char **csStr, u_short strnum, u_cha Line 287  mqtt_str2sub(const char **csStr, u_short strnum, u_cha
   
         if (!csStr)          if (!csStr)
                 return NULL;                  return NULL;
         for (items = 0, strs = csStr; *strs; items++, strs++)  
                 if (strnum && items >= strnum) {  
                         items = strnum;  
                         break;  
                 }  
   
           for (items = 0, strs = csStr; 
                           (!strnum || (strnum && items < strnum)) && *strs; 
                           items++, strs++);
   
         if (!(v = malloc((items + 1) * sizeof(mqtt_subscr_t)))) {          if (!(v = malloc((items + 1) * sizeof(mqtt_subscr_t)))) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
Line 373  mqtt_subAlloc(u_short num) Line 367  mqtt_subAlloc(u_short num)
  * return: NULL error or subscribe array, after use must call mqtt_subFree()   * return: NULL error or subscribe array, after use must call mqtt_subFree()
  */   */
 inline mqtt_subscr_t *  inline mqtt_subscr_t *
mqtt_subRealloc(mqtt_subscr_t * __restrict subs, u_short num)mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_short num)
 {  {
         mqtt_subscr_t *s = NULL;          mqtt_subscr_t *s = NULL;
   
        s = realloc(subs, (num + 1) * sizeof(mqtt_subscr_t));        if (!subs)
                 return NULL;
 
         s = realloc(*subs, (num + 1) * sizeof(mqtt_subscr_t));
         if (!s) {          if (!s) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
           } else {
                   memset(s + num, 0, sizeof(mqtt_subscr_t));
                   *subs = s;
         }          }
   
        return s;        return *subs;
 }  }
   
 /*  /*
    * mqtt_subCopy() - Copy subscription structure to another one
    *
    * @dst = destination subscription
    * @src = source subscription
    * return: =NULL error or !=NULL successful copied a structure
    */
   inline mqtt_subscr_t *
   mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subscr_t * __restrict src)
   {
           if (!dst || !src)
                   return NULL;
   
           if (src->sub_topic.msg_base) {
                   dst->sub_topic.msg_base = malloc(src->sub_topic.msg_len + 1);
                   if (!dst->sub_topic.msg_base) {
                           LOGERR;
                           memset(dst, 0, sizeof(mqtt_subscr_t));
                           return NULL;
                   } else {
                           dst->sub_topic.msg_len = src->sub_topic.msg_len;
                           ((char*) dst->sub_topic.msg_base)[dst->sub_topic.msg_len] = 0;
                           memcpy(dst->sub_topic.msg_base, src->sub_topic.msg_base, 
                                           dst->sub_topic.msg_len);
                   }
           }
           if (src->sub_value.msg_base) {
                   dst->sub_value.msg_base = malloc(src->sub_value.msg_len + 1);
                   if (!dst->sub_value.msg_base) {
                           LOGERR;
                           if (dst->sub_topic.msg_base)
                                   free(dst->sub_topic.msg_base);
                           memset(dst, 0, sizeof(mqtt_subscr_t));
                           return NULL;
                   } else {
                           dst->sub_value.msg_len = src->sub_value.msg_len;
                           ((char*) dst->sub_value.msg_base)[dst->sub_value.msg_len] = 0;
                           memcpy(dst->sub_value.msg_base, src->sub_value.msg_base, 
                                           dst->sub_value.msg_len);
                   }
           }
   
           dst->sub_ret = src->sub_ret;
           return dst;
   }
   
   
   /*
  * mqtt_expandTopic() - Expanding topic to regular expression   * mqtt_expandTopic() - Expanding topic to regular expression
  *   *
  * @csInput = Input topic   * @csInput = Input topic
Line 468  mqtt_expandTopic(const char *csInput, char * __restric Line 515  mqtt_expandTopic(const char *csInput, char * __restric
                 ret++;                  ret++;
         }          }
   
           return ret;
   }
   
   /*
    * mqtt_sqlTopic() - Expanding topic to SQL search string
    *
    * @csInput = Input topic
    * @psSQL = Output to SQL search string
    * @sqlLen = Length of psSQL
    * return: -1 error, 0 changed bytes
    */
   int
   mqtt_sqlTopic(const char *csInput, char * __restrict psSQL, int sqlLen)
   {
           int ret = 0;
           register int i;
           char *pos, *s;
   
           if (!csInput || !psSQL || sqlLen < 1)
                   return -1;
           else
                   memset(psSQL, 0, sqlLen);
   
           /* check # */
           for (i = 0, pos = (char*) csInput; *pos && i < 2; pos++)
                   if (*pos == '#')
                           i++;
           if (i == 2) {
                   mqtt_SetErr(EINVAL, "Syntax error, multiple occurrences of #..#");
                   return -1;
           }
           if (i == 1 && (pos = strrchr(csInput, '#')))
                   if ((pos != csInput && *(pos - 1) != '/') || *(pos + 1)) {
                           mqtt_SetErr(EINVAL, "Syntax error, bad format of #");
                           return -1;
                   }
           /* check + */
           for (pos = (char*) csInput; *pos && (pos = strchr(pos, '+')); pos++)
                   if ((pos != csInput && *(pos - 1) != '/') || (*(pos + 1) && *(pos + 1) != '/')) {
                           mqtt_SetErr(EINVAL, "Syntax error, bad format of +");
                           return -1;
                   }
   
           /* BUILD SEARCH STRING */
           s = psSQL;
           for (pos = (char*) csInput; s < psSQL + sqlLen && *pos; s++, pos++) {
                   if (*pos == '#') {
                           *s = '%';
                           s++;
                           ret++;
                           break;
                   }
                   if (*pos == '+') {
                           if (*(pos + 1)) {
                                   *s = '%';
                                   ret++;
                                   continue;
                           } else {
                                   strlcat(s, "%/", sqlLen - (s - psSQL));
                                   ret += 2;
                                   break;
                           }
                   }
                   /*
                   for (i = 0; i < sizeof reROM - 1; i++)
                           if (*pos == reROM[i] && regexLen - (s - psRegEx) - 1 > 0) {
                                   *s++ = '\\';
                                   ret++;
                                   break;
                           }
                           */
   
                   *s = *pos;
           }
   
           return ret;
   }
   
   
   /*
    * mqtt_KeepAlive() - Keep Alive check routine
    *
    * @sock = connected socket
    * @ka = keep alive timeout
    * @tries = tries for receive correct ping response, usually ==1
    * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session
    */
   int
   mqtt_KeepAlive(int sock, u_short ka, u_char tries)
   {
           int ret = 0;
           struct pollfd pfd;
           mqtt_msg_t msg = { NULL, 0 };
   
           if (sock < 3)
                   return -1;      /* error */
   
           pfd.fd = sock;
           pfd.events = POLLOUT;
           if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || 
                           pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                   LOGERR;
                   return -1;      /* error */
           } else if (!ret)
                   return 1;       /* session is abandoned ... must be disconnect! */
           /* ping request */
           if ((ret = mqtt_msgPINGREQ(&msg)) == -1)
                   return -1;      /* error */
           if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
                   LOGERR;
                   goto end;
           }
   
           pfd.events = POLLIN | POLLPRI;
           while (tries--) {
                   if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || 
                                   pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                           LOGERR;
                           break;
                   } else if (!ret) {
                           ret = 1;        /* session is abandoned ... must be disconnect! */
                           continue;
                   }
                   /* receive & decode packet */
                   if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {
                           LOGERR;
                           break;
                   }
                   if (!mqtt_readPINGRESP(&msg)) {
                           ret = 0;        /* Host is alive */
                           break;
                   } else
                           ret = 2;        /* Session is broken ... must be disconnect! */
           }
   end:
           free(msg.msg_base);
         return ret;          return ret;
 }  }

Removed from v.1.1.1.1.2.1  
changed lines
  Added in v.1.1.1.1.2.11


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>