Annotation of embedaddon/mpd/src/contrib/libpdel/util/mesg_port.c, revision 1.1.1.1
1.1 misho 1:
2: /*
3: * Copyright (c) 2001-2002 Packet Design, LLC.
4: * All rights reserved.
5: *
6: * Subject to the following obligations and disclaimer of warranty,
7: * use and redistribution of this software, in source or object code
8: * forms, with or without modifications are expressly permitted by
9: * Packet Design; provided, however, that:
10: *
11: * (i) Any and all reproductions of the source or object code
12: * must include the copyright notice above and the following
13: * disclaimer of warranties; and
14: * (ii) No rights are granted, in any manner or form, to use
15: * Packet Design trademarks, including the mark "PACKET DESIGN"
16: * on advertising, endorsements, or otherwise except as such
17: * appears in the above copyright notice or in the software.
18: *
19: * THIS SOFTWARE IS BEING PROVIDED BY PACKET DESIGN "AS IS", AND
20: * TO THE MAXIMUM EXTENT PERMITTED BY LAW, PACKET DESIGN MAKES NO
21: * REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED, REGARDING
22: * THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY AND ALL IMPLIED
23: * WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE,
24: * OR NON-INFRINGEMENT. PACKET DESIGN DOES NOT WARRANT, GUARANTEE,
25: * OR MAKE ANY REPRESENTATIONS REGARDING THE USE OF, OR THE RESULTS
26: * OF THE USE OF THIS SOFTWARE IN TERMS OF ITS CORRECTNESS, ACCURACY,
27: * RELIABILITY OR OTHERWISE. IN NO EVENT SHALL PACKET DESIGN BE
28: * LIABLE FOR ANY DAMAGES RESULTING FROM OR ARISING OUT OF ANY USE
29: * OF THIS SOFTWARE, INCLUDING WITHOUT LIMITATION, ANY DIRECT,
30: * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE, OR CONSEQUENTIAL
31: * DAMAGES, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES, LOSS OF
32: * USE, DATA OR PROFITS, HOWEVER CAUSED AND UNDER ANY THEORY OF
33: * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
35: * THE USE OF THIS SOFTWARE, EVEN IF PACKET DESIGN IS ADVISED OF
36: * THE POSSIBILITY OF SUCH DAMAGE.
37: *
38: * Author: Archie Cobbs <archie@freebsd.org>
39: */
40:
41: #include <sys/types.h>
42: #include <sys/queue.h>
43: #include <sys/time.h>
44:
45: #include <stdio.h>
46: #include <stdarg.h>
47: #include <string.h>
48: #include <errno.h>
49: #include <assert.h>
50:
51: #include <pthread.h>
52:
53: #include "structs/structs.h"
54: #include "structs/type/array.h"
55:
56: #include "util/mesg_port.h"
57: #include "util/pevent.h"
58: #include "util/typed_mem.h"
59:
60: #include "internal.h"
61:
/* Sanity-check value stored in every live port's "magic" field */
#define MESG_PORT_MAGIC		0xe8a20c14
63:
/*
 * One queued message: a wrapper linking the caller's payload pointer
 * into a port's queue.
 */
struct mesg {
	void			*data;	/* caller-supplied payload */
	TAILQ_ENTRY(mesg)	next;	/* linkage within the port's queue */
};
68:
69: struct mesg_port {
70: u_int32_t magic; /* magic number */
71: pthread_mutex_t mutex; /* mutex */
72: pthread_cond_t readable; /* condition variable */
73: u_int qlen; /* length of queue */
74: TAILQ_HEAD(,mesg) queue; /* message queue */
75: const char *mtype; /* typed memory type */
76: char mtype_buf[TYPED_MEM_TYPELEN];
77: struct pevent *event; /* associated event, if any */
78: };
79:
80: /*
81: * Create a new message port.
82: */
83: struct mesg_port *
84: mesg_port_create(const char *mtype)
85: {
86: struct mesg_port *port;
87:
88: if ((port = MALLOC(mtype, sizeof(*port))) == NULL)
89: return (NULL);
90: memset(port, 0, sizeof(*port));
91: port->magic = MESG_PORT_MAGIC;
92: if (mtype != NULL) {
93: strlcpy(port->mtype_buf, mtype, sizeof(port->mtype_buf));
94: port->mtype = port->mtype_buf;
95: }
96: TAILQ_INIT(&port->queue);
97: if ((errno = pthread_mutex_init(&port->mutex, NULL)) != 0) {
98: FREE(mtype, port);
99: return (NULL);
100: }
101: if ((errno = pthread_cond_init(&port->readable, NULL)) != 0) {
102: pthread_mutex_destroy(&port->mutex);
103: FREE(mtype, port);
104: return (NULL);
105: }
106: return (port);
107: }
108:
109: /*
110: * Destroy a message port.
111: */
112: void
113: mesg_port_destroy(struct mesg_port **portp)
114: {
115: struct mesg_port *const port = *portp;
116:
117: if (port == NULL)
118: return;
119: *portp = NULL;
120: assert(port->magic == MESG_PORT_MAGIC);
121: assert(port->qlen == 0);
122: if (port->event != NULL)
123: _pevent_unref(port->event);
124: pthread_cond_destroy(&port->readable);
125: pthread_mutex_destroy(&port->mutex);
126: port->magic = 0;
127: FREE(port->mtype, port);
128: }
129:
130: /*
131: * Send a message down a message port.
132: */
133: int
134: mesg_port_put(struct mesg_port *port, void *data)
135: {
136: struct pevent *ev = NULL;
137: struct mesg *mesg;
138: int r;
139:
140: /* Sanity check */
141: if (data == NULL) {
142: errno = EINVAL;
143: return (-1);
144: }
145:
146: /* Get new mesg */
147: if ((mesg = MALLOC(port->mtype, sizeof(*mesg))) == NULL)
148: return (-1);
149: memset(mesg, 0, sizeof(*mesg));
150: mesg->data = data;
151:
152: /* Add message to the queue */
153: r = pthread_mutex_lock(&port->mutex);
154: assert(r == 0);
155: assert(port->magic == MESG_PORT_MAGIC);
156: TAILQ_INSERT_TAIL(&port->queue, mesg, next);
157:
158: /* Notify waiters and grab associated event */
159: if (port->qlen++ == 0) {
160: pthread_cond_signal(&port->readable);
161: ev = port->event;
162: port->event = NULL;
163: }
164:
165: /* Unlock port */
166: r = pthread_mutex_unlock(&port->mutex);
167: assert(r == 0);
168:
169: /* Trigger the event, if any */
170: if (ev != NULL) {
171: pevent_trigger(ev);
172: _pevent_unref(ev);
173: }
174:
175: /* Done */
176: return (0);
177: }
178:
179: /*
180: * Read the next message from a message port.
181: */
182: void *
183: mesg_port_get(struct mesg_port *port, int timeout)
184: {
185: struct timespec waketime;
186: struct mesg *mesg;
187: void *data = NULL;
188: int r;
189:
190: /* Get waketime, which is absolute */
191: if (timeout > 0) {
192: struct timeval waketimeval;
193: struct timeval tv;
194:
195: tv.tv_sec = timeout / 1000;
196: tv.tv_usec = timeout % 1000;
197: (void)gettimeofday(&waketimeval, NULL);
198: timeradd(&waketimeval, &tv, &waketimeval);
199: TIMEVAL_TO_TIMESPEC(&waketimeval, &waketime);
200: }
201:
202: /* Grab mutex, but release if thread is canceled */
203: r = pthread_mutex_lock(&port->mutex);
204: assert(r == 0);
205: pthread_cleanup_push((void (*)(void *))pthread_mutex_unlock,
206: &port->mutex);
207: assert(port->magic == MESG_PORT_MAGIC);
208:
209: /* Wait for the queue to be non-empty */
210: while (port->qlen == 0) {
211:
212: /* Sleep on condition variable with optional timeout */
213: if (timeout == 0)
214: errno = ETIMEDOUT;
215: else if (timeout > 0) {
216: errno = pthread_cond_timedwait(&port->readable,
217: &port->mutex, &waketime);
218: } else {
219: errno = pthread_cond_wait(&port->readable,
220: &port->mutex);
221: }
222:
223: /* Sanity check */
224: assert(port->magic == MESG_PORT_MAGIC);
225:
226: /* Bail if there was an error */
227: if (errno != 0)
228: goto done;
229: }
230:
231: /* Remove next message after grabbing its payload */
232: mesg = TAILQ_FIRST(&port->queue);
233: TAILQ_REMOVE(&port->queue, mesg, next);
234: port->qlen--;
235: data = mesg->data;
236: FREE(port->mtype, mesg);
237:
238: /* Signal next reader in line, if any */
239: if (port->qlen > 0)
240: pthread_cond_signal(&port->readable);
241:
242: done:;
243: /* Done */
244: pthread_cleanup_pop(1);
245: return (data);
246: }
247:
248: /*
249: * Get the number of messages queued on a message port.
250: */
251: u_int
252: mesg_port_qlen(struct mesg_port *port)
253: {
254: u_int qlen;
255: int r;
256:
257: r = pthread_mutex_lock(&port->mutex);
258: assert(r == 0);
259: assert(port->magic == MESG_PORT_MAGIC);
260: qlen = port->qlen;
261: r = pthread_mutex_unlock(&port->mutex);
262: assert(r == 0);
263: return (qlen);
264: }
265:
266: /*
267: * Set the associated pevent.
268: */
269: int
270: _mesg_port_set_event(struct mesg_port *port, struct pevent *ev)
271: {
272: int rtn;
273: int r;
274:
275: /* Lock port */
276: r = pthread_mutex_lock(&port->mutex);
277: assert(r == 0);
278:
279: /* Sanity check */
280: assert(port->magic == MESG_PORT_MAGIC);
281: assert(ev != NULL);
282:
283: /* Check if event already there; if so, and it's obsolete, nuke it */
284: if (port->event != NULL) {
285: if (!_pevent_canceled(port->event)) {
286: errno = EBUSY;
287: rtn = -1;
288: goto done;
289: }
290: _pevent_unref(port->event);
291: port->event = NULL;
292: }
293:
294: /* Remember event */
295: port->event = ev;
296: rtn = 0;
297:
298: done:
299: /* Unlock port */
300: r = pthread_mutex_unlock(&port->mutex);
301: assert(r == 0);
302: return (rtn);
303: }
304:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>