Return to mqtt_subs.c CVS log | Up to [ELWIX - Embedded LightWeight unIX -] / mqtt / src |
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "mqtt.h"
4: #include "client.h"
5:
6:
7: io_enableDEBUG;
8:
9: extern char compiled[], compiledby[], compilehost[];
1.2.2.2 misho 10: volatile intptr_t Kill;
1.2.2.12 misho 11: sched_root_task_t *root;
1.2 misho 12:
13: struct tagArgs *args;
14:
15:
16: static void
17: Usage(void)
18: {
19: printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
20: "=== %s@%s === Compiled: %s ===\n\n"
1.2.2.3 misho 21: " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
1.2 misho 22: "\t-l <value2file>\t\tSave received values to file\n"
1.2.2.8 misho 23: "\t-u\t\t\tUnsubscribe given topic(s)\n"
1.2 misho 24: "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
25: "\t-d\t\t\tSend duplicate message\n\n"
26: "\t-C\t\t\tNot clear before connect!\n"
27: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
28: "\t-T <timeout>\t\tKeep alive timeout in seconds\n"
29: "\t-U <username>\t\tUsername\n"
30: "\t-P <password>\t\tPassword\n"
31: "\t-W <topic>\t\tWill Topic\n"
32: "\t-M <message>\t\tWill Message\n\n"
33: "\t-D\t\t\tDaemon mode\n"
34: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
35: "\t-h\t\t\tHelp! This screen\n\n",
36: compiledby, compilehost, compiled);
37: }
38:
39: static void
40: cleanArgs(struct tagArgs * __restrict args)
41: {
42: mqtt_msgFree(&args->msg, 42);
1.2.2.3 misho 43: mqtt_subFree(&args->subscr);
1.2 misho 44: AIT_FREE_VAL(&args->Will.Msg);
45: AIT_FREE_VAL(&args->Will.Topic);
46: AIT_FREE_VAL(&args->User);
47: AIT_FREE_VAL(&args->Pass);
48: AIT_FREE_VAL(&args->Publish);
49: AIT_FREE_VAL(&args->Value);
50: AIT_FREE_VAL(&args->ConnID);
51: }
52:
53: static int
54: Subscribe(int sock, FILE *lf)
55: {
1.2.2.6 misho 56: u_char *qoses, *qos;
57: u_short mid;
58: mqtt_subscr_t *sub;
1.2.2.3 misho 59:
1.2.2.10 misho 60: #ifdef __NetBSD__
61: srandom(getpid() ^ time(NULL));
62: #else
1.2.2.4 misho 63: srandomdev();
1.2.2.10 misho 64: #endif
1.2.2.6 misho 65: mid = random() % USHRT_MAX;
1.2.2.4 misho 66:
1.2.2.6 misho 67: printf(" > Execute SUBSCRIBE request #%d ... ", mid);
68: qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
69: if (!qoses) {
70: printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2.2.3 misho 71: return -1;
1.2.2.6 misho 72: } else
73: printf("OK\n");
1.2.2.3 misho 74:
1.2.2.6 misho 75: for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
1.2.2.16! misho 76: printf(" + Topic %s with QoS %d subscribe %s\n", (char*)
1.2.2.6 misho 77: sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
78:
79: free(qoses);
80: return 0;
81: }
82:
83: static int
84: Unsubscribe(int sock)
85: {
86: u_short mid;
87:
1.2.2.10 misho 88: #ifdef __NetBSD__
89: srandom(getpid() ^ time(NULL));
90: #else
1.2.2.6 misho 91: srandomdev();
1.2.2.10 misho 92: #endif
1.2.2.6 misho 93: mid = random() % USHRT_MAX;
94:
1.2.2.7 misho 95: printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
1.2.2.6 misho 96: if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
97: printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2.2.3 misho 98: return -1;
1.2.2.7 misho 99: } else
100: printf("OK\n");
1.2.2.3 misho 101:
1.2 misho 102: return 0;
103: }
104:
1.2.2.14 misho 105: static void
106: sigz(int sig)
107: {
108: int stat;
109:
110: switch (sig) {
111: case SIGINT:
112: case SIGTERM:
113: Kill++;
114: break;
115: case SIGCHLD:
116: while (waitpid(-1, &stat, WNOHANG) > 0);
117: break;
118: }
119: }
120:
121:
122: static void *
123: execProc(sched_task_t *task)
124: {
125: FILE *f;
126: char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
127:
128: snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value),
129: (char*) TASK_ARG(task), (u_short) TASK_VAL(task), TASK_DATLEN(task));
130: if (TASK_ARG(task))
131: io_free(TASK_ARG(task));
132:
133: f = popen(szLine, "w");
134: if (!f) {
135: ioSYSERR(0);
136: return NULL;
137: } else
138: fputs(TASK_DATA(task), f);
139: pclose(f);
140: return NULL;
141: }
1.2 misho 142:
1.2.2.12 misho 143: static void *
144: pubRX(sched_task_t *task)
145: {
146: int siz, rlen;
1.2.2.14 misho 147: char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
1.2.2.12 misho 148: void *data = NULL;
149: u_short mid;
1.2.2.14 misho 150: time_t tim;
1.2.2.12 misho 151:
152: rlen = RecvFrom(TASK_FD(task));
1.2.2.13 misho 153: if (rlen == -1)
1.2.2.12 misho 154: goto end;
1.2.2.14 misho 155: if (!rlen) {
156: Kill++;
157: return NULL;
158: }
1.2.2.13 misho 159: siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
1.2.2.12 misho 160: if (siz == -1)
161: goto end;
162:
1.2.2.14 misho 163: /* send to output */
164: tim = time(NULL);
165: strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
1.2.2.15 misho 166: fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, Length: %u, Topic: %s\n",
1.2.2.14 misho 167: szTime, mid, siz, szTopic);
1.2.2.12 misho 168:
169: if (data) {
170: fputs((const char*) data, TASK_ARG(task));
171: free(data);
172: }
1.2.2.13 misho 173:
1.2.2.14 misho 174: fprintf(TASK_ARG(task), "\n");
1.2.2.13 misho 175: fflush(TASK_ARG(task));
1.2.2.14 misho 176:
177: /* if exists exec script */
178: if (!AIT_ISEMPTY(&args->Value))
179: schedEvent(root, execProc, io_strdup(szTopic), mid, data, siz);
1.2.2.12 misho 180: end:
181: schedReadSelf(task);
182: return NULL;
183: }
184:
185:
1.2 misho 186: int
187: main(int argc, char **argv)
188: {
1.2.2.6 misho 189: char ch, un = 0, idx = 0, batch = 1;
1.2.2.3 misho 190: ait_val_t val;
1.2 misho 191: u_short port = atoi(MQTT_PORT);
1.2.2.3 misho 192: mqtt_subscr_t *sub;
1.2.2.5 misho 193: int ret = 0;
1.2.2.3 misho 194: char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
1.2 misho 195: FILE *lf;
1.2.2.14 misho 196: struct sigaction sa;
1.2 misho 197:
1.2.2.9 misho 198: if (!(args = io_malloc(sizeof(struct tagArgs)))) {
1.2 misho 199: printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
200: return 1;
201: } else
202: memset(args, 0, sizeof(struct tagArgs));
1.2.2.3 misho 203: if (!(args->subscr = mqtt_subAlloc(idx))) {
204: printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2.2.9 misho 205: io_free(args);
1.2 misho 206: return 1;
207: } else
208: args->free = cleanArgs;
209:
210: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
211: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
212: args->free(args);
1.2.2.9 misho 213: io_free(args);
1.2 misho 214: return 1;
215: }
216:
1.2.2.4 misho 217: AIT_SET_STR(&args->ConnID, "");
1.2 misho 218: AIT_SET_STR(&args->User, "");
219: AIT_SET_STR(&args->Pass, "");
220:
221: args->ka = MQTT_KEEPALIVE;
1.2.2.6 misho 222: while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
1.2 misho 223: switch (ch) {
224: case 'T':
225: args->ka = (u_short) strtol(optarg, NULL, 0);
226: break;
227: case 'M':
228: AIT_FREE_VAL(&args->Will.Msg);
229: AIT_SET_STR(&args->Will.Msg, optarg);
230: break;
231: case 'W':
232: AIT_FREE_VAL(&args->Will.Topic);
233: AIT_SET_STR(&args->Will.Topic, optarg);
234: break;
235: case 'U':
236: AIT_FREE_VAL(&args->User);
237: AIT_SET_STR(&args->User, optarg);
238: break;
239: case 'P':
240: AIT_FREE_VAL(&args->Pass);
241: AIT_SET_STR(&args->Pass, optarg);
242: break;
243: case 'p':
244: port = (u_short) strtol(optarg, NULL, 0);
245: break;
246: case 's':
1.2.2.3 misho 247: sub = mqtt_subRealloc(&args->subscr, idx + 1);
248: if (!sub) {
249: printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2 misho 250: args->free(args);
1.2.2.9 misho 251: io_free(args);
1.2 misho 252: return 1;
253: } else
1.2.2.3 misho 254: sub += idx++;
255:
256: strlcpy(szStr, optarg, sizeof szStr);
257: if ((str = strchr(szStr, '|'))) {
258: *str++ = 0;
259: *str -= 0x30;
260: if (*str < 0 || *str > MQTT_QOS_RESERVED)
261: sub->sub_ret = (u_char) args->QoS;
262: else
263: sub->sub_ret = (u_char) *str;
264: } else
265: sub->sub_ret = (u_char) args->QoS;
1.2.2.11 misho 266: sub->sub_topic.msg_base = strdup(szStr);
1.2.2.3 misho 267: sub->sub_topic.msg_len = strlen(szStr);
1.2 misho 268: break;
269: case 'q':
270: args->QoS = (char) strtol(optarg, NULL, 0);
271: if (args->QoS > MQTT_QOS_EXACTLY) {
272: printf("Error:: invalid QoS level %d\n", args->QoS);
273: args->free(args);
1.2.2.9 misho 274: io_free(args);
1.2 misho 275: return 1;
276: }
277: break;
278: case 'd':
279: args->Dup++;
280: break;
281: case 'C':
282: args->notClear++;
283: break;
284: case 'l':
285: strlcpy(szLogName, optarg, sizeof szLogName);
286: break;
287: case 'D':
288: batch = 0;
289: break;
290: case 'v':
291: io_incDebug;
292: break;
1.2.2.6 misho 293: case 'u':
294: un = 1;
295: break;
1.2 misho 296: case 'h':
297: default:
298: args->free(args);
1.2.2.9 misho 299: io_free(args);
1.2 misho 300: Usage();
301: return 1;
302: }
303: argc -= optind;
304: argv += optind;
1.2.2.3 misho 305: if (argc < 2) {
1.2 misho 306: printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
307: args->free(args);
1.2.2.9 misho 308: io_free(args);
1.2 misho 309: Usage();
310: return 1;
1.2.2.4 misho 311: } else {
312: AIT_FREE_VAL(&args->ConnID);
313: AIT_SET_STR(&args->ConnID, argv[1]);
314: }
1.2.2.3 misho 315: if (argc > 2) {
1.2 misho 316: AIT_FREE_VAL(&args->Value);
1.2.2.3 misho 317: AIT_SET_STR(&args->Value, argv[2]);
1.2 misho 318: }
319: if (!io_gethostbyname(*argv, port, &args->addr)) {
320: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
321: args->free(args);
1.2.2.9 misho 322: io_free(args);
1.2 misho 323: Usage();
324: return 1;
325: }
1.2.2.1 misho 326: printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
327: AIT_FREE_VAL(&val);
1.2 misho 328:
1.2.2.14 misho 329: sa.sa_handler = sigz;
330: sigemptyset(&sa.sa_mask);
331: sigaction(SIGTERM, &sa, NULL);
332: sigaction(SIGINT, &sa, NULL);
333: sigaction(SIGCHLD, &sa, NULL);
334:
335: if (!batch)
336: switch (fork()) {
337: case -1: /* error */
338: printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
339: ret = 2;
340: goto end;
341: case 0: /* child */
342: setsid();
343:
344: ret = open("/dev/null", O_RDWR);
345: if (ret != -1) {
346: dup2(ret, STDIN_FILENO);
347: dup2(ret, STDOUT_FILENO);
348: dup2(ret, STDERR_FILENO);
349: close(ret);
350: }
351: break;
352: default: /* parent */
353: printf(">> Service started\n");
354: ret = 0;
355: goto end;
356: }
357:
1.2.2.6 misho 358: if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
1.2.2.14 misho 359: ret = 2;
360: goto end;
1.2 misho 361: }
362:
1.2.2.5 misho 363: switch ((ret = ConnectClient(args->cli->sock))) {
1.2 misho 364: case -1:
365: printf(">> FAILED!\n");
366: break;
367: case MQTT_RETCODE_ACCEPTED:
368: printf(">> OK\n");
369: break;
370: case MQTT_RETCODE_REFUSE_VER:
371: printf(">> Incorrect version\n");
372: break;
373: case MQTT_RETCODE_REFUSE_ID:
374: printf(">> Incorrect connectID\n");
375: break;
376: case MQTT_RETCODE_REFUSE_UNAVAIL:
377: printf(">> Service unavailable\n");
378: break;
379: case MQTT_RETCODE_REFUSE_USERPASS:
380: printf(">> Refuse user/pass\n");
381: break;
382: case MQTT_RETCODE_DENIED:
383: printf(">> DENIED.\n");
384: break;
385: }
386:
387: if (ret == MQTT_RETCODE_ACCEPTED) {
388: if (*szLogName)
389: lf = fopen(szLogName, "w");
390: else
391: lf = stdout;
392: if (lf) {
1.2.2.12 misho 393: root = schedBegin();
394:
1.2.2.7 misho 395: ret = Subscribe(args->cli->sock, lf);
1.2.2.12 misho 396:
397: schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
398: schedRun(root, &Kill);
399:
1.2.2.7 misho 400: if (un)
401: Unsubscribe(args->cli->sock);
1.2 misho 402: fclose(lf);
1.2.2.12 misho 403:
404: schedEnd(&root);
1.2 misho 405: } else
1.2.2.14 misho 406: printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
1.2.2.5 misho 407: } else
1.2 misho 408: ret = 3;
1.2.2.5 misho 409:
410: mqtt_cli_Close(&args->cli);
1.2.2.14 misho 411: end:
1.2 misho 412: args->free(args);
1.2.2.9 misho 413: io_free(args);
1.2 misho 414: return ret;
415: }