Return to mesg_port.c CVS log | Up to [ELWIX - Embedded LightWeight unIX -] / embedaddon / mpd / src / contrib / libpdel / util |
1.1 misho 1: 2: /* 3: * Copyright (c) 2001-2002 Packet Design, LLC. 4: * All rights reserved. 5: * 6: * Subject to the following obligations and disclaimer of warranty, 7: * use and redistribution of this software, in source or object code 8: * forms, with or without modifications are expressly permitted by 9: * Packet Design; provided, however, that: 10: * 11: * (i) Any and all reproductions of the source or object code 12: * must include the copyright notice above and the following 13: * disclaimer of warranties; and 14: * (ii) No rights are granted, in any manner or form, to use 15: * Packet Design trademarks, including the mark "PACKET DESIGN" 16: * on advertising, endorsements, or otherwise except as such 17: * appears in the above copyright notice or in the software. 18: * 19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND 20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO 21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING 22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED 23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, 24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE, 25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS 26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY, 27: * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE 28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE 29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT, 30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL 31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF 32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF 33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF 35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF 36: * THE POSSIBILITY OF SUCH DAMAGE. 37: * 38: * Author: Archie Cobbs <archie@freebsd.org> 39: */ 40: 41: #include <sys/types.h> 42: #include <sys/queue.h> 43: #include <sys/time.h> 44: 45: #include <stdio.h> 46: #include <stdarg.h> 47: #include <string.h> 48: #include <errno.h> 49: #include <assert.h> 50: 51: #include <pthread.h> 52: 53: #include "structs/structs.h" 54: #include "structs/type/array.h" 55: 56: #include "util/mesg_port.h" 57: #include "util/pevent.h" 58: #include "util/typed_mem.h" 59: 60: #include "internal.h" 61: 62: #define MESG_PORT_MAGIC 0xe8a20c14 63: 64: struct mesg { 65: void *data; 66: TAILQ_ENTRY(mesg) next; /* next message in queue */ 67: }; 68: 69: struct mesg_port { 70: u_int32_t magic; /* magic number */ 71: pthread_mutex_t mutex; /* mutex */ 72: pthread_cond_t readable; /* condition variable */ 73: u_int qlen; /* length of queue */ 74: TAILQ_HEAD(,mesg) queue; /* message queue */ 75: const char *mtype; /* typed memory type */ 76: char mtype_buf[TYPED_MEM_TYPELEN]; 77: struct pevent *event; /* associated event, if any */ 78: }; 79: 80: /* 81: * Create a new message port. 82: */ 83: struct mesg_port * 84: mesg_port_create(const char *mtype) 85: { 86: struct mesg_port *port; 87: 88: if ((port = MALLOC(mtype, sizeof(*port))) == NULL) 89: return (NULL); 90: memset(port, 0, sizeof(*port)); 91: port->magic = MESG_PORT_MAGIC; 92: if (mtype != NULL) { 93: strlcpy(port->mtype_buf, mtype, sizeof(port->mtype_buf)); 94: port->mtype = port->mtype_buf; 95: } 96: TAILQ_INIT(&port->queue); 97: if ((errno = pthread_mutex_init(&port->mutex, NULL)) != 0) { 98: FREE(mtype, port); 99: return (NULL); 100: } 101: if ((errno = pthread_cond_init(&port->readable, NULL)) != 0) { 102: pthread_mutex_destroy(&port->mutex); 103: FREE(mtype, port); 104: return (NULL); 105: } 106: return (port); 107: } 108: 109: /* 110: * Destroy a message port. 111: */ 112: void 113: mesg_port_destroy(struct mesg_port **portp) 114: { 115: struct mesg_port *const port = *portp; 116: 117: if (port == NULL) 118: return; 119: *portp = NULL; 120: assert(port->magic == MESG_PORT_MAGIC); 121: assert(port->qlen == 0); 122: if (port->event != NULL) 123: _pevent_unref(port->event); 124: pthread_cond_destroy(&port->readable); 125: pthread_mutex_destroy(&port->mutex); 126: port->magic = 0; 127: FREE(port->mtype, port); 128: } 129: 130: /* 131: * Send a message down a message port. 132: */ 133: int 134: mesg_port_put(struct mesg_port *port, void *data) 135: { 136: struct pevent *ev = NULL; 137: struct mesg *mesg; 138: int r; 139: 140: /* Sanity check */ 141: if (data == NULL) { 142: errno = EINVAL; 143: return (-1); 144: } 145: 146: /* Get new mesg */ 147: if ((mesg = MALLOC(port->mtype, sizeof(*mesg))) == NULL) 148: return (-1); 149: memset(mesg, 0, sizeof(*mesg)); 150: mesg->data = data; 151: 152: /* Add message to the queue */ 153: r = pthread_mutex_lock(&port->mutex); 154: assert(r == 0); 155: assert(port->magic == MESG_PORT_MAGIC); 156: TAILQ_INSERT_TAIL(&port->queue, mesg, next); 157: 158: /* Notify waiters and grab associated event */ 159: if (port->qlen++ == 0) { 160: pthread_cond_signal(&port->readable); 161: ev = port->event; 162: port->event = NULL; 163: } 164: 165: /* Unlock port */ 166: r = pthread_mutex_unlock(&port->mutex); 167: assert(r == 0); 168: 169: /* Trigger the event, if any */ 170: if (ev != NULL) { 171: pevent_trigger(ev); 172: _pevent_unref(ev); 173: } 174: 175: /* Done */ 176: return (0); 177: } 178: 179: /* 180: * Read the next message from a message port. 181: */ 182: void * 183: mesg_port_get(struct mesg_port *port, int timeout) 184: { 185: struct timespec waketime; 186: struct mesg *mesg; 187: void *data = NULL; 188: int r; 189: 190: /* Get waketime, which is absolute */ 191: if (timeout > 0) { 192: struct timeval waketimeval; 193: struct timeval tv; 194: 195: tv.tv_sec = timeout / 1000; 196: tv.tv_usec = timeout % 1000; 197: (void)gettimeofday(&waketimeval, NULL); 198: timeradd(&waketimeval, &tv, &waketimeval); 199: TIMEVAL_TO_TIMESPEC(&waketimeval, &waketime); 200: } 201: 202: /* Grab mutex, but release if thread is canceled */ 203: r = pthread_mutex_lock(&port->mutex); 204: assert(r == 0); 205: pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock, 206: &port->mutex); 207: assert(port->magic == MESG_PORT_MAGIC); 208: 209: /* Wait for the queue to be non-empty */ 210: while (port->qlen == 0) { 211: 212: /* Sleep on condition variable with optional timeout */ 213: if (timeout == 0) 214: errno = ETIMEDOUT; 215: else if (timeout > 0) { 216: errno = pthread_cond_timedwait(&port->readable, 217: &port->mutex, &waketime); 218: } else { 219: errno = pthread_cond_wait(&port->readable, 220: &port->mutex); 221: } 222: 223: /* Sanity check */ 224: assert(port->magic == MESG_PORT_MAGIC); 225: 226: /* Bail if there was an error */ 227: if (errno != 0) 228: goto done; 229: } 230: 231: /* Remove next message after grabbing its payload */ 232: mesg = TAILQ_FIRST(&port->queue); 233: TAILQ_REMOVE(&port->queue, mesg, next); 234: port->qlen--; 235: data = mesg->data; 236: FREE(port->mtype, mesg); 237: 238: /* Signal next reader in line, if any */ 239: if (port->qlen > 0) 240: pthread_cond_signal(&port->readable); 241: 242: done:; 243: /* Done */ 244: pthread_cleanup_pop(1); 245: return (data); 246: } 247: 248: /* 249: * Get the number of messages queued on a message port. 250: */ 251: u_int 252: mesg_port_qlen(struct mesg_port *port) 253: { 254: u_int qlen; 255: int r; 256: 257: r = pthread_mutex_lock(&port->mutex); 258: assert(r == 0); 259: assert(port->magic == MESG_PORT_MAGIC); 260: qlen = port->qlen; 261: r = pthread_mutex_unlock(&port->mutex); 262: assert(r == 0); 263: return (qlen); 264: } 265: 266: /* 267: * Set the associated pevent. 268: */ 269: int 270: _mesg_port_set_event(struct mesg_port *port, struct pevent *ev) 271: { 272: int rtn; 273: int r; 274: 275: /* Lock port */ 276: r = pthread_mutex_lock(&port->mutex); 277: assert(r == 0); 278: 279: /* Sanity check */ 280: assert(port->magic == MESG_PORT_MAGIC); 281: assert(ev != NULL); 282: 283: /* Check if event already there; if so, and it's obsolete, nuke it */ 284: if (port->event != NULL) { 285: if (!_pevent_canceled(port->event)) { 286: errno = EBUSY; 287: rtn = -1; 288: goto done; 289: } 290: _pevent_unref(port->event); 291: port->event = NULL; 292: } 293: 294: /* Remember event */ 295: port->event = ev; 296: rtn = 0; 297: 298: done: 299: /* Unlock port */ 300: r = pthread_mutex_unlock(&port->mutex); 301: assert(r == 0); 302: return (rtn); 303: } 304: