Annotation of mqtt/src/mqtt_subs.c, revision 1.4
1.4 ! misho 1: /*************************************************************************
! 2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
! 3: * by Michael Pounov <misho@openbsd-bg.org>
! 4: *
! 5: * $Author: misho $
! 6: * $Id: mqtt_subs.c,v 1.3.2.1 2012/07/03 12:22:56 misho Exp $
! 7: *
! 8: **************************************************************************
! 9: The ELWIX and AITNET software is distributed under the following
! 10: terms:
! 11:
! 12: All of the documentation and software included in the ELWIX and AITNET
! 13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
! 14:
! 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
! 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
! 17:
! 18: Redistribution and use in source and binary forms, with or without
! 19: modification, are permitted provided that the following conditions
! 20: are met:
! 21: 1. Redistributions of source code must retain the above copyright
! 22: notice, this list of conditions and the following disclaimer.
! 23: 2. Redistributions in binary form must reproduce the above copyright
! 24: notice, this list of conditions and the following disclaimer in the
! 25: documentation and/or other materials provided with the distribution.
! 26: 3. All advertising materials mentioning features or use of this software
! 27: must display the following acknowledgement:
! 28: This product includes software developed by Michael Pounov <misho@elwix.org>
! 29: ELWIX - Embedded LightWeight unIX and its contributors.
! 30: 4. Neither the name of AITNET nor the names of its contributors
! 31: may be used to endorse or promote products derived from this software
! 32: without specific prior written permission.
! 33:
! 34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
! 35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
! 36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
! 37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
! 38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
! 39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
! 40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
! 41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
! 42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
! 43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
! 44: SUCH DAMAGE.
! 45: */
1.2 misho 46: #include "global.h"
47: #include "rtlm.h"
48: #include "mqtt.h"
49: #include "client.h"
50:
51:
52: io_enableDEBUG;
53:
54: extern char compiled[], compiledby[], compilehost[];
1.3 misho 55: volatile intptr_t Kill;
56: sched_root_task_t *root;
1.2 misho 57:
58: struct tagArgs *args;
59:
60:
61: static void
62: Usage(void)
63: {
64: printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
65: "=== %s@%s === Compiled: %s ===\n\n"
1.3 misho 66: " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
1.2 misho 67: "\t-l <value2file>\t\tSave received values to file\n"
1.3 misho 68: "\t-u\t\t\tUnsubscribe given topic(s)\n"
1.2 misho 69: "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
70: "\t-d\t\t\tSend duplicate message\n\n"
71: "\t-C\t\t\tNot clear before connect!\n"
72: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
73: "\t-T <timeout>\t\tKeep alive timeout in seconds\n"
74: "\t-U <username>\t\tUsername\n"
75: "\t-P <password>\t\tPassword\n"
76: "\t-W <topic>\t\tWill Topic\n"
77: "\t-M <message>\t\tWill Message\n\n"
78: "\t-D\t\t\tDaemon mode\n"
79: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
80: "\t-h\t\t\tHelp! This screen\n\n",
81: compiledby, compilehost, compiled);
82: }
83:
84: static void
85: cleanArgs(struct tagArgs * __restrict args)
86: {
87: mqtt_msgFree(&args->msg, 42);
1.3 misho 88: mqtt_subFree(&args->subscr);
1.2 misho 89: AIT_FREE_VAL(&args->Will.Msg);
90: AIT_FREE_VAL(&args->Will.Topic);
91: AIT_FREE_VAL(&args->User);
92: AIT_FREE_VAL(&args->Pass);
93: AIT_FREE_VAL(&args->Publish);
94: AIT_FREE_VAL(&args->Value);
95: AIT_FREE_VAL(&args->ConnID);
96: }
97:
98: static int
99: Subscribe(int sock, FILE *lf)
100: {
1.3 misho 101: u_char *qoses, *qos;
102: u_short mid;
103: mqtt_subscr_t *sub;
104:
105: #ifdef __NetBSD__
106: srandom(getpid() ^ time(NULL));
107: #else
108: srandomdev();
109: #endif
110: mid = random() % USHRT_MAX;
111:
112: printf(" > Execute SUBSCRIBE request #%d ... ", mid);
113: qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
114: if (!qoses) {
115: printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
116: return -1;
117: } else
118: printf("OK\n");
119:
120: for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
121: printf(" + Topic %s with QoS %d subscribe %s\n", (char*)
122: sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
123:
124: free(qoses);
125: return 0;
126: }
127:
128: static int
129: Unsubscribe(int sock)
130: {
131: u_short mid;
132:
133: #ifdef __NetBSD__
134: srandom(getpid() ^ time(NULL));
135: #else
136: srandomdev();
137: #endif
138: mid = random() % USHRT_MAX;
139:
140: printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
141: if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
142: printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
143: return -1;
144: } else
145: printf("OK\n");
146:
1.2 misho 147: return 0;
148: }
149:
1.3 misho 150: static void
151: sigz(int sig)
152: {
153: int stat;
154:
155: switch (sig) {
156: case SIGINT:
157: case SIGTERM:
158: Kill++;
159: break;
160: case SIGCHLD:
161: while (waitpid(-1, &stat, WNOHANG) > 0);
162: break;
163: }
164: }
165:
166:
167: static void *
168: execProc(sched_task_t *task)
169: {
170: FILE *f;
171: char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
172:
173: snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value),
174: (char*) TASK_ARG(task), (u_short) TASK_VAL(task), (u_int) TASK_DATLEN(task));
175: if (TASK_ARG(task))
176: io_free(TASK_ARG(task));
177:
178: f = popen(szLine, "w");
179: if (!f) {
180: ioSYSERR(0);
181: return NULL;
182: } else
183: fputs(TASK_DATA(task), f);
184: pclose(f);
185: return NULL;
186: }
187:
188: static void *
189: pubRX(sched_task_t *task)
190: {
191: int siz, rlen;
192: char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
193: void *data = NULL;
194: u_short mid;
195: time_t tim;
196: struct mqtthdr *hdr;
197:
198: rlen = RecvFrom(TASK_FD(task));
199: if (rlen == -1)
200: goto end;
201: if (!rlen) {
202: Kill++;
203: return NULL;
204: }
205:
206: while (rlen > 0) {
207: hdr = (struct mqtthdr*) args->msg->msg_base;
208:
209: switch (hdr->mqtt_msg.type) {
210: case MQTT_TYPE_PUBLISH:
211: siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
212: if (siz == -1)
213: goto end;
214: else {
215: siz = mqtt_pktLen(hdr);
216: rlen -= siz;
217: ioVERBOSE(4) printf("Remains %d bytes, packet %d bytes\n",
218: rlen, siz);
219: }
220:
221: /* send to output */
222: tim = time(NULL);
223: strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
224: fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, QoS: %hhu, "
225: "Length: %u, Topic: %s\n", szTime, mid, hdr->mqtt_msg.qos,
226: siz, szTopic);
227:
228: if (data) {
229: fputs((const char*) data, TASK_ARG(task));
230: free(data);
231: }
232:
233: fprintf(TASK_ARG(task), "\n");
234: fflush(TASK_ARG(task));
235:
236: /* if exists exec script */
237: if (!AIT_ISEMPTY(&args->Value))
238: schedEvent(root, execProc, io_strdup(szTopic), mid, data, siz);
239:
240: memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
241: break;
242: case MQTT_TYPE_PINGREQ:
243: siz = mqtt_msgPINGRESP(args->msg);
244: if (siz == -1)
245: goto end;
246: else
247: rlen -= siz;
248:
249: /* send ping reply */
250: if (SendTo(TASK_FD(task), siz) == -1)
251: goto end;
252:
253: memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
254: break;
255: default:
256: ioVERBOSE(1) printf("Unwanted message type #%d ...\n", hdr->mqtt_msg.type);
257: goto end;
258: }
259: }
260: end:
261: schedReadSelf(task);
262: return NULL;
263: }
264:
1.2 misho 265:
266: int
267: main(int argc, char **argv)
268: {
1.3 misho 269: char ch, un = 0, idx = 0, batch = 1;
270: ait_val_t val;
1.2 misho 271: u_short port = atoi(MQTT_PORT);
1.3 misho 272: mqtt_subscr_t *sub;
273: int ret = 0;
274: char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
1.2 misho 275: FILE *lf;
1.3 misho 276: struct sigaction sa;
1.2 misho 277:
1.3 misho 278: if (!(args = io_malloc(sizeof(struct tagArgs)))) {
1.2 misho 279: printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
280: return 1;
281: } else
282: memset(args, 0, sizeof(struct tagArgs));
1.3 misho 283: if (!(args->subscr = mqtt_subAlloc(idx))) {
284: printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
285: io_free(args);
1.2 misho 286: return 1;
287: } else
288: args->free = cleanArgs;
289:
290: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
291: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
292: args->free(args);
1.3 misho 293: io_free(args);
1.2 misho 294: return 1;
295: }
296:
297: AIT_SET_STR(&args->ConnID, "");
298: AIT_SET_STR(&args->User, "");
299: AIT_SET_STR(&args->Pass, "");
300:
301: args->ka = MQTT_KEEPALIVE;
1.3 misho 302: while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
1.2 misho 303: switch (ch) {
304: case 'T':
305: args->ka = (u_short) strtol(optarg, NULL, 0);
306: break;
307: case 'M':
308: AIT_FREE_VAL(&args->Will.Msg);
309: AIT_SET_STR(&args->Will.Msg, optarg);
310: break;
311: case 'W':
312: AIT_FREE_VAL(&args->Will.Topic);
313: AIT_SET_STR(&args->Will.Topic, optarg);
314: break;
315: case 'U':
316: AIT_FREE_VAL(&args->User);
317: AIT_SET_STR(&args->User, optarg);
318: break;
319: case 'P':
320: AIT_FREE_VAL(&args->Pass);
321: AIT_SET_STR(&args->Pass, optarg);
322: break;
323: case 'p':
324: port = (u_short) strtol(optarg, NULL, 0);
325: break;
326: case 's':
1.3 misho 327: sub = mqtt_subRealloc(&args->subscr, idx + 1);
328: if (!sub) {
329: printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2 misho 330: args->free(args);
1.3 misho 331: io_free(args);
1.2 misho 332: return 1;
333: } else
1.3 misho 334: sub += idx++;
335:
336: strlcpy(szStr, optarg, sizeof szStr);
337: if ((str = strchr(szStr, '|'))) {
338: *str++ = 0;
339: *str -= 0x30;
340: if (*str < 0 || *str > MQTT_QOS_RESERVED)
341: sub->sub_ret = (u_char) args->QoS;
342: else
343: sub->sub_ret = (u_char) *str;
344: } else
345: sub->sub_ret = (u_char) args->QoS;
346: sub->sub_topic.msg_base = strdup(szStr);
347: sub->sub_topic.msg_len = strlen(szStr);
1.2 misho 348: break;
349: case 'q':
350: args->QoS = (char) strtol(optarg, NULL, 0);
351: if (args->QoS > MQTT_QOS_EXACTLY) {
352: printf("Error:: invalid QoS level %d\n", args->QoS);
353: args->free(args);
1.3 misho 354: io_free(args);
1.2 misho 355: return 1;
356: }
357: break;
358: case 'd':
359: args->Dup++;
360: break;
361: case 'C':
362: args->notClear++;
363: break;
364: case 'l':
365: strlcpy(szLogName, optarg, sizeof szLogName);
366: break;
367: case 'D':
368: batch = 0;
369: break;
370: case 'v':
371: io_incDebug;
372: break;
1.3 misho 373: case 'u':
374: un = 1;
375: break;
1.2 misho 376: case 'h':
377: default:
378: args->free(args);
1.3 misho 379: io_free(args);
1.2 misho 380: Usage();
381: return 1;
382: }
383: argc -= optind;
384: argv += optind;
1.3 misho 385: if (argc < 2) {
1.2 misho 386: printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
387: args->free(args);
1.3 misho 388: io_free(args);
1.2 misho 389: Usage();
390: return 1;
391: } else {
392: AIT_FREE_VAL(&args->ConnID);
393: AIT_SET_STR(&args->ConnID, argv[1]);
394: }
1.3 misho 395: if (argc > 2) {
1.2 misho 396: AIT_FREE_VAL(&args->Value);
1.3 misho 397: AIT_SET_STR(&args->Value, argv[2]);
1.2 misho 398: }
399: if (!io_gethostbyname(*argv, port, &args->addr)) {
400: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
401: args->free(args);
1.3 misho 402: io_free(args);
1.2 misho 403: Usage();
404: return 1;
405: }
1.3 misho 406: printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
407: AIT_FREE_VAL(&val);
1.2 misho 408:
1.3 misho 409: sa.sa_handler = sigz;
410: sigemptyset(&sa.sa_mask);
411: sigaction(SIGTERM, &sa, NULL);
412: sigaction(SIGINT, &sa, NULL);
413: sigaction(SIGCHLD, &sa, NULL);
414:
415: if (!batch)
416: switch (fork()) {
417: case -1: /* error */
418: printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
419: ret = 2;
420: goto end;
421: case 0: /* child */
422: setsid();
423:
424: ret = open("/dev/null", O_RDWR);
425: if (ret != -1) {
426: dup2(ret, STDIN_FILENO);
427: dup2(ret, STDOUT_FILENO);
428: dup2(ret, STDERR_FILENO);
429: close(ret);
430: }
431: break;
432: default: /* parent */
433: printf(">> Service started\n");
434: ret = 0;
435: goto end;
436: }
437:
438: if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
439: ret = 2;
440: goto end;
1.2 misho 441: }
442:
1.3 misho 443: switch ((ret = ConnectClient(args->cli->sock))) {
1.2 misho 444: case -1:
445: printf(">> FAILED!\n");
446: break;
447: case MQTT_RETCODE_ACCEPTED:
448: printf(">> OK\n");
449: break;
450: case MQTT_RETCODE_REFUSE_VER:
451: printf(">> Incorrect version\n");
452: break;
453: case MQTT_RETCODE_REFUSE_ID:
454: printf(">> Incorrect connectID\n");
455: break;
456: case MQTT_RETCODE_REFUSE_UNAVAIL:
457: printf(">> Service unavailable\n");
458: break;
459: case MQTT_RETCODE_REFUSE_USERPASS:
460: printf(">> Refuse user/pass\n");
461: break;
462: case MQTT_RETCODE_DENIED:
463: printf(">> DENIED.\n");
464: break;
465: }
466:
467: if (ret == MQTT_RETCODE_ACCEPTED) {
468: if (*szLogName)
469: lf = fopen(szLogName, "w");
470: else
471: lf = stdout;
472: if (lf) {
1.3 misho 473: ret = Subscribe(args->cli->sock, lf);
474:
475: root = schedBegin();
476:
477: schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
478: schedRun(root, &Kill);
479:
480: schedEnd(&root);
481:
482: if (un)
483: Unsubscribe(args->cli->sock);
1.2 misho 484: fclose(lf);
485: } else
1.3 misho 486: printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
487: } else
1.2 misho 488: ret = 3;
489:
1.3 misho 490: mqtt_cli_Close(&args->cli);
491: end:
1.2 misho 492: args->free(args);
1.3 misho 493: io_free(args);
1.2 misho 494: return ret;
495: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>