--- embedaddon/rsync/io.c 2016/11/01 09:54:32 1.1.1.3 +++ embedaddon/rsync/io.c 2021/03/17 00:32:36 1.1.1.4 @@ -4,7 +4,7 @@ * Copyright (C) 1996-2001 Andrew Tridgell * Copyright (C) 1996 Paul Mackerras * Copyright (C) 2001, 2002 Martin Pool - * Copyright (C) 2003-2015 Wayne Davison + * Copyright (C) 2003-2020 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -41,9 +41,12 @@ extern int am_server; extern int am_sender; extern int am_receiver; extern int am_generator; +extern int local_server; extern int msgs2stderr; extern int inc_recurse; +extern int same_db; extern int io_error; +extern int batch_fd; extern int eol_nulls; extern int flist_eof; extern int file_total; @@ -53,12 +56,15 @@ extern int read_batch; extern int compat_flags; extern int protect_args; extern int checksum_seed; +extern int checksum_files; +extern int daemon_connection; extern int protocol_version; extern int remove_source_files; extern int preserve_hard_links; extern BOOL extra_flist_sending_enabled; extern BOOL flush_ok_after_signal; extern struct stats stats; +extern time_t stop_at_utime; extern struct file_list *cur_flist; #ifdef ICONV_OPTION extern int filesfrom_convert; @@ -67,7 +73,6 @@ extern iconv_t ic_send, ic_recv; int csum_length = SHORT_SUM_LENGTH; /* initial value */ int allowed_lull = 0; -int batch_fd = -1; int msgdone_cnt = 0; int forward_flist_data = 0; BOOL flist_receiving_enabled = False; @@ -251,8 +256,7 @@ static size_t safe_read(int fd, char *buf, size_t len) cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv); if (cnt <= 0) { if (cnt < 0 && errno == EBADF) { - rsyserr(FERROR, errno, "safe_read select failed [%s]", - who_am_i()); + rsyserr(FERROR, errno, "safe_read select failed"); exit_cleanup(RERR_FILEIO); } check_timeout(1, MSK_ALLOW_FLUSH); @@ -271,8 +275,7 @@ static size_t safe_read(int fd, char *buf, size_t len) if (n < 0) { if (errno == EINTR) continue; - rsyserr(FERROR, errno, "safe_read failed to read %ld bytes [%s]", - (long)len, who_am_i()); + rsyserr(FERROR, errno, "safe_read failed to read %ld bytes", (long)len); exit_cleanup(RERR_STREAMIO); } if ((got += (size_t)n) == len) @@ -315,8 +318,8 @@ static void safe_write(int fd, const char *buf, size_t if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { write_failed: rsyserr(FERROR, errno, - "safe_write failed to write %ld bytes to %s [%s]", - (long)len, what_fd_is(fd), who_am_i()); + "safe_write failed to write %ld bytes to %s", + (long)len, what_fd_is(fd)); exit_cleanup(RERR_STREAMIO); } } else { @@ -337,8 +340,7 @@ static void safe_write(int fd, const char *buf, size_t cnt = select(fd + 1, NULL, &w_fds, NULL, &tv); if (cnt <= 0) { if (cnt < 0 && errno == EBADF) { - rsyserr(FERROR, errno, "safe_write select failed on %s [%s]", - what_fd_is(fd), who_am_i()); + rsyserr(FERROR, errno, "safe_write select failed on %s", what_fd_is(fd)); exit_cleanup(RERR_FILEIO); } if (io_timeout) @@ -466,7 +468,7 @@ void reduce_iobuf_size(xbuf *out, size_t new_size) { if (new_size < out->size) { /* Avoid weird buffer interactions by only outputting this to stderr. */ - if (msgs2stderr && DEBUG_GTE(IO, 4)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) { const char *name = out == &iobuf.out ? "iobuf.out" : out == &iobuf.msg ? "iobuf.msg" : NULL; @@ -484,7 +486,7 @@ void restore_iobuf_size(xbuf *out) if (IOBUF_WAS_REDUCED(out->size)) { size_t new_size = IOBUF_RESTORE_SIZE(out->size); /* Avoid weird buffer interactions by only outputting this to stderr. */ - if (msgs2stderr && DEBUG_GTE(IO, 4)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) { const char *name = out == &iobuf.out ? "iobuf.out" : out == &iobuf.msg ? "iobuf.msg" : NULL; @@ -568,7 +570,7 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_PROTOCOL); } - if (msgs2stderr && DEBUG_GTE(IO, 3)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n", who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : ""); } @@ -582,7 +584,7 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_PROTOCOL); } - if (msgs2stderr && DEBUG_GTE(IO, 3)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n", who_am_i(), (long)needed, iobuf.out.len + needed > iobuf.out.size @@ -598,7 +600,7 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_PROTOCOL); } - if (msgs2stderr && DEBUG_GTE(IO, 3)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n", who_am_i(), (long)needed, iobuf.msg.len + needed > iobuf.msg.size @@ -607,7 +609,7 @@ static char *perform_io(size_t needed, int flags) break; case 0: - if (msgs2stderr && DEBUG_GTE(IO, 3)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) rprintf(FINFO, "[%s] perform_io(%ld, %d)\n", who_am_i(), (long)needed, flags); break; @@ -665,7 +667,7 @@ static char *perform_io(size_t needed, int flags) SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0, ((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4); - if (msgs2stderr && DEBUG_GTE(IO, 1)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) { rprintf(FINFO, "[%s] send_msg(%d, %ld)\n", who_am_i(), (int)MSG_DATA, (long)iobuf.out.len - 4); } @@ -785,12 +787,16 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_SOCKETIO); } } - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n); - if (io_timeout) { + if (io_timeout || stop_at_utime) { last_io_in = time(NULL); - if (flags & PIO_NEED_INPUT) + if (stop_at_utime && last_io_in >= stop_at_utime) { + rprintf(FERROR, "stopping at requested limit\n"); + exit_cleanup(RERR_TIMEOUT); + } + if (io_timeout && flags & PIO_NEED_INPUT) maybe_send_keepalive(last_io_in, 0); } stats.total_read += n; @@ -815,12 +821,12 @@ static char *perform_io(size_t needed, int flags) msgs2stderr = 1; iobuf.out_fd = -2; iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0; - rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i()); + rsyserr(FERROR_SOCKET, errno, "write error"); drain_multiplex_messages(); exit_cleanup(RERR_SOCKETIO); } } - if (msgs2stderr && DEBUG_GTE(IO, 2)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { rprintf(FINFO, "[%s] %s sent=%ld\n", who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n); } @@ -918,6 +924,10 @@ void noop_io_until_death(void) if (!iobuf.in.buf || !iobuf.out.buf || iobuf.in_fd < 0 || iobuf.out_fd < 0 || kluge_around_eof) return; + /* If we're talking to a daemon over a socket, don't short-circuit this logic */ + if (msgs2stderr && daemon_connection >= 0) + return; + kluge_around_eof = 2; /* Setting an I/O timeout ensures that if something inexplicably weird * happens, we won't hang around forever. */ @@ -933,7 +943,7 @@ int send_msg(enum msgcode code, const char *buf, size_ { char *hdr; size_t needed, pos; - BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr || code != MSG_INFO); + BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr == 1 || code != MSG_INFO); if (!OUT_MULTIPLEXED) return 0; @@ -954,8 +964,17 @@ int send_msg(enum msgcode code, const char *buf, size_ } else #endif needed = len + 4 + 3; - if (iobuf.msg.len + needed > iobuf.msg.size) - perform_io(needed, PIO_NEED_MSGROOM); + if (iobuf.msg.len + needed > iobuf.msg.size) { + if (!am_receiver) + perform_io(needed, PIO_NEED_MSGROOM); + else { /* We allow the receiver to increase their iobuf.msg size to avoid a deadlock. */ + size_t old_size = iobuf.msg.size; + restore_iobuf_size(&iobuf.msg); + realloc_xbuf(&iobuf.msg, iobuf.msg.size * 2); + if (iobuf.msg.pos + iobuf.msg.len > old_size) + memcpy(iobuf.msg.buf + old_size, iobuf.msg.buf, iobuf.msg.pos + iobuf.msg.len - old_size); + } + } pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */ if (pos >= iobuf.msg.size) @@ -1050,6 +1069,9 @@ static void got_flist_entry_status(enum festatus statu if (inc_recurse) flist->in_progress++; } + } else if (checksum_files & CSF_UPDATE) { + struct file_struct *file = flist->files[ndx - flist->ndx_start]; + set_cached_checksum(flist, file); } #endif break; @@ -1113,8 +1135,7 @@ static void check_for_d_option_error(const char *msg) } if (saw_d) { - rprintf(FWARNING, - "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); + rprintf(FWARNING, "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); } } @@ -1176,7 +1197,7 @@ int read_line(int fd, char *buf, size_t bufsiz, int fl #ifdef ICONV_OPTION if (flags & RL_CONVERT && iconv_buf.size < bufsiz) - realloc_xbuf(&iconv_buf, bufsiz + 1024); + realloc_xbuf(&iconv_buf, ROUND_UP_1024(bufsiz) + 1024); #endif start: @@ -1234,8 +1255,7 @@ void read_args(int f_in, char *mod_name, char *buf, si rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0); #endif - if (!(argv = new_array(char *, maxargs))) - out_of_memory("read_args"); + argv = new_array(char *, maxargs); if (mod_name && !protect_args) argv[argc++] = "rsyncd"; @@ -1248,8 +1268,7 @@ void read_args(int f_in, char *mod_name, char *buf, si if (argc == maxargs-1) { maxargs += MAX_ARGS; - if (!(argv = realloc_array(argv, char *, maxargs))) - out_of_memory("read_args"); + argv = realloc_array(argv, char *, maxargs); } if (dot_pos) { @@ -1257,8 +1276,7 @@ void read_args(int f_in, char *mod_name, char *buf, si int len = strlen(buf); if (request_len) request_p[0][request_len++] = ' '; - if (!(*request_p = realloc_array(*request_p, char, request_len + len + 1))) - out_of_memory("read_args"); + *request_p = realloc_array(*request_p, char, request_len + len + 1); memcpy(*request_p + request_len, buf, len + 1); request_len += len; } @@ -1267,8 +1285,7 @@ void read_args(int f_in, char *mod_name, char *buf, si else glob_expand(buf, &argv, &argc, &maxargs); } else { - if (!(p = strdup(buf))) - out_of_memory("read_args"); + p = strdup(buf); argv[argc++] = p; if (*p == '.' && p[1] == '\0') dot_pos = argc; @@ -1284,7 +1301,7 @@ void read_args(int f_in, char *mod_name, char *buf, si BOOL io_start_buffering_out(int f_out) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out); if (iobuf.out.buf) { @@ -1303,7 +1320,7 @@ BOOL io_start_buffering_out(int f_out) BOOL io_start_buffering_in(int f_in) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in); if (iobuf.in.buf) { @@ -1322,7 +1339,7 @@ BOOL io_start_buffering_in(int f_in) void io_end_buffering_in(BOOL free_buffers) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n", who_am_i(), free_buffers ? "FREE" : "KEEP"); } @@ -1337,7 +1354,7 @@ void io_end_buffering_in(BOOL free_buffers) void io_end_buffering_out(BOOL free_buffers) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n", who_am_i(), free_buffers ? "FREE" : "KEEP"); } @@ -1425,7 +1442,7 @@ static void read_a_msg(void) msg_bytes = tag & 0xFFFFFF; tag = (tag >> 24) - MPLEX_BASE; - if (DEBUG_GTE(IO, 1) && msgs2stderr) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes); switch (tag) { @@ -1481,6 +1498,32 @@ static void read_a_msg(void) if (am_sender) maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH); break; + case MSG_CHECKSUM: + /* This receives some checksum info that we want to make a note of + * (which allows a single process to do all the writing to the db). */ + if (msg_bytes != MSG_CHECKSUM_LEN) + goto overflow; + raw_read_buf(data, MSG_CHECKSUM_LEN); + if (am_generator && same_db) { + iobuf.in_multiplexed = 1; + send_msg(MSG_CHECKSUM, data, MSG_CHECKSUM_LEN, 0); + } if (am_receiver || (am_sender && !local_server)) + goto unexpected; + else { + /* The received data is a set of numbers followed by the checksum. */ + STRUCT_STAT st; + st.st_dev = IVAL64(data, 0); + st.st_ino = IVAL64(data, 8); + st.st_size = IVAL64(data, 16); + st.st_mtime = IVAL64(data, 24); + st.st_ctime = IVAL64(data, 32); +#if MSG_CHECKSUM_LONGS != 5 +#error Fix the parsing of checksum long values +#endif + iobuf.in_multiplexed = 1; + db_set_checksum(IVAL(data, MSG_CHECKSUM_LONGS*8), &st, data + MSG_CHECKSUM_LONGS*8 + 4); + } + break; case MSG_DELETED: if (msg_bytes >= sizeof data) goto overflow; @@ -1632,6 +1675,7 @@ static void read_a_msg(void) * with a duplicate exit message. */ _exit_cleanup(val, __FILE__, 0 - __LINE__); default: + unexpected: rprintf(FERROR, "unexpected tag %d [%s%s]\n", tag, who_am_i(), inc_recurse ? "/inc" : ""); exit_cleanup(RERR_STREAMIO); @@ -1982,13 +2026,14 @@ static void sleep_for_bwlimit(int bytes_written) total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); } -void io_flush(int flush_it_all) +void io_flush(int flush_type) { if (iobuf.out.len > iobuf.out_empty_len) { - if (flush_it_all) /* FULL_FLUSH: flush everything in the output buffers */ + if (flush_type == FULL_FLUSH) /* flush everything in the output buffers */ perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM); - else /* NORMAL_FLUSH: flush at least 1 byte */ + else if (flush_type == NORMAL_FLUSH) /* flush at least 1 byte */ perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM); + /* MSG_FLUSH: flush iobuf.msg only */ } if (iobuf.msg.len) perform_io(iobuf.msg.size, PIO_NEED_MSGROOM); @@ -2013,20 +2058,20 @@ void write_varint(int f, int32 x) { char b[5]; uchar bit; - int cnt = 4; + int cnt; SIVAL(b, 1, x); - while (cnt > 1 && b[cnt] == 0) - cnt--; + for (cnt = 4; cnt > 1 && b[cnt] == 0; cnt--) {} bit = ((uchar)1<<(7-cnt+1)); + if (CVAL(b, cnt) >= bit) { cnt++; *b = ~(bit-1); } else if (cnt > 1) *b = b[cnt] | ~(bit*2-1); else - *b = b[cnt]; + *b = b[1]; write_buf(f, b, cnt); } @@ -2296,7 +2341,7 @@ void io_start_multiplex_out(int fd) { io_flush(FULL_FLUSH); - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd); if (!iobuf.msg.buf) @@ -2313,7 +2358,7 @@ void io_start_multiplex_out(int fd) /* Setup for multiplexing a MSG_* stream with the data stream. */ void io_start_multiplex_in(int fd) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd); iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */ @@ -2324,7 +2369,7 @@ int io_end_multiplex_in(int mode) { int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1; - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode); iobuf.in_multiplexed = 0; @@ -2342,7 +2387,7 @@ int io_end_multiplex_out(int mode) { int ret = iobuf.out_empty_len ? iobuf.out_fd : -1; - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode); if (mode != MPLX_TO_BUFFERED) @@ -2368,7 +2413,7 @@ void start_write_batch(int fd) * is involved. */ write_int(batch_fd, protocol_version); if (protocol_version >= 30) - write_byte(batch_fd, compat_flags); + write_varint(batch_fd, compat_flags); write_int(batch_fd, checksum_seed); if (am_sender)