File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / rsync / token.c
Revision 1.1.1.4 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Mar 17 00:32:36 2021 UTC (3 years, 3 months ago) by misho
Branches: rsync, MAIN
CVS tags: v3_2_3, HEAD
rsync 3.2.3

    1: /*
    2:  * Routines used by the file-transfer code.
    3:  *
    4:  * Copyright (C) 1996 Andrew Tridgell
    5:  * Copyright (C) 1996 Paul Mackerras
    6:  * Copyright (C) 2003-2020 Wayne Davison
    7:  *
    8:  * This program is free software; you can redistribute it and/or modify
    9:  * it under the terms of the GNU General Public License as published by
   10:  * the Free Software Foundation; either version 3 of the License, or
   11:  * (at your option) any later version.
   12:  *
   13:  * This program is distributed in the hope that it will be useful,
   14:  * but WITHOUT ANY WARRANTY; without even the implied warranty of
   15:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   16:  * GNU General Public License for more details.
   17:  *
   18:  * You should have received a copy of the GNU General Public License along
   19:  * with this program; if not, visit the http://fsf.org website.
   20:  */
   21: 
   22: #include "rsync.h"
   23: #include "itypes.h"
   24: #include <zlib.h>
   25: #ifdef SUPPORT_ZSTD
   26: #include <zstd.h>
   27: #endif
   28: #ifdef SUPPORT_LZ4
   29: #include <lz4.h>
   30: #endif
   31: 
   32: extern int do_compression;
   33: extern int protocol_version;
   34: extern int module_id;
   35: extern int do_compression_level;
   36: extern char *skip_compress;
   37: 
   38: #ifndef Z_INSERT_ONLY
   39: #define Z_INSERT_ONLY Z_SYNC_FLUSH
   40: #endif
   41: 
   42: static int compression_level; /* The compression level for the current file. */
   43: static int skip_compression_level; /* The least possible compressing for handling skip-compress files. */
   44: static int per_file_default_level; /* The default level that each new file gets prior to checking its suffix. */
   45: 
   46: struct suffix_tree {
   47: 	struct suffix_tree *sibling;
   48: 	struct suffix_tree *child;
   49: 	char letter, word_end;
   50: };
   51: 
   52: static char *match_list;
   53: static struct suffix_tree *suftree;
   54: 
   55: void init_compression_level(void)
   56: {
   57: 	int min_level, max_level, def_level, off_level;
   58: 
   59: 	switch (do_compression) {
   60: 	case CPRES_NONE:
   61: 		return;
   62: 	case CPRES_ZLIB:
   63: 	case CPRES_ZLIBX:
   64: 		min_level = 1;
   65: 		max_level = Z_BEST_COMPRESSION;
   66: 		def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */
   67: 		off_level = skip_compression_level = Z_NO_COMPRESSION;
   68: 		if (do_compression_level == Z_DEFAULT_COMPRESSION)
   69: 			do_compression_level = def_level;
   70: 		break;
   71: #ifdef SUPPORT_ZSTD
   72: 	case CPRES_ZSTD:
   73: 		min_level = skip_compression_level = ZSTD_minCLevel();
   74: 		max_level = ZSTD_maxCLevel();
   75: 		def_level = ZSTD_CLEVEL_DEFAULT;
   76: 		off_level = CLVL_NOT_SPECIFIED;
   77: 		if (do_compression_level == 0)
   78: 			do_compression_level = def_level;
   79: 		break;
   80: #endif
   81: #ifdef SUPPORT_LZ4
   82: 	case CPRES_LZ4:
   83: 		min_level = skip_compression_level = 0;
   84: 		max_level = 0;
   85: 		def_level = 0;
   86: 		off_level = CLVL_NOT_SPECIFIED;
   87: 		break;
   88: #endif
   89: 	default: /* paranoia to prevent missing case values */
   90: 		NOISY_DEATH("Unknown do_compression value");
   91: 	}
   92: 
   93: 	if (do_compression_level == CLVL_NOT_SPECIFIED)
   94: 		do_compression_level = def_level;
   95: 	else if (do_compression_level == off_level) {
   96: 		do_compression = CPRES_NONE;
   97: 		return;
   98: 	}
   99: 
  100: 	/* We don't bother with any errors or warnings -- just make sure that the values are valid. */
  101: 	if (do_compression_level < min_level)
  102: 		do_compression_level = min_level;
  103: 	else if (do_compression_level > max_level)
  104: 		do_compression_level = max_level;
  105: }
  106: 
  107: static void add_suffix(struct suffix_tree **prior, char ltr, const char *str)
  108: {
  109: 	struct suffix_tree *node, *newnode;
  110: 
  111: 	if (ltr == '[') {
  112: 		const char *after = strchr(str, ']');
  113: 		/* Treat "[foo" and "[]" as having a literal '['. */
  114: 		if (after && after++ != str+1) {
  115: 			while ((ltr = *str++) != ']')
  116: 				add_suffix(prior, ltr, after);
  117: 			return;
  118: 		}
  119: 	}
  120: 
  121: 	for (node = *prior; node; prior = &node->sibling, node = node->sibling) {
  122: 		if (node->letter == ltr) {
  123: 			if (*str)
  124: 				add_suffix(&node->child, *str, str+1);
  125: 			else
  126: 				node->word_end = 1;
  127: 			return;
  128: 		}
  129: 		if (node->letter > ltr)
  130: 			break;
  131: 	}
  132: 	newnode = new(struct suffix_tree);
  133: 	newnode->sibling = node;
  134: 	newnode->child = NULL;
  135: 	newnode->letter = ltr;
  136: 	*prior = newnode;
  137: 	if (*str) {
  138: 		add_suffix(&newnode->child, *str, str+1);
  139: 		newnode->word_end = 0;
  140: 	} else
  141: 		newnode->word_end = 1;
  142: }
  143: 
  144: static void add_nocompress_suffixes(const char *str)
  145: {
  146: 	char *buf, *t;
  147: 	const char *f = str;
  148: 
  149: 	buf = new_array(char, strlen(f) + 1);
  150: 
  151: 	while (*f) {
  152: 		if (*f == '/') {
  153: 			f++;
  154: 			continue;
  155: 		}
  156: 
  157: 		t = buf;
  158: 		do {
  159: 			if (isUpper(f))
  160: 				*t++ = toLower(f);
  161: 			else
  162: 				*t++ = *f;
  163: 		} while (*++f != '/' && *f);
  164: 		*t++ = '\0';
  165: 
  166: 		add_suffix(&suftree, *buf, buf+1);
  167: 	}
  168: 
  169: 	free(buf);
  170: }
  171: 
  172: static void init_set_compression(void)
  173: {
  174: 	const char *f;
  175: 	char *t, *start;
  176: 
  177: 	if (skip_compress)
  178: 		add_nocompress_suffixes(skip_compress);
  179: 
  180: 	/* A non-daemon transfer skips the default suffix list if the
  181: 	 * user specified --skip-compress. */
  182: 	if (skip_compress && module_id < 0)
  183: 		f = "";
  184: 	else
  185: 		f = lp_dont_compress(module_id);
  186: 
  187: 	match_list = t = new_array(char, strlen(f) + 2);
  188: 
  189: 	per_file_default_level = do_compression_level;
  190: 
  191: 	while (*f) {
  192: 		if (*f == ' ') {
  193: 			f++;
  194: 			continue;
  195: 		}
  196: 
  197: 		start = t;
  198: 		do {
  199: 			if (isUpper(f))
  200: 				*t++ = toLower(f);
  201: 			else
  202: 				*t++ = *f;
  203: 		} while (*++f != ' ' && *f);
  204: 		*t++ = '\0';
  205: 
  206: 		if (t - start == 1+1 && *start == '*') {
  207: 			/* Optimize a match-string of "*". */
  208: 			*match_list = '\0';
  209: 			suftree = NULL;
  210: 			per_file_default_level = skip_compression_level;
  211: 			break;
  212: 		}
  213: 
  214: 		/* Move *.foo items into the stuffix tree. */
  215: 		if (*start == '*' && start[1] == '.' && start[2]
  216: 		 && !strpbrk(start+2, ".?*")) {
  217: 			add_suffix(&suftree, start[2], start+3);
  218: 			t = start;
  219: 		}
  220: 	}
  221: 	*t++ = '\0';
  222: }
  223: 
  224: /* determine the compression level based on a wildcard filename list */
  225: void set_compression(const char *fname)
  226: {
  227: 	const struct suffix_tree *node;
  228: 	const char *s;
  229: 	char ltr;
  230: 
  231: 	if (!do_compression)
  232: 		return;
  233: 
  234: 	if (!match_list)
  235: 		init_set_compression();
  236: 
  237: 	compression_level = per_file_default_level;
  238: 
  239: 	if (!*match_list && !suftree)
  240: 		return;
  241: 
  242: 	if ((s = strrchr(fname, '/')) != NULL)
  243: 		fname = s + 1;
  244: 
  245: 	for (s = match_list; *s; s += strlen(s) + 1) {
  246: 		if (iwildmatch(s, fname)) {
  247: 			compression_level = skip_compression_level;
  248: 			return;
  249: 		}
  250: 	}
  251: 
  252: 	if (!(node = suftree) || !(s = strrchr(fname, '.'))
  253: 	 || s == fname || !(ltr = *++s))
  254: 		return;
  255: 
  256: 	while (1) {
  257: 		if (isUpper(&ltr))
  258: 			ltr = toLower(&ltr);
  259: 		while (node->letter != ltr) {
  260: 			if (node->letter > ltr)
  261: 				return;
  262: 			if (!(node = node->sibling))
  263: 				return;
  264: 		}
  265: 		if ((ltr = *++s) == '\0') {
  266: 			if (node->word_end)
  267: 				compression_level = skip_compression_level;
  268: 			return;
  269: 		}
  270: 		if (!(node = node->child))
  271: 			return;
  272: 	}
  273: }
  274: 
  275: /* non-compressing recv token */
  276: static int32 simple_recv_token(int f, char **data)
  277: {
  278: 	static int32 residue;
  279: 	static char *buf;
  280: 	int32 n;
  281: 
  282: 	if (!buf)
  283: 		buf = new_array(char, CHUNK_SIZE);
  284: 
  285: 	if (residue == 0) {
  286: 		int32 i = read_int(f);
  287: 		if (i <= 0)
  288: 			return i;
  289: 		residue = i;
  290: 	}
  291: 
  292: 	*data = buf;
  293: 	n = MIN(CHUNK_SIZE,residue);
  294: 	residue -= n;
  295: 	read_buf(f,buf,n);
  296: 	return n;
  297: }
  298: 
  299: /* non-compressing send token */
  300: static void simple_send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n)
  301: {
  302: 	if (n > 0) {
  303: 		int32 len = 0;
  304: 		while (len < n) {
  305: 			int32 n1 = MIN(CHUNK_SIZE, n-len);
  306: 			write_int(f, n1);
  307: 			write_buf(f, map_ptr(buf, offset+len, n1), n1);
  308: 			len += n1;
  309: 		}
  310: 	}
  311: 	/* a -2 token means to send data only and no token */
  312: 	if (token != -2)
  313: 		write_int(f, -(token+1));
  314: }
  315: 
  316: /* Flag bytes in compressed stream are encoded as follows: */
  317: #define END_FLAG	0	/* that's all folks */
  318: #define TOKEN_LONG	0x20	/* followed by 32-bit token number */
  319: #define TOKENRUN_LONG	0x21	/* ditto with 16-bit run count */
  320: #define DEFLATED_DATA	0x40	/* + 6-bit high len, then low len byte */
  321: #define TOKEN_REL	0x80	/* + 6-bit relative token number */
  322: #define TOKENRUN_REL	0xc0	/* ditto with 16-bit run count */
  323: 
  324: #define MAX_DATA_COUNT	16383	/* fit 14 bit count into 2 bytes with flags */
  325: 
  326: /* zlib.h says that if we want to be able to compress something in a single
  327:  * call, avail_out must be at least 0.1% larger than avail_in plus 12 bytes.
  328:  * We'll add in 0.1%+16, just to be safe (and we'll avoid floating point,
  329:  * to ensure that this is a compile-time value). */
  330: #define AVAIL_OUT_SIZE(avail_in_size) ((avail_in_size)*1001/1000+16)
  331: 
  332: /* For coding runs of tokens */
  333: static int32 last_token = -1;
  334: static int32 run_start;
  335: static int32 last_run_end;
  336: 
  337: /* Deflation state */
  338: static z_stream tx_strm;
  339: 
  340: /* Output buffer */
  341: static char *obuf;
  342: 
  343: /* We want obuf to be able to hold both MAX_DATA_COUNT+2 bytes as well as
  344:  * AVAIL_OUT_SIZE(CHUNK_SIZE) bytes, so make sure that it's large enough. */
  345: #if MAX_DATA_COUNT+2 > AVAIL_OUT_SIZE(CHUNK_SIZE)
  346: #define OBUF_SIZE	(MAX_DATA_COUNT+2)
  347: #else
  348: #define OBUF_SIZE	AVAIL_OUT_SIZE(CHUNK_SIZE)
  349: #endif
  350: 
/*
 * Send a deflated token (CPRES_ZLIB and CPRES_ZLIBX).
 *
 * The nb literal bytes at offset in buf are deflated and written as
 * DEFLATED_DATA chunks of at most MAX_DATA_COUNT bytes. "token" is then
 * recorded in the run-length token encoding. Consecutive tokens are merged
 * into a run, and a run is only written (TOKEN_REL / TOKEN_LONG or the
 * *RUN_* forms) once a later call breaks it.
 *
 * Special token values:
 * - token == -1: end of file. END_FLAG is written, and last_token becomes
 *   -1, so the next call re-initialises the stream for a new file.
 * - token == -2: data only. No sync flush is done, and flush_pending makes
 *   the next call run deflate even if it has no new data.
 *
 * toklen is the length of the matched block. In CPRES_ZLIB mode those
 * bytes are fed into the compressor's history; the receiver mirrors this
 * in see_deflate_token().
 */
static void
send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb, int32 toklen)
{
	static int init_done, flush_pending;
	int32 n, r;

	if (last_token == -1) {
		/* initialization */
		if (!init_done) {
			tx_strm.next_in = NULL;
			tx_strm.zalloc = NULL;
			tx_strm.zfree = NULL;
			/* Negative windowBits (-15) selects raw deflate output,
			 * with no zlib header or trailer. */
			if (deflateInit2(&tx_strm, compression_level,
					 Z_DEFLATED, -15, 8,
					 Z_DEFAULT_STRATEGY) != Z_OK) {
				rprintf(FERROR, "compression init failed\n");
				exit_cleanup(RERR_PROTOCOL);
			}
			obuf = new_array(char, OBUF_SIZE);
			init_done = 1;
		} else
			/* NOTE(review): compression_level reaches zlib only through
			 * the deflateInit2() above. Per the zlib manual, deflateReset()
			 * keeps the stream's existing level, so the per-file level
			 * chosen by set_compression() appears to be ignored for later
			 * files -- confirm. */
			deflateReset(&tx_strm);
		last_run_end = 0;
		run_start = token;
		flush_pending = 0;
	} else if (last_token == -2) {
		run_start = token;
	} else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
		/* output previous run */
		r = run_start - last_run_end;
		n = last_token - run_start;
		if (r >= 0 && r <= 63) {
			write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
		} else {
			write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
			write_int(f, run_start);
		}
		if (n != 0) {
			/* 16-bit run length, low byte first. */
			write_byte(f, n);
			write_byte(f, n >> 8);
		}
		last_run_end = last_token;
		run_start = token;
	}

	last_token = token;

	if (nb != 0 || flush_pending) {
		/* deflate the data starting at offset */
		int flush = Z_NO_FLUSH;
		/* Zeroing both counts forces fresh input and a fresh output
		 * buffer on the first pass through the loop. */
		tx_strm.avail_in = 0;
		tx_strm.avail_out = 0;
		do {
			if (tx_strm.avail_in == 0 && nb != 0) {
				/* give it some more input */
				n = MIN(nb, CHUNK_SIZE);
				tx_strm.next_in = (Bytef *)
					map_ptr(buf, offset, n);
				tx_strm.avail_in = n;
				nb -= n;
				offset += n;
			}
			if (tx_strm.avail_out == 0) {
				/* Output goes after the 2-byte chunk header. */
				tx_strm.next_out = (Bytef *)(obuf + 2);
				tx_strm.avail_out = MAX_DATA_COUNT;
				if (flush != Z_NO_FLUSH) {
					/*
					 * We left the last 4 bytes in the
					 * buffer, in case they are the
					 * last 4.  Move them to the front.
					 */
					memcpy(tx_strm.next_out, obuf+MAX_DATA_COUNT-2, 4);
					tx_strm.next_out += 4;
					tx_strm.avail_out -= 4;
				}
			}
			/* All input handed over: sync-flush unless this is a
			 * data-only (-2) call. */
			if (nb == 0 && token != -2)
				flush = Z_SYNC_FLUSH;
			r = deflate(&tx_strm, flush);
			if (r != Z_OK) {
				rprintf(FERROR, "deflate returned %d\n", r);
				exit_cleanup(RERR_STREAMIO);
			}
			if (nb == 0 || tx_strm.avail_out == 0) {
				n = MAX_DATA_COUNT - tx_strm.avail_out;
				if (flush != Z_NO_FLUSH) {
					/*
					 * We have to trim off the last 4
					 * bytes of output when flushing
					 * (they are just 0, 0, ff, ff).
					 */
					n -= 4;
				}
				if (n > 0) {
					obuf[0] = DEFLATED_DATA + (n >> 8);
					obuf[1] = n;
					write_buf(f, obuf, n+2);
				}
			}
		} while (nb != 0 || tx_strm.avail_out == 0);
		flush_pending = token == -2;
	}

	if (token == -1) {
		/* end of file - clean up */
		write_byte(f, END_FLAG);
	} else if (token != -2 && do_compression == CPRES_ZLIB) {
		/* Add the data in the current block to the compressor's
		 * history and hash table. */
		do {
			/* Break up long sections in the same way that
			 * see_deflate_token() does. */
			int32 n1 = toklen > 0xffff ? 0xffff : toklen;
			toklen -= n1;
			tx_strm.next_in = (Bytef *)map_ptr(buf, offset, n1);
			tx_strm.avail_in = n1;
			/* Older protocols re-feed the start of the block for each
			 * 64K piece; keep that for wire compatibility. */
			if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
				offset += n1;
			tx_strm.next_out = (Bytef *) obuf;
			tx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
			r = deflate(&tx_strm, Z_INSERT_ONLY);
			if (r != Z_OK || tx_strm.avail_in != 0) {
				rprintf(FERROR, "deflate on token returned %d (%d bytes left)\n",
					r, tx_strm.avail_in);
				exit_cleanup(RERR_STREAMIO);
			}
		} while (toklen > 0);
	}
}
  481: 
  482: /* tells us what the receiver is in the middle of doing */
  483: static enum { r_init, r_idle, r_running, r_inflating, r_inflated } recv_state;
  484: 
  485: /* for inflating stuff */
  486: static z_stream rx_strm;
  487: static char *cbuf;
  488: static char *dbuf;
  489: 
  490: /* for decoding runs of tokens */
  491: static int32 rx_token;
  492: static int32 rx_run;
  493: 
/*
 * Receive the next item of a deflated (zlib / zlibx) token stream.
 *
 * Returns n > 0 with *data pointing at n inflated bytes (in dbuf),
 * -(i+1) for block token i, or 0 when END_FLAG is read.
 *
 * recv_state tracks progress between calls:
 *   r_init      - (re)initialise the inflater for a new file
 *   r_idle      - read the next flag byte
 *   r_inflating - inflate the DEFLATED_DATA chunk held in cbuf
 *   r_inflated  - a chunk has been fully consumed. Before handling a
 *                 non-data flag, drain the inflater and feed it the
 *                 0,0,ff,ff sync-flush trailer that the sender trimmed off.
 *   r_running   - hand out the remaining tokens of a run
 */
static int32 recv_deflated_token(int f, char **data)
{
	static int init_done;
	/* A flag byte read in r_inflated that must be re-processed on the
	 * next call (see below). */
	static int32 saved_flag;
	int32 n, flag;
	int r;

	for (;;) {
		switch (recv_state) {
		case r_init:
			if (!init_done) {
				rx_strm.next_out = NULL;
				rx_strm.zalloc = NULL;
				rx_strm.zfree = NULL;
				/* Raw deflate (negative windowBits), matching the sender. */
				if (inflateInit2(&rx_strm, -15) != Z_OK) {
					rprintf(FERROR, "inflate init failed\n");
					exit_cleanup(RERR_PROTOCOL);
				}
				cbuf = new_array(char, MAX_DATA_COUNT);
				dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE));
				init_done = 1;
			} else {
				inflateReset(&rx_strm);
			}
			recv_state = r_idle;
			rx_token = 0;
			break;

		case r_idle:
		case r_inflated:
			if (saved_flag) {
				flag = saved_flag & 0xff;
				saved_flag = 0;
			} else
				flag = read_byte(f);
			if ((flag & 0xC0) == DEFLATED_DATA) {
				/* 14-bit chunk length: 6 bits in the flag, 8 in the next byte. */
				n = ((flag & 0x3f) << 8) + read_byte(f);
				read_buf(f, cbuf, n);
				rx_strm.next_in = (Bytef *)cbuf;
				rx_strm.avail_in = n;
				recv_state = r_inflating;
				break;
			}
			if (recv_state == r_inflated) {
				/* check previous inflated stuff ended correctly */
				rx_strm.avail_in = 0;
				rx_strm.next_out = (Bytef *)dbuf;
				rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
				r = inflate(&rx_strm, Z_SYNC_FLUSH);
				n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
				/*
				 * Z_BUF_ERROR just means no progress was
				 * made, i.e. the decompressor didn't have
				 * any pending output for us.
				 */
				if (r != Z_OK && r != Z_BUF_ERROR) {
					rprintf(FERROR, "inflate flush returned %d (%d bytes)\n",
						r, n);
					exit_cleanup(RERR_STREAMIO);
				}
				if (n != 0 && r != Z_BUF_ERROR) {
					/* have to return some more data and
					   save the flag for later. */
					/* Adding 0x10000 keeps saved_flag non-zero even
					 * for END_FLAG (0); the low byte is the flag. */
					saved_flag = flag + 0x10000;
					*data = dbuf;
					return n;
				}
				/*
				 * At this point the decompressor should
				 * be expecting to see the 0, 0, ff, ff bytes.
				 */
				if (!inflateSyncPoint(&rx_strm)) {
					rprintf(FERROR, "decompressor lost sync!\n");
					exit_cleanup(RERR_STREAMIO);
				}
				/* Re-supply the 4 sync-flush bytes the sender trimmed. */
				rx_strm.avail_in = 4;
				rx_strm.next_in = (Bytef *)cbuf;
				cbuf[0] = cbuf[1] = 0;
				cbuf[2] = cbuf[3] = 0xff;
				inflate(&rx_strm, Z_SYNC_FLUSH);
				recv_state = r_idle;
			}
			if (flag == END_FLAG) {
				/* that's all folks */
				recv_state = r_init;
				return 0;
			}

			/* here we have a token of some kind */
			if (flag & TOKEN_REL) {
				rx_token += flag & 0x3f;
				/* TOKEN_REL becomes 2 and TOKENRUN_REL becomes 3, so
				 * bit 0 now selects the run forms, as in the *_LONG flags. */
				flag >>= 6;
			} else
				rx_token = read_int(f);
			if (flag & 1) {
				rx_run = read_byte(f);
				rx_run += read_byte(f) << 8;
				recv_state = r_running;
			}
			return -1 - rx_token;

		case r_inflating:
			rx_strm.next_out = (Bytef *)dbuf;
			rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
			r = inflate(&rx_strm, Z_NO_FLUSH);
			n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
			if (r != Z_OK) {
				rprintf(FERROR, "inflate returned %d (%d bytes)\n", r, n);
				exit_cleanup(RERR_STREAMIO);
			}
			if (rx_strm.avail_in == 0)
				recv_state = r_inflated;
			/* Never return 0 here: callers treat 0 as end of file. */
			if (n != 0) {
				*data = dbuf;
				return n;
			}
			break;

		case r_running:
			++rx_token;
			if (--rx_run == 0)
				recv_state = r_idle;
			return -1 - rx_token;
		}
	}
}
  621: 
/*
 * put the data corresponding to a token that we've just returned
 * from recv_deflated_token into the decompressor's history buffer.
 *
 * The sender deflates the same bytes with Z_INSERT_ONLY in pieces of at most
 * 0xffff bytes. Here each piece is wrapped in a fake stored-block header and
 * inflated into dbuf, whose contents are then discarded; only the inflater's
 * history changes.
 */
static void see_deflate_token(char *buf, int32 len)
{
	int r;
	int32 blklen;
	/* Stored-block header: byte 0 = 0 (not final, stored type), then LEN
	 * (little-endian) and its one's complement NLEN. */
	unsigned char hdr[5];

	rx_strm.avail_in = 0;
	/* blklen == 0 means the next input should be a header; otherwise it is
	 * the size of the data piece that follows the header just queued. */
	blklen = 0;
	hdr[0] = 0;
	do {
		if (rx_strm.avail_in == 0 && len != 0) {
			if (blklen == 0) {
				/* Give it a fake stored-block header. */
				rx_strm.next_in = (Bytef *)hdr;
				rx_strm.avail_in = 5;
				blklen = len;
				if (blklen > 0xffff)
					blklen = 0xffff;
				hdr[1] = blklen;
				hdr[2] = blklen >> 8;
				hdr[3] = ~hdr[1];
				hdr[4] = ~hdr[2];
			} else {
				rx_strm.next_in = (Bytef *)buf;
				rx_strm.avail_in = blklen;
				/* Older protocols re-feed the start of buf for every
				 * piece, matching the sender; keep that for compatibility. */
				if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
					buf += blklen;
				len -= blklen;
				blklen = 0;
			}
		}
		rx_strm.next_out = (Bytef *)dbuf;
		rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
		r = inflate(&rx_strm, Z_SYNC_FLUSH);
		if (r != Z_OK && r != Z_BUF_ERROR) {
			rprintf(FERROR, "inflate (token) returned %d\n", r);
			exit_cleanup(RERR_STREAMIO);
		}
	} while (len || rx_strm.avail_out == 0);
}
  666: 
  667: #ifdef SUPPORT_ZSTD
  668: 
/* zstd stream buffers (also used by recv_zstd_token()) and the compression
 * context, which is created on first use. */
static ZSTD_inBuffer zstd_in_buff;
static ZSTD_outBuffer zstd_out_buff;
static ZSTD_CCtx *zstd_cctx;

/*
 * Send a zstd-compressed token.
 *
 * The token run encoding and the DEFLATED_DATA chunk framing are the same as
 * in send_deflated_token(). The compression context is created once and is
 * not reset between files in this function. Compressed output is sent when
 * the MAX_DATA_COUNT output buffer fills, or on every flush (any token other
 * than -2). token == -1 also writes END_FLAG.
 * zstd_out_buff.size == 0 is used to mean "start a fresh output buffer".
 */
static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
{
	static int comp_init_done, flush_pending;
	ZSTD_EndDirective flush = ZSTD_e_continue;
	/* NOTE(review): r holds a size_t result from ZSTD_compressStream2() in
	 * an int32. ZSTD_isError() still sees the same small negative error
	 * codes after conversion -- confirm if zstd's error range changes. */
	int32 n, r;

	/* initialization */
	if (!comp_init_done) {
		zstd_cctx = ZSTD_createCCtx();
		if (!zstd_cctx) {
			rprintf(FERROR, "compression init failed\n");
			exit_cleanup(RERR_PROTOCOL);
		}

		obuf = new_array(char, OBUF_SIZE);

		/* NOTE(review): the level is fixed here from do_compression_level;
		 * the per-file compression_level from set_compression() is not
		 * consulted by this sender -- confirm whether that is intended. */
		ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
		/* Leave room for the 2-byte DEFLATED_DATA header. */
		zstd_out_buff.dst = obuf + 2;

		comp_init_done = 1;
	}

	if (last_token == -1) {
		last_run_end = 0;
		run_start = token;
		flush_pending = 0;
	} else if (last_token == -2) {
		run_start = token;
	} else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
		/* output previous run */
		r = run_start - last_run_end;
		n = last_token - run_start;

		if (r >= 0 && r <= 63) {
			write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
		} else {
			write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
			write_int(f, run_start);
		}
		if (n != 0) {
			/* 16-bit run length, low byte first. */
			write_byte(f, n);
			write_byte(f, n >> 8);
		}
		last_run_end = last_token;
		run_start = token;
	}

	last_token = token;

	if (nb || flush_pending) {

		zstd_in_buff.src = map_ptr(buf, offset, nb);
		zstd_in_buff.size = nb;
		zstd_in_buff.pos = 0;

		do {
			if (zstd_out_buff.size == 0) {
				zstd_out_buff.size = MAX_DATA_COUNT;
				zstd_out_buff.pos = 0;
			}

			/* File ended, flush */
			if (token != -2)
				flush = ZSTD_e_flush;

			r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
			if (ZSTD_isError(r)) {
				rprintf(FERROR, "ZSTD_compressStream returned %d\n", r);
				exit_cleanup(RERR_STREAMIO);
			}

			/*
			 * Nothing is sent if the buffer isn't full so avoid smaller
			 * transfers. If a file is finished then we flush the internal
			 * state and send a smaller buffer so that the remote side can
			 * finish the file.
			 */
			if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
				n = zstd_out_buff.pos;

				obuf[0] = DEFLATED_DATA + (n >> 8);
				obuf[1] = n;
				write_buf(f, obuf, n+2);

				zstd_out_buff.size = 0;
			}
			/*
			 * Loop while the input buffer isn't full consumed or the
			 * internal state isn't fully flushed.
			 */
		} while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
		flush_pending = token == -2;
	}

	if (token == -1) {
		/* end of file - clean up */
		write_byte(f, END_FLAG);
	}
}
  772: 
  773: static ZSTD_DCtx *zstd_dctx;
  774: 
  775: static int32 recv_zstd_token(int f, char **data)
  776: {
  777: 	static int decomp_init_done;
  778: 	static int out_buffer_size;
  779: 	int32 n, flag;
  780: 	int r;
  781: 
  782: 	if (!decomp_init_done) {
  783: 		zstd_dctx = ZSTD_createDCtx();
  784: 		if (!zstd_dctx) {
  785: 			rprintf(FERROR, "ZSTD_createDStream failed\n");
  786: 			exit_cleanup(RERR_PROTOCOL);
  787: 		}
  788: 
  789: 		/* Output buffer fits two decompressed blocks */
  790: 		out_buffer_size = ZSTD_DStreamOutSize() * 2;
  791: 		cbuf = new_array(char, MAX_DATA_COUNT);
  792: 		dbuf = new_array(char, out_buffer_size);
  793: 
  794: 		zstd_in_buff.src = cbuf;
  795: 		zstd_out_buff.dst = dbuf;
  796: 
  797: 		decomp_init_done = 1;
  798: 	}
  799: 
  800: 	for (;;) {
  801: 		switch (recv_state) {
  802: 		case r_init:
  803: 			recv_state = r_idle;
  804: 			rx_token = 0;
  805: 			break;
  806: 
  807: 		case r_idle:
  808: 			flag = read_byte(f);
  809: 			if ((flag & 0xC0) == DEFLATED_DATA) {
  810: 				n = ((flag & 0x3f) << 8) + read_byte(f);
  811: 				read_buf(f, cbuf, n);
  812: 
  813: 				zstd_in_buff.size = n;
  814: 				zstd_in_buff.pos = 0;
  815: 
  816: 				recv_state = r_inflating;
  817: 				break;
  818: 			}
  819: 
  820: 			if (flag == END_FLAG) {
  821: 				/* that's all folks */
  822: 				recv_state = r_init;
  823: 				return 0;
  824: 			}
  825: 			/* here we have a token of some kind */
  826: 			if (flag & TOKEN_REL) {
  827: 				rx_token += flag & 0x3f;
  828: 				flag >>= 6;
  829: 			} else
  830: 				rx_token = read_int(f);
  831: 			if (flag & 1) {
  832: 				rx_run = read_byte(f);
  833: 				rx_run += read_byte(f) << 8;
  834: 				recv_state = r_running;
  835: 			}
  836: 			return -1 - rx_token;
  837: 
  838: 		case r_inflated: /* zstd doesn't get into this state */
  839: 			break;
  840: 
  841: 		case r_inflating:
  842: 			zstd_out_buff.size = out_buffer_size;
  843: 			zstd_out_buff.pos = 0;
  844: 
  845: 			r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
  846: 			n = zstd_out_buff.pos;
  847: 			if (ZSTD_isError(r)) {
  848: 				rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n);
  849: 				exit_cleanup(RERR_STREAMIO);
  850: 			}
  851: 
  852: 			/*
  853: 			 * If the input buffer is fully consumed and the output
  854: 			 * buffer is not full then next step is to read more
  855: 			 * data.
  856: 			 */
  857: 			if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
  858: 				recv_state = r_idle;
  859: 
  860: 			if (n != 0) {
  861: 				*data = dbuf;
  862: 				return n;
  863: 			}
  864: 			break;
  865: 
  866: 		case r_running:
  867: 			++rx_token;
  868: 			if (--rx_run == 0)
  869: 				recv_state = r_idle;
  870: 			return -1 - rx_token;
  871: 		}
  872: 	}
  873: }
  874: #endif /* SUPPORT_ZSTD */
  875: 
  876: #ifdef SUPPORT_LZ4
  877: static void
  878: send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
  879: {
  880: 	static int init_done, flush_pending;
  881: 	int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
  882: 	int32 n, r;
  883: 
  884: 	if (last_token == -1) {
  885: 		if (!init_done) {
  886: 			obuf = new_array(char, size);
  887: 			init_done = 1;
  888: 		}
  889: 		last_run_end = 0;
  890: 		run_start = token;
  891: 		flush_pending = 0;
  892: 	} else if (last_token == -2) {
  893: 		run_start = token;
  894: 	} else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
  895: 		/* output previous run */
  896: 		r = run_start - last_run_end;
  897: 		n = last_token - run_start;
  898: 		if (r >= 0 && r <= 63) {
  899: 			write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
  900: 		} else {
  901: 			write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
  902: 			write_int(f, run_start);
  903: 		}
  904: 		if (n != 0) {
  905: 			write_byte(f, n);
  906: 			write_byte(f, n >> 8);
  907: 		}
  908: 		last_run_end = last_token;
  909: 		run_start = token;
  910: 	}
  911: 
  912: 	last_token = token;
  913: 
  914: 	if (nb != 0 || flush_pending) {
  915: 		int available_in, available_out = 0;
  916: 		const char *next_in;
  917: 
  918: 		do {
  919: 			char *next_out = obuf + 2;
  920: 
  921: 			if (available_out == 0) {
  922: 				available_in = MIN(nb, MAX_DATA_COUNT);
  923: 				next_in = map_ptr(buf, offset, available_in);
  924: 			} else
  925: 				available_in /= 2;
  926: 
  927: 			available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2);
  928: 			if (!available_out) {
  929: 				rprintf(FERROR, "compress returned %d\n", available_out);
  930: 				exit_cleanup(RERR_STREAMIO);
  931: 			}
  932: 			if (available_out <= MAX_DATA_COUNT) {
  933: 				obuf[0] = DEFLATED_DATA + (available_out >> 8);
  934: 				obuf[1] = available_out;
  935: 
  936: 				write_buf(f, obuf, available_out + 2);
  937: 
  938: 				available_out = 0;
  939: 				nb -= available_in;
  940: 				offset += available_in;
  941: 			}
  942: 		} while (nb != 0);
  943: 		flush_pending = token == -2;
  944: 	}
  945: 	if (token == -1) {
  946: 		/* end of file - clean up */
  947: 		write_byte(f, END_FLAG);
  948: 	}
  949: }
  950: 
  951: static int32 recv_compressed_token(int f, char **data)
  952: {
  953: 	static int init_done;
  954: 	int32 n, flag;
  955: 	int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
  956: 	static const char *next_in;
  957: 	static int avail_in;
  958: 	int avail_out;
  959: 
  960: 	for (;;) {
  961: 		switch (recv_state) {
  962: 		case r_init:
  963: 			if (!init_done) {
  964: 				cbuf = new_array(char, MAX_DATA_COUNT);
  965: 				dbuf = new_array(char, size);
  966: 				init_done = 1;
  967: 			}
  968: 			recv_state = r_idle;
  969: 			rx_token = 0;
  970: 			break;
  971: 
  972: 		case r_idle:
  973: 			flag = read_byte(f);
  974: 			if ((flag & 0xC0) == DEFLATED_DATA) {
  975: 				n = ((flag & 0x3f) << 8) + read_byte(f);
  976: 				read_buf(f, cbuf, n);
  977: 				next_in = (char *)cbuf;
  978: 				avail_in = n;
  979: 				recv_state = r_inflating;
  980: 				break;
  981: 			}
  982: 
  983: 			if (flag == END_FLAG) {
  984: 				/* that's all folks */
  985: 				recv_state = r_init;
  986: 				return 0;
  987: 			}
  988: 
  989: 			/* here we have a token of some kind */
  990: 			if (flag & TOKEN_REL) {
  991: 				rx_token += flag & 0x3f;
  992: 				flag >>= 6;
  993: 			} else
  994: 				rx_token = read_int(f);
  995: 			if (flag & 1) {
  996: 				rx_run = read_byte(f);
  997: 				rx_run += read_byte(f) << 8;
  998: 				recv_state = r_running;
  999: 			}
 1000: 			return -1 - rx_token;
 1001: 
 1002: 		case r_inflating:
 1003: 			avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
 1004: 			if (avail_out < 0) {
 1005: 				rprintf(FERROR, "uncompress failed: %d\n", avail_out);
 1006: 				exit_cleanup(RERR_STREAMIO);
 1007: 			}
 1008: 			recv_state = r_idle;
 1009: 			*data = dbuf;
 1010: 			return avail_out;
 1011: 
 1012: 		case r_inflated: /* lz4 doesn't get into this state */
 1013: 			break;
 1014: 
 1015: 		case r_running:
 1016: 			++rx_token;
 1017: 			if (--rx_run == 0)
 1018: 				recv_state = r_idle;
 1019: 			return -1 - rx_token;
 1020: 		}
 1021: 	}
 1022: }
 1023: #endif /* SUPPORT_LZ4 */
 1024: 
 1025: /**
 1026:  * Transmit a verbatim buffer of length @p n followed by a token.
 1027:  * If token == -1 then we have reached EOF
 1028:  * If n == 0 then don't send a buffer
 1029:  */
 1030: void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
 1031: 		int32 n, int32 toklen)
 1032: {
 1033: 	switch (do_compression) {
 1034: 	case CPRES_NONE:
 1035: 		simple_send_token(f, token, buf, offset, n);
 1036: 		break;
 1037: 	case CPRES_ZLIB:
 1038: 	case CPRES_ZLIBX:
 1039: 		send_deflated_token(f, token, buf, offset, n, toklen);
 1040: 		break;
 1041: #ifdef SUPPORT_ZSTD
 1042: 	case CPRES_ZSTD:
 1043: 		send_zstd_token(f, token, buf, offset, n);
 1044: 		break;
 1045: #endif
 1046: #ifdef SUPPORT_LZ4
 1047: 	case CPRES_LZ4:
 1048: 		send_compressed_token(f, token, buf, offset, n);
 1049: 		break;
 1050: #endif
 1051: 	default:
 1052: 		NOISY_DEATH("Unknown do_compression value");
 1053: 	}
 1054: }
 1055: 
 1056: /*
 1057:  * receive a token or buffer from the other end. If the return value is >0 then
 1058:  * it is a data buffer of that length, and *data will point at the data.
 1059:  * if the return value is -i then it represents token i-1
 1060:  * if the return value is 0 then the end has been reached
 1061:  */
 1062: int32 recv_token(int f, char **data)
 1063: {
 1064: 	switch (do_compression) {
 1065: 	case CPRES_NONE:
 1066: 		return simple_recv_token(f,data);
 1067: 	case CPRES_ZLIB:
 1068: 	case CPRES_ZLIBX:
 1069: 		return recv_deflated_token(f, data);
 1070: #ifdef SUPPORT_ZSTD
 1071: 	case CPRES_ZSTD:
 1072: 		return recv_zstd_token(f, data);
 1073: #endif
 1074: #ifdef SUPPORT_LZ4
 1075: 	case CPRES_LZ4:
 1076: 		return recv_compressed_token(f, data);
 1077: #endif
 1078: 	default:
 1079: 		NOISY_DEATH("Unknown do_compression value");
 1080: 	}
 1081: }
 1082: 
 1083: /*
 1084:  * look at the data corresponding to a token, if necessary
 1085:  */
 1086: void see_token(char *data, int32 toklen)
 1087: {
 1088: 	switch (do_compression) {
 1089: 	case CPRES_NONE:
 1090: 		break;
 1091: 	case CPRES_ZLIB:
 1092: 		see_deflate_token(data, toklen);
 1093: 		break;
 1094: 	case CPRES_ZLIBX:
 1095: 		break;
 1096: #ifdef SUPPORT_ZSTD
 1097: 	case CPRES_ZSTD:
 1098: 		break;
 1099: #endif
 1100: #ifdef SUPPORT_LZ4
 1101: 	case CPRES_LZ4:
 1102: 		/*see_uncompressed_token(data, toklen);*/
 1103: 		break;
 1104: #endif
 1105: 	default:
 1106: 		NOISY_DEATH("Unknown do_compression value");
 1107: 	}
 1108: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>