--- mqtt/src/mqtt_pub.c 2012/01/05 10:26:02 1.1.2.6 +++ mqtt/src/mqtt_pub.c 2012/01/24 10:18:45 1.1.2.8 @@ -67,14 +67,26 @@ Publish(int sock) if ((siz = RecvFrom(sock)) == -1 || !siz) return -1; /* QoS == MQTT_QOS_ACK, wait for PUBACK */ - if (args->QoS == MQTT_QOS_ACK) - return mqtt_readPUBACK(args->msg); + if (args->QoS == MQTT_QOS_ACK) { + siz = mqtt_readPUBACK(args->msg); + if (siz == args->MsgID) + return siz; + if (!args->Dup) { + args->Dup++; + return Publish(sock); + } + goto end; + } /* QoS == MQTT_QOS_EXACTLY */ - if (mqtt_readPUBREC(args->msg) != args->MsgID) { - printf("Error:: Message not delivered\n"); - return -1; + siz = mqtt_readPUBREC(args->msg); + if (siz != args->MsgID) { + if (!args->Dup) { + args->Dup++; + return Publish(sock); + } + goto end; } siz = mqtt_msgPUBREL(args->msg, args->MsgID); @@ -87,7 +99,18 @@ Publish(int sock) if ((siz = RecvFrom(sock)) == -1 || !siz) return -1; - return mqtt_readPUBCOMP(args->msg); + + siz = mqtt_readPUBCOMP(args->msg); + if (siz == args->MsgID) + return siz; + if (!args->Dup) { + args->Dup++; + return Publish(sock); + } + +end: + printf("Error:: Message not delivered\n"); + return -1; } @@ -216,6 +239,7 @@ main(int argc, char **argv) } printf("Connected ... "); + ConnectClient(sock); ConnectClient(sock); switch ((ret = ConnectClient(sock))) { case -1: printf(">> FAILED!\n"); @@ -239,6 +263,7 @@ main(int argc, char **argv) printf(">> DENIED.\n"); break; } + sleep(1); if (ret == MQTT_RETCODE_ACCEPTED) { ret = !(Publish(sock) == args->MsgID);