--- mqtt/src/mqtt_subs.c 2012/06/01 13:13:40 1.2.2.11 +++ mqtt/src/mqtt_subs.c 2012/06/19 08:32:17 1.2.2.12 @@ -8,6 +8,7 @@ io_enableDEBUG; extern char compiled[], compiledby[], compilehost[]; volatile intptr_t Kill; +sched_root_task_t *root; struct tagArgs *args; @@ -102,6 +103,33 @@ Unsubscribe(int sock) } +static void * +pubRX(sched_task_t *task) +{ + int siz, rlen; + char szTopic[BUFSIZ] = { 0 }; + void *data = NULL; + u_short mid; + + rlen = RecvFrom(TASK_FD(task)); + if (siz == -1) + goto end; + siz = mqtt_readPUBLISH(args->msg->msg_base, szTopic, sizeof szTopic, &mid, &data); + if (siz == -1) + goto end; + + fprintf(TASK_ARG(task), "\nMessage ID: 0x%04hu\n", mid); + + if (data) { + fputs((const char*) data, TASK_ARG(task)); + free(data); + } +end: + schedReadSelf(task); + return NULL; +} + + int main(int argc, char **argv) { @@ -280,10 +308,18 @@ main(int argc, char **argv) else lf = stdout; if (lf) { + root = schedBegin(); + ret = Subscribe(args->cli->sock, lf); + + schedRead(root, pubRX, lf, args->cli->sock, NULL, 0); + schedRun(root, &Kill); + if (un) Unsubscribe(args->cli->sock); fclose(lf); + + schedEnd(&root); } else printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno)); } else