--- mqtt/src/mqtt_subs.c 2012/05/27 10:04:05 1.2.2.9 +++ mqtt/src/mqtt_subs.c 2012/06/19 08:32:17 1.2.2.12 @@ -8,6 +8,7 @@ io_enableDEBUG; extern char compiled[], compiledby[], compilehost[]; volatile intptr_t Kill; +sched_root_task_t *root; struct tagArgs *args; @@ -56,7 +57,11 @@ Subscribe(int sock, FILE *lf) u_short mid; mqtt_subscr_t *sub; +#ifdef __NetBSD__ + srandom(getpid() ^ time(NULL)); +#else srandomdev(); +#endif mid = random() % USHRT_MAX; printf(" > Execute SUBSCRIBE request #%d ... ", mid); @@ -80,7 +85,11 @@ Unsubscribe(int sock) { u_short mid; +#ifdef __NetBSD__ + srandom(getpid() ^ time(NULL)); +#else srandomdev(); +#endif mid = random() % USHRT_MAX; printf(" > Execute UNSUBSCRIBE request #%d ... ", mid); @@ -94,6 +103,33 @@ Unsubscribe(int sock) } +static void * +pubRX(sched_task_t *task) +{ + int siz, rlen; + char szTopic[BUFSIZ] = { 0 }; + void *data = NULL; + u_short mid; + + rlen = RecvFrom(TASK_FD(task)); + if (siz == -1) + goto end; + siz = mqtt_readPUBLISH(args->msg->msg_base, szTopic, sizeof szTopic, &mid, &data); + if (siz == -1) + goto end; + + fprintf(TASK_ARG(task), "\nMessage ID: 0x%04hu\n", mid); + + if (data) { + fputs((const char*) data, TASK_ARG(task)); + free(data); + } +end: + schedReadSelf(task); + return NULL; +} + + int main(int argc, char **argv) { @@ -173,7 +209,7 @@ main(int argc, char **argv) sub->sub_ret = (u_char) *str; } else sub->sub_ret = (u_char) args->QoS; - sub->sub_topic.msg_base = io_strdup(szStr); + sub->sub_topic.msg_base = strdup(szStr); sub->sub_topic.msg_len = strlen(szStr); break; case 'q': @@ -272,10 +308,18 @@ main(int argc, char **argv) else lf = stdout; if (lf) { + root = schedBegin(); + ret = Subscribe(args->cli->sock, lf); + + schedRead(root, pubRX, lf, args->cli->sock, NULL, 0); + schedRun(root, &Kill); + if (un) Unsubscribe(args->cli->sock); fclose(lf); + + schedEnd(&root); } else printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno)); } else