--- embedaddon/lighttpd/src/chunk.c 2013/10/14 10:32:47 1.1.1.1 +++ embedaddon/lighttpd/src/chunk.c 2016/11/02 10:35:00 1.1.1.2 @@ -1,3 +1,5 @@ +#include "first.h" + /** * the network chunk-API * @@ -5,10 +7,12 @@ */ #include "chunk.h" +#include "base.h" +#include "log.h" #include #include -#include +#include "sys-mmap.h" #include #include @@ -18,16 +22,27 @@ #include #include +/* default 1MB, upper limit 128MB */ +#define DEFAULT_TEMPFILE_SIZE (1 * 1024 * 1024) +#define MAX_TEMPFILE_SIZE (128 * 1024 * 1024) + +static array *chunkqueue_default_tempdirs = NULL; +static unsigned int chunkqueue_default_tempfile_size = DEFAULT_TEMPFILE_SIZE; + chunkqueue *chunkqueue_init(void) { chunkqueue *cq; cq = calloc(1, sizeof(*cq)); + force_assert(NULL != cq); cq->first = NULL; cq->last = NULL; cq->unused = NULL; + cq->tempdirs = chunkqueue_default_tempdirs; + cq->upload_temp_file_size = chunkqueue_default_tempfile_size; + return cq; } @@ -35,31 +50,30 @@ static chunk *chunk_init(void) { chunk *c; c = calloc(1, sizeof(*c)); + force_assert(NULL != c); + c->type = MEM_CHUNK; c->mem = buffer_init(); c->file.name = buffer_init(); + c->file.start = c->file.length = c->file.mmap.offset = 0; c->file.fd = -1; c->file.mmap.start = MAP_FAILED; + c->file.mmap.length = 0; + c->file.is_temp = 0; + c->offset = 0; c->next = NULL; return c; } -static void chunk_free(chunk *c) { - if (!c) return; - - buffer_free(c->mem); - buffer_free(c->file.name); - - free(c); -} - static void chunk_reset(chunk *c) { - if (!c) return; + if (NULL == c) return; + c->type = MEM_CHUNK; + buffer_reset(c->mem); - if (c->file.is_temp && !buffer_is_empty(c->file.name)) { + if (c->file.is_temp && !buffer_string_is_empty(c->file.name)) { unlink(c->file.name->ptr); } @@ -73,13 +87,45 @@ static void chunk_reset(chunk *c) { munmap(c->file.mmap.start, c->file.mmap.length); c->file.mmap.start = MAP_FAILED; } + c->file.start = c->file.length = c->file.mmap.offset = 0; + c->file.mmap.length = 0; + c->file.is_temp = 0; + c->offset = 0; + c->next = NULL; } +static void chunk_free(chunk *c) { + if (NULL == c) return; + chunk_reset(c); + + buffer_free(c->mem); + buffer_free(c->file.name); + + free(c); +} + +static off_t chunk_remaining_length(const chunk *c) { + off_t len = 0; + switch (c->type) { + case MEM_CHUNK: + len = buffer_string_length(c->mem); + break; + case FILE_CHUNK: + len = c->file.length; + break; + default: + force_assert(c->type == MEM_CHUNK || c->type == FILE_CHUNK); + break; + } + force_assert(c->offset <= len); + return len - c->offset; +} + void chunkqueue_free(chunkqueue *cq) { chunk *c, *pc; - if (!cq) return; + if (NULL == cq) return; for (c = cq->first; c; ) { pc = c; @@ -96,11 +142,27 @@ void chunkqueue_free(chunkqueue *cq) { free(cq); } +static void chunkqueue_push_unused_chunk(chunkqueue *cq, chunk *c) { + force_assert(NULL != cq && NULL != c); + + /* keep at max 4 chunks in the 'unused'-cache */ + if (cq->unused_chunks > 4) { + chunk_free(c); + } else { + chunk_reset(c); + c->next = cq->unused; + cq->unused = c; + cq->unused_chunks++; + } +} + static chunk *chunkqueue_get_unused_chunk(chunkqueue *cq) { chunk *c; + force_assert(NULL != cq); + /* check if we have a unused chunk */ - if (!cq->unused) { + if (0 == cq->unused) { c = chunk_init(); } else { /* take the first element from the list (a stack) */ @@ -113,203 +175,315 @@ static chunk *chunkqueue_get_unused_chunk(chunkqueue * return c; } -static int chunkqueue_prepend_chunk(chunkqueue *cq, chunk *c) { +static void chunkqueue_prepend_chunk(chunkqueue *cq, chunk *c) { c->next = cq->first; cq->first = c; - if (cq->last == NULL) { + if (NULL == cq->last) { cq->last = c; } - - return 0; + cq->bytes_in += chunk_remaining_length(c); } -static int chunkqueue_append_chunk(chunkqueue *cq, chunk *c) { +static void chunkqueue_append_chunk(chunkqueue *cq, chunk *c) { + c->next = NULL; if (cq->last) { cq->last->next = c; } cq->last = c; - if (cq->first == NULL) { + if (NULL == cq->first) { cq->first = c; } - - return 0; + cq->bytes_in += chunk_remaining_length(c); } void chunkqueue_reset(chunkqueue *cq) { - chunk *c; - /* move everything to the unused queue */ + chunk *cur = cq->first; - /* mark all read written */ - for (c = cq->first; c; c = c->next) { - switch(c->type) { - case MEM_CHUNK: - c->offset = c->mem->used - 1; - break; - case FILE_CHUNK: - c->offset = c->file.length; - break; - default: - break; - } + cq->first = cq->last = NULL; + + while (NULL != cur) { + chunk *next = cur->next; + chunkqueue_push_unused_chunk(cq, cur); + cur = next; } - chunkqueue_remove_finished_chunks(cq); cq->bytes_in = 0; cq->bytes_out = 0; + cq->tempdir_idx = 0; } -int chunkqueue_append_file(chunkqueue *cq, buffer *fn, off_t offset, off_t len) { +void chunkqueue_append_file_fd(chunkqueue *cq, buffer *fn, int fd, off_t offset, off_t len) { chunk *c; - if (len == 0) return 0; + if (0 == len) { + close(fd); + return; + } c = chunkqueue_get_unused_chunk(cq); c->type = FILE_CHUNK; - buffer_copy_string_buffer(c->file.name, fn); + buffer_copy_buffer(c->file.name, fn); c->file.start = offset; c->file.length = len; + c->file.fd = fd; c->offset = 0; chunkqueue_append_chunk(cq, c); - - return 0; } -int chunkqueue_append_buffer(chunkqueue *cq, buffer *mem) { +void chunkqueue_append_file(chunkqueue *cq, buffer *fn, off_t offset, off_t len) { chunk *c; - if (mem->used == 0) return 0; + if (0 == len) return; c = chunkqueue_get_unused_chunk(cq); - c->type = MEM_CHUNK; + + c->type = FILE_CHUNK; + + buffer_copy_buffer(c->file.name, fn); + c->file.start = offset; + c->file.length = len; c->offset = 0; - buffer_copy_string_buffer(c->mem, mem); chunkqueue_append_chunk(cq, c); - - return 0; } -int chunkqueue_append_buffer_weak(chunkqueue *cq, buffer *mem) { +void chunkqueue_append_buffer(chunkqueue *cq, buffer *mem) { chunk *c; + if (buffer_string_is_empty(mem)) return; + c = chunkqueue_get_unused_chunk(cq); c->type = MEM_CHUNK; - c->offset = 0; - if (c->mem) buffer_free(c->mem); - c->mem = mem; + force_assert(NULL != c->mem); + buffer_move(c->mem, mem); chunkqueue_append_chunk(cq, c); - - return 0; } -int chunkqueue_prepend_buffer(chunkqueue *cq, buffer *mem) { +void chunkqueue_prepend_buffer(chunkqueue *cq, buffer *mem) { chunk *c; - if (mem->used == 0) return 0; + if (buffer_string_is_empty(mem)) return; c = chunkqueue_get_unused_chunk(cq); c->type = MEM_CHUNK; - c->offset = 0; - buffer_copy_string_buffer(c->mem, mem); + force_assert(NULL != c->mem); + buffer_move(c->mem, mem); chunkqueue_prepend_chunk(cq, c); - - return 0; } -int chunkqueue_append_mem(chunkqueue *cq, const char * mem, size_t len) { +void chunkqueue_append_mem(chunkqueue *cq, const char * mem, size_t len) { chunk *c; - if (len == 0) return 0; + if (0 == len) return; c = chunkqueue_get_unused_chunk(cq); c->type = MEM_CHUNK; - c->offset = 0; - buffer_copy_string_len(c->mem, mem, len - 1); + buffer_copy_string_len(c->mem, mem, len); chunkqueue_append_chunk(cq, c); +} - return 0; + +void chunkqueue_append_chunkqueue(chunkqueue *cq, chunkqueue *src) { + if (src == NULL || NULL == src->first) return; + + if (NULL == cq->first) { + cq->first = src->first; + } else { + cq->last->next = src->first; + } + cq->last = src->last; + cq->bytes_in += (src->bytes_in - src->bytes_out); + + src->first = NULL; + src->last = NULL; + src->bytes_out = src->bytes_in; } -buffer * chunkqueue_get_prepend_buffer(chunkqueue *cq) { + +void chunkqueue_get_memory(chunkqueue *cq, char **mem, size_t *len, size_t min_size, size_t alloc_size) { + static const size_t REALLOC_MAX_SIZE = 256; chunk *c; + buffer *b; + char *dummy_mem; + size_t dummy_len; - c = chunkqueue_get_unused_chunk(cq); + force_assert(NULL != cq); + if (NULL == mem) mem = &dummy_mem; + if (NULL == len) len = &dummy_len; - c->type = MEM_CHUNK; - c->offset = 0; - buffer_reset(c->mem); + /* default values: */ + if (0 == min_size) min_size = 1024; + if (0 == alloc_size) alloc_size = 4096; + if (alloc_size < min_size) alloc_size = min_size; - chunkqueue_prepend_chunk(cq, c); + if (NULL != cq->last && MEM_CHUNK == cq->last->type) { + size_t have; - return c->mem; -} + b = cq->last->mem; + have = buffer_string_space(b); -buffer *chunkqueue_get_append_buffer(chunkqueue *cq) { - chunk *c; + /* unused buffer: allocate space */ + if (buffer_string_is_empty(b)) { + buffer_string_prepare_copy(b, alloc_size); + have = buffer_string_space(b); + } + /* if buffer is really small just make it bigger */ + else if (have < min_size && b->size <= REALLOC_MAX_SIZE) { + size_t cur_len = buffer_string_length(b); + size_t new_size = cur_len + min_size, append; + if (new_size < alloc_size) new_size = alloc_size; - c = chunkqueue_get_unused_chunk(cq); + append = new_size - cur_len; + if (append >= min_size) { + buffer_string_prepare_append(b, append); + have = buffer_string_space(b); + } + } - c->type = MEM_CHUNK; - c->offset = 0; - buffer_reset(c->mem); + /* return pointer into existing buffer if large enough */ + if (have >= min_size) { + *mem = b->ptr + buffer_string_length(b); + *len = have; + return; + } + } + /* allocate new chunk */ + c = chunkqueue_get_unused_chunk(cq); + c->type = MEM_CHUNK; chunkqueue_append_chunk(cq, c); - return c->mem; + b = c->mem; + buffer_string_prepare_append(b, alloc_size); + + *mem = b->ptr + buffer_string_length(b); + *len = buffer_string_space(b); } -int chunkqueue_set_tempdirs(chunkqueue *cq, array *tempdirs) { - if (!cq) return -1; +void chunkqueue_use_memory(chunkqueue *cq, size_t len) { + buffer *b; + force_assert(NULL != cq); + force_assert(NULL != cq->last && MEM_CHUNK == cq->last->type); + b = cq->last->mem; + + if (len > 0) { + buffer_commit(b, len); + cq->bytes_in += len; + } else if (buffer_string_is_empty(b)) { + /* unused buffer: can't remove chunk easily from + * end of list, so just reset the buffer + */ + buffer_reset(b); + } +} + +void chunkqueue_set_tempdirs_default (array *tempdirs, unsigned int upload_temp_file_size) { + chunkqueue_default_tempdirs = tempdirs; + chunkqueue_default_tempfile_size + = (0 == upload_temp_file_size) ? DEFAULT_TEMPFILE_SIZE + : (upload_temp_file_size > MAX_TEMPFILE_SIZE) ? MAX_TEMPFILE_SIZE + : upload_temp_file_size; +} + +#if 0 +void chunkqueue_set_tempdirs(chunkqueue *cq, array *tempdirs, unsigned int upload_temp_file_size) { + force_assert(NULL != cq); cq->tempdirs = tempdirs; + cq->upload_temp_file_size + = (0 == upload_temp_file_size) ? DEFAULT_TEMPFILE_SIZE + : (upload_temp_file_size > MAX_TEMPFILE_SIZE) ? MAX_TEMPFILE_SIZE + : upload_temp_file_size; + cq->tempdir_idx = 0; +} +#endif - return 0; +void chunkqueue_steal(chunkqueue *dest, chunkqueue *src, off_t len) { + while (len > 0) { + chunk *c = src->first; + off_t clen = 0, use; + + if (NULL == c) break; + + clen = chunk_remaining_length(c); + if (0 == clen) { + /* drop empty chunk */ + src->first = c->next; + if (c == src->last) src->last = NULL; + chunkqueue_push_unused_chunk(src, c); + continue; + } + + use = len >= clen ? clen : len; + len -= use; + + if (use == clen) { + /* move complete chunk */ + src->first = c->next; + if (c == src->last) src->last = NULL; + + chunkqueue_append_chunk(dest, c); + } else { + /* partial chunk with length "use" */ + + switch (c->type) { + case MEM_CHUNK: + chunkqueue_append_mem(dest, c->mem->ptr + c->offset, use); + break; + case FILE_CHUNK: + /* tempfile flag is in "last" chunk after the split */ + chunkqueue_append_file(dest, c->file.name, c->file.start + c->offset, use); + break; + } + + c->offset += use; + force_assert(0 == len); + } + + src->bytes_out += use; + } } -chunk *chunkqueue_get_append_tempfile(chunkqueue *cq) { +static chunk *chunkqueue_get_append_tempfile(chunkqueue *cq) { chunk *c; buffer *template = buffer_init_string("/var/tmp/lighttpd-upload-XXXXXX"); + int fd = -1; - c = chunkqueue_get_unused_chunk(cq); - - c->type = FILE_CHUNK; - c->offset = 0; - if (cq->tempdirs && cq->tempdirs->used) { - size_t i; - /* we have several tempdirs, only if all of them fail we jump out */ - for (i = 0; i < cq->tempdirs->used; i++) { - data_string *ds = (data_string *)cq->tempdirs->data[i]; + for (errno = EIO; cq->tempdir_idx < cq->tempdirs->used; ++cq->tempdir_idx) { + data_string *ds = (data_string *)cq->tempdirs->data[cq->tempdir_idx]; - buffer_copy_string_buffer(template, ds->value); - BUFFER_APPEND_SLASH(template); + buffer_copy_buffer(template, ds->value); + buffer_append_slash(template); buffer_append_string_len(template, CONST_STR_LEN("lighttpd-upload-XXXXXX")); - if (-1 != (c->file.fd = mkstemp(template->ptr))) { - /* only trigger the unlink if we created the temp-file successfully */ - c->file.is_temp = 1; - break; - } + if (-1 != (fd = mkstemp(template->ptr))) break; } } else { - if (-1 != (c->file.fd = mkstemp(template->ptr))) { - /* only trigger the unlink if we created the temp-file successfully */ - c->file.is_temp = 1; - } + fd = mkstemp(template->ptr); } - buffer_copy_string_buffer(c->file.name, template); + if (fd < 0) { + buffer_free(template); + return NULL; + } + + c = chunkqueue_get_unused_chunk(cq); + c->type = FILE_CHUNK; + c->file.fd = fd; + c->file.is_temp = 1; + buffer_copy_buffer(c->file.name, template); c->file.length = 0; chunkqueue_append_chunk(cq, c); @@ -319,82 +493,238 @@ chunk *chunkqueue_get_append_tempfile(chunkqueue *cq) return c; } +static void chunkqueue_remove_empty_chunks(chunkqueue *cq); -off_t chunkqueue_length(chunkqueue *cq) { - off_t len = 0; - chunk *c; +int chunkqueue_append_mem_to_tempfile(server *srv, chunkqueue *dest, const char *mem, size_t len) { + chunk *dst_c; + ssize_t written; - for (c = cq->first; c; c = c->next) { + do { + /* + * if the last chunk is + * - smaller than dest->upload_temp_file_size + * - not read yet (offset == 0) + * -> append to it (so it might actually become larger than dest->upload_temp_file_size) + * otherwise + * -> create a new chunk + * + * */ + + dst_c = dest->last; + if (NULL != dst_c + && FILE_CHUNK == dst_c->type + && dst_c->file.is_temp + && dst_c->file.fd >= 0 + && 0 == dst_c->offset) { + /* ok, take the last chunk for our job */ + + if (dst_c->file.length >= (off_t)dest->upload_temp_file_size) { + /* the chunk is too large now, close it */ + int rc = close(dst_c->file.fd); + dst_c->file.fd = -1; + if (0 != rc) { + log_error_write(srv, __FILE__, __LINE__, "sbss", + "close() temp-file", dst_c->file.name, "failed:", + strerror(errno)); + return -1; + } + dst_c = NULL; + } + } else { + dst_c = NULL; + } + + if (NULL == dst_c && NULL == (dst_c = chunkqueue_get_append_tempfile(dest))) { + /* we don't have file to write to, + * EACCES might be one reason. + * + * Instead of sending 500 we send 413 and say the request is too large + */ + + log_error_write(srv, __FILE__, __LINE__, "ss", + "opening temp-file failed:", strerror(errno)); + + return -1; + } + + written = write(dst_c->file.fd, mem, len); + + if ((size_t) written == len) { + dst_c->file.length += len; + dest->bytes_in += len; + + return 0; + } else if (written >= 0) { + /*(assume EINTR if partial write and retry write(); + * retry write() might fail with ENOSPC if no more space on volume)*/ + dest->bytes_in += written; + mem += written; + len -= (size_t)written; + dst_c->file.length += (size_t)written; + /* continue; retry */ + } else if (errno == EINTR) { + /* continue; retry */ + } else { + int retry = (errno == ENOSPC && dest->tempdirs && ++dest->tempdir_idx < dest->tempdirs->used); + if (!retry) { + log_error_write(srv, __FILE__, __LINE__, "sbs", + "write() temp-file", dst_c->file.name, "failed:", + strerror(errno)); + } + + if (0 == chunk_remaining_length(dst_c)) { + /*(remove empty chunk and unlink tempfile)*/ + chunkqueue_remove_empty_chunks(dest); + } else {/*(close tempfile; avoid later attempts to append)*/ + int rc = close(dst_c->file.fd); + dst_c->file.fd = -1; + if (0 != rc) { + log_error_write(srv, __FILE__, __LINE__, "sbss", + "close() temp-file", dst_c->file.name, "failed:", + strerror(errno)); + return -1; + } + } + if (!retry) return -1; + + /* continue; retry */ + } + + } while (dst_c); + + return -1; /*(not reached)*/ +} + +int chunkqueue_steal_with_tempfiles(server *srv, chunkqueue *dest, chunkqueue *src, off_t len) { + while (len > 0) { + chunk *c = src->first; + off_t clen = 0, use; + + if (NULL == c) break; + + clen = chunk_remaining_length(c); + if (0 == clen) { + /* drop empty chunk */ + src->first = c->next; + if (c == src->last) src->last = NULL; + chunkqueue_push_unused_chunk(src, c); + continue; + } + + use = (len >= clen) ? clen : len; + len -= use; + switch (c->type) { - case MEM_CHUNK: - len += c->mem->used ? c->mem->used - 1 : 0; - break; case FILE_CHUNK: - len += c->file.length; + if (use == clen) { + /* move complete chunk */ + src->first = c->next; + if (c == src->last) src->last = NULL; + chunkqueue_append_chunk(dest, c); + } else { + /* partial chunk with length "use" */ + /* tempfile flag is in "last" chunk after the split */ + chunkqueue_append_file(dest, c->file.name, c->file.start + c->offset, use); + + c->offset += use; + force_assert(0 == len); + } break; - default: + + case MEM_CHUNK: + /* store "use" bytes from memory chunk in tempfile */ + if (0 != chunkqueue_append_mem_to_tempfile(srv, dest, c->mem->ptr + c->offset, use)) { + return -1; + } + + if (use == clen) { + /* finished chunk */ + src->first = c->next; + if (c == src->last) src->last = NULL; + chunkqueue_push_unused_chunk(src, c); + } else { + /* partial chunk */ + c->offset += use; + force_assert(0 == len); + } break; } + + src->bytes_out += use; } - return len; + return 0; } -off_t chunkqueue_written(chunkqueue *cq) { +off_t chunkqueue_length(chunkqueue *cq) { off_t len = 0; chunk *c; for (c = cq->first; c; c = c->next) { - switch (c->type) { - case MEM_CHUNK: - case FILE_CHUNK: - len += c->offset; - break; - default: - break; - } + len += chunk_remaining_length(c); } return len; } int chunkqueue_is_empty(chunkqueue *cq) { - return cq->first ? 0 : 1; + return NULL == cq->first; } -int chunkqueue_remove_finished_chunks(chunkqueue *cq) { +void chunkqueue_mark_written(chunkqueue *cq, off_t len) { + off_t written = len; chunk *c; + force_assert(len >= 0); - for (c = cq->first; c; c = cq->first) { - int is_finished = 0; + for (c = cq->first; NULL != c; c = cq->first) { + off_t c_len = chunk_remaining_length(c); - switch (c->type) { - case MEM_CHUNK: - if (c->mem->used == 0 || (c->offset == (off_t)c->mem->used - 1)) is_finished = 1; - break; - case FILE_CHUNK: - if (c->offset == c->file.length) is_finished = 1; - break; - default: - break; + if (0 == written && 0 != c_len) break; /* no more finished chunks */ + + if (written >= c_len) { /* chunk got finished */ + c->offset += c_len; + written -= c_len; + + cq->first = c->next; + if (c == cq->last) cq->last = NULL; + + chunkqueue_push_unused_chunk(cq, c); + } else { /* partial chunk */ + c->offset += written; + written = 0; + break; /* chunk not finished */ } + } - if (!is_finished) break; + force_assert(0 == written); + cq->bytes_out += len; +} - chunk_reset(c); +void chunkqueue_remove_finished_chunks(chunkqueue *cq) { + chunk *c; + for (c = cq->first; c; c = cq->first) { + if (0 != chunk_remaining_length(c)) break; /* not finished yet */ + cq->first = c->next; if (c == cq->last) cq->last = NULL; - /* keep at max 4 chunks in the 'unused'-cache */ - if (cq->unused_chunks > 4) { - chunk_free(c); - } else { - c->next = cq->unused; - cq->unused = c; - cq->unused_chunks++; - } + chunkqueue_push_unused_chunk(cq, c); } +} - return 0; +static void chunkqueue_remove_empty_chunks(chunkqueue *cq) { + chunk *c; + chunkqueue_remove_finished_chunks(cq); + if (chunkqueue_is_empty(cq)) return; + + for (c = cq->first; c->next; c = c->next) { + if (0 == chunk_remaining_length(c->next)) { + chunk *empty = c->next; + c->next = empty->next; + if (empty == cq->last) cq->last = c; + + chunkqueue_push_unused_chunk(cq, empty); + } + } }