--- mqtt/src/mqtt_subs.c 2011/12/20 16:04:34 1.1.2.1 +++ mqtt/src/mqtt_subs.c 2012/05/01 22:30:31 1.2.2.5 @@ -7,7 +7,7 @@ io_enableDEBUG; extern char compiled[], compiledby[], compilehost[]; -intptr_t Kill; +volatile intptr_t Kill; struct tagArgs *args; @@ -17,7 +17,7 @@ Usage(void) { printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n" "=== %s@%s === Compiled: %s ===\n\n" - " Syntax: mqtt_subs [options] [exec_script ]\n\n" + " Syntax: mqtt_subs [options] [exec_script ]\n\n" "\t-l \t\tSave received values to file\n" "\t-s \tSubscribe for this topic, if wish add different |QoS to topic\n" "\t-q \t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n" @@ -39,6 +39,7 @@ static void cleanArgs(struct tagArgs * __restrict args) { mqtt_msgFree(&args->msg, 42); + mqtt_subFree(&args->subscr); AIT_FREE_VAL(&args->Will.Msg); AIT_FREE_VAL(&args->Will.Topic); AIT_FREE_VAL(&args->User); @@ -46,12 +47,41 @@ cleanArgs(struct tagArgs * __restrict args) AIT_FREE_VAL(&args->Publish); AIT_FREE_VAL(&args->Value); AIT_FREE_VAL(&args->ConnID); - io_freeVars(&args->Subscribes); } static int Subscribe(int sock, FILE *lf) { + int siz; + u_char *qoses; + u_short mid[2]; + + srandomdev(); + mid[0] = random() % USHRT_MAX; + + siz = mqtt_msgSUBSCRIBE(args->msg, args->subscr, mid[0], args->Dup, args->QoS); + if (siz == -1) { + printf("Error:: in msgSUBSCRIBE #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); + return -1; + } + siz = SendTo(sock, siz); + if (siz == -1) + return -1; + + siz = RecvFrom(sock); + if (siz == -1) + return -1; + siz = mqtt_readSUBACK(args->msg, &mid[1], &qoses); + if (siz == -1) { + printf("Error:: in readSUBACK #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); + return -1; + } + if (mid[1] != mid[0]) { + printf("Error:: received different connection ID %d != %d\n", mid[1], mid[0]); + return -1; + } + + free(qoses); return 0; } @@ -59,11 +89,12 @@ Subscribe(int sock, FILE *lf) int main(int argc, char **argv) { - char ch, batch = 1; - ait_val_t *v, val; + char ch, idx = 0, batch = 1; + ait_val_t val; u_short port = atoi(MQTT_PORT); - int sock, ret = 0; - char szLogName[MAXPATHLEN] = { 0 }; + mqtt_subscr_t *sub; + int ret = 0; + char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 }; FILE *lf; if (!(args = malloc(sizeof(struct tagArgs)))) { @@ -71,8 +102,8 @@ main(int argc, char **argv) return 1; } else memset(args, 0, sizeof(struct tagArgs)); - if (!(args->Subscribes = io_allocVars(1))) { - printf("Error:: in subscribes array #%d - %s\n", io_GetErrno(), io_GetError()); + if (!(args->subscr = mqtt_subAlloc(idx))) { + printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); free(args); return 1; } else @@ -115,15 +146,27 @@ main(int argc, char **argv) port = (u_short) strtol(optarg, NULL, 0); break; case 's': - v = io_allocVar(); - if (!v) { - printf("Error:: not enough memory #%d - %s\n", errno, strerror(errno)); + sub = mqtt_subRealloc(&args->subscr, idx + 1); + if (!sub) { + printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError()); args->free(args); free(args); return 1; } else - AIT_SET_STR(v, optarg); - io_arrayElem(args->Subscribes, io_arraySize(args->Subscribes), v); + sub += idx++; + + strlcpy(szStr, optarg, sizeof szStr); + if ((str = strchr(szStr, '|'))) { + *str++ = 0; + *str -= 0x30; + if (*str < 0 || *str > MQTT_QOS_RESERVED) + sub->sub_ret = (u_char) args->QoS; + else + sub->sub_ret = (u_char) *str; + } else + sub->sub_ret = (u_char) args->QoS; + sub->sub_topic.msg_base = strdup(szStr); + sub->sub_topic.msg_len = strlen(szStr); break; case 'q': args->QoS = (char) strtol(optarg, NULL, 0); @@ -158,7 +201,7 @@ main(int argc, char **argv) } argc -= optind; argv += optind; - if (argc < 3) { + if (argc < 2) { printf("Error:: host for connect not found, connection id or topic not supplied!\n\n"); args->free(args); free(args); @@ -167,12 +210,10 @@ main(int argc, char **argv) } else { AIT_FREE_VAL(&args->ConnID); AIT_SET_STR(&args->ConnID, argv[1]); - AIT_FREE_VAL(&args->Publish); - AIT_SET_STR(&args->Publish, argv[2]); } - if (argc > 3) { + if (argc > 2) { AIT_FREE_VAL(&args->Value); - AIT_SET_STR(&args->Value, argv[3]); + AIT_SET_STR(&args->Value, argv[2]); } if (!io_gethostbyname(*argv, port, &args->addr)) { printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError()); @@ -181,16 +222,16 @@ main(int argc, char **argv) Usage(); return 1; } - ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr)); + printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr)); + AIT_FREE_VAL(&val); - if ((sock = InitClient()) == -1) { + if (!(args->cli = mqtt_cli_Open(&args->addr.sa))) { args->free(args); free(args); return 2; } - printf("Connected ... "); - switch ((ret = try2Connect(sock))) { + switch ((ret = ConnectClient(args->cli->sock))) { case -1: printf(">> FAILED!\n"); break; @@ -220,15 +261,15 @@ main(int argc, char **argv) else lf = stdout; if (lf) { - ret = Subscribe(sock, lf); + ret = Subscribe(args->cli->sock, lf); fclose(lf); - shutdown(sock, SHUT_RDWR); } else printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno)); } else ret = 3; - close(sock); + mqtt_cli_Close(&args->cli); + args->free(args); free(args); return ret;