--- mqtt/src/mqtt_pub.c 2012/01/05 10:01:20 1.1.2.5 +++ mqtt/src/mqtt_pub.c 2012/01/05 10:26:02 1.1.2.6 @@ -51,8 +51,6 @@ static int Publish(int sock) { int siz = 0; - struct pollfd pfd; - struct mqtthdr *hdr; siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value)); @@ -60,44 +58,19 @@ Publish(int sock) printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); return -1; } - - siz = send(sock, args->msg->msg_base, siz, 0); - if (siz == -1) { - printf("Error:: send() #%d - %s\n", errno, strerror(errno)); + if (SendTo(sock, siz) == -1) return -1; - } else - ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz); - /* QoS == MQTT_QOS_ONCE, no wait for reply */ if (args->QoS == MQTT_QOS_ONCE) return 0; - pfd.fd = sock; - pfd.events = POLLIN | POLLPRI; - switch (poll(&pfd, 1, args->ka * 1000)) { - case -1: - printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); - return -1; - case 0: - ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); - return -1; - } - if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) + if ((siz = RecvFrom(sock)) == -1 || !siz) return -1; - - siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0); - if (siz == -1) { - printf("Error:: recv() #%d - %s\n", errno, strerror(errno)); - return -1; - } else - ioVERBOSE(3) printf("Received %d bytes\n", siz); - if (!siz) - return -1; - /* QoS == MQTT_QOS_ACK, wait for PUBACK */ if (args->QoS == MQTT_QOS_ACK) return mqtt_readPUBACK(args->msg); + /* QoS == MQTT_QOS_EXACTLY */ if (mqtt_readPUBREC(args->msg) != args->MsgID) { printf("Error:: Message not delivered\n"); @@ -106,38 +79,14 @@ Publish(int sock) siz = mqtt_msgPUBREL(args->msg, args->MsgID); if (siz == -1) { - printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); + printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); return -1; } - - siz = send(sock, args->msg->msg_base, siz, 0); - if (siz == -1) { - printf("Error:: send() #%d - %s\n", errno, strerror(errno)); + if (SendTo(sock, siz) == -1) return -1; - } else - ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz); - pfd.events = POLLIN | POLLPRI; - switch (poll(&pfd, 1, args->ka * 1000)) { - case -1: - printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); - return -1; - case 0: - ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); - return -1; - } - if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) + if ((siz = RecvFrom(sock)) == -1 || !siz) return -1; - - siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0); - if (siz == -1) { - printf("Error:: recv() #%d - %s\n", errno, strerror(errno)); - return -1; - } else - ioVERBOSE(3) printf("Received %d bytes\n", siz); - if (!siz) - return -1; - return mqtt_readPUBCOMP(args->msg); }