--- mqtt/src/mqtt_pub.c 2011/12/29 14:13:13 1.1.2.4 +++ mqtt/src/mqtt_pub.c 2012/01/05 10:01:20 1.1.2.5 @@ -19,6 +19,7 @@ Usage(void) " Syntax: mqtt_pub [options] \n\n" "\t-f\t\t\t'value_for_publish' is file name instead text\n" "\t-q \t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n" + "\t-m \t\tMessage ID for publish message\n" "\t-d\t\t\tSend duplicate message\n" "\t-r\t\t\tRetain message from broker\n\n" "\t-C\t\t\tNot clear before connect!!!\n" @@ -50,8 +51,94 @@ static int Publish(int sock) { int siz = 0; + struct pollfd pfd; + struct mqtthdr *hdr; - return siz; + siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, + args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value)); + if (siz == -1) { + printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); + return -1; + } + + siz = send(sock, args->msg->msg_base, siz, 0); + if (siz == -1) { + printf("Error:: send() #%d - %s\n", errno, strerror(errno)); + return -1; + } else + ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz); + + /* QoS == MQTT_QOS_ONCE, no wait for reply */ + if (args->QoS == MQTT_QOS_ONCE) + return 0; + + pfd.fd = sock; + pfd.events = POLLIN | POLLPRI; + switch (poll(&pfd, 1, args->ka * 1000)) { + case -1: + printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); + return -1; + case 0: + ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); + return -1; + } + if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) + return -1; + + siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0); + if (siz == -1) { + printf("Error:: recv() #%d - %s\n", errno, strerror(errno)); + return -1; + } else + ioVERBOSE(3) printf("Received %d bytes\n", siz); + if (!siz) + return -1; + + /* QoS == MQTT_QOS_ACK, wait for PUBACK */ + if (args->QoS == MQTT_QOS_ACK) + return mqtt_readPUBACK(args->msg); + + /* QoS == MQTT_QOS_EXACTLY */ + if (mqtt_readPUBREC(args->msg) != args->MsgID) { + printf("Error:: Message not delivered\n"); + return -1; + } + + siz = mqtt_msgPUBREL(args->msg, args->MsgID); + if (siz == -1) { + printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); + return -1; + } + + siz = send(sock, args->msg->msg_base, siz, 0); + if (siz == -1) { + printf("Error:: send() #%d - %s\n", errno, strerror(errno)); + return -1; + } else + ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz); + + pfd.events = POLLIN | POLLPRI; + switch (poll(&pfd, 1, args->ka * 1000)) { + case -1: + printf("Error:: poll() #%d - %s\n", errno, strerror(errno)); + return -1; + case 0: + ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000); + return -1; + } + if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) + return -1; + + siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0); + if (siz == -1) { + printf("Error:: recv() #%d - %s\n", errno, strerror(errno)); + return -1; + } else + ioVERBOSE(3) printf("Received %d bytes\n", siz); + if (!siz) + return -1; + + return mqtt_readPUBCOMP(args->msg); } @@ -82,7 +169,7 @@ main(int argc, char **argv) AIT_SET_STR(&args->Pass, ""); args->ka = MQTT_KEEPALIVE; - while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:fvh")) != -1) + while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:m:fvh")) != -1) switch (ch) { case 'T': args->ka = (u_short) strtol(optarg, NULL, 0); @@ -103,6 +190,9 @@ main(int argc, char **argv) AIT_FREE_VAL(&args->Pass); AIT_SET_STR(&args->Pass, optarg); break; + case 'm': + args->MsgID = (u_short) strtol(optarg, NULL, 0); + break; case 'p': port = (u_short) strtol(optarg, NULL, 0); break; @@ -160,6 +250,8 @@ main(int argc, char **argv) Usage(); return 1; } + if (args->QoS && !args->MsgID) + args->MsgID = MQTT_DEFAULT_MSGID; ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr)); if ((sock = InitClient()) == -1) { @@ -200,8 +292,7 @@ main(int argc, char **argv) } if (ret == MQTT_RETCODE_ACCEPTED) { - ret = Publish(sock); - shutdown(sock, SHUT_RDWR); + ret = !(Publish(sock) == args->MsgID); CloseClient(sock); } else { close(sock);