--- mqtt/src/mqtt_subs.c 2012/05/22 14:15:04 1.2.2.8 +++ mqtt/src/mqtt_subs.c 2012/06/19 08:32:17 1.2.2.12 @@ -8,6 +8,7 @@ io_enableDEBUG; extern char compiled[], compiledby[], compilehost[]; volatile intptr_t Kill; +sched_root_task_t *root; struct tagArgs *args; @@ -56,7 +57,11 @@ Subscribe(int sock, FILE *lf) u_short mid; mqtt_subscr_t *sub; +#ifdef __NetBSD__ + srandom(getpid() ^ time(NULL)); +#else srandomdev(); +#endif mid = random() % USHRT_MAX; printf(" > Execute SUBSCRIBE request #%d ... ", mid); @@ -80,7 +85,11 @@ Unsubscribe(int sock) { u_short mid; +#ifdef __NetBSD__ + srandom(getpid() ^ time(NULL)); +#else srandomdev(); +#endif mid = random() % USHRT_MAX; printf(" > Execute UNSUBSCRIBE request #%d ... ", mid); @@ -94,6 +103,33 @@ Unsubscribe(int sock) } +static void * +pubRX(sched_task_t *task) +{ + int siz, rlen; + char szTopic[BUFSIZ] = { 0 }; + void *data = NULL; + u_short mid; + + rlen = RecvFrom(TASK_FD(task)); + if (siz == -1) + goto end; + siz = mqtt_readPUBLISH(args->msg->msg_base, szTopic, sizeof szTopic, &mid, &data); + if (siz == -1) + goto end; + + fprintf(TASK_ARG(task), "\nMessage ID: 0x%04hu\n", mid); + + if (data) { + fputs((const char*) data, TASK_ARG(task)); + free(data); + } +end: + schedReadSelf(task); + return NULL; +} + + int main(int argc, char **argv) { @@ -105,14 +141,14 @@ main(int argc, char **argv) char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 }; FILE *lf; - if (!(args = malloc(sizeof(struct tagArgs)))) { + if (!(args = io_malloc(sizeof(struct tagArgs)))) { printf("Error:: in arguments #%d - %s\n", errno, strerror(errno)); return 1; } else memset(args, 0, sizeof(struct tagArgs)); if (!(args->subscr = mqtt_subAlloc(idx))) { printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); - free(args); + io_free(args); return 1; } else args->free = cleanArgs; @@ -120,7 +156,7 @@ main(int argc, char **argv) if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) { printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); args->free(args); - free(args); + io_free(args); return 1; } @@ -158,7 +194,7 @@ main(int argc, char **argv) if (!sub) { printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); args->free(args); - free(args); + io_free(args); return 1; } else sub += idx++; @@ -181,7 +217,7 @@ main(int argc, char **argv) if (args->QoS > MQTT_QOS_EXACTLY) { printf("Error:: invalid QoS level %d\n", args->QoS); args->free(args); - free(args); + io_free(args); return 1; } break; @@ -206,7 +242,7 @@ main(int argc, char **argv) case 'h': default: args->free(args); - free(args); + io_free(args); Usage(); return 1; } @@ -215,7 +251,7 @@ main(int argc, char **argv) if (argc < 2) { printf("Error:: host for connect not found, connection id or topic not supplied!\n\n"); args->free(args); - free(args); + io_free(args); Usage(); return 1; } else { @@ -229,7 +265,7 @@ main(int argc, char **argv) if (!io_gethostbyname(*argv, port, &args->addr)) { printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError()); args->free(args); - free(args); + io_free(args); Usage(); return 1; } @@ -238,7 +274,7 @@ main(int argc, char **argv) if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) { args->free(args); - free(args); + io_free(args); return 2; } @@ -272,10 +308,18 @@ main(int argc, char **argv) else lf = stdout; if (lf) { + root = schedBegin(); + ret = Subscribe(args->cli->sock, lf); + + schedRead(root, pubRX, lf, args->cli->sock, NULL, 0); + schedRun(root, &Kill); + if (un) Unsubscribe(args->cli->sock); fclose(lf); + + schedEnd(&root); } else printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno)); } else @@ -284,6 +328,6 @@ main(int argc, char **argv) mqtt_cli_Close(&args->cli); args->free(args); - free(args); + io_free(args); return ret; }