Annotation of mqtt/src/mqtt_pub.c, revision 1.1.2.6
1.1.2.1 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "mqtt.h"
4: #include "client.h"
5:
6:
/* NOTE(review): io_enableDEBUG is a macro supplied by a project header; presumably it
 * defines the verbosity state that io_incDebug and ioVERBOSE() in main() use -- confirm. */
7: io_enableDEBUG;
8:
/* Build identification strings, declared here and defined outside this file; Usage() prints them. */
9: extern char compiled[], compiledby[], compilehost[];
10:
/* Program-wide options and connection state; allocated in main() and read by Publish(). */
11: struct tagArgs *args;
12:
13:
14: static void
15: Usage(void)
16: {
17: printf( " -= MQTT Publisher Client =- Publisher from ELWIX\n"
18: "=== %s@%s === Compiled: %s ===\n\n"
19: " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"
20: "\t-f\t\t\t'value_for_publish' is file name instead text\n"
21: "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"
1.1.2.5 misho 22: "\t-m <message_id>\t\tMessage ID for publish message\n"
1.1.2.1 misho 23: "\t-d\t\t\tSend duplicate message\n"
24: "\t-r\t\t\tRetain message from broker\n\n"
25: "\t-C\t\t\tNot clear before connect!!!\n"
26: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
27: "\t-T <timeout>\t\tKeep alive timeout in seconds (default: 10sec)\n"
28: "\t-U <username>\t\tUsername\n"
29: "\t-P <password>\t\tPassword\n"
30: "\t-W <topic>\t\tWill Topic\n"
31: "\t-M <message>\t\tWill Message\n"
32: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
33: "\t-h\t\t\tHelp! This screen\n\n",
34: compiledby, compilehost, compiled);
35: }
36:
37: static void
38: cleanArgs(struct tagArgs * __restrict args)
39: {
40: mqtt_msgFree(&args->msg, 42);
41: AIT_FREE_VAL(&args->Will.Msg);
42: AIT_FREE_VAL(&args->Will.Topic);
43: AIT_FREE_VAL(&args->User);
44: AIT_FREE_VAL(&args->Pass);
45: AIT_FREE_VAL(&args->Publish);
46: AIT_FREE_VAL(&args->Value);
47: AIT_FREE_VAL(&args->ConnID);
48: }
49:
50: static int
51: Publish(int sock)
52: {
1.1.2.4 misho 53: int siz = 0;
54:
1.1.2.5 misho 55: siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup,
56: args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));
57: if (siz == -1) {
58: printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
59: return -1;
60: }
1.1.2.6 ! misho 61: if (SendTo(sock, siz) == -1)
1.1.2.5 misho 62: return -1;
63: /* QoS == MQTT_QOS_ONCE, no wait for reply */
64: if (args->QoS == MQTT_QOS_ONCE)
65: return 0;
66:
1.1.2.6 ! misho 67: if ((siz = RecvFrom(sock)) == -1 || !siz)
1.1.2.5 misho 68: return -1;
69: /* QoS == MQTT_QOS_ACK, wait for PUBACK */
70: if (args->QoS == MQTT_QOS_ACK)
71: return mqtt_readPUBACK(args->msg);
72:
1.1.2.6 ! misho 73:
1.1.2.5 misho 74: /* QoS == MQTT_QOS_EXACTLY */
75: if (mqtt_readPUBREC(args->msg) != args->MsgID) {
76: printf("Error:: Message not delivered\n");
77: return -1;
78: }
79:
80: siz = mqtt_msgPUBREL(args->msg, args->MsgID);
81: if (siz == -1) {
1.1.2.6 ! misho 82: printf("Error:: msgPUBREL #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
1.1.2.5 misho 83: return -1;
84: }
1.1.2.6 ! misho 85: if (SendTo(sock, siz) == -1)
1.1.2.5 misho 86: return -1;
87:
1.1.2.6 ! misho 88: if ((siz = RecvFrom(sock)) == -1 || !siz)
1.1.2.5 misho 89: return -1;
90: return mqtt_readPUBCOMP(args->msg);
1.1.2.1 misho 91: }
92:
93:
94: int
95: main(int argc, char **argv)
96: {
97: char ch;
98: ait_val_t val;
99: u_short port = atoi(MQTT_PORT);
100: int sock, ret = 0;
101:
102: if (!(args = malloc(sizeof(struct tagArgs)))) {
103: printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));
104: return 1;
105: } else
106: memset(args, 0, sizeof(struct tagArgs));
107: args->free = cleanArgs;
108:
109: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
110: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
111: args->free(args);
112: free(args);
113: return 1;
114: }
115:
116: AIT_SET_STR(&args->ConnID, "");
117: AIT_SET_STR(&args->User, "");
118: AIT_SET_STR(&args->Pass, "");
119:
120: args->ka = MQTT_KEEPALIVE;
1.1.2.5 misho 121: while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:m:fvh")) != -1)
1.1.2.1 misho 122: switch (ch) {
123: case 'T':
124: args->ka = (u_short) strtol(optarg, NULL, 0);
125: break;
126: case 'M':
127: AIT_FREE_VAL(&args->Will.Msg);
128: AIT_SET_STR(&args->Will.Msg, optarg);
129: break;
130: case 'W':
131: AIT_FREE_VAL(&args->Will.Topic);
132: AIT_SET_STR(&args->Will.Topic, optarg);
133: break;
134: case 'U':
135: AIT_FREE_VAL(&args->User);
136: AIT_SET_STR(&args->User, optarg);
137: break;
138: case 'P':
139: AIT_FREE_VAL(&args->Pass);
140: AIT_SET_STR(&args->Pass, optarg);
141: break;
1.1.2.5 misho 142: case 'm':
143: args->MsgID = (u_short) strtol(optarg, NULL, 0);
144: break;
1.1.2.1 misho 145: case 'p':
146: port = (u_short) strtol(optarg, NULL, 0);
147: break;
148: case 'q':
149: args->QoS = (char) strtol(optarg, NULL, 0);
150: if (args->QoS > MQTT_QOS_EXACTLY) {
151: printf("Error:: invalid QoS level %d\n", args->QoS);
152: args->free(args);
153: free(args);
154: return 1;
155: }
156: break;
157: case 'd':
158: args->Dup++;
159: break;
160: case 'r':
161: args->Retain++;
162: break;
163: case 'C':
164: args->notClear++;
165: break;
166: case 'f':
167: args->isFile++;
168: break;
169: case 'v':
170: io_incDebug;
171: break;
172: case 'h':
173: default:
174: args->free(args);
175: free(args);
176: Usage();
177: return 1;
178: }
179: argc -= optind;
180: argv += optind;
181: if (argc < 4) {
182: printf("Error:: host for connect not found, connection id, topic or value not supplied!\n\n");
183: args->free(args);
184: free(args);
185: Usage();
186: return 1;
187: } else {
188: AIT_FREE_VAL(&args->ConnID);
189: AIT_SET_STR(&args->ConnID, argv[1]);
190: AIT_FREE_VAL(&args->Publish);
191: AIT_SET_STR(&args->Publish, argv[2]);
192: AIT_FREE_VAL(&args->Value);
193: AIT_SET_STR(&args->Value, argv[3]);
194: }
195: if (!io_gethostbyname(*argv, port, &args->addr)) {
196: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
197: args->free(args);
198: free(args);
199: Usage();
200: return 1;
201: }
1.1.2.5 misho 202: if (args->QoS && !args->MsgID)
203: args->MsgID = MQTT_DEFAULT_MSGID;
1.1.2.1 misho 204: ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
205:
206: if ((sock = InitClient()) == -1) {
207: args->free(args);
208: free(args);
209: return 2;
210: }
211:
1.1.2.4 misho 212: if (args->isFile && !OpenFile()) {
213: args->free(args);
214: free(args);
215: return 3;
216: }
217:
1.1.2.1 misho 218: printf("Connected ... ");
1.1.2.3 misho 219: switch ((ret = ConnectClient(sock))) {
1.1.2.1 misho 220: case -1:
221: printf(">> FAILED!\n");
222: break;
223: case MQTT_RETCODE_ACCEPTED:
224: printf(">> OK\n");
225: break;
226: case MQTT_RETCODE_REFUSE_VER:
227: printf(">> Incorrect version\n");
228: break;
229: case MQTT_RETCODE_REFUSE_ID:
230: printf(">> Incorrect connectID\n");
231: break;
232: case MQTT_RETCODE_REFUSE_UNAVAIL:
233: printf(">> Service unavailable\n");
234: break;
235: case MQTT_RETCODE_REFUSE_USERPASS:
236: printf(">> Refuse user/pass\n");
237: break;
238: case MQTT_RETCODE_DENIED:
239: printf(">> DENIED.\n");
240: break;
241: }
242:
243: if (ret == MQTT_RETCODE_ACCEPTED) {
1.1.2.5 misho 244: ret = !(Publish(sock) == args->MsgID);
1.1.2.3 misho 245: CloseClient(sock);
246: } else {
247: close(sock);
1.1.2.4 misho 248: ret = 4;
1.1.2.3 misho 249: }
1.1.2.1 misho 250:
1.1.2.4 misho 251: CloseFile();
1.1.2.1 misho 252: args->free(args);
253: free(args);
254: return ret;
255: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>