Annotation of embedaddon/mpd/src/contrib/libpdel/util/mesg_port.c, revision 1.1
1.1 ! misho 1:
! 2: /*
! 3: * Copyright (c) 2001-2002 Packet Design, LLC.
! 4: * All rights reserved.
! 5: *
! 6: * Subject to the following obligations and disclaimer of warranty,
! 7: * use and redistribution of this software, in source or object code
! 8: * forms, with or without modifications are expressly permitted by
! 9: * Packet Design; provided, however, that:
! 10: *
! 11: * (i) Any and all reproductions of the source or object code
! 12: * must include the copyright notice above and the following
! 13: * disclaimer of warranties; and
! 14: * (ii) No rights are granted, in any manner or form, to use
! 15: * Packet Design trademarks, including the mark "PACKET DESIGN"
! 16: * on advertising, endorsements, or otherwise except as such
! 17: * appears in the above copyright notice or in the software.
! 18: *
! 19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
! 20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
! 21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
! 22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
! 23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
! 24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
! 25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
! 26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
! 27: * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE
! 28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
! 29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
! 30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
! 31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
! 32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
! 33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
! 34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
! 35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
! 36: * THE POSSIBILITY OF SUCH DAMAGE.
! 37: *
! 38: * Author: Archie Cobbs <archie@freebsd.org>
! 39: */
! 40:
! 41: #include <sys/types.h>
! 42: #include <sys/queue.h>
! 43: #include <sys/time.h>
! 44:
! 45: #include <stdio.h>
! 46: #include <stdarg.h>
! 47: #include <string.h>
! 48: #include <errno.h>
! 49: #include <assert.h>
! 50:
! 51: #include <pthread.h>
! 52:
! 53: #include "structs/structs.h"
! 54: #include "structs/type/array.h"
! 55:
! 56: #include "util/mesg_port.h"
! 57: #include "util/pevent.h"
! 58: #include "util/typed_mem.h"
! 59:
! 60: #include "internal.h"
! 61:
! 62: #define MESG_PORT_MAGIC 0xe8a20c14
! 63:
! 64: struct mesg {
! 65: void *data;
! 66: TAILQ_ENTRY(mesg) next; /* next message in queue */
! 67: };
! 68:
! 69: struct mesg_port {
! 70: u_int32_t magic; /* magic number */
! 71: pthread_mutex_t mutex; /* mutex */
! 72: pthread_cond_t readable; /* condition variable */
! 73: u_int qlen; /* length of queue */
! 74: TAILQ_HEAD(,mesg) queue; /* message queue */
! 75: const char *mtype; /* typed memory type */
! 76: char mtype_buf[TYPED_MEM_TYPELEN];
! 77: struct pevent *event; /* associated event, if any */
! 78: };
! 79:
! 80: /*
! 81: * Create a new message port.
! 82: */
! 83: struct mesg_port *
! 84: mesg_port_create(const char *mtype)
! 85: {
! 86: struct mesg_port *port;
! 87:
! 88: if ((port = MALLOC(mtype, sizeof(*port))) == NULL)
! 89: return (NULL);
! 90: memset(port, 0, sizeof(*port));
! 91: port->magic = MESG_PORT_MAGIC;
! 92: if (mtype != NULL) {
! 93: strlcpy(port->mtype_buf, mtype, sizeof(port->mtype_buf));
! 94: port->mtype = port->mtype_buf;
! 95: }
! 96: TAILQ_INIT(&port->queue);
! 97: if ((errno = pthread_mutex_init(&port->mutex, NULL)) != 0) {
! 98: FREE(mtype, port);
! 99: return (NULL);
! 100: }
! 101: if ((errno = pthread_cond_init(&port->readable, NULL)) != 0) {
! 102: pthread_mutex_destroy(&port->mutex);
! 103: FREE(mtype, port);
! 104: return (NULL);
! 105: }
! 106: return (port);
! 107: }
! 108:
! 109: /*
! 110: * Destroy a message port.
! 111: */
! 112: void
! 113: mesg_port_destroy(struct mesg_port **portp)
! 114: {
! 115: struct mesg_port *const port = *portp;
! 116:
! 117: if (port == NULL)
! 118: return;
! 119: *portp = NULL;
! 120: assert(port->magic == MESG_PORT_MAGIC);
! 121: assert(port->qlen == 0);
! 122: if (port->event != NULL)
! 123: _pevent_unref(port->event);
! 124: pthread_cond_destroy(&port->readable);
! 125: pthread_mutex_destroy(&port->mutex);
! 126: port->magic = 0;
! 127: FREE(port->mtype, port);
! 128: }
! 129:
! 130: /*
! 131: * Send a message down a message port.
! 132: */
! 133: int
! 134: mesg_port_put(struct mesg_port *port, void *data)
! 135: {
! 136: struct pevent *ev = NULL;
! 137: struct mesg *mesg;
! 138: int r;
! 139:
! 140: /* Sanity check */
! 141: if (data == NULL) {
! 142: errno = EINVAL;
! 143: return (-1);
! 144: }
! 145:
! 146: /* Get new mesg */
! 147: if ((mesg = MALLOC(port->mtype, sizeof(*mesg))) == NULL)
! 148: return (-1);
! 149: memset(mesg, 0, sizeof(*mesg));
! 150: mesg->data = data;
! 151:
! 152: /* Add message to the queue */
! 153: r = pthread_mutex_lock(&port->mutex);
! 154: assert(r == 0);
! 155: assert(port->magic == MESG_PORT_MAGIC);
! 156: TAILQ_INSERT_TAIL(&port->queue, mesg, next);
! 157:
! 158: /* Notify waiters and grab associated event */
! 159: if (port->qlen++ == 0) {
! 160: pthread_cond_signal(&port->readable);
! 161: ev = port->event;
! 162: port->event = NULL;
! 163: }
! 164:
! 165: /* Unlock port */
! 166: r = pthread_mutex_unlock(&port->mutex);
! 167: assert(r == 0);
! 168:
! 169: /* Trigger the event, if any */
! 170: if (ev != NULL) {
! 171: pevent_trigger(ev);
! 172: _pevent_unref(ev);
! 173: }
! 174:
! 175: /* Done */
! 176: return (0);
! 177: }
! 178:
! 179: /*
! 180: * Read the next message from a message port.
! 181: */
! 182: void *
! 183: mesg_port_get(struct mesg_port *port, int timeout)
! 184: {
! 185: struct timespec waketime;
! 186: struct mesg *mesg;
! 187: void *data = NULL;
! 188: int r;
! 189:
! 190: /* Get waketime, which is absolute */
! 191: if (timeout > 0) {
! 192: struct timeval waketimeval;
! 193: struct timeval tv;
! 194:
! 195: tv.tv_sec = timeout / 1000;
! 196: tv.tv_usec = timeout % 1000;
! 197: (void)gettimeofday(&waketimeval, NULL);
! 198: timeradd(&waketimeval, &tv, &waketimeval);
! 199: TIMEVAL_TO_TIMESPEC(&waketimeval, &waketime);
! 200: }
! 201:
! 202: /* Grab mutex, but release if thread is canceled */
! 203: r = pthread_mutex_lock(&port->mutex);
! 204: assert(r == 0);
! 205: pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock,
! 206: &port->mutex);
! 207: assert(port->magic == MESG_PORT_MAGIC);
! 208:
! 209: /* Wait for the queue to be non-empty */
! 210: while (port->qlen == 0) {
! 211:
! 212: /* Sleep on condition variable with optional timeout */
! 213: if (timeout == 0)
! 214: errno = ETIMEDOUT;
! 215: else if (timeout > 0) {
! 216: errno = pthread_cond_timedwait(&port->readable,
! 217: &port->mutex, &waketime);
! 218: } else {
! 219: errno = pthread_cond_wait(&port->readable,
! 220: &port->mutex);
! 221: }
! 222:
! 223: /* Sanity check */
! 224: assert(port->magic == MESG_PORT_MAGIC);
! 225:
! 226: /* Bail if there was an error */
! 227: if (errno != 0)
! 228: goto done;
! 229: }
! 230:
! 231: /* Remove next message after grabbing its payload */
! 232: mesg = TAILQ_FIRST(&port->queue);
! 233: TAILQ_REMOVE(&port->queue, mesg, next);
! 234: port->qlen--;
! 235: data = mesg->data;
! 236: FREE(port->mtype, mesg);
! 237:
! 238: /* Signal next reader in line, if any */
! 239: if (port->qlen > 0)
! 240: pthread_cond_signal(&port->readable);
! 241:
! 242: done:;
! 243: /* Done */
! 244: pthread_cleanup_pop(1);
! 245: return (data);
! 246: }
! 247:
! 248: /*
! 249: * Get the number of messages queued on a message port.
! 250: */
! 251: u_int
! 252: mesg_port_qlen(struct mesg_port *port)
! 253: {
! 254: u_int qlen;
! 255: int r;
! 256:
! 257: r = pthread_mutex_lock(&port->mutex);
! 258: assert(r == 0);
! 259: assert(port->magic == MESG_PORT_MAGIC);
! 260: qlen = port->qlen;
! 261: r = pthread_mutex_unlock(&port->mutex);
! 262: assert(r == 0);
! 263: return (qlen);
! 264: }
! 265:
! 266: /*
! 267: * Set the associated pevent.
! 268: */
! 269: int
! 270: _mesg_port_set_event(struct mesg_port *port, struct pevent *ev)
! 271: {
! 272: int rtn;
! 273: int r;
! 274:
! 275: /* Lock port */
! 276: r = pthread_mutex_lock(&port->mutex);
! 277: assert(r == 0);
! 278:
! 279: /* Sanity check */
! 280: assert(port->magic == MESG_PORT_MAGIC);
! 281: assert(ev != NULL);
! 282:
! 283: /* Check if event already there; if so, and it's obsolete, nuke it */
! 284: if (port->event != NULL) {
! 285: if (!_pevent_canceled(port->event)) {
! 286: errno = EBUSY;
! 287: rtn = -1;
! 288: goto done;
! 289: }
! 290: _pevent_unref(port->event);
! 291: port->event = NULL;
! 292: }
! 293:
! 294: /* Remember event */
! 295: port->event = ev;
! 296: rtn = 0;
! 297:
! 298: done:
! 299: /* Unlock port */
! 300: r = pthread_mutex_unlock(&port->mutex);
! 301: assert(r == 0);
! 302: return (rtn);
! 303: }
! 304:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>