Annotation of embedaddon/libevent/evbuffer.c, revision 1.1

1.1     ! misho       1: /*
        !             2:  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
        !             3:  * All rights reserved.
        !             4:  *
        !             5:  * Redistribution and use in source and binary forms, with or without
        !             6:  * modification, are permitted provided that the following conditions
        !             7:  * are met:
        !             8:  * 1. Redistributions of source code must retain the above copyright
        !             9:  *    notice, this list of conditions and the following disclaimer.
        !            10:  * 2. Redistributions in binary form must reproduce the above copyright
        !            11:  *    notice, this list of conditions and the following disclaimer in the
        !            12:  *    documentation and/or other materials provided with the distribution.
        !            13:  * 3. The name of the author may not be used to endorse or promote products
        !            14:  *    derived from this software without specific prior written permission.
        !            15:  *
        !            16:  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
        !            17:  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
        !            18:  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
        !            19:  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
        !            20:  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
        !            21:  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
        !            22:  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
        !            23:  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
        !            24:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
        !            25:  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
        !            26:  */
        !            27: 
#include <sys/types.h>

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#ifdef HAVE_SYS_TIME_H
#include <sys/time.h>
#endif

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif

#ifdef WIN32
#include <winsock2.h>
#endif

#include "evutil.h"
#include "event.h"
        !            52: 
        !            53: /* prototypes */
        !            54: 
        !            55: void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
        !            56: 
        !            57: static int
        !            58: bufferevent_add(struct event *ev, int timeout)
        !            59: {
        !            60:        struct timeval tv, *ptv = NULL;
        !            61: 
        !            62:        if (timeout) {
        !            63:                evutil_timerclear(&tv);
        !            64:                tv.tv_sec = timeout;
        !            65:                ptv = &tv;
        !            66:        }
        !            67: 
        !            68:        return (event_add(ev, ptv));
        !            69: }
        !            70: 
        !            71: /* 
        !            72:  * This callback is executed when the size of the input buffer changes.
        !            73:  * We use it to apply back pressure on the reading side.
        !            74:  */
        !            75: 
        !            76: void
        !            77: bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
        !            78:     void *arg) {
        !            79:        struct bufferevent *bufev = arg;
        !            80:        /* 
        !            81:         * If we are below the watermark then reschedule reading if it's
        !            82:         * still enabled.
        !            83:         */
        !            84:        if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
        !            85:                evbuffer_setcb(buf, NULL, NULL);
        !            86: 
        !            87:                if (bufev->enabled & EV_READ)
        !            88:                        bufferevent_add(&bufev->ev_read, bufev->timeout_read);
        !            89:        }
        !            90: }
        !            91: 
        !            92: static void
        !            93: bufferevent_readcb(int fd, short event, void *arg)
        !            94: {
        !            95:        struct bufferevent *bufev = arg;
        !            96:        int res = 0;
        !            97:        short what = EVBUFFER_READ;
        !            98:        size_t len;
        !            99:        int howmuch = -1;
        !           100: 
        !           101:        if (event == EV_TIMEOUT) {
        !           102:                what |= EVBUFFER_TIMEOUT;
        !           103:                goto error;
        !           104:        }
        !           105: 
        !           106:        /*
        !           107:         * If we have a high watermark configured then we don't want to
        !           108:         * read more data than would make us reach the watermark.
        !           109:         */
        !           110:        if (bufev->wm_read.high != 0) {
        !           111:                howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
        !           112:                /* we might have lowered the watermark, stop reading */
        !           113:                if (howmuch <= 0) {
        !           114:                        struct evbuffer *buf = bufev->input;
        !           115:                        event_del(&bufev->ev_read);
        !           116:                        evbuffer_setcb(buf,
        !           117:                            bufferevent_read_pressure_cb, bufev);
        !           118:                        return;
        !           119:                }
        !           120:        }
        !           121: 
        !           122:        res = evbuffer_read(bufev->input, fd, howmuch);
        !           123:        if (res == -1) {
        !           124:                if (errno == EAGAIN || errno == EINTR)
        !           125:                        goto reschedule;
        !           126:                /* error case */
        !           127:                what |= EVBUFFER_ERROR;
        !           128:        } else if (res == 0) {
        !           129:                /* eof case */
        !           130:                what |= EVBUFFER_EOF;
        !           131:        }
        !           132: 
        !           133:        if (res <= 0)
        !           134:                goto error;
        !           135: 
        !           136:        bufferevent_add(&bufev->ev_read, bufev->timeout_read);
        !           137: 
        !           138:        /* See if this callbacks meets the water marks */
        !           139:        len = EVBUFFER_LENGTH(bufev->input);
        !           140:        if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
        !           141:                return;
        !           142:        if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
        !           143:                struct evbuffer *buf = bufev->input;
        !           144:                event_del(&bufev->ev_read);
        !           145: 
        !           146:                /* Now schedule a callback for us when the buffer changes */
        !           147:                evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
        !           148:        }
        !           149: 
        !           150:        /* Invoke the user callback - must always be called last */
        !           151:        if (bufev->readcb != NULL)
        !           152:                (*bufev->readcb)(bufev, bufev->cbarg);
        !           153:        return;
        !           154: 
        !           155:  reschedule:
        !           156:        bufferevent_add(&bufev->ev_read, bufev->timeout_read);
        !           157:        return;
        !           158: 
        !           159:  error:
        !           160:        (*bufev->errorcb)(bufev, what, bufev->cbarg);
        !           161: }
        !           162: 
        !           163: static void
        !           164: bufferevent_writecb(int fd, short event, void *arg)
        !           165: {
        !           166:        struct bufferevent *bufev = arg;
        !           167:        int res = 0;
        !           168:        short what = EVBUFFER_WRITE;
        !           169: 
        !           170:        if (event == EV_TIMEOUT) {
        !           171:                what |= EVBUFFER_TIMEOUT;
        !           172:                goto error;
        !           173:        }
        !           174: 
        !           175:        if (EVBUFFER_LENGTH(bufev->output)) {
        !           176:            res = evbuffer_write(bufev->output, fd);
        !           177:            if (res == -1) {
        !           178: #ifndef WIN32
        !           179: /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
        !           180:  *set errno. thus this error checking is not portable*/
        !           181:                    if (errno == EAGAIN ||
        !           182:                        errno == EINTR ||
        !           183:                        errno == EINPROGRESS)
        !           184:                            goto reschedule;
        !           185:                    /* error case */
        !           186:                    what |= EVBUFFER_ERROR;
        !           187: 
        !           188: #else
        !           189:                                goto reschedule;
        !           190: #endif
        !           191: 
        !           192:            } else if (res == 0) {
        !           193:                    /* eof case */
        !           194:                    what |= EVBUFFER_EOF;
        !           195:            }
        !           196:            if (res <= 0)
        !           197:                    goto error;
        !           198:        }
        !           199: 
        !           200:        if (EVBUFFER_LENGTH(bufev->output) != 0)
        !           201:                bufferevent_add(&bufev->ev_write, bufev->timeout_write);
        !           202: 
        !           203:        /*
        !           204:         * Invoke the user callback if our buffer is drained or below the
        !           205:         * low watermark.
        !           206:         */
        !           207:        if (bufev->writecb != NULL &&
        !           208:            EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
        !           209:                (*bufev->writecb)(bufev, bufev->cbarg);
        !           210: 
        !           211:        return;
        !           212: 
        !           213:  reschedule:
        !           214:        if (EVBUFFER_LENGTH(bufev->output) != 0)
        !           215:                bufferevent_add(&bufev->ev_write, bufev->timeout_write);
        !           216:        return;
        !           217: 
        !           218:  error:
        !           219:        (*bufev->errorcb)(bufev, what, bufev->cbarg);
        !           220: }
        !           221: 
        !           222: /*
        !           223:  * Create a new buffered event object.
        !           224:  *
        !           225:  * The read callback is invoked whenever we read new data.
        !           226:  * The write callback is invoked whenever the output buffer is drained.
        !           227:  * The error callback is invoked on a write/read error or on EOF.
        !           228:  *
        !           229:  * Both read and write callbacks maybe NULL.  The error callback is not
        !           230:  * allowed to be NULL and have to be provided always.
        !           231:  */
        !           232: 
        !           233: struct bufferevent *
        !           234: bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
        !           235:     everrorcb errorcb, void *cbarg)
        !           236: {
        !           237:        struct bufferevent *bufev;
        !           238: 
        !           239:        if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
        !           240:                return (NULL);
        !           241: 
        !           242:        if ((bufev->input = evbuffer_new()) == NULL) {
        !           243:                free(bufev);
        !           244:                return (NULL);
        !           245:        }
        !           246: 
        !           247:        if ((bufev->output = evbuffer_new()) == NULL) {
        !           248:                evbuffer_free(bufev->input);
        !           249:                free(bufev);
        !           250:                return (NULL);
        !           251:        }
        !           252: 
        !           253:        event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
        !           254:        event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
        !           255: 
        !           256:        bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
        !           257: 
        !           258:        /*
        !           259:         * Set to EV_WRITE so that using bufferevent_write is going to
        !           260:         * trigger a callback.  Reading needs to be explicitly enabled
        !           261:         * because otherwise no data will be available.
        !           262:         */
        !           263:        bufev->enabled = EV_WRITE;
        !           264: 
        !           265:        return (bufev);
        !           266: }
        !           267: 
        !           268: void
        !           269: bufferevent_setcb(struct bufferevent *bufev,
        !           270:     evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
        !           271: {
        !           272:        bufev->readcb = readcb;
        !           273:        bufev->writecb = writecb;
        !           274:        bufev->errorcb = errorcb;
        !           275: 
        !           276:        bufev->cbarg = cbarg;
        !           277: }
        !           278: 
        !           279: void
        !           280: bufferevent_setfd(struct bufferevent *bufev, int fd)
        !           281: {
        !           282:        event_del(&bufev->ev_read);
        !           283:        event_del(&bufev->ev_write);
        !           284: 
        !           285:        event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
        !           286:        event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
        !           287:        if (bufev->ev_base != NULL) {
        !           288:                event_base_set(bufev->ev_base, &bufev->ev_read);
        !           289:                event_base_set(bufev->ev_base, &bufev->ev_write);
        !           290:        }
        !           291: 
        !           292:        /* might have to manually trigger event registration */
        !           293: }
        !           294: 
        !           295: int
        !           296: bufferevent_priority_set(struct bufferevent *bufev, int priority)
        !           297: {
        !           298:        if (event_priority_set(&bufev->ev_read, priority) == -1)
        !           299:                return (-1);
        !           300:        if (event_priority_set(&bufev->ev_write, priority) == -1)
        !           301:                return (-1);
        !           302: 
        !           303:        return (0);
        !           304: }
        !           305: 
        !           306: /* Closing the file descriptor is the responsibility of the caller */
        !           307: 
        !           308: void
        !           309: bufferevent_free(struct bufferevent *bufev)
        !           310: {
        !           311:        event_del(&bufev->ev_read);
        !           312:        event_del(&bufev->ev_write);
        !           313: 
        !           314:        evbuffer_free(bufev->input);
        !           315:        evbuffer_free(bufev->output);
        !           316: 
        !           317:        free(bufev);
        !           318: }
        !           319: 
        !           320: /*
        !           321:  * Returns 0 on success;
        !           322:  *        -1 on failure.
        !           323:  */
        !           324: 
        !           325: int
        !           326: bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
        !           327: {
        !           328:        int res;
        !           329: 
        !           330:        res = evbuffer_add(bufev->output, data, size);
        !           331: 
        !           332:        if (res == -1)
        !           333:                return (res);
        !           334: 
        !           335:        /* If everything is okay, we need to schedule a write */
        !           336:        if (size > 0 && (bufev->enabled & EV_WRITE))
        !           337:                bufferevent_add(&bufev->ev_write, bufev->timeout_write);
        !           338: 
        !           339:        return (res);
        !           340: }
        !           341: 
        !           342: int
        !           343: bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
        !           344: {
        !           345:        int res;
        !           346: 
        !           347:        res = bufferevent_write(bufev, buf->buffer, buf->off);
        !           348:        if (res != -1)
        !           349:                evbuffer_drain(buf, buf->off);
        !           350: 
        !           351:        return (res);
        !           352: }
        !           353: 
        !           354: size_t
        !           355: bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
        !           356: {
        !           357:        struct evbuffer *buf = bufev->input;
        !           358: 
        !           359:        if (buf->off < size)
        !           360:                size = buf->off;
        !           361: 
        !           362:        /* Copy the available data to the user buffer */
        !           363:        memcpy(data, buf->buffer, size);
        !           364: 
        !           365:        if (size)
        !           366:                evbuffer_drain(buf, size);
        !           367: 
        !           368:        return (size);
        !           369: }
        !           370: 
        !           371: int
        !           372: bufferevent_enable(struct bufferevent *bufev, short event)
        !           373: {
        !           374:        if (event & EV_READ) {
        !           375:                if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
        !           376:                        return (-1);
        !           377:        }
        !           378:        if (event & EV_WRITE) {
        !           379:                if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
        !           380:                        return (-1);
        !           381:        }
        !           382: 
        !           383:        bufev->enabled |= event;
        !           384:        return (0);
        !           385: }
        !           386: 
        !           387: int
        !           388: bufferevent_disable(struct bufferevent *bufev, short event)
        !           389: {
        !           390:        if (event & EV_READ) {
        !           391:                if (event_del(&bufev->ev_read) == -1)
        !           392:                        return (-1);
        !           393:        }
        !           394:        if (event & EV_WRITE) {
        !           395:                if (event_del(&bufev->ev_write) == -1)
        !           396:                        return (-1);
        !           397:        }
        !           398: 
        !           399:        bufev->enabled &= ~event;
        !           400:        return (0);
        !           401: }
        !           402: 
        !           403: /*
        !           404:  * Sets the read and write timeout for a buffered event.
        !           405:  */
        !           406: 
        !           407: void
        !           408: bufferevent_settimeout(struct bufferevent *bufev,
        !           409:     int timeout_read, int timeout_write) {
        !           410:        bufev->timeout_read = timeout_read;
        !           411:        bufev->timeout_write = timeout_write;
        !           412: 
        !           413:        if (event_pending(&bufev->ev_read, EV_READ, NULL))
        !           414:                bufferevent_add(&bufev->ev_read, timeout_read);
        !           415:        if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
        !           416:                bufferevent_add(&bufev->ev_write, timeout_write);
        !           417: }
        !           418: 
        !           419: /*
        !           420:  * Sets the water marks
        !           421:  */
        !           422: 
        !           423: void
        !           424: bufferevent_setwatermark(struct bufferevent *bufev, short events,
        !           425:     size_t lowmark, size_t highmark)
        !           426: {
        !           427:        if (events & EV_READ) {
        !           428:                bufev->wm_read.low = lowmark;
        !           429:                bufev->wm_read.high = highmark;
        !           430:        }
        !           431: 
        !           432:        if (events & EV_WRITE) {
        !           433:                bufev->wm_write.low = lowmark;
        !           434:                bufev->wm_write.high = highmark;
        !           435:        }
        !           436: 
        !           437:        /* If the watermarks changed then see if we should call read again */
        !           438:        bufferevent_read_pressure_cb(bufev->input,
        !           439:            0, EVBUFFER_LENGTH(bufev->input), bufev);
        !           440: }
        !           441: 
        !           442: int
        !           443: bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
        !           444: {
        !           445:        int res;
        !           446: 
        !           447:        bufev->ev_base = base;
        !           448: 
        !           449:        res = event_base_set(base, &bufev->ev_read);
        !           450:        if (res == -1)
        !           451:                return (res);
        !           452: 
        !           453:        res = event_base_set(base, &bufev->ev_write);
        !           454:        return (res);
        !           455: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>