Diff for /libaitmqtt/src/aitmqtt.c between versions 1.1.1.1.2.11 and 1.3.4.5

version 1.1.1.1.2.11, 2012/04/27 16:17:11 version 1.3.4.5, 2022/09/15 15:04:44
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004 - 2022
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 54  char mqtt_Error[STRSIZ]; Line 54  char mqtt_Error[STRSIZ];
 #pragma GCC visibility pop  #pragma GCC visibility pop
   
 // mqtt_GetErrno() Get error code of last operation  // mqtt_GetErrno() Get error code of last operation
inline intint
 mqtt_GetErrno()  mqtt_GetErrno()
 {  {
         return mqtt_Errno;          return mqtt_Errno;
 }  }
   
 // mqtt_GetError() Get error text of last operation  // mqtt_GetError() Get error text of last operation
inline const char *const char *
 mqtt_GetError()  mqtt_GetError()
 {  {
         return mqtt_Error;          return mqtt_Error;
 }  }
   
 // mqtt_SetErr() Set error to variables for internal use!!!  // mqtt_SetErr() Set error to variables for internal use!!!
inline voidvoid
 mqtt_SetErr(int eno, char *estr, ...)  mqtt_SetErr(int eno, char *estr, ...)
 {  {
         va_list lst;          va_list lst;
Line 80  mqtt_SetErr(int eno, char *estr, ...) Line 80  mqtt_SetErr(int eno, char *estr, ...)
         va_end(lst);          va_end(lst);
 }  }
   
 #pragma GCC visibility push(hidden)  
 /* _mqtt_readHEADER() read fixed header from MQTT message */  
 inline struct mqtthdr *  
 _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len)  
 {  
         struct mqtthdr *hdr;  
   
         if (!buf || !buf->msg_base || !buf->msg_len)  
                 return NULL;  
   
         hdr = (struct mqtthdr*) buf->msg_base;  
         if (hdr->mqtt_msg.type != cmd) {  
                 mqtt_SetErr(EINVAL, "Error:: wrong command #%d should be %d",   
                                 hdr->mqtt_msg.type, cmd);  
                 return NULL;  
         }  
   
         *len = mqtt_decodeLen(hdr->mqtt_len, bytes);  
         return hdr;  
 }  
 #pragma GCC visibility pop  
   
   
 /*  /*
  * mqtt_msgFree() Free MQTT message   * mqtt_msgFree() Free MQTT message
  *   *
  * @msg = Message buffer   * @msg = Message buffer
 * @all = !=0 Destroy entire message, if MQTT Message allocated with mqtt_msgAlloc() * @keepmsg = !=0 just free message content
  * return: none   * return: none
  */   */
inline voidvoid
mqtt_msgFree(mqtt_msg_t ** __restrict msg, int all)mqtt_msgFree(mqtt_msg_t ** __restrict msg, int keepmsg)
 {  {
         if (msg && *msg) {          if (msg && *msg) {
                 if ((*msg)->msg_base) {                  if ((*msg)->msg_base) {
                        free((*msg)->msg_base);                        e_free((*msg)->msg_base);
                         (*msg)->msg_base = NULL;                          (*msg)->msg_base = NULL;
                 }                  }
                if (all) {                if (!keepmsg) {
                        free(*msg);                        e_free(*msg);
                         *msg = NULL;                          *msg = NULL;
                 } else                  } else
                         (*msg)->msg_len ^= (*msg)->msg_len;                          (*msg)->msg_len ^= (*msg)->msg_len;
Line 132  mqtt_msgFree(mqtt_msg_t ** __restrict msg, int all) Line 110  mqtt_msgFree(mqtt_msg_t ** __restrict msg, int all)
  * @len = >0 Allocate buffer with length   * @len = >0 Allocate buffer with length
  * return: NULL error or Message, after use must call mqtt_msgFree() with all!=0   * return: NULL error or Message, after use must call mqtt_msgFree() with all!=0
  */   */
inline mqtt_msg_t *mqtt_msg_t *
mqtt_msgAlloc(u_short len)mqtt_msgAlloc(u_int len)
 {  {
         mqtt_msg_t *m = NULL;          mqtt_msg_t *m = NULL;
   
        m = malloc(sizeof(mqtt_msg_t));        m = e_malloc(sizeof(mqtt_msg_t));
         if (!m) {          if (!m) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
Line 146  mqtt_msgAlloc(u_short len) Line 124  mqtt_msgAlloc(u_short len)
   
         if (len) {          if (len) {
                 m->msg_len = len;                  m->msg_len = len;
                m->msg_base = malloc(m->msg_len);                m->msg_base = e_malloc(m->msg_len);
                 if (!m->msg_base) {                  if (!m->msg_base) {
                         LOGERR;                          LOGERR;
                        free(m);                        e_free(m);
                         return NULL;                          return NULL;
                 } else                  } else
                         memset(m->msg_base, 0, m->msg_len);                          memset(m->msg_base, 0, m->msg_len);
Line 165  mqtt_msgAlloc(u_short len) Line 143  mqtt_msgAlloc(u_short len)
  * @len = new length   * @len = new length
  * return: -1 error or >-1 old buffer length   * return: -1 error or >-1 old buffer length
  */   */
inline intint
mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short len)mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_int len)
 {  {
         void *p = NULL;          void *p = NULL;
         int ret = 0;          int ret = 0;
Line 177  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l Line 155  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l
         if (len <= msg->msg_len)          if (len <= msg->msg_len)
                 return len;                  return len;
   
        p = realloc(msg->msg_base, len);        p = e_realloc(msg->msg_base, len);
         if (!p) {          if (!p) {
                 LOGERR;                  LOGERR;
                 return -1;                  return -1;
Line 191  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l Line 169  mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l
 }  }
   
 /*  /*
    * mqtt_msgDup() - Duplicate message buffer
    *
    * @msg = Message
    * return: NULL error or !=NULL duplicated message, after use must call mqtt_msgFree() with all!=0
    */
   mqtt_msg_t *
   mqtt_msgDup(mqtt_msg_t * __restrict msg)
   {
           mqtt_msg_t *m = NULL;
   
           m = e_malloc(sizeof(mqtt_msg_t));
           if (!m) {
                   LOGERR;
                   return NULL;
           } else
                   memset(m, 0, sizeof(mqtt_msg_t));
   
           if (msg->msg_len) {
                   m->msg_len = msg->msg_len;
                   m->msg_base = e_malloc(m->msg_len);
                   if (!m->msg_base) {
                           LOGERR;
                           e_free(m);
                           return NULL;
                   } else
                           memcpy(m->msg_base, msg->msg_base, m->msg_len);
           }
   
           return m;
   }
   
   /*
  * mqtt_encodeLen() Encode number to MQTT length field   * mqtt_encodeLen() Encode number to MQTT length field
  *   *
  * @num = number for encode   * @num = number for encode
  * return: -1 error or >-1 length   * return: -1 error or >-1 length
  */   */
inline u_intu_int
 mqtt_encodeLen(u_int num)  mqtt_encodeLen(u_int num)
 {  {
         register u_int dig, i;          register u_int dig, i;
         u_int ret = 0;          u_int ret = 0;
   
        if (num > 268435455)        if (num > MQTT_DATA_MAX)
                 return (u_int) -1;                  return (u_int) -1;
   
         for (i = 0; i < sizeof ret && num > 0; i++) {          for (i = 0; i < sizeof ret && num > 0; i++) {
Line 224  mqtt_encodeLen(u_int num) Line 234  mqtt_encodeLen(u_int num)
  * @n = sizeof bytes, if !=NULL   * @n = sizeof bytes, if !=NULL
  * return: -1 error, >-1 length of message   * return: -1 error, >-1 length of message
  */   */
inline u_intu_int
 mqtt_decodeLen(void * __restrict len, int * __restrict n)  mqtt_decodeLen(void * __restrict len, int * __restrict n)
 {  {
         register u_int i, dig, mul;          register u_int i, dig, mul;
Line 244  mqtt_decodeLen(void * __restrict len, int * __restrict Line 254  mqtt_decodeLen(void * __restrict len, int * __restrict
   
         if (n)          if (n)
                 *n = (char) (i & 0x7f) + 1;                  *n = (char) (i & 0x7f) + 1;
   
         return ret;          return ret;
 }  }
   
Line 253  mqtt_decodeLen(void * __restrict len, int * __restrict Line 264  mqtt_decodeLen(void * __restrict len, int * __restrict
  * @len = length   * @len = length
  * return: -1 error, >-1 sizeof len in bytes   * return: -1 error, >-1 sizeof len in bytes
  */   */
inline charchar
 mqtt_sizeLen(u_int len)  mqtt_sizeLen(u_int len)
 {  {
         register char i;          register char i;
Line 270  mqtt_sizeLen(u_int len) Line 281  mqtt_sizeLen(u_int len)
 }  }
   
 /*  /*
    * mqtt_pktLen() - Get total packet length
    *
    * @hdr = MQTT packet header
    * return: packet length
    */
   u_int
   mqtt_pktLen(struct mqtthdr * __restrict hdr)
   {
           int siz, n = 0;
   
           if (!hdr)
                   return 0;
   
           siz = mqtt_decodeLen(hdr->mqtt_len, &n);
           siz += sizeof(struct mqtthdr) + n - 1;
   
           return siz;
   }
   
   /*
  * mqtt_str2subs Create MQTT subscribe variable from string(s)   * mqtt_str2subs Create MQTT subscribe variable from string(s)
  *   *
  * @csStr = null terminated string array   * @csStr = null terminated string array
 * @strnum = copy at most number of strings elements * @strnum = copy at most number of strings elements, ==0 till NULL element
  * @qoses = QoS elements applied to subscribe variable,    * @qoses = QoS elements applied to subscribe variable, 
  *              count of elements must be equal with csStr elements   *              count of elements must be equal with csStr elements
  * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()   * return: NULL error or != subscribe variables array, must be free after use with mqtt_freeSub()
  */   */
inline mqtt_subscr_t *mqtt_subscr_t *
mqtt_str2subs(const char **csStr, u_short strnum, u_char *qoses)mqtt_strs2subs(const char **csStr, u_short strnum, u_char *qoses)
 {  {
         mqtt_subscr_t *v;          mqtt_subscr_t *v;
         register int i, items;          register int i, items;
Line 292  mqtt_str2subs(const char **csStr, u_short strnum, u_ch Line 323  mqtt_str2subs(const char **csStr, u_short strnum, u_ch
                         (!strnum || (strnum && items < strnum)) && *strs;                           (!strnum || (strnum && items < strnum)) && *strs; 
                         items++, strs++);                          items++, strs++);
   
        if (!(v = malloc((items + 1) * sizeof(mqtt_subscr_t)))) {        if (!(v = e_malloc((items + 1) * sizeof(mqtt_subscr_t)))) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
         } else          } else
Line 300  mqtt_str2subs(const char **csStr, u_short strnum, u_ch Line 331  mqtt_str2subs(const char **csStr, u_short strnum, u_ch
   
         for (i = 0; i < items; i++) {          for (i = 0; i < items; i++) {
                 v[i].sub_topic.msg_len = strlen(csStr[i]);                  v[i].sub_topic.msg_len = strlen(csStr[i]);
                v[i].sub_topic.msg_base = (u_char*) strdup(csStr[i]);                v[i].sub_topic.msg_base = (u_char*) e_strdup(csStr[i]);
                 if (qoses && qoses[i] < MQTT_QOS_RESERVED)                  if (qoses && qoses[i] < MQTT_QOS_RESERVED)
                        v[i].sub_ret = qoses[i];                        v[i].sub_qos = qoses[i];
         }          }
   
         return v;          return v;
Line 314  mqtt_str2subs(const char **csStr, u_short strnum, u_ch Line 345  mqtt_str2subs(const char **csStr, u_short strnum, u_ch
  * @subs = Subscribe variables   * @subs = Subscribe variables
  * return: none   * return: none
  */   */
inline voidvoid
 mqtt_subFree(mqtt_subscr_t ** __restrict subs)  mqtt_subFree(mqtt_subscr_t ** __restrict subs)
 {  {
         mqtt_subscr_t *v;          mqtt_subscr_t *v;
Line 323  mqtt_subFree(mqtt_subscr_t ** __restrict subs) Line 354  mqtt_subFree(mqtt_subscr_t ** __restrict subs)
                 return;                  return;
   
         for (v = *subs; v->sub_topic.msg_base; v++) {          for (v = *subs; v->sub_topic.msg_base; v++) {
                free(v->sub_topic.msg_base);                e_free(v->sub_topic.msg_base);
                 v->sub_topic.msg_base = NULL;                  v->sub_topic.msg_base = NULL;
                 v->sub_topic.msg_len = 0;                  v->sub_topic.msg_len = 0;
   
                 if (v->sub_value.msg_base) {                  if (v->sub_value.msg_base) {
                        free(v->sub_value.msg_base);                        e_free(v->sub_value.msg_base);
                         v->sub_value.msg_base = NULL;                          v->sub_value.msg_base = NULL;
                         v->sub_value.msg_len = 0;                          v->sub_value.msg_len = 0;
                 }                  }
         }          }
   
        free(*subs);        e_free(*subs);
         *subs = NULL;          *subs = NULL;
 }  }
   
Line 344  mqtt_subFree(mqtt_subscr_t ** __restrict subs) Line 375  mqtt_subFree(mqtt_subscr_t ** __restrict subs)
  * @num = Number of elements   * @num = Number of elements
  * return: NULL error or subscribe array, after use must call mqtt_subFree()   * return: NULL error or subscribe array, after use must call mqtt_subFree()
  */   */
inline mqtt_subscr_t *mqtt_subscr_t *
 mqtt_subAlloc(u_short num)  mqtt_subAlloc(u_short num)
 {  {
         mqtt_subscr_t *s = NULL;          mqtt_subscr_t *s = NULL;
   
        s = malloc((num + 1) * sizeof(mqtt_subscr_t));        s = e_malloc((num + 1) * sizeof(mqtt_subscr_t));
         if (!s) {          if (!s) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
Line 366  mqtt_subAlloc(u_short num) Line 397  mqtt_subAlloc(u_short num)
  * @num = Number of elements   * @num = Number of elements
  * return: NULL error or subscribe array, after use must call mqtt_subFree()   * return: NULL error or subscribe array, after use must call mqtt_subFree()
  */   */
inline mqtt_subscr_t *mqtt_subscr_t *
 mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_short num)  mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_short num)
 {  {
        mqtt_subscr_t *s = NULL;        mqtt_subscr_t *ss, *s = NULL;
         register int i;
   
         if (!subs)          if (!subs)
                 return NULL;                  return NULL;
   
        s = realloc(*subs, (num + 1) * sizeof(mqtt_subscr_t));        for (i = 0, ss = *subs; ss; i++, ss++);
         if (i < num)
                 return NULL;
         if (i == num)
                 return *subs;
 
         s = e_realloc(*subs, (num + 1) * sizeof(mqtt_subscr_t));
         if (!s) {          if (!s) {
                 LOGERR;                  LOGERR;
                 return NULL;                  return NULL;
Line 393  mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_sh Line 431  mqtt_subRealloc(mqtt_subscr_t ** __restrict subs, u_sh
  * @src = source subscription   * @src = source subscription
  * return: =NULL error or !=NULL successful copied a structure   * return: =NULL error or !=NULL successful copied a structure
  */   */
inline mqtt_subscr_t *mqtt_subscr_t *
 mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subscr_t * __restrict src)  mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subscr_t * __restrict src)
 {  {
         if (!dst || !src)          if (!dst || !src)
                 return NULL;                  return NULL;
   
         if (src->sub_topic.msg_base) {          if (src->sub_topic.msg_base) {
                dst->sub_topic.msg_base = malloc(src->sub_topic.msg_len + 1);                dst->sub_topic.msg_base = e_malloc(src->sub_topic.msg_len + 1);
                 if (!dst->sub_topic.msg_base) {                  if (!dst->sub_topic.msg_base) {
                         LOGERR;                          LOGERR;
                         memset(dst, 0, sizeof(mqtt_subscr_t));                          memset(dst, 0, sizeof(mqtt_subscr_t));
Line 411  mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subs Line 449  mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subs
                         memcpy(dst->sub_topic.msg_base, src->sub_topic.msg_base,                           memcpy(dst->sub_topic.msg_base, src->sub_topic.msg_base, 
                                         dst->sub_topic.msg_len);                                          dst->sub_topic.msg_len);
                 }                  }
           } else {
                   dst->sub_topic.msg_base = NULL;
                   dst->sub_topic.msg_len = 0;
         }          }
   
         if (src->sub_value.msg_base) {          if (src->sub_value.msg_base) {
                dst->sub_value.msg_base = malloc(src->sub_value.msg_len + 1);                dst->sub_value.msg_base = e_malloc(src->sub_value.msg_len + 1);
                 if (!dst->sub_value.msg_base) {                  if (!dst->sub_value.msg_base) {
                         LOGERR;                          LOGERR;
                         if (dst->sub_topic.msg_base)                          if (dst->sub_topic.msg_base)
                                free(dst->sub_topic.msg_base);                                e_free(dst->sub_topic.msg_base);
                         memset(dst, 0, sizeof(mqtt_subscr_t));                          memset(dst, 0, sizeof(mqtt_subscr_t));
                         return NULL;                          return NULL;
                 } else {                  } else {
Line 426  mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subs Line 468  mqtt_subCopy(mqtt_subscr_t * __restrict dst, mqtt_subs
                         memcpy(dst->sub_value.msg_base, src->sub_value.msg_base,                           memcpy(dst->sub_value.msg_base, src->sub_value.msg_base, 
                                         dst->sub_value.msg_len);                                          dst->sub_value.msg_len);
                 }                  }
           } else {
                   dst->sub_value.msg_base = NULL;
                   dst->sub_value.msg_len = 0;
         }          }
   
        dst->sub_ret = src->sub_ret;        dst->sub_qos = src->sub_qos;
         return dst;          return dst;
 }  }
   
Line 590  mqtt_sqlTopic(const char *csInput, char * __restrict p Line 635  mqtt_sqlTopic(const char *csInput, char * __restrict p
                 *s = *pos;                  *s = *pos;
         }          }
   
         return ret;  
 }  
   
   
 /*  
  * mqtt_KeepAlive() - Keep Alive check routine  
  *  
  * @sock = connected socket  
  * @ka = keep alive timeout  
  * @tries = tries for receive correct ping response, usually ==1  
  * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session  
  */  
 int  
 mqtt_KeepAlive(int sock, u_short ka, u_char tries)  
 {  
         int ret = 0;  
         struct pollfd pfd;  
         mqtt_msg_t msg = { NULL, 0 };  
   
         if (sock < 3)  
                 return -1;      /* error */  
   
         pfd.fd = sock;  
         pfd.events = POLLOUT;  
         if ((ret = poll(&pfd, 1, ka * 1000)) == -1 ||   
                         pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {  
                 LOGERR;  
                 return -1;      /* error */  
         } else if (!ret)  
                 return 1;       /* session is abandoned ... must be disconnect! */  
         /* ping request */  
         if ((ret = mqtt_msgPINGREQ(&msg)) == -1)  
                 return -1;      /* error */  
         if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {  
                 LOGERR;  
                 goto end;  
         }  
   
         pfd.events = POLLIN | POLLPRI;  
         while (tries--) {  
                 if ((ret = poll(&pfd, 1, ka * 1000)) == -1 ||   
                                 pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {  
                         LOGERR;  
                         break;  
                 } else if (!ret) {  
                         ret = 1;        /* session is abandoned ... must be disconnect! */  
                         continue;  
                 }  
                 /* receive & decode packet */  
                 if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {  
                         LOGERR;  
                         break;  
                 }  
                 if (!mqtt_readPINGRESP(&msg)) {  
                         ret = 0;        /* Host is alive */  
                         break;  
                 } else  
                         ret = 2;        /* Session is broken ... must be disconnect! */  
         }  
 end:  
         free(msg.msg_base);  
         return ret;          return ret;
 }  }

Removed from v.1.1.1.1.2.11  
changed lines
  Added in v.1.3.4.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>