Annotation of embedaddon/nginx/src/event/ngx_event_pipe.c, revision 1.1.1.1
1.1 misho 1:
2: /*
3: * Copyright (C) Igor Sysoev
4: * Copyright (C) Nginx, Inc.
5: */
6:
7:
8: #include <ngx_config.h>
9: #include <ngx_core.h>
10: #include <ngx_event.h>
11: #include <ngx_event_pipe.h>
12:
13:
14: static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
15: static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
16:
17: static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
18: static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
19: static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
20:
21:
22: ngx_int_t
23: ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
24: {
25: u_int flags;
26: ngx_int_t rc;
27: ngx_event_t *rev, *wev;
28:
29: for ( ;; ) {
30: if (do_write) {
31: p->log->action = "sending to client";
32:
33: rc = ngx_event_pipe_write_to_downstream(p);
34:
35: if (rc == NGX_ABORT) {
36: return NGX_ABORT;
37: }
38:
39: if (rc == NGX_BUSY) {
40: return NGX_OK;
41: }
42: }
43:
44: p->read = 0;
45: p->upstream_blocked = 0;
46:
47: p->log->action = "reading upstream";
48:
49: if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
50: return NGX_ABORT;
51: }
52:
53: if (!p->read && !p->upstream_blocked) {
54: break;
55: }
56:
57: do_write = 1;
58: }
59:
60: if (p->upstream->fd != -1) {
61: rev = p->upstream->read;
62:
63: flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
64:
65: if (ngx_handle_read_event(rev, flags) != NGX_OK) {
66: return NGX_ABORT;
67: }
68:
69: if (rev->active && !rev->ready) {
70: ngx_add_timer(rev, p->read_timeout);
71:
72: } else if (rev->timer_set) {
73: ngx_del_timer(rev);
74: }
75: }
76:
77: if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
78: wev = p->downstream->write;
79: if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
80: return NGX_ABORT;
81: }
82:
83: if (!wev->delayed) {
84: if (wev->active && !wev->ready) {
85: ngx_add_timer(wev, p->send_timeout);
86:
87: } else if (wev->timer_set) {
88: ngx_del_timer(wev);
89: }
90: }
91: }
92:
93: return NGX_OK;
94: }
95:
96:
97: static ngx_int_t
98: ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
99: {
100: ssize_t n, size;
101: ngx_int_t rc;
102: ngx_buf_t *b;
103: ngx_chain_t *chain, *cl, *ln;
104:
105: if (p->upstream_eof || p->upstream_error || p->upstream_done) {
106: return NGX_OK;
107: }
108:
109: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
110: "pipe read upstream: %d", p->upstream->read->ready);
111:
112: for ( ;; ) {
113:
114: if (p->upstream_eof || p->upstream_error || p->upstream_done) {
115: break;
116: }
117:
118: if (p->preread_bufs == NULL && !p->upstream->read->ready) {
119: break;
120: }
121:
122: if (p->preread_bufs) {
123:
124: /* use the pre-read bufs if they exist */
125:
126: chain = p->preread_bufs;
127: p->preread_bufs = NULL;
128: n = p->preread_size;
129:
130: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
131: "pipe preread: %z", n);
132:
133: if (n) {
134: p->read = 1;
135: }
136:
137: } else {
138:
139: #if (NGX_HAVE_KQUEUE)
140:
141: /*
142: * kqueue notifies about the end of file or a pending error.
143: * This test allows not to allocate a buf on these conditions
144: * and not to call c->recv_chain().
145: */
146:
147: if (p->upstream->read->available == 0
148: && p->upstream->read->pending_eof)
149: {
150: p->upstream->read->ready = 0;
151: p->upstream->read->eof = 1;
152: p->upstream_eof = 1;
153: p->read = 1;
154:
155: if (p->upstream->read->kq_errno) {
156: p->upstream->read->error = 1;
157: p->upstream_error = 1;
158: p->upstream_eof = 0;
159:
160: ngx_log_error(NGX_LOG_ERR, p->log,
161: p->upstream->read->kq_errno,
162: "kevent() reported that upstream "
163: "closed connection");
164: }
165:
166: break;
167: }
168: #endif
169:
170: if (p->free_raw_bufs) {
171:
172: /* use the free bufs if they exist */
173:
174: chain = p->free_raw_bufs;
175: if (p->single_buf) {
176: p->free_raw_bufs = p->free_raw_bufs->next;
177: chain->next = NULL;
178: } else {
179: p->free_raw_bufs = NULL;
180: }
181:
182: } else if (p->allocated < p->bufs.num) {
183:
184: /* allocate a new buf if it's still allowed */
185:
186: b = ngx_create_temp_buf(p->pool, p->bufs.size);
187: if (b == NULL) {
188: return NGX_ABORT;
189: }
190:
191: p->allocated++;
192:
193: chain = ngx_alloc_chain_link(p->pool);
194: if (chain == NULL) {
195: return NGX_ABORT;
196: }
197:
198: chain->buf = b;
199: chain->next = NULL;
200:
201: } else if (!p->cacheable
202: && p->downstream->data == p->output_ctx
203: && p->downstream->write->ready
204: && !p->downstream->write->delayed)
205: {
206: /*
207: * if the bufs are not needed to be saved in a cache and
208: * a downstream is ready then write the bufs to a downstream
209: */
210:
211: p->upstream_blocked = 1;
212:
213: ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
214: "pipe downstream ready");
215:
216: break;
217:
218: } else if (p->cacheable
219: || p->temp_file->offset < p->max_temp_file_size)
220: {
221:
222: /*
223: * if it is allowed, then save some bufs from r->in
224: * to a temporary file, and add them to a r->out chain
225: */
226:
227: rc = ngx_event_pipe_write_chain_to_temp_file(p);
228:
229: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
230: "pipe temp offset: %O", p->temp_file->offset);
231:
232: if (rc == NGX_BUSY) {
233: break;
234: }
235:
236: if (rc == NGX_AGAIN) {
237: if (ngx_event_flags & NGX_USE_LEVEL_EVENT
238: && p->upstream->read->active
239: && p->upstream->read->ready)
240: {
241: if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
242: == NGX_ERROR)
243: {
244: return NGX_ABORT;
245: }
246: }
247: }
248:
249: if (rc != NGX_OK) {
250: return rc;
251: }
252:
253: chain = p->free_raw_bufs;
254: if (p->single_buf) {
255: p->free_raw_bufs = p->free_raw_bufs->next;
256: chain->next = NULL;
257: } else {
258: p->free_raw_bufs = NULL;
259: }
260:
261: } else {
262:
263: /* there are no bufs to read in */
264:
265: ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
266: "no pipe bufs to read in");
267:
268: break;
269: }
270:
271: n = p->upstream->recv_chain(p->upstream, chain);
272:
273: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
274: "pipe recv chain: %z", n);
275:
276: if (p->free_raw_bufs) {
277: chain->next = p->free_raw_bufs;
278: }
279: p->free_raw_bufs = chain;
280:
281: if (n == NGX_ERROR) {
282: p->upstream_error = 1;
283: return NGX_ERROR;
284: }
285:
286: if (n == NGX_AGAIN) {
287: if (p->single_buf) {
288: ngx_event_pipe_remove_shadow_links(chain->buf);
289: }
290:
291: break;
292: }
293:
294: p->read = 1;
295:
296: if (n == 0) {
297: p->upstream_eof = 1;
298: break;
299: }
300: }
301:
302: p->read_length += n;
303: cl = chain;
304: p->free_raw_bufs = NULL;
305:
306: while (cl && n > 0) {
307:
308: ngx_event_pipe_remove_shadow_links(cl->buf);
309:
310: size = cl->buf->end - cl->buf->last;
311:
312: if (n >= size) {
313: cl->buf->last = cl->buf->end;
314:
315: /* STUB */ cl->buf->num = p->num++;
316:
317: if (p->input_filter(p, cl->buf) == NGX_ERROR) {
318: return NGX_ABORT;
319: }
320:
321: n -= size;
322: ln = cl;
323: cl = cl->next;
324: ngx_free_chain(p->pool, ln);
325:
326: } else {
327: cl->buf->last += n;
328: n = 0;
329: }
330: }
331:
332: if (cl) {
333: for (ln = cl; ln->next; ln = ln->next) { /* void */ }
334:
335: ln->next = p->free_raw_bufs;
336: p->free_raw_bufs = cl;
337: }
338: }
339:
340: #if (NGX_DEBUG)
341:
342: for (cl = p->busy; cl; cl = cl->next) {
343: ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
344: "pipe buf busy s:%d t:%d f:%d "
345: "%p, pos %p, size: %z "
346: "file: %O, size: %z",
347: (cl->buf->shadow ? 1 : 0),
348: cl->buf->temporary, cl->buf->in_file,
349: cl->buf->start, cl->buf->pos,
350: cl->buf->last - cl->buf->pos,
351: cl->buf->file_pos,
352: cl->buf->file_last - cl->buf->file_pos);
353: }
354:
355: for (cl = p->out; cl; cl = cl->next) {
356: ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
357: "pipe buf out s:%d t:%d f:%d "
358: "%p, pos %p, size: %z "
359: "file: %O, size: %z",
360: (cl->buf->shadow ? 1 : 0),
361: cl->buf->temporary, cl->buf->in_file,
362: cl->buf->start, cl->buf->pos,
363: cl->buf->last - cl->buf->pos,
364: cl->buf->file_pos,
365: cl->buf->file_last - cl->buf->file_pos);
366: }
367:
368: for (cl = p->in; cl; cl = cl->next) {
369: ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
370: "pipe buf in s:%d t:%d f:%d "
371: "%p, pos %p, size: %z "
372: "file: %O, size: %z",
373: (cl->buf->shadow ? 1 : 0),
374: cl->buf->temporary, cl->buf->in_file,
375: cl->buf->start, cl->buf->pos,
376: cl->buf->last - cl->buf->pos,
377: cl->buf->file_pos,
378: cl->buf->file_last - cl->buf->file_pos);
379: }
380:
381: for (cl = p->free_raw_bufs; cl; cl = cl->next) {
382: ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
383: "pipe buf free s:%d t:%d f:%d "
384: "%p, pos %p, size: %z "
385: "file: %O, size: %z",
386: (cl->buf->shadow ? 1 : 0),
387: cl->buf->temporary, cl->buf->in_file,
388: cl->buf->start, cl->buf->pos,
389: cl->buf->last - cl->buf->pos,
390: cl->buf->file_pos,
391: cl->buf->file_last - cl->buf->file_pos);
392: }
393:
394: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
395: "pipe length: %O", p->length);
396:
397: #endif
398:
399: if (p->free_raw_bufs && p->length != -1) {
400: cl = p->free_raw_bufs;
401:
402: if (cl->buf->last - cl->buf->pos >= p->length) {
403:
404: p->free_raw_bufs = cl->next;
405:
406: /* STUB */ cl->buf->num = p->num++;
407:
408: if (p->input_filter(p, cl->buf) == NGX_ERROR) {
409: return NGX_ABORT;
410: }
411:
412: ngx_free_chain(p->pool, cl);
413: }
414: }
415:
416: if (p->length == 0) {
417: p->upstream_done = 1;
418: p->read = 1;
419: }
420:
421: if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
422:
423: /* STUB */ p->free_raw_bufs->buf->num = p->num++;
424:
425: if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
426: return NGX_ABORT;
427: }
428:
429: p->free_raw_bufs = p->free_raw_bufs->next;
430:
431: if (p->free_bufs && p->buf_to_file == NULL) {
432: for (cl = p->free_raw_bufs; cl; cl = cl->next) {
433: if (cl->buf->shadow == NULL) {
434: ngx_pfree(p->pool, cl->buf->start);
435: }
436: }
437: }
438: }
439:
440: if (p->cacheable && p->in) {
441: if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
442: return NGX_ABORT;
443: }
444: }
445:
446: return NGX_OK;
447: }
448:
449:
450: static ngx_int_t
451: ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
452: {
453: u_char *prev;
454: size_t bsize;
455: ngx_int_t rc;
456: ngx_uint_t flush, flushed, prev_last_shadow;
457: ngx_chain_t *out, **ll, *cl, file;
458: ngx_connection_t *downstream;
459:
460: downstream = p->downstream;
461:
462: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
463: "pipe write downstream: %d", downstream->write->ready);
464:
465: flushed = 0;
466:
467: for ( ;; ) {
468: if (p->downstream_error) {
469: return ngx_event_pipe_drain_chains(p);
470: }
471:
472: if (p->upstream_eof || p->upstream_error || p->upstream_done) {
473:
474: /* pass the p->out and p->in chains to the output filter */
475:
476: for (cl = p->busy; cl; cl = cl->next) {
477: cl->buf->recycled = 0;
478: }
479:
480: if (p->out) {
481: ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
482: "pipe write downstream flush out");
483:
484: for (cl = p->out; cl; cl = cl->next) {
485: cl->buf->recycled = 0;
486: }
487:
488: rc = p->output_filter(p->output_ctx, p->out);
489:
490: if (rc == NGX_ERROR) {
491: p->downstream_error = 1;
492: return ngx_event_pipe_drain_chains(p);
493: }
494:
495: p->out = NULL;
496: }
497:
498: if (p->in) {
499: ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
500: "pipe write downstream flush in");
501:
502: for (cl = p->in; cl; cl = cl->next) {
503: cl->buf->recycled = 0;
504: }
505:
506: rc = p->output_filter(p->output_ctx, p->in);
507:
508: if (rc == NGX_ERROR) {
509: p->downstream_error = 1;
510: return ngx_event_pipe_drain_chains(p);
511: }
512:
513: p->in = NULL;
514: }
515:
516: if (p->cacheable && p->buf_to_file) {
517:
518: file.buf = p->buf_to_file;
519: file.next = NULL;
520:
521: if (ngx_write_chain_to_temp_file(p->temp_file, &file)
522: == NGX_ERROR)
523: {
524: return NGX_ABORT;
525: }
526: }
527:
528: ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
529: "pipe write downstream done");
530:
531: /* TODO: free unused bufs */
532:
533: p->downstream_done = 1;
534: break;
535: }
536:
537: if (downstream->data != p->output_ctx
538: || !downstream->write->ready
539: || downstream->write->delayed)
540: {
541: break;
542: }
543:
544: /* bsize is the size of the busy recycled bufs */
545:
546: prev = NULL;
547: bsize = 0;
548:
549: for (cl = p->busy; cl; cl = cl->next) {
550:
551: if (cl->buf->recycled) {
552: if (prev == cl->buf->start) {
553: continue;
554: }
555:
556: bsize += cl->buf->end - cl->buf->start;
557: prev = cl->buf->start;
558: }
559: }
560:
561: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
562: "pipe write busy: %uz", bsize);
563:
564: out = NULL;
565:
566: if (bsize >= (size_t) p->busy_size) {
567: flush = 1;
568: goto flush;
569: }
570:
571: flush = 0;
572: ll = NULL;
573: prev_last_shadow = 1;
574:
575: for ( ;; ) {
576: if (p->out) {
577: cl = p->out;
578:
579: if (cl->buf->recycled) {
580: ngx_log_error(NGX_LOG_ALERT, p->log, 0,
581: "recycled buffer in pipe out chain");
582: }
583:
584: p->out = p->out->next;
585:
586: } else if (!p->cacheable && p->in) {
587: cl = p->in;
588:
589: ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
590: "pipe write buf ls:%d %p %z",
591: cl->buf->last_shadow,
592: cl->buf->pos,
593: cl->buf->last - cl->buf->pos);
594:
595: if (cl->buf->recycled && prev_last_shadow) {
596: if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
597: flush = 1;
598: break;
599: }
600:
601: bsize += cl->buf->end - cl->buf->start;
602: }
603:
604: prev_last_shadow = cl->buf->last_shadow;
605:
606: p->in = p->in->next;
607:
608: } else {
609: break;
610: }
611:
612: cl->next = NULL;
613:
614: if (out) {
615: *ll = cl;
616: } else {
617: out = cl;
618: }
619: ll = &cl->next;
620: }
621:
622: flush:
623:
624: ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
625: "pipe write: out:%p, f:%d", out, flush);
626:
627: if (out == NULL) {
628:
629: if (!flush) {
630: break;
631: }
632:
633: /* a workaround for AIO */
634: if (flushed++ > 10) {
635: return NGX_BUSY;
636: }
637: }
638:
639: rc = p->output_filter(p->output_ctx, out);
640:
641: ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
642:
643: if (rc == NGX_ERROR) {
644: p->downstream_error = 1;
645: return ngx_event_pipe_drain_chains(p);
646: }
647:
648: for (cl = p->free; cl; cl = cl->next) {
649:
650: if (cl->buf->temp_file) {
651: if (p->cacheable || !p->cyclic_temp_file) {
652: continue;
653: }
654:
655: /* reset p->temp_offset if all bufs had been sent */
656:
657: if (cl->buf->file_last == p->temp_file->offset) {
658: p->temp_file->offset = 0;
659: }
660: }
661:
662: /* TODO: free buf if p->free_bufs && upstream done */
663:
664: /* add the free shadow raw buf to p->free_raw_bufs */
665:
666: if (cl->buf->last_shadow) {
667: if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
668: return NGX_ABORT;
669: }
670:
671: cl->buf->last_shadow = 0;
672: }
673:
674: cl->buf->shadow = NULL;
675: }
676: }
677:
678: return NGX_OK;
679: }
680:
681:
682: static ngx_int_t
683: ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
684: {
685: ssize_t size, bsize, n;
686: ngx_buf_t *b;
687: ngx_uint_t prev_last_shadow;
688: ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
689:
690: if (p->buf_to_file) {
691: fl.buf = p->buf_to_file;
692: fl.next = p->in;
693: out = &fl;
694:
695: } else {
696: out = p->in;
697: }
698:
699: if (!p->cacheable) {
700:
701: size = 0;
702: cl = out;
703: ll = NULL;
704: prev_last_shadow = 1;
705:
706: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
707: "pipe offset: %O", p->temp_file->offset);
708:
709: do {
710: bsize = cl->buf->last - cl->buf->pos;
711:
712: ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
713: "pipe buf ls:%d %p, pos %p, size: %z",
714: cl->buf->last_shadow, cl->buf->start,
715: cl->buf->pos, bsize);
716:
717: if (prev_last_shadow
718: && ((size + bsize > p->temp_file_write_size)
719: || (p->temp_file->offset + size + bsize
720: > p->max_temp_file_size)))
721: {
722: break;
723: }
724:
725: prev_last_shadow = cl->buf->last_shadow;
726:
727: size += bsize;
728: ll = &cl->next;
729: cl = cl->next;
730:
731: } while (cl);
732:
733: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
734:
735: if (ll == NULL) {
736: return NGX_BUSY;
737: }
738:
739: if (cl) {
740: p->in = cl;
741: *ll = NULL;
742:
743: } else {
744: p->in = NULL;
745: p->last_in = &p->in;
746: }
747:
748: } else {
749: p->in = NULL;
750: p->last_in = &p->in;
751: }
752:
753: n = ngx_write_chain_to_temp_file(p->temp_file, out);
754:
755: if (n == NGX_ERROR) {
756: return NGX_ABORT;
757: }
758:
759: if (p->buf_to_file) {
760: p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
761: n -= p->buf_to_file->last - p->buf_to_file->pos;
762: p->buf_to_file = NULL;
763: out = out->next;
764: }
765:
766: if (n > 0) {
767: /* update previous buffer or add new buffer */
768:
769: if (p->out) {
770: for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
771:
772: b = cl->buf;
773:
774: if (b->file_last == p->temp_file->offset) {
775: p->temp_file->offset += n;
776: b->file_last = p->temp_file->offset;
777: goto free;
778: }
779:
780: last_out = &cl->next;
781:
782: } else {
783: last_out = &p->out;
784: }
785:
786: cl = ngx_chain_get_free_buf(p->pool, &p->free);
787: if (cl == NULL) {
788: return NGX_ABORT;
789: }
790:
791: b = cl->buf;
792:
793: ngx_memzero(b, sizeof(ngx_buf_t));
794:
795: b->tag = p->tag;
796:
797: b->file = &p->temp_file->file;
798: b->file_pos = p->temp_file->offset;
799: p->temp_file->offset += n;
800: b->file_last = p->temp_file->offset;
801:
802: b->in_file = 1;
803: b->temp_file = 1;
804:
805: *last_out = cl;
806: }
807:
808: free:
809:
810: for (last_free = &p->free_raw_bufs;
811: *last_free != NULL;
812: last_free = &(*last_free)->next)
813: {
814: /* void */
815: }
816:
817: for (cl = out; cl; cl = next) {
818: next = cl->next;
819:
820: cl->next = p->free;
821: p->free = cl;
822:
823: b = cl->buf;
824:
825: if (b->last_shadow) {
826:
827: tl = ngx_alloc_chain_link(p->pool);
828: if (tl == NULL) {
829: return NGX_ABORT;
830: }
831:
832: tl->buf = b->shadow;
833: tl->next = NULL;
834:
835: *last_free = tl;
836: last_free = &tl->next;
837:
838: b->shadow->pos = b->shadow->start;
839: b->shadow->last = b->shadow->start;
840:
841: ngx_event_pipe_remove_shadow_links(b->shadow);
842: }
843: }
844:
845: return NGX_OK;
846: }
847:
848:
849: /* the copy input filter */
850:
851: ngx_int_t
852: ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
853: {
854: ngx_buf_t *b;
855: ngx_chain_t *cl;
856:
857: if (buf->pos == buf->last) {
858: return NGX_OK;
859: }
860:
861: if (p->free) {
862: cl = p->free;
863: b = cl->buf;
864: p->free = cl->next;
865: ngx_free_chain(p->pool, cl);
866:
867: } else {
868: b = ngx_alloc_buf(p->pool);
869: if (b == NULL) {
870: return NGX_ERROR;
871: }
872: }
873:
874: ngx_memcpy(b, buf, sizeof(ngx_buf_t));
875: b->shadow = buf;
876: b->tag = p->tag;
877: b->last_shadow = 1;
878: b->recycled = 1;
879: buf->shadow = b;
880:
881: cl = ngx_alloc_chain_link(p->pool);
882: if (cl == NULL) {
883: return NGX_ERROR;
884: }
885:
886: cl->buf = b;
887: cl->next = NULL;
888:
889: ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
890:
891: if (p->in) {
892: *p->last_in = cl;
893: } else {
894: p->in = cl;
895: }
896: p->last_in = &cl->next;
897:
898: if (p->length == -1) {
899: return NGX_OK;
900: }
901:
902: p->length -= b->last - b->pos;
903:
904: return NGX_OK;
905: }
906:
907:
908: static ngx_inline void
909: ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
910: {
911: ngx_buf_t *b, *next;
912:
913: b = buf->shadow;
914:
915: if (b == NULL) {
916: return;
917: }
918:
919: while (!b->last_shadow) {
920: next = b->shadow;
921:
922: b->temporary = 0;
923: b->recycled = 0;
924:
925: b->shadow = NULL;
926: b = next;
927: }
928:
929: b->temporary = 0;
930: b->recycled = 0;
931: b->last_shadow = 0;
932:
933: b->shadow = NULL;
934:
935: buf->shadow = NULL;
936: }
937:
938:
939: ngx_int_t
940: ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
941: {
942: ngx_chain_t *cl;
943:
944: cl = ngx_alloc_chain_link(p->pool);
945: if (cl == NULL) {
946: return NGX_ERROR;
947: }
948:
949: if (p->buf_to_file && b->start == p->buf_to_file->start) {
950: b->pos = p->buf_to_file->last;
951: b->last = p->buf_to_file->last;
952:
953: } else {
954: b->pos = b->start;
955: b->last = b->start;
956: }
957:
958: b->shadow = NULL;
959:
960: cl->buf = b;
961:
962: if (p->free_raw_bufs == NULL) {
963: p->free_raw_bufs = cl;
964: cl->next = NULL;
965:
966: return NGX_OK;
967: }
968:
969: if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
970:
971: /* add the free buf to the list start */
972:
973: cl->next = p->free_raw_bufs;
974: p->free_raw_bufs = cl;
975:
976: return NGX_OK;
977: }
978:
979: /* the first free buf is partially filled, thus add the free buf after it */
980:
981: cl->next = p->free_raw_bufs->next;
982: p->free_raw_bufs->next = cl;
983:
984: return NGX_OK;
985: }
986:
987:
988: static ngx_int_t
989: ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
990: {
991: ngx_chain_t *cl, *tl;
992:
993: for ( ;; ) {
994: if (p->busy) {
995: cl = p->busy;
996: p->busy = NULL;
997:
998: } else if (p->out) {
999: cl = p->out;
1000: p->out = NULL;
1001:
1002: } else if (p->in) {
1003: cl = p->in;
1004: p->in = NULL;
1005:
1006: } else {
1007: return NGX_OK;
1008: }
1009:
1010: while (cl) {
1011: if (cl->buf->last_shadow) {
1012: if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
1013: return NGX_ABORT;
1014: }
1015:
1016: cl->buf->last_shadow = 0;
1017: }
1018:
1019: cl->buf->shadow = NULL;
1020: tl = cl->next;
1021: cl->next = p->free;
1022: p->free = cl;
1023: cl = tl;
1024: }
1025: }
1026: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>