Annotation of embedaddon/nginx/src/event/ngx_event_pipe.c, revision 1.1.1.1

1.1       misho       1: 
                      2: /*
                      3:  * Copyright (C) Igor Sysoev
                      4:  * Copyright (C) Nginx, Inc.
                      5:  */
                      6: 
                      7: 
                      8: #include <ngx_config.h>
                      9: #include <ngx_core.h>
                     10: #include <ngx_event.h>
                     11: #include <ngx_event_pipe.h>
                     12: 
                     13: 
                     14: static ngx_int_t ngx_event_pipe_read_upstream(ngx_event_pipe_t *p);
                     15: static ngx_int_t ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p);
                     16: 
                     17: static ngx_int_t ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p);
                     18: static ngx_inline void ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf);
                     19: static ngx_int_t ngx_event_pipe_drain_chains(ngx_event_pipe_t *p);
                     20: 
                     21: 
                     22: ngx_int_t
                     23: ngx_event_pipe(ngx_event_pipe_t *p, ngx_int_t do_write)
                     24: {
                     25:     u_int         flags;
                     26:     ngx_int_t     rc;
                     27:     ngx_event_t  *rev, *wev;
                     28: 
                     29:     for ( ;; ) {
                     30:         if (do_write) {
                     31:             p->log->action = "sending to client";
                     32: 
                     33:             rc = ngx_event_pipe_write_to_downstream(p);
                     34: 
                     35:             if (rc == NGX_ABORT) {
                     36:                 return NGX_ABORT;
                     37:             }
                     38: 
                     39:             if (rc == NGX_BUSY) {
                     40:                 return NGX_OK;
                     41:             }
                     42:         }
                     43: 
                     44:         p->read = 0;
                     45:         p->upstream_blocked = 0;
                     46: 
                     47:         p->log->action = "reading upstream";
                     48: 
                     49:         if (ngx_event_pipe_read_upstream(p) == NGX_ABORT) {
                     50:             return NGX_ABORT;
                     51:         }
                     52: 
                     53:         if (!p->read && !p->upstream_blocked) {
                     54:             break;
                     55:         }
                     56: 
                     57:         do_write = 1;
                     58:     }
                     59: 
                     60:     if (p->upstream->fd != -1) {
                     61:         rev = p->upstream->read;
                     62: 
                     63:         flags = (rev->eof || rev->error) ? NGX_CLOSE_EVENT : 0;
                     64: 
                     65:         if (ngx_handle_read_event(rev, flags) != NGX_OK) {
                     66:             return NGX_ABORT;
                     67:         }
                     68: 
                     69:         if (rev->active && !rev->ready) {
                     70:             ngx_add_timer(rev, p->read_timeout);
                     71: 
                     72:         } else if (rev->timer_set) {
                     73:             ngx_del_timer(rev);
                     74:         }
                     75:     }
                     76: 
                     77:     if (p->downstream->fd != -1 && p->downstream->data == p->output_ctx) {
                     78:         wev = p->downstream->write;
                     79:         if (ngx_handle_write_event(wev, p->send_lowat) != NGX_OK) {
                     80:             return NGX_ABORT;
                     81:         }
                     82: 
                     83:         if (!wev->delayed) {
                     84:             if (wev->active && !wev->ready) {
                     85:                 ngx_add_timer(wev, p->send_timeout);
                     86: 
                     87:             } else if (wev->timer_set) {
                     88:                 ngx_del_timer(wev);
                     89:             }
                     90:         }
                     91:     }
                     92: 
                     93:     return NGX_OK;
                     94: }
                     95: 
                     96: 
                     97: static ngx_int_t
                     98: ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
                     99: {
                    100:     ssize_t       n, size;
                    101:     ngx_int_t     rc;
                    102:     ngx_buf_t    *b;
                    103:     ngx_chain_t  *chain, *cl, *ln;
                    104: 
                    105:     if (p->upstream_eof || p->upstream_error || p->upstream_done) {
                    106:         return NGX_OK;
                    107:     }
                    108: 
                    109:     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    110:                    "pipe read upstream: %d", p->upstream->read->ready);
                    111: 
                    112:     for ( ;; ) {
                    113: 
                    114:         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
                    115:             break;
                    116:         }
                    117: 
                    118:         if (p->preread_bufs == NULL && !p->upstream->read->ready) {
                    119:             break;
                    120:         }
                    121: 
                    122:         if (p->preread_bufs) {
                    123: 
                    124:             /* use the pre-read bufs if they exist */
                    125: 
                    126:             chain = p->preread_bufs;
                    127:             p->preread_bufs = NULL;
                    128:             n = p->preread_size;
                    129: 
                    130:             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    131:                            "pipe preread: %z", n);
                    132: 
                    133:             if (n) {
                    134:                 p->read = 1;
                    135:             }
                    136: 
                    137:         } else {
                    138: 
                    139: #if (NGX_HAVE_KQUEUE)
                    140: 
                    141:             /*
                    142:              * kqueue notifies about the end of file or a pending error.
                    143:              * This test allows not to allocate a buf on these conditions
                    144:              * and not to call c->recv_chain().
                    145:              */
                    146: 
                    147:             if (p->upstream->read->available == 0
                    148:                 && p->upstream->read->pending_eof)
                    149:             {
                    150:                 p->upstream->read->ready = 0;
                    151:                 p->upstream->read->eof = 1;
                    152:                 p->upstream_eof = 1;
                    153:                 p->read = 1;
                    154: 
                    155:                 if (p->upstream->read->kq_errno) {
                    156:                     p->upstream->read->error = 1;
                    157:                     p->upstream_error = 1;
                    158:                     p->upstream_eof = 0;
                    159: 
                    160:                     ngx_log_error(NGX_LOG_ERR, p->log,
                    161:                                   p->upstream->read->kq_errno,
                    162:                                   "kevent() reported that upstream "
                    163:                                   "closed connection");
                    164:                 }
                    165: 
                    166:                 break;
                    167:             }
                    168: #endif
                    169: 
                    170:             if (p->free_raw_bufs) {
                    171: 
                    172:                 /* use the free bufs if they exist */
                    173: 
                    174:                 chain = p->free_raw_bufs;
                    175:                 if (p->single_buf) {
                    176:                     p->free_raw_bufs = p->free_raw_bufs->next;
                    177:                     chain->next = NULL;
                    178:                 } else {
                    179:                     p->free_raw_bufs = NULL;
                    180:                 }
                    181: 
                    182:             } else if (p->allocated < p->bufs.num) {
                    183: 
                    184:                 /* allocate a new buf if it's still allowed */
                    185: 
                    186:                 b = ngx_create_temp_buf(p->pool, p->bufs.size);
                    187:                 if (b == NULL) {
                    188:                     return NGX_ABORT;
                    189:                 }
                    190: 
                    191:                 p->allocated++;
                    192: 
                    193:                 chain = ngx_alloc_chain_link(p->pool);
                    194:                 if (chain == NULL) {
                    195:                     return NGX_ABORT;
                    196:                 }
                    197: 
                    198:                 chain->buf = b;
                    199:                 chain->next = NULL;
                    200: 
                    201:             } else if (!p->cacheable
                    202:                        && p->downstream->data == p->output_ctx
                    203:                        && p->downstream->write->ready
                    204:                        && !p->downstream->write->delayed)
                    205:             {
                    206:                 /*
                    207:                  * if the bufs are not needed to be saved in a cache and
                    208:                  * a downstream is ready then write the bufs to a downstream
                    209:                  */
                    210: 
                    211:                 p->upstream_blocked = 1;
                    212: 
                    213:                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    214:                                "pipe downstream ready");
                    215: 
                    216:                 break;
                    217: 
                    218:             } else if (p->cacheable
                    219:                        || p->temp_file->offset < p->max_temp_file_size)
                    220:             {
                    221: 
                    222:                 /*
                    223:                  * if it is allowed, then save some bufs from r->in
                    224:                  * to a temporary file, and add them to a r->out chain
                    225:                  */
                    226: 
                    227:                 rc = ngx_event_pipe_write_chain_to_temp_file(p);
                    228: 
                    229:                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    230:                                "pipe temp offset: %O", p->temp_file->offset);
                    231: 
                    232:                 if (rc == NGX_BUSY) {
                    233:                     break;
                    234:                 }
                    235: 
                    236:                 if (rc == NGX_AGAIN) {
                    237:                     if (ngx_event_flags & NGX_USE_LEVEL_EVENT
                    238:                         && p->upstream->read->active
                    239:                         && p->upstream->read->ready)
                    240:                     {
                    241:                         if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
                    242:                             == NGX_ERROR)
                    243:                         {
                    244:                             return NGX_ABORT;
                    245:                         }
                    246:                     }
                    247:                 }
                    248: 
                    249:                 if (rc != NGX_OK) {
                    250:                     return rc;
                    251:                 }
                    252: 
                    253:                 chain = p->free_raw_bufs;
                    254:                 if (p->single_buf) {
                    255:                     p->free_raw_bufs = p->free_raw_bufs->next;
                    256:                     chain->next = NULL;
                    257:                 } else {
                    258:                     p->free_raw_bufs = NULL;
                    259:                 }
                    260: 
                    261:             } else {
                    262: 
                    263:                 /* there are no bufs to read in */
                    264: 
                    265:                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    266:                                "no pipe bufs to read in");
                    267: 
                    268:                 break;
                    269:             }
                    270: 
                    271:             n = p->upstream->recv_chain(p->upstream, chain);
                    272: 
                    273:             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    274:                            "pipe recv chain: %z", n);
                    275: 
                    276:             if (p->free_raw_bufs) {
                    277:                 chain->next = p->free_raw_bufs;
                    278:             }
                    279:             p->free_raw_bufs = chain;
                    280: 
                    281:             if (n == NGX_ERROR) {
                    282:                 p->upstream_error = 1;
                    283:                 return NGX_ERROR;
                    284:             }
                    285: 
                    286:             if (n == NGX_AGAIN) {
                    287:                 if (p->single_buf) {
                    288:                     ngx_event_pipe_remove_shadow_links(chain->buf);
                    289:                 }
                    290: 
                    291:                 break;
                    292:             }
                    293: 
                    294:             p->read = 1;
                    295: 
                    296:             if (n == 0) {
                    297:                 p->upstream_eof = 1;
                    298:                 break;
                    299:             }
                    300:         }
                    301: 
                    302:         p->read_length += n;
                    303:         cl = chain;
                    304:         p->free_raw_bufs = NULL;
                    305: 
                    306:         while (cl && n > 0) {
                    307: 
                    308:             ngx_event_pipe_remove_shadow_links(cl->buf);
                    309: 
                    310:             size = cl->buf->end - cl->buf->last;
                    311: 
                    312:             if (n >= size) {
                    313:                 cl->buf->last = cl->buf->end;
                    314: 
                    315:                 /* STUB */ cl->buf->num = p->num++;
                    316: 
                    317:                 if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    318:                     return NGX_ABORT;
                    319:                 }
                    320: 
                    321:                 n -= size;
                    322:                 ln = cl;
                    323:                 cl = cl->next;
                    324:                 ngx_free_chain(p->pool, ln);
                    325: 
                    326:             } else {
                    327:                 cl->buf->last += n;
                    328:                 n = 0;
                    329:             }
                    330:         }
                    331: 
                    332:         if (cl) {
                    333:             for (ln = cl; ln->next; ln = ln->next) { /* void */ }
                    334: 
                    335:             ln->next = p->free_raw_bufs;
                    336:             p->free_raw_bufs = cl;
                    337:         }
                    338:     }
                    339: 
                    340: #if (NGX_DEBUG)
                    341: 
                    342:     for (cl = p->busy; cl; cl = cl->next) {
                    343:         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    344:                        "pipe buf busy s:%d t:%d f:%d "
                    345:                        "%p, pos %p, size: %z "
                    346:                        "file: %O, size: %z",
                    347:                        (cl->buf->shadow ? 1 : 0),
                    348:                        cl->buf->temporary, cl->buf->in_file,
                    349:                        cl->buf->start, cl->buf->pos,
                    350:                        cl->buf->last - cl->buf->pos,
                    351:                        cl->buf->file_pos,
                    352:                        cl->buf->file_last - cl->buf->file_pos);
                    353:     }
                    354: 
                    355:     for (cl = p->out; cl; cl = cl->next) {
                    356:         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    357:                        "pipe buf out  s:%d t:%d f:%d "
                    358:                        "%p, pos %p, size: %z "
                    359:                        "file: %O, size: %z",
                    360:                        (cl->buf->shadow ? 1 : 0),
                    361:                        cl->buf->temporary, cl->buf->in_file,
                    362:                        cl->buf->start, cl->buf->pos,
                    363:                        cl->buf->last - cl->buf->pos,
                    364:                        cl->buf->file_pos,
                    365:                        cl->buf->file_last - cl->buf->file_pos);
                    366:     }
                    367: 
                    368:     for (cl = p->in; cl; cl = cl->next) {
                    369:         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    370:                        "pipe buf in   s:%d t:%d f:%d "
                    371:                        "%p, pos %p, size: %z "
                    372:                        "file: %O, size: %z",
                    373:                        (cl->buf->shadow ? 1 : 0),
                    374:                        cl->buf->temporary, cl->buf->in_file,
                    375:                        cl->buf->start, cl->buf->pos,
                    376:                        cl->buf->last - cl->buf->pos,
                    377:                        cl->buf->file_pos,
                    378:                        cl->buf->file_last - cl->buf->file_pos);
                    379:     }
                    380: 
                    381:     for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                    382:         ngx_log_debug8(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    383:                        "pipe buf free s:%d t:%d f:%d "
                    384:                        "%p, pos %p, size: %z "
                    385:                        "file: %O, size: %z",
                    386:                        (cl->buf->shadow ? 1 : 0),
                    387:                        cl->buf->temporary, cl->buf->in_file,
                    388:                        cl->buf->start, cl->buf->pos,
                    389:                        cl->buf->last - cl->buf->pos,
                    390:                        cl->buf->file_pos,
                    391:                        cl->buf->file_last - cl->buf->file_pos);
                    392:     }
                    393: 
                    394:     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    395:                    "pipe length: %O", p->length);
                    396: 
                    397: #endif
                    398: 
                    399:     if (p->free_raw_bufs && p->length != -1) {
                    400:         cl = p->free_raw_bufs;
                    401: 
                    402:         if (cl->buf->last - cl->buf->pos >= p->length) {
                    403: 
                    404:             p->free_raw_bufs = cl->next;
                    405: 
                    406:             /* STUB */ cl->buf->num = p->num++;
                    407: 
                    408:             if (p->input_filter(p, cl->buf) == NGX_ERROR) {
                    409:                  return NGX_ABORT;
                    410:             }
                    411: 
                    412:             ngx_free_chain(p->pool, cl);
                    413:         }
                    414:     }
                    415: 
                    416:     if (p->length == 0) {
                    417:         p->upstream_done = 1;
                    418:         p->read = 1;
                    419:     }
                    420: 
                    421:     if ((p->upstream_eof || p->upstream_error) && p->free_raw_bufs) {
                    422: 
                    423:         /* STUB */ p->free_raw_bufs->buf->num = p->num++;
                    424: 
                    425:         if (p->input_filter(p, p->free_raw_bufs->buf) == NGX_ERROR) {
                    426:             return NGX_ABORT;
                    427:         }
                    428: 
                    429:         p->free_raw_bufs = p->free_raw_bufs->next;
                    430: 
                    431:         if (p->free_bufs && p->buf_to_file == NULL) {
                    432:             for (cl = p->free_raw_bufs; cl; cl = cl->next) {
                    433:                 if (cl->buf->shadow == NULL) {
                    434:                     ngx_pfree(p->pool, cl->buf->start);
                    435:                 }
                    436:             }
                    437:         }
                    438:     }
                    439: 
                    440:     if (p->cacheable && p->in) {
                    441:         if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
                    442:             return NGX_ABORT;
                    443:         }
                    444:     }
                    445: 
                    446:     return NGX_OK;
                    447: }
                    448: 
                    449: 
                    450: static ngx_int_t
                    451: ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
                    452: {
                    453:     u_char            *prev;
                    454:     size_t             bsize;
                    455:     ngx_int_t          rc;
                    456:     ngx_uint_t         flush, flushed, prev_last_shadow;
                    457:     ngx_chain_t       *out, **ll, *cl, file;
                    458:     ngx_connection_t  *downstream;
                    459: 
                    460:     downstream = p->downstream;
                    461: 
                    462:     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    463:                    "pipe write downstream: %d", downstream->write->ready);
                    464: 
                    465:     flushed = 0;
                    466: 
                    467:     for ( ;; ) {
                    468:         if (p->downstream_error) {
                    469:             return ngx_event_pipe_drain_chains(p);
                    470:         }
                    471: 
                    472:         if (p->upstream_eof || p->upstream_error || p->upstream_done) {
                    473: 
                    474:             /* pass the p->out and p->in chains to the output filter */
                    475: 
                    476:             for (cl = p->busy; cl; cl = cl->next) {
                    477:                 cl->buf->recycled = 0;
                    478:             }
                    479: 
                    480:             if (p->out) {
                    481:                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    482:                                "pipe write downstream flush out");
                    483: 
                    484:                 for (cl = p->out; cl; cl = cl->next) {
                    485:                     cl->buf->recycled = 0;
                    486:                 }
                    487: 
                    488:                 rc = p->output_filter(p->output_ctx, p->out);
                    489: 
                    490:                 if (rc == NGX_ERROR) {
                    491:                     p->downstream_error = 1;
                    492:                     return ngx_event_pipe_drain_chains(p);
                    493:                 }
                    494: 
                    495:                 p->out = NULL;
                    496:             }
                    497: 
                    498:             if (p->in) {
                    499:                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    500:                                "pipe write downstream flush in");
                    501: 
                    502:                 for (cl = p->in; cl; cl = cl->next) {
                    503:                     cl->buf->recycled = 0;
                    504:                 }
                    505: 
                    506:                 rc = p->output_filter(p->output_ctx, p->in);
                    507: 
                    508:                 if (rc == NGX_ERROR) {
                    509:                     p->downstream_error = 1;
                    510:                     return ngx_event_pipe_drain_chains(p);
                    511:                 }
                    512: 
                    513:                 p->in = NULL;
                    514:             }
                    515: 
                    516:             if (p->cacheable && p->buf_to_file) {
                    517: 
                    518:                 file.buf = p->buf_to_file;
                    519:                 file.next = NULL;
                    520: 
                    521:                 if (ngx_write_chain_to_temp_file(p->temp_file, &file)
                    522:                     == NGX_ERROR)
                    523:                 {
                    524:                     return NGX_ABORT;
                    525:                 }
                    526:             }
                    527: 
                    528:             ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    529:                            "pipe write downstream done");
                    530: 
                    531:             /* TODO: free unused bufs */
                    532: 
                    533:             p->downstream_done = 1;
                    534:             break;
                    535:         }
                    536: 
                    537:         if (downstream->data != p->output_ctx
                    538:             || !downstream->write->ready
                    539:             || downstream->write->delayed)
                    540:         {
                    541:             break;
                    542:         }
                    543: 
                    544:         /* bsize is the size of the busy recycled bufs */
                    545: 
                    546:         prev = NULL;
                    547:         bsize = 0;
                    548: 
                    549:         for (cl = p->busy; cl; cl = cl->next) {
                    550: 
                    551:             if (cl->buf->recycled) {
                    552:                 if (prev == cl->buf->start) {
                    553:                     continue;
                    554:                 }
                    555: 
                    556:                 bsize += cl->buf->end - cl->buf->start;
                    557:                 prev = cl->buf->start;
                    558:             }
                    559:         }
                    560: 
                    561:         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    562:                        "pipe write busy: %uz", bsize);
                    563: 
                    564:         out = NULL;
                    565: 
                    566:         if (bsize >= (size_t) p->busy_size) {
                    567:             flush = 1;
                    568:             goto flush;
                    569:         }
                    570: 
                    571:         flush = 0;
                    572:         ll = NULL;
                    573:         prev_last_shadow = 1;
                    574: 
                    575:         for ( ;; ) {
                    576:             if (p->out) {
                    577:                 cl = p->out;
                    578: 
                    579:                 if (cl->buf->recycled) {
                    580:                     ngx_log_error(NGX_LOG_ALERT, p->log, 0,
                    581:                                   "recycled buffer in pipe out chain");
                    582:                 }
                    583: 
                    584:                 p->out = p->out->next;
                    585: 
                    586:             } else if (!p->cacheable && p->in) {
                    587:                 cl = p->in;
                    588: 
                    589:                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    590:                                "pipe write buf ls:%d %p %z",
                    591:                                cl->buf->last_shadow,
                    592:                                cl->buf->pos,
                    593:                                cl->buf->last - cl->buf->pos);
                    594: 
                    595:                 if (cl->buf->recycled && prev_last_shadow) {
                    596:                     if (bsize + cl->buf->end - cl->buf->start > p->busy_size) {
                    597:                         flush = 1;
                    598:                         break;
                    599:                     }
                    600: 
                    601:                     bsize += cl->buf->end - cl->buf->start;
                    602:                 }
                    603: 
                    604:                 prev_last_shadow = cl->buf->last_shadow;
                    605: 
                    606:                 p->in = p->in->next;
                    607: 
                    608:             } else {
                    609:                 break;
                    610:             }
                    611: 
                    612:             cl->next = NULL;
                    613: 
                    614:             if (out) {
                    615:                 *ll = cl;
                    616:             } else {
                    617:                 out = cl;
                    618:             }
                    619:             ll = &cl->next;
                    620:         }
                    621: 
                    622:     flush:
                    623: 
                    624:         ngx_log_debug2(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    625:                        "pipe write: out:%p, f:%d", out, flush);
                    626: 
                    627:         if (out == NULL) {
                    628: 
                    629:             if (!flush) {
                    630:                 break;
                    631:             }
                    632: 
                    633:             /* a workaround for AIO */
                    634:             if (flushed++ > 10) {
                    635:                 return NGX_BUSY;
                    636:             }
                    637:         }
                    638: 
                    639:         rc = p->output_filter(p->output_ctx, out);
                    640: 
                    641:         ngx_chain_update_chains(p->pool, &p->free, &p->busy, &out, p->tag);
                    642: 
                    643:         if (rc == NGX_ERROR) {
                    644:             p->downstream_error = 1;
                    645:             return ngx_event_pipe_drain_chains(p);
                    646:         }
                    647: 
                    648:         for (cl = p->free; cl; cl = cl->next) {
                    649: 
                    650:             if (cl->buf->temp_file) {
                    651:                 if (p->cacheable || !p->cyclic_temp_file) {
                    652:                     continue;
                    653:                 }
                    654: 
                    655:                 /* reset p->temp_offset if all bufs had been sent */
                    656: 
                    657:                 if (cl->buf->file_last == p->temp_file->offset) {
                    658:                     p->temp_file->offset = 0;
                    659:                 }
                    660:             }
                    661: 
                    662:             /* TODO: free buf if p->free_bufs && upstream done */
                    663: 
                    664:             /* add the free shadow raw buf to p->free_raw_bufs */
                    665: 
                    666:             if (cl->buf->last_shadow) {
                    667:                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                    668:                     return NGX_ABORT;
                    669:                 }
                    670: 
                    671:                 cl->buf->last_shadow = 0;
                    672:             }
                    673: 
                    674:             cl->buf->shadow = NULL;
                    675:         }
                    676:     }
                    677: 
                    678:     return NGX_OK;
                    679: }
                    680: 
                    681: 
                    682: static ngx_int_t
                    683: ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
                    684: {
                    685:     ssize_t       size, bsize, n;
                    686:     ngx_buf_t    *b;
                    687:     ngx_uint_t    prev_last_shadow;
                    688:     ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
                    689: 
                    690:     if (p->buf_to_file) {
                    691:         fl.buf = p->buf_to_file;
                    692:         fl.next = p->in;
                    693:         out = &fl;
                    694: 
                    695:     } else {
                    696:         out = p->in;
                    697:     }
                    698: 
                    699:     if (!p->cacheable) {
                    700: 
                    701:         size = 0;
                    702:         cl = out;
                    703:         ll = NULL;
                    704:         prev_last_shadow = 1;
                    705: 
                    706:         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    707:                        "pipe offset: %O", p->temp_file->offset);
                    708: 
                    709:         do {
                    710:             bsize = cl->buf->last - cl->buf->pos;
                    711: 
                    712:             ngx_log_debug4(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    713:                            "pipe buf ls:%d %p, pos %p, size: %z",
                    714:                            cl->buf->last_shadow, cl->buf->start,
                    715:                            cl->buf->pos, bsize);
                    716: 
                    717:             if (prev_last_shadow
                    718:                 && ((size + bsize > p->temp_file_write_size)
                    719:                     || (p->temp_file->offset + size + bsize
                    720:                         > p->max_temp_file_size)))
                    721:             {
                    722:                 break;
                    723:             }
                    724: 
                    725:             prev_last_shadow = cl->buf->last_shadow;
                    726: 
                    727:             size += bsize;
                    728:             ll = &cl->next;
                    729:             cl = cl->next;
                    730: 
                    731:         } while (cl);
                    732: 
                    733:         ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "size: %z", size);
                    734: 
                    735:         if (ll == NULL) {
                    736:             return NGX_BUSY;
                    737:         }
                    738: 
                    739:         if (cl) {
                    740:            p->in = cl;
                    741:            *ll = NULL;
                    742: 
                    743:         } else {
                    744:            p->in = NULL;
                    745:            p->last_in = &p->in;
                    746:         }
                    747: 
                    748:     } else {
                    749:         p->in = NULL;
                    750:         p->last_in = &p->in;
                    751:     }
                    752: 
                    753:     n = ngx_write_chain_to_temp_file(p->temp_file, out);
                    754: 
                    755:     if (n == NGX_ERROR) {
                    756:         return NGX_ABORT;
                    757:     }
                    758: 
                    759:     if (p->buf_to_file) {
                    760:         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
                    761:         n -= p->buf_to_file->last - p->buf_to_file->pos;
                    762:         p->buf_to_file = NULL;
                    763:         out = out->next;
                    764:     }
                    765: 
                    766:     if (n > 0) {
                    767:         /* update previous buffer or add new buffer */
                    768: 
                    769:         if (p->out) {
                    770:             for (cl = p->out; cl->next; cl = cl->next) { /* void */ }
                    771: 
                    772:             b = cl->buf;
                    773: 
                    774:             if (b->file_last == p->temp_file->offset) {
                    775:                 p->temp_file->offset += n;
                    776:                 b->file_last = p->temp_file->offset;
                    777:                 goto free;
                    778:             }
                    779: 
                    780:             last_out = &cl->next;
                    781: 
                    782:         } else {
                    783:             last_out = &p->out;
                    784:         }
                    785: 
                    786:         cl = ngx_chain_get_free_buf(p->pool, &p->free);
                    787:         if (cl == NULL) {
                    788:             return NGX_ABORT;
                    789:         }
                    790: 
                    791:         b = cl->buf;
                    792: 
                    793:         ngx_memzero(b, sizeof(ngx_buf_t));
                    794: 
                    795:         b->tag = p->tag;
                    796: 
                    797:         b->file = &p->temp_file->file;
                    798:         b->file_pos = p->temp_file->offset;
                    799:         p->temp_file->offset += n;
                    800:         b->file_last = p->temp_file->offset;
                    801: 
                    802:         b->in_file = 1;
                    803:         b->temp_file = 1;
                    804: 
                    805:         *last_out = cl;
                    806:     }
                    807: 
                    808: free:
                    809: 
                    810:     for (last_free = &p->free_raw_bufs;
                    811:          *last_free != NULL;
                    812:          last_free = &(*last_free)->next)
                    813:     {
                    814:         /* void */
                    815:     }
                    816: 
                    817:     for (cl = out; cl; cl = next) {
                    818:         next = cl->next;
                    819: 
                    820:         cl->next = p->free;
                    821:         p->free = cl;
                    822: 
                    823:         b = cl->buf;
                    824: 
                    825:         if (b->last_shadow) {
                    826: 
                    827:             tl = ngx_alloc_chain_link(p->pool);
                    828:             if (tl == NULL) {
                    829:                 return NGX_ABORT;
                    830:             }
                    831: 
                    832:             tl->buf = b->shadow;
                    833:             tl->next = NULL;
                    834: 
                    835:             *last_free = tl;
                    836:             last_free = &tl->next;
                    837: 
                    838:             b->shadow->pos = b->shadow->start;
                    839:             b->shadow->last = b->shadow->start;
                    840: 
                    841:             ngx_event_pipe_remove_shadow_links(b->shadow);
                    842:         }
                    843:     }
                    844: 
                    845:     return NGX_OK;
                    846: }
                    847: 
                    848: 
                    849: /* the copy input filter */
                    850: 
                    851: ngx_int_t
                    852: ngx_event_pipe_copy_input_filter(ngx_event_pipe_t *p, ngx_buf_t *buf)
                    853: {
                    854:     ngx_buf_t    *b;
                    855:     ngx_chain_t  *cl;
                    856: 
                    857:     if (buf->pos == buf->last) {
                    858:         return NGX_OK;
                    859:     }
                    860: 
                    861:     if (p->free) {
                    862:         cl = p->free;
                    863:         b = cl->buf;
                    864:         p->free = cl->next;
                    865:         ngx_free_chain(p->pool, cl);
                    866: 
                    867:     } else {
                    868:         b = ngx_alloc_buf(p->pool);
                    869:         if (b == NULL) {
                    870:             return NGX_ERROR;
                    871:         }
                    872:     }
                    873: 
                    874:     ngx_memcpy(b, buf, sizeof(ngx_buf_t));
                    875:     b->shadow = buf;
                    876:     b->tag = p->tag;
                    877:     b->last_shadow = 1;
                    878:     b->recycled = 1;
                    879:     buf->shadow = b;
                    880: 
                    881:     cl = ngx_alloc_chain_link(p->pool);
                    882:     if (cl == NULL) {
                    883:         return NGX_ERROR;
                    884:     }
                    885: 
                    886:     cl->buf = b;
                    887:     cl->next = NULL;
                    888: 
                    889:     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0, "input buf #%d", b->num);
                    890: 
                    891:     if (p->in) {
                    892:         *p->last_in = cl;
                    893:     } else {
                    894:         p->in = cl;
                    895:     }
                    896:     p->last_in = &cl->next;
                    897: 
                    898:     if (p->length == -1) {
                    899:         return NGX_OK;
                    900:     }
                    901: 
                    902:     p->length -= b->last - b->pos;
                    903: 
                    904:     return NGX_OK;
                    905: }
                    906: 
                    907: 
                    908: static ngx_inline void
                    909: ngx_event_pipe_remove_shadow_links(ngx_buf_t *buf)
                    910: {
                    911:     ngx_buf_t  *b, *next;
                    912: 
                    913:     b = buf->shadow;
                    914: 
                    915:     if (b == NULL) {
                    916:         return;
                    917:     }
                    918: 
                    919:     while (!b->last_shadow) {
                    920:         next = b->shadow;
                    921: 
                    922:         b->temporary = 0;
                    923:         b->recycled = 0;
                    924: 
                    925:         b->shadow = NULL;
                    926:         b = next;
                    927:     }
                    928: 
                    929:     b->temporary = 0;
                    930:     b->recycled = 0;
                    931:     b->last_shadow = 0;
                    932: 
                    933:     b->shadow = NULL;
                    934: 
                    935:     buf->shadow = NULL;
                    936: }
                    937: 
                    938: 
                    939: ngx_int_t
                    940: ngx_event_pipe_add_free_buf(ngx_event_pipe_t *p, ngx_buf_t *b)
                    941: {
                    942:     ngx_chain_t  *cl;
                    943: 
                    944:     cl = ngx_alloc_chain_link(p->pool);
                    945:     if (cl == NULL) {
                    946:         return NGX_ERROR;
                    947:     }
                    948: 
                    949:     if (p->buf_to_file && b->start == p->buf_to_file->start) {
                    950:         b->pos = p->buf_to_file->last;
                    951:         b->last = p->buf_to_file->last;
                    952: 
                    953:     } else {
                    954:         b->pos = b->start;
                    955:         b->last = b->start;
                    956:     }
                    957: 
                    958:     b->shadow = NULL;
                    959: 
                    960:     cl->buf = b;
                    961: 
                    962:     if (p->free_raw_bufs == NULL) {
                    963:         p->free_raw_bufs = cl;
                    964:         cl->next = NULL;
                    965: 
                    966:         return NGX_OK;
                    967:     }
                    968: 
                    969:     if (p->free_raw_bufs->buf->pos == p->free_raw_bufs->buf->last) {
                    970: 
                    971:         /* add the free buf to the list start */
                    972: 
                    973:         cl->next = p->free_raw_bufs;
                    974:         p->free_raw_bufs = cl;
                    975: 
                    976:         return NGX_OK;
                    977:     }
                    978: 
                    979:     /* the first free buf is partially filled, thus add the free buf after it */
                    980: 
                    981:     cl->next = p->free_raw_bufs->next;
                    982:     p->free_raw_bufs->next = cl;
                    983: 
                    984:     return NGX_OK;
                    985: }
                    986: 
                    987: 
                    988: static ngx_int_t
                    989: ngx_event_pipe_drain_chains(ngx_event_pipe_t *p)
                    990: {
                    991:     ngx_chain_t  *cl, *tl;
                    992: 
                    993:     for ( ;; ) {
                    994:         if (p->busy) {
                    995:             cl = p->busy;
                    996:             p->busy = NULL;
                    997: 
                    998:         } else if (p->out) {
                    999:             cl = p->out;
                   1000:             p->out = NULL;
                   1001: 
                   1002:         } else if (p->in) {
                   1003:             cl = p->in;
                   1004:             p->in = NULL;
                   1005: 
                   1006:         } else {
                   1007:             return NGX_OK;
                   1008:         }
                   1009: 
                   1010:         while (cl) {
                   1011:             if (cl->buf->last_shadow) {
                   1012:                 if (ngx_event_pipe_add_free_buf(p, cl->buf->shadow) != NGX_OK) {
                   1013:                     return NGX_ABORT;
                   1014:                 }
                   1015: 
                   1016:                 cl->buf->last_shadow = 0;
                   1017:             }
                   1018: 
                   1019:             cl->buf->shadow = NULL;
                   1020:             tl = cl->next;
                   1021:             cl->next = p->free;
                   1022:             p->free = cl;
                   1023:             cl = tl;
                   1024:         }
                   1025:     }
                   1026: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>