Annotation of embedaddon/libevent/evbuffer.c, revision 1.1
1.1 ! misho 1: /*
! 2: * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
! 3: * All rights reserved.
! 4: *
! 5: * Redistribution and use in source and binary forms, with or without
! 6: * modification, are permitted provided that the following conditions
! 7: * are met:
! 8: * 1. Redistributions of source code must retain the above copyright
! 9: * notice, this list of conditions and the following disclaimer.
! 10: * 2. Redistributions in binary form must reproduce the above copyright
! 11: * notice, this list of conditions and the following disclaimer in the
! 12: * documentation and/or other materials provided with the distribution.
! 13: * 3. The name of the author may not be used to endorse or promote products
! 14: * derived from this software without specific prior written permission.
! 15: *
! 16: * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
! 17: * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
! 18: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
! 19: * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
! 20: * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
! 21: * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
! 22: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
! 23: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
! 24: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
! 25: * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
! 26: */
! 27:
! 28: #include <sys/types.h>
! 29:
! 30: #ifdef HAVE_CONFIG_H
! 31: #include "config.h"
! 32: #endif
! 33:
! 34: #ifdef HAVE_SYS_TIME_H
! 35: #include <sys/time.h>
! 36: #endif
! 37:
! 38: #include <errno.h>
! 39: #include <stdio.h>
! 40: #include <stdlib.h>
! 41: #include <string.h>
! 42: #ifdef HAVE_STDARG_H
! 43: #include <stdarg.h>
! 44: #endif
! 45:
! 46: #ifdef WIN32
! 47: #include <winsock2.h>
! 48: #endif
! 49:
! 50: #include "evutil.h"
! 51: #include "event.h"
! 52:
! 53: /* prototypes */
! 54:
! 55: void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
! 56:
! 57: static int
! 58: bufferevent_add(struct event *ev, int timeout)
! 59: {
! 60: struct timeval tv, *ptv = NULL;
! 61:
! 62: if (timeout) {
! 63: evutil_timerclear(&tv);
! 64: tv.tv_sec = timeout;
! 65: ptv = &tv;
! 66: }
! 67:
! 68: return (event_add(ev, ptv));
! 69: }
! 70:
! 71: /*
! 72: * This callback is executed when the size of the input buffer changes.
! 73: * We use it to apply back pressure on the reading side.
! 74: */
! 75:
! 76: void
! 77: bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
! 78: void *arg) {
! 79: struct bufferevent *bufev = arg;
! 80: /*
! 81: * If we are below the watermark then reschedule reading if it's
! 82: * still enabled.
! 83: */
! 84: if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
! 85: evbuffer_setcb(buf, NULL, NULL);
! 86:
! 87: if (bufev->enabled & EV_READ)
! 88: bufferevent_add(&bufev->ev_read, bufev->timeout_read);
! 89: }
! 90: }
! 91:
! 92: static void
! 93: bufferevent_readcb(int fd, short event, void *arg)
! 94: {
! 95: struct bufferevent *bufev = arg;
! 96: int res = 0;
! 97: short what = EVBUFFER_READ;
! 98: size_t len;
! 99: int howmuch = -1;
! 100:
! 101: if (event == EV_TIMEOUT) {
! 102: what |= EVBUFFER_TIMEOUT;
! 103: goto error;
! 104: }
! 105:
! 106: /*
! 107: * If we have a high watermark configured then we don't want to
! 108: * read more data than would make us reach the watermark.
! 109: */
! 110: if (bufev->wm_read.high != 0) {
! 111: howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
! 112: /* we might have lowered the watermark, stop reading */
! 113: if (howmuch <= 0) {
! 114: struct evbuffer *buf = bufev->input;
! 115: event_del(&bufev->ev_read);
! 116: evbuffer_setcb(buf,
! 117: bufferevent_read_pressure_cb, bufev);
! 118: return;
! 119: }
! 120: }
! 121:
! 122: res = evbuffer_read(bufev->input, fd, howmuch);
! 123: if (res == -1) {
! 124: if (errno == EAGAIN || errno == EINTR)
! 125: goto reschedule;
! 126: /* error case */
! 127: what |= EVBUFFER_ERROR;
! 128: } else if (res == 0) {
! 129: /* eof case */
! 130: what |= EVBUFFER_EOF;
! 131: }
! 132:
! 133: if (res <= 0)
! 134: goto error;
! 135:
! 136: bufferevent_add(&bufev->ev_read, bufev->timeout_read);
! 137:
! 138: /* See if this callbacks meets the water marks */
! 139: len = EVBUFFER_LENGTH(bufev->input);
! 140: if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
! 141: return;
! 142: if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
! 143: struct evbuffer *buf = bufev->input;
! 144: event_del(&bufev->ev_read);
! 145:
! 146: /* Now schedule a callback for us when the buffer changes */
! 147: evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
! 148: }
! 149:
! 150: /* Invoke the user callback - must always be called last */
! 151: if (bufev->readcb != NULL)
! 152: (*bufev->readcb)(bufev, bufev->cbarg);
! 153: return;
! 154:
! 155: reschedule:
! 156: bufferevent_add(&bufev->ev_read, bufev->timeout_read);
! 157: return;
! 158:
! 159: error:
! 160: (*bufev->errorcb)(bufev, what, bufev->cbarg);
! 161: }
! 162:
! 163: static void
! 164: bufferevent_writecb(int fd, short event, void *arg)
! 165: {
! 166: struct bufferevent *bufev = arg;
! 167: int res = 0;
! 168: short what = EVBUFFER_WRITE;
! 169:
! 170: if (event == EV_TIMEOUT) {
! 171: what |= EVBUFFER_TIMEOUT;
! 172: goto error;
! 173: }
! 174:
! 175: if (EVBUFFER_LENGTH(bufev->output)) {
! 176: res = evbuffer_write(bufev->output, fd);
! 177: if (res == -1) {
! 178: #ifndef WIN32
! 179: /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
! 180: *set errno. thus this error checking is not portable*/
! 181: if (errno == EAGAIN ||
! 182: errno == EINTR ||
! 183: errno == EINPROGRESS)
! 184: goto reschedule;
! 185: /* error case */
! 186: what |= EVBUFFER_ERROR;
! 187:
! 188: #else
! 189: goto reschedule;
! 190: #endif
! 191:
! 192: } else if (res == 0) {
! 193: /* eof case */
! 194: what |= EVBUFFER_EOF;
! 195: }
! 196: if (res <= 0)
! 197: goto error;
! 198: }
! 199:
! 200: if (EVBUFFER_LENGTH(bufev->output) != 0)
! 201: bufferevent_add(&bufev->ev_write, bufev->timeout_write);
! 202:
! 203: /*
! 204: * Invoke the user callback if our buffer is drained or below the
! 205: * low watermark.
! 206: */
! 207: if (bufev->writecb != NULL &&
! 208: EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
! 209: (*bufev->writecb)(bufev, bufev->cbarg);
! 210:
! 211: return;
! 212:
! 213: reschedule:
! 214: if (EVBUFFER_LENGTH(bufev->output) != 0)
! 215: bufferevent_add(&bufev->ev_write, bufev->timeout_write);
! 216: return;
! 217:
! 218: error:
! 219: (*bufev->errorcb)(bufev, what, bufev->cbarg);
! 220: }
! 221:
! 222: /*
! 223: * Create a new buffered event object.
! 224: *
! 225: * The read callback is invoked whenever we read new data.
! 226: * The write callback is invoked whenever the output buffer is drained.
! 227: * The error callback is invoked on a write/read error or on EOF.
! 228: *
! 229: * Both read and write callbacks maybe NULL. The error callback is not
! 230: * allowed to be NULL and have to be provided always.
! 231: */
! 232:
! 233: struct bufferevent *
! 234: bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
! 235: everrorcb errorcb, void *cbarg)
! 236: {
! 237: struct bufferevent *bufev;
! 238:
! 239: if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
! 240: return (NULL);
! 241:
! 242: if ((bufev->input = evbuffer_new()) == NULL) {
! 243: free(bufev);
! 244: return (NULL);
! 245: }
! 246:
! 247: if ((bufev->output = evbuffer_new()) == NULL) {
! 248: evbuffer_free(bufev->input);
! 249: free(bufev);
! 250: return (NULL);
! 251: }
! 252:
! 253: event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
! 254: event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
! 255:
! 256: bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
! 257:
! 258: /*
! 259: * Set to EV_WRITE so that using bufferevent_write is going to
! 260: * trigger a callback. Reading needs to be explicitly enabled
! 261: * because otherwise no data will be available.
! 262: */
! 263: bufev->enabled = EV_WRITE;
! 264:
! 265: return (bufev);
! 266: }
! 267:
! 268: void
! 269: bufferevent_setcb(struct bufferevent *bufev,
! 270: evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
! 271: {
! 272: bufev->readcb = readcb;
! 273: bufev->writecb = writecb;
! 274: bufev->errorcb = errorcb;
! 275:
! 276: bufev->cbarg = cbarg;
! 277: }
! 278:
! 279: void
! 280: bufferevent_setfd(struct bufferevent *bufev, int fd)
! 281: {
! 282: event_del(&bufev->ev_read);
! 283: event_del(&bufev->ev_write);
! 284:
! 285: event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
! 286: event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
! 287: if (bufev->ev_base != NULL) {
! 288: event_base_set(bufev->ev_base, &bufev->ev_read);
! 289: event_base_set(bufev->ev_base, &bufev->ev_write);
! 290: }
! 291:
! 292: /* might have to manually trigger event registration */
! 293: }
! 294:
! 295: int
! 296: bufferevent_priority_set(struct bufferevent *bufev, int priority)
! 297: {
! 298: if (event_priority_set(&bufev->ev_read, priority) == -1)
! 299: return (-1);
! 300: if (event_priority_set(&bufev->ev_write, priority) == -1)
! 301: return (-1);
! 302:
! 303: return (0);
! 304: }
! 305:
! 306: /* Closing the file descriptor is the responsibility of the caller */
! 307:
! 308: void
! 309: bufferevent_free(struct bufferevent *bufev)
! 310: {
! 311: event_del(&bufev->ev_read);
! 312: event_del(&bufev->ev_write);
! 313:
! 314: evbuffer_free(bufev->input);
! 315: evbuffer_free(bufev->output);
! 316:
! 317: free(bufev);
! 318: }
! 319:
! 320: /*
! 321: * Returns 0 on success;
! 322: * -1 on failure.
! 323: */
! 324:
! 325: int
! 326: bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
! 327: {
! 328: int res;
! 329:
! 330: res = evbuffer_add(bufev->output, data, size);
! 331:
! 332: if (res == -1)
! 333: return (res);
! 334:
! 335: /* If everything is okay, we need to schedule a write */
! 336: if (size > 0 && (bufev->enabled & EV_WRITE))
! 337: bufferevent_add(&bufev->ev_write, bufev->timeout_write);
! 338:
! 339: return (res);
! 340: }
! 341:
! 342: int
! 343: bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
! 344: {
! 345: int res;
! 346:
! 347: res = bufferevent_write(bufev, buf->buffer, buf->off);
! 348: if (res != -1)
! 349: evbuffer_drain(buf, buf->off);
! 350:
! 351: return (res);
! 352: }
! 353:
! 354: size_t
! 355: bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
! 356: {
! 357: struct evbuffer *buf = bufev->input;
! 358:
! 359: if (buf->off < size)
! 360: size = buf->off;
! 361:
! 362: /* Copy the available data to the user buffer */
! 363: memcpy(data, buf->buffer, size);
! 364:
! 365: if (size)
! 366: evbuffer_drain(buf, size);
! 367:
! 368: return (size);
! 369: }
! 370:
! 371: int
! 372: bufferevent_enable(struct bufferevent *bufev, short event)
! 373: {
! 374: if (event & EV_READ) {
! 375: if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
! 376: return (-1);
! 377: }
! 378: if (event & EV_WRITE) {
! 379: if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
! 380: return (-1);
! 381: }
! 382:
! 383: bufev->enabled |= event;
! 384: return (0);
! 385: }
! 386:
! 387: int
! 388: bufferevent_disable(struct bufferevent *bufev, short event)
! 389: {
! 390: if (event & EV_READ) {
! 391: if (event_del(&bufev->ev_read) == -1)
! 392: return (-1);
! 393: }
! 394: if (event & EV_WRITE) {
! 395: if (event_del(&bufev->ev_write) == -1)
! 396: return (-1);
! 397: }
! 398:
! 399: bufev->enabled &= ~event;
! 400: return (0);
! 401: }
! 402:
! 403: /*
! 404: * Sets the read and write timeout for a buffered event.
! 405: */
! 406:
! 407: void
! 408: bufferevent_settimeout(struct bufferevent *bufev,
! 409: int timeout_read, int timeout_write) {
! 410: bufev->timeout_read = timeout_read;
! 411: bufev->timeout_write = timeout_write;
! 412:
! 413: if (event_pending(&bufev->ev_read, EV_READ, NULL))
! 414: bufferevent_add(&bufev->ev_read, timeout_read);
! 415: if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
! 416: bufferevent_add(&bufev->ev_write, timeout_write);
! 417: }
! 418:
! 419: /*
! 420: * Sets the water marks
! 421: */
! 422:
! 423: void
! 424: bufferevent_setwatermark(struct bufferevent *bufev, short events,
! 425: size_t lowmark, size_t highmark)
! 426: {
! 427: if (events & EV_READ) {
! 428: bufev->wm_read.low = lowmark;
! 429: bufev->wm_read.high = highmark;
! 430: }
! 431:
! 432: if (events & EV_WRITE) {
! 433: bufev->wm_write.low = lowmark;
! 434: bufev->wm_write.high = highmark;
! 435: }
! 436:
! 437: /* If the watermarks changed then see if we should call read again */
! 438: bufferevent_read_pressure_cb(bufev->input,
! 439: 0, EVBUFFER_LENGTH(bufev->input), bufev);
! 440: }
! 441:
! 442: int
! 443: bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
! 444: {
! 445: int res;
! 446:
! 447: bufev->ev_base = base;
! 448:
! 449: res = event_base_set(base, &bufev->ev_read);
! 450: if (res == -1)
! 451: return (res);
! 452:
! 453: res = event_base_set(base, &bufev->ev_write);
! 454: return (res);
! 455: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>