--- mqtt/src/mqtt_pub.c 2012/01/05 10:01:20 1.1.2.5 +++ mqtt/src/mqtt_pub.c 2012/05/01 22:30:31 1.2.2.4 @@ -51,94 +51,68 @@ static int Publish(int sock) { int siz = 0; - struct pollfd pfd; - struct mqtthdr *hdr; siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, - args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value)); + args->QoS, args->Retain, AIT_ADDR(&args->Value), AIT_LEN(&args->Value)); if (siz == -1) { printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); return -1; } - - siz = send(sock, args->msg->msg_base, siz, 0); - if (siz == -1) { - printf("Error:: send() #%d - %s\n", errno, strerror(errno)); + if (SendTo(sock, siz) == -1) return -1; - } else - ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz); - /* QoS == MQTT_QOS_ONCE, no wait for reply */ if (args->QoS == MQTT_QOS_ONCE) return 0; - pfd.fd = sock; - pfd.events = POLLIN | POLLPRI; - switch (poll(&pfd, 1, args->ka * 1000)) { - case -1: - printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); - return -1; - case 0: - ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); - return -1; - } - if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) + if ((siz = RecvFrom(sock)) == -1 || !siz) return -1; - - siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0); - if (siz == -1) { - printf("Error:: recv() #%d - %s\n", errno, strerror(errno)); - return -1; - } else - ioVERBOSE(3) printf("Received %d bytes\n", siz); - if (!siz) - return -1; - /* QoS == MQTT_QOS_ACK, wait for PUBACK */ - if (args->QoS == MQTT_QOS_ACK) - return mqtt_readPUBACK(args->msg); + if (args->QoS == MQTT_QOS_ACK) { + siz = mqtt_readPUBACK(args->msg); + if (siz == args->MsgID) + return siz; + if (!args->Dup) { + args->Dup++; + return Publish(sock); + } + goto end; + } + /* QoS == MQTT_QOS_EXACTLY */ - if (mqtt_readPUBREC(args->msg) != args->MsgID) { - printf("Error:: Message not delivered\n"); - return -1; + siz = mqtt_readPUBREC(args->msg); + if (siz != args->MsgID) { + if (!args->Dup) { + args->Dup++; + return Publish(sock); + } + goto end; } - siz = mqtt_msgPUBREL(args->msg, args->MsgID); - if (siz == -1) { - printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); - return -1; - } - - siz = send(sock, args->msg->msg_base, siz, 0); - if (siz == -1) { - printf("Error:: send() #%d - %s\n", errno, strerror(errno)); - return -1; - } else - ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz); - - pfd.events = POLLIN | POLLPRI; - switch (poll(&pfd, 1, args->ka * 1000)) { - case -1: - printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); + do { + siz = mqtt_msgPUBREL(args->msg, args->MsgID); + if (siz == -1) { + printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); return -1; - case 0: - ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); + } + if (SendTo(sock, siz) == -1) return -1; - } - if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) - return -1; - siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0); - if (siz == -1) { - printf("Error:: recv() #%d - %s\n", errno, strerror(errno)); - return -1; - } else - ioVERBOSE(3) printf("Received %d bytes\n", siz); - if (!siz) - return -1; + if ((siz = RecvFrom(sock)) == -1 || !siz) + return -1; - return mqtt_readPUBCOMP(args->msg); + siz = mqtt_readPUBCOMP(args->msg); + if (siz == args->MsgID) + return siz; + if (!args->Dup) { + args->Dup++; + continue; + } + } while (0); + +end: + printf("Error:: Message not delivered\n"); + return -1; } @@ -148,7 +122,7 @@ main(int argc, char **argv) char ch; ait_val_t val; u_short port = atoi(MQTT_PORT); - int sock, ret = 0; + int ret = 0; if (!(args = malloc(sizeof(struct tagArgs)))) { printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno)); @@ -252,22 +226,23 @@ main(int argc, char **argv) } if (args->QoS && !args->MsgID) args->MsgID = MQTT_DEFAULT_MSGID; - ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr)); + printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr)); + AIT_FREE_VAL(&val); - if ((sock = InitClient()) == -1) { + if (!(args->cli = mqtt_cli_Open(&args->addr.sa))) { args->free(args); free(args); return 2; } if (args->isFile && !OpenFile()) { + mqtt_cli_Close(&args->cli); args->free(args); free(args); return 3; } - printf("Connected ... "); - switch ((ret = ConnectClient(sock))) { + switch ((ret = ConnectClient(args->cli->sock))) { case -1: printf(">> FAILED!\n"); break; @@ -292,12 +267,11 @@ main(int argc, char **argv) } if (ret == MQTT_RETCODE_ACCEPTED) { - ret = !(Publish(sock) == args->MsgID); - CloseClient(sock); - } else { - close(sock); + ret = !(Publish(args->cli->sock) == args->MsgID); + } else ret = 4; - } + + mqtt_cli_Close(&args->cli); CloseFile(); args->free(args);