Annotation of embedaddon/mpd/src/contrib/libpdel/util/mesg_port.c, revision 1.1.1.1

1.1       misho       1: 
                      2: /*
                      3:  * Copyright (c) 2001-2002 Packet Design, LLC.
                      4:  * All rights reserved.
                      5:  * 
                      6:  * Subject to the following obligations and disclaimer of warranty,
                      7:  * use and redistribution of this software, in source or object code
                      8:  * forms, with or without modifications are expressly permitted by
                      9:  * Packet Design; provided, however, that:
                     10:  * 
                     11:  *    (i)  Any and all reproductions of the source or object code
                     12:  *         must include the copyright notice above and the following
                     13:  *         disclaimer of warranties; and
                     14:  *    (ii) No rights are granted, in any manner or form, to use
                     15:  *         Packet Design trademarks, including the mark "PACKET DESIGN"
                     16:  *         on advertising, endorsements, or otherwise except as such
                     17:  *         appears in the above copyright notice or in the software.
                     18:  * 
                     19:  * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
                     20:  * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
                     21:  * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
                     22:  * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
                     23:  * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
                     24:  * OR NON-INFRINGEMENT.  PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
                     25:  * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
                     26:  * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
                     27:  * RELIABILITY OR OTHERWISE.  IN NO EVENT SHALL PACKET DESIGN BE
                     28:  * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
                     29:  * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
                     30:  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
                     31:  * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
                     32:  * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
                     33:  * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
                     34:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
                     35:  * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
                     36:  * THE POSSIBILITY OF SUCH DAMAGE.
                     37:  *
                     38:  * Author: Archie Cobbs <archie@freebsd.org>
                     39:  */
                     40: 
                     41: #include <sys/types.h>
                     42: #include <sys/queue.h>
                     43: #include <sys/time.h>
                     44: 
                     45: #include <stdio.h>
                     46: #include <stdarg.h>
                     47: #include <string.h>
                     48: #include <errno.h>
                     49: #include <assert.h>
                     50: 
                     51: #include <pthread.h>
                     52: 
                     53: #include "structs/structs.h"
                     54: #include "structs/type/array.h"
                     55: 
                     56: #include "util/mesg_port.h"
                     57: #include "util/pevent.h"
                     58: #include "util/typed_mem.h"
                     59: 
                     60: #include "internal.h"
                     61: 
                     62: #define MESG_PORT_MAGIC                0xe8a20c14      /* stored in port->magic at creation, asserted on use, zeroed on destroy */
                     63: 
                     64: struct mesg {                          /* one queued message */
                     65:        void                    *data;          /* payload pointer given to mesg_port_put() */
                     66:        TAILQ_ENTRY(mesg)       next;           /* next message in queue */
                     67: };
                     68: 
                     69: struct mesg_port {                     /* a message queue plus its synchronization state */
                     70:        u_int32_t               magic;          /* MESG_PORT_MAGIC while the port is live; zeroed on destroy */
                     71:        pthread_mutex_t         mutex;          /* held by put/get/qlen/set_event while touching the fields below */
                     72:        pthread_cond_t          readable;       /* signaled when a message becomes available to a reader */
                     73:        u_int                   qlen;           /* number of messages currently in queue */
                     74:        TAILQ_HEAD(,mesg)       queue;          /* message queue, oldest message first */
                     75:        const char              *mtype;         /* typed memory type: NULL, or points at mtype_buf */
                     76:        char                    mtype_buf[TYPED_MEM_TYPELEN];   /* private copy of the mtype string given at creation */
                     77:        struct pevent           *event;         /* associated event, if any; triggered by a put onto an empty queue */
                     78: };
                     79: 
                     80: /*
                     81:  * Create a new message port.
                     82:  */
                     83: struct mesg_port *
                     84: mesg_port_create(const char *mtype)
                     85: {
                     86:        struct mesg_port *port;
                     87: 
                     88:        if ((port = MALLOC(mtype, sizeof(*port))) == NULL)
                     89:                return (NULL);
                     90:        memset(port, 0, sizeof(*port));
                     91:        port->magic = MESG_PORT_MAGIC;
                     92:        if (mtype != NULL) {
                     93:                strlcpy(port->mtype_buf, mtype, sizeof(port->mtype_buf));
                     94:                port->mtype = port->mtype_buf;
                     95:        }
                     96:        TAILQ_INIT(&port->queue);
                     97:        if ((errno = pthread_mutex_init(&port->mutex, NULL)) != 0) {
                     98:                FREE(mtype, port);
                     99:                return (NULL);
                    100:        }
                    101:        if ((errno = pthread_cond_init(&port->readable, NULL)) != 0) {
                    102:                pthread_mutex_destroy(&port->mutex);
                    103:                FREE(mtype, port);
                    104:                return (NULL);
                    105:        }
                    106:        return (port);
                    107: }
                    108: 
                    109: /*
                    110:  * Destroy a message port.
                    111:  */
                    112: void
                    113: mesg_port_destroy(struct mesg_port **portp)
                    114: {
                    115:        struct mesg_port *const port = *portp;
                    116: 
                    117:        if (port == NULL)
                    118:                return;
                    119:        *portp = NULL;
                    120:        assert(port->magic == MESG_PORT_MAGIC);
                    121:        assert(port->qlen == 0);
                    122:        if (port->event != NULL)
                    123:                _pevent_unref(port->event);
                    124:        pthread_cond_destroy(&port->readable);
                    125:        pthread_mutex_destroy(&port->mutex);
                    126:        port->magic = 0;
                    127:        FREE(port->mtype, port);
                    128: }
                    129: 
                    130: /*
                    131:  * Send a message down a message port.
                    132:  */
                    133: int
                    134: mesg_port_put(struct mesg_port *port, void *data)
                    135: {
                    136:        struct pevent *ev = NULL;
                    137:        struct mesg *mesg;
                    138:        int r;
                    139: 
                    140:        /* Sanity check */
                    141:        if (data == NULL) {
                    142:                errno = EINVAL;
                    143:                return (-1);
                    144:        }
                    145: 
                    146:        /* Get new mesg */
                    147:        if ((mesg = MALLOC(port->mtype, sizeof(*mesg))) == NULL)
                    148:                return (-1);
                    149:        memset(mesg, 0, sizeof(*mesg));
                    150:        mesg->data = data;
                    151: 
                    152:        /* Add message to the queue */
                    153:        r = pthread_mutex_lock(&port->mutex);
                    154:        assert(r == 0);
                    155:        assert(port->magic == MESG_PORT_MAGIC);
                    156:        TAILQ_INSERT_TAIL(&port->queue, mesg, next);
                    157: 
                    158:        /* Notify waiters and grab associated event */
                    159:        if (port->qlen++ == 0) {
                    160:                pthread_cond_signal(&port->readable);
                    161:                ev = port->event;
                    162:                port->event = NULL;
                    163:        }
                    164: 
                    165:        /* Unlock port */
                    166:        r = pthread_mutex_unlock(&port->mutex);
                    167:        assert(r == 0);
                    168: 
                    169:        /* Trigger the event, if any */
                    170:        if (ev != NULL) {
                    171:                pevent_trigger(ev);
                    172:                _pevent_unref(ev);
                    173:        }
                    174: 
                    175:        /* Done */
                    176:        return (0);
                    177: }
                    178: 
                    179: /*
                    180:  * Read the next message from a message port.
                    181:  */
                    182: void *
                    183: mesg_port_get(struct mesg_port *port, int timeout)
                    184: {
                    185:        struct timespec waketime;
                    186:        struct mesg *mesg;
                    187:        void *data = NULL;
                    188:        int r;
                    189: 
                    190:        /* Get waketime, which is absolute */
                    191:        if (timeout > 0) {
                    192:                struct timeval waketimeval;
                    193:                struct timeval tv;
                    194: 
                    195:                tv.tv_sec = timeout / 1000;
                    196:                tv.tv_usec = timeout % 1000;
                    197:                (void)gettimeofday(&waketimeval, NULL);
                    198:                timeradd(&waketimeval, &tv, &waketimeval);
                    199:                TIMEVAL_TO_TIMESPEC(&waketimeval, &waketime);
                    200:        }
                    201: 
                    202:        /* Grab mutex, but release if thread is canceled */
                    203:        r = pthread_mutex_lock(&port->mutex);
                    204:        assert(r == 0);
                    205:        pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock,
                    206:            &port->mutex);
                    207:        assert(port->magic == MESG_PORT_MAGIC);
                    208: 
                    209:        /* Wait for the queue to be non-empty */
                    210:        while (port->qlen == 0) {
                    211: 
                    212:                /* Sleep on condition variable with optional timeout */
                    213:                if (timeout == 0)
                    214:                        errno = ETIMEDOUT;
                    215:                else if (timeout > 0) {
                    216:                        errno = pthread_cond_timedwait(&port->readable,
                    217:                            &port->mutex, &waketime);
                    218:                } else {
                    219:                        errno = pthread_cond_wait(&port->readable,
                    220:                            &port->mutex);
                    221:                }
                    222: 
                    223:                /* Sanity check */
                    224:                assert(port->magic == MESG_PORT_MAGIC);
                    225: 
                    226:                /* Bail if there was an error */
                    227:                if (errno != 0)
                    228:                        goto done;
                    229:        }
                    230: 
                    231:        /* Remove next message after grabbing its payload */
                    232:        mesg = TAILQ_FIRST(&port->queue);
                    233:        TAILQ_REMOVE(&port->queue, mesg, next);
                    234:        port->qlen--;
                    235:        data = mesg->data;
                    236:        FREE(port->mtype, mesg);
                    237: 
                    238:        /* Signal next reader in line, if any */
                    239:        if (port->qlen > 0)
                    240:                pthread_cond_signal(&port->readable);
                    241: 
                    242: done:;
                    243:        /* Done */
                    244:        pthread_cleanup_pop(1);
                    245:        return (data);
                    246: }
                    247: 
                    248: /*
                    249:  * Get the number of messages queued on a message port.
                    250:  */
                    251: u_int
                    252: mesg_port_qlen(struct mesg_port *port)
                    253: {
                    254:        u_int qlen;
                    255:        int r;
                    256: 
                    257:        r = pthread_mutex_lock(&port->mutex);
                    258:        assert(r == 0);
                    259:        assert(port->magic == MESG_PORT_MAGIC);
                    260:        qlen = port->qlen;
                    261:        r = pthread_mutex_unlock(&port->mutex);
                    262:        assert(r == 0);
                    263:        return (qlen);
                    264: }
                    265: 
                    266: /*
                    267:  * Set the associated pevent.
                    268:  */
                    269: int
                    270: _mesg_port_set_event(struct mesg_port *port, struct pevent *ev)
                    271: {
                    272:        int rtn;
                    273:        int r;
                    274: 
                    275:        /* Lock port */
                    276:        r = pthread_mutex_lock(&port->mutex);
                    277:        assert(r == 0);
                    278: 
                    279:        /* Sanity check */
                    280:        assert(port->magic == MESG_PORT_MAGIC);
                    281:        assert(ev != NULL);
                    282: 
                    283:        /* Check if event already there; if so, and it's obsolete, nuke it */
                    284:        if (port->event != NULL) {
                    285:                if (!_pevent_canceled(port->event)) {
                    286:                        errno = EBUSY;
                    287:                        rtn = -1;
                    288:                        goto done;
                    289:                }
                    290:                _pevent_unref(port->event);
                    291:                port->event = NULL;
                    292:        }
                    293: 
                    294:        /* Remember event */
                    295:        port->event = ev;
                    296:        rtn = 0;
                    297: 
                    298: done:
                    299:        /* Unlock port */
                    300:        r = pthread_mutex_unlock(&port->mutex);
                    301:        assert(r == 0);
                    302:        return (rtn);
                    303: }
                    304: 

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>