Diff for /mqtt/src/mqtt_pub.c between versions 1.1.2.5 and 1.1.2.8

version 1.1.2.5, 2012/01/05 10:01:20 version 1.1.2.8, 2012/01/24 10:18:45
Line 51  static int Line 51  static int
 Publish(int sock)  Publish(int sock)
 {  {
         int siz = 0;          int siz = 0;
         struct pollfd pfd;  
         struct mqtthdr *hdr;  
   
         siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup,           siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, 
                         args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));                          args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));
Line 60  Publish(int sock) Line 58  Publish(int sock)
                 printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                  printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
         }          }
        if (SendTo(sock, siz) == -1)
        siz = send(sock, args->msg->msg_base, siz, 0); 
        if (siz == -1) { 
                printf("Error:: send() #%d - %s\n", errno, strerror(errno)); 
                 return -1;                  return -1;
         } else  
                 ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz);  
   
         /* QoS == MQTT_QOS_ONCE, no wait for reply */          /* QoS == MQTT_QOS_ONCE, no wait for reply */
         if (args->QoS == MQTT_QOS_ONCE)          if (args->QoS == MQTT_QOS_ONCE)
                 return 0;                  return 0;
   
        pfd.fd = sock;        if ((siz = RecvFrom(sock)) == -1 || !siz)
        pfd.events = POLLIN | POLLPRI; 
        switch (poll(&pfd, 1, args->ka * 1000)) { 
                case -1: 
                        printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); 
                        return -1; 
                case 0: 
                        ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); 
                        return -1; 
        } 
        if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) 
                 return -1;                  return -1;
   
         siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0);  
         if (siz == -1) {  
                 printf("Error:: recv() #%d - %s\n", errno, strerror(errno));  
                 return -1;  
         } else  
                 ioVERBOSE(3) printf("Received %d bytes\n", siz);  
         if (!siz)  
                 return -1;  
   
         /* QoS == MQTT_QOS_ACK, wait for PUBACK */          /* QoS == MQTT_QOS_ACK, wait for PUBACK */
        if (args->QoS == MQTT_QOS_ACK)        if (args->QoS == MQTT_QOS_ACK) {
                return mqtt_readPUBACK(args->msg);                siz = mqtt_readPUBACK(args->msg);
                 if (siz == args->MsgID)
                         return siz;
                 if (!args->Dup) {
                         args->Dup++;
                         return Publish(sock);
                 }
                 goto end;
         }
   
   
         /* QoS == MQTT_QOS_EXACTLY */          /* QoS == MQTT_QOS_EXACTLY */
        if (mqtt_readPUBREC(args->msg) != args->MsgID) {        siz = mqtt_readPUBREC(args->msg);
                printf("Error:: Message not delivered\n");        if (siz != args->MsgID) {
                return -1;                if (!args->Dup) {
                         args->Dup++;
                         return Publish(sock);
                 }
                 goto end;
         }          }
   
         siz = mqtt_msgPUBREL(args->msg, args->MsgID);          siz = mqtt_msgPUBREL(args->msg, args->MsgID);
         if (siz == -1) {          if (siz == -1) {
                printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
         }          }
           if (SendTo(sock, siz) == -1)
                   return -1;
   
        siz = send(sock, args->msg->msg_base, siz, 0);        if ((siz = RecvFrom(sock)) == -1 || !siz)
        if (siz == -1) { 
                printf("Error:: send() #%d - %s\n", errno, strerror(errno)); 
                 return -1;                  return -1;
         } else  
                 ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz);  
   
        pfd.events = POLLIN | POLLPRI;        siz = mqtt_readPUBCOMP(args->msg);
        switch (poll(&pfd, 1, args->ka * 1000)) {        if (siz == args->MsgID)
                case -1:                return siz;
                        printf("Error:: poll() #%d - %s\n", errno, strerror(errno));        if (!args->Dup) {
                        return -1;                args->Dup++;
                case 0:                return Publish(sock);
                        ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); 
                        return -1; 
         }          }
         if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))  
                 return -1;  
   
        siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0);end:
        if (siz == -1) {        printf("Error:: Message not delivered\n");
                printf("Error:: recv() #%d - %s\n", errno, strerror(errno));        return -1;
                return -1; 
        } else 
                ioVERBOSE(3) printf("Received %d bytes\n", siz); 
        if (!siz) 
                return -1; 
 
        return mqtt_readPUBCOMP(args->msg); 
 }  }
   
   
Line 267  main(int argc, char **argv) Line 239  main(int argc, char **argv)
         }          }
   
         printf("Connected ... ");          printf("Connected ... ");
           ConnectClient(sock); ConnectClient(sock);
         switch ((ret = ConnectClient(sock))) {          switch ((ret = ConnectClient(sock))) {
                 case -1:                  case -1:
                         printf(">> FAILED!\n");                          printf(">> FAILED!\n");
Line 290  main(int argc, char **argv) Line 263  main(int argc, char **argv)
                         printf(">> DENIED.\n");                          printf(">> DENIED.\n");
                         break;                          break;
         }          }
           sleep(1);
   
         if (ret == MQTT_RETCODE_ACCEPTED) {          if (ret == MQTT_RETCODE_ACCEPTED) {
                 ret = !(Publish(sock) == args->MsgID);                  ret = !(Publish(sock) == args->MsgID);

Removed from v.1.1.2.5  
changed lines
  Added in v.1.1.2.8


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>