Annotation of mqtt/src/mqtt_subs.c, revision 1.5
1.4 misho 1: /*************************************************************************
2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.5 ! misho 6: * $Id: mqtt_subs.c,v 1.4.4.2 2013/01/18 10:32:17 misho Exp $
1.4 misho 7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.5 ! misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
1.4 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.2 misho 46: #include "global.h"
47: #include "rtlm.h"
48: #include "mqtt.h"
49: #include "client.h"
50:
51:
52: extern char compiled[], compiledby[], compilehost[];
1.3 misho 53: volatile intptr_t Kill;
54: sched_root_task_t *root;
1.2 misho 55:
56: struct tagArgs *args;
57:
58:
59: static void
60: Usage(void)
61: {
62: printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
63: "=== %s@%s === Compiled: %s ===\n\n"
1.3 misho 64: " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
1.2 misho 65: "\t-l <value2file>\t\tSave received values to file\n"
1.3 misho 66: "\t-u\t\t\tUnsubscribe given topic(s)\n"
1.2 misho 67: "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
68: "\t-d\t\t\tSend duplicate message\n\n"
69: "\t-C\t\t\tNot clear before connect!\n"
70: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
71: "\t-T <timeout>\t\tKeep alive timeout in seconds\n"
72: "\t-U <username>\t\tUsername\n"
73: "\t-P <password>\t\tPassword\n"
74: "\t-W <topic>\t\tWill Topic\n"
75: "\t-M <message>\t\tWill Message\n\n"
76: "\t-D\t\t\tDaemon mode\n"
77: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
78: "\t-h\t\t\tHelp! This screen\n\n",
79: compiledby, compilehost, compiled);
80: }
81:
82: static void
83: cleanArgs(struct tagArgs * __restrict args)
84: {
85: mqtt_msgFree(&args->msg, 42);
1.3 misho 86: mqtt_subFree(&args->subscr);
1.2 misho 87: AIT_FREE_VAL(&args->Will.Msg);
88: AIT_FREE_VAL(&args->Will.Topic);
89: AIT_FREE_VAL(&args->User);
90: AIT_FREE_VAL(&args->Pass);
91: AIT_FREE_VAL(&args->Publish);
92: AIT_FREE_VAL(&args->Value);
93: AIT_FREE_VAL(&args->ConnID);
94: }
95:
96: static int
97: Subscribe(int sock, FILE *lf)
98: {
1.3 misho 99: u_char *qoses, *qos;
100: u_short mid;
101: mqtt_subscr_t *sub;
102:
103: #ifdef __NetBSD__
104: srandom(getpid() ^ time(NULL));
105: #else
106: srandomdev();
107: #endif
108: mid = random() % USHRT_MAX;
109:
110: printf(" > Execute SUBSCRIBE request #%d ... ", mid);
111: qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
112: if (!qoses) {
113: printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
114: return -1;
115: } else
116: printf("OK\n");
117:
118: for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
119: printf(" + Topic %s with QoS %d subscribe %s\n", (char*)
120: sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
121:
122: free(qoses);
123: return 0;
124: }
125:
126: static int
127: Unsubscribe(int sock)
128: {
129: u_short mid;
130:
131: #ifdef __NetBSD__
132: srandom(getpid() ^ time(NULL));
133: #else
134: srandomdev();
135: #endif
136: mid = random() % USHRT_MAX;
137:
138: printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
139: if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
140: printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
141: return -1;
142: } else
143: printf("OK\n");
144:
1.2 misho 145: return 0;
146: }
147:
1.3 misho 148: static void
149: sigz(int sig)
150: {
151: int stat;
152:
153: switch (sig) {
154: case SIGINT:
155: case SIGTERM:
156: Kill++;
157: break;
158: case SIGCHLD:
159: while (waitpid(-1, &stat, WNOHANG) > 0);
160: break;
161: }
162: }
163:
164:
165: static void *
166: execProc(sched_task_t *task)
167: {
168: FILE *f;
169: char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
170:
171: snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value),
172: (char*) TASK_ARG(task), (u_short) TASK_VAL(task), (u_int) TASK_DATLEN(task));
173: if (TASK_ARG(task))
1.5 ! misho 174: e_free(TASK_ARG(task));
1.3 misho 175:
176: f = popen(szLine, "w");
177: if (!f) {
1.5 ! misho 178: ESYSERR(0);
1.3 misho 179: return NULL;
180: } else
181: fputs(TASK_DATA(task), f);
182: pclose(f);
183: return NULL;
184: }
185:
186: static void *
187: pubRX(sched_task_t *task)
188: {
189: int siz, rlen;
190: char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
191: void *data = NULL;
192: u_short mid;
193: time_t tim;
194: struct mqtthdr *hdr;
195:
196: rlen = RecvFrom(TASK_FD(task));
197: if (rlen == -1)
198: goto end;
199: if (!rlen) {
200: Kill++;
201: return NULL;
202: }
203:
204: while (rlen > 0) {
205: hdr = (struct mqtthdr*) args->msg->msg_base;
206:
207: switch (hdr->mqtt_msg.type) {
208: case MQTT_TYPE_PUBLISH:
209: siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
210: if (siz == -1)
211: goto end;
212: else {
213: siz = mqtt_pktLen(hdr);
214: rlen -= siz;
1.5 ! misho 215: EVERBS(4) printf("Remains %d bytes, packet %d bytes\n", rlen, siz);
1.3 misho 216: }
217:
218: /* send to output */
219: tim = time(NULL);
220: strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
221: fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, QoS: %hhu, "
222: "Length: %u, Topic: %s\n", szTime, mid, hdr->mqtt_msg.qos,
223: siz, szTopic);
224:
225: if (data) {
226: fputs((const char*) data, TASK_ARG(task));
227: free(data);
228: }
229:
230: fprintf(TASK_ARG(task), "\n");
231: fflush(TASK_ARG(task));
232:
233: /* if exists exec script */
234: if (!AIT_ISEMPTY(&args->Value))
1.5 ! misho 235: schedEvent(root, execProc, e_strdup(szTopic), mid, data, siz);
1.3 misho 236:
237: memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
238: break;
239: case MQTT_TYPE_PINGREQ:
240: siz = mqtt_msgPINGRESP(args->msg);
241: if (siz == -1)
242: goto end;
243: else
244: rlen -= siz;
245:
246: /* send ping reply */
247: if (SendTo(TASK_FD(task), siz) == -1)
248: goto end;
249:
250: memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
251: break;
252: default:
1.5 ! misho 253: EVERBS(1) printf("Unwanted message type #%d ...\n", hdr->mqtt_msg.type);
1.3 misho 254: goto end;
255: }
256: }
257: end:
258: schedReadSelf(task);
259: return NULL;
260: }
261:
1.2 misho 262:
263: int
264: main(int argc, char **argv)
265: {
1.3 misho 266: char ch, un = 0, idx = 0, batch = 1;
267: ait_val_t val;
1.2 misho 268: u_short port = atoi(MQTT_PORT);
1.3 misho 269: mqtt_subscr_t *sub;
270: int ret = 0;
271: char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
1.2 misho 272: FILE *lf;
1.3 misho 273: struct sigaction sa;
1.2 misho 274:
1.5 ! misho 275: if (!(args = e_malloc(sizeof(struct tagArgs)))) {
1.2 misho 276: printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
277: return 1;
278: } else
279: memset(args, 0, sizeof(struct tagArgs));
1.3 misho 280: if (!(args->subscr = mqtt_subAlloc(idx))) {
281: printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.5 ! misho 282: e_free(args);
1.2 misho 283: return 1;
284: } else
285: args->free = cleanArgs;
286:
287: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
288: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
289: args->free(args);
1.5 ! misho 290: e_free(args);
1.2 misho 291: return 1;
292: }
293:
294: AIT_SET_STR(&args->ConnID, "");
295: AIT_SET_STR(&args->User, "");
296: AIT_SET_STR(&args->Pass, "");
297:
298: args->ka = MQTT_KEEPALIVE;
1.3 misho 299: while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
1.2 misho 300: switch (ch) {
301: case 'T':
302: args->ka = (u_short) strtol(optarg, NULL, 0);
303: break;
304: case 'M':
305: AIT_FREE_VAL(&args->Will.Msg);
306: AIT_SET_STR(&args->Will.Msg, optarg);
307: break;
308: case 'W':
309: AIT_FREE_VAL(&args->Will.Topic);
310: AIT_SET_STR(&args->Will.Topic, optarg);
311: break;
312: case 'U':
313: AIT_FREE_VAL(&args->User);
314: AIT_SET_STR(&args->User, optarg);
315: break;
316: case 'P':
317: AIT_FREE_VAL(&args->Pass);
318: AIT_SET_STR(&args->Pass, optarg);
319: break;
320: case 'p':
321: port = (u_short) strtol(optarg, NULL, 0);
322: break;
323: case 's':
1.3 misho 324: sub = mqtt_subRealloc(&args->subscr, idx + 1);
325: if (!sub) {
326: printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2 misho 327: args->free(args);
1.5 ! misho 328: e_free(args);
1.2 misho 329: return 1;
330: } else
1.3 misho 331: sub += idx++;
332:
333: strlcpy(szStr, optarg, sizeof szStr);
334: if ((str = strchr(szStr, '|'))) {
335: *str++ = 0;
336: *str -= 0x30;
337: if (*str < 0 || *str > MQTT_QOS_RESERVED)
338: sub->sub_ret = (u_char) args->QoS;
339: else
340: sub->sub_ret = (u_char) *str;
341: } else
342: sub->sub_ret = (u_char) args->QoS;
343: sub->sub_topic.msg_base = strdup(szStr);
344: sub->sub_topic.msg_len = strlen(szStr);
1.2 misho 345: break;
346: case 'q':
347: args->QoS = (char) strtol(optarg, NULL, 0);
348: if (args->QoS > MQTT_QOS_EXACTLY) {
349: printf("Error:: invalid QoS level %d\n", args->QoS);
350: args->free(args);
1.5 ! misho 351: e_free(args);
1.2 misho 352: return 1;
353: }
354: break;
355: case 'd':
356: args->Dup++;
357: break;
358: case 'C':
359: args->notClear++;
360: break;
361: case 'l':
362: strlcpy(szLogName, optarg, sizeof szLogName);
363: break;
364: case 'D':
365: batch = 0;
366: break;
367: case 'v':
1.5 ! misho 368: e_incVerbose;
1.2 misho 369: break;
1.3 misho 370: case 'u':
371: un = 1;
372: break;
1.2 misho 373: case 'h':
374: default:
375: args->free(args);
1.5 ! misho 376: e_free(args);
1.2 misho 377: Usage();
378: return 1;
379: }
380: argc -= optind;
381: argv += optind;
1.3 misho 382: if (argc < 2) {
1.2 misho 383: printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
384: args->free(args);
1.5 ! misho 385: e_free(args);
1.2 misho 386: Usage();
387: return 1;
388: } else {
389: AIT_FREE_VAL(&args->ConnID);
390: AIT_SET_STR(&args->ConnID, argv[1]);
391: }
1.3 misho 392: if (argc > 2) {
1.2 misho 393: AIT_FREE_VAL(&args->Value);
1.3 misho 394: AIT_SET_STR(&args->Value, argv[2]);
1.2 misho 395: }
1.5 ! misho 396: if (!e_gethostbyname(*argv, port, &args->addr)) {
! 397: printf("Error:: host not valid #%d - %s\n", elwix_GetErrno(), elwix_GetError());
1.2 misho 398: args->free(args);
1.5 ! misho 399: e_free(args);
1.2 misho 400: Usage();
401: return 1;
402: }
1.5 ! misho 403: printf("Connecting to %s:%d ... ", e_n2addr(&args->addr, &val), e_n2port(&args->addr));
1.3 misho 404: AIT_FREE_VAL(&val);
1.2 misho 405:
1.5 ! misho 406: memset(&sa, 0, sizeof sa);
1.3 misho 407: sa.sa_handler = sigz;
408: sigemptyset(&sa.sa_mask);
409: sigaction(SIGTERM, &sa, NULL);
410: sigaction(SIGINT, &sa, NULL);
411: sigaction(SIGCHLD, &sa, NULL);
412:
413: if (!batch)
414: switch (fork()) {
415: case -1: /* error */
416: printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
417: ret = 2;
418: goto end;
419: case 0: /* child */
420: setsid();
421:
422: ret = open("/dev/null", O_RDWR);
423: if (ret != -1) {
424: dup2(ret, STDIN_FILENO);
425: dup2(ret, STDOUT_FILENO);
426: dup2(ret, STDERR_FILENO);
427: close(ret);
428: }
429: break;
430: default: /* parent */
431: printf(">> Service started\n");
432: ret = 0;
433: goto end;
434: }
435:
436: if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
437: ret = 2;
438: goto end;
1.2 misho 439: }
440:
1.3 misho 441: switch ((ret = ConnectClient(args->cli->sock))) {
1.2 misho 442: case -1:
443: printf(">> FAILED!\n");
444: break;
445: case MQTT_RETCODE_ACCEPTED:
446: printf(">> OK\n");
447: break;
448: case MQTT_RETCODE_REFUSE_VER:
449: printf(">> Incorrect version\n");
450: break;
451: case MQTT_RETCODE_REFUSE_ID:
452: printf(">> Incorrect connectID\n");
453: break;
454: case MQTT_RETCODE_REFUSE_UNAVAIL:
455: printf(">> Service unavailable\n");
456: break;
457: case MQTT_RETCODE_REFUSE_USERPASS:
458: printf(">> Refuse user/pass\n");
459: break;
460: case MQTT_RETCODE_DENIED:
461: printf(">> DENIED.\n");
462: break;
463: }
464:
465: if (ret == MQTT_RETCODE_ACCEPTED) {
466: if (*szLogName)
467: lf = fopen(szLogName, "w");
468: else
469: lf = stdout;
470: if (lf) {
1.3 misho 471: ret = Subscribe(args->cli->sock, lf);
472:
473: root = schedBegin();
474:
475: schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
476: schedRun(root, &Kill);
477:
478: schedEnd(&root);
479:
480: if (un)
481: Unsubscribe(args->cli->sock);
1.2 misho 482: fclose(lf);
483: } else
1.3 misho 484: printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
485: } else
1.2 misho 486: ret = 3;
487:
1.3 misho 488: mqtt_cli_Close(&args->cli);
489: end:
1.2 misho 490: args->free(args);
1.5 ! misho 491: e_free(args);
1.2 misho 492: return ret;
493: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>