Annotation of mqtt/src/mqtt_pub.c, revision 1.2
1.2 ! misho 1: #include "global.h"
! 2: #include "rtlm.h"
! 3: #include "mqtt.h"
! 4: #include "client.h"
! 5:
! 6:
/*
 * Project macro that sets up the debug/verbosity state for this program.
 * NOTE(review): the macro is defined outside this file; main() bumps the
 * level with io_incDebug and tests it with ioVERBOSE() - confirm details
 * against its header.
 */
io_enableDEBUG;

/* Build identification strings, defined elsewhere; printed by Usage(). */
extern char compiled[];
extern char compiledby[];
extern char compilehost[];

/* Program-wide argument/state block; allocated and released in main(). */
struct tagArgs *args;
! 12:
! 13:
! 14: static void
! 15: Usage(void)
! 16: {
! 17: printf( " -= MQTT Publisher Client =- Publisher from ELWIX\n"
! 18: "=== %s@%s === Compiled: %s ===\n\n"
! 19: " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"
! 20: "\t-f\t\t\t'value_for_publish' is file name instead text\n"
! 21: "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"
! 22: "\t-m <message_id>\t\tMessage ID for publish message\n"
! 23: "\t-d\t\t\tSend duplicate message\n"
! 24: "\t-r\t\t\tRetain message from broker\n\n"
! 25: "\t-C\t\t\tNot clear before connect!!!\n"
! 26: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
! 27: "\t-T <timeout>\t\tKeep alive timeout in seconds (default: 10sec)\n"
! 28: "\t-U <username>\t\tUsername\n"
! 29: "\t-P <password>\t\tPassword\n"
! 30: "\t-W <topic>\t\tWill Topic\n"
! 31: "\t-M <message>\t\tWill Message\n"
! 32: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
! 33: "\t-h\t\t\tHelp! This screen\n\n",
! 34: compiledby, compilehost, compiled);
! 35: }
! 36:
! 37: static void
! 38: cleanArgs(struct tagArgs * __restrict args)
! 39: {
! 40: mqtt_msgFree(&args->msg, 42);
! 41: AIT_FREE_VAL(&args->Will.Msg);
! 42: AIT_FREE_VAL(&args->Will.Topic);
! 43: AIT_FREE_VAL(&args->User);
! 44: AIT_FREE_VAL(&args->Pass);
! 45: AIT_FREE_VAL(&args->Publish);
! 46: AIT_FREE_VAL(&args->Value);
! 47: AIT_FREE_VAL(&args->ConnID);
! 48: }
! 49:
! 50: static int
! 51: Publish(int sock)
! 52: {
! 53: int siz = 0;
! 54:
! 55: siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup,
! 56: args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));
! 57: if (siz == -1) {
! 58: printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 59: return -1;
! 60: }
! 61: if (SendTo(sock, siz) == -1)
! 62: return -1;
! 63: /* QoS == MQTT_QOS_ONCE, no wait for reply */
! 64: if (args->QoS == MQTT_QOS_ONCE)
! 65: return 0;
! 66:
! 67: if ((siz = RecvFrom(sock)) == -1 || !siz)
! 68: return -1;
! 69: /* QoS == MQTT_QOS_ACK, wait for PUBACK */
! 70: if (args->QoS == MQTT_QOS_ACK) {
! 71: siz = mqtt_readPUBACK(args->msg);
! 72: if (siz == args->MsgID)
! 73: return siz;
! 74: if (!args->Dup) {
! 75: args->Dup++;
! 76: return Publish(sock);
! 77: }
! 78: goto end;
! 79: }
! 80:
! 81:
! 82: /* QoS == MQTT_QOS_EXACTLY */
! 83: siz = mqtt_readPUBREC(args->msg);
! 84: if (siz != args->MsgID) {
! 85: if (!args->Dup) {
! 86: args->Dup++;
! 87: return Publish(sock);
! 88: }
! 89: goto end;
! 90: }
! 91:
! 92: siz = mqtt_msgPUBREL(args->msg, args->MsgID);
! 93: if (siz == -1) {
! 94: printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 95: return -1;
! 96: }
! 97: if (SendTo(sock, siz) == -1)
! 98: return -1;
! 99:
! 100: if ((siz = RecvFrom(sock)) == -1 || !siz)
! 101: return -1;
! 102:
! 103: siz = mqtt_readPUBCOMP(args->msg);
! 104: if (siz == args->MsgID)
! 105: return siz;
! 106: if (!args->Dup) {
! 107: args->Dup++;
! 108: return Publish(sock);
! 109: }
! 110:
! 111: end:
! 112: printf("Error:: Message not delivered\n");
! 113: return -1;
! 114: }
! 115:
! 116:
! 117: int
! 118: main(int argc, char **argv)
! 119: {
! 120: char ch;
! 121: ait_val_t val;
! 122: u_short port = atoi(MQTT_PORT);
! 123: int sock, ret = 0;
! 124:
! 125: if (!(args = malloc(sizeof(struct tagArgs)))) {
! 126: printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));
! 127: return 1;
! 128: } else
! 129: memset(args, 0, sizeof(struct tagArgs));
! 130: args->free = cleanArgs;
! 131:
! 132: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
! 133: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 134: args->free(args);
! 135: free(args);
! 136: return 1;
! 137: }
! 138:
! 139: AIT_SET_STR(&args->ConnID, "");
! 140: AIT_SET_STR(&args->User, "");
! 141: AIT_SET_STR(&args->Pass, "");
! 142:
! 143: args->ka = MQTT_KEEPALIVE;
! 144: while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:m:fvh")) != -1)
! 145: switch (ch) {
! 146: case 'T':
! 147: args->ka = (u_short) strtol(optarg, NULL, 0);
! 148: break;
! 149: case 'M':
! 150: AIT_FREE_VAL(&args->Will.Msg);
! 151: AIT_SET_STR(&args->Will.Msg, optarg);
! 152: break;
! 153: case 'W':
! 154: AIT_FREE_VAL(&args->Will.Topic);
! 155: AIT_SET_STR(&args->Will.Topic, optarg);
! 156: break;
! 157: case 'U':
! 158: AIT_FREE_VAL(&args->User);
! 159: AIT_SET_STR(&args->User, optarg);
! 160: break;
! 161: case 'P':
! 162: AIT_FREE_VAL(&args->Pass);
! 163: AIT_SET_STR(&args->Pass, optarg);
! 164: break;
! 165: case 'm':
! 166: args->MsgID = (u_short) strtol(optarg, NULL, 0);
! 167: break;
! 168: case 'p':
! 169: port = (u_short) strtol(optarg, NULL, 0);
! 170: break;
! 171: case 'q':
! 172: args->QoS = (char) strtol(optarg, NULL, 0);
! 173: if (args->QoS > MQTT_QOS_EXACTLY) {
! 174: printf("Error:: invalid QoS level %d\n", args->QoS);
! 175: args->free(args);
! 176: free(args);
! 177: return 1;
! 178: }
! 179: break;
! 180: case 'd':
! 181: args->Dup++;
! 182: break;
! 183: case 'r':
! 184: args->Retain++;
! 185: break;
! 186: case 'C':
! 187: args->notClear++;
! 188: break;
! 189: case 'f':
! 190: args->isFile++;
! 191: break;
! 192: case 'v':
! 193: io_incDebug;
! 194: break;
! 195: case 'h':
! 196: default:
! 197: args->free(args);
! 198: free(args);
! 199: Usage();
! 200: return 1;
! 201: }
! 202: argc -= optind;
! 203: argv += optind;
! 204: if (argc < 4) {
! 205: printf("Error:: host for connect not found, connection id, topic or value not supplied!\n\n");
! 206: args->free(args);
! 207: free(args);
! 208: Usage();
! 209: return 1;
! 210: } else {
! 211: AIT_FREE_VAL(&args->ConnID);
! 212: AIT_SET_STR(&args->ConnID, argv[1]);
! 213: AIT_FREE_VAL(&args->Publish);
! 214: AIT_SET_STR(&args->Publish, argv[2]);
! 215: AIT_FREE_VAL(&args->Value);
! 216: AIT_SET_STR(&args->Value, argv[3]);
! 217: }
! 218: if (!io_gethostbyname(*argv, port, &args->addr)) {
! 219: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
! 220: args->free(args);
! 221: free(args);
! 222: Usage();
! 223: return 1;
! 224: }
! 225: if (args->QoS && !args->MsgID)
! 226: args->MsgID = MQTT_DEFAULT_MSGID;
! 227: ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
! 228:
! 229: if ((sock = InitClient()) == -1) {
! 230: args->free(args);
! 231: free(args);
! 232: return 2;
! 233: }
! 234:
! 235: if (args->isFile && !OpenFile()) {
! 236: args->free(args);
! 237: free(args);
! 238: return 3;
! 239: }
! 240:
! 241: printf("Connected ... ");
! 242: switch ((ret = ConnectClient(sock))) {
! 243: case -1:
! 244: printf(">> FAILED!\n");
! 245: break;
! 246: case MQTT_RETCODE_ACCEPTED:
! 247: printf(">> OK\n");
! 248: break;
! 249: case MQTT_RETCODE_REFUSE_VER:
! 250: printf(">> Incorrect version\n");
! 251: break;
! 252: case MQTT_RETCODE_REFUSE_ID:
! 253: printf(">> Incorrect connectID\n");
! 254: break;
! 255: case MQTT_RETCODE_REFUSE_UNAVAIL:
! 256: printf(">> Service unavailable\n");
! 257: break;
! 258: case MQTT_RETCODE_REFUSE_USERPASS:
! 259: printf(">> Refuse user/pass\n");
! 260: break;
! 261: case MQTT_RETCODE_DENIED:
! 262: printf(">> DENIED.\n");
! 263: break;
! 264: }
! 265:
! 266: if (ret == MQTT_RETCODE_ACCEPTED) {
! 267: ret = !(Publish(sock) == args->MsgID);
! 268: CloseClient(sock);
! 269: } else {
! 270: close(sock);
! 271: ret = 4;
! 272: }
! 273:
! 274: CloseFile();
! 275: args->free(args);
! 276: free(args);
! 277: return ret;
! 278: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>