Annotation of embedaddon/bird2/proto/bfd/io.c, revision 1.1.1.1
1.1 misho 1: /*
2: * BIRD -- I/O and event loop
3: *
4: * Can be freely distributed and used under the terms of the GNU GPL.
5: */
6:
7: #include <stdio.h>
8: #include <stdlib.h>
9: #include <unistd.h>
10: #include <errno.h>
11: #include <fcntl.h>
12: #include <poll.h>
13: #include <pthread.h>
14: #include <time.h>
15: #include <sys/time.h>
16:
17: #include "nest/bird.h"
18: #include "proto/bfd/io.h"
19:
20: #include "lib/buffer.h"
21: #include "lib/lists.h"
22: #include "lib/resource.h"
23: #include "lib/event.h"
24: #include "lib/timer.h"
25: #include "lib/socket.h"
26:
27:
28: struct birdloop
29: {
30: pool *pool;
31: pthread_t thread;
32: pthread_mutex_t mutex;
33:
34: u8 stop_called;
35: u8 poll_active;
36: u8 wakeup_masked;
37: int wakeup_fds[2];
38:
39: struct timeloop time;
40: list event_list;
41: list sock_list;
42: uint sock_num;
43:
44: BUFFER(sock *) poll_sk;
45: BUFFER(struct pollfd) poll_fd;
46: u8 poll_changed;
47: u8 close_scheduled;
48: };
49:
50:
51: /*
52: * Current thread context
53: */
54:
/* Thread-specific key holding the struct birdloop bound to a thread */
static pthread_key_t current_loop_key;
/* Thread-specific key holding the active timeloop; defined elsewhere */
extern pthread_key_t current_time_key;

/* Return the loop stored under current_loop_key for the calling thread. */
static inline struct birdloop *
birdloop_current(void)
{
  struct birdloop *loop = pthread_getspecific(current_loop_key);

  return loop;
}
63:
64: static inline void
65: birdloop_set_current(struct birdloop *loop)
66: {
67: pthread_setspecific(current_loop_key, loop);
68: pthread_setspecific(current_time_key, loop ? &loop->time : &main_timeloop);
69: }
70:
71: static inline void
72: birdloop_init_current(void)
73: {
74: pthread_key_create(¤t_loop_key, NULL);
75: }
76:
77:
78: /*
79: * Wakeup code for birdloop
80: */
81:
/*
 * Create a pipe and switch both of its ends to non-blocking mode.
 * @pfds receives the read end in [0] and the write end in [1].
 * Any failure is fatal.
 */
static void
pipe_new(int *pfds)
{
  int i;

  if (pipe(pfds) < 0)
    die("pipe: %m");

  /* Read end first, then write end */
  for (i = 0; i < 2; i++)
    if (fcntl(pfds[i], F_SETFL, O_NONBLOCK) < 0)
      die("fcntl(O_NONBLOCK): %m");
}
95:
/*
 * Read and discard data from @fd in 64-byte chunks until a short read or
 * EAGAIN is seen. Reads interrupted by a signal are retried; any other
 * read error is fatal.
 */
void
pipe_drain(int fd)
{
  char buf[64];
  int rv;

  /* Keep reading while the buffer comes back full or the call was interrupted */
  do
    rv = read(fd, buf, 64);
  while ((rv == 64) || ((rv < 0) && (errno == EINTR)));

  if ((rv < 0) && (errno != EAGAIN))
    die("wakeup read: %m");
}
115:
116: void
117: pipe_kick(int fd)
118: {
119: u64 v = 1;
120: int rv;
121:
122: try:
123: rv = write(fd, &v, sizeof(u64));
124: if (rv < 0)
125: {
126: if (errno == EINTR)
127: goto try;
128: if (errno == EAGAIN)
129: return;
130: die("wakeup write: %m");
131: }
132: }
133:
134: static inline void
135: wakeup_init(struct birdloop *loop)
136: {
137: pipe_new(loop->wakeup_fds);
138: }
139:
140: static inline void
141: wakeup_drain(struct birdloop *loop)
142: {
143: pipe_drain(loop->wakeup_fds[0]);
144: }
145:
146: static inline void
147: wakeup_do_kick(struct birdloop *loop)
148: {
149: pipe_kick(loop->wakeup_fds[1]);
150: }
151:
152: static inline void
153: wakeup_kick(struct birdloop *loop)
154: {
155: if (!loop->wakeup_masked)
156: wakeup_do_kick(loop);
157: else
158: loop->wakeup_masked = 2;
159: }
160:
161: /* For notifications from outside */
162: void
163: wakeup_kick_current(void)
164: {
165: struct birdloop *loop = birdloop_current();
166:
167: if (loop && loop->poll_active)
168: wakeup_kick(loop);
169: }
170:
171:
172: /*
173: * Events
174: */
175:
176: static inline uint
177: events_waiting(struct birdloop *loop)
178: {
179: return !EMPTY_LIST(loop->event_list);
180: }
181:
182: static inline void
183: events_init(struct birdloop *loop)
184: {
185: init_list(&loop->event_list);
186: }
187:
188: static void
189: events_fire(struct birdloop *loop)
190: {
191: times_update(&loop->time);
192: ev_run_list(&loop->event_list);
193: }
194:
195: void
196: ev2_schedule(event *e)
197: {
198: struct birdloop *loop = birdloop_current();
199:
200: if (loop->poll_active && EMPTY_LIST(loop->event_list))
201: wakeup_kick(loop);
202:
203: if (e->n.next)
204: rem_node(&e->n);
205:
206: add_tail(&loop->event_list, &e->n);
207: }
208:
209:
210: /*
211: * Sockets
212: */
213:
214: static void
215: sockets_init(struct birdloop *loop)
216: {
217: init_list(&loop->sock_list);
218: loop->sock_num = 0;
219:
220: BUFFER_INIT(loop->poll_sk, loop->pool, 4);
221: BUFFER_INIT(loop->poll_fd, loop->pool, 4);
222: loop->poll_changed = 1; /* add wakeup fd */
223: }
224:
225: static void
226: sockets_add(struct birdloop *loop, sock *s)
227: {
228: add_tail(&loop->sock_list, &s->n);
229: loop->sock_num++;
230:
231: s->index = -1;
232: loop->poll_changed = 1;
233:
234: if (loop->poll_active)
235: wakeup_kick(loop);
236: }
237:
238: void
239: sk_start(sock *s)
240: {
241: struct birdloop *loop = birdloop_current();
242:
243: sockets_add(loop, s);
244: }
245:
246: static void
247: sockets_remove(struct birdloop *loop, sock *s)
248: {
249: rem_node(&s->n);
250: loop->sock_num--;
251:
252: if (s->index >= 0)
253: loop->poll_sk.data[s->index] = NULL;
254:
255: s->index = -1;
256: loop->poll_changed = 1;
257:
258: /* Wakeup moved to sk_stop() */
259: }
260:
261: void
262: sk_stop(sock *s)
263: {
264: struct birdloop *loop = birdloop_current();
265:
266: sockets_remove(loop, s);
267:
268: if (loop->poll_active)
269: {
270: loop->close_scheduled = 1;
271: wakeup_kick(loop);
272: }
273: else
274: close(s->fd);
275:
276: s->fd = -1;
277: }
278:
279: static inline uint sk_want_events(sock *s)
280: { return (s->rx_hook ? POLLIN : 0) | ((s->ttx != s->tpos) ? POLLOUT : 0); }
281:
282: /*
283: FIXME: this should be called from sock code
284:
285: static void
286: sockets_update(struct birdloop *loop, sock *s)
287: {
288: if (s->index >= 0)
289: loop->poll_fd.data[s->index].events = sk_want_events(s);
290: }
291: */
292:
293: static void
294: sockets_prepare(struct birdloop *loop)
295: {
296: BUFFER_SET(loop->poll_sk, loop->sock_num + 1);
297: BUFFER_SET(loop->poll_fd, loop->sock_num + 1);
298:
299: struct pollfd *pfd = loop->poll_fd.data;
300: sock **psk = loop->poll_sk.data;
301: uint i = 0;
302: node *n;
303:
304: WALK_LIST(n, loop->sock_list)
305: {
306: sock *s = SKIP_BACK(sock, n, n);
307:
308: ASSERT(i < loop->sock_num);
309:
310: s->index = i;
311: *psk = s;
312: pfd->fd = s->fd;
313: pfd->events = sk_want_events(s);
314: pfd->revents = 0;
315:
316: pfd++;
317: psk++;
318: i++;
319: }
320:
321: ASSERT(i == loop->sock_num);
322:
323: /* Add internal wakeup fd */
324: *psk = NULL;
325: pfd->fd = loop->wakeup_fds[0];
326: pfd->events = POLLIN;
327: pfd->revents = 0;
328:
329: loop->poll_changed = 0;
330: }
331:
332: static void
333: sockets_close_fds(struct birdloop *loop)
334: {
335: struct pollfd *pfd = loop->poll_fd.data;
336: sock **psk = loop->poll_sk.data;
337: int poll_num = loop->poll_fd.used - 1;
338:
339: int i;
340: for (i = 0; i < poll_num; i++)
341: if (psk[i] == NULL)
342: close(pfd[i].fd);
343:
344: loop->close_scheduled = 0;
345: }
346:
347: int sk_read(sock *s, int revents);
348: int sk_write(sock *s);
349:
350: static void
351: sockets_fire(struct birdloop *loop)
352: {
353: struct pollfd *pfd = loop->poll_fd.data;
354: sock **psk = loop->poll_sk.data;
355: int poll_num = loop->poll_fd.used - 1;
356:
357: times_update(&loop->time);
358:
359: /* Last fd is internal wakeup fd */
360: if (pfd[poll_num].revents & POLLIN)
361: wakeup_drain(loop);
362:
363: int i;
364: for (i = 0; i < poll_num; pfd++, psk++, i++)
365: {
366: int e = 1;
367:
368: if (! pfd->revents)
369: continue;
370:
371: if (pfd->revents & POLLNVAL)
372: die("poll: invalid fd %d", pfd->fd);
373:
374: if (pfd->revents & POLLIN)
375: while (e && *psk && (*psk)->rx_hook)
376: e = sk_read(*psk, 0);
377:
378: e = 1;
379: if (pfd->revents & POLLOUT)
380: while (e && *psk)
381: e = sk_write(*psk);
382: }
383: }
384:
385:
386: /*
387: * Birdloop
388: */
389:
390: static void * birdloop_main(void *arg);
391:
392: struct birdloop *
393: birdloop_new(void)
394: {
395: /* FIXME: this init should be elsewhere and thread-safe */
396: static int init = 0;
397: if (!init)
398: { birdloop_init_current(); init = 1; }
399:
400: pool *p = rp_new(NULL, "Birdloop root");
401: struct birdloop *loop = mb_allocz(p, sizeof(struct birdloop));
402: loop->pool = p;
403: pthread_mutex_init(&loop->mutex, NULL);
404:
405: wakeup_init(loop);
406:
407: events_init(loop);
408: timers_init(&loop->time, p);
409: sockets_init(loop);
410:
411: return loop;
412: }
413:
414: void
415: birdloop_start(struct birdloop *loop)
416: {
417: int rv = pthread_create(&loop->thread, NULL, birdloop_main, loop);
418: if (rv)
419: die("pthread_create(): %M", rv);
420: }
421:
422: void
423: birdloop_stop(struct birdloop *loop)
424: {
425: pthread_mutex_lock(&loop->mutex);
426: loop->stop_called = 1;
427: wakeup_do_kick(loop);
428: pthread_mutex_unlock(&loop->mutex);
429:
430: int rv = pthread_join(loop->thread, NULL);
431: if (rv)
432: die("pthread_join(): %M", rv);
433: }
434:
435: void
436: birdloop_free(struct birdloop *loop)
437: {
438: rfree(loop->pool);
439: }
440:
441:
442: void
443: birdloop_enter(struct birdloop *loop)
444: {
445: /* TODO: these functions could save and restore old context */
446: pthread_mutex_lock(&loop->mutex);
447: birdloop_set_current(loop);
448: }
449:
450: void
451: birdloop_leave(struct birdloop *loop)
452: {
453: /* TODO: these functions could save and restore old context */
454: birdloop_set_current(NULL);
455: pthread_mutex_unlock(&loop->mutex);
456: }
457:
458: void
459: birdloop_mask_wakeups(struct birdloop *loop)
460: {
461: pthread_mutex_lock(&loop->mutex);
462: loop->wakeup_masked = 1;
463: pthread_mutex_unlock(&loop->mutex);
464: }
465:
466: void
467: birdloop_unmask_wakeups(struct birdloop *loop)
468: {
469: pthread_mutex_lock(&loop->mutex);
470: if (loop->wakeup_masked == 2)
471: wakeup_do_kick(loop);
472: loop->wakeup_masked = 0;
473: pthread_mutex_unlock(&loop->mutex);
474: }
475:
476: static void *
477: birdloop_main(void *arg)
478: {
479: struct birdloop *loop = arg;
480: timer *t;
481: int rv, timeout;
482:
483: birdloop_set_current(loop);
484:
485: pthread_mutex_lock(&loop->mutex);
486: while (1)
487: {
488: events_fire(loop);
489: timers_fire(&loop->time);
490:
491: times_update(&loop->time);
492: if (events_waiting(loop))
493: timeout = 0;
494: else if (t = timers_first(&loop->time))
495: timeout = (tm_remains(t) TO_MS) + 1;
496: else
497: timeout = -1;
498:
499: if (loop->poll_changed)
500: sockets_prepare(loop);
501:
502: loop->poll_active = 1;
503: pthread_mutex_unlock(&loop->mutex);
504:
505: try:
506: rv = poll(loop->poll_fd.data, loop->poll_fd.used, timeout);
507: if (rv < 0)
508: {
509: if (errno == EINTR || errno == EAGAIN)
510: goto try;
511: die("poll: %m");
512: }
513:
514: pthread_mutex_lock(&loop->mutex);
515: loop->poll_active = 0;
516:
517: if (loop->close_scheduled)
518: sockets_close_fds(loop);
519:
520: if (loop->stop_called)
521: break;
522:
523: if (rv)
524: sockets_fire(loop);
525:
526: timers_fire(&loop->time);
527: }
528:
529: loop->stop_called = 0;
530: pthread_mutex_unlock(&loop->mutex);
531:
532: return NULL;
533: }
534:
535:
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>