Diff for /libaitmqtt/src/cliside.c between versions 1.1.2.3 and 1.1.2.6

version 1.1.2.3, 2012/05/01 01:12:16 version 1.1.2.6, 2012/05/09 13:48:31
Line 50  SUCH DAMAGE. Line 50  SUCH DAMAGE.
  * mqtt_cli_Open() - Open client connection to MQTT broker   * mqtt_cli_Open() - Open client connection to MQTT broker
  *   *
  * @addr = brokers address   * @addr = brokers address
    * @timeout = timeout
  * return: NULL error or !=NULL connected to broker   * return: NULL error or !=NULL connected to broker
  */   */
 mqtt_cli_t *  mqtt_cli_t *
mqtt_cli_Open(struct sockaddr *addr)mqtt_cli_Open(struct sockaddr *addr, u_short timeout)
 {  {
         mqtt_cli_t *cli;          mqtt_cli_t *cli;
   
Line 67  mqtt_cli_Open(struct sockaddr *addr) Line 68  mqtt_cli_Open(struct sockaddr *addr)
         } else          } else
                 memset(cli, 0, sizeof(mqtt_cli_t));                  memset(cli, 0, sizeof(mqtt_cli_t));
   
           cli->timeout = timeout;
         cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);          cli->sock = socket(addr->sa_family, SOCK_STREAM, IPPROTO_TCP);
         if (cli->sock == -1) {          if (cli->sock == -1) {
                 LOGERR;                  LOGERR;
Line 117  mqtt_cli_Close(mqtt_cli_t ** __restrict cli) Line 119  mqtt_cli_Close(mqtt_cli_t ** __restrict cli)
   
         free(*cli);          free(*cli);
         *cli = NULL;          *cli = NULL;
           return 0;
   }
   
   /*
    * mqtt_cli_Subscribe() - Subscribe to broker
    *
    * @cli = connected client
    * @Topics = Topics for subscribes
    * @msgID = Message ID
    * @Dup = Duplicated request
    * @QoS = Message QoS
    * return: NULL error or !=NULL allocated array with subscribed QoS responses, 
    *      must be free() result!
    */
   u_char *
   mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, 
                   u_short msgID, u_char Dup, u_char QoS)
   {
           int siz = 0;
           u_short mid = 0;
           u_char *qoses = NULL;
   
           if (!cli || !Topics)
                   return NULL;
   
           /* send subscribe */
           siz = mqtt_msgSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
           if (siz == -1)
                   return NULL;
           siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
           if (siz == -1) {
                   LOGERR;
                   return NULL;
           }
   
           if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                   return NULL;
           } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                   return NULL;
   
           /* receive suback */
           siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
           if (siz == -1) {
                   LOGERR;
                   return NULL;
           }
           siz = mqtt_readSUBACK(cli->buf, &mid, &qoses);
           if (siz == -1)
                   return NULL;
           if (msgID != mid) {
                   free(qoses);
                   mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, mid);
                   return NULL;
           }
   
           return qoses;
   }
   
   /*
    * mqtt_cli_Unsubscribe() - Unsubscribe from broker
    *
    * @cli = connected client
    * @Topics = Topics for unsubscribes
    * @msgID = Message ID
    * @Dup = Duplicated request
    * @QoS = Message QoS
    * return: -1 error or 0 ok
    */
   int
   mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, 
                   u_short msgID, u_char Dup, u_char QoS)
   {
           int siz = 0;
   
           if (!cli || !Topics)
                   return -1;
   
           /* send unsubscribe */
           siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS);
           if (siz == -1)
                   return -1;
           siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
           if (siz == -1) {
                   LOGERR;
                   return -1;
           }
   
           if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                   return -1;
           } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                   return -1;
   
           /* receive unsuback */
           siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
           if (siz == -1) {
                   LOGERR;
                   return -1;
           }
           siz = mqtt_readUNSUBACK(cli->buf);
           if (siz == -1)
                   return -1;
           if (msgID != siz) {
                   mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
                   return -1;
           }
   
           return 0;
   }
   
   /*
    * mqtt_cli_Publish() - Publish message to broker
    *
    * @cli = connected client
    * @msgID = Message ID
    * @Dup = Duplicated request
    * @QoS = Message QoS
    * @Retain = Retain message
    * @csTopic = Topic
    * @pData = Data
    * @datLen = Data length
    * return: -1 error or 0 ok
    */
   int
   mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short msgID, u_char Dup, u_char QoS, u_char Retain, 
                   const char *csTopic, const void *pData, int datLen)
   {
           int siz = 0;
   
           if (!cli || !csTopic)
                   return -1;
   
           /* send publish */
           siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen);
           if (siz == -1)
                   return -1;
           siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
           if (siz == -1) {
                   LOGERR;
                   return -1;
           }
   
           if (QoS == MQTT_QOS_ONCE)       /* no reply */
                   goto end;
   
           if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                   return -1;
           } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1))
                   return -1;
   
           /* receive PUBxxx */
           siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0);
           if (siz == -1) {
                   LOGERR;
                   return -1;
           }
   
           if (QoS == MQTT_QOS_ACK) {      /* reply with PUBACK */
                   siz = mqtt_readPUBACK(cli->buf);
                   if (siz == -1)
                           return -1;
                   if (msgID != siz) {
                           mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
                           return -1;
                   }
                   goto end;
           } else {                        /* reply with PUBREC */
                   siz = mqtt_readPUBREC(cli->buf);
                   if (siz == -1)
                           return -1;
                   if (msgID != siz) {
                           mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
                           return -1;
                   }
           }
   
           do {
                   /* send publish release QoS == 2 */
                   siz = mqtt_msgPUBREL(cli->buf, msgID);
                   if (siz == -1)
                           return -1;
                   siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL);
                   if (siz == -1) {
                           LOGERR;
                           return -1;
                   }
   
                   if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) {
                           return -1;
                   } else if (siz && mqtt_KeepAlive(cli->sock, cli->timeout, 1)) {
                           if (Dup++ > 1)
                                   return -1;
                           else
                                   continue;
                   }
   
                   /* receive PUBCOMP */
                   siz = mqtt_readPUBCOMP(cli->buf);
                   if (siz == -1)
                           return -1;
                   if (msgID != siz) {
                           mqtt_SetErr(EBADMSG, "Receive different message ID %hu != %hu", msgID, siz);
                           if (Dup++ > 1)
                                   return -1;
                           else
                                   continue;
                   }
           } while (0);
   
   end:
         return 0;          return 0;
 }  }

Removed from v.1.1.2.3  
changed lines
  Added in v.1.1.2.6


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>