Annotation of mqtt/src/mqtt_subs.c, revision 1.3
1.2 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "mqtt.h"
4: #include "client.h"
5:
6:
7: io_enableDEBUG;
8:
9: extern char compiled[], compiledby[], compilehost[];
1.3 ! misho 10: volatile intptr_t Kill;
! 11: sched_root_task_t *root;
1.2 misho 12:
13: struct tagArgs *args;
14:
15:
16: static void
17: Usage(void)
18: {
19: printf( " -= MQTT Subscriber Client =- Subscriber from ELWIX\n"
20: "=== %s@%s === Compiled: %s ===\n\n"
1.3 ! misho 21: " Syntax: mqtt_subs [options] <connect_to_broker[:port]> <ConnectID> [exec_script <value>]\n\n"
1.2 misho 22: "\t-l <value2file>\t\tSave received values to file\n"
1.3 ! misho 23: "\t-u\t\t\tUnsubscribe given topic(s)\n"
1.2 misho 24: "\t-s <topic[|QoS]>\tSubscribe for this topic, if wish add different |QoS to topic\n"
25: "\t-d\t\t\tSend duplicate message\n\n"
26: "\t-C\t\t\tNot clear before connect!\n"
27: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
28: "\t-T <timeout>\t\tKeep alive timeout in seconds\n"
29: "\t-U <username>\t\tUsername\n"
30: "\t-P <password>\t\tPassword\n"
31: "\t-W <topic>\t\tWill Topic\n"
32: "\t-M <message>\t\tWill Message\n\n"
33: "\t-D\t\t\tDaemon mode\n"
34: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
35: "\t-h\t\t\tHelp! This screen\n\n",
36: compiledby, compilehost, compiled);
37: }
38:
39: static void
40: cleanArgs(struct tagArgs * __restrict args)
41: {
42: mqtt_msgFree(&args->msg, 42);
1.3 ! misho 43: mqtt_subFree(&args->subscr);
1.2 misho 44: AIT_FREE_VAL(&args->Will.Msg);
45: AIT_FREE_VAL(&args->Will.Topic);
46: AIT_FREE_VAL(&args->User);
47: AIT_FREE_VAL(&args->Pass);
48: AIT_FREE_VAL(&args->Publish);
49: AIT_FREE_VAL(&args->Value);
50: AIT_FREE_VAL(&args->ConnID);
51: }
52:
53: static int
54: Subscribe(int sock, FILE *lf)
55: {
1.3 ! misho 56: u_char *qoses, *qos;
! 57: u_short mid;
! 58: mqtt_subscr_t *sub;
! 59:
! 60: #ifdef __NetBSD__
! 61: srandom(getpid() ^ time(NULL));
! 62: #else
! 63: srandomdev();
! 64: #endif
! 65: mid = random() % USHRT_MAX;
! 66:
! 67: printf(" > Execute SUBSCRIBE request #%d ... ", mid);
! 68: qoses = mqtt_cli_Subscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK);
! 69: if (!qoses) {
! 70: printf("Error:: Subscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 71: return -1;
! 72: } else
! 73: printf("OK\n");
! 74:
! 75: for (sub = args->subscr, qos = qoses; sub->sub_topic.msg_base; sub++, qos++)
! 76: printf(" + Topic %s with QoS %d subscribe %s\n", (char*)
! 77: sub->sub_topic.msg_base, sub->sub_ret, *qos ? "done" : "failed");
! 78:
! 79: free(qoses);
! 80: return 0;
! 81: }
! 82:
! 83: static int
! 84: Unsubscribe(int sock)
! 85: {
! 86: u_short mid;
! 87:
! 88: #ifdef __NetBSD__
! 89: srandom(getpid() ^ time(NULL));
! 90: #else
! 91: srandomdev();
! 92: #endif
! 93: mid = random() % USHRT_MAX;
! 94:
! 95: printf(" > Execute UNSUBSCRIBE request #%d ... ", mid);
! 96: if (mqtt_cli_Unsubscribe(args->cli, args->subscr, mid, args->Dup, MQTT_QOS_ACK)) {
! 97: printf("Error:: Unsubscribe #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 98: return -1;
! 99: } else
! 100: printf("OK\n");
! 101:
1.2 misho 102: return 0;
103: }
104:
1.3 ! misho 105: static void
! 106: sigz(int sig)
! 107: {
! 108: int stat;
! 109:
! 110: switch (sig) {
! 111: case SIGINT:
! 112: case SIGTERM:
! 113: Kill++;
! 114: break;
! 115: case SIGCHLD:
! 116: while (waitpid(-1, &stat, WNOHANG) > 0);
! 117: break;
! 118: }
! 119: }
! 120:
! 121:
! 122: static void *
! 123: execProc(sched_task_t *task)
! 124: {
! 125: FILE *f;
! 126: char szLine[MAXPATHLEN + BUFSIZ] = { 0 };
! 127:
! 128: snprintf(szLine, sizeof szLine, "%s '%s' %hu %u", AIT_GET_STR(&args->Value),
! 129: (char*) TASK_ARG(task), (u_short) TASK_VAL(task), (u_int) TASK_DATLEN(task));
! 130: if (TASK_ARG(task))
! 131: io_free(TASK_ARG(task));
! 132:
! 133: f = popen(szLine, "w");
! 134: if (!f) {
! 135: ioSYSERR(0);
! 136: return NULL;
! 137: } else
! 138: fputs(TASK_DATA(task), f);
! 139: pclose(f);
! 140: return NULL;
! 141: }
! 142:
! 143: static void *
! 144: pubRX(sched_task_t *task)
! 145: {
! 146: int siz, rlen;
! 147: char szTime[STRSIZ] = { 0 }, szTopic[STRSIZ] = { 0 };
! 148: void *data = NULL;
! 149: u_short mid;
! 150: time_t tim;
! 151: struct mqtthdr *hdr;
! 152:
! 153: rlen = RecvFrom(TASK_FD(task));
! 154: if (rlen == -1)
! 155: goto end;
! 156: if (!rlen) {
! 157: Kill++;
! 158: return NULL;
! 159: }
! 160:
! 161: while (rlen > 0) {
! 162: hdr = (struct mqtthdr*) args->msg->msg_base;
! 163:
! 164: switch (hdr->mqtt_msg.type) {
! 165: case MQTT_TYPE_PUBLISH:
! 166: siz = mqtt_readPUBLISH(args->msg, szTopic, sizeof szTopic, &mid, &data);
! 167: if (siz == -1)
! 168: goto end;
! 169: else {
! 170: siz = mqtt_pktLen(hdr);
! 171: rlen -= siz;
! 172: ioVERBOSE(4) printf("Remains %d bytes, packet %d bytes\n",
! 173: rlen, siz);
! 174: }
! 175:
! 176: /* send to output */
! 177: tim = time(NULL);
! 178: strftime(szTime, sizeof szTime, "%Y-%m-%d %H:%M:%S", localtime(&tim));
! 179: fprintf(TASK_ARG(task), "\n[%s] Message ID: %04hu, QoS: %hhu, "
! 180: "Length: %u, Topic: %s\n", szTime, mid, hdr->mqtt_msg.qos,
! 181: siz, szTopic);
! 182:
! 183: if (data) {
! 184: fputs((const char*) data, TASK_ARG(task));
! 185: free(data);
! 186: }
! 187:
! 188: fprintf(TASK_ARG(task), "\n");
! 189: fflush(TASK_ARG(task));
! 190:
! 191: /* if exists exec script */
! 192: if (!AIT_ISEMPTY(&args->Value))
! 193: schedEvent(root, execProc, io_strdup(szTopic), mid, data, siz);
! 194:
! 195: memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
! 196: break;
! 197: case MQTT_TYPE_PINGREQ:
! 198: siz = mqtt_msgPINGRESP(args->msg);
! 199: if (siz == -1)
! 200: goto end;
! 201: else
! 202: rlen -= siz;
! 203:
! 204: /* send ping reply */
! 205: if (SendTo(TASK_FD(task), siz) == -1)
! 206: goto end;
! 207:
! 208: memmove(args->msg->msg_base, args->msg->msg_base + siz, rlen);
! 209: break;
! 210: default:
! 211: ioVERBOSE(1) printf("Unwanted message type #%d ...\n", hdr->mqtt_msg.type);
! 212: goto end;
! 213: }
! 214: }
! 215: end:
! 216: schedReadSelf(task);
! 217: return NULL;
! 218: }
! 219:
1.2 misho 220:
221: int
222: main(int argc, char **argv)
223: {
1.3 ! misho 224: char ch, un = 0, idx = 0, batch = 1;
! 225: ait_val_t val;
1.2 misho 226: u_short port = atoi(MQTT_PORT);
1.3 ! misho 227: mqtt_subscr_t *sub;
! 228: int ret = 0;
! 229: char *str, szStr[STRSIZ], szLogName[MAXPATHLEN] = { 0 };
1.2 misho 230: FILE *lf;
1.3 ! misho 231: struct sigaction sa;
1.2 misho 232:
1.3 ! misho 233: if (!(args = io_malloc(sizeof(struct tagArgs)))) {
1.2 misho 234: printf("Error:: in arguments #%d - %s\n", errno, strerror(errno));
235: return 1;
236: } else
237: memset(args, 0, sizeof(struct tagArgs));
1.3 ! misho 238: if (!(args->subscr = mqtt_subAlloc(idx))) {
! 239: printf("Error:: in subscribes array #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 240: io_free(args);
1.2 misho 241: return 1;
242: } else
243: args->free = cleanArgs;
244:
245: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
246: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
247: args->free(args);
1.3 ! misho 248: io_free(args);
1.2 misho 249: return 1;
250: }
251:
252: AIT_SET_STR(&args->ConnID, "");
253: AIT_SET_STR(&args->User, "");
254: AIT_SET_STR(&args->Pass, "");
255:
256: args->ka = MQTT_KEEPALIVE;
1.3 ! misho 257: while ((ch = getopt(argc, argv, "T:U:P:p:s:q:dl:W:M:CDvuh")) != -1)
1.2 misho 258: switch (ch) {
259: case 'T':
260: args->ka = (u_short) strtol(optarg, NULL, 0);
261: break;
262: case 'M':
263: AIT_FREE_VAL(&args->Will.Msg);
264: AIT_SET_STR(&args->Will.Msg, optarg);
265: break;
266: case 'W':
267: AIT_FREE_VAL(&args->Will.Topic);
268: AIT_SET_STR(&args->Will.Topic, optarg);
269: break;
270: case 'U':
271: AIT_FREE_VAL(&args->User);
272: AIT_SET_STR(&args->User, optarg);
273: break;
274: case 'P':
275: AIT_FREE_VAL(&args->Pass);
276: AIT_SET_STR(&args->Pass, optarg);
277: break;
278: case 'p':
279: port = (u_short) strtol(optarg, NULL, 0);
280: break;
281: case 's':
1.3 ! misho 282: sub = mqtt_subRealloc(&args->subscr, idx + 1);
! 283: if (!sub) {
! 284: printf("Error:: #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.2 misho 285: args->free(args);
1.3 ! misho 286: io_free(args);
1.2 misho 287: return 1;
288: } else
1.3 ! misho 289: sub += idx++;
! 290:
! 291: strlcpy(szStr, optarg, sizeof szStr);
! 292: if ((str = strchr(szStr, '|'))) {
! 293: *str++ = 0;
! 294: *str -= 0x30;
! 295: if (*str < 0 || *str > MQTT_QOS_RESERVED)
! 296: sub->sub_ret = (u_char) args->QoS;
! 297: else
! 298: sub->sub_ret = (u_char) *str;
! 299: } else
! 300: sub->sub_ret = (u_char) args->QoS;
! 301: sub->sub_topic.msg_base = strdup(szStr);
! 302: sub->sub_topic.msg_len = strlen(szStr);
1.2 misho 303: break;
304: case 'q':
305: args->QoS = (char) strtol(optarg, NULL, 0);
306: if (args->QoS > MQTT_QOS_EXACTLY) {
307: printf("Error:: invalid QoS level %d\n", args->QoS);
308: args->free(args);
1.3 ! misho 309: io_free(args);
1.2 misho 310: return 1;
311: }
312: break;
313: case 'd':
314: args->Dup++;
315: break;
316: case 'C':
317: args->notClear++;
318: break;
319: case 'l':
320: strlcpy(szLogName, optarg, sizeof szLogName);
321: break;
322: case 'D':
323: batch = 0;
324: break;
325: case 'v':
326: io_incDebug;
327: break;
1.3 ! misho 328: case 'u':
! 329: un = 1;
! 330: break;
1.2 misho 331: case 'h':
332: default:
333: args->free(args);
1.3 ! misho 334: io_free(args);
1.2 misho 335: Usage();
336: return 1;
337: }
338: argc -= optind;
339: argv += optind;
1.3 ! misho 340: if (argc < 2) {
1.2 misho 341: printf("Error:: host for connect not found, connection id or topic not supplied!\n\n");
342: args->free(args);
1.3 ! misho 343: io_free(args);
1.2 misho 344: Usage();
345: return 1;
346: } else {
347: AIT_FREE_VAL(&args->ConnID);
348: AIT_SET_STR(&args->ConnID, argv[1]);
349: }
1.3 ! misho 350: if (argc > 2) {
1.2 misho 351: AIT_FREE_VAL(&args->Value);
1.3 ! misho 352: AIT_SET_STR(&args->Value, argv[2]);
1.2 misho 353: }
354: if (!io_gethostbyname(*argv, port, &args->addr)) {
355: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
356: args->free(args);
1.3 ! misho 357: io_free(args);
1.2 misho 358: Usage();
359: return 1;
360: }
1.3 ! misho 361: printf("Connecting to %s:%d ... ", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
! 362: AIT_FREE_VAL(&val);
1.2 misho 363:
1.3 ! misho 364: sa.sa_handler = sigz;
! 365: sigemptyset(&sa.sa_mask);
! 366: sigaction(SIGTERM, &sa, NULL);
! 367: sigaction(SIGINT, &sa, NULL);
! 368: sigaction(SIGCHLD, &sa, NULL);
! 369:
! 370: if (!batch)
! 371: switch (fork()) {
! 372: case -1: /* error */
! 373: printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
! 374: ret = 2;
! 375: goto end;
! 376: case 0: /* child */
! 377: setsid();
! 378:
! 379: ret = open("/dev/null", O_RDWR);
! 380: if (ret != -1) {
! 381: dup2(ret, STDIN_FILENO);
! 382: dup2(ret, STDOUT_FILENO);
! 383: dup2(ret, STDERR_FILENO);
! 384: close(ret);
! 385: }
! 386: break;
! 387: default: /* parent */
! 388: printf(">> Service started\n");
! 389: ret = 0;
! 390: goto end;
! 391: }
! 392:
! 393: if (!(args->cli = mqtt_cli_Open(&args->addr.sa, args->ka))) {
! 394: ret = 2;
! 395: goto end;
1.2 misho 396: }
397:
1.3 ! misho 398: switch ((ret = ConnectClient(args->cli->sock))) {
1.2 misho 399: case -1:
400: printf(">> FAILED!\n");
401: break;
402: case MQTT_RETCODE_ACCEPTED:
403: printf(">> OK\n");
404: break;
405: case MQTT_RETCODE_REFUSE_VER:
406: printf(">> Incorrect version\n");
407: break;
408: case MQTT_RETCODE_REFUSE_ID:
409: printf(">> Incorrect connectID\n");
410: break;
411: case MQTT_RETCODE_REFUSE_UNAVAIL:
412: printf(">> Service unavailable\n");
413: break;
414: case MQTT_RETCODE_REFUSE_USERPASS:
415: printf(">> Refuse user/pass\n");
416: break;
417: case MQTT_RETCODE_DENIED:
418: printf(">> DENIED.\n");
419: break;
420: }
421:
422: if (ret == MQTT_RETCODE_ACCEPTED) {
423: if (*szLogName)
424: lf = fopen(szLogName, "w");
425: else
426: lf = stdout;
427: if (lf) {
1.3 ! misho 428: ret = Subscribe(args->cli->sock, lf);
! 429:
! 430: root = schedBegin();
! 431:
! 432: schedRead(root, pubRX, lf, args->cli->sock, NULL, 0);
! 433: schedRun(root, &Kill);
! 434:
! 435: schedEnd(&root);
! 436:
! 437: if (un)
! 438: Unsubscribe(args->cli->sock);
1.2 misho 439: fclose(lf);
440: } else
1.3 ! misho 441: printf("Error:: in output file #%d - %s\n", errno, strerror(errno));
! 442: } else
1.2 misho 443: ret = 3;
444:
1.3 ! misho 445: mqtt_cli_Close(&args->cli);
! 446: end:
1.2 misho 447: args->free(args);
1.3 ! misho 448: io_free(args);
1.2 misho 449: return ret;
450: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>