Annotation of embedaddon/rsync/token.c, revision 1.1.1.4
1.1 misho 1: /*
2: * Routines used by the file-transfer code.
3: *
4: * Copyright (C) 1996 Andrew Tridgell
5: * Copyright (C) 1996 Paul Mackerras
1.1.1.4 ! misho 6: * Copyright (C) 2003-2020 Wayne Davison
1.1 misho 7: *
8: * This program is free software; you can redistribute it and/or modify
9: * it under the terms of the GNU General Public License as published by
10: * the Free Software Foundation; either version 3 of the License, or
11: * (at your option) any later version.
12: *
13: * This program is distributed in the hope that it will be useful,
14: * but WITHOUT ANY WARRANTY; without even the implied warranty of
15: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16: * GNU General Public License for more details.
17: *
18: * You should have received a copy of the GNU General Public License along
19: * with this program; if not, visit the http://fsf.org website.
20: */
21:
22: #include "rsync.h"
1.1.1.2 misho 23: #include "itypes.h"
24: #include <zlib.h>
1.1.1.4 ! misho 25: #ifdef SUPPORT_ZSTD
! 26: #include <zstd.h>
! 27: #endif
! 28: #ifdef SUPPORT_LZ4
! 29: #include <lz4.h>
! 30: #endif
1.1.1.2 misho 31:
/* Globals defined in other rsync source files. */
extern int do_compression;       /* selected compressor (CPRES_* value) */
extern int protocol_version;
extern int module_id;            /* negative for a non-daemon transfer (see init_set_compression) */
extern int do_compression_level; /* normalized by init_compression_level() */
extern char *skip_compress;

/* If zlib does not provide Z_INSERT_ONLY, Z_SYNC_FLUSH is used in its place
 * for the history-priming deflate() call in send_deflated_token(). */
#ifndef Z_INSERT_ONLY
#define Z_INSERT_ONLY Z_SYNC_FLUSH
#endif

static int compression_level; /* The compression level for the current file. */
static int skip_compression_level; /* The least possible compressing for handling skip-compress files. */
static int per_file_default_level; /* The default level that each new file gets prior to checking its suffix. */
1.1 misho 45:
/*
 * Node of a character trie of lower-cased filename suffixes (the text after
 * a name's final '.') whose files get skip_compression_level.
 * Siblings are kept in ascending order of "letter" (add_suffix inserts a new
 * node before the first sibling with a larger letter); "child" continues the
 * suffix with its next character; "word_end" is set when a complete suffix
 * ends at this node.
 */
struct suffix_tree {
	struct suffix_tree *sibling;
	struct suffix_tree *child;
	char letter, word_end;
};

/* Lower-cased wildcard patterns, each NUL-terminated, with an empty string
 * marking the end of the list; built once by init_set_compression(). */
static char *match_list;
/* Root of the suffix trie built by add_suffix(). */
static struct suffix_tree *suftree;
54:
1.1.1.4 ! misho 55: void init_compression_level(void)
! 56: {
! 57: int min_level, max_level, def_level, off_level;
! 58:
! 59: switch (do_compression) {
! 60: case CPRES_NONE:
! 61: return;
! 62: case CPRES_ZLIB:
! 63: case CPRES_ZLIBX:
! 64: min_level = 1;
! 65: max_level = Z_BEST_COMPRESSION;
! 66: def_level = 6; /* Z_DEFAULT_COMPRESSION is -1, so set it to the real default */
! 67: off_level = skip_compression_level = Z_NO_COMPRESSION;
! 68: if (do_compression_level == Z_DEFAULT_COMPRESSION)
! 69: do_compression_level = def_level;
! 70: break;
! 71: #ifdef SUPPORT_ZSTD
! 72: case CPRES_ZSTD:
! 73: min_level = skip_compression_level = ZSTD_minCLevel();
! 74: max_level = ZSTD_maxCLevel();
! 75: def_level = ZSTD_CLEVEL_DEFAULT;
! 76: off_level = CLVL_NOT_SPECIFIED;
! 77: if (do_compression_level == 0)
! 78: do_compression_level = def_level;
! 79: break;
! 80: #endif
! 81: #ifdef SUPPORT_LZ4
! 82: case CPRES_LZ4:
! 83: min_level = skip_compression_level = 0;
! 84: max_level = 0;
! 85: def_level = 0;
! 86: off_level = CLVL_NOT_SPECIFIED;
! 87: break;
! 88: #endif
! 89: default: /* paranoia to prevent missing case values */
! 90: NOISY_DEATH("Unknown do_compression value");
! 91: }
! 92:
! 93: if (do_compression_level == CLVL_NOT_SPECIFIED)
! 94: do_compression_level = def_level;
! 95: else if (do_compression_level == off_level) {
! 96: do_compression = CPRES_NONE;
! 97: return;
! 98: }
! 99:
! 100: /* We don't bother with any errors or warnings -- just make sure that the values are valid. */
! 101: if (do_compression_level < min_level)
! 102: do_compression_level = min_level;
! 103: else if (do_compression_level > max_level)
! 104: do_compression_level = max_level;
! 105: }
! 106:
1.1 misho 107: static void add_suffix(struct suffix_tree **prior, char ltr, const char *str)
108: {
109: struct suffix_tree *node, *newnode;
110:
111: if (ltr == '[') {
112: const char *after = strchr(str, ']');
113: /* Treat "[foo" and "[]" as having a literal '['. */
114: if (after && after++ != str+1) {
115: while ((ltr = *str++) != ']')
116: add_suffix(prior, ltr, after);
117: return;
118: }
119: }
120:
121: for (node = *prior; node; prior = &node->sibling, node = node->sibling) {
122: if (node->letter == ltr) {
123: if (*str)
124: add_suffix(&node->child, *str, str+1);
125: else
126: node->word_end = 1;
127: return;
128: }
129: if (node->letter > ltr)
130: break;
131: }
1.1.1.4 ! misho 132: newnode = new(struct suffix_tree);
1.1 misho 133: newnode->sibling = node;
134: newnode->child = NULL;
135: newnode->letter = ltr;
136: *prior = newnode;
137: if (*str) {
138: add_suffix(&newnode->child, *str, str+1);
139: newnode->word_end = 0;
140: } else
141: newnode->word_end = 1;
142: }
143:
144: static void add_nocompress_suffixes(const char *str)
145: {
146: char *buf, *t;
147: const char *f = str;
148:
1.1.1.4 ! misho 149: buf = new_array(char, strlen(f) + 1);
1.1 misho 150:
151: while (*f) {
152: if (*f == '/') {
153: f++;
154: continue;
155: }
156:
157: t = buf;
158: do {
159: if (isUpper(f))
160: *t++ = toLower(f);
161: else
162: *t++ = *f;
163: } while (*++f != '/' && *f);
164: *t++ = '\0';
165:
166: add_suffix(&suftree, *buf, buf+1);
167: }
168:
169: free(buf);
170: }
171:
172: static void init_set_compression(void)
173: {
174: const char *f;
175: char *t, *start;
176:
177: if (skip_compress)
178: add_nocompress_suffixes(skip_compress);
179:
180: /* A non-daemon transfer skips the default suffix list if the
181: * user specified --skip-compress. */
182: if (skip_compress && module_id < 0)
183: f = "";
184: else
185: f = lp_dont_compress(module_id);
186:
1.1.1.4 ! misho 187: match_list = t = new_array(char, strlen(f) + 2);
1.1 misho 188:
1.1.1.4 ! misho 189: per_file_default_level = do_compression_level;
1.1 misho 190:
191: while (*f) {
192: if (*f == ' ') {
193: f++;
194: continue;
195: }
196:
197: start = t;
198: do {
199: if (isUpper(f))
200: *t++ = toLower(f);
201: else
202: *t++ = *f;
203: } while (*++f != ' ' && *f);
204: *t++ = '\0';
205:
206: if (t - start == 1+1 && *start == '*') {
207: /* Optimize a match-string of "*". */
208: *match_list = '\0';
209: suftree = NULL;
1.1.1.4 ! misho 210: per_file_default_level = skip_compression_level;
1.1 misho 211: break;
212: }
213:
214: /* Move *.foo items into the stuffix tree. */
215: if (*start == '*' && start[1] == '.' && start[2]
216: && !strpbrk(start+2, ".?*")) {
217: add_suffix(&suftree, start[2], start+3);
218: t = start;
219: }
220: }
221: *t++ = '\0';
222: }
223:
224: /* determine the compression level based on a wildcard filename list */
225: void set_compression(const char *fname)
226: {
227: const struct suffix_tree *node;
228: const char *s;
229: char ltr;
230:
231: if (!do_compression)
232: return;
233:
234: if (!match_list)
235: init_set_compression();
236:
237: compression_level = per_file_default_level;
238:
239: if (!*match_list && !suftree)
240: return;
241:
242: if ((s = strrchr(fname, '/')) != NULL)
243: fname = s + 1;
244:
245: for (s = match_list; *s; s += strlen(s) + 1) {
246: if (iwildmatch(s, fname)) {
1.1.1.4 ! misho 247: compression_level = skip_compression_level;
1.1 misho 248: return;
249: }
250: }
251:
252: if (!(node = suftree) || !(s = strrchr(fname, '.'))
253: || s == fname || !(ltr = *++s))
254: return;
255:
256: while (1) {
257: if (isUpper(<r))
258: ltr = toLower(<r);
259: while (node->letter != ltr) {
260: if (node->letter > ltr)
261: return;
262: if (!(node = node->sibling))
263: return;
264: }
265: if ((ltr = *++s) == '\0') {
266: if (node->word_end)
1.1.1.4 ! misho 267: compression_level = skip_compression_level;
1.1 misho 268: return;
269: }
270: if (!(node = node->child))
271: return;
272: }
273: }
274:
275: /* non-compressing recv token */
276: static int32 simple_recv_token(int f, char **data)
277: {
278: static int32 residue;
279: static char *buf;
280: int32 n;
281:
1.1.1.4 ! misho 282: if (!buf)
1.1 misho 283: buf = new_array(char, CHUNK_SIZE);
284:
285: if (residue == 0) {
286: int32 i = read_int(f);
287: if (i <= 0)
288: return i;
289: residue = i;
290: }
291:
292: *data = buf;
293: n = MIN(CHUNK_SIZE,residue);
294: residue -= n;
295: read_buf(f,buf,n);
296: return n;
297: }
298:
299: /* non-compressing send token */
1.1.1.4 ! misho 300: static void simple_send_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 n)
1.1 misho 301: {
302: if (n > 0) {
303: int32 len = 0;
304: while (len < n) {
305: int32 n1 = MIN(CHUNK_SIZE, n-len);
306: write_int(f, n1);
307: write_buf(f, map_ptr(buf, offset+len, n1), n1);
308: len += n1;
309: }
310: }
311: /* a -2 token means to send data only and no token */
312: if (token != -2)
313: write_int(f, -(token+1));
314: }
315:
/* Flag bytes in compressed stream are encoded as follows: */
#define END_FLAG	0	/* that's all folks */
#define TOKEN_LONG	0x20	/* followed by 32-bit token number */
#define TOKENRUN_LONG	0x21	/* ditto with 16-bit run count */
#define DEFLATED_DATA	0x40	/* + 6-bit high len, then low len byte */
#define TOKEN_REL	0x80	/* + 6-bit relative token number */
#define TOKENRUN_REL	0xc0	/* ditto with 16-bit run count */
/* A run count is written as two bytes, low byte first, and holds the
 * number of tokens in the run after the first one. */

#define MAX_DATA_COUNT	16383	/* fit 14 bit count into 2 bytes with flags */

/* zlib.h says that if we want to be able to compress something in a single
 * call, avail_out must be at least 0.1% larger than avail_in plus 12 bytes.
 * We'll add in 0.1%+16, just to be safe (and we'll avoid floating point,
 * to ensure that this is a compile-time value). */
#define AVAIL_OUT_SIZE(avail_in_size) ((avail_in_size)*1001/1000+16)

/* For coding runs of tokens (shared by the zlib, zstd, and lz4 senders).
 * last_token == -1 means no token has been sent yet for the current file;
 * -1 is also the end-of-file token, so this resets itself at EOF. */
static int32 last_token = -1;
static int32 run_start;
static int32 last_run_end;

/* Deflation state */
static z_stream tx_strm;

/* Output buffer: obuf[0..1] hold the DEFLATED_DATA flag/length bytes and
 * the compressed data starts at obuf+2. */
static char *obuf;

/* We want obuf to be able to hold both MAX_DATA_COUNT+2 bytes as well as
 * AVAIL_OUT_SIZE(CHUNK_SIZE) bytes, so make sure that it's large enough. */
#if MAX_DATA_COUNT+2 > AVAIL_OUT_SIZE(CHUNK_SIZE)
#define OBUF_SIZE (MAX_DATA_COUNT+2)
#else
#define OBUF_SIZE AVAIL_OUT_SIZE(CHUNK_SIZE)
#endif
350:
/*
 * Send a deflated token (CPRES_ZLIB / CPRES_ZLIBX).
 *
 * f      - output descriptor.
 * token  - token number, -1 at end of file, or -2 to send only the literal
 *          data with no token.
 * buf    - mapped source file; the NB literal bytes at OFFSET precede the
 *          token.
 * toklen - length of the token's data, which starts right after the literal
 *          data. For CPRES_ZLIB only, it is fed into the compressor's history
 *          so that both sides' dictionaries stay in step (see_deflate_token()
 *          does the same on the receiver).
 */
static void
send_deflated_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb, int32 toklen)
{
	static int init_done, flush_pending;
	int32 n, r;

	if (last_token == -1) {
		/* initialization */
		if (!init_done) {
			tx_strm.next_in = NULL;
			tx_strm.zalloc = NULL;
			tx_strm.zfree = NULL;
			/* Negative windowBits selects a raw deflate stream with
			 * no zlib header or trailer (see zlib's deflateInit2). */
			if (deflateInit2(&tx_strm, compression_level,
					 Z_DEFLATED, -15, 8,
					 Z_DEFAULT_STRATEGY) != Z_OK) {
				rprintf(FERROR, "compression init failed\n");
				exit_cleanup(RERR_PROTOCOL);
			}
			obuf = new_array(char, OBUF_SIZE);
			init_done = 1;
		} else
			deflateReset(&tx_strm);
		last_run_end = 0;
		run_start = token;
		flush_pending = 0;
	} else if (last_token == -2) {
		run_start = token;
	} else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
		/* output previous run (the run ends on literal data, a
		 * non-consecutive token, or when its 16-bit count would overflow) */
		r = run_start - last_run_end;
		n = last_token - run_start;
		if (r >= 0 && r <= 63) {
			/* run start fits in the flag byte's low 6 bits, relative
			 * to the end of the previous run */
			write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
		} else {
			write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
			write_int(f, run_start);
		}
		if (n != 0) {
			write_byte(f, n);
			write_byte(f, n >> 8);
		}
		last_run_end = last_token;
		run_start = token;
	}

	last_token = token;

	/* A previous -2 call left deflate output unflushed, so flush it now
	 * even if there is no new literal data. */
	if (nb != 0 || flush_pending) {
		/* deflate the data starting at offset */
		int flush = Z_NO_FLUSH;
		tx_strm.avail_in = 0;
		tx_strm.avail_out = 0;
		do {
			if (tx_strm.avail_in == 0 && nb != 0) {
				/* give it some more input */
				n = MIN(nb, CHUNK_SIZE);
				tx_strm.next_in = (Bytef *)
					map_ptr(buf, offset, n);
				tx_strm.avail_in = n;
				nb -= n;
				offset += n;
			}
			if (tx_strm.avail_out == 0) {
				tx_strm.next_out = (Bytef *)(obuf + 2);
				tx_strm.avail_out = MAX_DATA_COUNT;
				if (flush != Z_NO_FLUSH) {
					/*
					 * We left the last 4 bytes in the
					 * buffer, in case they are the
					 * last 4. Move them to the front.
					 */
					memcpy(tx_strm.next_out, obuf+MAX_DATA_COUNT-2, 4);
					tx_strm.next_out += 4;
					tx_strm.avail_out -= 4;
				}
			}
			/* Once all literal data is in (and this is not a
			 * data-only call), sync-flush so the receiver can
			 * decode everything before the token. */
			if (nb == 0 && token != -2)
				flush = Z_SYNC_FLUSH;
			r = deflate(&tx_strm, flush);
			if (r != Z_OK) {
				rprintf(FERROR, "deflate returned %d\n", r);
				exit_cleanup(RERR_STREAMIO);
			}
			if (nb == 0 || tx_strm.avail_out == 0) {
				n = MAX_DATA_COUNT - tx_strm.avail_out;
				if (flush != Z_NO_FLUSH) {
					/*
					 * We have to trim off the last 4
					 * bytes of output when flushing
					 * (they are just 0, 0, ff, ff).
					 */
					n -= 4;
				}
				if (n > 0) {
					obuf[0] = DEFLATED_DATA + (n >> 8);
					obuf[1] = n;
					write_buf(f, obuf, n+2);
				}
			}
		} while (nb != 0 || tx_strm.avail_out == 0);
		flush_pending = token == -2;
	}

	if (token == -1) {
		/* end of file - clean up */
		write_byte(f, END_FLAG);
	} else if (token != -2 && do_compression == CPRES_ZLIB) {
		/* Add the data in the current block to the compressor's
		 * history and hash table. */
		do {
			/* Break up long sections in the same way that
			 * see_deflate_token() does. */
			int32 n1 = toklen > 0xffff ? 0xffff : toklen;
			toklen -= n1;
			tx_strm.next_in = (Bytef *)map_ptr(buf, offset, n1);
			tx_strm.avail_in = n1;
			if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
				offset += n1;
			/* The output is discarded; only the history update
			 * matters. */
			tx_strm.next_out = (Bytef *) obuf;
			tx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
			r = deflate(&tx_strm, Z_INSERT_ONLY);
			if (r != Z_OK || tx_strm.avail_in != 0) {
				rprintf(FERROR, "deflate on token returned %d (%d bytes left)\n",
					r, tx_strm.avail_in);
				exit_cleanup(RERR_STREAMIO);
			}
		} while (toklen > 0);
	}
}
481:
/*
 * Tells us what the receiver is in the middle of doing. The zlib, zstd, and
 * lz4 decoders all use this state:
 *   r_init      - (re)initialize; entered initially and after END_FLAG
 *   r_idle      - read the next flag byte from the stream
 *   r_running   - hand back the remaining tokens of a token run
 *   r_inflating - decompress the data currently held in cbuf
 *   r_inflated  - zlib only: cbuf has been consumed; before the next token
 *                 the decoder flushes and checks for the sync point
 */
static enum { r_init, r_idle, r_running, r_inflating, r_inflated } recv_state;

/* for inflating stuff: zlib stream state, compressed-input buffer (cbuf),
 * and decompressed-output buffer (dbuf) */
static z_stream rx_strm;
static char *cbuf;
static char *dbuf;

/* for decoding runs of tokens: the last token number returned, and how
 * many more consecutive tokens remain in the current run */
static int32 rx_token;
static int32 rx_run;
493:
/*
 * Receive a deflated token and inflate it (CPRES_ZLIB / CPRES_ZLIBX).
 *
 * Returns the number of bytes of inflated literal data (stored at *data,
 * pointing into dbuf), or -1-N for token N, or 0 at END_FLAG.
 */
static int32 recv_deflated_token(int f, char **data)
{
	static int init_done;
	/* A flag byte read while pending output was still being drained; the
	 * 0x10000 bit keeps a saved END_FLAG (0) distinguishable from "none". */
	static int32 saved_flag;
	int32 n, flag;
	int r;

	for (;;) {
		switch (recv_state) {
		case r_init:
			if (!init_done) {
				rx_strm.next_out = NULL;
				rx_strm.zalloc = NULL;
				rx_strm.zfree = NULL;
				/* raw deflate stream, matching the sender's deflateInit2 */
				if (inflateInit2(&rx_strm, -15) != Z_OK) {
					rprintf(FERROR, "inflate init failed\n");
					exit_cleanup(RERR_PROTOCOL);
				}
				cbuf = new_array(char, MAX_DATA_COUNT);
				dbuf = new_array(char, AVAIL_OUT_SIZE(CHUNK_SIZE));
				init_done = 1;
			} else {
				inflateReset(&rx_strm);
			}
			recv_state = r_idle;
			rx_token = 0;
			break;

		case r_idle:
		case r_inflated:
			if (saved_flag) {
				flag = saved_flag & 0xff;
				saved_flag = 0;
			} else
				flag = read_byte(f);
			if ((flag & 0xC0) == DEFLATED_DATA) {
				/* 14-bit length: 6 bits from the flag plus one byte */
				n = ((flag & 0x3f) << 8) + read_byte(f);
				read_buf(f, cbuf, n);
				rx_strm.next_in = (Bytef *)cbuf;
				rx_strm.avail_in = n;
				recv_state = r_inflating;
				break;
			}
			if (recv_state == r_inflated) {
				/* check previous inflated stuff ended correctly */
				rx_strm.avail_in = 0;
				rx_strm.next_out = (Bytef *)dbuf;
				rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
				r = inflate(&rx_strm, Z_SYNC_FLUSH);
				n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
				/*
				 * Z_BUF_ERROR just means no progress was
				 * made, i.e. the decompressor didn't have
				 * any pending output for us.
				 */
				if (r != Z_OK && r != Z_BUF_ERROR) {
					rprintf(FERROR, "inflate flush returned %d (%d bytes)\n",
						r, n);
					exit_cleanup(RERR_STREAMIO);
				}
				if (n != 0 && r != Z_BUF_ERROR) {
					/* have to return some more data and
					   save the flag for later. */
					saved_flag = flag + 0x10000;
					*data = dbuf;
					return n;
				}
				/*
				 * At this point the decompressor should
				 * be expecting to see the 0, 0, ff, ff bytes.
				 */
				if (!inflateSyncPoint(&rx_strm)) {
					rprintf(FERROR, "decompressor lost sync!\n");
					exit_cleanup(RERR_STREAMIO);
				}
				/* Supply the 4 flush bytes that the sender
				 * trimmed from its output. */
				rx_strm.avail_in = 4;
				rx_strm.next_in = (Bytef *)cbuf;
				cbuf[0] = cbuf[1] = 0;
				cbuf[2] = cbuf[3] = 0xff;
				inflate(&rx_strm, Z_SYNC_FLUSH);
				recv_state = r_idle;
			}
			if (flag == END_FLAG) {
				/* that's all folks */
				recv_state = r_init;
				return 0;
			}

			/* here we have a token of some kind */
			if (flag & TOKEN_REL) {
				rx_token += flag & 0x3f;
				/* after the shift, bit 0 is TOKENRUN_REL's run bit */
				flag >>= 6;
			} else
				rx_token = read_int(f);
			if (flag & 1) {
				rx_run = read_byte(f);
				rx_run += read_byte(f) << 8;
				recv_state = r_running;
			}
			return -1 - rx_token;

		case r_inflating:
			rx_strm.next_out = (Bytef *)dbuf;
			rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
			r = inflate(&rx_strm, Z_NO_FLUSH);
			n = AVAIL_OUT_SIZE(CHUNK_SIZE) - rx_strm.avail_out;
			if (r != Z_OK) {
				rprintf(FERROR, "inflate returned %d (%d bytes)\n", r, n);
				exit_cleanup(RERR_STREAMIO);
			}
			if (rx_strm.avail_in == 0)
				recv_state = r_inflated;
			/* never return 0 here: that would mean END_FLAG */
			if (n != 0) {
				*data = dbuf;
				return n;
			}
			break;

		case r_running:
			++rx_token;
			if (--rx_run == 0)
				recv_state = r_idle;
			return -1 - rx_token;
		}
	}
}
621:
/*
 * put the data corresponding to a token that we've just returned
 * from recv_deflated_token into the decompressor's history buffer.
 *
 * The LEN bytes at BUF are wrapped in fake uncompressed ("stored") deflate
 * blocks of at most 0xffff bytes and run through inflate(). The output is
 * discarded; only the history update matters. This mirrors the sender's
 * Z_INSERT_ONLY pass in send_deflated_token().
 */
static void see_deflate_token(char *buf, int32 len)
{
	int r;
	int32 blklen;
	unsigned char hdr[5];

	rx_strm.avail_in = 0;
	blklen = 0;
	hdr[0] = 0;	/* stored block type, not the final block (RFC 1951 3.2.4) */
	do {
		if (rx_strm.avail_in == 0 && len != 0) {
			if (blklen == 0) {
				/* Give it a fake stored-block header:
				 * LEN (little-endian) then its one's complement. */
				rx_strm.next_in = (Bytef *)hdr;
				rx_strm.avail_in = 5;
				blklen = len;
				if (blklen > 0xffff)
					blklen = 0xffff;
				hdr[1] = blklen;
				hdr[2] = blklen >> 8;
				hdr[3] = ~hdr[1];
				hdr[4] = ~hdr[2];
			} else {
				rx_strm.next_in = (Bytef *)buf;
				rx_strm.avail_in = blklen;
				/* Older protocols did not advance buf, so a long
				 * token re-fed its first 0xffff bytes; the sender
				 * has the same condition. */
				if (protocol_version >= 31) /* Newer protocols avoid a data-duplicating bug */
					buf += blklen;
				len -= blklen;
				blklen = 0;
			}
		}
		rx_strm.next_out = (Bytef *)dbuf;
		rx_strm.avail_out = AVAIL_OUT_SIZE(CHUNK_SIZE);
		r = inflate(&rx_strm, Z_SYNC_FLUSH);
		if (r != Z_OK && r != Z_BUF_ERROR) {
			rprintf(FERROR, "inflate (token) returned %d\n", r);
			exit_cleanup(RERR_STREAMIO);
		}
	} while (len || rx_strm.avail_out == 0);
}
666:
1.1.1.4 ! misho 667: #ifdef SUPPORT_ZSTD
! 668:
! 669: static ZSTD_inBuffer zstd_in_buff;
! 670: static ZSTD_outBuffer zstd_out_buff;
! 671: static ZSTD_CCtx *zstd_cctx;
! 672:
! 673: static void send_zstd_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
! 674: {
! 675: static int comp_init_done, flush_pending;
! 676: ZSTD_EndDirective flush = ZSTD_e_continue;
! 677: int32 n, r;
! 678:
! 679: /* initialization */
! 680: if (!comp_init_done) {
! 681: zstd_cctx = ZSTD_createCCtx();
! 682: if (!zstd_cctx) {
! 683: rprintf(FERROR, "compression init failed\n");
! 684: exit_cleanup(RERR_PROTOCOL);
! 685: }
! 686:
! 687: obuf = new_array(char, OBUF_SIZE);
! 688:
! 689: ZSTD_CCtx_setParameter(zstd_cctx, ZSTD_c_compressionLevel, do_compression_level);
! 690: zstd_out_buff.dst = obuf + 2;
! 691:
! 692: comp_init_done = 1;
! 693: }
! 694:
! 695: if (last_token == -1) {
! 696: last_run_end = 0;
! 697: run_start = token;
! 698: flush_pending = 0;
! 699: } else if (last_token == -2) {
! 700: run_start = token;
! 701: } else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
! 702: /* output previous run */
! 703: r = run_start - last_run_end;
! 704: n = last_token - run_start;
! 705:
! 706: if (r >= 0 && r <= 63) {
! 707: write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
! 708: } else {
! 709: write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
! 710: write_int(f, run_start);
! 711: }
! 712: if (n != 0) {
! 713: write_byte(f, n);
! 714: write_byte(f, n >> 8);
! 715: }
! 716: last_run_end = last_token;
! 717: run_start = token;
! 718: }
! 719:
! 720: last_token = token;
! 721:
! 722: if (nb || flush_pending) {
! 723:
! 724: zstd_in_buff.src = map_ptr(buf, offset, nb);
! 725: zstd_in_buff.size = nb;
! 726: zstd_in_buff.pos = 0;
! 727:
! 728: do {
! 729: if (zstd_out_buff.size == 0) {
! 730: zstd_out_buff.size = MAX_DATA_COUNT;
! 731: zstd_out_buff.pos = 0;
! 732: }
! 733:
! 734: /* File ended, flush */
! 735: if (token != -2)
! 736: flush = ZSTD_e_flush;
! 737:
! 738: r = ZSTD_compressStream2(zstd_cctx, &zstd_out_buff, &zstd_in_buff, flush);
! 739: if (ZSTD_isError(r)) {
! 740: rprintf(FERROR, "ZSTD_compressStream returned %d\n", r);
! 741: exit_cleanup(RERR_STREAMIO);
! 742: }
! 743:
! 744: /*
! 745: * Nothing is sent if the buffer isn't full so avoid smaller
! 746: * transfers. If a file is finished then we flush the internal
! 747: * state and send a smaller buffer so that the remote side can
! 748: * finish the file.
! 749: */
! 750: if (zstd_out_buff.pos == zstd_out_buff.size || flush == ZSTD_e_flush) {
! 751: n = zstd_out_buff.pos;
! 752:
! 753: obuf[0] = DEFLATED_DATA + (n >> 8);
! 754: obuf[1] = n;
! 755: write_buf(f, obuf, n+2);
! 756:
! 757: zstd_out_buff.size = 0;
! 758: }
! 759: /*
! 760: * Loop while the input buffer isn't full consumed or the
! 761: * internal state isn't fully flushed.
! 762: */
! 763: } while (zstd_in_buff.pos < zstd_in_buff.size || r > 0);
! 764: flush_pending = token == -2;
! 765: }
! 766:
! 767: if (token == -1) {
! 768: /* end of file - clean up */
! 769: write_byte(f, END_FLAG);
! 770: }
! 771: }
! 772:
! 773: static ZSTD_DCtx *zstd_dctx;
! 774:
! 775: static int32 recv_zstd_token(int f, char **data)
! 776: {
! 777: static int decomp_init_done;
! 778: static int out_buffer_size;
! 779: int32 n, flag;
! 780: int r;
! 781:
! 782: if (!decomp_init_done) {
! 783: zstd_dctx = ZSTD_createDCtx();
! 784: if (!zstd_dctx) {
! 785: rprintf(FERROR, "ZSTD_createDStream failed\n");
! 786: exit_cleanup(RERR_PROTOCOL);
! 787: }
! 788:
! 789: /* Output buffer fits two decompressed blocks */
! 790: out_buffer_size = ZSTD_DStreamOutSize() * 2;
! 791: cbuf = new_array(char, MAX_DATA_COUNT);
! 792: dbuf = new_array(char, out_buffer_size);
! 793:
! 794: zstd_in_buff.src = cbuf;
! 795: zstd_out_buff.dst = dbuf;
! 796:
! 797: decomp_init_done = 1;
! 798: }
! 799:
! 800: for (;;) {
! 801: switch (recv_state) {
! 802: case r_init:
! 803: recv_state = r_idle;
! 804: rx_token = 0;
! 805: break;
! 806:
! 807: case r_idle:
! 808: flag = read_byte(f);
! 809: if ((flag & 0xC0) == DEFLATED_DATA) {
! 810: n = ((flag & 0x3f) << 8) + read_byte(f);
! 811: read_buf(f, cbuf, n);
! 812:
! 813: zstd_in_buff.size = n;
! 814: zstd_in_buff.pos = 0;
! 815:
! 816: recv_state = r_inflating;
! 817: break;
! 818: }
! 819:
! 820: if (flag == END_FLAG) {
! 821: /* that's all folks */
! 822: recv_state = r_init;
! 823: return 0;
! 824: }
! 825: /* here we have a token of some kind */
! 826: if (flag & TOKEN_REL) {
! 827: rx_token += flag & 0x3f;
! 828: flag >>= 6;
! 829: } else
! 830: rx_token = read_int(f);
! 831: if (flag & 1) {
! 832: rx_run = read_byte(f);
! 833: rx_run += read_byte(f) << 8;
! 834: recv_state = r_running;
! 835: }
! 836: return -1 - rx_token;
! 837:
! 838: case r_inflated: /* zstd doesn't get into this state */
! 839: break;
! 840:
! 841: case r_inflating:
! 842: zstd_out_buff.size = out_buffer_size;
! 843: zstd_out_buff.pos = 0;
! 844:
! 845: r = ZSTD_decompressStream(zstd_dctx, &zstd_out_buff, &zstd_in_buff);
! 846: n = zstd_out_buff.pos;
! 847: if (ZSTD_isError(r)) {
! 848: rprintf(FERROR, "ZSTD decomp returned %d (%d bytes)\n", r, n);
! 849: exit_cleanup(RERR_STREAMIO);
! 850: }
! 851:
! 852: /*
! 853: * If the input buffer is fully consumed and the output
! 854: * buffer is not full then next step is to read more
! 855: * data.
! 856: */
! 857: if (zstd_in_buff.size == zstd_in_buff.pos && n < out_buffer_size)
! 858: recv_state = r_idle;
! 859:
! 860: if (n != 0) {
! 861: *data = dbuf;
! 862: return n;
! 863: }
! 864: break;
! 865:
! 866: case r_running:
! 867: ++rx_token;
! 868: if (--rx_run == 0)
! 869: recv_state = r_idle;
! 870: return -1 - rx_token;
! 871: }
! 872: }
! 873: }
! 874: #endif /* SUPPORT_ZSTD */
! 875:
#ifdef SUPPORT_LZ4
/*
 * Send a token using LZ4 (CPRES_LZ4).
 *
 * Token runs are coded as in send_deflated_token(). The literal data (NB
 * bytes at OFFSET in BUF) is cut into pieces of at most MAX_DATA_COUNT
 * input bytes. Each piece is compressed independently with
 * LZ4_compress_default() and sent as one DEFLATED_DATA packet. If a piece's
 * compressed size exceeds MAX_DATA_COUNT, the piece is halved and
 * compressed again.
 */
static void
send_compressed_token(int f, int32 token, struct map_struct *buf, OFF_T offset, int32 nb)
{
	static int init_done, flush_pending;
	int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
	int32 n, r;

	if (last_token == -1) {
		if (!init_done) {
			obuf = new_array(char, size);
			init_done = 1;
		}
		last_run_end = 0;
		run_start = token;
		flush_pending = 0;
	} else if (last_token == -2) {
		run_start = token;
	} else if (nb != 0 || token != last_token + 1 || token >= run_start + 65536) {
		/* output previous run */
		r = run_start - last_run_end;
		n = last_token - run_start;
		if (r >= 0 && r <= 63) {
			write_byte(f, (n==0? TOKEN_REL: TOKENRUN_REL) + r);
		} else {
			write_byte(f, (n==0? TOKEN_LONG: TOKENRUN_LONG));
			write_int(f, run_start);
		}
		if (n != 0) {
			write_byte(f, n);
			write_byte(f, n >> 8);
		}
		last_run_end = last_token;
		run_start = token;
	}

	last_token = token;

	if (nb != 0 || flush_pending) {
		int available_in, available_out = 0;
		const char *next_in;

		do {
			char *next_out = obuf + 2;

			/* available_out == 0: the previous piece was sent (or
			 * this is the first), so start a new piece; otherwise the
			 * last attempt was too big and the same piece is halved. */
			if (available_out == 0) {
				available_in = MIN(nb, MAX_DATA_COUNT);
				next_in = map_ptr(buf, offset, available_in);
			} else
				available_in /= 2;

			available_out = LZ4_compress_default(next_in, next_out, available_in, size - 2);
			if (!available_out) {
				rprintf(FERROR, "compress returned %d\n", available_out);
				exit_cleanup(RERR_STREAMIO);
			}
			/* The packet length must fit the 14-bit header. */
			if (available_out <= MAX_DATA_COUNT) {
				obuf[0] = DEFLATED_DATA + (available_out >> 8);
				obuf[1] = available_out;

				write_buf(f, obuf, available_out + 2);

				available_out = 0;
				nb -= available_in;
				offset += available_in;
			}
		} while (nb != 0);
		flush_pending = token == -2;
	}
	if (token == -1) {
		/* end of file - clean up */
		write_byte(f, END_FLAG);
	}
}
! 950:
! 951: static int32 recv_compressed_token(int f, char **data)
! 952: {
! 953: static int init_done;
! 954: int32 n, flag;
! 955: int size = MAX(LZ4_compressBound(CHUNK_SIZE), MAX_DATA_COUNT+2);
! 956: static const char *next_in;
! 957: static int avail_in;
! 958: int avail_out;
! 959:
! 960: for (;;) {
! 961: switch (recv_state) {
! 962: case r_init:
! 963: if (!init_done) {
! 964: cbuf = new_array(char, MAX_DATA_COUNT);
! 965: dbuf = new_array(char, size);
! 966: init_done = 1;
! 967: }
! 968: recv_state = r_idle;
! 969: rx_token = 0;
! 970: break;
! 971:
! 972: case r_idle:
! 973: flag = read_byte(f);
! 974: if ((flag & 0xC0) == DEFLATED_DATA) {
! 975: n = ((flag & 0x3f) << 8) + read_byte(f);
! 976: read_buf(f, cbuf, n);
! 977: next_in = (char *)cbuf;
! 978: avail_in = n;
! 979: recv_state = r_inflating;
! 980: break;
! 981: }
! 982:
! 983: if (flag == END_FLAG) {
! 984: /* that's all folks */
! 985: recv_state = r_init;
! 986: return 0;
! 987: }
! 988:
! 989: /* here we have a token of some kind */
! 990: if (flag & TOKEN_REL) {
! 991: rx_token += flag & 0x3f;
! 992: flag >>= 6;
! 993: } else
! 994: rx_token = read_int(f);
! 995: if (flag & 1) {
! 996: rx_run = read_byte(f);
! 997: rx_run += read_byte(f) << 8;
! 998: recv_state = r_running;
! 999: }
! 1000: return -1 - rx_token;
! 1001:
! 1002: case r_inflating:
! 1003: avail_out = LZ4_decompress_safe(next_in, dbuf, avail_in, size);
! 1004: if (avail_out < 0) {
! 1005: rprintf(FERROR, "uncompress failed: %d\n", avail_out);
! 1006: exit_cleanup(RERR_STREAMIO);
! 1007: }
! 1008: recv_state = r_idle;
! 1009: *data = dbuf;
! 1010: return avail_out;
! 1011:
! 1012: case r_inflated: /* lz4 doesn't get into this state */
! 1013: break;
! 1014:
! 1015: case r_running:
! 1016: ++rx_token;
! 1017: if (--rx_run == 0)
! 1018: recv_state = r_idle;
! 1019: return -1 - rx_token;
! 1020: }
! 1021: }
! 1022: }
! 1023: #endif /* SUPPORT_LZ4 */
! 1024:
1.1 misho 1025: /**
1026: * Transmit a verbatim buffer of length @p n followed by a token.
1027: * If token == -1 then we have reached EOF
1028: * If n == 0 then don't send a buffer
1029: */
1030: void send_token(int f, int32 token, struct map_struct *buf, OFF_T offset,
1031: int32 n, int32 toklen)
1032: {
1.1.1.4 ! misho 1033: switch (do_compression) {
! 1034: case CPRES_NONE:
1.1 misho 1035: simple_send_token(f, token, buf, offset, n);
1.1.1.4 ! misho 1036: break;
! 1037: case CPRES_ZLIB:
! 1038: case CPRES_ZLIBX:
1.1 misho 1039: send_deflated_token(f, token, buf, offset, n, toklen);
1.1.1.4 ! misho 1040: break;
! 1041: #ifdef SUPPORT_ZSTD
! 1042: case CPRES_ZSTD:
! 1043: send_zstd_token(f, token, buf, offset, n);
! 1044: break;
! 1045: #endif
! 1046: #ifdef SUPPORT_LZ4
! 1047: case CPRES_LZ4:
! 1048: send_compressed_token(f, token, buf, offset, n);
! 1049: break;
! 1050: #endif
! 1051: default:
! 1052: NOISY_DEATH("Unknown do_compression value");
! 1053: }
1.1 misho 1054: }
1055:
1056: /*
1.1.1.4 ! misho 1057: * receive a token or buffer from the other end. If the return value is >0 then
1.1 misho 1058: * it is a data buffer of that length, and *data will point at the data.
1059: * if the return value is -i then it represents token i-1
1060: * if the return value is 0 then the end has been reached
1061: */
1062: int32 recv_token(int f, char **data)
1063: {
1.1.1.4 ! misho 1064: switch (do_compression) {
! 1065: case CPRES_NONE:
! 1066: return simple_recv_token(f,data);
! 1067: case CPRES_ZLIB:
! 1068: case CPRES_ZLIBX:
! 1069: return recv_deflated_token(f, data);
! 1070: #ifdef SUPPORT_ZSTD
! 1071: case CPRES_ZSTD:
! 1072: return recv_zstd_token(f, data);
! 1073: #endif
! 1074: #ifdef SUPPORT_LZ4
! 1075: case CPRES_LZ4:
! 1076: return recv_compressed_token(f, data);
! 1077: #endif
! 1078: default:
! 1079: NOISY_DEATH("Unknown do_compression value");
1.1 misho 1080: }
1081: }
1082:
1083: /*
1084: * look at the data corresponding to a token, if necessary
1085: */
1086: void see_token(char *data, int32 toklen)
1087: {
1.1.1.4 ! misho 1088: switch (do_compression) {
! 1089: case CPRES_NONE:
! 1090: break;
! 1091: case CPRES_ZLIB:
1.1 misho 1092: see_deflate_token(data, toklen);
1.1.1.4 ! misho 1093: break;
! 1094: case CPRES_ZLIBX:
! 1095: break;
! 1096: #ifdef SUPPORT_ZSTD
! 1097: case CPRES_ZSTD:
! 1098: break;
! 1099: #endif
! 1100: #ifdef SUPPORT_LZ4
! 1101: case CPRES_LZ4:
! 1102: /*see_uncompressed_token(data, toklen);*/
! 1103: break;
! 1104: #endif
! 1105: default:
! 1106: NOISY_DEATH("Unknown do_compression value");
! 1107: }
1.1 misho 1108: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>