Diff for /mqtt/src/mqtt_pub.c between versions 1.2 and 1.3

version 1.2, 2012/01/27 15:05:38 version 1.3, 2012/07/03 09:02:50
Line 19  Usage(void) Line 19  Usage(void)
                 " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"                  " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"
                 "\t-f\t\t\t'value_for_publish' is file name instead text\n"                  "\t-f\t\t\t'value_for_publish' is file name instead text\n"
                 "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"                  "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"
                 "\t-m <message_id>\t\tMessage ID for publish message\n"  
                 "\t-d\t\t\tSend duplicate message\n"                  "\t-d\t\t\tSend duplicate message\n"
                 "\t-r\t\t\tRetain message from broker\n\n"                  "\t-r\t\t\tRetain message from broker\n\n"
                 "\t-C\t\t\tNot clear before connect!!!\n"                  "\t-C\t\t\tNot clear before connect!!!\n"
Line 51  static int Line 50  static int
 Publish(int sock)  Publish(int sock)
 {  {
         int siz = 0;          int siz = 0;
           u_short mid = 0;
   
        siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, #ifdef __NetBSD__
                        args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));        srandom(getpid() ^ time(NULL));
        if (siz == -1) {#else
                printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());        srandomdev();
                return -1;#endif
        }        mid = random() % USHRT_MAX;
        if (SendTo(sock, siz) == -1) 
                return -1; 
        /* QoS == MQTT_QOS_ONCE, no wait for reply */ 
        if (args->QoS == MQTT_QOS_ONCE) 
                return 0; 
   
        if ((siz = RecvFrom(sock)) == -1 || !siz)        printf(" > Execute PUBLISH request #%d ... ", mid);
                return -1;        siz = mqtt_cli_Publish(args->cli, mid, args->Dup, args->QoS, args->Retain, 
        /* QoS == MQTT_QOS_ACK, wait for PUBACK */                        AIT_GET_STR(&args->Publish), AIT_ADDR(&args->Value), AIT_LEN(&args->Value));
        if (args->QoS == MQTT_QOS_ACK) { 
                siz = mqtt_readPUBACK(args->msg); 
                if (siz == args->MsgID) 
                        return siz; 
                if (!args->Dup) { 
                        args->Dup++; 
                        return Publish(sock); 
                } 
                goto end; 
        } 
 
 
        /* QoS == MQTT_QOS_EXACTLY */ 
        siz = mqtt_readPUBREC(args->msg); 
        if (siz != args->MsgID) { 
                if (!args->Dup) { 
                        args->Dup++; 
                        return Publish(sock); 
                } 
                goto end; 
        } 
 
        siz = mqtt_msgPUBREL(args->msg, args->MsgID); 
         if (siz == -1) {          if (siz == -1) {
                printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                printf("Error:: Publish #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
        }        } else
        if (SendTo(sock, siz) == -1)                printf("Sended %d bytes \n", siz);
                return -1; 
   
        if ((siz = RecvFrom(sock)) == -1 || !siz)        return siz;
                return -1; 
 
        siz = mqtt_readPUBCOMP(args->msg); 
        if (siz == args->MsgID) 
                return siz; 
        if (!args->Dup) { 
                args->Dup++; 
                return Publish(sock); 
        } 
 
end: 
        printf("Error:: Message not delivered\n"); 
        return -1; 
 }  }
   
   
Line 120  main(int argc, char **argv) Line 78  main(int argc, char **argv)
         char ch;          char ch;
         ait_val_t val;          ait_val_t val;
         u_short port = atoi(MQTT_PORT);          u_short port = atoi(MQTT_PORT);
        int sock, ret = 0;        int ret = 0;
   
        if (!(args = malloc(sizeof(struct tagArgs)))) {        if (!(args = io_malloc(sizeof(struct tagArgs)))) {
                 printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));                  printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));
                 return 1;                  return 1;
         } else          } else
Line 132  main(int argc, char **argv) Line 90  main(int argc, char **argv)
         if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {          if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
                 printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                  printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 return 1;                  return 1;
         }          }
   
Line 141  main(int argc, char **argv) Line 99  main(int argc, char **argv)
         AIT_SET_STR(&args->Pass, "");          AIT_SET_STR(&args->Pass, "");
   
         args->ka = MQTT_KEEPALIVE;          args->ka = MQTT_KEEPALIVE;
        while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:m:fvh")) != -1)        while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:fvh")) != -1)
                 switch (ch) {                  switch (ch) {
                         case 'T':                          case 'T':
                                 args->ka = (u_short) strtol(optarg, NULL, 0);                                  args->ka = (u_short) strtol(optarg, NULL, 0);
Line 162  main(int argc, char **argv) Line 120  main(int argc, char **argv)
                                 AIT_FREE_VAL(&args->Pass);                                  AIT_FREE_VAL(&args->Pass);
                                 AIT_SET_STR(&args->Pass, optarg);                                  AIT_SET_STR(&args->Pass, optarg);
                                 break;                                  break;
                         case 'm':  
                                 args->MsgID = (u_short) strtol(optarg, NULL, 0);  
                                 break;  
                         case 'p':                          case 'p':
                                 port = (u_short) strtol(optarg, NULL, 0);                                  port = (u_short) strtol(optarg, NULL, 0);
                                 break;                                  break;
Line 173  main(int argc, char **argv) Line 128  main(int argc, char **argv)
                                 if (args->QoS > MQTT_QOS_EXACTLY) {                                  if (args->QoS > MQTT_QOS_EXACTLY) {
                                         printf("Error:: invalid QoS level %d\n", args->QoS);                                          printf("Error:: invalid QoS level %d\n", args->QoS);
                                         args->free(args);                                          args->free(args);
                                        free(args);                                        io_free(args);
                                         return 1;                                          return 1;
                                 }                                  }
                                 break;                                  break;
Line 195  main(int argc, char **argv) Line 150  main(int argc, char **argv)
                         case 'h':                          case 'h':
                         default:                          default:
                                 args->free(args);                                  args->free(args);
                                free(args);                                io_free(args);
                                 Usage();                                  Usage();
                                 return 1;                                  return 1;
                 }                  }
Line 204  main(int argc, char **argv) Line 159  main(int argc, char **argv)
         if (argc < 4) {          if (argc < 4) {
                 printf("Error:: host for connect not found, connection id, topic or value not supplied!\n\n");                  printf("Error:: host for connect not found, connection id, topic or value not supplied!\n\n");
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 Usage();                  Usage();
                 return 1;                  return 1;
         } else {          } else {
Line 218  main(int argc, char **argv) Line 173  main(int argc, char **argv)
         if (!io_gethostbyname(*argv, port, &args->addr)) {          if (!io_gethostbyname(*argv, port, &args->addr)) {
                 printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());                  printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 Usage();                  Usage();
                 return 1;                  return 1;
         }          }
        if (args->QoS && !args->MsgID)        printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
                args->MsgID = MQTT_DEFAULT_MSGID;        AIT_FREE_VAL(&val);
        ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr)); 
   
        if ((sock = InitClient()) == -1) {        if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 return 2;                  return 2;
         }          }
   
         if (args->isFile && !OpenFile()) {          if (args->isFile && !OpenFile()) {
                   mqtt_cli_Close(&args->cli);
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 return 3;                  return 3;
         }          }
   
        printf("Connected ... ");        switch ((ret = ConnectClient(args->cli->sock))) {
        switch ((ret = ConnectClient(sock))) { 
                 case -1:                  case -1:
                         printf(">> FAILED!\n");                          printf(">> FAILED!\n");
                         break;                          break;
Line 264  main(int argc, char **argv) Line 218  main(int argc, char **argv)
         }          }
   
         if (ret == MQTT_RETCODE_ACCEPTED) {          if (ret == MQTT_RETCODE_ACCEPTED) {
                ret = !(Publish(sock) == args->MsgID);                ret = (Publish(args->cli->sock) == -1);
                CloseClient(sock);        } else
        } else { 
                close(sock); 
                 ret = 4;                  ret = 4;
         }  
   
           mqtt_cli_Close(&args->cli);
   
         CloseFile();          CloseFile();
         args->free(args);          args->free(args);
        free(args);        io_free(args);
         return ret;          return ret;
 }  }

Removed from v.1.2  
changed lines
  Added in v.1.3


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>