File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / libevent / evbuffer.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 23:02:54 2012 UTC (12 years, 3 months ago) by misho
Branches: libevent, MAIN
CVS tags: v1_4_14bp0, v1_4_14b, HEAD
libevent

    1: /*
    2:  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
    3:  * All rights reserved.
    4:  *
    5:  * Redistribution and use in source and binary forms, with or without
    6:  * modification, are permitted provided that the following conditions
    7:  * are met:
    8:  * 1. Redistributions of source code must retain the above copyright
    9:  *    notice, this list of conditions and the following disclaimer.
   10:  * 2. Redistributions in binary form must reproduce the above copyright
   11:  *    notice, this list of conditions and the following disclaimer in the
   12:  *    documentation and/or other materials provided with the distribution.
   13:  * 3. The name of the author may not be used to endorse or promote products
   14:  *    derived from this software without specific prior written permission.
   15:  *
   16:  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
   17:  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   18:  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
   19:  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
   20:  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
   21:  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
   22:  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
   23:  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   24:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
   25:  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   26:  */
   27: 
   28: #include <sys/types.h>
   29: 
   30: #ifdef HAVE_CONFIG_H
   31: #include "config.h"
   32: #endif
   33: 
   34: #ifdef HAVE_SYS_TIME_H
   35: #include <sys/time.h>
   36: #endif
   37: 
   38: #include <errno.h>
   39: #include <stdio.h>
   40: #include <stdlib.h>
   41: #include <string.h>
   42: #ifdef HAVE_STDARG_H
   43: #include <stdarg.h>
   44: #endif
   45: 
   46: #ifdef WIN32
   47: #include <winsock2.h>
   48: #endif
   49: 
   50: #include "evutil.h"
   51: #include "event.h"
   52: 
   53: /* prototypes */
   54: 
   55: void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
   56: 
   57: static int
   58: bufferevent_add(struct event *ev, int timeout)
   59: {
   60: 	struct timeval tv, *ptv = NULL;
   61: 
   62: 	if (timeout) {
   63: 		evutil_timerclear(&tv);
   64: 		tv.tv_sec = timeout;
   65: 		ptv = &tv;
   66: 	}
   67: 
   68: 	return (event_add(ev, ptv));
   69: }
   70: 
   71: /* 
   72:  * This callback is executed when the size of the input buffer changes.
   73:  * We use it to apply back pressure on the reading side.
   74:  */
   75: 
   76: void
   77: bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
   78:     void *arg) {
   79: 	struct bufferevent *bufev = arg;
   80: 	/* 
   81: 	 * If we are below the watermark then reschedule reading if it's
   82: 	 * still enabled.
   83: 	 */
   84: 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
   85: 		evbuffer_setcb(buf, NULL, NULL);
   86: 
   87: 		if (bufev->enabled & EV_READ)
   88: 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
   89: 	}
   90: }
   91: 
   92: static void
   93: bufferevent_readcb(int fd, short event, void *arg)
   94: {
   95: 	struct bufferevent *bufev = arg;
   96: 	int res = 0;
   97: 	short what = EVBUFFER_READ;
   98: 	size_t len;
   99: 	int howmuch = -1;
  100: 
  101: 	if (event == EV_TIMEOUT) {
  102: 		what |= EVBUFFER_TIMEOUT;
  103: 		goto error;
  104: 	}
  105: 
  106: 	/*
  107: 	 * If we have a high watermark configured then we don't want to
  108: 	 * read more data than would make us reach the watermark.
  109: 	 */
  110: 	if (bufev->wm_read.high != 0) {
  111: 		howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
  112: 		/* we might have lowered the watermark, stop reading */
  113: 		if (howmuch <= 0) {
  114: 			struct evbuffer *buf = bufev->input;
  115: 			event_del(&bufev->ev_read);
  116: 			evbuffer_setcb(buf,
  117: 			    bufferevent_read_pressure_cb, bufev);
  118: 			return;
  119: 		}
  120: 	}
  121: 
  122: 	res = evbuffer_read(bufev->input, fd, howmuch);
  123: 	if (res == -1) {
  124: 		if (errno == EAGAIN || errno == EINTR)
  125: 			goto reschedule;
  126: 		/* error case */
  127: 		what |= EVBUFFER_ERROR;
  128: 	} else if (res == 0) {
  129: 		/* eof case */
  130: 		what |= EVBUFFER_EOF;
  131: 	}
  132: 
  133: 	if (res <= 0)
  134: 		goto error;
  135: 
  136: 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
  137: 
  138: 	/* See if this callbacks meets the water marks */
  139: 	len = EVBUFFER_LENGTH(bufev->input);
  140: 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
  141: 		return;
  142: 	if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
  143: 		struct evbuffer *buf = bufev->input;
  144: 		event_del(&bufev->ev_read);
  145: 
  146: 		/* Now schedule a callback for us when the buffer changes */
  147: 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
  148: 	}
  149: 
  150: 	/* Invoke the user callback - must always be called last */
  151: 	if (bufev->readcb != NULL)
  152: 		(*bufev->readcb)(bufev, bufev->cbarg);
  153: 	return;
  154: 
  155:  reschedule:
  156: 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
  157: 	return;
  158: 
  159:  error:
  160: 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
  161: }
  162: 
  163: static void
  164: bufferevent_writecb(int fd, short event, void *arg)
  165: {
  166: 	struct bufferevent *bufev = arg;
  167: 	int res = 0;
  168: 	short what = EVBUFFER_WRITE;
  169: 
  170: 	if (event == EV_TIMEOUT) {
  171: 		what |= EVBUFFER_TIMEOUT;
  172: 		goto error;
  173: 	}
  174: 
  175: 	if (EVBUFFER_LENGTH(bufev->output)) {
  176: 	    res = evbuffer_write(bufev->output, fd);
  177: 	    if (res == -1) {
  178: #ifndef WIN32
  179: /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
  180:  *set errno. thus this error checking is not portable*/
  181: 		    if (errno == EAGAIN ||
  182: 			errno == EINTR ||
  183: 			errno == EINPROGRESS)
  184: 			    goto reschedule;
  185: 		    /* error case */
  186: 		    what |= EVBUFFER_ERROR;
  187: 
  188: #else
  189: 				goto reschedule;
  190: #endif
  191: 
  192: 	    } else if (res == 0) {
  193: 		    /* eof case */
  194: 		    what |= EVBUFFER_EOF;
  195: 	    }
  196: 	    if (res <= 0)
  197: 		    goto error;
  198: 	}
  199: 
  200: 	if (EVBUFFER_LENGTH(bufev->output) != 0)
  201: 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
  202: 
  203: 	/*
  204: 	 * Invoke the user callback if our buffer is drained or below the
  205: 	 * low watermark.
  206: 	 */
  207: 	if (bufev->writecb != NULL &&
  208: 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
  209: 		(*bufev->writecb)(bufev, bufev->cbarg);
  210: 
  211: 	return;
  212: 
  213:  reschedule:
  214: 	if (EVBUFFER_LENGTH(bufev->output) != 0)
  215: 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
  216: 	return;
  217: 
  218:  error:
  219: 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
  220: }
  221: 
  222: /*
  223:  * Create a new buffered event object.
  224:  *
  225:  * The read callback is invoked whenever we read new data.
  226:  * The write callback is invoked whenever the output buffer is drained.
  227:  * The error callback is invoked on a write/read error or on EOF.
  228:  *
  229:  * Both read and write callbacks maybe NULL.  The error callback is not
  230:  * allowed to be NULL and have to be provided always.
  231:  */
  232: 
  233: struct bufferevent *
  234: bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
  235:     everrorcb errorcb, void *cbarg)
  236: {
  237: 	struct bufferevent *bufev;
  238: 
  239: 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
  240: 		return (NULL);
  241: 
  242: 	if ((bufev->input = evbuffer_new()) == NULL) {
  243: 		free(bufev);
  244: 		return (NULL);
  245: 	}
  246: 
  247: 	if ((bufev->output = evbuffer_new()) == NULL) {
  248: 		evbuffer_free(bufev->input);
  249: 		free(bufev);
  250: 		return (NULL);
  251: 	}
  252: 
  253: 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
  254: 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
  255: 
  256: 	bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
  257: 
  258: 	/*
  259: 	 * Set to EV_WRITE so that using bufferevent_write is going to
  260: 	 * trigger a callback.  Reading needs to be explicitly enabled
  261: 	 * because otherwise no data will be available.
  262: 	 */
  263: 	bufev->enabled = EV_WRITE;
  264: 
  265: 	return (bufev);
  266: }
  267: 
  268: void
  269: bufferevent_setcb(struct bufferevent *bufev,
  270:     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
  271: {
  272: 	bufev->readcb = readcb;
  273: 	bufev->writecb = writecb;
  274: 	bufev->errorcb = errorcb;
  275: 
  276: 	bufev->cbarg = cbarg;
  277: }
  278: 
  279: void
  280: bufferevent_setfd(struct bufferevent *bufev, int fd)
  281: {
  282: 	event_del(&bufev->ev_read);
  283: 	event_del(&bufev->ev_write);
  284: 
  285: 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
  286: 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
  287: 	if (bufev->ev_base != NULL) {
  288: 		event_base_set(bufev->ev_base, &bufev->ev_read);
  289: 		event_base_set(bufev->ev_base, &bufev->ev_write);
  290: 	}
  291: 
  292: 	/* might have to manually trigger event registration */
  293: }
  294: 
  295: int
  296: bufferevent_priority_set(struct bufferevent *bufev, int priority)
  297: {
  298: 	if (event_priority_set(&bufev->ev_read, priority) == -1)
  299: 		return (-1);
  300: 	if (event_priority_set(&bufev->ev_write, priority) == -1)
  301: 		return (-1);
  302: 
  303: 	return (0);
  304: }
  305: 
  306: /* Closing the file descriptor is the responsibility of the caller */
  307: 
  308: void
  309: bufferevent_free(struct bufferevent *bufev)
  310: {
  311: 	event_del(&bufev->ev_read);
  312: 	event_del(&bufev->ev_write);
  313: 
  314: 	evbuffer_free(bufev->input);
  315: 	evbuffer_free(bufev->output);
  316: 
  317: 	free(bufev);
  318: }
  319: 
  320: /*
  321:  * Returns 0 on success;
  322:  *        -1 on failure.
  323:  */
  324: 
  325: int
  326: bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
  327: {
  328: 	int res;
  329: 
  330: 	res = evbuffer_add(bufev->output, data, size);
  331: 
  332: 	if (res == -1)
  333: 		return (res);
  334: 
  335: 	/* If everything is okay, we need to schedule a write */
  336: 	if (size > 0 && (bufev->enabled & EV_WRITE))
  337: 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
  338: 
  339: 	return (res);
  340: }
  341: 
  342: int
  343: bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
  344: {
  345: 	int res;
  346: 
  347: 	res = bufferevent_write(bufev, buf->buffer, buf->off);
  348: 	if (res != -1)
  349: 		evbuffer_drain(buf, buf->off);
  350: 
  351: 	return (res);
  352: }
  353: 
  354: size_t
  355: bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
  356: {
  357: 	struct evbuffer *buf = bufev->input;
  358: 
  359: 	if (buf->off < size)
  360: 		size = buf->off;
  361: 
  362: 	/* Copy the available data to the user buffer */
  363: 	memcpy(data, buf->buffer, size);
  364: 
  365: 	if (size)
  366: 		evbuffer_drain(buf, size);
  367: 
  368: 	return (size);
  369: }
  370: 
  371: int
  372: bufferevent_enable(struct bufferevent *bufev, short event)
  373: {
  374: 	if (event & EV_READ) {
  375: 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
  376: 			return (-1);
  377: 	}
  378: 	if (event & EV_WRITE) {
  379: 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
  380: 			return (-1);
  381: 	}
  382: 
  383: 	bufev->enabled |= event;
  384: 	return (0);
  385: }
  386: 
  387: int
  388: bufferevent_disable(struct bufferevent *bufev, short event)
  389: {
  390: 	if (event & EV_READ) {
  391: 		if (event_del(&bufev->ev_read) == -1)
  392: 			return (-1);
  393: 	}
  394: 	if (event & EV_WRITE) {
  395: 		if (event_del(&bufev->ev_write) == -1)
  396: 			return (-1);
  397: 	}
  398: 
  399: 	bufev->enabled &= ~event;
  400: 	return (0);
  401: }
  402: 
  403: /*
  404:  * Sets the read and write timeout for a buffered event.
  405:  */
  406: 
  407: void
  408: bufferevent_settimeout(struct bufferevent *bufev,
  409:     int timeout_read, int timeout_write) {
  410: 	bufev->timeout_read = timeout_read;
  411: 	bufev->timeout_write = timeout_write;
  412: 
  413: 	if (event_pending(&bufev->ev_read, EV_READ, NULL))
  414: 		bufferevent_add(&bufev->ev_read, timeout_read);
  415: 	if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
  416: 		bufferevent_add(&bufev->ev_write, timeout_write);
  417: }
  418: 
  419: /*
  420:  * Sets the water marks
  421:  */
  422: 
  423: void
  424: bufferevent_setwatermark(struct bufferevent *bufev, short events,
  425:     size_t lowmark, size_t highmark)
  426: {
  427: 	if (events & EV_READ) {
  428: 		bufev->wm_read.low = lowmark;
  429: 		bufev->wm_read.high = highmark;
  430: 	}
  431: 
  432: 	if (events & EV_WRITE) {
  433: 		bufev->wm_write.low = lowmark;
  434: 		bufev->wm_write.high = highmark;
  435: 	}
  436: 
  437: 	/* If the watermarks changed then see if we should call read again */
  438: 	bufferevent_read_pressure_cb(bufev->input,
  439: 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
  440: }
  441: 
  442: int
  443: bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
  444: {
  445: 	int res;
  446: 
  447: 	bufev->ev_base = base;
  448: 
  449: 	res = event_base_set(base, &bufev->ev_read);
  450: 	if (res == -1)
  451: 		return (res);
  452: 
  453: 	res = event_base_set(base, &bufev->ev_write);
  454: 	return (res);
  455: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>