Diff for /mqtt/src/mqtt_pub.c between versions 1.1.2.1 and 1.2.2.5

version 1.1.2.1, 2011/12/20 15:03:42 version 1.2.2.5, 2012/05/08 11:45:57
Line 6 Line 6
   
 io_enableDEBUG;  io_enableDEBUG;
   
 sl_config cfg;  
 extern char compiled[], compiledby[], compilehost[];  extern char compiled[], compiledby[], compilehost[];
 static char szCfgName[MAXPATHLEN];  
   
 struct tagArgs *args;  struct tagArgs *args;
   
Line 21  Usage(void) Line 19  Usage(void)
                 " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"                  " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"
                 "\t-f\t\t\t'value_for_publish' is file name instead text\n"                  "\t-f\t\t\t'value_for_publish' is file name instead text\n"
                 "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"                  "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"
                   "\t-m <message_id>\t\tMessage ID for publish message\n"
                 "\t-d\t\t\tSend duplicate message\n"                  "\t-d\t\t\tSend duplicate message\n"
                 "\t-r\t\t\tRetain message from broker\n\n"                  "\t-r\t\t\tRetain message from broker\n\n"
                 "\t-C\t\t\tNot clear before connect!!!\n"                  "\t-C\t\t\tNot clear before connect!!!\n"
Line 30  Usage(void) Line 29  Usage(void)
                 "\t-P <password>\t\tPassword\n"                  "\t-P <password>\t\tPassword\n"
                 "\t-W <topic>\t\tWill Topic\n"                  "\t-W <topic>\t\tWill Topic\n"
                 "\t-M <message>\t\tWill Message\n"                  "\t-M <message>\t\tWill Message\n"
                 "\t-c <config>\t\tService config\n"  
                 "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"                  "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
                 "\t-h\t\t\tHelp! This screen\n\n",                   "\t-h\t\t\tHelp! This screen\n\n", 
                 compiledby, compilehost, compiled);                  compiledby, compilehost, compiled);
Line 52  cleanArgs(struct tagArgs * __restrict args) Line 50  cleanArgs(struct tagArgs * __restrict args)
 static int  static int
 Publish(int sock)  Publish(int sock)
 {  {
        return 0;        int siz = 0;
 
         siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup, 
                         args->QoS, args->Retain, AIT_ADDR(&args->Value), AIT_LEN(&args->Value));
         if (siz == -1) {
                 printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;
         }
         if (SendTo(sock, siz) == -1)
                 return -1;
         /* QoS == MQTT_QOS_ONCE, no wait for reply */
         if (args->QoS == MQTT_QOS_ONCE)
                 return 0;
 
         if ((siz = RecvFrom(sock)) == -1 || !siz)
                 return -1;
         /* QoS == MQTT_QOS_ACK, wait for PUBACK */
         if (args->QoS == MQTT_QOS_ACK) {
                 siz = mqtt_readPUBACK(args->msg);
                 if (siz == args->MsgID)
                         return siz;
                 if (!args->Dup) {
                         args->Dup++;
                         return Publish(sock);
                 }
                 goto end;
         }
 
 
         /* QoS == MQTT_QOS_EXACTLY */
         siz = mqtt_readPUBREC(args->msg);
         if (siz != args->MsgID) {
                 if (!args->Dup) {
                         args->Dup++;
                         return Publish(sock);
                 }
                 goto end;
         }
 
         do {
                 siz = mqtt_msgPUBREL(args->msg, args->MsgID);
                 if (siz == -1) {
                         printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                         return -1;
                 }
                 if (SendTo(sock, siz) == -1)
                         return -1;
 
                 if ((siz = RecvFrom(sock)) == -1 || !siz)
                         return -1;
 
                 siz = mqtt_readPUBCOMP(args->msg);
                 if (siz == args->MsgID)
                         return siz;
                 if (!args->Dup) {
                         args->Dup++;
                         continue;
                 }
         } while (0);
 
 end:
         printf("Error:: Message not delivered\n");
         return -1;
 }  }
   
   
Line 62  main(int argc, char **argv) Line 122  main(int argc, char **argv)
         char ch;          char ch;
         ait_val_t val;          ait_val_t val;
         u_short port = atoi(MQTT_PORT);          u_short port = atoi(MQTT_PORT);
        int sock, ret = 0;        int ret = 0;
   
         if (!(args = malloc(sizeof(struct tagArgs)))) {          if (!(args = malloc(sizeof(struct tagArgs)))) {
                 printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));                  printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));
Line 83  main(int argc, char **argv) Line 143  main(int argc, char **argv)
         AIT_SET_STR(&args->Pass, "");          AIT_SET_STR(&args->Pass, "");
   
         args->ka = MQTT_KEEPALIVE;          args->ka = MQTT_KEEPALIVE;
        strlcpy(szCfgName, DEFAULT_CONFIG, sizeof szCfgName);        while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:m:fvh")) != -1)
        while ((ch = getopt(argc, argv, "T:U:P:p:q:drCc:W:M:fvh")) != -1) 
                 switch (ch) {                  switch (ch) {
                         case 'T':                          case 'T':
                                 args->ka = (u_short) strtol(optarg, NULL, 0);                                  args->ka = (u_short) strtol(optarg, NULL, 0);
Line 105  main(int argc, char **argv) Line 164  main(int argc, char **argv)
                                 AIT_FREE_VAL(&args->Pass);                                  AIT_FREE_VAL(&args->Pass);
                                 AIT_SET_STR(&args->Pass, optarg);                                  AIT_SET_STR(&args->Pass, optarg);
                                 break;                                  break;
                           case 'm':
                                   args->MsgID = (u_short) strtol(optarg, NULL, 0);
                                   break;
                         case 'p':                          case 'p':
                                 port = (u_short) strtol(optarg, NULL, 0);                                  port = (u_short) strtol(optarg, NULL, 0);
                                 break;                                  break;
Line 129  main(int argc, char **argv) Line 191  main(int argc, char **argv)
                         case 'f':                          case 'f':
                                 args->isFile++;                                  args->isFile++;
                                 break;                                  break;
                         case 'c':  
                                 strlcpy(szCfgName, optarg, sizeof szCfgName);  
                                 break;  
                         case 'v':                          case 'v':
                                 io_incDebug;                                  io_incDebug;
                                 break;                                  break;
Line 165  main(int argc, char **argv) Line 224  main(int argc, char **argv)
                 Usage();                  Usage();
                 return 1;                  return 1;
         }          }
        ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr));        if (args->QoS && !args->MsgID)
                 args->MsgID = MQTT_DEFAULT_MSGID;
         printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
         AIT_FREE_VAL(&val);
   
        if ((sock = InitClient()) == -1) {        if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
                 args->free(args);                  args->free(args);
                 free(args);                  free(args);
                 return 2;                  return 2;
         }          }
   
        printf("Connected ... ");        if (args->isFile && !OpenFile()) {
        switch ((ret = try2Connect(sock))) {                mqtt_cli_Close(&args->cli);
                 args->free(args);
                 free(args);
                 return 3;
         }
 
         switch ((ret = ConnectClient(args->cli->sock))) {
                 case -1:                  case -1:
                         printf(">> FAILED!\n");                          printf(">> FAILED!\n");
                         break;                          break;
Line 199  main(int argc, char **argv) Line 267  main(int argc, char **argv)
         }          }
   
         if (ret == MQTT_RETCODE_ACCEPTED) {          if (ret == MQTT_RETCODE_ACCEPTED) {
                ret = Publish(sock);                ret = !(Publish(args->cli->sock) == args->MsgID);
                shutdown(sock, SHUT_RDWR); 
         } else          } else
                ret = 3;                ret = 4;
   
        close(sock);        mqtt_cli_Close(&args->cli);
 
         CloseFile();
         args->free(args);          args->free(args);
         free(args);          free(args);
         return ret;          return ret;

Removed from v.1.1.2.1  
changed lines
  Added in v.1.2.2.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>