1: /*
2: * Routines used by the file-transfer code.
3: *
4: * Copyright (C) 1996 Andrew Tridgell
5: * Copyright (C) 1996 Paul Mackerras
6: * Copyright (C) 2003-2020 Wayne Davison
7: *
8: * This program is free software; you can redistribute it and/or modify
9: * it under the terms of the GNU General Public License as published by
10: * the Free Software Foundation; either version 3 of the License, or
11: * (at your option) any later version.
12: *
13: * This program is distributed in the hope that it will be useful,
14: * but WITHOUT ANY WARRANTY; without even the implied warranty of
15: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16: * GNU General Public License for more details.
17: *
18: * You should have received a copy of the GNU General Public License along
19: * with this program; if not, visit the http://fsf.org website.
20: */
21:
22: #include "rsync.h"
23: #include "itypes.h"
24: #include <zlib.h>
25: #ifdef SUPPORT_ZSTD
26: #include <zstd.h>
27: #endif
28: #ifdef SUPPORT_LZ4
29: #include <lz4.h>
30: #endif
31:
/* Option and protocol state that is defined outside of this file. */
extern int do_compression;
extern int do_compression_level;
extern int protocol_version;
extern int module_id;
extern char *skip_compress;

/* Fall back to Z_SYNC_FLUSH when the included zlib.h does not provide a
 * Z_INSERT_ONLY flush mode. */
#ifndef Z_INSERT_ONLY
#define Z_INSERT_ONLY Z_SYNC_FLUSH
#endif

/* The compression level for the current file. */
static int compression_level;
/* The least possible compressing for handling skip-compress files. */
static int skip_compression_level;
/* The default level that each new file gets prior to checking its suffix. */
static int per_file_default_level;
45:
/* One letter of a filename suffix that is exempt from compression. */
struct suffix_tree {
	struct suffix_tree *sibling;	/* next alternative letter at this position */
	struct suffix_tree *child;	/* letters that may follow this one */
	char letter;
	char word_end;			/* non-zero when a whole suffix ends on this letter */
};

/* NUL-separated wildcard patterns; an empty string marks the end of the list. */
static char *match_list;
/* Root of the suffix tree (NULL while no suffix has been added). */
static struct suffix_tree *suftree;
54:
55: void init_compression_level(void)
56: {
57: int min_level, max_level, def_level, off_level;
58:
59: switch (do_compression) {
60: case CPRES_NONE:
61: return;
62: case CPRES_ZLIB:
63: case CPRES_ZLIBX:
64: min_level = 1;
65: max_level = Z_BEST_COMPRESSION;
66: def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */
67: off_level = skip_compression_level = Z_NO_COMPRESSION;
68: if (do_compression_level == Z_DEFAULT_COMPRESSION)
69: do_compression_level = def_level;
70: break;
71: #ifdef SUPPORT_ZSTD
72: case CPRES_ZSTD:
73: min_level = skip_compression_level = ZSTD_minCLevel();
74: max_level = ZSTD_maxCLevel();
75: def_level = ZSTD_CLEVEL_DEFAULT;
76: off_level = CLVL_NOT_SPECIFIED;
77: if (do_compression_level == 0)
78: do_compression_level = def_level;
79: break;
80: #endif
81: #ifdef SUPPORT_LZ4
82: case CPRES_LZ4:
83: min_level = skip_compression_level = 0;
84: max_level = 0;
85: def_level = 0;
86: off_level = CLVL_NOT_SPECIFIED;
87: break;
88: #endif
89: default: /* paranoia to prevent missing case values */
90: NOISY_DEATH("Unknown do_compression value");
91: }
92:
93: if (do_compression_level == CLVL_NOT_SPECIFIED)
94: do_compression_level = def_level;
95: else if (do_compression_level == off_level) {
96: do_compression = CPRES_NONE;
97: return;
98: }
99:
100: /* We don't bother with any errors or warnings -- just make sure that the values are valid. */
101: if (do_compression_level < min_level)
102: do_compression_level = min_level;
103: else if (do_compression_level > max_level)
104: do_compression_level = max_level;
105: }
106:
107: static void add_suffix(struct suffix_tree **prior, char ltr, const char *str)
108: {
109: struct suffix_tree *node, *newnode;
110:
111: if (ltr == '[') {
112: const char *after = strchr(str, ']');
113: /* Treat "[foo" and "[]" as having a literal '['. */
114: if (after && after++ != str+1) {
115: while ((ltr = *str++) != ']')
116: add_suffix(prior, ltr, after);
117: return;
118: }
119: }
120:
121: for (node = *prior; node; prior = &node->sibling, node = node->sibling) {
122: if (node->letter == ltr) {
123: if (*str)
124: add_suffix(&node->child, *str, str+1);
125: else
126: node->word_end = 1;
127: return;
128: }
129: if (node->letter > ltr)
130: break;
131: }
132: newnode = new(struct suffix_tree);
133: newnode->sibling = node;
134: newnode->child = NULL;
135: newnode->letter = ltr;
136: *prior = newnode;
137: if (*str) {
138: add_suffix(&newnode->child, *str, str+1);
139: newnode->word_end = 0;
140: } else
141: newnode->word_end = 1;
142: }
143:
144: static void add_nocompress_suffixes(const char *str)
145: {
146: char *buf, *t;
147: const char *f = str;
148:
149: buf = new_array(char, strlen(f) + 1);
150:
151: while (*f) {
152: if (*f == '/') {
153: f++;
154: continue;
155: }
156:
157: t = buf;
158: do {
159: if (isUpper(f))
160: *t++ = toLower(f);
161: else
162: *t++ = *f;
163: } while (*++f != '/' && *f);
164: *t++ = '\0';
165:
166: add_suffix(&suftree, *buf, buf+1);
167: }
168:
169: free(buf);
170: }
171:
172: static void init_set_compression(void)
173: {
174: const char *f;
175: char *t, *start;
176:
177: if (skip_compress)
178: add_nocompress_suffixes(skip_compress);
179:
180: /* A non-daemon transfer skips the default suffix list if the
181: * user specified --skip-compress. */
182: if (skip_compress && module_id < 0)
183: f = "";
184: else
185: f = lp_dont_compress(module_id);
186:
187: match_list = t = new_array(char, strlen(f) + 2);
188:
189: per_file_default_level = do_compression_level;
190:
191: while (*f) {
192: if (*f == ' ') {
193: f++;
194: continue;
195: }
196:
197: start = t;
198: do {
199: if (isUpper(f))
200: *t++ = toLower(f);
201: else
202: *t++ = *f;
203: } while (*++f != ' ' && *f);
204: *t++ = '\0';
205:
206: if (t - start == 1+1 && *start == '*') {
207: /* Optimize a match-string of "*". */
208: *match_list = '\0';
209: suftree = NULL;
210: per_file_default_level = skip_compression_level;
211: break;
212: }
213:
214: /* Move *.foo items into the stuffix tree. */
215: if (*start == '*' && start[1] == '.' && start[2]
216: && !strpbrk(start+2, ".?*")) {
217: add_suffix(&suftree, start[2], start+3);
218: t = start;
219: }
220: }
221: *t++ = '\0';
222: }
223:
224: /* determine the compression level based on a wildcard filename list */
225: void set_compression(const char *fname)
226: {
227: const struct suffix_tree *node;
228: const char *s;
229: char ltr;
230:
231: if (!do_compression)
232: return;
233:
234: if (!match_list)
235: init_set_compression();
236:
237: compression_level = per_file_default_level;
238:
239: if (!*match_list && !suftree)
240: return;
241:
242: if ((s = strrchr(fname, '/')) != NULL)
243: fname = s + 1;
244:
245: for (s = match_list; *s; s += strlen(s) + 1) {
246: if (iwildmatch(s, fname)) {
247: compression_level = skip_compression_level;
248: return;
249: }
250: }
251:
252: if (!(node = suftree) || !(s = strrchr(fname, '.'))
253: || s == fname || !(ltr = *++s))
254: return;
255:
256: while (1) {
257: if (isUpper(<r))
258: ltr = toLower(<r);
259: while (node->letter != ltr) {
260: if (node->letter > ltr)
261: return;
262: if (!(node = node->sibling))
263: return;
264: }
265: if ((ltr = *++s) == '\0') {
266: if (node->word_end)
267: compression_level = skip_compression_level;
268: return;
269: }
270: if (!(node = node->child))
271: return;
272: }
273: }
274:
275: /* non-compressing recv token */
276: static int32 simple_recv_token(int f, char **data)
277: {
278: static int32 residue;
279: static char *buf;
280: int32 n;
281:
282: if (!buf)
283: buf = new_array(char, CHUNK_SIZE);
284:
285: if (residue == 0) {
286: int32 i = read_int(f);
287: if (i <= 0)
288: return i;
289: residue = i;
290: }
291:
292: *data = buf;
293: n = MIN(CHUNK_SIZE,residue);
294: residue -= n;
295: read_buf(f,buf,n);
296: return n;
297: }
298:
299: /* non-compressing send token */
300: static void simple_send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n)
301: {
302: if (n > 0) {
303: int32 len = 0;
304: while (len < n) {
305: int32 n1 = MIN(CHUNK_SIZE, n-len);
306: write_int(f, n1);
307: write_buf(f, map_ptr(buf, offset+len, n1), n1);
308: len += n1;
309: }
310: }
311: /* a -2 token means to send data only and no token */
312: if (token != -2)
313: write_int(f, -(token+1));
314: }
315:
316: /* Flag bytes in compressed stream are encoded as follows: */
317: #define END_FLAG 0 /* that's all folks */
318: #define TOKEN_LONG 0x20 /* followed by 32-bit token number */
319: #define TOKENRUN_LONG 0x21 /* ditto with 16-bit run count */
320: #define DEFLATED_DATA 0x40 /* + 6-bit high len, then low len byte */
321: #define TOKEN_REL 0x80 /* + 6-bit relative token number */
322: #define TOKENRUN_REL 0xc0 /* ditto with 16-bit run count */
323:
324: #define MAX_DATA_COUNT 16383 /* fit 14 bit count into 2 bytes with flags */
325:
326: /* zlib.h says that if we want to be able to compress something in a single
327: * call, avail_out must be at least 0.1% larger than avail_in plus 12 bytes.
328: * We'll add in 0.1%+16, just to be safe (and we'll avoid floating point,
329: * to ensure that this is a compile-time value). */
330: #define AVAIL_OUT_SIZE(avail_in_size) ((avail_in_size)*1001/1000+16)
331:
332: /* For coding runs of tokens */
333: static int32 last_token = -1;
334: static int32 run_start;
335: static int32 last_run_end;
336:
337: /* Deflation state */
338: static z_stream tx_strm;
339:
340: /* Output buffer */
341: static char *obuf;
342:
343: /* We want obuf to be able to hold both MAX_DATA_COUNT+2 bytes as well as
344: * AVAIL_OUT_SIZE(CHUNK_SIZE) bytes, so make sure that it's large enough. */
345: #if MAX_DATA_COUNT+2 > AVAIL_OUT_SIZE(CHUNK_SIZE)
346: #define OBUF_SIZE (MAX_DATA_COUNT+2)
347: #else
348: #define OBUF_SIZE AVAIL_OUT_SIZE(CHUNK_SIZE)
349: #endif
350:
351: /* Send a deflated token */
352: static void
353: send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb, int32 toklen)
354: {
355: static int init_done, flush_pending;
356: int32 n, r;
357:
358: if (last_token == -1) {
359: /* initialization */
360: if (!init_done) {
361: tx_strm.next_in = NULL;
362: tx_strm.zalloc = NULL;
363: tx_strm.zfree = NULL;
364: if (deflateInit2(&tx_strm, compression_level,
365: Z_DEFLATED, -15, 8,
366: Z_DEFAULT_STRATEGY) != Z_OK) {
367: rprintf(FERROR, "compression init failed\n");
368: exit_cleanup(RERR_PROTOCOL);
369: }
370: obuf = new_array(char, OBUF_SIZE);
371: init_done = 1;
372: } else
373: deflateReset(&tx_strm);
374: last_run_end = 0;
375: run_start = token;
376: flush_pending = 0;
377: } else if (last_token == -2) {
378: run_start = token;
379: } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
380: /* output previous run */
381: r = run_start - last_run_end;
382: n = last_token - run_start;
383: if (r >= 0 && r <= 63) {
384: write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
385: } else {
386: write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
387: write_int(f, run_start);
388: }
389: if (n != 0) {
390: write_byte(f, n);
391: write_byte(f, n >> 8);
392: }
393: last_run_end = last_token;
394: run_start = token;
395: }
396:
397: last_token = token;
398:
399: if (nb != 0 || flush_pending) {
400: /* deflate the data starting at offset */
401: int flush = Z_NO_FLUSH;
402: tx_strm.avail_in = 0;
403: tx_strm.avail_out = 0;
404: do {
405: if (tx_strm.avail_in == 0 && nb != 0) {
406: /* give it some more input */
407: n = MIN(nb, CHUNK_SIZE);
408: tx_strm.next_in = (Bytef *)
409: map_ptr(buf, offset, n);
410: tx_strm.avail_in = n;
411: nb -= n;
412: offset += n;
413: }
414: if (tx_strm.avail_out == 0) {
415: tx_strm.next_out = (Bytef *)(obuf + 2);
416: tx_strm.avail_out = MAX_DATA_COUNT;
417: if (flush != Z_NO_FLUSH) {
418: /*
419: * We left the last 4 bytes in the
420: * buffer, in case they are the
421: * last 4. Move them to the front.
422: */
423: memcpy(tx_strm.next_out, obuf+MAX_DATA_COUNT-2, 4);
424: tx_strm.next_out += 4;
425: tx_strm.avail_out -= 4;
426: }
427: }
428: if (nb == 0 && token != -2)
429: flush = Z_SYNC_FLUSH;
430: r = deflate(&tx_strm, flush);
431: if (r != Z_OK) {
432: rprintf(FERROR, "deflate returned %d\n", r);
433: exit_cleanup(RERR_STREAMIO);
434: }
435: if (nb == 0 || tx_strm.avail_out == 0) {
436: n = MAX_DATA_COUNT - tx_strm.avail_out;
437: if (flush != Z_NO_FLUSH) {
438: /*
439: * We have to trim off the last 4
440: * bytes of output when flushing
441: * (they are just 0, 0, ff, ff).
442: */
443: n -= 4;
444: }
445: if (n > 0) {
446: obuf[0] = DEFLATED_DATA + (n >> 8);
447: obuf[1] = n;
448: write_buf(f, obuf, n+2);
449: }
450: }
451: } while (nb != 0 || tx_strm.avail_out == 0);
452: flush_pending = token == -2;
453: }
454:
455: if (token == -1) {
456: /* end of file - clean up */
457: write_byte(f, END_FLAG);
458: } else if (token != -2 && do_compression == CPRES_ZLIB) {
459: /* Add the data in the current block to the compressor's
460: * history and hash table. */
461: do {
462: /* Break up long sections in the same way that
463: * see_deflate_token() does. */
464: int32 n1 = toklen > 0xffff ? 0xffff : toklen;
465: toklen -= n1;
466: tx_strm.next_in = (Bytef *)map_ptr(buf, offset, n1);
467: tx_strm.avail_in = n1;
468: if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
469: offset += n1;
470: tx_strm.next_out = (Bytef *) obuf;
471: tx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
472: r = deflate(&tx_strm, Z_INSERT_ONLY);
473: if (r != Z_OK || tx_strm.avail_in != 0) {
474: rprintf(FERROR, "deflate on token returned %d (%d bytes left)\n",
475: r, tx_strm.avail_in);
476: exit_cleanup(RERR_STREAMIO);
477: }
478: } while (toklen > 0);
479: }
480: }
481:
482: /* tells us what the receiver is in the middle of doing */
483: static enum { r_init, r_idle, r_running, r_inflating, r_inflated } recv_state;
484:
485: /* for inflating stuff */
486: static z_stream rx_strm;
487: static char *cbuf;
488: static char *dbuf;
489:
490: /* for decoding runs of tokens */
491: static int32 rx_token;
492: static int32 rx_run;
493:
494: /* Receive a deflated token and inflate it */
495: static int32 recv_deflated_token(int f, char **data)
496: {
497: static int init_done;
498: static int32 saved_flag;
499: int32 n, flag;
500: int r;
501:
502: for (;;) {
503: switch (recv_state) {
504: case r_init:
505: if (!init_done) {
506: rx_strm.next_out = NULL;
507: rx_strm.zalloc = NULL;
508: rx_strm.zfree = NULL;
509: if (inflateInit2(&rx_strm, -15) != Z_OK) {
510: rprintf(FERROR, "inflate init failed\n");
511: exit_cleanup(RERR_PROTOCOL);
512: }
513: cbuf = new_array(char, MAX_DATA_COUNT);
514: dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE));
515: init_done = 1;
516: } else {
517: inflateReset(&rx_strm);
518: }
519: recv_state = r_idle;
520: rx_token = 0;
521: break;
522:
523: case r_idle:
524: case r_inflated:
525: if (saved_flag) {
526: flag = saved_flag & 0xff;
527: saved_flag = 0;
528: } else
529: flag = read_byte(f);
530: if ((flag & 0xC0) == DEFLATED_DATA) {
531: n = ((flag & 0x3f) << 8) + read_byte(f);
532: read_buf(f, cbuf, n);
533: rx_strm.next_in = (Bytef *)cbuf;
534: rx_strm.avail_in = n;
535: recv_state = r_inflating;
536: break;
537: }
538: if (recv_state == r_inflated) {
539: /* check previous inflated stuff ended correctly */
540: rx_strm.avail_in = 0;
541: rx_strm.next_out = (Bytef *)dbuf;
542: rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
543: r = inflate(&rx_strm, Z_SYNC_FLUSH);
544: n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
545: /*
546: * Z_BUF_ERROR just means no progress was
547: * made, i.e. the decompressor didn't have
548: * any pending output for us.
549: */
550: if (r != Z_OK && r != Z_BUF_ERROR) {
551: rprintf(FERROR, "inflate flush returned %d (%d bytes)\n",
552: r, n);
553: exit_cleanup(RERR_STREAMIO);
554: }
555: if (n != 0 && r != Z_BUF_ERROR) {
556: /* have to return some more data and
557: save the flag for later. */
558: saved_flag = flag + 0x10000;
559: *data = dbuf;
560: return n;
561: }
562: /*
563: * At this point the decompressor should
564: * be expecting to see the 0, 0, ff, ff bytes.
565: */
566: if (!inflateSyncPoint(&rx_strm)) {
567: rprintf(FERROR, "decompressor lost sync!\n");
568: exit_cleanup(RERR_STREAMIO);
569: }
570: rx_strm.avail_in = 4;
571: rx_strm.next_in = (Bytef *)cbuf;
572: cbuf[0] = cbuf[1] = 0;
573: cbuf[2] = cbuf[3] = 0xff;
574: inflate(&rx_strm, Z_SYNC_FLUSH);
575: recv_state = r_idle;
576: }
577: if (flag == END_FLAG) {
578: /* that's all folks */
579: recv_state = r_init;
580: return 0;
581: }
582:
583: /* here we have a token of some kind */
584: if (flag & TOKEN_REL) {
585: rx_token += flag & 0x3f;
586: flag >>= 6;
587: } else
588: rx_token = read_int(f);
589: if (flag & 1) {
590: rx_run = read_byte(f);
591: rx_run += read_byte(f) << 8;
592: recv_state = r_running;
593: }
594: return -1 - rx_token;
595:
596: case r_inflating:
597: rx_strm.next_out = (Bytef *)dbuf;
598: rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
599: r = inflate(&rx_strm, Z_NO_FLUSH);
600: n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
601: if (r != Z_OK) {
602: rprintf(FERROR, "inflate returned %d (%d bytes)\n", r, n);
603: exit_cleanup(RERR_STREAMIO);
604: }
605: if (rx_strm.avail_in == 0)
606: recv_state = r_inflated;
607: if (n != 0) {
608: *data = dbuf;
609: return n;
610: }
611: break;
612:
613: case r_running:
614: ++rx_token;
615: if (--rx_run == 0)
616: recv_state = r_idle;
617: return -1 - rx_token;
618: }
619: }
620: }
621:
622: /*
623: * put the data corresponding to a token that we've just returned
624: * from recv_deflated_token into the decompressor's history buffer.
625: */
626: static void see_deflate_token(char *buf, int32 len)
627: {
628: int r;
629: int32 blklen;
630: unsigned char hdr[5];
631:
632: rx_strm.avail_in = 0;
633: blklen = 0;
634: hdr[0] = 0;
635: do {
636: if (rx_strm.avail_in == 0 && len != 0) {
637: if (blklen == 0) {
638: /* Give it a fake stored-block header. */
639: rx_strm.next_in = (Bytef *)hdr;
640: rx_strm.avail_in = 5;
641: blklen = len;
642: if (blklen > 0xffff)
643: blklen = 0xffff;
644: hdr[1] = blklen;
645: hdr[2] = blklen >> 8;
646: hdr[3] = ~hdr[1];
647: hdr[4] = ~hdr[2];
648: } else {
649: rx_strm.next_in = (Bytef *)buf;
650: rx_strm.avail_in = blklen;
651: if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
652: buf += blklen;
653: len -= blklen;
654: blklen = 0;
655: }
656: }
657: rx_strm.next_out = (Bytef *)dbuf;
658: rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
659: r = inflate(&rx_strm, Z_SYNC_FLUSH);
660: if (r != Z_OK && r != Z_BUF_ERROR) {
661: rprintf(FERROR, "inflate (token) returned %d\n", r);
662: exit_cleanup(RERR_STREAMIO);
663: }
664: } while (len || rx_strm.avail_out == 0);
665: }
666:
667: #ifdef SUPPORT_ZSTD
668:
669: static ZSTD_inBuffer zstd_in_buff;
670: static ZSTD_outBuffer zstd_out_buff;
671: static ZSTD_CCtx *zstd_cctx;
672:
673: static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
674: {
675: static int comp_init_done, flush_pending;
676: ZSTD_EndDirective flush = ZSTD_e_continue;
677: int32 n, r;
678:
679: /* initialization */
680: if (!comp_init_done) {
681: zstd_cctx = ZSTD_createCCtx();
682: if (!zstd_cctx) {
683: rprintf(FERROR, "compression init failed\n");
684: exit_cleanup(RERR_PROTOCOL);
685: }
686:
687: obuf = new_array(char, OBUF_SIZE);
688:
689: ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
690: zstd_out_buff.dst = obuf + 2;
691:
692: comp_init_done = 1;
693: }
694:
695: if (last_token == -1) {
696: last_run_end = 0;
697: run_start = token;
698: flush_pending = 0;
699: } else if (last_token == -2) {
700: run_start = token;
701: } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
702: /* output previous run */
703: r = run_start - last_run_end;
704: n = last_token - run_start;
705:
706: if (r >= 0 && r <= 63) {
707: write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
708: } else {
709: write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
710: write_int(f, run_start);
711: }
712: if (n != 0) {
713: write_byte(f, n);
714: write_byte(f, n >> 8);
715: }
716: last_run_end = last_token;
717: run_start = token;
718: }
719:
720: last_token = token;
721:
722: if (nb || flush_pending) {
723:
724: zstd_in_buff.src = map_ptr(buf, offset, nb);
725: zstd_in_buff.size = nb;
726: zstd_in_buff.pos = 0;
727:
728: do {
729: if (zstd_out_buff.size == 0) {
730: zstd_out_buff.size = MAX_DATA_COUNT;
731: zstd_out_buff.pos = 0;
732: }
733:
734: /* File ended, flush */
735: if (token != -2)
736: flush = ZSTD_e_flush;
737:
738: r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
739: if (ZSTD_isError(r)) {
740: rprintf(FERROR, "ZSTD_compressStream returned %d\n", r);
741: exit_cleanup(RERR_STREAMIO);
742: }
743:
744: /*
745: * Nothing is sent if the buffer isn't full so avoid smaller
746: * transfers. If a file is finished then we flush the internal
747: * state and send a smaller buffer so that the remote side can
748: * finish the file.
749: */
750: if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
751: n = zstd_out_buff.pos;
752:
753: obuf[0] = DEFLATED_DATA + (n >> 8);
754: obuf[1] = n;
755: write_buf(f, obuf, n+2);
756:
757: zstd_out_buff.size = 0;
758: }
759: /*
760: * Loop while the input buffer isn't full consumed or the
761: * internal state isn't fully flushed.
762: */
763: } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
764: flush_pending = token == -2;
765: }
766:
767: if (token == -1) {
768: /* end of file - clean up */
769: write_byte(f, END_FLAG);
770: }
771: }
772:
773: static ZSTD_DCtx *zstd_dctx;
774:
775: static int32 recv_zstd_token(int f, char **data)
776: {
777: static int decomp_init_done;
778: static int out_buffer_size;
779: int32 n, flag;
780: int r;
781:
782: if (!decomp_init_done) {
783: zstd_dctx = ZSTD_createDCtx();
784: if (!zstd_dctx) {
785: rprintf(FERROR, "ZSTD_createDStream failed\n");
786: exit_cleanup(RERR_PROTOCOL);
787: }
788:
789: /* Output buffer fits two decompressed blocks */
790: out_buffer_size = ZSTD_DStreamOutSize() * 2;
791: cbuf = new_array(char, MAX_DATA_COUNT);
792: dbuf = new_array(char, out_buffer_size);
793:
794: zstd_in_buff.src = cbuf;
795: zstd_out_buff.dst = dbuf;
796:
797: decomp_init_done = 1;
798: }
799:
800: for (;;) {
801: switch (recv_state) {
802: case r_init:
803: recv_state = r_idle;
804: rx_token = 0;
805: break;
806:
807: case r_idle:
808: flag = read_byte(f);
809: if ((flag & 0xC0) == DEFLATED_DATA) {
810: n = ((flag & 0x3f) << 8) + read_byte(f);
811: read_buf(f, cbuf, n);
812:
813: zstd_in_buff.size = n;
814: zstd_in_buff.pos = 0;
815:
816: recv_state = r_inflating;
817: break;
818: }
819:
820: if (flag == END_FLAG) {
821: /* that's all folks */
822: recv_state = r_init;
823: return 0;
824: }
825: /* here we have a token of some kind */
826: if (flag & TOKEN_REL) {
827: rx_token += flag & 0x3f;
828: flag >>= 6;
829: } else
830: rx_token = read_int(f);
831: if (flag & 1) {
832: rx_run = read_byte(f);
833: rx_run += read_byte(f) << 8;
834: recv_state = r_running;
835: }
836: return -1 - rx_token;
837:
838: case r_inflated: /* zstd doesn't get into this state */
839: break;
840:
841: case r_inflating:
842: zstd_out_buff.size = out_buffer_size;
843: zstd_out_buff.pos = 0;
844:
845: r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
846: n = zstd_out_buff.pos;
847: if (ZSTD_isError(r)) {
848: rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n);
849: exit_cleanup(RERR_STREAMIO);
850: }
851:
852: /*
853: * If the input buffer is fully consumed and the output
854: * buffer is not full then next step is to read more
855: * data.
856: */
857: if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
858: recv_state = r_idle;
859:
860: if (n != 0) {
861: *data = dbuf;
862: return n;
863: }
864: break;
865:
866: case r_running:
867: ++rx_token;
868: if (--rx_run == 0)
869: recv_state = r_idle;
870: return -1 - rx_token;
871: }
872: }
873: }
874: #endif /* SUPPORT_ZSTD */
875:
876: #ifdef SUPPORT_LZ4
877: static void
878: send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
879: {
880: static int init_done, flush_pending;
881: int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
882: int32 n, r;
883:
884: if (last_token == -1) {
885: if (!init_done) {
886: obuf = new_array(char, size);
887: init_done = 1;
888: }
889: last_run_end = 0;
890: run_start = token;
891: flush_pending = 0;
892: } else if (last_token == -2) {
893: run_start = token;
894: } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
895: /* output previous run */
896: r = run_start - last_run_end;
897: n = last_token - run_start;
898: if (r >= 0 && r <= 63) {
899: write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
900: } else {
901: write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
902: write_int(f, run_start);
903: }
904: if (n != 0) {
905: write_byte(f, n);
906: write_byte(f, n >> 8);
907: }
908: last_run_end = last_token;
909: run_start = token;
910: }
911:
912: last_token = token;
913:
914: if (nb != 0 || flush_pending) {
915: int available_in, available_out = 0;
916: const char *next_in;
917:
918: do {
919: char *next_out = obuf + 2;
920:
921: if (available_out == 0) {
922: available_in = MIN(nb, MAX_DATA_COUNT);
923: next_in = map_ptr(buf, offset, available_in);
924: } else
925: available_in /= 2;
926:
927: available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2);
928: if (!available_out) {
929: rprintf(FERROR, "compress returned %d\n", available_out);
930: exit_cleanup(RERR_STREAMIO);
931: }
932: if (available_out <= MAX_DATA_COUNT) {
933: obuf[0] = DEFLATED_DATA + (available_out >> 8);
934: obuf[1] = available_out;
935:
936: write_buf(f, obuf, available_out + 2);
937:
938: available_out = 0;
939: nb -= available_in;
940: offset += available_in;
941: }
942: } while (nb != 0);
943: flush_pending = token == -2;
944: }
945: if (token == -1) {
946: /* end of file - clean up */
947: write_byte(f, END_FLAG);
948: }
949: }
950:
951: static int32 recv_compressed_token(int f, char **data)
952: {
953: static int init_done;
954: int32 n, flag;
955: int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
956: static const char *next_in;
957: static int avail_in;
958: int avail_out;
959:
960: for (;;) {
961: switch (recv_state) {
962: case r_init:
963: if (!init_done) {
964: cbuf = new_array(char, MAX_DATA_COUNT);
965: dbuf = new_array(char, size);
966: init_done = 1;
967: }
968: recv_state = r_idle;
969: rx_token = 0;
970: break;
971:
972: case r_idle:
973: flag = read_byte(f);
974: if ((flag & 0xC0) == DEFLATED_DATA) {
975: n = ((flag & 0x3f) << 8) + read_byte(f);
976: read_buf(f, cbuf, n);
977: next_in = (char *)cbuf;
978: avail_in = n;
979: recv_state = r_inflating;
980: break;
981: }
982:
983: if (flag == END_FLAG) {
984: /* that's all folks */
985: recv_state = r_init;
986: return 0;
987: }
988:
989: /* here we have a token of some kind */
990: if (flag & TOKEN_REL) {
991: rx_token += flag & 0x3f;
992: flag >>= 6;
993: } else
994: rx_token = read_int(f);
995: if (flag & 1) {
996: rx_run = read_byte(f);
997: rx_run += read_byte(f) << 8;
998: recv_state = r_running;
999: }
1000: return -1 - rx_token;
1001:
1002: case r_inflating:
1003: avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
1004: if (avail_out < 0) {
1005: rprintf(FERROR, "uncompress failed: %d\n", avail_out);
1006: exit_cleanup(RERR_STREAMIO);
1007: }
1008: recv_state = r_idle;
1009: *data = dbuf;
1010: return avail_out;
1011:
1012: case r_inflated: /* lz4 doesn't get into this state */
1013: break;
1014:
1015: case r_running:
1016: ++rx_token;
1017: if (--rx_run == 0)
1018: recv_state = r_idle;
1019: return -1 - rx_token;
1020: }
1021: }
1022: }
1023: #endif /* SUPPORT_LZ4 */
1024:
1025: /**
1026: * Transmit a verbatim buffer of length @p n followed by a token.
1027: * If token == -1 then we have reached EOF
1028: * If n == 0 then don't send a buffer
1029: */
1030: void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
1031: int32 n, int32 toklen)
1032: {
1033: switch (do_compression) {
1034: case CPRES_NONE:
1035: simple_send_token(f, token, buf, offset, n);
1036: break;
1037: case CPRES_ZLIB:
1038: case CPRES_ZLIBX:
1039: send_deflated_token(f, token, buf, offset, n, toklen);
1040: break;
1041: #ifdef SUPPORT_ZSTD
1042: case CPRES_ZSTD:
1043: send_zstd_token(f, token, buf, offset, n);
1044: break;
1045: #endif
1046: #ifdef SUPPORT_LZ4
1047: case CPRES_LZ4:
1048: send_compressed_token(f, token, buf, offset, n);
1049: break;
1050: #endif
1051: default:
1052: NOISY_DEATH("Unknown do_compression value");
1053: }
1054: }
1055:
1056: /*
1057: * receive a token or buffer from the other end. If the return value is >0 then
1058: * it is a data buffer of that length, and *data will point at the data.
1059: * if the return value is -i then it represents token i-1
1060: * if the return value is 0 then the end has been reached
1061: */
1062: int32 recv_token(int f, char **data)
1063: {
1064: switch (do_compression) {
1065: case CPRES_NONE:
1066: return simple_recv_token(f,data);
1067: case CPRES_ZLIB:
1068: case CPRES_ZLIBX:
1069: return recv_deflated_token(f, data);
1070: #ifdef SUPPORT_ZSTD
1071: case CPRES_ZSTD:
1072: return recv_zstd_token(f, data);
1073: #endif
1074: #ifdef SUPPORT_LZ4
1075: case CPRES_LZ4:
1076: return recv_compressed_token(f, data);
1077: #endif
1078: default:
1079: NOISY_DEATH("Unknown do_compression value");
1080: }
1081: }
1082:
1083: /*
1084: * look at the data corresponding to a token, if necessary
1085: */
1086: void see_token(char *data, int32 toklen)
1087: {
1088: switch (do_compression) {
1089: case CPRES_NONE:
1090: break;
1091: case CPRES_ZLIB:
1092: see_deflate_token(data, toklen);
1093: break;
1094: case CPRES_ZLIBX:
1095: break;
1096: #ifdef SUPPORT_ZSTD
1097: case CPRES_ZSTD:
1098: break;
1099: #endif
1100: #ifdef SUPPORT_LZ4
1101: case CPRES_LZ4:
1102: /*see_uncompressed_token(data, toklen);*/
1103: break;
1104: #endif
1105: default:
1106: NOISY_DEATH("Unknown do_compression value");
1107: }
1108: }
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */