Diff for /mqtt/src/mqtt_subs.c between versions 1.2.2.5 and 1.2.2.13

version 1.2.2.5, 2012/05/01 22:30:31 version 1.2.2.13, 2012/06/20 09:23:28
Line 8  io_enableDEBUG; Line 8  io_enableDEBUG;
   
 extern char compiled[], compiledby[], compilehost[];  extern char compiled[], compiledby[], compilehost[];
 volatile intptr_t Kill;  volatile intptr_t Kill;
   sched_root_task_t *root;
   
 struct tagArgs *args;  struct tagArgs *args;
   
Line 19  Usage(void) Line 20  Usage(void)
                 "=== %s@%s === Compiled: %s ===\n\n"                  "=== %s@%s === Compiled: %s ===\n\n"
                 " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"                  " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
                 "\t-l <value2file>\t\tSave received values to file\n"                  "\t-l <value2file>\t\tSave received values to file\n"
                   "\t-u\t\t\tUnsubscribe given topic(s)\n"
                 "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"                  "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
                 "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"  
                 "\t-d\t\t\tSend duplicate message\n\n"                  "\t-d\t\t\tSend duplicate message\n\n"
                 "\t-C\t\t\tNot clear before connect!\n"                  "\t-C\t\t\tNot clear before connect!\n"
                 "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"                  "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
Line 52  cleanArgs(struct tagArgs * __restrict args) Line 53  cleanArgs(struct tagArgs * __restrict args)
 static int  static int
 Subscribe(int sock, FILE *lf)  Subscribe(int sock, FILE *lf)
 {  {
        int siz;        u_char *qoses, *qos;
        u_char *qoses;        u_short mid;
        u_short mid[2];        mqtt_subscr_t *sub;
   
   #ifdef __NetBSD__
           srandom(getpid() ^ time(NULL));
   #else
         srandomdev();          srandomdev();
        mid[0] = random() % USHRT_MAX;#endif
         mid = random() % USHRT_MAX;
   
        siz = mqtt_msgSUBSCRIBE(args->msg, args->subscr, mid[0], args->Dup, args->QoS);        printf(" > Execute SUBSCRIBE request #%d ... ", mid);
        if (siz == -1) {        qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
                printf("Error:: in msgSUBSCRIBE #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());        if (!qoses) {
                 printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
        }        } else
        siz = SendTo(sock, siz);                printf("OK\n");
        if (siz == -1)
         for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
                 printf("  + Topic %s with QoS %d subscribed %s\n", (char*)
                                 sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
 
         free(qoses);
         return 0;
 }
 
 static int
 Unsubscribe(int sock)
 {
         u_short mid;
 
 #ifdef __NetBSD__
         srandom(getpid() ^ time(NULL));
 #else
         srandomdev();
 #endif
         mid = random() % USHRT_MAX;
 
         printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
         if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
                 printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 return -1;                  return -1;
           } else
                   printf("OK\n");
   
        siz = RecvFrom(sock);        return 0;
 }
 
 
 static void *
 pubRX(sched_task_t *task)
 {
         int siz, rlen;
         char szTopic[STRSIZ] = { 0 };
         void *data = NULL;
         u_short mid;
 
         rlen = RecvFrom(TASK_FD(task));
         if (rlen == -1)
                 goto end;
         siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
         if (siz == -1)          if (siz == -1)
                return -1;                goto end;
        siz = mqtt_readSUBACK(args->msg, &mid[1], &qoses);
        if (siz == -1) {        fprintf(TASK_ARG(task), "\nMessage ID: 0x%04hu, Length: %u, Topic: %s\n", 
                printf("Error:: in readSUBACK #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                        mid, siz, szTopic);
                return -1;
         if (data) {
                 fputs((const char*) data, TASK_ARG(task));
                 free(data);
         }          }
         if (mid[1] != mid[0]) {  
                 printf("Error:: received different connection ID %d != %d\n", mid[1], mid[0]);  
                 return -1;  
         }  
   
        free(qoses);        fprintf(TASK_ARG(task), "\n.\n");
        return 0;        fflush(TASK_ARG(task));
 end:
         schedReadSelf(task);
         return NULL;
 }  }
   
   
 int  int
 main(int argc, char **argv)  main(int argc, char **argv)
 {  {
        char ch, idx = 0, batch = 1;        char ch, un = 0, idx = 0, batch = 1;
         ait_val_t val;          ait_val_t val;
         u_short port = atoi(MQTT_PORT);          u_short port = atoi(MQTT_PORT);
         mqtt_subscr_t *sub;          mqtt_subscr_t *sub;
Line 97  main(int argc, char **argv) Line 145  main(int argc, char **argv)
         char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };          char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
         FILE *lf;          FILE *lf;
   
        if (!(args = malloc(sizeof(struct tagArgs)))) {        if (!(args = io_malloc(sizeof(struct tagArgs)))) {
                 printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));                  printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
                 return 1;                  return 1;
         } else          } else
                 memset(args, 0, sizeof(struct tagArgs));                  memset(args, 0, sizeof(struct tagArgs));
         if (!(args->subscr = mqtt_subAlloc(idx))) {          if (!(args->subscr = mqtt_subAlloc(idx))) {
                 printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                  printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                free(args);                io_free(args);
                 return 1;                  return 1;
         } else          } else
                 args->free = cleanArgs;                  args->free = cleanArgs;
Line 112  main(int argc, char **argv) Line 160  main(int argc, char **argv)
         if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {          if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
                 printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                  printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 return 1;                  return 1;
         }          }
   
Line 121  main(int argc, char **argv) Line 169  main(int argc, char **argv)
         AIT_SET_STR(&args->Pass, "");          AIT_SET_STR(&args->Pass, "");
   
         args->ka = MQTT_KEEPALIVE;          args->ka = MQTT_KEEPALIVE;
        while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvh")) != -1)        while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
                 switch (ch) {                  switch (ch) {
                         case 'T':                          case 'T':
                                 args->ka = (u_short) strtol(optarg, NULL, 0);                                  args->ka = (u_short) strtol(optarg, NULL, 0);
Line 150  main(int argc, char **argv) Line 198  main(int argc, char **argv)
                                 if (!sub) {                                  if (!sub) {
                                         printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                                          printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                                         args->free(args);                                          args->free(args);
                                        free(args);                                        io_free(args);
                                         return 1;                                          return 1;
                                 } else                                  } else
                                         sub += idx++;                                          sub += idx++;
Line 173  main(int argc, char **argv) Line 221  main(int argc, char **argv)
                                 if (args->QoS > MQTT_QOS_EXACTLY) {                                  if (args->QoS > MQTT_QOS_EXACTLY) {
                                         printf("Error:: invalid QoS level %d\n", args->QoS);                                          printf("Error:: invalid QoS level %d\n", args->QoS);
                                         args->free(args);                                          args->free(args);
                                        free(args);                                        io_free(args);
                                         return 1;                                          return 1;
                                 }                                  }
                                 break;                                  break;
Line 192  main(int argc, char **argv) Line 240  main(int argc, char **argv)
                         case 'v':                          case 'v':
                                 io_incDebug;                                  io_incDebug;
                                 break;                                  break;
                           case 'u':
                                   un = 1;
                                   break;
                         case 'h':                          case 'h':
                         default:                          default:
                                 args->free(args);                                  args->free(args);
                                free(args);                                io_free(args);
                                 Usage();                                  Usage();
                                 return 1;                                  return 1;
                 }                  }
Line 204  main(int argc, char **argv) Line 255  main(int argc, char **argv)
         if (argc < 2) {          if (argc < 2) {
                 printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");                  printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 Usage();                  Usage();
                 return 1;                  return 1;
         } else {          } else {
Line 218  main(int argc, char **argv) Line 269  main(int argc, char **argv)
         if (!io_gethostbyname(*argv, port, &args->addr)) {          if (!io_gethostbyname(*argv, port, &args->addr)) {
                 printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());                  printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 Usage();                  Usage();
                 return 1;                  return 1;
         }          }
         printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));          printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
         AIT_FREE_VAL(&val);          AIT_FREE_VAL(&val);
   
        if (!(args->cli = mqtt_cli_Open(&args->addr.sa))) {        if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 return 2;                  return 2;
         }          }
   
Line 261  main(int argc, char **argv) Line 312  main(int argc, char **argv)
                 else                  else
                         lf = stdout;                          lf = stdout;
                 if (lf) {                  if (lf) {
                           root = schedBegin();
   
                         ret = Subscribe(args->cli->sock, lf);                          ret = Subscribe(args->cli->sock, lf);
   
                           schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
                           schedRun(root, &Kill);
   
                           if (un)
                                   Unsubscribe(args->cli->sock);
                         fclose(lf);                          fclose(lf);
   
                           schedEnd(&root);
                 } else                  } else
                         printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno));                          printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno));
         } else          } else
Line 271  main(int argc, char **argv) Line 332  main(int argc, char **argv)
         mqtt_cli_Close(&args->cli);          mqtt_cli_Close(&args->cli);
   
         args->free(args);          args->free(args);
        free(args);        io_free(args);
         return ret;          return ret;
 }  }

Removed from v.1.2.2.5  
changed lines
  Added in v.1.2.2.13


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>