Diff for /mqtt/src/mqtt_subs.c between versions 1.2 and 1.3

version 1.2, 2012/01/27 15:05:38 version 1.3, 2012/07/03 09:02:50
Line 7 Line 7
 io_enableDEBUG;  io_enableDEBUG;
   
 extern char compiled[], compiledby[], compilehost[];  extern char compiled[], compiledby[], compilehost[];
intptr_t Kill;volatile intptr_t Kill;
 sched_root_task_t *root;
   
 struct tagArgs *args;  struct tagArgs *args;
   
Line 17  Usage(void) Line 18  Usage(void)
 {  {
         printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"          printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
                 "=== %s@%s === Compiled: %s ===\n\n"                  "=== %s@%s === Compiled: %s ===\n\n"
                " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> <topic> [exec_script <value>]\n\n"                " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
                 "\t-l <value2file>\t\tSave received values to file\n"                  "\t-l <value2file>\t\tSave received values to file\n"
                   "\t-u\t\t\tUnsubscribe given topic(s)\n"
                 "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"                  "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
                 "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"  
                 "\t-d\t\t\tSend duplicate message\n\n"                  "\t-d\t\t\tSend duplicate message\n\n"
                 "\t-C\t\t\tNot clear before connect!\n"                  "\t-C\t\t\tNot clear before connect!\n"
                 "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"                  "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
Line 39  static void Line 40  static void
 cleanArgs(struct tagArgs * __restrict args)  cleanArgs(struct tagArgs * __restrict args)
 {  {
         mqtt_msgFree(&args->msg, 42);          mqtt_msgFree(&args->msg, 42);
           mqtt_subFree(&args->subscr);
         AIT_FREE_VAL(&args->Will.Msg);          AIT_FREE_VAL(&args->Will.Msg);
         AIT_FREE_VAL(&args->Will.Topic);          AIT_FREE_VAL(&args->Will.Topic);
         AIT_FREE_VAL(&args->User);          AIT_FREE_VAL(&args->User);
Line 46  cleanArgs(struct tagArgs * __restrict args) Line 48  cleanArgs(struct tagArgs * __restrict args)
         AIT_FREE_VAL(&args->Publish);          AIT_FREE_VAL(&args->Publish);
         AIT_FREE_VAL(&args->Value);          AIT_FREE_VAL(&args->Value);
         AIT_FREE_VAL(&args->ConnID);          AIT_FREE_VAL(&args->ConnID);
         io_freeVars(&args->Subscribes);  
 }  }
   
 static int  static int
 Subscribe(int sock, FILE *lf)  Subscribe(int sock, FILE *lf)
 {  {
           u_char *qoses, *qos;
           u_short mid;
           mqtt_subscr_t *sub;
   
   #ifdef __NetBSD__
           srandom(getpid() ^ time(NULL));
   #else
           srandomdev();
   #endif
           mid = random() % USHRT_MAX;
   
           printf(" > Execute SUBSCRIBE request #%d ... ", mid);
           qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
           if (!qoses) {
                   printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                   return -1;
           } else
                   printf("OK\n");
   
           for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
                   printf("  + Topic %s with QoS %d subscribe %s\n", (char*)
                                   sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
   
           free(qoses);
         return 0;          return 0;
 }  }
   
   static int
   Unsubscribe(int sock)
   {
           u_short mid;
   
   #ifdef __NetBSD__
           srandom(getpid() ^ time(NULL));
   #else
           srandomdev();
   #endif
           mid = random() % USHRT_MAX;
   
           printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
           if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
                   printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                   return -1;
           } else
                   printf("OK\n");
   
           return 0;
   }
   
   static void
   sigz(int sig)
   {
           int stat;
   
           switch (sig) {
                   case SIGINT:
                   case SIGTERM:
                           Kill++;
                           break;
                   case SIGCHLD:
                           while (waitpid(-1, &stat, WNOHANG) > 0);
                           break;
           }
   }
   
   
   static void *
   execProc(sched_task_t *task)
   {
           FILE *f;
           char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
   
           snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value), 
                           (char*) TASK_ARG(task), (u_short) TASK_VAL(task), (u_int) TASK_DATLEN(task));
           if (TASK_ARG(task))
                   io_free(TASK_ARG(task));
   
           f = popen(szLine, "w");
           if (!f) {
                   ioSYSERR(0);
                   return NULL;
           } else
                   fputs(TASK_DATA(task), f);
           pclose(f);
           return NULL;
   }
   
   static void *
   pubRX(sched_task_t *task)
   {
           int siz, rlen;
           char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
           void *data = NULL;
           u_short mid;
           time_t tim;
           struct mqtthdr *hdr;
   
           rlen = RecvFrom(TASK_FD(task));
           if (rlen == -1)
                   goto end;
           if (!rlen) {
                   Kill++;
                   return NULL;
           }
   
           while (rlen > 0) {
                   hdr = (struct mqtthdr*) args->msg->msg_base;
   
                   switch (hdr->mqtt_msg.type) {
                           case MQTT_TYPE_PUBLISH:
                                   siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
                                   if (siz == -1)
                                           goto end;
                                   else {
                                           siz = mqtt_pktLen(hdr);
                                           rlen -= siz;
                                           ioVERBOSE(4) printf("Remains %d bytes, packet %d bytes\n", 
                                                           rlen, siz);
                                   }
   
                                   /* send to output */
                                   tim = time(NULL);
                                   strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
                                   fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, QoS: %hhu, "
                                                   "Length: %u, Topic: %s\n", szTime, mid, hdr->mqtt_msg.qos, 
                                                   siz, szTopic);
   
                                   if (data) {
                                           fputs((const char*) data, TASK_ARG(task));
                                           free(data);
                                   }
   
                                   fprintf(TASK_ARG(task), "\n");
                                   fflush(TASK_ARG(task));
   
                                   /* if exists exec script */
                                   if (!AIT_ISEMPTY(&args->Value))
                                           schedEvent(root, execProc, io_strdup(szTopic), mid, data, siz);
   
                                   memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
                                   break;
                           case MQTT_TYPE_PINGREQ:
                                   siz = mqtt_msgPINGRESP(args->msg);
                                   if (siz == -1)
                                           goto end;
                                   else
                                           rlen -= siz;
   
                                   /* send ping reply */
                                   if (SendTo(TASK_FD(task), siz) == -1)
                                           goto end;
   
                                   memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
                                   break;
                           default:
                                   ioVERBOSE(1) printf("Unwanted message type #%d ...\n", hdr->mqtt_msg.type);
                                   goto end;
                   }
           }
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   
 int  int
 main(int argc, char **argv)  main(int argc, char **argv)
 {  {
        char ch, batch = 1;        char ch, un = 0, idx = 0, batch = 1;
        ait_val_t *v, val;        ait_val_t val;
         u_short port = atoi(MQTT_PORT);          u_short port = atoi(MQTT_PORT);
        int sock, ret = 0;        mqtt_subscr_t *sub;
        char szLogName[MAXPATHLEN] = { 0 };        int ret = 0;
         char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
         FILE *lf;          FILE *lf;
           struct sigaction sa;
   
        if (!(args = malloc(sizeof(struct tagArgs)))) {        if (!(args = io_malloc(sizeof(struct tagArgs)))) {
                 printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));                  printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
                 return 1;                  return 1;
         } else          } else
                 memset(args, 0, sizeof(struct tagArgs));                  memset(args, 0, sizeof(struct tagArgs));
        if (!(args->Subscribes = io_allocVars(1))) {        if (!(args->subscr = mqtt_subAlloc(idx))) {
                printf("Error:: in subscribes array #%d - %s\n", io_GetErrno(), io_GetError());                printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                free(args);                io_free(args);
                 return 1;                  return 1;
         } else          } else
                 args->free = cleanArgs;                  args->free = cleanArgs;
Line 81  main(int argc, char **argv) Line 245  main(int argc, char **argv)
         if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {          if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
                 printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());                  printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 return 1;                  return 1;
         }          }
   
Line 90  main(int argc, char **argv) Line 254  main(int argc, char **argv)
         AIT_SET_STR(&args->Pass, "");          AIT_SET_STR(&args->Pass, "");
   
         args->ka = MQTT_KEEPALIVE;          args->ka = MQTT_KEEPALIVE;
        while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvh")) != -1)        while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
                 switch (ch) {                  switch (ch) {
                         case 'T':                          case 'T':
                                 args->ka = (u_short) strtol(optarg, NULL, 0);                                  args->ka = (u_short) strtol(optarg, NULL, 0);
Line 115  main(int argc, char **argv) Line 279  main(int argc, char **argv)
                                 port = (u_short) strtol(optarg, NULL, 0);                                  port = (u_short) strtol(optarg, NULL, 0);
                                 break;                                  break;
                         case 's':                          case 's':
                                v = io_allocVar();                                sub = mqtt_subRealloc(&args->subscr, idx + 1);
                                if (!v) {                                if (!sub) {
                                        printf("Error:: not enough memory #%d - %s\n", errno, strerror(errno));                                        printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
                                         args->free(args);                                          args->free(args);
                                        free(args);                                        io_free(args);
                                         return 1;                                          return 1;
                                 } else                                  } else
                                        AIT_SET_STR(v, optarg);                                        sub += idx++;
                                io_arrayElem(args->Subscribes, io_arraySize(args->Subscribes), v);
                                 strlcpy(szStr, optarg, sizeof szStr);
                                 if ((str = strchr(szStr, '|'))) {
                                         *str++ = 0;
                                         *str -= 0x30;
                                         if (*str < 0 || *str > MQTT_QOS_RESERVED)
                                                 sub->sub_ret = (u_char) args->QoS;
                                         else
                                                 sub->sub_ret = (u_char) *str;
                                 } else
                                         sub->sub_ret = (u_char) args->QoS;
                                 sub->sub_topic.msg_base = strdup(szStr);
                                 sub->sub_topic.msg_len = strlen(szStr);
                                 break;                                  break;
                         case 'q':                          case 'q':
                                 args->QoS = (char) strtol(optarg, NULL, 0);                                  args->QoS = (char) strtol(optarg, NULL, 0);
                                 if (args->QoS > MQTT_QOS_EXACTLY) {                                  if (args->QoS > MQTT_QOS_EXACTLY) {
                                         printf("Error:: invalid QoS level %d\n", args->QoS);                                          printf("Error:: invalid QoS level %d\n", args->QoS);
                                         args->free(args);                                          args->free(args);
                                        free(args);                                        io_free(args);
                                         return 1;                                          return 1;
                                 }                                  }
                                 break;                                  break;
Line 149  main(int argc, char **argv) Line 325  main(int argc, char **argv)
                         case 'v':                          case 'v':
                                 io_incDebug;                                  io_incDebug;
                                 break;                                  break;
                           case 'u':
                                   un = 1;
                                   break;
                         case 'h':                          case 'h':
                         default:                          default:
                                 args->free(args);                                  args->free(args);
                                free(args);                                io_free(args);
                                 Usage();                                  Usage();
                                 return 1;                                  return 1;
                 }                  }
         argc -= optind;          argc -= optind;
         argv += optind;          argv += optind;
        if (argc < 3) {        if (argc < 2) {
                 printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");                  printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 Usage();                  Usage();
                 return 1;                  return 1;
         } else {          } else {
                 AIT_FREE_VAL(&args->ConnID);                  AIT_FREE_VAL(&args->ConnID);
                 AIT_SET_STR(&args->ConnID, argv[1]);                  AIT_SET_STR(&args->ConnID, argv[1]);
                 AIT_FREE_VAL(&args->Publish);  
                 AIT_SET_STR(&args->Publish, argv[2]);  
         }          }
        if (argc > 3) {        if (argc > 2) {
                 AIT_FREE_VAL(&args->Value);                  AIT_FREE_VAL(&args->Value);
                AIT_SET_STR(&args->Value, argv[3]);                AIT_SET_STR(&args->Value, argv[2]);
         }          }
         if (!io_gethostbyname(*argv, port, &args->addr)) {          if (!io_gethostbyname(*argv, port, &args->addr)) {
                 printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());                  printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
                 args->free(args);                  args->free(args);
                free(args);                io_free(args);
                 Usage();                  Usage();
                 return 1;                  return 1;
         }          }
        ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr));        printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
         AIT_FREE_VAL(&val);
   
        if ((sock = InitClient()) == -1) {        sa.sa_handler = sigz;
                args->free(args);        sigemptyset(&sa.sa_mask);
                free(args);        sigaction(SIGTERM, &sa, NULL);
                return 2;        sigaction(SIGINT, &sa, NULL);
         sigaction(SIGCHLD, &sa, NULL);
 
         if (!batch)
                 switch (fork()) {
                         case -1:        /* error */
                                 printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
                                 ret = 2;
                                 goto end;
                         case 0:         /* child */
                                 setsid();
 
                                 ret = open("/dev/null", O_RDWR);
                                 if (ret != -1) {
                                         dup2(ret, STDIN_FILENO);
                                         dup2(ret, STDOUT_FILENO);
                                         dup2(ret, STDERR_FILENO);
                                         close(ret);
                                 }
                                 break;
                         default:        /* parent */
                                 printf(">> Service started\n");
                                 ret = 0;
                                 goto end;
                 }
 
         if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
                 ret = 2;
                 goto end;
         }          }
   
        printf("Connected ... ");        switch ((ret = ConnectClient(args->cli->sock))) {
        switch ((ret = ConnectClient(sock))) { 
                 case -1:                  case -1:
                         printf(">> FAILED!\n");                          printf(">> FAILED!\n");
                         break;                          break;
Line 220  main(int argc, char **argv) Line 425  main(int argc, char **argv)
                 else                  else
                         lf = stdout;                          lf = stdout;
                 if (lf) {                  if (lf) {
                        ret = Subscribe(sock, lf);                        ret = Subscribe(args->cli->sock, lf);
 
                         root = schedBegin();
 
                         schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
                         schedRun(root, &Kill);
 
                         schedEnd(&root);
 
                         if (un)
                                 Unsubscribe(args->cli->sock);
                         fclose(lf);                          fclose(lf);
                         shutdown(sock, SHUT_RDWR);  
                 } else                  } else
                        printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno));                        printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
                CloseClient(sock);        } else
        } else { 
                close(sock); 
                 ret = 3;                  ret = 3;
         }  
   
           mqtt_cli_Close(&args->cli);
   end:
         args->free(args);          args->free(args);
        free(args);        io_free(args);
         return ret;          return ret;
 }  }

Removed from v.1.2  
changed lines
  Added in v.1.3


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>