Annotation of mqtt/src/mqtt_pub.c, revision 1.1.2.5
1.1.2.1 misho 1: #include "global.h"
2: #include "rtlm.h"
3: #include "mqtt.h"
4: #include "client.h"
5:
6:
7: io_enableDEBUG;
8:
9: extern char compiled[], compiledby[], compilehost[];
10:
11: struct tagArgs *args;
12:
13:
14: static void
15: Usage(void)
16: {
17: printf( " -= MQTT Publisher Client =- Publisher from ELWIX\n"
18: "=== %s@%s === Compiled: %s ===\n\n"
19: " Syntax: mqtt_pub [options] <connect_to_broker[:port]> <ConnectID> <topic> <value_for_publish>\n\n"
20: "\t-f\t\t\t'value_for_publish' is file name instead text\n"
21: "\t-q <QoS>\t\tQoS level (0-at most 1, 1-at least 1, 2-exactly 1)\n"
1.1.2.5 ! misho 22: "\t-m <message_id>\t\tMessage ID for publish message\n"
1.1.2.1 misho 23: "\t-d\t\t\tSend duplicate message\n"
24: "\t-r\t\t\tRetain message from broker\n\n"
25: "\t-C\t\t\tNot clear before connect!!!\n"
26: "\t-p <port>\t\tDifferent port for connect (default: 1883)\n"
27: "\t-T <timeout>\t\tKeep alive timeout in seconds (default: 10sec)\n"
28: "\t-U <username>\t\tUsername\n"
29: "\t-P <password>\t\tPassword\n"
30: "\t-W <topic>\t\tWill Topic\n"
31: "\t-M <message>\t\tWill Message\n"
32: "\t-v\t\t\tVerbose (more -vvv, more verbose)\n"
33: "\t-h\t\t\tHelp! This screen\n\n",
34: compiledby, compilehost, compiled);
35: }
36:
37: static void
38: cleanArgs(struct tagArgs * __restrict args)
39: {
40: mqtt_msgFree(&args->msg, 42);
41: AIT_FREE_VAL(&args->Will.Msg);
42: AIT_FREE_VAL(&args->Will.Topic);
43: AIT_FREE_VAL(&args->User);
44: AIT_FREE_VAL(&args->Pass);
45: AIT_FREE_VAL(&args->Publish);
46: AIT_FREE_VAL(&args->Value);
47: AIT_FREE_VAL(&args->ConnID);
48: }
49:
50: static int
51: Publish(int sock)
52: {
1.1.2.4 misho 53: int siz = 0;
1.1.2.5 ! misho 54: struct pollfd pfd;
! 55: struct mqtthdr *hdr;
1.1.2.4 misho 56:
1.1.2.5 ! misho 57: siz = mqtt_msgPUBLISH(args->msg, AIT_GET_STR(&args->Publish), args->MsgID, args->Dup,
! 58: args->QoS, args->Retain, AIT_GET_PTR2(&args->Value), AIT_LEN(&args->Value));
! 59: if (siz == -1) {
! 60: printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 61: return -1;
! 62: }
! 63:
! 64: siz = send(sock, args->msg->msg_base, siz, 0);
! 65: if (siz == -1) {
! 66: printf("Error:: send() #%d - %s\n", errno, strerror(errno));
! 67: return -1;
! 68: } else
! 69: ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz);
! 70:
! 71: /* QoS == MQTT_QOS_ONCE, no wait for reply */
! 72: if (args->QoS == MQTT_QOS_ONCE)
! 73: return 0;
! 74:
! 75: pfd.fd = sock;
! 76: pfd.events = POLLIN | POLLPRI;
! 77: switch (poll(&pfd, 1, args->ka * 1000)) {
! 78: case -1:
! 79: printf("Error:: poll() #%d - %s\n", errno, strerror(errno));
! 80: return -1;
! 81: case 0:
! 82: ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000);
! 83: return -1;
! 84: }
! 85: if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))
! 86: return -1;
! 87:
! 88: siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0);
! 89: if (siz == -1) {
! 90: printf("Error:: recv() #%d - %s\n", errno, strerror(errno));
! 91: return -1;
! 92: } else
! 93: ioVERBOSE(3) printf("Received %d bytes\n", siz);
! 94: if (!siz)
! 95: return -1;
! 96:
! 97: /* QoS == MQTT_QOS_ACK, wait for PUBACK */
! 98: if (args->QoS == MQTT_QOS_ACK)
! 99: return mqtt_readPUBACK(args->msg);
! 100:
! 101: /* QoS == MQTT_QOS_EXACTLY */
! 102: if (mqtt_readPUBREC(args->msg) != args->MsgID) {
! 103: printf("Error:: Message not delivered\n");
! 104: return -1;
! 105: }
! 106:
! 107: siz = mqtt_msgPUBREL(args->msg, args->MsgID);
! 108: if (siz == -1) {
! 109: printf("Error:: msgPUBLISH #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
! 110: return -1;
! 111: }
! 112:
! 113: siz = send(sock, args->msg->msg_base, siz, 0);
! 114: if (siz == -1) {
! 115: printf("Error:: send() #%d - %s\n", errno, strerror(errno));
! 116: return -1;
! 117: } else
! 118: ioVERBOSE(3) printf("Sended PUBLISH %d bytes\n", siz);
! 119:
! 120: pfd.events = POLLIN | POLLPRI;
! 121: switch (poll(&pfd, 1, args->ka * 1000)) {
! 122: case -1:
! 123: printf("Error:: poll() #%d - %s\n", errno, strerror(errno));
! 124: return -1;
! 125: case 0:
! 126: ioVERBOSE(3) printf("Timeout reached (%d) ...\n", args->ka * 1000);
! 127: return -1;
! 128: }
! 129: if (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))
! 130: return -1;
! 131:
! 132: siz = recv(sock, args->msg->msg_base, args->msg->msg_len, 0);
! 133: if (siz == -1) {
! 134: printf("Error:: recv() #%d - %s\n", errno, strerror(errno));
! 135: return -1;
! 136: } else
! 137: ioVERBOSE(3) printf("Received %d bytes\n", siz);
! 138: if (!siz)
! 139: return -1;
! 140:
! 141: return mqtt_readPUBCOMP(args->msg);
1.1.2.1 misho 142: }
143:
144:
145: int
146: main(int argc, char **argv)
147: {
148: char ch;
149: ait_val_t val;
150: u_short port = atoi(MQTT_PORT);
151: int sock, ret = 0;
152:
153: if (!(args = malloc(sizeof(struct tagArgs)))) {
154: printf("Error:: in alloc arguments #%d - %s\n", errno, strerror(errno));
155: return 1;
156: } else
157: memset(args, 0, sizeof(struct tagArgs));
158: args->free = cleanArgs;
159:
160: if (!(args->msg = mqtt_msgAlloc(USHRT_MAX))) {
161: printf("Error:: in mqtt buffer #%d - %s\n", mqtt_GetErrno(), mqtt_GetError());
162: args->free(args);
163: free(args);
164: return 1;
165: }
166:
167: AIT_SET_STR(&args->ConnID, "");
168: AIT_SET_STR(&args->User, "");
169: AIT_SET_STR(&args->Pass, "");
170:
171: args->ka = MQTT_KEEPALIVE;
1.1.2.5 ! misho 172: while ((ch = getopt(argc, argv, "T:U:P:p:q:drCW:M:m:fvh")) != -1)
1.1.2.1 misho 173: switch (ch) {
174: case 'T':
175: args->ka = (u_short) strtol(optarg, NULL, 0);
176: break;
177: case 'M':
178: AIT_FREE_VAL(&args->Will.Msg);
179: AIT_SET_STR(&args->Will.Msg, optarg);
180: break;
181: case 'W':
182: AIT_FREE_VAL(&args->Will.Topic);
183: AIT_SET_STR(&args->Will.Topic, optarg);
184: break;
185: case 'U':
186: AIT_FREE_VAL(&args->User);
187: AIT_SET_STR(&args->User, optarg);
188: break;
189: case 'P':
190: AIT_FREE_VAL(&args->Pass);
191: AIT_SET_STR(&args->Pass, optarg);
192: break;
1.1.2.5 ! misho 193: case 'm':
! 194: args->MsgID = (u_short) strtol(optarg, NULL, 0);
! 195: break;
1.1.2.1 misho 196: case 'p':
197: port = (u_short) strtol(optarg, NULL, 0);
198: break;
199: case 'q':
200: args->QoS = (char) strtol(optarg, NULL, 0);
201: if (args->QoS > MQTT_QOS_EXACTLY) {
202: printf("Error:: invalid QoS level %d\n", args->QoS);
203: args->free(args);
204: free(args);
205: return 1;
206: }
207: break;
208: case 'd':
209: args->Dup++;
210: break;
211: case 'r':
212: args->Retain++;
213: break;
214: case 'C':
215: args->notClear++;
216: break;
217: case 'f':
218: args->isFile++;
219: break;
220: case 'v':
221: io_incDebug;
222: break;
223: case 'h':
224: default:
225: args->free(args);
226: free(args);
227: Usage();
228: return 1;
229: }
230: argc -= optind;
231: argv += optind;
232: if (argc < 4) {
233: printf("Error:: host for connect not found, connection id, topic or value not supplied!\n\n");
234: args->free(args);
235: free(args);
236: Usage();
237: return 1;
238: } else {
239: AIT_FREE_VAL(&args->ConnID);
240: AIT_SET_STR(&args->ConnID, argv[1]);
241: AIT_FREE_VAL(&args->Publish);
242: AIT_SET_STR(&args->Publish, argv[2]);
243: AIT_FREE_VAL(&args->Value);
244: AIT_SET_STR(&args->Value, argv[3]);
245: }
246: if (!io_gethostbyname(*argv, port, &args->addr)) {
247: printf("Error:: host not valid #%d - %s\n", io_GetErrno(), io_GetError());
248: args->free(args);
249: free(args);
250: Usage();
251: return 1;
252: }
1.1.2.5 ! misho 253: if (args->QoS && !args->MsgID)
! 254: args->MsgID = MQTT_DEFAULT_MSGID;
1.1.2.1 misho 255: ioVERBOSE(1) printf("Connecting to %s:%d ...\n", io_n2addr(&args->addr, &val), io_n2port(&args->addr));
256:
257: if ((sock = InitClient()) == -1) {
258: args->free(args);
259: free(args);
260: return 2;
261: }
262:
1.1.2.4 misho 263: if (args->isFile && !OpenFile()) {
264: args->free(args);
265: free(args);
266: return 3;
267: }
268:
1.1.2.1 misho 269: printf("Connected ... ");
1.1.2.3 misho 270: switch ((ret = ConnectClient(sock))) {
1.1.2.1 misho 271: case -1:
272: printf(">> FAILED!\n");
273: break;
274: case MQTT_RETCODE_ACCEPTED:
275: printf(">> OK\n");
276: break;
277: case MQTT_RETCODE_REFUSE_VER:
278: printf(">> Incorrect version\n");
279: break;
280: case MQTT_RETCODE_REFUSE_ID:
281: printf(">> Incorrect connectID\n");
282: break;
283: case MQTT_RETCODE_REFUSE_UNAVAIL:
284: printf(">> Service unavailable\n");
285: break;
286: case MQTT_RETCODE_REFUSE_USERPASS:
287: printf(">> Refuse user/pass\n");
288: break;
289: case MQTT_RETCODE_DENIED:
290: printf(">> DENIED.\n");
291: break;
292: }
293:
294: if (ret == MQTT_RETCODE_ACCEPTED) {
1.1.2.5 ! misho 295: ret = !(Publish(sock) == args->MsgID);
1.1.2.3 misho 296: CloseClient(sock);
297: } else {
298: close(sock);
1.1.2.4 misho 299: ret = 4;
1.1.2.3 misho 300: }
1.1.2.1 misho 301:
1.1.2.4 misho 302: CloseFile();
1.1.2.1 misho 303: args->free(args);
304: free(args);
305: return ret;
306: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>