Annotation of embedaddon/bmon/src/in_distribution.c, revision 1.1.1.1
1.1 misho 1: /*
2: * in_distribution.c Distribution Input
3: *
4: * Copyright (c) 2001-2004 Thomas Graf <tgraf@suug.ch>
5: *
6: * Permission is hereby granted, free of charge, to any person obtaining a
7: * copy of this software and associated documentation files (the "Software"),
8: * to deal in the Software without restriction, including without limitation
9: * the rights to use, copy, modify, merge, publish, distribute, sublicense,
10: * and/or sell copies of the Software, and to permit persons to whom the
11: * Software is furnished to do so, subject to the following conditions:
12: *
13: * The above copyright notice and this permission notice shall be included
14: * in all copies or substantial portions of the Software.
15: *
16: * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
17: * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18: * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19: * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20: * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21: * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22: * DEALINGS IN THE SOFTWARE.
23: */
24:
25: #include <sys/ioctl.h>
26: #include <fcntl.h>
27: #include <netinet/in.h>
28: #include <netdb.h>
29: #include <inttypes.h>
30: #include <sys/socket.h>
31: #include <net/if.h>
32:
33: #include <bmon/bmon.h>
34: #include <bmon/input.h>
35: #include <bmon/node.h>
36: #include <bmon/item.h>
37: #include <bmon/distribution.h>
38: #include <bmon/utils.h>
39:
40: #ifdef HAVE_SYS_SOCKIO_H
41: #include <sys/sockio.h>
42: #endif
43:
44: static int recv_fd = -1;
45: static char *c_port = "2048";
46: static int c_port_int = 2048;
47: static char *c_ip = NULL;
48: static int c_ipv6 = 0;
49: static int c_max_read = 10;
50: static int c_bufsize = 8192;
51: static int c_debug = 0;
52: static int c_multicast = 0;
53: static int c_bind = 1;
54: static char *c_iface = NULL;
55: static char *buf;
56:
57: static int join_multicast4(int fd, struct sockaddr_in *addr, const char *iface)
58: {
59: struct ip_mreq mreq;
60: struct ifreq ifreq;
61: unsigned char loop = 0;
62:
63: memcpy(&mreq.imr_multiaddr, &addr->sin_addr, sizeof(struct sockaddr_in));
64: strncpy(ifreq.ifr_name, iface, IFNAMSIZ);
65:
66: if (iface) {
67: if (ioctl(fd, SIOCGIFADDR, &ifreq) < 0)
68: return -1;
69:
70: memcpy(&mreq.imr_interface,
71: &((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr,
72: sizeof(struct sockaddr_in));
73: } else
74: mreq.imr_interface.s_addr = htonl(INADDR_ANY);
75:
76: if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
77: return -1;
78:
79: return setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
80: }
81:
82: static int join_multicast6(int fd, struct sockaddr_in6 *addr, const char *iface)
83: {
84: struct ipv6_mreq mreq6;
85: unsigned char loop = 0;
86:
87: memcpy(&mreq6.ipv6mr_multiaddr, &addr->sin6_addr, sizeof(struct sockaddr_in6));
88:
89: if (iface) {
90: return -1; /* XXX: unsupported atm */
91: } else
92: mreq6.ipv6mr_interface = 0;
93:
94: if (setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof(mreq6)) < 0)
95: return -1;
96:
97: return setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof(loop));
98: }
99:
100: static int join_multicast(int fd, struct sockaddr *sa, const char *iface)
101: {
102: switch (sa->sa_family) {
103: case AF_INET:
104: return join_multicast4(fd, (struct sockaddr_in *) sa, iface);
105:
106: case AF_INET6:
107: return join_multicast6(fd, (struct sockaddr_in6 *) sa, iface);
108: }
109:
110: return -1;
111: }
112:
113: static int distribution_probe(void)
114: {
115: if (c_ip) {
116: int err;
117: char s[INET6_ADDRSTRLEN];
118: struct addrinfo hints = {
119: .ai_socktype = SOCK_DGRAM,
120: .ai_family = c_ipv6 ? PF_INET6 : PF_INET,
121: };
122: struct addrinfo *res = NULL, *t;
123:
124: if (c_ipv6 && !strcmp(c_ip, "224.0.0.1"))
125: c_ip = "ff01::1";
126:
127: if ((err = getaddrinfo(c_ip, c_port, &hints, &res)) < 0)
128: quit("getaddrinfo failed: %s\n", gai_strerror(err));
129:
130: for (t = res; t; t = t->ai_next) {
131: if (c_debug) {
132: const char *x = xinet_ntop(t->ai_addr, s, sizeof(s));
133: fprintf(stderr, "Trying %s...", x ? x : "null");
134: }
135: recv_fd = socket(t->ai_family, t->ai_socktype, 0);
136:
137: if (recv_fd < 0) {
138: if (c_debug)
139: fprintf(stderr, "socket() failed: %s\n", strerror(errno));
140: continue;
141: }
142:
143: if (c_multicast) {
144: if (join_multicast(recv_fd, t->ai_addr, c_iface) < 0)
145: continue;
146:
147: if (!c_bind)
148: goto skip_bind;
149: }
150:
151: if (bind(recv_fd, t->ai_addr, t->ai_addrlen) < 0) {
152: if (c_debug)
153: fprintf(stderr, "bind() failed: %s\n", strerror(errno));
154: continue;
155: }
156: skip_bind:
157:
158: if (c_debug)
159: fprintf(stderr, "OK\n");
160:
161: goto ok;
162: }
163:
164: fprintf(stderr, "Could not create and connect a datagram " \
165: "socket, tried:\n");
166:
167: for (t = res; t; t = t->ai_next) {
168: const char *x = xinet_ntop(t->ai_addr, s, sizeof(s));
169: fprintf(stderr, "\t%s\n", x ? x : "null");
170: }
171:
172: quit("Last error message was: %s\n", strerror(errno));
173: } else {
174: if (c_ipv6) {
175: struct sockaddr_in6 addr = {
176: .sin6_family = AF_INET6,
177: .sin6_addr = IN6ADDR_ANY_INIT,
178: .sin6_port = htons(c_port_int),
179: };
180:
181: recv_fd = socket(AF_INET6, SOCK_DGRAM, 0);
182:
183: if (recv_fd < 0)
184: goto try_4;
185:
186: if (bind(recv_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
187: goto try_4;
188:
189: goto ok;
190: }
191: try_4:
192: recv_fd = socket(AF_INET, SOCK_DGRAM, 0);
193:
194: if (recv_fd < 0)
195: quit("socket creation failed: %s\n", strerror(errno));
196: {
197: struct sockaddr_in addr = {
198: .sin_family = AF_INET,
199: .sin_port = htons(c_port_int),
200: };
201:
202: /* Guess what, NetBSD is fucked up so this can't
203: * be in the initializer */
204: addr.sin_addr.s_addr = htonl(INADDR_ANY);
205:
206: if (bind(recv_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
207: quit("bind failed: %s\n", strerror(errno));
208: }
209:
210: goto ok;
211: }
212:
213: ok:
214: {
215: int flags;
216:
217: if ((flags = fcntl(recv_fd, F_GETFL)) < 0)
218: quit("fcntl failed: %s\n", strerror(errno));
219:
220: if (fcntl(recv_fd, F_SETFL, flags | O_NONBLOCK) < 0)
221: quit("fcntl failed: %s\n", strerror(errno));
222:
223: }
224:
225: return 1;
226: }
227:
228: static int process_item(struct distr_msg_hdr *hdr, const char *nodename,
229: struct distr_msg_item *intf, char *from)
230: {
231: char *intfname;
232: int remaining, offset;
233: char *desc = NULL;
234: uint32_t handle = 0;
235: int rx_usage = -1, tx_usage = -1;
236: int parent = 0, level = 0, link = 0, index = 0;
237:
238: item_t *local_intf, *parent_intf;
239: node_t *remote_node;
240:
241: intfname = ((char *) intf) + sizeof(*intf);
242:
243: if (c_debug)
244: fprintf(stderr, "Processing interface %s (offset: %d " \
245: "optslen: %d namelen: %d hdrsize: %lu)\n",
246: intfname ? intfname : "null", ntohs(intf->i_offset),
247: intf->i_optslen, intf->i_namelen, (unsigned long) sizeof(*intf));
248:
249: if (intf->i_namelen < 4 || intf->i_namelen > IFNAME_MAX) {
250: if (c_debug)
251: fprintf(stderr, "Discarding malformed packet (invalid namelen %d)\n",
252: intf->i_namelen);
253: return -1;
254: }
255:
256: if ('\0' == *intfname) {
257: if (c_debug)
258: fprintf(stderr, "Discarding malformed packet (empty linkname)\n");
259: return -1;
260: }
261:
262: index = ntohs(intf->i_index);
263:
264: if (intf->i_optslen) {
265: offset = sizeof(*intf) + intf->i_namelen;
266: remaining = intf->i_optslen;
267:
268: while (remaining > 0) {
269: struct distr_msg_ifopt *opt;
270:
271: opt = (struct distr_msg_ifopt *) (((char *) intf) + offset);
272:
273: if (opt->io_len > (remaining - sizeof(*opt))) {
274: if (c_debug)
275: fprintf(stderr, "Discarding malformed packet (invalid opt len)\n");
276: return -1;
277: }
278:
279: switch (opt->io_type) {
280: case IFOPT_HANDLE:
281: if (opt->io_len != sizeof(uint32_t)) {
282: if (c_debug)
283: fprintf(stderr, "Discarding malformed packet " \
284: "(invalid opt len for handle)\n");
285: return -1;
286: }
287:
288: handle = ntohl(*(uint32_t *) (((char *) opt) + sizeof (*opt)));
289: break;
290:
291: case IFOPT_PARENT:
292: parent = ntohs(opt->io_pad);
293: break;
294:
295: case IFOPT_LEVEL:
296: level = ntohs(opt->io_pad);
297: break;
298:
299: case IFOPT_LINK:
300: link = ntohs(opt->io_pad);
301: break;
302:
303: case IFOPT_RX_USAGE:
304: if (opt->io_len != sizeof(uint32_t)) {
305: if (c_debug)
306: fprintf(stderr, "Discarding malformed packet " \
307: "(invalid opt len for rx usage)\n");
308: return -1;
309: }
310:
311: rx_usage = ntohl(*(uint32_t *) (((char *)opt) + sizeof (*opt)));
312: break;
313:
314: case IFOPT_TX_USAGE:
315: if (opt->io_len != sizeof(uint32_t)) {
316: if (c_debug)
317: fprintf(stderr, "Discarding malformed packet " \
318: "(invalid opt len for tx usage)\n");
319: return -1;
320: }
321:
322: tx_usage = ntohl(*(uint32_t *) (((char *)opt) + sizeof (*opt)));
323: break;
324:
325: case IFOPT_DESC:
326: if (opt->io_len <= 0) {
327: if (c_debug)
328: fprintf(stderr, "Discarding malformed packet " \
329: "(invalid opt len for description)\n");
330: return -1;
331: }
332:
333: desc = ((char *) opt) + sizeof(*opt);
334: break;
335: }
336:
337: remaining -= (sizeof(*opt) + opt->io_len);
338: offset += (sizeof(*opt) + opt->io_len);
339: }
340:
341: if (remaining < 0)
342: if (c_debug)
343: fprintf(stderr, "Leftover from options: %d\n", abs(remaining));
344: }
345:
346: remote_node = lookup_node(nodename, 1);
347:
348: if (NULL == remote_node) {
349: if (c_debug)
350: fprintf(stderr, "Could not create node entry for remote node\n");
351: return -1;
352: }
353:
354: if (remote_node->n_from)
355: xfree((void *) remote_node->n_from);
356: remote_node->n_from = strdup(from);
357:
358: if (ntohs(intf->i_flags) & IF_IS_CHILD) {
359: parent_intf = get_item(remote_node, parent);
360: if (parent_intf == NULL) {
361: if (c_debug)
362: fprintf(stderr, "Could not find parent interface for remote interface\n");
363: return -1;
364: }
365: } else
366: parent_intf = NULL;
367:
368: local_intf = lookup_item(remote_node, intfname, handle, parent_intf);
369: if (local_intf == NULL) {
370: if (c_debug)
371: fprintf(stderr, "Could not crate interface for remote interface\n");
372: return -1;
373: }
374:
375: if (local_intf->i_flags & ITEM_FLAG_LOCAL) {
376: if (c_debug)
377: fprintf(stderr, "Discarding malformed packet " \
378: "(about to overwrite a local item)\n");
379: return -1;
380: }
381:
382: local_intf->i_major_attr = BYTES;
383: local_intf->i_minor_attr = PACKETS;
384:
385: local_intf->i_rx_usage = rx_usage;
386: local_intf->i_tx_usage = tx_usage;
387:
388: if (desc) {
389: if (local_intf->i_desc && strcmp(local_intf->i_desc, desc)) {
390: free(local_intf->i_desc);
391: local_intf->i_desc = NULL;
392: }
393:
394: if (local_intf->i_desc == NULL)
395: local_intf->i_desc = strdup(desc);
396: }
397:
398: offset = sizeof(*intf) + intf->i_optslen + intf->i_namelen;
399: remaining = ntohs(intf->i_offset) - offset;
400:
401: while (remaining > 0) {
402: struct distr_msg_attr *attr;
403: uint64_t rx, tx;
404: int type;
405: attr = (struct distr_msg_attr *) (((char *) intf) + offset);
406:
407: type = ntohs(attr->a_type);
408: rx = xntohll(attr->a_rx);
409: tx = xntohll(attr->a_tx);
410:
411: if (c_debug)
412: fprintf(stderr, "Attribute type %d %" PRIu64 " %" PRIu64 "\n",
413: type, rx, tx);
414:
415: if (type >= ATTR_MAX)
416: goto skip;
417:
418: if (1) {
419: int aflags = ntohs(attr->a_flags);
420: int flags = (aflags & ATTR_RX_PROVIDED ? RX_PROVIDED : 0) |
421: (aflags & ATTR_TX_PROVIDED ? TX_PROVIDED : 0);
422:
423: update_attr(local_intf, type, rx, tx, flags);
424: }
425:
426: skip:
427: remaining -= sizeof(*attr);
428: offset += sizeof(*attr);
429: }
430:
431: if (ntohs(intf->i_flags) & IF_IS_CHILD)
432: local_intf->i_flags |= ITEM_FLAG_IS_CHILD;
433: local_intf->i_level = level;
434: local_intf->i_link = link;
435:
436: if (1) {
437: timestamp_t ts = {
438: .tv_sec = ntohl(hdr->h_ts_sec),
439: .tv_usec = ntohl(hdr->h_ts_usec)
440: };
441:
442: notify_update(local_intf, &ts);
443: }
444:
445: increase_lifetime(local_intf, 1);
446:
447: if (remaining < 0)
448: if (c_debug)
449: fprintf(stderr, "Leftover from attributes: %d\n", abs(remaining));
450:
451: return 0;
452: }
453:
454: static void process_group(struct distr_msg_hdr *hdr, const char *nodename,
455: struct distr_msg_grp *grp, char *from)
456: {
457: int remaining, offset;
458: int grpoffset;
459:
460: if (c_debug)
461: fprintf(stderr, "Processing group (type:%d offset:%d)\n",
462: ntohs(grp->g_type), ntohs(grp->g_offset));
463:
464: if (ntohs(grp->g_type) != BMON_GRP_IF) {
465: if (c_debug)
466: fprintf(stderr, "Discarding malformed packet (invalid group type)\n");
467: return;
468: }
469:
470: grpoffset = ntohs(grp->g_offset);
471:
472: if (grpoffset < sizeof(*grp) || grpoffset > ntohs(hdr->h_len)) {
473: if (c_debug)
474: fprintf(stderr, "Discarding malformed packet (invalid group offset)\n");
475: return;
476: }
477:
478: offset = sizeof(*grp);
479: remaining = grpoffset - offset;
480:
481: while (remaining > 0) {
482: struct distr_msg_item *intf = (struct distr_msg_item *) (((char *) grp) + offset);
483: int ioff = ntohs(intf->i_offset);
484:
485: if (ioff < (sizeof(*intf) + 4) || ioff > ntohs(hdr->h_len)) {
486: if (c_debug)
487: fprintf(stderr, "Discarding malformed packet (interface offset too short)\n");
488: return;
489: }
490:
491: if (ioff > remaining) {
492: if (c_debug)
493: fprintf(stderr, "Discarding malformed packet (unexpected group end)\n");
494: return;
495: }
496:
497: if (process_item(hdr, nodename, intf, from) < 0)
498: return;
499:
500: remaining -= ioff;
501: offset += ioff;
502: }
503:
504: if (remaining < 0)
505: if (c_debug)
506: fprintf(stderr, "Leftover from group: %d\n", abs(remaining));
507: }
508:
509: static void process_msg(struct distr_msg_hdr *hdr, char *from)
510: {
511: char *nodename;
512: struct distr_msg_grp *group;
513:
514: if (c_debug)
515: fprintf(stderr, "Processing message from %s (len=%d)\n",
516: from, ntohs(hdr->h_len));
517:
518: nodename = ((char *) hdr) + sizeof(*hdr);
519: group = (struct distr_msg_grp *) (((char *) hdr) + hdr->h_offset);
520:
521: if ('\0' == *nodename) {
522: if (c_debug)
523: fprintf(stderr, "Discarding malformed packet (empty nodename)\n");
524: return;
525: }
526:
527:
528: process_group(hdr, nodename, group, from);
529: }
530:
531: static void process_data(char *buf, int len, char *from)
532: {
533: struct distr_msg_hdr *hdr = (struct distr_msg_hdr *) buf;
534:
535: if (len < sizeof(*hdr)) {
536: if (c_debug)
537: fprintf(stderr, "Discarding malformed packet (hdrcheck)\n");
538: return;
539: }
540:
541: if (hdr->h_magic != BMON_MAGIC) {
542: if (c_debug)
543: fprintf(stderr, "Discarding malformed packet (magic mismatch)\n");
544: return;
545: }
546:
547: if (hdr->h_ver != BMON_VERSION) {
548: if (c_debug)
549: fprintf(stderr, "Discarding incompatible packet (version mismatch)\n");
550: return;
551: }
552:
553: if (ntohs(hdr->h_len) < len) {
554: if (c_debug)
555: fprintf(stderr, "Discarding malformed packet (packet size mismatch)\n");
556: return;
557: }
558:
559: if (hdr->h_offset < (sizeof(*hdr) + 4)) {
560: if (c_debug)
561: fprintf(stderr, "Discarding malformed packet (offset too short)\n");
562: return;
563: }
564:
565: process_msg(hdr, from);
566: }
567:
568: static void distribution_read(void)
569: {
570: int i, n;
571: struct sockaddr_in6 addr;
572: socklen_t len = sizeof(addr);
573: char addrstr[INET6_ADDRSTRLEN];
574:
575: memset(&addr, 0, sizeof(addr));
576: memset(buf, 0, c_bufsize);
577:
578: for (i = 0; i < c_max_read; i++) {
579: n = recvfrom(recv_fd, buf, c_bufsize, 0,
580: (struct sockaddr *) &addr, &len);
581:
582: if (n < 0) {
583: if (EAGAIN == errno)
584: return;
585: else
586: quit("recvfrom failed: %s\n", strerror(errno));
587: }
588:
589: if (addr.sin6_family == AF_INET) {
590: struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
591: inet_ntop(AF_INET, (void *) &in4->sin_addr,
592: addrstr, len);
593: } else if (addr.sin6_family == AF_INET6) {
594: inet_ntop(AF_INET6, (void *) &addr.sin6_addr,
595: addrstr, len);
596: }
597:
598: if (c_debug)
599: fprintf(stderr, "Read %d bytes from %s\n", n, addrstr);
600:
601: process_data(buf, n, addrstr);
602: }
603:
604: return;
605: }
606:
607: static void distribution_init(void)
608: {
609: buf = xcalloc(1, c_bufsize);
610: }
611:
612: static void print_help(void)
613: {
614: printf(
615: "Distribution - Collects statistics from other nodes\n" \
616: "\n" \
617: " Collects statistics from other nodes using the distribution\n" \
618: " secondary output method (-O distribution).\n" \
619: "\n" \
620: " Author: Thomas Graf <tgraf@suug.ch>\n" \
621: "\n" \
622: " Options:\n" \
623: " ip=ADDR Only process messages from this address (default: none)\n" \
624: " port=NUM Port the messages are comming from (default: 2048)\n" \
625: " ipv6 Prefer IPv6 when creating sockets\n" \
626: " multicast[=ADDR] Use multicast to collect statistics\n" \
627: " intf=NAME Bind multicast socket to given interface\n" \
628: " nobind Don't bind, receive multicast and unicast messages\n" \
629: " max_read=NUM Max. reads during one read interval (default: 10)\n" \
630: " bufsize=NUM Size of receive buffer (default: 8192)\n" \
631: " debug Print verbose message for debugging\n" \
632: " help Print this help text\n");
633: }
634:
635: static void distribution_set_opts(tv_t *attrs)
636: {
637: while (attrs) {
638: if (!strcasecmp(attrs->type, "port") && attrs->value) {
639: c_port = attrs->value;
640: c_port_int = strtol(c_port, NULL, 0);
641: } else if (!strcasecmp(attrs->type, "ip") && attrs->value)
642: c_ip = attrs->value;
643: else if (!strcasecmp(attrs->type, "ipv6"))
644: c_ipv6 = 1;
645: else if (!strcasecmp(attrs->type, "max_read") && attrs->value)
646: c_max_read = strtol(attrs->value, NULL, 0);
647: else if (!strcasecmp(attrs->type, "bufsize") && attrs->value)
648: c_bufsize = strtol(attrs->value, NULL, 0);
649: else if (!strcasecmp(attrs->type, "debug"))
650: c_debug = 1;
651: else if (!strcasecmp(attrs->type, "multicast")) {
652: if (attrs->value)
653: c_ip = attrs->value;
654: else
655: c_ip = "224.0.0.1";
656: } else if (!strcasecmp(attrs->type, "nobind"))
657: c_bind = 0;
658: else if (!strcasecmp(attrs->type, "intf") && attrs->value)
659: c_iface = attrs->value;
660: else if (!strcasecmp(attrs->type, "help")) {
661: print_help();
662: exit(0);
663: }
664: attrs = attrs->next;
665: }
666: }
667:
668: static struct input_module distribution_ops = {
669: .im_name = "distribution",
670: .im_set_opts = distribution_set_opts,
671: .im_read = distribution_read,
672: .im_probe = distribution_probe,
673: .im_init = distribution_init,
674: };
675:
676: static void __init do_distribution_init(void)
677: {
678: register_secondary_input_module(&distribution_ops);
679: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>