File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / bmon / src / out_distribution.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 22:19:56 2012 UTC (13 years, 4 months ago) by misho
Branches: bmon, MAIN
CVS tags: v2_1_0p0, v2_1_0, HEAD
bmon

    1: /*
    2:  * bmon_distr.c            Bandwidth Monitor
    3:  *
    4:  * Copyright (c) 2001-2004 Thomas Graf <tgraf@suug.ch>
    5:  *
    6:  * Permission is hereby granted, free of charge, to any person obtaining a
    7:  * copy of this software and associated documentation files (the "Software"),
    8:  * to deal in the Software without restriction, including without limitation
    9:  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
   10:  * and/or sell copies of the Software, and to permit persons to whom the
   11:  * Software is furnished to do so, subject to the following conditions:
   12:  *
   13:  * The above copyright notice and this permission notice shall be included
   14:  * in all copies or substantial portions of the Software.
   15:  *
   16:  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   17:  * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
   18:  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
   19:  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
   20:  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
   21:  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
   22:  * DEALINGS IN THE SOFTWARE.
   23:  */
   24: 
   25: #include <bmon/bmon.h>
   26: #include <bmon/output.h>
   27: #include <bmon/node.h>
   28: #include <bmon/item.h>
   29: #include <bmon/input.h>
   30: #include <bmon/distribution.h>
   31: #include <bmon/utils.h>
   32: 
   33: #include <netdb.h>
   34: 
   35: static int send_fd;
   36: static int c_ipv6;
   37: static char *c_ip = "224.0.0.1";
   38: static char *c_port = "2048";
   39: static int c_forward = 0;
   40: static int c_errignore = 0;
   41: static int c_debug = 0;
   42: static int c_send_all = 15;
   43: static int send_all_rem = 1;
   44: 
   45: static void * build_opts(item_t *intf, size_t *size)
   46: {
   47: 	void *buf;
   48: 	size_t optsize = 0, off = 0;
   49: 	struct distr_msg_ifopt *ip;
   50: 
   51: 	if (intf->i_handle)
   52: 		optsize += sizeof(*ip) + 4;
   53: 
   54: 	if (intf->i_parent)
   55: 		optsize += sizeof(*ip);
   56: 
   57: 	if (intf->i_link)
   58: 		optsize += sizeof(*ip);
   59: 
   60: 	if (intf->i_level)
   61: 		optsize += sizeof(*ip);
   62: 
   63: 	if (intf->i_rx_usage >= 0)
   64: 		optsize += sizeof(*ip) + 4;
   65: 
   66: 	if (intf->i_tx_usage >= 0)
   67: 		optsize += sizeof(*ip) + 4;
   68: 
   69: 	if (intf->i_desc)
   70: 		optsize += sizeof(*ip) + ((strlen(intf->i_desc)+4) & ~3);
   71: 
   72: 	if (0 == optsize) {
   73: 		*size = 0;
   74: 		return NULL;
   75: 	}
   76: 
   77: 	buf = xcalloc(1, optsize);
   78: 
   79: 	if (intf->i_handle) {
   80: 		ip = buf + off;
   81: 		ip->io_type = IFOPT_HANDLE;
   82: 		ip->io_len = 4;
   83: 		*((uint32_t *) (buf + off + sizeof(*ip)))  = htonl(intf->i_handle);
   84: 		off += sizeof(*ip) + 4;
   85: 	}
   86: 
   87: 	if (intf->i_parent) {
   88: 		ip = buf + off;
   89: 		ip->io_type = IFOPT_PARENT;
   90: 		ip->io_len = 0;
   91: 		ip->io_pad = htons(intf->i_parent);
   92: 		off += sizeof(*ip);
   93: 	}
   94: 
   95: 	if (intf->i_link) {
   96: 		ip = buf + off;
   97: 		ip->io_type = IFOPT_LINK;
   98: 		ip->io_len = 0;
   99: 		ip->io_pad = htons(intf->i_link);
  100: 		off += sizeof(*ip);
  101: 	}
  102: 
  103: 	if (intf->i_level) {
  104: 		ip = buf + off;
  105: 		ip->io_type = IFOPT_LEVEL;
  106: 		ip->io_len = 0;
  107: 		ip->io_pad = htons(intf->i_level);
  108: 		off += sizeof(*ip);
  109: 	}
  110: 
  111: 	if (intf->i_rx_usage >= 0) {
  112: 		ip = buf + off;
  113: 		ip->io_type = IFOPT_RX_USAGE;
  114: 		ip->io_len = 4;
  115: 		*((uint32_t *) (buf+off+sizeof(*ip))) = htonl(intf->i_rx_usage);
  116: 		off += sizeof(*ip) + 4;
  117: 	}
  118: 
  119: 	if (intf->i_tx_usage >= 0) {
  120: 		ip = buf + off;
  121: 		ip->io_type = IFOPT_TX_USAGE;
  122: 		ip->io_len = 4;
  123: 		*((uint32_t *) (buf+off+sizeof(*ip))) = htonl(intf->i_tx_usage);
  124: 		off += sizeof(*ip) + 4;
  125: 	}
  126: 
  127: 	if (intf->i_desc) {
  128: 		ip = buf + off;
  129: 		ip->io_type = IFOPT_DESC;
  130: 		ip->io_len = (strlen(intf->i_desc)+4) & ~3;
  131: 		strcpy(buf+off+sizeof(*ip), intf->i_desc);
  132: 		off += sizeof(*ip) + ip->io_len;
  133: 	}
  134: 
  135: 	*size = optsize;
  136: 	return buf;
  137: }
  138: 
  139: static inline int worth_sending(stat_attr_t *a)
  140: {
  141: 	if (send_all_rem == 0)
  142: 		return 1;
  143: 
  144: 	if (!(a->a_flags & ATTR_FLAG_RX_ENABLED) &&
  145: 	    !(a->a_flags & ATTR_FLAG_TX_ENABLED)) {
  146: 		if (c_debug)
  147: 			fprintf(stderr, "Attribute %s not worth sending, no data\n",
  148: 			    type2name(a->a_type));
  149: 		return 0;
  150: 	}
  151: 
  152: 	if (!attr_get_rx(a) && !attr_get_tx(a)) {
  153: 		if (c_debug)
  154: 			fprintf(stderr, "Attribute %s not worth sending, still 0\n",
  155: 			    type2name(a->a_type));
  156: 		return 0;
  157: 	}
  158: 	
  159: 	if (a->a_last_distribution.tv_sec == a->a_updated.tv_sec) {
  160: 		if (a->a_last_distribution.tv_usec == a->a_updated.tv_usec) {
  161: 			if (c_debug)
  162: 				fprintf(stderr, "Attribute %s not worth sending, no update\n",
  163: 				    type2name(a->a_type));
  164: 			return 0;
  165: 		}
  166: 	}
  167: 
  168: 	return 1;
  169: }
  170: 
  171: static void * build_item_msg(item_t *intf, size_t *size)
  172: {
  173: 	int i, off = 0;
  174: 	void *buf, *opts;
  175: 	size_t msgsize = 0, namelen, nattrs = 0, optsize;
  176: 	struct distr_msg_item *ip;
  177: 	struct distr_msg_attr *ap;
  178: 
  179: 	namelen = (strlen(intf->i_name) + 5) & ~3; /* 5 because of \0 */
  180: 	opts = build_opts(intf, &optsize);
  181: 
  182: 	for (i = 0; i < ATTR_HASH_MAX; i++) {
  183: 		stat_attr_t *a;
  184: 		for (a = intf->i_attrs[i]; a; a = a->a_next)
  185: 			if (worth_sending(a))
  186: 				nattrs++;
  187: 	}
  188: 	
  189: 	msgsize = sizeof(*ip) + namelen + optsize + (nattrs * sizeof(*ap));
  190: 
  191: 	ip = buf = xcalloc(1, msgsize);
  192: 
  193: 	ip->i_index = htons(intf->i_index);
  194: 	ip->i_offset = htons(msgsize);
  195: 	ip->i_namelen = namelen;
  196: 	ip->i_optslen = optsize;
  197: 	ip->i_flags = htons((intf->i_flags & ITEM_FLAG_IS_CHILD) ? IF_IS_CHILD : 0);
  198: 
  199: 	off = sizeof(*ip);
  200: 	memcpy(buf + off, intf->i_name, strlen(intf->i_name));
  201: 	off += namelen;
  202: 
  203: 	if (opts) {
  204: 		memcpy(buf + off, opts, optsize);
  205: 		off += optsize;
  206: 	}
  207: 
  208: 	for (i = 0; i < ATTR_HASH_MAX; i++) {
  209: 		stat_attr_t *a;
  210: 		for (a = intf->i_attrs[i]; a; a = a->a_next) {
  211: 			if (worth_sending(a)) {
  212: 				struct distr_msg_attr am = {
  213: 					.a_type = htons(a->a_type),
  214: 					.a_rx = xhtonll(attr_get_rx(a)),
  215: 					.a_tx = xhtonll(attr_get_tx(a)),
  216: 					.a_flags = htons(
  217: 					    (a->a_flags & ATTR_FLAG_RX_ENABLED ? ATTR_RX_PROVIDED : 0) |
  218: 					    (a->a_flags & ATTR_FLAG_TX_ENABLED ? ATTR_TX_PROVIDED : 0)),
  219: 				};
  220: 
  221: 				COPY_TS(&a->a_last_distribution, &a->a_updated);
  222: 				memcpy(buf + off, &am, sizeof(am));
  223: 				off += sizeof(am);
  224: 			}
  225: 		}
  226: 	}
  227: 
  228: 	*size = msgsize;
  229: 	return buf;
  230: }
  231: 
  232: static void * build_item_group(node_t *node, size_t *size)
  233: {
  234: 	size_t grpsize = sizeof(struct distr_msg_grp);
  235: 	struct distr_msg_grp *gp;
  236: 	void *group;
  237: 	int i;
  238: 
  239: 	gp = group = xcalloc(1, grpsize);
  240: 	
  241: 	for (i = 0; i < node->n_nitems; i++) {
  242: 		if (node->n_items[i].i_name[0]) {
  243: 			size_t size;
  244: 			void *im = build_item_msg(&node->n_items[i], &size);
  245: 			int goff = grpsize;
  246: 
  247: 			grpsize += size;
  248: 			gp = group = xrealloc(group, grpsize);
  249: 			memcpy(group + goff, im, size);
  250: 
  251: 			xfree(im);
  252: 		}
  253: 	}
  254: 
  255: 	gp->g_type = htons(BMON_GRP_IF);
  256: 	gp->g_offset = htons(grpsize);
  257: 
  258: 	*size = grpsize;
  259: 	return group;
  260: }
  261: 
  262: static void distribute_node(node_t *node, void *arg)
  263: {
  264: 	void *buf;
  265: 	struct distr_msg_hdr *hdr;
  266: 	size_t grpsize;
  267: 	void *grp = build_item_group(node, &grpsize);
  268: 	size_t nodenamelen = (strlen(node->n_name) + 5) & ~3; /* 5 because of \0 */
  269: 	size_t msgsize = sizeof(*hdr) + nodenamelen + grpsize;
  270: 
  271: 	hdr = buf = xcalloc(1, msgsize);
  272: 
  273: 	hdr->h_magic = BMON_MAGIC;
  274: 	hdr->h_ver = BMON_VERSION;
  275: 	hdr->h_offset = sizeof(*hdr) + nodenamelen;
  276: 	hdr->h_len = htons(msgsize);
  277: 	hdr->h_ts_sec = htonl(rtiming.rt_last_read.tv_sec);
  278: 	hdr->h_ts_usec = htonl(rtiming.rt_last_read.tv_usec);
  279: 	memcpy(buf + sizeof(*hdr), node->n_name, strlen(node->n_name));
  280: 	memcpy(buf + sizeof(*hdr) + nodenamelen, grp, grpsize);
  281: 
  282: 	if (send(send_fd, buf, msgsize, 0) < 0) {
  283: 		if (!c_errignore)
  284: 			quit("send() failed: %s\n", strerror(errno));
  285: 		else if (c_debug)
  286: 			fprintf(stderr, "Ignoring error %s\n", strerror(errno));
  287: 	}
  288: 	
  289: 	xfree(grp);
  290: 	xfree(buf);
  291: }
  292: 
  293: static void distribute_nodes(void)
  294: {
  295: 	send_all_rem--;
  296: 	
  297: 	if (c_forward)
  298: 		foreach_node(distribute_node, NULL);
  299: 	else
  300: 		distribute_node(get_local_node(), NULL);
  301: 
  302: 	if (send_all_rem <= 0)
  303: 		send_all_rem = c_send_all;
  304: }
  305: 
  306: static void print_module_help(void)
  307: {
  308: 	printf(
  309: 	"Distribution of statistic over a network\n" \
  310: 	"\n" \
  311: 	"  Sends all local statistics to the specified ip or to the\n" \
  312: 	"  multicast address all-hosts if none was specified. The\n" \
  313: 	"  protocol is optimized for size, therefore only counters\n" \
  314: 	"  that have changed will get distributed. The complete list\n" \
  315: 	"  of attribute will be distributed once in a while to make\n" \
  316: 	"  clients also list zero counters.\n" \
  317: 	"\n" \
  318: 	"  You may want to set the option `errignore' while in unicast\n" \
  319: 	"  mode to prevent send() from failing if the destination has\n" \
  320: 	"  gone down for a while (f.e. due to a reboot).\n" \
  321: 	"\n" \
  322: 	"  Remotely collected statistics can be distributed in forwarding\n" \
  323: 	"  mode. However, you must make sure to not create loops as there\n" \
  324: 	"  is no duplicate check on the receiving side and thus would\n" \
  325: 	"  result in statistic corruption.\n" \
  326: 	"\n"
  327: 	"  Author: Thomas Graf <tgraf@suug.ch>\n" \
  328: 	"\n" \
  329: 	"  Options:\n" \
  330: 	"    ip=ADDR          Destination address (default: 224.0.0.1/ff01::1)\n" \
  331: 	"    port=NUM         Destination port (default: 2048)\n" \
  332: 	"    ipv6             Prefer IPv6\n" \
  333: 	"    forward          Forwarding mode, also distribute non-local nodes\n" \
  334: 	"    errignore        Ignore ICMP error messages while sending\n" \
  335: 	"    debug            Verbose output for debugging\n" \
  336: 	"    sendall          Send interval of complete attribute list (default: 15)\n" \
  337: 	"    help             Print this help text\n");
  338: }
  339: 
  340: static void distribution_set_opts(tv_t *attrs)
  341: {
  342: 	while (attrs) {
  343: 		if (!strcasecmp(attrs->type, "ip") && attrs->value)
  344: 			c_ip = attrs->value;
  345: 		else if (!strcasecmp(attrs->type, "port") && attrs->value)
  346: 			c_port = attrs->value;
  347: 		else if (!strcasecmp(attrs->type, "ipv6"))
  348: 			c_ipv6 = 1;
  349: 		else if (!strcasecmp(attrs->type, "forward"))
  350: 			c_forward = 1;
  351: 		else if (!strcasecmp(attrs->type, "errignore"))
  352: 			c_errignore = 1;
  353: 		else if (!strcasecmp(attrs->type, "debug"))
  354: 			c_debug = 1;
  355: 		else if (!strcasecmp(attrs->type, "sendall")) {
  356: 			if (attrs->value)
  357: 				c_send_all = strtol(attrs->value, NULL, 0);
  358: 			else
  359: 				c_send_all = 1;
  360: 		} else if (!strcasecmp(attrs->type, "help")) {
  361: 			print_module_help();
  362: 			exit(0);
  363: 		}
  364: 		attrs = attrs->next;
  365: 	}
  366: }
  367: 
  368: static int distribution_probe(void)
  369: {
  370: 	int err;
  371: 	struct addrinfo *t, *res = NULL;
  372: 	struct addrinfo hints = {
  373: 		.ai_socktype = SOCK_DGRAM,
  374: 		.ai_family = c_ipv6 ? PF_INET6 : PF_INET,
  375: 	};
  376: 	char s[INET6_ADDRSTRLEN];
  377: 
  378: 	if (c_ipv6 && !strcmp(c_ip, "224.0.0.1"))
  379: 		c_ip = "ff01::1";
  380: 	
  381: 	if ((err = getaddrinfo(c_ip, c_port, &hints, &res)) < 0)
  382: 		quit("getaddrinfo failed: %s\n", gai_strerror(err));
  383: 	
  384: 	
  385: 	for (t = res; t; t = t->ai_next) {
  386: 		if (c_debug) {
  387: 			const char *x = xinet_ntop(t->ai_addr, s, sizeof(s));
  388: 			fprintf(stderr, "Trying %s...", x ? x : "null");
  389: 		}
  390: 		send_fd = socket(t->ai_family, t->ai_socktype, 0);
  391: 
  392: 		if (send_fd < 0) {
  393: 			if (c_debug)
  394: 				fprintf(stderr, "socket() failed: %s\n", strerror(errno));
  395: 			continue;
  396: 		}
  397: 
  398: 		if (connect(send_fd, t->ai_addr, t->ai_addrlen) < 0) {
  399: 			if (c_debug)
  400: 				fprintf(stderr, "connect() failed: %s\n", strerror(errno));
  401: 			continue;
  402: 		}
  403: 
  404: 		if (c_debug)
  405: 			fprintf(stderr, "OK\n");
  406: 		return 1;
  407: 	}
  408: 
  409: 	fprintf(stderr, "Could not create and connect a datagram " \
  410: 		"socket, tried:\n");
  411: 
  412: 	for (t = res; t; t = t->ai_next) {
  413: 		const char *x = xinet_ntop(t->ai_addr, s, sizeof(s));
  414: 		fprintf(stderr, "\t%s\n", x ? x : "null");
  415: 	}
  416: 
  417: 	quit("Last error message was: %s\n", strerror(errno));
  418: 	return 0;
  419: }
  420: 
  421: static struct output_module distribution_ops = {
  422: 	.om_name = "distribution",
  423: 	.om_draw = distribute_nodes,
  424: 	.om_set_opts = distribution_set_opts,
  425: 	.om_probe = distribution_probe,
  426: };
  427: 
  428: static void __init do_distribution_init(void)
  429: {
  430: 	register_secondary_output_module(&distribution_ops);
  431: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>