Diff for /mqtt/src/mqtt_subs.c between versions 1.2.2.10 and 1.2.2.19

version 1.2.2.10, 2012/05/28 08:24:07 version 1.2.2.19, 2012/07/03 08:07:15
Line 8  io_enableDEBUG; Line 8  io_enableDEBUG;
   
 extern char compiled[], compiledby[], compilehost[];  extern char compiled[], compiledby[], compilehost[];
 volatile intptr_t Kill;  volatile intptr_t Kill;
   sched_root_task_t *root;
   
 struct tagArgs *args;  struct tagArgs *args;
   
Line 72  Subscribe(int sock, FILE *lf) Line 73  Subscribe(int sock, FILE *lf)
                 printf("OK\n");                  printf("OK\n");
   
         for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)          for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
                printf("  + Topic %s with QoS %d subscribed %s\n", (char*)                printf("  + Topic %s with QoS %d subscribe %s\n", (char*)
                                 sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");                                  sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
   
         free(qoses);          free(qoses);
Line 101  Unsubscribe(int sock) Line 102  Unsubscribe(int sock)
         return 0;          return 0;
 }  }
   
   static void
   sigz(int sig)
   {
           int stat;
   
           switch (sig) {
                   case SIGINT:
                   case SIGTERM:
                           Kill++;
                           break;
                   case SIGCHLD:
                           while (waitpid(-1, &stat, WNOHANG) > 0);
                           break;
           }
   }
   
   
   static void *
   execProc(sched_task_t *task)
   {
           FILE *f;
           char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
   
           snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value), 
                           (char*) TASK_ARG(task), (u_short) TASK_VAL(task), (u_int) TASK_DATLEN(task));
           if (TASK_ARG(task))
                   io_free(TASK_ARG(task));
   
           f = popen(szLine, "w");
           if (!f) {
                   ioSYSERR(0);
                   return NULL;
           } else
                   fputs(TASK_DATA(task), f);
           pclose(f);
           return NULL;
   }
   
   static void *
   pubRX(sched_task_t *task)
   {
           int siz, rlen;
           char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
           void *data = NULL;
           u_short mid;
           time_t tim;
           struct mqtthdr *hdr;
   
           rlen = RecvFrom(TASK_FD(task));
           if (rlen == -1)
                   goto end;
           if (!rlen) {
                   Kill++;
                   return NULL;
           }
   
           while (rlen > 0) {
                   hdr = (struct mqtthdr*) args->msg->msg_base;
   
                   switch (hdr->mqtt_msg.type) {
                           case MQTT_TYPE_PUBLISH:
                                   siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
                                   if (siz == -1)
                                           goto end;
                                   else {
                                           siz = mqtt_pktLen(hdr);
                                           rlen -= siz;
                                           ioVERBOSE(4) printf("Remains %d bytes, packet %d bytes\n", 
                                                           rlen, siz);
                                   }
   
                                   /* send to output */
                                   tim = time(NULL);
                                   strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
                                   fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, QoS: %hhu, "
                                                   "Length: %u, Topic: %s\n", szTime, mid, hdr->mqtt_msg.qos, 
                                                   siz, szTopic);
   
                                   if (data) {
                                           fputs((const char*) data, TASK_ARG(task));
                                           free(data);
                                   }
   
                                   fprintf(TASK_ARG(task), "\n");
                                   fflush(TASK_ARG(task));
   
                                   /* if exists exec script */
                                   if (!AIT_ISEMPTY(&args->Value))
                                           schedEvent(root, execProc, io_strdup(szTopic), mid, data, siz);
   
                                   memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
                                   break;
                           case MQTT_TYPE_PINGREQ:
                                   siz = mqtt_msgPINGRESP(args->msg);
                                   if (siz == -1)
                                           goto end;
                                   else
                                           rlen -= siz;
   
                                   /* send ping reply */
                                   if (SendTo(TASK_FD(task), siz) == -1)
                                           goto end;
   
                                   memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
                                   break;
                           default:
                                   ioVERBOSE(1) printf("Unwanted message type #%d ...\n", hdr->mqtt_msg.type);
                                   goto end;
                   }
           }
   end:
           schedReadSelf(task);
           return NULL;
   }
   
   
 int  int
 main(int argc, char **argv)  main(int argc, char **argv)
 {  {
Line 112  main(int argc, char **argv) Line 228  main(int argc, char **argv)
         int ret = 0;          int ret = 0;
         char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };          char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
         FILE *lf;          FILE *lf;
           struct sigaction sa;
   
         if (!(args = io_malloc(sizeof(struct tagArgs)))) {          if (!(args = io_malloc(sizeof(struct tagArgs)))) {
                 printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));                  printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
Line 181  main(int argc, char **argv) Line 298  main(int argc, char **argv)
                                                 sub->sub_ret = (u_char) *str;                                                  sub->sub_ret = (u_char) *str;
                                 } else                                  } else
                                         sub->sub_ret = (u_char) args->QoS;                                          sub->sub_ret = (u_char) args->QoS;
                                sub->sub_topic.msg_base = io_strdup(szStr);                                sub->sub_topic.msg_base = strdup(szStr);
                                 sub->sub_topic.msg_len = strlen(szStr);                                  sub->sub_topic.msg_len = strlen(szStr);
                                 break;                                  break;
                         case 'q':                          case 'q':
Line 244  main(int argc, char **argv) Line 361  main(int argc, char **argv)
         printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));          printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
         AIT_FREE_VAL(&val);          AIT_FREE_VAL(&val);
   
           sa.sa_handler = sigz;
           sigemptyset(&sa.sa_mask);
           sigaction(SIGTERM, &sa, NULL);
           sigaction(SIGINT, &sa, NULL);
           sigaction(SIGCHLD, &sa, NULL);
   
           if (!batch)
                   switch (fork()) {
                           case -1:        /* error */
                                   printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
                                   ret = 2;
                                   goto end;
                           case 0:         /* child */
                                   setsid();
   
                                   ret = open("/dev/null", O_RDWR);
                                   if (ret != -1) {
                                           dup2(ret, STDIN_FILENO);
                                           dup2(ret, STDOUT_FILENO);
                                           dup2(ret, STDERR_FILENO);
                                           close(ret);
                                   }
                                   break;
                           default:        /* parent */
                                   printf(">> Service started\n");
                                   ret = 0;
                                   goto end;
                   }
   
         if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {          if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
                args->free(args);                ret = 2;
                io_free(args);                goto end;
                return 2; 
         }          }
   
         switch ((ret = ConnectClient(args->cli->sock))) {          switch ((ret = ConnectClient(args->cli->sock))) {
Line 281  main(int argc, char **argv) Line 426  main(int argc, char **argv)
                         lf = stdout;                          lf = stdout;
                 if (lf) {                  if (lf) {
                         ret = Subscribe(args->cli->sock, lf);                          ret = Subscribe(args->cli->sock, lf);
   
                           root = schedBegin();
   
                           schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
                           schedRun(root, &Kill);
   
                           schedEnd(&root);
   
                         if (un)                          if (un)
                                 Unsubscribe(args->cli->sock);                                  Unsubscribe(args->cli->sock);
                         fclose(lf);                          fclose(lf);
                 } else                  } else
                        printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno));                        printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
         } else          } else
                 ret = 3;                  ret = 3;
   
         mqtt_cli_Close(&args->cli);          mqtt_cli_Close(&args->cli);
end:
         args->free(args);          args->free(args);
         io_free(args);          io_free(args);
         return ret;          return ret;

Removed from v.1.2.2.10  
changed lines
  Added in v.1.2.2.19


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>