Diff for /libaitmqtt/src/aitmqtt.c between versions 1.1 and 1.1.1.1.2.11

version 1.1, 2012/01/26 13:07:33 version 1.1.1.1.2.11, 2012/04/27 16:17:11
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 53  char mqtt_Error[STRSIZ]; Line 53  char mqtt_Error[STRSIZ];
   
 #pragma GCC visibility pop  #pragma GCC visibility pop
   
 //  
 // Error maintenance functions ...  
 //  
   
 // mqtt_GetErrno() Get error code of last operation  // mqtt_GetErrno() Get error code of last operation
 inline int  inline int
 mqtt_GetErrno()  mqtt_GetErrno()
Line 85  mqtt_SetErr(int eno, char *estr, ...) Line 81  mqtt_SetErr(int eno, char *estr, ...)
 }  }
   
 #pragma GCC visibility push(hidden)  #pragma GCC visibility push(hidden)
// _mqtt_readHEADER() read fixed header from MQTT message/* _mqtt_readHEADER() read fixed header from MQTT message */
 inline struct mqtthdr *  inline struct mqtthdr *
 _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len)  _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len)
 {  {
Line 106  _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char c Line 102  _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char c
 }  }
 #pragma GCC visibility pop  #pragma GCC visibility pop
   
 // ----------------------------------------------------------  
   
 /*  /*
  * mqtt_msgFree() Free MQTT message   * mqtt_msgFree() Free MQTT message
Line 179  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l Line 174  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l
         if (!msg)          if (!msg)
                 return -1;                  return -1;
   
        if (len == msg->msg_len)        if (len <= msg->msg_len)
                 return len;                  return len;
   
         p = realloc(msg->msg_base, len);          p = realloc(msg->msg_base, len);
Line 275  mqtt_sizeLen(u_int len) Line 270  mqtt_sizeLen(u_int len)
 }  }
   
 /*  /*
 * mqtt_str2sub Create MQTT subscribe variable from string(s) * mqtt_str2subs Create MQTT subscribe variable from string(s)
  *   *
 * @csStr = strings * @csStr = null terminated string array
 * @strnum = number of strings elements * @strnum = copy at most number of strings elements
  * @qoses = QoS elements applied to subscribe variable,    * @qoses = QoS elements applied to subscribe variable, 
  *              count of elements must be equal with csStr elements   *              count of elements must be equal with csStr elements
  * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()   * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()
  */   */
 inline mqtt_subscr_t *  inline mqtt_subscr_t *
mqtt_str2sub(const char **csStr, u_short strnum, u_char *qoses)mqtt_str2subs(const char **csStr, u_short strnum, u_char *qoses)
 {  {
         mqtt_subscr_t *v;          mqtt_subscr_t *v;
         register int i, items;          register int i, items;
Line 292  mqtt_str2sub(const char **csStr, u_short strnum, u_cha Line 287  mqtt_str2sub(const char **csStr, u_short strnum, u_cha
   
         if (!csStr)          if (!csStr)
                 return NULL;                  return NULL;
         for (items = 0, strs = csStr; *strs; items++, strs++)  
                 if (strnum && items >= strnum) {  
                         items = strnum;  
                         break;  
                 }  
   
           for (items = 0, strs = csStr; 
                           (!strnum || (strnum && items < strnum)) && *strs; 
                           items++, strs++);
   
         if (!(v = malloc((items + 1) * sizeof(mqtt_subscr_t)))) {          if (!(v = malloc((items + 1) * sizeof(mqtt_subscr_t)))) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
Line 373  mqtt_subAlloc(u_short num) Line 367  mqtt_subAlloc(u_short num)
  * return: NULL error or subscribe array, after use must call mqtt_subFree()   * return: NULL error or subscribe array, after use must call mqtt_subFree()
  */   */
 inline mqtt_subscr_t *  inline mqtt_subscr_t *
mqtt_subRealloc(mqtt_subscr_t * __restrict subs, u_short num)mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_short num)
 {  {
         mqtt_subscr_t *s = NULL;          mqtt_subscr_t *s = NULL;
   
        s = realloc(subs, (num + 1) * sizeof(mqtt_subscr_t));        if (!subs)
                 return NULL;
 
         s = realloc(*subs, (num + 1) * sizeof(mqtt_subscr_t));
         if (!s) {          if (!s) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
           } else {
                   memset(s + num, 0, sizeof(mqtt_subscr_t));
                   *subs = s;
         }          }
   
        return s;        return *subs;
 }
 
 /*
  * mqtt_subCopy() - Copy subscription structure to another one
  *
  * @dst = destination subscription
  * @src = source subscription
  * return: =NULL error or !=NULL successful copied a structure
  */
 inline mqtt_subscr_t *
 mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subscr_t * __restrict src)
 {
         if (!dst || !src)
                 return NULL;
 
         if (src->sub_topic.msg_base) {
                 dst->sub_topic.msg_base = malloc(src->sub_topic.msg_len + 1);
                 if (!dst->sub_topic.msg_base) {
                         LOGERR;
                         memset(dst, 0, sizeof(mqtt_subscr_t));
                         return NULL;
                 } else {
                         dst->sub_topic.msg_len = src->sub_topic.msg_len;
                         ((char*) dst->sub_topic.msg_base)[dst->sub_topic.msg_len] = 0;
                         memcpy(dst->sub_topic.msg_base, src->sub_topic.msg_base, 
                                         dst->sub_topic.msg_len);
                 }
         }
         if (src->sub_value.msg_base) {
                 dst->sub_value.msg_base = malloc(src->sub_value.msg_len + 1);
                 if (!dst->sub_value.msg_base) {
                         LOGERR;
                         if (dst->sub_topic.msg_base)
                                 free(dst->sub_topic.msg_base);
                         memset(dst, 0, sizeof(mqtt_subscr_t));
                         return NULL;
                 } else {
                         dst->sub_value.msg_len = src->sub_value.msg_len;
                         ((char*) dst->sub_value.msg_base)[dst->sub_value.msg_len] = 0;
                         memcpy(dst->sub_value.msg_base, src->sub_value.msg_base, 
                                         dst->sub_value.msg_len);
                 }
         }
 
         dst->sub_ret = src->sub_ret;
         return dst;
 }
 
 
 /*
  * mqtt_expandTopic() - Expanding topic to regular expression
  *
  * @csInput = Input topic
  * @psRegEx = Output to regular expression
  * @regexLen = Length of psRegEx
  * @BOL = Begin of Line, if =0 not added
  * @EOL = End of Line, if =0 not appended
  * return: -1 error, 0 nothing expanded or >0 expanded bytes
  */
 int
 mqtt_expandTopic(const char *csInput, char * __restrict psRegEx, int regexLen, u_char BOL, u_char EOL)
 {
         int ret = 0;
         register int i;
         char *pos, *s;
         const char reROM[] = "[](){}^$\\-|?.+*";
 
         if (!csInput || !psRegEx || regexLen < 1)
                 return -1;
         else
                 memset(psRegEx, 0, regexLen);
 
         /* check # */
         for (i = 0, pos = (char*) csInput; *pos && i < 2; pos++)
                 if (*pos == '#')
                         i++;
         if (i == 2) {
                 mqtt_SetErr(EINVAL, "Syntax error, multiple occurrences of #..#");
                 return -1;
         }
         if (i == 1 && (pos = strrchr(csInput, '#')))
                 if ((pos != csInput && *(pos - 1) != '/') || *(pos + 1)) {
                         mqtt_SetErr(EINVAL, "Syntax error, bad format of #");
                         return -1;
                 }
         /* check + */
         for (pos = (char*) csInput; *pos && (pos = strchr(pos, '+')); pos++)
                 if ((pos != csInput && *(pos - 1) != '/') || (*(pos + 1) && *(pos + 1) != '/')) {
                         mqtt_SetErr(EINVAL, "Syntax error, bad format of +");
                         return -1;
                 }
 
         /* BUILD REGEX */
         s = psRegEx;
         if (BOL) {
                 *s++ = '^';
                 ret++;
         }
         for (pos = (char*) csInput; s < psRegEx + regexLen && *pos; s++, pos++) {
                 if (*pos == '#') {
                         strlcat(s, ".*", regexLen - (s - psRegEx));
                         s++;
                         ret++;
                         break;
                 }
                 if (*pos == '+') {
                         if (*(pos + 1)) {
                                 strlcat(s, ".*", regexLen - (s - psRegEx));
                                 s++;
                                 ret++;
                                 continue;
                         } else {
                                 strlcat(s, ".*/", regexLen - (s - psRegEx));
                                 ret += 2;
                                 break;
                         }
                 }
                 for (i = 0; i < sizeof reROM - 1; i++)
                         if (*pos == reROM[i] && regexLen - (s - psRegEx) - 1 > 0) {
                                 *s++ = '\\';
                                 ret++;
                                 break;
                         }
 
                 *s = *pos;
         }
         if (EOL) {
                 strlcat(psRegEx, "$", regexLen);
                 ret++;
         }
 
         return ret;
 }
 
 /*
  * mqtt_sqlTopic() - Expanding topic to SQL search string
  *
  * @csInput = Input topic
  * @psSQL = Output to SQL search string
  * @sqlLen = Length of psSQL
  * return: -1 error, 0 changed bytes
  */
 int
 mqtt_sqlTopic(const char *csInput, char * __restrict psSQL, int sqlLen)
 {
         int ret = 0;
         register int i;
         char *pos, *s;
 
         if (!csInput || !psSQL || sqlLen < 1)
                 return -1;
         else
                 memset(psSQL, 0, sqlLen);
 
         /* check # */
         for (i = 0, pos = (char*) csInput; *pos && i < 2; pos++)
                 if (*pos == '#')
                         i++;
         if (i == 2) {
                 mqtt_SetErr(EINVAL, "Syntax error, multiple occurrences of #..#");
                 return -1;
         }
         if (i == 1 && (pos = strrchr(csInput, '#')))
                 if ((pos != csInput && *(pos - 1) != '/') || *(pos + 1)) {
                         mqtt_SetErr(EINVAL, "Syntax error, bad format of #");
                         return -1;
                 }
         /* check + */
         for (pos = (char*) csInput; *pos && (pos = strchr(pos, '+')); pos++)
                 if ((pos != csInput && *(pos - 1) != '/') || (*(pos + 1) && *(pos + 1) != '/')) {
                         mqtt_SetErr(EINVAL, "Syntax error, bad format of +");
                         return -1;
                 }
 
         /* BUILD SEARCH STRING */
         s = psSQL;
         for (pos = (char*) csInput; s < psSQL + sqlLen && *pos; s++, pos++) {
                 if (*pos == '#') {
                         *s = '%';
                         s++;
                         ret++;
                         break;
                 }
                 if (*pos == '+') {
                         if (*(pos + 1)) {
                                 *s = '%';
                                 ret++;
                                 continue;
                         } else {
                                 strlcat(s, "%/", sqlLen - (s - psSQL));
                                 ret += 2;
                                 break;
                         }
                 }
                 /*
                 for (i = 0; i < sizeof reROM - 1; i++)
                         if (*pos == reROM[i] && regexLen - (s - psRegEx) - 1 > 0) {
                                 *s++ = '\\';
                                 ret++;
                                 break;
                         }
                         */
 
                 *s = *pos;
         }
 
         return ret;
 }
 
 
 /*
  * mqtt_KeepAlive() - Keep Alive check routine
  *
  * @sock = connected socket
  * @ka = keep alive timeout
  * @tries = tries for receive correct ping response, usually ==1
  * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session
  */
 int
 mqtt_KeepAlive(int sock, u_short ka, u_char tries)
 {
         int ret = 0;
         struct pollfd pfd;
         mqtt_msg_t msg = { NULL, 0 };
 
         if (sock < 3)
                 return -1;      /* error */
 
         pfd.fd = sock;
         pfd.events = POLLOUT;
         if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || 
                         pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                 LOGERR;
                 return -1;      /* error */
         } else if (!ret)
                 return 1;       /* session is abandoned ... must be disconnect! */
         /* ping request */
         if ((ret = mqtt_msgPINGREQ(&msg)) == -1)
                 return -1;      /* error */
         if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
                 LOGERR;
                 goto end;
         }
 
         pfd.events = POLLIN | POLLPRI;
         while (tries--) {
                 if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || 
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
                         LOGERR;
                         break;
                 } else if (!ret) {
                         ret = 1;        /* session is abandoned ... must be disconnect! */
                         continue;
                 }
                 /* receive & decode packet */
                 if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {
                         LOGERR;
                         break;
                 }
                 if (!mqtt_readPINGRESP(&msg)) {
                         ret = 0;        /* Host is alive */
                         break;
                 } else
                         ret = 2;        /* Session is broken ... must be disconnect! */
         }
 end:
         free(msg.msg_base);
         return ret;
 }  }

Removed from v.1.1  
changed lines
  Added in v.1.1.1.1.2.11


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>