--- embedaddon/rsync/token.c 2016/11/01 09:54:32 1.1.1.3 +++ embedaddon/rsync/token.c 2021/03/17 00:32:36 1.1.1.4 @@ -3,7 +3,7 @@ * * Copyright (C) 1996 Andrew Tridgell * Copyright (C) 1996 Paul Mackerras - * Copyright (C) 2003-2015 Wayne Davison + * Copyright (C) 2003-2020 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -22,15 +22,27 @@ #include "rsync.h" #include "itypes.h" #include +#ifdef SUPPORT_ZSTD +#include +#endif +#ifdef SUPPORT_LZ4 +#include +#endif extern int do_compression; extern int protocol_version; extern int module_id; -extern int def_compress_level; +extern int do_compression_level; extern char *skip_compress; -static int compression_level, per_file_default_level; +#ifndef Z_INSERT_ONLY +#define Z_INSERT_ONLY Z_SYNC_FLUSH +#endif +static int compression_level; /* The compression level for the current file. */ +static int skip_compression_level; /* The least possible compressing for handling skip-compress files. */ +static int per_file_default_level; /* The default level that each new file gets prior to checking its suffix. */ + struct suffix_tree { struct suffix_tree *sibling; struct suffix_tree *child; @@ -40,6 +52,58 @@ struct suffix_tree { static char *match_list; static struct suffix_tree *suftree; +void init_compression_level(void) +{ + int min_level, max_level, def_level, off_level; + + switch (do_compression) { + case CPRES_NONE: + return; + case CPRES_ZLIB: + case CPRES_ZLIBX: + min_level = 1; + max_level = Z_BEST_COMPRESSION; + def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */ + off_level = skip_compression_level = Z_NO_COMPRESSION; + if (do_compression_level == Z_DEFAULT_COMPRESSION) + do_compression_level = def_level; + break; +#ifdef SUPPORT_ZSTD + case CPRES_ZSTD: + min_level = skip_compression_level = ZSTD_minCLevel(); + max_level = ZSTD_maxCLevel(); + def_level = ZSTD_CLEVEL_DEFAULT; + off_level = CLVL_NOT_SPECIFIED; + if (do_compression_level == 0) + do_compression_level = def_level; + break; +#endif +#ifdef SUPPORT_LZ4 + case CPRES_LZ4: + min_level = skip_compression_level = 0; + max_level = 0; + def_level = 0; + off_level = CLVL_NOT_SPECIFIED; + break; +#endif + default: /* paranoia to prevent missing case values */ + NOISY_DEATH("Unknown do_compression value"); + } + + if (do_compression_level == CLVL_NOT_SPECIFIED) + do_compression_level = def_level; + else if (do_compression_level == off_level) { + do_compression = CPRES_NONE; + return; + } + + /* We don't bother with any errors or warnings -- just make sure that the values are valid. */ + if (do_compression_level < min_level) + do_compression_level = min_level; + else if (do_compression_level > max_level) + do_compression_level = max_level; +} + static void add_suffix(struct suffix_tree **prior, char ltr, const char *str) { struct suffix_tree *node, *newnode; @@ -65,8 +129,7 @@ static void add_suffix(struct suffix_tree **prior, cha if (node->letter > ltr) break; } - if (!(newnode = new(struct suffix_tree))) - out_of_memory("add_suffix"); + newnode = new(struct suffix_tree); newnode->sibling = node; newnode->child = NULL; newnode->letter = ltr; @@ -83,8 +146,7 @@ static void add_nocompress_suffixes(const char *str) char *buf, *t; const char *f = str; - if (!(buf = new_array(char, strlen(f) + 1))) - out_of_memory("add_nocompress_suffixes"); + buf = new_array(char, strlen(f) + 1); while (*f) { if (*f == '/') { @@ -122,10 +184,9 @@ static void init_set_compression(void) else f = lp_dont_compress(module_id); - if (!(match_list = t = new_array(char, strlen(f) + 2))) - out_of_memory("set_compression"); + match_list = t = new_array(char, strlen(f) + 2); - per_file_default_level = def_compress_level; + per_file_default_level = do_compression_level; while (*f) { if (*f == ' ') { @@ -146,7 +207,7 @@ static void init_set_compression(void) /* Optimize a match-string of "*". */ *match_list = '\0'; suftree = NULL; - per_file_default_level = 0; + per_file_default_level = skip_compression_level; break; } @@ -183,7 +244,7 @@ void set_compression(const char *fname) for (s = match_list; *s; s += strlen(s) + 1) { if (iwildmatch(s, fname)) { - compression_level = 0; + compression_level = skip_compression_level; return; } } @@ -203,7 +264,7 @@ void set_compression(const char *fname) } if ((ltr = *++s) == '\0') { if (node->word_end) - compression_level = 0; + compression_level = skip_compression_level; return; } if (!(node = node->child)) @@ -218,11 +279,8 @@ static int32 simple_recv_token(int f, char **data) static char *buf; int32 n; - if (!buf) { + if (!buf) buf = new_array(char, CHUNK_SIZE); - if (!buf) - out_of_memory("simple_recv_token"); - } if (residue == 0) { int32 i = read_int(f); @@ -239,8 +297,7 @@ static int32 simple_recv_token(int f, char **data) } /* non-compressing send token */ -static void simple_send_token(int f, int32 token, struct map_struct *buf, - OFF_T offset, int32 n) +static void simple_send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n) { if (n > 0) { int32 len = 0; @@ -293,11 +350,10 @@ static char *obuf; /* Send a deflated token */ static void -send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, - int32 nb, int32 toklen) +send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb, int32 toklen) { - int32 n, r; static int init_done, flush_pending; + int32 n, r; if (last_token == -1) { /* initialization */ @@ -311,8 +367,7 @@ send_deflated_token(int f, int32 token, struct map_str rprintf(FERROR, "compression init failed\n"); exit_cleanup(RERR_PROTOCOL); } - if ((obuf = new_array(char, OBUF_SIZE)) == NULL) - out_of_memory("send_deflated_token"); + obuf = new_array(char, OBUF_SIZE); init_done = 1; } else deflateReset(&tx_strm); @@ -321,8 +376,7 @@ send_deflated_token(int f, int32 token, struct map_str flush_pending = 0; } else if (last_token == -2) { run_start = token; - } else if (nb != 0 || token != last_token + 1 - || token >= run_start + 65536) { + } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) { /* output previous run */ r = run_start - last_run_end; n = last_token - run_start; @@ -366,8 +420,7 @@ send_deflated_token(int f, int32 token, struct map_str * buffer, in case they are the * last 4. Move them to the front. */ - memcpy(tx_strm.next_out, - obuf+MAX_DATA_COUNT-2, 4); + memcpy(tx_strm.next_out, obuf+MAX_DATA_COUNT-2, 4); tx_strm.next_out += 4; tx_strm.avail_out -= 4; } @@ -402,10 +455,9 @@ send_deflated_token(int f, int32 token, struct map_str if (token == -1) { /* end of file - clean up */ write_byte(f, END_FLAG); - } else if (token != -2 && do_compression == 1) { + } else if (token != -2 && do_compression == CPRES_ZLIB) { /* Add the data in the current block to the compressor's * history and hash table. */ -#ifndef EXTERNAL_ZLIB do { /* Break up long sections in the same way that * see_deflate_token() does. */ @@ -424,11 +476,6 @@ send_deflated_token(int f, int32 token, struct map_str exit_cleanup(RERR_STREAMIO); } } while (toklen > 0); -#else - toklen++; - rprintf(FERROR, "Impossible error in external-zlib code (1).\n"); - exit_cleanup(RERR_STREAMIO); -#endif } } @@ -463,9 +510,8 @@ static int32 recv_deflated_token(int f, char **data) rprintf(FERROR, "inflate init failed\n"); exit_cleanup(RERR_PROTOCOL); } - if (!(cbuf = new_array(char, MAX_DATA_COUNT)) - || !(dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE)))) - out_of_memory("recv_deflated_token"); + cbuf = new_array(char, MAX_DATA_COUNT); + dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE)); init_done = 1; } else { inflateReset(&rx_strm); @@ -579,7 +625,6 @@ static int32 recv_deflated_token(int f, char **data) */ static void see_deflate_token(char *buf, int32 len) { -#ifndef EXTERNAL_ZLIB int r; int32 blklen; unsigned char hdr[5]; @@ -617,13 +662,366 @@ static void see_deflate_token(char *buf, int32 len) exit_cleanup(RERR_STREAMIO); } } while (len || rx_strm.avail_out == 0); -#else - buf++; len++; - rprintf(FERROR, "Impossible error in external-zlib code (2).\n"); - exit_cleanup(RERR_STREAMIO); -#endif } +#ifdef SUPPORT_ZSTD + +static ZSTD_inBuffer zstd_in_buff; +static ZSTD_outBuffer zstd_out_buff; +static ZSTD_CCtx *zstd_cctx; + +static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb) +{ + static int comp_init_done, flush_pending; + ZSTD_EndDirective flush = ZSTD_e_continue; + int32 n, r; + + /* initialization */ + if (!comp_init_done) { + zstd_cctx = ZSTD_createCCtx(); + if (!zstd_cctx) { + rprintf(FERROR, "compression init failed\n"); + exit_cleanup(RERR_PROTOCOL); + } + + obuf = new_array(char, OBUF_SIZE); + + ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level); + zstd_out_buff.dst = obuf + 2; + + comp_init_done = 1; + } + + if (last_token == -1) { + last_run_end = 0; + run_start = token; + flush_pending = 0; + } else if (last_token == -2) { + run_start = token; + } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) { + /* output previous run */ + r = run_start - last_run_end; + n = last_token - run_start; + + if (r >= 0 && r <= 63) { + write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r); + } else { + write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG)); + write_int(f, run_start); + } + if (n != 0) { + write_byte(f, n); + write_byte(f, n >> 8); + } + last_run_end = last_token; + run_start = token; + } + + last_token = token; + + if (nb || flush_pending) { + + zstd_in_buff.src = map_ptr(buf, offset, nb); + zstd_in_buff.size = nb; + zstd_in_buff.pos = 0; + + do { + if (zstd_out_buff.size == 0) { + zstd_out_buff.size = MAX_DATA_COUNT; + zstd_out_buff.pos = 0; + } + + /* File ended, flush */ + if (token != -2) + flush = ZSTD_e_flush; + + r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush); + if (ZSTD_isError(r)) { + rprintf(FERROR, "ZSTD_compressStream returned %d\n", r); + exit_cleanup(RERR_STREAMIO); + } + + /* + * Nothing is sent if the buffer isn't full so avoid smaller + * transfers. If a file is finished then we flush the internal + * state and send a smaller buffer so that the remote side can + * finish the file. + */ + if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) { + n = zstd_out_buff.pos; + + obuf[0] = DEFLATED_DATA + (n >> 8); + obuf[1] = n; + write_buf(f, obuf, n+2); + + zstd_out_buff.size = 0; + } + /* + * Loop while the input buffer isn't full consumed or the + * internal state isn't fully flushed. + */ + } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0); + flush_pending = token == -2; + } + + if (token == -1) { + /* end of file - clean up */ + write_byte(f, END_FLAG); + } +} + +static ZSTD_DCtx *zstd_dctx; + +static int32 recv_zstd_token(int f, char **data) +{ + static int decomp_init_done; + static int out_buffer_size; + int32 n, flag; + int r; + + if (!decomp_init_done) { + zstd_dctx = ZSTD_createDCtx(); + if (!zstd_dctx) { + rprintf(FERROR, "ZSTD_createDStream failed\n"); + exit_cleanup(RERR_PROTOCOL); + } + + /* Output buffer fits two decompressed blocks */ + out_buffer_size = ZSTD_DStreamOutSize() * 2; + cbuf = new_array(char, MAX_DATA_COUNT); + dbuf = new_array(char, out_buffer_size); + + zstd_in_buff.src = cbuf; + zstd_out_buff.dst = dbuf; + + decomp_init_done = 1; + } + + for (;;) { + switch (recv_state) { + case r_init: + recv_state = r_idle; + rx_token = 0; + break; + + case r_idle: + flag = read_byte(f); + if ((flag & 0xC0) == DEFLATED_DATA) { + n = ((flag & 0x3f) << 8) + read_byte(f); + read_buf(f, cbuf, n); + + zstd_in_buff.size = n; + zstd_in_buff.pos = 0; + + recv_state = r_inflating; + break; + } + + if (flag == END_FLAG) { + /* that's all folks */ + recv_state = r_init; + return 0; + } + /* here we have a token of some kind */ + if (flag & TOKEN_REL) { + rx_token += flag & 0x3f; + flag >>= 6; + } else + rx_token = read_int(f); + if (flag & 1) { + rx_run = read_byte(f); + rx_run += read_byte(f) << 8; + recv_state = r_running; + } + return -1 - rx_token; + + case r_inflated: /* zstd doesn't get into this state */ + break; + + case r_inflating: + zstd_out_buff.size = out_buffer_size; + zstd_out_buff.pos = 0; + + r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff); + n = zstd_out_buff.pos; + if (ZSTD_isError(r)) { + rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n); + exit_cleanup(RERR_STREAMIO); + } + + /* + * If the input buffer is fully consumed and the output + * buffer is not full then next step is to read more + * data. + */ + if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size) + recv_state = r_idle; + + if (n != 0) { + *data = dbuf; + return n; + } + break; + + case r_running: + ++rx_token; + if (--rx_run == 0) + recv_state = r_idle; + return -1 - rx_token; + } + } +} +#endif /* SUPPORT_ZSTD */ + +#ifdef SUPPORT_LZ4 +static void +send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb) +{ + static int init_done, flush_pending; + int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2); + int32 n, r; + + if (last_token == -1) { + if (!init_done) { + obuf = new_array(char, size); + init_done = 1; + } + last_run_end = 0; + run_start = token; + flush_pending = 0; + } else if (last_token == -2) { + run_start = token; + } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) { + /* output previous run */ + r = run_start - last_run_end; + n = last_token - run_start; + if (r >= 0 && r <= 63) { + write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r); + } else { + write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG)); + write_int(f, run_start); + } + if (n != 0) { + write_byte(f, n); + write_byte(f, n >> 8); + } + last_run_end = last_token; + run_start = token; + } + + last_token = token; + + if (nb != 0 || flush_pending) { + int available_in, available_out = 0; + const char *next_in; + + do { + char *next_out = obuf + 2; + + if (available_out == 0) { + available_in = MIN(nb, MAX_DATA_COUNT); + next_in = map_ptr(buf, offset, available_in); + } else + available_in /= 2; + + available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2); + if (!available_out) { + rprintf(FERROR, "compress returned %d\n", available_out); + exit_cleanup(RERR_STREAMIO); + } + if (available_out <= MAX_DATA_COUNT) { + obuf[0] = DEFLATED_DATA + (available_out >> 8); + obuf[1] = available_out; + + write_buf(f, obuf, available_out + 2); + + available_out = 0; + nb -= available_in; + offset += available_in; + } + } while (nb != 0); + flush_pending = token == -2; + } + if (token == -1) { + /* end of file - clean up */ + write_byte(f, END_FLAG); + } +} + +static int32 recv_compressed_token(int f, char **data) +{ + static int init_done; + int32 n, flag; + int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2); + static const char *next_in; + static int avail_in; + int avail_out; + + for (;;) { + switch (recv_state) { + case r_init: + if (!init_done) { + cbuf = new_array(char, MAX_DATA_COUNT); + dbuf = new_array(char, size); + init_done = 1; + } + recv_state = r_idle; + rx_token = 0; + break; + + case r_idle: + flag = read_byte(f); + if ((flag & 0xC0) == DEFLATED_DATA) { + n = ((flag & 0x3f) << 8) + read_byte(f); + read_buf(f, cbuf, n); + next_in = (char *)cbuf; + avail_in = n; + recv_state = r_inflating; + break; + } + + if (flag == END_FLAG) { + /* that's all folks */ + recv_state = r_init; + return 0; + } + + /* here we have a token of some kind */ + if (flag & TOKEN_REL) { + rx_token += flag & 0x3f; + flag >>= 6; + } else + rx_token = read_int(f); + if (flag & 1) { + rx_run = read_byte(f); + rx_run += read_byte(f) << 8; + recv_state = r_running; + } + return -1 - rx_token; + + case r_inflating: + avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size); + if (avail_out < 0) { + rprintf(FERROR, "uncompress failed: %d\n", avail_out); + exit_cleanup(RERR_STREAMIO); + } + recv_state = r_idle; + *data = dbuf; + return avail_out; + + case r_inflated: /* lz4 doesn't get into this state */ + break; + + case r_running: + ++rx_token; + if (--rx_run == 0) + recv_state = r_idle; + return -1 - rx_token; + } + } +} +#endif /* SUPPORT_LZ4 */ + /** * Transmit a verbatim buffer of length @p n followed by a token. * If token == -1 then we have reached EOF @@ -632,28 +1030,54 @@ static void see_deflate_token(char *buf, int32 len) void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n, int32 toklen) { - if (!do_compression) + switch (do_compression) { + case CPRES_NONE: simple_send_token(f, token, buf, offset, n); - else + break; + case CPRES_ZLIB: + case CPRES_ZLIBX: send_deflated_token(f, token, buf, offset, n, toklen); + break; +#ifdef SUPPORT_ZSTD + case CPRES_ZSTD: + send_zstd_token(f, token, buf, offset, n); + break; +#endif +#ifdef SUPPORT_LZ4 + case CPRES_LZ4: + send_compressed_token(f, token, buf, offset, n); + break; +#endif + default: + NOISY_DEATH("Unknown do_compression value"); + } } /* - * receive a token or buffer from the other end. If the reurn value is >0 then + * receive a token or buffer from the other end. If the return value is >0 then * it is a data buffer of that length, and *data will point at the data. * if the return value is -i then it represents token i-1 * if the return value is 0 then the end has been reached */ int32 recv_token(int f, char **data) { - int tok; - - if (!do_compression) { - tok = simple_recv_token(f,data); - } else { - tok = recv_deflated_token(f, data); + switch (do_compression) { + case CPRES_NONE: + return simple_recv_token(f,data); + case CPRES_ZLIB: + case CPRES_ZLIBX: + return recv_deflated_token(f, data); +#ifdef SUPPORT_ZSTD + case CPRES_ZSTD: + return recv_zstd_token(f, data); +#endif +#ifdef SUPPORT_LZ4 + case CPRES_LZ4: + return recv_compressed_token(f, data); +#endif + default: + NOISY_DEATH("Unknown do_compression value"); } - return tok; } /* @@ -661,6 +1085,24 @@ int32 recv_token(int f, char **data) */ void see_token(char *data, int32 toklen) { - if (do_compression == 1) + switch (do_compression) { + case CPRES_NONE: + break; + case CPRES_ZLIB: see_deflate_token(data, toklen); + break; + case CPRES_ZLIBX: + break; +#ifdef SUPPORT_ZSTD + case CPRES_ZSTD: + break; +#endif +#ifdef SUPPORT_LZ4 + case CPRES_LZ4: + /*see_uncompressed_token(data, toklen);*/ + break; +#endif + default: + NOISY_DEATH("Unknown do_compression value"); + } }