Annotation of mqtt/src/mqtt_subs.c, revision 1.2.2.12
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "mqtt.h"
4: #include "client.h"
5:
6:
/* NOTE(review): project macro; presumably instantiates the debug-level state
 * that main() bumps with io_incDebug on every -v -- confirm in the io headers. */
7: io_enableDEBUG;
8:
/* Build identification strings defined elsewhere; printed by Usage(). */
9: extern char compiled[], compiledby[], compilehost[];
/* Stop flag whose address main() hands to schedRun(). */
1.2.2.2 misho 10: volatile intptr_t Kill;
/* Scheduler root; main() obtains it from schedBegin() and releases it with schedEnd(). */
1.2.2.12! misho 11: sched_root_task_t *root;
1.2 misho 12:
/* Program-wide client state; allocated, filled from the command line and freed in main(). */
13: struct tagArgs *args;
14:
15:
16: static void
17: Usage(void)
18: {
19: printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
20: "=== %s@%s === Compiled: %s ===\n\n"
1.2.2.3 misho 21: " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
1.2 misho 22: "\t-l <value2file>\t\tSave received values to file\n"
1.2.2.8 misho 23: "\t-u\t\t\tUnsubscribe given topic(s)\n"
1.2 misho 24: "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
25: "\t-d\t\t\tSend duplicate message\n\n"
26: "\t-C\t\t\tNot clear before connect!\n"
27: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
28: "\t-T <timeout>\t\tKeep alive timeout in seconds\n"
29: "\t-U <username>\t\tUsername\n"
30: "\t-P <password>\t\tPassword\n"
31: "\t-W <topic>\t\tWill Topic\n"
32: "\t-M <message>\t\tWill Message\n\n"
33: "\t-D\t\t\tDaemon mode\n"
34: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
35: "\t-h\t\t\tHelp! This screen\n\n",
36: compiledby, compilehost, compiled);
37: }
38:
39: static void
40: cleanArgs(struct tagArgs * __restrict args)
41: {
42: mqtt_msgFree(&args->msg, 42);
1.2.2.3 misho 43: mqtt_subFree(&args->subscr);
1.2 misho 44: AIT_FREE_VAL(&args->Will.Msg);
45: AIT_FREE_VAL(&args->Will.Topic);
46: AIT_FREE_VAL(&args->User);
47: AIT_FREE_VAL(&args->Pass);
48: AIT_FREE_VAL(&args->Publish);
49: AIT_FREE_VAL(&args->Value);
50: AIT_FREE_VAL(&args->ConnID);
51: }
52:
53: static int
54: Subscribe(int sock, FILE *lf)
55: {
1.2.2.6 misho 56: u_char *qoses, *qos;
57: u_short mid;
58: mqtt_subscr_t *sub;
1.2.2.3 misho 59:
1.2.2.10 misho 60: #ifdef __NetBSD__
61: srandom(getpid() ^ time(NULL));
62: #else
1.2.2.4 misho 63: srandomdev();
1.2.2.10 misho 64: #endif
1.2.2.6 misho 65: mid = random() % USHRT_MAX;
1.2.2.4 misho 66:
1.2.2.6 misho 67: printf(" > Execute SUBSCRIBE request #%d ... ", mid);
68: qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
69: if (!qoses) {
70: printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2.2.3 misho 71: return -1;
1.2.2.6 misho 72: } else
73: printf("OK\n");
1.2.2.3 misho 74:
1.2.2.6 misho 75: for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
76: printf(" + Topic %s with QoS %d subscribed %s\n", (char*)
77: sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
78:
79: free(qoses);
80: return 0;
81: }
82:
83: static int
84: Unsubscribe(int sock)
85: {
86: u_short mid;
87:
1.2.2.10 misho 88: #ifdef __NetBSD__
89: srandom(getpid() ^ time(NULL));
90: #else
1.2.2.6 misho 91: srandomdev();
1.2.2.10 misho 92: #endif
1.2.2.6 misho 93: mid = random() % USHRT_MAX;
94:
1.2.2.7 misho 95: printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
1.2.2.6 misho 96: if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
97: printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2.2.3 misho 98: return -1;
1.2.2.7 misho 99: } else
100: printf("OK\n");
1.2.2.3 misho 101:
1.2 misho 102: return 0;
103: }
104:
105:
1.2.2.12! misho 106: static void *
! 107: pubRX(sched_task_t *task)
! 108: {
! 109: int siz, rlen;
! 110: char szTopic[BUFSIZ] = { 0 };
! 111: void *data = NULL;
! 112: u_short mid;
! 113:
! 114: rlen = RecvFrom(TASK_FD(task));
! 115: if (siz == -1)
! 116: goto end;
! 117: siz = mqtt_readPUBLISH(args->msg->msg_base, szTopic, sizeof szTopic, &mid, &data);
! 118: if (siz == -1)
! 119: goto end;
! 120:
! 121: fprintf(TASK_ARG(task), "\nMessage ID: 0x%04hu\n", mid);
! 122:
! 123: if (data) {
! 124: fputs((const char*) data, TASK_ARG(task));
! 125: free(data);
! 126: }
! 127: end:
! 128: schedReadSelf(task);
! 129: return NULL;
! 130: }
! 131:
! 132:
1.2 misho 133: int
134: main(int argc, char **argv)
135: {
1.2.2.6 misho 136: char ch, un = 0, idx = 0, batch = 1;
1.2.2.3 misho 137: ait_val_t val;
1.2 misho 138: u_short port = atoi(MQTT_PORT);
1.2.2.3 misho 139: mqtt_subscr_t *sub;
1.2.2.5 misho 140: int ret = 0;
1.2.2.3 misho 141: char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
1.2 misho 142: FILE *lf;
143:
1.2.2.9 misho 144: if (!(args = io_malloc(sizeof(struct tagArgs)))) {
1.2 misho 145: printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
146: return 1;
147: } else
148: memset(args, 0, sizeof(struct tagArgs));
1.2.2.3 misho 149: if (!(args->subscr = mqtt_subAlloc(idx))) {
150: printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2.2.9 misho 151: io_free(args);
1.2 misho 152: return 1;
153: } else
154: args->free = cleanArgs;
155:
156: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
157: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
158: args->free(args);
1.2.2.9 misho 159: io_free(args);
1.2 misho 160: return 1;
161: }
162:
1.2.2.4 misho 163: AIT_SET_STR(&args->ConnID, "");
1.2 misho 164: AIT_SET_STR(&args->User, "");
165: AIT_SET_STR(&args->Pass, "");
166:
167: args->ka = MQTT_KEEPALIVE;
1.2.2.6 misho 168: while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
1.2 misho 169: switch (ch) {
170: case 'T':
171: args->ka = (u_short) strtol(optarg, NULL, 0);
172: break;
173: case 'M':
174: AIT_FREE_VAL(&args->Will.Msg);
175: AIT_SET_STR(&args->Will.Msg, optarg);
176: break;
177: case 'W':
178: AIT_FREE_VAL(&args->Will.Topic);
179: AIT_SET_STR(&args->Will.Topic, optarg);
180: break;
181: case 'U':
182: AIT_FREE_VAL(&args->User);
183: AIT_SET_STR(&args->User, optarg);
184: break;
185: case 'P':
186: AIT_FREE_VAL(&args->Pass);
187: AIT_SET_STR(&args->Pass, optarg);
188: break;
189: case 'p':
190: port = (u_short) strtol(optarg, NULL, 0);
191: break;
192: case 's':
1.2.2.3 misho 193: sub = mqtt_subRealloc(&args->subscr, idx + 1);
194: if (!sub) {
195: printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2 misho 196: args->free(args);
1.2.2.9 misho 197: io_free(args);
1.2 misho 198: return 1;
199: } else
1.2.2.3 misho 200: sub += idx++;
201:
202: strlcpy(szStr, optarg, sizeof szStr);
203: if ((str = strchr(szStr, '|'))) {
204: *str++ = 0;
205: *str -= 0x30;
206: if (*str < 0 || *str > MQTT_QOS_RESERVED)
207: sub->sub_ret = (u_char) args->QoS;
208: else
209: sub->sub_ret = (u_char) *str;
210: } else
211: sub->sub_ret = (u_char) args->QoS;
1.2.2.11 misho 212: sub->sub_topic.msg_base = strdup(szStr);
1.2.2.3 misho 213: sub->sub_topic.msg_len = strlen(szStr);
1.2 misho 214: break;
215: case 'q':
216: args->QoS = (char) strtol(optarg, NULL, 0);
217: if (args->QoS > MQTT_QOS_EXACTLY) {
218: printf("Error:: invalid QoS level %d\n", args->QoS);
219: args->free(args);
1.2.2.9 misho 220: io_free(args);
1.2 misho 221: return 1;
222: }
223: break;
224: case 'd':
225: args->Dup++;
226: break;
227: case 'C':
228: args->notClear++;
229: break;
230: case 'l':
231: strlcpy(szLogName, optarg, sizeof szLogName);
232: break;
233: case 'D':
234: batch = 0;
235: break;
236: case 'v':
237: io_incDebug;
238: break;
1.2.2.6 misho 239: case 'u':
240: un = 1;
241: break;
1.2 misho 242: case 'h':
243: default:
244: args->free(args);
1.2.2.9 misho 245: io_free(args);
1.2 misho 246: Usage();
247: return 1;
248: }
249: argc -= optind;
250: argv += optind;
1.2.2.3 misho 251: if (argc < 2) {
1.2 misho 252: printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
253: args->free(args);
1.2.2.9 misho 254: io_free(args);
1.2 misho 255: Usage();
256: return 1;
1.2.2.4 misho 257: } else {
258: AIT_FREE_VAL(&args->ConnID);
259: AIT_SET_STR(&args->ConnID, argv[1]);
260: }
1.2.2.3 misho 261: if (argc > 2) {
1.2 misho 262: AIT_FREE_VAL(&args->Value);
1.2.2.3 misho 263: AIT_SET_STR(&args->Value, argv[2]);
1.2 misho 264: }
265: if (!io_gethostbyname(*argv, port, &args->addr)) {
266: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
267: args->free(args);
1.2.2.9 misho 268: io_free(args);
1.2 misho 269: Usage();
270: return 1;
271: }
1.2.2.1 misho 272: printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
273: AIT_FREE_VAL(&val);
1.2 misho 274:
1.2.2.6 misho 275: if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
1.2 misho 276: args->free(args);
1.2.2.9 misho 277: io_free(args);
1.2 misho 278: return 2;
279: }
280:
1.2.2.5 misho 281: switch ((ret = ConnectClient(args->cli->sock))) {
1.2 misho 282: case -1:
283: printf(">> FAILED!\n");
284: break;
285: case MQTT_RETCODE_ACCEPTED:
286: printf(">> OK\n");
287: break;
288: case MQTT_RETCODE_REFUSE_VER:
289: printf(">> Incorrect version\n");
290: break;
291: case MQTT_RETCODE_REFUSE_ID:
292: printf(">> Incorrect connectID\n");
293: break;
294: case MQTT_RETCODE_REFUSE_UNAVAIL:
295: printf(">> Service unavailable\n");
296: break;
297: case MQTT_RETCODE_REFUSE_USERPASS:
298: printf(">> Refuse user/pass\n");
299: break;
300: case MQTT_RETCODE_DENIED:
301: printf(">> DENIED.\n");
302: break;
303: }
304:
305: if (ret == MQTT_RETCODE_ACCEPTED) {
306: if (*szLogName)
307: lf = fopen(szLogName, "w");
308: else
309: lf = stdout;
310: if (lf) {
1.2.2.12! misho 311: root = schedBegin();
! 312:
1.2.2.7 misho 313: ret = Subscribe(args->cli->sock, lf);
1.2.2.12! misho 314:
! 315: schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
! 316: schedRun(root, &Kill);
! 317:
1.2.2.7 misho 318: if (un)
319: Unsubscribe(args->cli->sock);
1.2 misho 320: fclose(lf);
1.2.2.12! misho 321:
! 322: schedEnd(&root);
1.2 misho 323: } else
324: printf("Error:: in subscribe file #%d - %s\n", errno, strerror(errno));
1.2.2.5 misho 325: } else
1.2 misho 326: ret = 3;
1.2.2.5 misho 327:
328: mqtt_cli_Close(&args->cli);
1.2 misho 329:
330: args->free(args);
1.2.2.9 misho 331: io_free(args);
1.2 misho 332: return ret;
333: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>