File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / bmon / src / in_distribution.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 22:19:56 2012 UTC (12 years, 3 months ago) by misho
Branches: bmon, MAIN
CVS tags: v2_1_0p0, v2_1_0, HEAD
bmon

/*
 * in_distribution.c     Distribution Input
 *
 * Copyright (c) 2001-2004 Thomas Graf <tgraf@suug.ch>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */

#include <sys/ioctl.h>
#include <fcntl.h>
#include <limits.h>
#include <unistd.h>
#include <netinet/in.h>
#include <netdb.h>
#include <inttypes.h>
#include <sys/socket.h>
#include <net/if.h>

#include <bmon/bmon.h>
#include <bmon/input.h>
#include <bmon/node.h>
#include <bmon/item.h>
#include <bmon/distribution.h>
#include <bmon/utils.h>

#ifdef HAVE_SYS_SOCKIO_H
#include <sys/sockio.h>
#endif

/* Receive socket opened by distribution_probe(); -1 until then. */
static int recv_fd = -1;

/*
 * Module options, adjusted by distribution_set_opts(). The string options
 * point either at string literals or at attrs->value strings and are never
 * written through, hence const.
 */
static const char *c_port = "2048";	/* service passed to getaddrinfo() */
static int c_port_int = 2048;		/* numeric port for the wildcard bind */
static const char *c_ip = NULL;		/* address to bind/join; NULL = wildcard */
static int c_ipv6 = 0;			/* prefer IPv6 sockets */
static int c_max_read = 10;		/* max. datagrams per distribution_read() */
static int c_bufsize = 8192;		/* size of the receive buffer */
static int c_debug = 0;			/* verbose diagnostics on stderr */
static int c_multicast = 0;		/* join the group in distribution_probe() */
static int c_bind = 1;			/* bind even when joining a group */
static const char *c_iface = NULL;	/* interface for the multicast join */

/* Receive buffer of c_bufsize bytes, allocated by distribution_init(). */
static char *buf;

/*
 * Join the IPv4 multicast group in @addr on socket @fd and disable loopback
 * of multicast datagrams sent from this host.
 *
 * @fd:    IPv4 datagram socket
 * @addr:  group address; only sin_addr is used
 * @iface: interface whose address (SIOCGIFADDR) selects where to join, or
 *         NULL to join on INADDR_ANY
 *
 * Returns 0 on success, -1 on failure (errno set by the failing call, or
 * ENAMETOOLONG if @iface does not fit into ifr_name).
 */
static int join_multicast4(int fd, struct sockaddr_in *addr, const char *iface)
{
	struct ip_mreq mreq;
	struct ifreq ifreq;
	unsigned char loop = 0;

	memset(&mreq, 0, sizeof(mreq));

	/* Copy only the 4-byte in_addr: copying sizeof(struct sockaddr_in)
	 * would overrun mreq and read past *addr. */
	mreq.imr_multiaddr = addr->sin_addr;

	if (iface) {
		size_t namelen = strlen(iface);

		/* ifr_name needs room for the terminating NUL; a truncated
		 * name could silently select a different interface. */
		if (namelen >= IFNAMSIZ) {
			errno = ENAMETOOLONG;
			return -1;
		}

		memset(&ifreq, 0, sizeof(ifreq));
		memcpy(ifreq.ifr_name, iface, namelen + 1);

		if (ioctl(fd, SIOCGIFADDR, &ifreq) < 0)
			return -1;

		mreq.imr_interface =
			((struct sockaddr_in *) &ifreq.ifr_addr)->sin_addr;
	} else
		mreq.imr_interface.s_addr = htonl(INADDR_ANY);

	if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0)
		return -1;

	return setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop));
}

/*
 * Join the IPv6 multicast group in @addr on socket @fd and disable loopback
 * of multicast datagrams sent from this host.
 *
 * @fd:    IPv6 datagram socket
 * @addr:  group address; only sin6_addr is used
 * @iface: name of the interface to join on (resolved with
 *         if_nametoindex()), or NULL to let the kernel choose (index 0)
 *
 * Returns 0 on success, -1 on failure (errno set by the failing call).
 */
static int join_multicast6(int fd, struct sockaddr_in6 *addr, const char *iface)
{
	struct ipv6_mreq mreq6;
	/* RFC 3493 5.2: the IPV6_MULTICAST_LOOP argument is an unsigned int
	 * (Linux rejects a 1-byte value with EINVAL). */
	unsigned int loop = 0;

	memset(&mreq6, 0, sizeof(mreq6));

	/* Copy only the 16-byte in6_addr: copying sizeof(struct sockaddr_in6)
	 * would overrun mreq6 and read past *addr. */
	mreq6.ipv6mr_multiaddr = addr->sin6_addr;

	if (iface) {
		mreq6.ipv6mr_interface = if_nametoindex(iface);
		if (0 == mreq6.ipv6mr_interface)
			return -1;
	} else
		mreq6.ipv6mr_interface = 0;

	if (setsockopt(fd, IPPROTO_IPV6, IPV6_JOIN_GROUP, &mreq6, sizeof(mreq6)) < 0)
		return -1;

	return setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, sizeof(loop));
}

/*
 * Join the multicast group named by @sa on socket @fd, dispatching on the
 * address family of @sa. @iface optionally names the interface to use.
 *
 * Returns the result of the per-family helper, or -1 for any family other
 * than AF_INET and AF_INET6.
 */
static int join_multicast(int fd, struct sockaddr *sa, const char *iface)
{
	int family = sa->sa_family;

	if (AF_INET == family)
		return join_multicast4(fd, (struct sockaddr_in *) sa, iface);

	if (AF_INET6 == family)
		return join_multicast6(fd, (struct sockaddr_in6 *) sa, iface);

	/* unsupported address family */
	return -1;
}

/*
 * Close a partially set-up receive socket and reset recv_fd, so a failed
 * candidate does not leak its descriptor.
 */
static void distribution_close_recv(void)
{
	if (recv_fd >= 0) {
		close(recv_fd);
		recv_fd = -1;
	}
}

/*
 * Open recv_fd on the address given by the ip=/multicast= option.
 *
 * Every address getaddrinfo() returns for c_ip/c_port is tried in turn:
 * create a datagram socket, join the multicast group if c_multicast is set,
 * and bind unless multicast was combined with nobind. The first candidate
 * that succeeds is kept. If none does, the candidates are listed on stderr
 * and quit() is called with the last error.
 */
static void distribution_open_addr(void)
{
	int err, last_errno = 0;
	char s[INET6_ADDRSTRLEN];
	struct addrinfo hints = {
		.ai_socktype = SOCK_DGRAM,
		.ai_family = c_ipv6 ? PF_INET6 : PF_INET,
	};
	struct addrinfo *res = NULL, *t;

	/* The default group is an IPv4 address; use ff01::1 instead when
	 * IPv6 was requested. */
	if (c_ipv6 && !strcmp(c_ip, "224.0.0.1"))
		c_ip = "ff01::1";

	/* EAI_* codes are non-zero but not necessarily negative (they are
	 * positive on the BSDs), so any non-zero result is a failure. */
	if ((err = getaddrinfo(c_ip, c_port, &hints, &res)) != 0)
		quit("getaddrinfo failed: %s\n", gai_strerror(err));

	for (t = res; t; t = t->ai_next) {
		if (c_debug) {
			const char *x = xinet_ntop(t->ai_addr, s, sizeof(s));
			fprintf(stderr, "Trying %s...", x ? x : "null");
		}

		recv_fd = socket(t->ai_family, t->ai_socktype, 0);
		if (recv_fd < 0) {
			last_errno = errno;
			if (c_debug)
				fprintf(stderr, "socket() failed: %s\n",
					strerror(last_errno));
			continue;
		}

		if (c_multicast &&
		    join_multicast(recv_fd, t->ai_addr, c_iface) < 0) {
			last_errno = errno;
			if (c_debug)
				fprintf(stderr, "joining multicast group failed: %s\n",
					strerror(last_errno));
			distribution_close_recv();
			continue;
		}

		/* Multicast combined with nobind leaves the socket unbound. */
		if (!c_multicast || c_bind) {
			if (bind(recv_fd, t->ai_addr, t->ai_addrlen) < 0) {
				last_errno = errno;
				if (c_debug)
					fprintf(stderr, "bind() failed: %s\n",
						strerror(last_errno));
				distribution_close_recv();
				continue;
			}
		}

		if (c_debug)
			fprintf(stderr, "OK\n");

		freeaddrinfo(res);
		return;
	}

	fprintf(stderr, "Could not create and connect a datagram " \
		"socket, tried:\n");

	for (t = res; t; t = t->ai_next) {
		const char *x = xinet_ntop(t->ai_addr, s, sizeof(s));
		fprintf(stderr, "\t%s\n", x ? x : "null");
	}

	freeaddrinfo(res);

	/* last_errno was saved right after the failing call; the fprintf()s
	 * above may have changed errno since. */
	quit("Last error message was: %s\n", strerror(last_errno));
}

/*
 * Open recv_fd on the wildcard address and port c_port_int. With c_ipv6 an
 * IPv6 socket is tried first, and any failure there falls back to IPv4.
 * quit() is called if the IPv4 socket cannot be created or bound.
 */
static void distribution_open_any(void)
{
	if (c_ipv6) {
		struct sockaddr_in6 addr6 = {
			.sin6_family = AF_INET6,
			.sin6_addr = IN6ADDR_ANY_INIT,
			.sin6_port = htons(c_port_int),
		};

		recv_fd = socket(AF_INET6, SOCK_DGRAM, 0);
		if (recv_fd >= 0) {
			if (bind(recv_fd, (struct sockaddr *) &addr6,
				 sizeof(addr6)) >= 0)
				return;

			/* fall back to IPv4 without leaking the IPv6 socket */
			distribution_close_recv();
		}
	}

	recv_fd = socket(AF_INET, SOCK_DGRAM, 0);
	if (recv_fd < 0)
		quit("socket creation failed: %s\n", strerror(errno));

	{
		struct sockaddr_in addr = {
			.sin_family = AF_INET,
			.sin_port = htons(c_port_int),
		};

		/* Assigned separately because NetBSD does not allow
		 * INADDR_ANY in this initializer. */
		addr.sin_addr.s_addr = htonl(INADDR_ANY);

		if (bind(recv_fd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
			quit("bind failed: %s\n", strerror(errno));
	}
}

/*
 * Put @fd into non-blocking mode; quit() on failure. distribution_read()
 * relies on recvfrom() failing with EAGAIN once no datagram is queued.
 */
static void distribution_set_nonblock(int fd)
{
	int flags;

	if ((flags = fcntl(fd, F_GETFL)) < 0)
		quit("fcntl failed: %s\n", strerror(errno));

	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0)
		quit("fcntl failed: %s\n", strerror(errno));
}

/*
 * Input module probe: open the non-blocking receive socket recv_fd, either
 * on c_ip (when set through ip= or multicast=) or on the wildcard address.
 * Unrecoverable errors end in quit(). Returns 1.
 */
static int distribution_probe(void)
{
	if (c_ip)
		distribution_open_addr();
	else
		distribution_open_any();

	distribution_set_nonblock(recv_fd);

	return 1;
}

/*
 * Parse one interface record of a distribution message and apply it to the
 * corresponding item of the sending node. The node and item are created if
 * needed.
 *
 * Record layout, all offsets relative to @intf:
 *   struct distr_msg_item
 *   interface name            (i_namelen bytes)
 *   IFOPT_* options           (i_optslen bytes; each a distr_msg_ifopt
 *                              header followed by io_len bytes of data)
 *   struct distr_msg_attr[]   (up to ntohs(i_offset) bytes in total)
 *
 * @hdr:      message header; supplies the sample timestamp
 * @nodename: NUL-terminated name of the sending node
 * @intf:     record to parse; its first ntohs(intf->i_offset) bytes are
 *            assumed readable (process_group() checks this against the
 *            group length)
 * @from:     printable source address, stored as the node's n_from
 *
 * The record comes from the network, so every length in it is checked
 * against the record before it is used.
 *
 * Returns 0 on success, -1 if the record is malformed or cannot be applied.
 */
static int process_item(struct distr_msg_hdr *hdr, const char *nodename,
			struct distr_msg_item *intf, char *from)
{
	char *intfname;
	char name[IFNAME_MAX + 1];
	char descbuf[256];
	int itemlen, remaining, offset;
	char *desc = NULL;
	uint32_t handle = 0;
	int rx_usage = -1, tx_usage = -1;
	int parent = 0, level = 0, link = 0;

	item_t *local_intf, *parent_intf;
	node_t *remote_node;

	intfname = ((char *) intf) + sizeof(*intf);
	itemlen = ntohs(intf->i_offset);

	if (intf->i_namelen < 4 || intf->i_namelen > IFNAME_MAX) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (invalid namelen %d)\n",
				intf->i_namelen);
		return -1;
	}

	/* Header, name and options must all lie inside the record, otherwise
	 * the parsing below would run past it. */
	if ((int) sizeof(*intf) + intf->i_namelen + intf->i_optslen > itemlen) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet " \
				"(name and options exceed item length)\n");
		return -1;
	}

	/* Work on a terminated copy: the packet's name field is not
	 * guaranteed to contain a NUL. i_namelen <= IFNAME_MAX (checked
	 * above), so it fits. */
	memcpy(name, intfname, intf->i_namelen);
	name[intf->i_namelen] = '\0';

	if (c_debug)
		fprintf(stderr, "Processing interface %s (offset: %d " \
			"optslen: %d namelen: %d hdrsize: %lu)\n",
			name, itemlen,
			intf->i_optslen, intf->i_namelen, (unsigned long) sizeof(*intf));

	if ('\0' == *name) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (empty linkname)\n");
		return -1;
	}

	if (intf->i_optslen) {
		offset = sizeof(*intf) + intf->i_namelen;
		remaining = intf->i_optslen;

		while (remaining > 0) {
			struct distr_msg_ifopt *opt;
			char *data;
			uint32_t val;
			size_t dlen;

			/* The option header itself must be complete before
			 * any of its fields are read. */
			if (remaining < (int) sizeof(*opt)) {
				if (c_debug)
					fprintf(stderr, "Discarding malformed packet (truncated option)\n");
				return -1;
			}

			opt = (struct distr_msg_ifopt *) (((char *) intf) + offset);
			data = ((char *) opt) + sizeof(*opt);

			/* Signed arithmetic: remaining - sizeof(*opt) is >= 0
			 * here and must not wrap around as an unsigned value. */
			if ((int) opt->io_len > remaining - (int) sizeof(*opt)) {
				if (c_debug)
					fprintf(stderr, "Discarding malformed packet (invalid opt len)\n");
				return -1;
			}

			switch (opt->io_type) {
				case IFOPT_HANDLE:
					if (opt->io_len != sizeof(uint32_t)) {
						if (c_debug)
							fprintf(stderr, "Discarding malformed packet " \
								"(invalid opt len for handle)\n");
						return -1;
					}

					/* memcpy: option data may be unaligned */
					memcpy(&val, data, sizeof(val));
					handle = ntohl(val);
					break;

				case IFOPT_PARENT:
					parent = ntohs(opt->io_pad);
					break;

				case IFOPT_LEVEL:
					level = ntohs(opt->io_pad);
					break;

				case IFOPT_LINK:
					link = ntohs(opt->io_pad);
					break;

				case IFOPT_RX_USAGE:
					if (opt->io_len != sizeof(uint32_t)) {
						if (c_debug)
							fprintf(stderr, "Discarding malformed packet " \
								"(invalid opt len for rx usage)\n");
						return -1;
					}

					memcpy(&val, data, sizeof(val));
					rx_usage = ntohl(val);
					break;

				case IFOPT_TX_USAGE:
					if (opt->io_len != sizeof(uint32_t)) {
						if (c_debug)
							fprintf(stderr, "Discarding malformed packet " \
								"(invalid opt len for tx usage)\n");
						return -1;
					}

					memcpy(&val, data, sizeof(val));
					tx_usage = ntohl(val);
					break;

				case IFOPT_DESC:
					if (opt->io_len <= 0) {
						if (c_debug)
							fprintf(stderr, "Discarding malformed packet " \
								"(invalid opt len for description)\n");
						return -1;
					}

					/* Terminated copy: the option data need not
					 * contain a NUL, and desc is strdup()ed below. */
					dlen = opt->io_len;
					if (dlen > sizeof(descbuf) - 1)
						dlen = sizeof(descbuf) - 1;
					memcpy(descbuf, data, dlen);
					descbuf[dlen] = '\0';
					desc = descbuf;
					break;
			}

			remaining -= (int) sizeof(*opt) + (int) opt->io_len;
			offset += (int) sizeof(*opt) + (int) opt->io_len;
		}
	}

	remote_node = lookup_node(nodename, 1);

	if (NULL == remote_node) {
		if (c_debug)
			fprintf(stderr, "Could not create node entry for remote node\n");
		return -1;
	}

	if (remote_node->n_from)
		xfree((void *) remote_node->n_from);
	remote_node->n_from = strdup(from);

	if (ntohs(intf->i_flags) & IF_IS_CHILD) {
		parent_intf = get_item(remote_node, parent);
		if (parent_intf == NULL) {
			if (c_debug)
				fprintf(stderr, "Could not find parent interface for remote interface\n");
			return -1;
		}
	} else
		parent_intf = NULL;

	local_intf = lookup_item(remote_node, name, handle, parent_intf);
	if (local_intf == NULL) {
		if (c_debug)
			fprintf(stderr, "Could not create interface for remote interface\n");
		return -1;
	}

	/* Never let a remote node overwrite statistics of a local item. */
	if (local_intf->i_flags & ITEM_FLAG_LOCAL) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet " \
					"(about to overwrite a local item)\n");
		return -1;
	}

	local_intf->i_major_attr = BYTES;
	local_intf->i_minor_attr = PACKETS;

	local_intf->i_rx_usage = rx_usage;
	local_intf->i_tx_usage = tx_usage;

	if (desc) {
		if (local_intf->i_desc && strcmp(local_intf->i_desc, desc)) {
			free(local_intf->i_desc);
			local_intf->i_desc = NULL;
		}

		if (local_intf->i_desc == NULL)
			local_intf->i_desc = strdup(desc);
	}

	/* Attributes fill the rest of the record; only complete ones are read. */
	offset = sizeof(*intf) + intf->i_optslen + intf->i_namelen;
	remaining = itemlen - offset;

	while (remaining >= (int) sizeof(struct distr_msg_attr)) {
		struct distr_msg_attr attr;
		uint64_t rx, tx;
		int type;

		/* Copy out: the offset comes from the packet, so a_rx/a_tx
		 * are not guaranteed to be suitably aligned in place. */
		memcpy(&attr, ((char *) intf) + offset, sizeof(attr));

		type = ntohs(attr.a_type);
		rx = xntohll(attr.a_rx);
		tx = xntohll(attr.a_tx);

		if (c_debug)
			fprintf(stderr, "Attribute type %d %" PRIu64 " %" PRIu64 "\n",
				type, rx, tx);

		/* Unknown attribute types are skipped. */
		if (type < ATTR_MAX) {
			int aflags = ntohs(attr.a_flags);
			int flags = (aflags & ATTR_RX_PROVIDED ? RX_PROVIDED : 0) |
				    (aflags & ATTR_TX_PROVIDED ? TX_PROVIDED : 0);

			update_attr(local_intf, type, rx, tx, flags);
		}

		remaining -= (int) sizeof(attr);
		offset += (int) sizeof(attr);
	}

	if (ntohs(intf->i_flags) & IF_IS_CHILD)
		local_intf->i_flags |= ITEM_FLAG_IS_CHILD;
	local_intf->i_level = level;
	local_intf->i_link = link;

	{
		timestamp_t ts = {
			.tv_sec = ntohl(hdr->h_ts_sec),
			.tv_usec = ntohl(hdr->h_ts_usec)
		};

		notify_update(local_intf, &ts);
	}

	increase_lifetime(local_intf, 1);

	/* Bytes too few to form another attribute */
	if (remaining > 0)
		if (c_debug)
			fprintf(stderr, "Leftover from attributes: %d\n", remaining);

	return 0;
}

/*
 * Parse one group of a distribution message and feed each interface record
 * in it to process_item().
 *
 * Group layout: struct distr_msg_grp, followed by interface records up to
 * g_offset bytes from the start of the group. Each record is
 * ntohs(i_offset) bytes long.
 *
 * @hdr:      message header; ntohs(h_len) is the message length that all
 *            bounds are checked against
 * @nodename: NUL-terminated name of the sending node
 * @grp:      group to parse, located inside the message at @hdr
 * @from:     printable source address
 *
 * Processing stops at the first malformed or rejected record.
 */
static void process_group(struct distr_msg_hdr *hdr, const char *nodename,
			  struct distr_msg_grp *grp, char *from)
{
	int remaining, offset;
	int grpoffset, grpstart, msglen;

	msglen = ntohs(hdr->h_len);
	grpstart = (int) (((char *) grp) - ((char *) hdr));

	/* The group header must lie inside the message before any of its
	 * fields are read. */
	if (grpstart < 0 || grpstart + (int) sizeof(*grp) > msglen) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (group header out of bounds)\n");
		return;
	}

	if (c_debug)
		fprintf(stderr, "Processing group (type:%d offset:%d)\n",
			ntohs(grp->g_type), ntohs(grp->g_offset));

	if (ntohs(grp->g_type) != BMON_GRP_IF) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (invalid group type)\n");
		return;
	}

	grpoffset = ntohs(grp->g_offset);

	/* g_offset counts from the start of the group, so it is bounded by
	 * what remains of the message after grpstart, not by h_len itself. */
	if (grpoffset < (int) sizeof(*grp) || grpoffset > msglen - grpstart) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (invalid group offset)\n");
		return;
	}

	offset = sizeof(*grp);
	remaining = grpoffset - offset;

	while (remaining > 0) {
		struct distr_msg_item *intf;
		int ioff;

		/* i_offset must be inside the group before it is read. */
		if (remaining < (int) sizeof(*intf)) {
			if (c_debug)
				fprintf(stderr, "Discarding malformed packet (truncated interface header)\n");
			return;
		}

		intf = (struct distr_msg_item *) (((char *) grp) + offset);
		ioff = ntohs(intf->i_offset);

		if (ioff < (int) (sizeof(*intf) + 4) || ioff > msglen) {
			if (c_debug)
				fprintf(stderr, "Discarding malformed packet (interface offset too short)\n");
			return;
		}

		if (ioff > remaining) {
			if (c_debug)
				fprintf(stderr, "Discarding malformed packet (unexpected group end)\n");
			return;
		}

		if (process_item(hdr, nodename, intf, from) < 0)
			return;

		remaining -= ioff;
		offset += ioff;
	}
}

/*
 * Parse a distribution message whose header has already been validated by
 * process_data(). The message holds the fixed header, the sending node's
 * name (between the header and h_offset), and the group at h_offset.
 *
 * @hdr:  start of the message
 * @from: printable source address
 */
static void process_msg(struct distr_msg_hdr *hdr, char *from)
{
	char *nodename;
	struct distr_msg_grp *group;
	int namearea;

	if (c_debug)
		fprintf(stderr, "Processing message from %s (len=%d)\n",
			from, ntohs(hdr->h_len));

	/* Everything up to h_offset is read below, so it must lie inside
	 * the message. */
	if (hdr->h_offset > ntohs(hdr->h_len)) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (offset beyond message end)\n");
		return;
	}

	nodename = ((char *) hdr) + sizeof(*hdr);
	group = (struct distr_msg_grp *) (((char *) hdr) + hdr->h_offset);

	/* The node name is used as a C string, so it must be terminated
	 * before the group starts. */
	namearea = (int) hdr->h_offset - (int) sizeof(*hdr);
	if (namearea <= 0 || NULL == memchr(nodename, '\0', namearea)) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (unterminated nodename)\n");
		return;
	}

	if ('\0' == *nodename) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (empty nodename)\n");
		return;
	}

	process_group(hdr, nodename, group, from);
}

static void process_data(char *buf, int len, char *from)
{
	struct distr_msg_hdr *hdr = (struct distr_msg_hdr *) buf;

	if (len < sizeof(*hdr)) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (hdrcheck)\n");
		return;
	}

	if (hdr->h_magic != BMON_MAGIC) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (magic mismatch)\n");
		return;
	}

	if (hdr->h_ver != BMON_VERSION) {
		if (c_debug)
			fprintf(stderr, "Discarding incompatible packet (version mismatch)\n");
		return;
	}

	if (ntohs(hdr->h_len) < len) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (packet size mismatch)\n");
		return;
	}

	if (hdr->h_offset < (sizeof(*hdr) + 4)) {
		if (c_debug)
			fprintf(stderr, "Discarding malformed packet (offset too short)\n");
		return;
	}

	process_msg(hdr, from);
}

/*
 * Input module read hook: receive up to c_max_read datagrams from the
 * non-blocking socket recv_fd into buf and process each one. Returns early
 * once no more datagrams are queued; other receive errors end in quit().
 */
static void distribution_read(void)
{
	int i, n;
	struct sockaddr_storage addr;
	socklen_t len;
	char addrstr[INET6_ADDRSTRLEN];
	const char *shown;

	memset(buf, 0, c_bufsize);

	for (i = 0; i < c_max_read; i++) {
		/* recvfrom() treats len as value-result; reset it each time. */
		len = sizeof(addr);
		memset(&addr, 0, sizeof(addr));

		n = recvfrom(recv_fd, buf, c_bufsize, 0,
			     (struct sockaddr *) &addr, &len);

		if (n < 0) {
			/* Queue drained or interrupted: retry next interval. */
			if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno)
				return;

			quit("recvfrom failed: %s\n", strerror(errno));
		}

		/* inet_ntop() needs the size of the string buffer, not the
		 * length of the socket address. */
		shown = NULL;
		if (AF_INET == addr.ss_family) {
			struct sockaddr_in *in4 = (struct sockaddr_in *) &addr;
			shown = inet_ntop(AF_INET, &in4->sin_addr,
					  addrstr, sizeof(addrstr));
		} else if (AF_INET6 == addr.ss_family) {
			struct sockaddr_in6 *in6 = (struct sockaddr_in6 *) &addr;
			shown = inet_ntop(AF_INET6, &in6->sin6_addr,
					  addrstr, sizeof(addrstr));
		}

		/* Never hand an uninitialized buffer on as the source. */
		if (NULL == shown)
			snprintf(addrstr, sizeof(addrstr), "%s", "unknown");

		if (c_debug)
			fprintf(stderr, "Read %d bytes from %s\n", n, addrstr);

		process_data(buf, n, addrstr);
	}
}

static void distribution_init(void)
{
	buf = xcalloc(1, c_bufsize);
}

/* Print a description of the module and its options to stdout. */
static void print_help(void)
{
	printf(
	"Distribution - Collects statistics from other nodes\n" \
	"\n" \
	"  Collects statistics from other nodes using the distribution\n" \
	"  secondary output method (-O distribution).\n" \
	"\n" \
	"  Author: Thomas Graf <tgraf@suug.ch>\n" \
	"\n" \
	"  Options:\n" \
	"    ip=ADDR            Only receive messages sent to this local address (default: any)\n" \
	"    port=NUM           Port to receive messages on (default: 2048)\n" \
	"    ipv6               Prefer IPv6 when creating sockets\n" \
	"    multicast[=ADDR]   Use multicast to collect statistics\n" \
	"    intf=NAME          Bind multicast socket to given interface\n" \
	"    nobind             Don't bind, receive multicast and unicast messages\n" \
	"    max_read=NUM       Max. reads during one read interval (default: 10)\n" \
	"    bufsize=NUM        Size of receive buffer (default: 8192)\n" \
	"    debug              Print verbose message for debugging\n" \
	"    help               Print this help text\n");
}

/*
 * Parse a strictly positive integer option value (any base strtol() accepts
 * with base 0). quit() if the value is malformed, <= 0, or does not fit
 * in an int.
 */
static int distribution_parse_positive(const char *name, const char *value)
{
	char *end;
	long v;

	errno = 0;
	v = strtol(value, &end, 0);

	if (end == value || *end != '\0' || ERANGE == errno ||
	    v <= 0 || v > INT_MAX)
		quit("Invalid value for %s: %s\n", name, value);

	return (int) v;
}

/*
 * Input module option hook: apply the key[=value] pairs in the @attrs list
 * to the c_* settings. "help" prints the usage and exits.
 */
static void distribution_set_opts(tv_t *attrs)
{
	while (attrs) {
		if (!strcasecmp(attrs->type, "port") && attrs->value) {
			c_port = attrs->value;
			c_port_int = strtol(c_port, NULL, 0);
		} else if (!strcasecmp(attrs->type, "ip") && attrs->value)
			c_ip = attrs->value;
		else if (!strcasecmp(attrs->type, "ipv6"))
			c_ipv6 = 1;
		else if (!strcasecmp(attrs->type, "max_read") && attrs->value)
			/* must be > 0, or no datagram is ever read */
			c_max_read = distribution_parse_positive("max_read", attrs->value);
		else if (!strcasecmp(attrs->type, "bufsize") && attrs->value)
			/* used as allocation and recvfrom() size; must be > 0 */
			c_bufsize = distribution_parse_positive("bufsize", attrs->value);
		else if (!strcasecmp(attrs->type, "debug"))
			c_debug = 1;
		else if (!strcasecmp(attrs->type, "multicast")) {
			/*
			 * NOTE(review): c_multicast is not set here, so
			 * distribution_probe() never calls join_multicast();
			 * the socket is only bound to the group address.
			 * Audit join_multicast*() before enabling the join.
			 */
			if (attrs->value)
				c_ip = attrs->value;
			else
				c_ip = "224.0.0.1";
		} else if (!strcasecmp(attrs->type, "nobind"))
			c_bind = 0;
		else if (!strcasecmp(attrs->type, "intf") && attrs->value)
			c_iface = attrs->value;
		else if (!strcasecmp(attrs->type, "help")) {
			print_help();
			exit(0);
		}
		attrs = attrs->next;
	}
}

/* Hook table registering this file as the "distribution" input module. */
static struct input_module distribution_ops = {
	.im_name	= "distribution",
	.im_init	= distribution_init,
	.im_probe	= distribution_probe,
	.im_set_opts	= distribution_set_opts,
	.im_read	= distribution_read,
};

/*
 * Register distribution_ops as a secondary input module. Marked __init;
 * presumably run automatically at start-up (confirm the macro in the bmon
 * headers).
 */
static void __init do_distribution_init(void)
{
	struct input_module *ops = &distribution_ops;

	register_secondary_input_module(ops);
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>