1: /*
2: * Routines only used by the sending process.
3: *
4: * Copyright (C) 1996 Andrew Tridgell
5: * Copyright (C) 1996 Paul Mackerras
6: * Copyright (C) 2003-2020 Wayne Davison
7: *
8: * This program is free software; you can redistribute it and/or modify
9: * it under the terms of the GNU General Public License as published by
10: * the Free Software Foundation; either version 3 of the License, or
11: * (at your option) any later version.
12: *
13: * This program is distributed in the hope that it will be useful,
14: * but WITHOUT ANY WARRANTY; without even the implied warranty of
15: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16: * GNU General Public License for more details.
17: *
18: * You should have received a copy of the GNU General Public License along
19: * with this program; if not, visit the http://fsf.org website.
20: */
21:
22: #include "rsync.h"
23: #include "inums.h"
24: #include "ifuncs.h"
25:
26: extern int do_xfers;
27: extern int am_server;
28: extern int am_daemon;
29: extern int inc_recurse;
30: extern int log_before_transfer;
31: extern int stdout_format_has_i;
32: extern int logfile_format_has_i;
33: extern int want_xattr_optim;
34: extern int csum_length;
35: extern int append_mode;
36: extern int copy_links;
37: extern int io_error;
38: extern int flist_eof;
39: extern int allowed_lull;
40: extern int preserve_xattrs;
41: extern int protocol_version;
42: extern int remove_source_files;
43: extern int updating_basis_file;
44: extern int make_backups;
45: extern int make_source_backups;
46: extern int inplace;
47: extern int inplace_partial;
48: extern int batch_fd;
49: extern int write_batch;
50: extern int file_old_total;
51: extern BOOL want_progress_now;
52: extern char *source_filter;
53: extern struct stats stats;
54: extern struct file_list *cur_flist, *first_flist, *dir_flist;
55:
56: BOOL extra_flist_sending_enabled;
57:
58: /**
59: * @file
60: *
61: * The sender gets checksums from the generator, calculates deltas,
62: * and transmits them to the receiver. The sender process runs on the
63: * machine holding the source files.
64: **/
65:
66: /**
67: * Receive the checksums for a buffer
68: **/
69: static struct sum_struct *receive_sums(int f)
70: {
71: struct sum_struct *s = new(struct sum_struct);
72: int lull_mod = protocol_version >= 31 ? 0 : allowed_lull * 5;
73: OFF_T offset = 0;
74: int32 i;
75:
76: read_sum_head(f, s);
77:
78: s->sums = NULL;
79:
80: if (DEBUG_GTE(DELTASUM, 3)) {
81: rprintf(FINFO, "count=%s n=%ld rem=%ld\n",
82: big_num(s->count), (long)s->blength, (long)s->remainder);
83: }
84:
85: if (append_mode > 0) {
86: s->flength = (OFF_T)s->count * s->blength;
87: if (s->remainder)
88: s->flength -= s->blength - s->remainder;
89: return s;
90: }
91:
92: if (s->count == 0)
93: return(s);
94:
95: s->sums = new_array(struct sum_buf, s->count);
96:
97: for (i = 0; i < s->count; i++) {
98: s->sums[i].sum1 = read_int(f);
99: read_buf(f, s->sums[i].sum2, s->s2length);
100:
101: s->sums[i].offset = offset;
102: s->sums[i].flags = 0;
103:
104: if (i == s->count-1 && s->remainder != 0)
105: s->sums[i].len = s->remainder;
106: else
107: s->sums[i].len = s->blength;
108: offset += s->sums[i].len;
109:
110: if (lull_mod && !(i % lull_mod))
111: maybe_send_keepalive(time(NULL), True);
112:
113: if (DEBUG_GTE(DELTASUM, 3)) {
114: rprintf(FINFO,
115: "chunk[%d] len=%d offset=%s sum1=%08x\n",
116: i, s->sums[i].len, big_num(s->sums[i].offset),
117: s->sums[i].sum1);
118: }
119: }
120:
121: s->flength = offset;
122:
123: return s;
124: }
125:
126: void successful_send(int ndx)
127: {
128: char fname[MAXPATHLEN];
129: char *failed_op;
130: struct file_struct *file;
131: struct file_list *flist;
132: STRUCT_STAT st;
133: int result;
134:
135: if (!remove_source_files)
136: return;
137:
138: flist = flist_for_ndx(ndx, "successful_send");
139: file = flist->files[ndx - flist->ndx_start];
140: if (!change_pathname(file, NULL, 0))
141: return;
142: f_name(file, fname);
143:
144: if ((copy_links ? do_stat(fname, &st) : do_lstat(fname, &st)) < 0) {
145: failed_op = "re-lstat";
146: goto failed;
147: }
148:
149: if (st.st_size != F_LENGTH(file) || st.st_mtime != file->modtime
150: #ifdef ST_MTIME_NSEC
151: || (NSEC_BUMP(file) && (uint32)st.ST_MTIME_NSEC != F_MOD_NSEC(file))
152: #endif
153: ) {
154: rprintf(FERROR_XFER, "ERROR: Skipping sender remove for changed file: %s\n", fname);
155: return;
156: }
157:
158: if (make_source_backups)
159: result = !make_backup(fname, True);
160: else
161: result = do_unlink(fname);
162: if (result < 0) {
163: failed_op = "remove";
164: failed:
165: if (errno == ENOENT)
166: rprintf(FINFO, "sender file already removed: %s\n", fname);
167: else
168: rsyserr(FERROR_XFER, errno, "sender failed to %s %s", failed_op, fname);
169: } else {
170: if (INFO_GTE(REMOVE, 1))
171: rprintf(FINFO, "sender removed %s\n", fname);
172: }
173: }
174:
175: static void write_ndx_and_attrs(int f_out, int ndx, int iflags,
176: const char *fname, struct file_struct *file,
177: uchar fnamecmp_type, char *buf, int len)
178: {
179: write_ndx(f_out, ndx);
180: if (protocol_version < 29)
181: return;
182: write_shortint(f_out, iflags);
183: if (iflags & ITEM_BASIS_TYPE_FOLLOWS)
184: write_byte(f_out, fnamecmp_type);
185: if (iflags & ITEM_XNAME_FOLLOWS)
186: write_vstring(f_out, buf, len);
187: #ifdef SUPPORT_XATTRS
188: if (preserve_xattrs && iflags & ITEM_REPORT_XATTR && do_xfers
189: && !(want_xattr_optim && BITS_SET(iflags, ITEM_XNAME_FOLLOWS|ITEM_LOCAL_CHANGE)))
190: send_xattr_request(fname, file, f_out);
191: #endif
192: }
193:
194: void send_files(int f_in, int f_out)
195: {
196: int fd = -1;
197: struct sum_struct *s;
198: struct map_struct *mbuf = NULL;
199: STRUCT_STAT st;
200: char fname[MAXPATHLEN], xname[MAXPATHLEN];
201: const char *path, *slash;
202: uchar fnamecmp_type;
203: int iflags, xlen;
204: struct file_struct *file;
205: int phase = 0, max_phase = protocol_version >= 29 ? 2 : 1;
206: int itemizing = am_server ? logfile_format_has_i : stdout_format_has_i;
207: enum logcode log_code = log_before_transfer ? FLOG : FINFO;
208: int f_xfer = write_batch < 0 ? batch_fd : f_out;
209: int save_io_error = io_error;
210: int ndx, j;
211: char *filter_argv[MAX_FILTER_ARGS + 1];
212: char *tmp = 0;
213: int unlink_tmp = 0;
214:
215: if (source_filter) {
216: char *p;
217: char *sep = " \t";
218: int i;
219: for (p = strtok(source_filter, sep), i = 0;
220: p && i < MAX_FILTER_ARGS;
221: p = strtok(0, sep))
222: filter_argv[i++] = p;
223: filter_argv[i] = NULL;
224: if (p) {
225: rprintf(FERROR,
226: "Too many arguments to source-filter (> %d)\n",
227: MAX_FILTER_ARGS);
228: exit_cleanup(RERR_SYNTAX);
229: }
230: }
231:
232: if (DEBUG_GTE(SEND, 1))
233: rprintf(FINFO, "send_files starting\n");
234:
235: progress_init();
236:
237: while (1) {
238: if (inc_recurse) {
239: send_extra_file_list(f_out, MIN_FILECNT_LOOKAHEAD);
240: extra_flist_sending_enabled = !flist_eof;
241: }
242:
243: /* This call also sets cur_flist. */
244: ndx = read_ndx_and_attrs(f_in, f_out, &iflags, &fnamecmp_type,
245: xname, &xlen);
246: extra_flist_sending_enabled = False;
247:
248: if (ndx == NDX_DONE) {
249: if (!am_server && cur_flist) {
250: set_current_file_index(NULL, 0);
251: if (INFO_GTE(PROGRESS, 2))
252: end_progress(0);
253: }
254: if (inc_recurse && first_flist) {
255: file_old_total -= first_flist->used;
256: flist_free(first_flist);
257: if (first_flist) {
258: if (first_flist == cur_flist)
259: file_old_total = cur_flist->used;
260: write_ndx(f_out, NDX_DONE);
261: continue;
262: }
263: }
264: if (++phase > max_phase)
265: break;
266: if (DEBUG_GTE(SEND, 1))
267: rprintf(FINFO, "send_files phase=%d\n", phase);
268: write_ndx(f_out, NDX_DONE);
269: continue;
270: }
271:
272: if (inc_recurse)
273: send_extra_file_list(f_out, MIN_FILECNT_LOOKAHEAD);
274:
275: if (ndx - cur_flist->ndx_start >= 0)
276: file = cur_flist->files[ndx - cur_flist->ndx_start];
277: else
278: file = dir_flist->files[cur_flist->parent_ndx];
279: if (F_PATHNAME(file)) {
280: path = F_PATHNAME(file);
281: slash = "/";
282: } else {
283: path = slash = "";
284: }
285: if (!change_pathname(file, NULL, 0))
286: continue;
287: f_name(file, fname);
288:
289: if (DEBUG_GTE(SEND, 1))
290: rprintf(FINFO, "send_files(%d, %s%s%s)\n", ndx, path,slash,fname);
291:
292: #ifdef SUPPORT_XATTRS
293: if (preserve_xattrs && iflags & ITEM_REPORT_XATTR && do_xfers
294: && !(want_xattr_optim && BITS_SET(iflags, ITEM_XNAME_FOLLOWS|ITEM_LOCAL_CHANGE)))
295: recv_xattr_request(file, f_in);
296: #endif
297:
298: if (!(iflags & ITEM_TRANSFER)) {
299: maybe_log_item(file, iflags, itemizing, xname);
300: write_ndx_and_attrs(f_out, ndx, iflags, fname, file, fnamecmp_type, xname, xlen);
301: if (iflags & ITEM_IS_NEW) {
302: stats.created_files++;
303: if (S_ISREG(file->mode)) {
304: /* Nothing further to count. */
305: } else if (S_ISDIR(file->mode))
306: stats.created_dirs++;
307: #ifdef SUPPORT_LINKS
308: else if (S_ISLNK(file->mode))
309: stats.created_symlinks++;
310: #endif
311: else if (IS_DEVICE(file->mode))
312: stats.created_devices++;
313: else
314: stats.created_specials++;
315: }
316: continue;
317: }
318: if (phase == 2) {
319: rprintf(FERROR,
320: "got transfer request in phase 2 [%s]\n",
321: who_am_i());
322: exit_cleanup(RERR_PROTOCOL);
323: }
324:
325: if (file->flags & FLAG_FILE_SENT) {
326: if (csum_length == SHORT_SUM_LENGTH) {
327: /* For inplace: redo phase turns off the backup
328: * flag so that we do a regular inplace send. */
329: make_backups = -make_backups;
330: append_mode = -append_mode;
331: csum_length = SUM_LENGTH;
332: }
333: } else {
334: if (csum_length != SHORT_SUM_LENGTH) {
335: make_backups = -make_backups;
336: append_mode = -append_mode;
337: csum_length = SHORT_SUM_LENGTH;
338: }
339: if (iflags & ITEM_IS_NEW)
340: stats.created_files++;
341: }
342:
343: updating_basis_file = (inplace_partial && fnamecmp_type == FNAMECMP_PARTIAL_DIR)
344: || (inplace && (protocol_version >= 29 ? fnamecmp_type == FNAMECMP_FNAME : make_backups <= 0));
345:
346: if (!am_server)
347: set_current_file_index(file, ndx);
348: stats.xferred_files++;
349: stats.total_transferred_size += F_LENGTH(file);
350:
351: remember_initial_stats();
352:
353: if (!do_xfers) { /* log the transfer */
354: log_item(FCLIENT, file, iflags, NULL);
355: write_ndx_and_attrs(f_out, ndx, iflags, fname, file, fnamecmp_type, xname, xlen);
356: continue;
357: }
358:
359: if (!(s = receive_sums(f_in))) {
360: io_error |= IOERR_GENERAL;
361: rprintf(FERROR_XFER, "receive_sums failed\n");
362: exit_cleanup(RERR_PROTOCOL);
363: }
364:
365: unlink_tmp = 0;
366: fd = do_open(fname, O_RDONLY, 0);
367: if (fd == -1) {
368: if (errno == ENOENT) {
369: enum logcode c = am_daemon && protocol_version < 28 ? FERROR : FWARNING;
370: io_error |= IOERR_VANISHED;
371: rprintf(c, "file has vanished: %s\n",
372: full_fname(fname));
373: } else {
374: io_error |= IOERR_GENERAL;
375: rsyserr(FERROR_XFER, errno,
376: "send_files failed to open %s",
377: full_fname(fname));
378: }
379: free_sums(s);
380: if (protocol_version >= 30)
381: send_msg_int(MSG_NO_SEND, ndx);
382: continue;
383: }
384:
385: if (source_filter) {
386: int fd2;
387: char *tmpl = "/tmp/rsync-filtered_sourceXXXXXX";
388:
389: tmp = strdup(tmpl);
390: fd2 = mkstemp(tmp);
391: if (fd2 == -1) {
392: rprintf(FERROR, "mkstemp %s failed: %s\n",
393: tmp, strerror(errno));
394: } else {
395: int status;
396: pid_t pid = run_filter_on_file(filter_argv, fd2, fd);
397: close(fd);
398: close(fd2);
399: wait_process_with_flush(pid, &status);
400: if (status != 0) {
401: rprintf(FERROR,
402: "bypassing source filter %s; exited with code: %d\n",
403: source_filter, status);
404: fd = do_open(fname, O_RDONLY, 0);
405: } else {
406: fd = do_open(tmp, O_RDONLY, 0);
407: unlink_tmp = 1;
408: }
409: }
410: }
411:
412: /* map the local file */
413: if (do_fstat(fd, &st) != 0) {
414: io_error |= IOERR_GENERAL;
415: rsyserr(FERROR_XFER, errno, "fstat failed");
416: free_sums(s);
417: close(fd);
418: exit_cleanup(RERR_FILEIO);
419: }
420:
421: if (IS_DEVICE(st.st_mode) && st.st_size == 0)
422: st.st_size = get_device_size(fd, fname);
423:
424: if (st.st_size) {
425: int32 read_size = MAX(s->blength * 3, MAX_MAP_SIZE);
426: mbuf = map_file(fd, st.st_size, read_size, s->blength);
427: } else
428: mbuf = NULL;
429:
430: if (DEBUG_GTE(DELTASUM, 2)) {
431: rprintf(FINFO, "send_files mapped %s%s%s of size %s\n",
432: path,slash,fname, big_num(st.st_size));
433: }
434:
435: write_ndx_and_attrs(f_out, ndx, iflags, fname, file, fnamecmp_type, xname, xlen);
436: write_sum_head(f_xfer, s);
437:
438: if (DEBUG_GTE(DELTASUM, 2))
439: rprintf(FINFO, "calling match_sums %s%s%s\n", path,slash,fname);
440:
441: if (log_before_transfer)
442: log_item(FCLIENT, file, iflags, NULL);
443: else if (!am_server && INFO_GTE(NAME, 1) && INFO_EQ(PROGRESS, 1))
444: rprintf(FCLIENT, "%s\n", fname);
445:
446: set_compression(fname);
447:
448: match_sums(f_xfer, s, mbuf, st.st_size);
449: if (INFO_GTE(PROGRESS, 1))
450: end_progress(st.st_size);
451: else if (want_progress_now)
452: instant_progress(fname);
453:
454: log_item(log_code, file, iflags, NULL);
455:
456: if (mbuf) {
457: j = unmap_file(mbuf);
458: if (j) {
459: io_error |= IOERR_GENERAL;
460: rsyserr(FERROR_XFER, j,
461: "read errors mapping %s",
462: full_fname(fname));
463: }
464: }
465: close(fd);
466: if (unlink_tmp)
467: unlink(tmp);
468:
469: free_sums(s);
470:
471: if (DEBUG_GTE(SEND, 1))
472: rprintf(FINFO, "sender finished %s%s%s\n", path,slash,fname);
473:
474: /* Flag that we actually sent this entry. */
475: file->flags |= FLAG_FILE_SENT;
476: }
477: if (make_backups < 0)
478: make_backups = -make_backups;
479:
480: if (io_error != save_io_error && protocol_version >= 30)
481: send_msg_int(MSG_IO_ERROR, io_error);
482:
483: if (DEBUG_GTE(SEND, 1))
484: rprintf(FINFO, "send files finished\n");
485:
486: match_report();
487:
488: write_ndx(f_out, NDX_DONE);
489: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>