Diff for /mqtt/src/mqtt_pub.c between versions 1.1.2.5 and 1.1.2.9

version 1.1.2.5, 2012/01/05 10:01:20 version 1.1.2.9, 2012/01/24 16:28:28
Line 51  static int Line 51  static int
 Publish(int sock)  Publish(int sock)
 {  {
         int siz = 0;          int siz = 0;
         struct pollfd pfd;  
         struct mqtthdr *hdr;  
   
         siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup,           siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, 
                         args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));                          args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));
Line 60  Publish(int sock) Line 58  Publish(int sock)
                 printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                  printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
         }          }
        if (SendTo(sock, siz) == -1)
        siz = send(sock, args->msg->msg_base, siz, 0); 
        if (siz == -1) { 
                printf("Error:: send() #%d - %s\n", errno, strerror(errno)); 
                 return -1;                  return -1;
         } else  
                 ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz);  
   
         /* QoS == MQTT_QOS_ONCE, no wait for reply */          /* QoS == MQTT_QOS_ONCE, no wait for reply */
         if (args->QoS == MQTT_QOS_ONCE)          if (args->QoS == MQTT_QOS_ONCE)
                 return 0;                  return 0;
   
        pfd.fd = sock;        if ((siz = RecvFrom(sock)) == -1 || !siz)
        pfd.events = POLLIN | POLLPRI; 
        switch (poll(&pfd, 1, args->ka * 1000)) { 
                case -1: 
                        printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); 
                        return -1; 
                case 0: 
                        ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); 
                        return -1; 
        } 
        if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) 
                 return -1;                  return -1;
   
         siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0);  
         if (siz == -1) {  
                 printf("Error:: recv() #%d - %s\n", errno, strerror(errno));  
                 return -1;  
         } else  
                 ioVERBOSE(3) printf("Received %d bytes\n", siz);  
         if (!siz)  
                 return -1;  
   
         /* QoS == MQTT_QOS_ACK, wait for PUBACK */          /* QoS == MQTT_QOS_ACK, wait for PUBACK */
        if (args->QoS == MQTT_QOS_ACK)        if (args->QoS == MQTT_QOS_ACK) {
                return mqtt_readPUBACK(args->msg);                siz = mqtt_readPUBACK(args->msg);
                 if (siz == args->MsgID)
                         return siz;
                 if (!args->Dup) {
                         args->Dup++;
                         return Publish(sock);
                 }
                 goto end;
         }
   
   
         /* QoS == MQTT_QOS_EXACTLY */          /* QoS == MQTT_QOS_EXACTLY */
        if (mqtt_readPUBREC(args->msg) != args->MsgID) {        siz = mqtt_readPUBREC(args->msg);
                printf("Error:: Message not delivered\n");        if (siz != args->MsgID) {
                return -1;                if (!args->Dup) {
                         args->Dup++;
                         return Publish(sock);
                 }
                 goto end;
         }          }
   
         siz = mqtt_msgPUBREL(args->msg, args->MsgID);          siz = mqtt_msgPUBREL(args->msg, args->MsgID);
         if (siz == -1) {          if (siz == -1) {
                printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
         }          }
           if (SendTo(sock, siz) == -1)
                   return -1;
   
        siz = send(sock, args->msg->msg_base, siz, 0);        if ((siz = RecvFrom(sock)) == -1 || !siz)
        if (siz == -1) { 
                printf("Error:: send() #%d - %s\n", errno, strerror(errno)); 
                 return -1;                  return -1;
         } else  
                 ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz);  
   
        pfd.events = POLLIN | POLLPRI;        siz = mqtt_readPUBCOMP(args->msg);
        switch (poll(&pfd, 1, args->ka * 1000)) {        if (siz == args->MsgID)
                case -1:                return siz;
                        printf("Error:: poll() #%d - %s\n", errno, strerror(errno));        if (!args->Dup) {
                        return -1;                args->Dup++;
                case 0:                return Publish(sock);
                        ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); 
                        return -1; 
         }          }
         if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))  
                 return -1;  
   
        siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0);end:
        if (siz == -1) {        printf("Error:: Message not delivered\n");
                printf("Error:: recv() #%d - %s\n", errno, strerror(errno));        return -1;
                return -1; 
        } else 
                ioVERBOSE(3) printf("Received %d bytes\n", siz); 
        if (!siz) 
                return -1; 
 
        return mqtt_readPUBCOMP(args->msg); 
 }  }
   
   

Removed from v.1.1.2.5  
changed lines
  Added in v.1.1.2.9


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>