|
version 1.1.1.3, 2016/11/01 09:54:32
|
version 1.1.1.4, 2021/03/17 00:32:36
|
|
Line 4
|
Line 4
|
| * Copyright (C) 1996-2001 Andrew Tridgell |
* Copyright (C) 1996-2001 Andrew Tridgell |
| * Copyright (C) 1996 Paul Mackerras |
* Copyright (C) 1996 Paul Mackerras |
| * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org> |
* Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org> |
| * Copyright (C) 2003-2015 Wayne Davison | * Copyright (C) 2003-2020 Wayne Davison |
| * |
* |
| * This program is free software; you can redistribute it and/or modify |
* This program is free software; you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
* it under the terms of the GNU General Public License as published by |
|
Line 41 extern int am_server;
|
Line 41 extern int am_server;
|
| extern int am_sender; |
extern int am_sender; |
| extern int am_receiver; |
extern int am_receiver; |
| extern int am_generator; |
extern int am_generator; |
| |
extern int local_server; |
| extern int msgs2stderr; |
extern int msgs2stderr; |
| extern int inc_recurse; |
extern int inc_recurse; |
| |
extern int same_db; |
| extern int io_error; |
extern int io_error; |
| |
extern int batch_fd; |
| extern int eol_nulls; |
extern int eol_nulls; |
| extern int flist_eof; |
extern int flist_eof; |
| extern int file_total; |
extern int file_total; |
|
Line 53 extern int read_batch;
|
Line 56 extern int read_batch;
|
| extern int compat_flags; |
extern int compat_flags; |
| extern int protect_args; |
extern int protect_args; |
| extern int checksum_seed; |
extern int checksum_seed; |
| |
extern int checksum_files; |
| |
extern int daemon_connection; |
| extern int protocol_version; |
extern int protocol_version; |
| extern int remove_source_files; |
extern int remove_source_files; |
| extern int preserve_hard_links; |
extern int preserve_hard_links; |
| extern BOOL extra_flist_sending_enabled; |
extern BOOL extra_flist_sending_enabled; |
| extern BOOL flush_ok_after_signal; |
extern BOOL flush_ok_after_signal; |
| extern struct stats stats; |
extern struct stats stats; |
| |
extern time_t stop_at_utime; |
| extern struct file_list *cur_flist; |
extern struct file_list *cur_flist; |
| #ifdef ICONV_OPTION |
#ifdef ICONV_OPTION |
| extern int filesfrom_convert; |
extern int filesfrom_convert; |
|
Line 67 extern iconv_t ic_send, ic_recv;
|
Line 73 extern iconv_t ic_send, ic_recv;
|
| |
|
| int csum_length = SHORT_SUM_LENGTH; /* initial value */ |
int csum_length = SHORT_SUM_LENGTH; /* initial value */ |
| int allowed_lull = 0; |
int allowed_lull = 0; |
| int batch_fd = -1; |
|
| int msgdone_cnt = 0; |
int msgdone_cnt = 0; |
| int forward_flist_data = 0; |
int forward_flist_data = 0; |
| BOOL flist_receiving_enabled = False; |
BOOL flist_receiving_enabled = False; |
|
Line 251 static size_t safe_read(int fd, char *buf, size_t len)
|
Line 256 static size_t safe_read(int fd, char *buf, size_t len)
|
| cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv); |
cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv); |
| if (cnt <= 0) { |
if (cnt <= 0) { |
| if (cnt < 0 && errno == EBADF) { |
if (cnt < 0 && errno == EBADF) { |
| rsyserr(FERROR, errno, "safe_read select failed [%s]", | rsyserr(FERROR, errno, "safe_read select failed"); |
| who_am_i()); | |
| exit_cleanup(RERR_FILEIO); |
exit_cleanup(RERR_FILEIO); |
| } |
} |
| check_timeout(1, MSK_ALLOW_FLUSH); |
check_timeout(1, MSK_ALLOW_FLUSH); |
|
Line 271 static size_t safe_read(int fd, char *buf, size_t len)
|
Line 275 static size_t safe_read(int fd, char *buf, size_t len)
|
| if (n < 0) { |
if (n < 0) { |
| if (errno == EINTR) |
if (errno == EINTR) |
| continue; |
continue; |
| rsyserr(FERROR, errno, "safe_read failed to read %ld bytes [%s]", | rsyserr(FERROR, errno, "safe_read failed to read %ld bytes", (long)len); |
| (long)len, who_am_i()); | |
| exit_cleanup(RERR_STREAMIO); |
exit_cleanup(RERR_STREAMIO); |
| } |
} |
| if ((got += (size_t)n) == len) |
if ((got += (size_t)n) == len) |
|
Line 315 static void safe_write(int fd, const char *buf, size_t
|
Line 318 static void safe_write(int fd, const char *buf, size_t
|
| if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { |
if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { |
| write_failed: |
write_failed: |
| rsyserr(FERROR, errno, |
rsyserr(FERROR, errno, |
| "safe_write failed to write %ld bytes to %s [%s]", | "safe_write failed to write %ld bytes to %s", |
| (long)len, what_fd_is(fd), who_am_i()); | (long)len, what_fd_is(fd)); |
| exit_cleanup(RERR_STREAMIO); |
exit_cleanup(RERR_STREAMIO); |
| } |
} |
| } else { |
} else { |
|
Line 337 static void safe_write(int fd, const char *buf, size_t
|
Line 340 static void safe_write(int fd, const char *buf, size_t
|
| cnt = select(fd + 1, NULL, &w_fds, NULL, &tv); |
cnt = select(fd + 1, NULL, &w_fds, NULL, &tv); |
| if (cnt <= 0) { |
if (cnt <= 0) { |
| if (cnt < 0 && errno == EBADF) { |
if (cnt < 0 && errno == EBADF) { |
| rsyserr(FERROR, errno, "safe_write select failed on %s [%s]", | rsyserr(FERROR, errno, "safe_write select failed on %s", what_fd_is(fd)); |
| what_fd_is(fd), who_am_i()); | |
| exit_cleanup(RERR_FILEIO); |
exit_cleanup(RERR_FILEIO); |
| } |
} |
| if (io_timeout) |
if (io_timeout) |
|
Line 466 void reduce_iobuf_size(xbuf *out, size_t new_size)
|
Line 468 void reduce_iobuf_size(xbuf *out, size_t new_size)
|
| { |
{ |
| if (new_size < out->size) { |
if (new_size < out->size) { |
| /* Avoid weird buffer interactions by only outputting this to stderr. */ |
/* Avoid weird buffer interactions by only outputting this to stderr. */ |
| if (msgs2stderr && DEBUG_GTE(IO, 4)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) { |
| const char *name = out == &iobuf.out ? "iobuf.out" |
const char *name = out == &iobuf.out ? "iobuf.out" |
| : out == &iobuf.msg ? "iobuf.msg" |
: out == &iobuf.msg ? "iobuf.msg" |
| : NULL; |
: NULL; |
|
Line 484 void restore_iobuf_size(xbuf *out)
|
Line 486 void restore_iobuf_size(xbuf *out)
|
| if (IOBUF_WAS_REDUCED(out->size)) { |
if (IOBUF_WAS_REDUCED(out->size)) { |
| size_t new_size = IOBUF_RESTORE_SIZE(out->size); |
size_t new_size = IOBUF_RESTORE_SIZE(out->size); |
| /* Avoid weird buffer interactions by only outputting this to stderr. */ |
/* Avoid weird buffer interactions by only outputting this to stderr. */ |
| if (msgs2stderr && DEBUG_GTE(IO, 4)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) { |
| const char *name = out == &iobuf.out ? "iobuf.out" |
const char *name = out == &iobuf.out ? "iobuf.out" |
| : out == &iobuf.msg ? "iobuf.msg" |
: out == &iobuf.msg ? "iobuf.msg" |
| : NULL; |
: NULL; |
|
Line 568 static char *perform_io(size_t needed, int flags)
|
Line 570 static char *perform_io(size_t needed, int flags)
|
| exit_cleanup(RERR_PROTOCOL); |
exit_cleanup(RERR_PROTOCOL); |
| } |
} |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 3)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { |
| rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n", |
rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n", |
| who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : ""); |
who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : ""); |
| } |
} |
|
Line 582 static char *perform_io(size_t needed, int flags)
|
Line 584 static char *perform_io(size_t needed, int flags)
|
| exit_cleanup(RERR_PROTOCOL); |
exit_cleanup(RERR_PROTOCOL); |
| } |
} |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 3)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { |
| rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n", |
rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n", |
| who_am_i(), (long)needed, |
who_am_i(), (long)needed, |
| iobuf.out.len + needed > iobuf.out.size |
iobuf.out.len + needed > iobuf.out.size |
|
Line 598 static char *perform_io(size_t needed, int flags)
|
Line 600 static char *perform_io(size_t needed, int flags)
|
| exit_cleanup(RERR_PROTOCOL); |
exit_cleanup(RERR_PROTOCOL); |
| } |
} |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 3)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { |
| rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n", |
rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n", |
| who_am_i(), (long)needed, |
who_am_i(), (long)needed, |
| iobuf.msg.len + needed > iobuf.msg.size |
iobuf.msg.len + needed > iobuf.msg.size |
|
Line 607 static char *perform_io(size_t needed, int flags)
|
Line 609 static char *perform_io(size_t needed, int flags)
|
| break; |
break; |
| |
|
| case 0: |
case 0: |
| if (msgs2stderr && DEBUG_GTE(IO, 3)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) |
| rprintf(FINFO, "[%s] perform_io(%ld, %d)\n", who_am_i(), (long)needed, flags); |
rprintf(FINFO, "[%s] perform_io(%ld, %d)\n", who_am_i(), (long)needed, flags); |
| break; |
break; |
| |
|
|
Line 665 static char *perform_io(size_t needed, int flags)
|
Line 667 static char *perform_io(size_t needed, int flags)
|
| SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0, |
SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0, |
| ((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4); |
((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4); |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 1)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) { |
| rprintf(FINFO, "[%s] send_msg(%d, %ld)\n", |
rprintf(FINFO, "[%s] send_msg(%d, %ld)\n", |
| who_am_i(), (int)MSG_DATA, (long)iobuf.out.len - 4); |
who_am_i(), (int)MSG_DATA, (long)iobuf.out.len - 4); |
| } |
} |
|
Line 785 static char *perform_io(size_t needed, int flags)
|
Line 787 static char *perform_io(size_t needed, int flags)
|
| exit_cleanup(RERR_SOCKETIO); |
exit_cleanup(RERR_SOCKETIO); |
| } |
} |
| } |
} |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n); |
rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n); |
| |
|
| if (io_timeout) { | if (io_timeout || stop_at_utime) { |
| last_io_in = time(NULL); |
last_io_in = time(NULL); |
| if (flags & PIO_NEED_INPUT) | if (stop_at_utime && last_io_in >= stop_at_utime) { |
| | rprintf(FERROR, "stopping at requested limit\n"); |
| | exit_cleanup(RERR_TIMEOUT); |
| | } |
| | if (io_timeout && flags & PIO_NEED_INPUT) |
| maybe_send_keepalive(last_io_in, 0); |
maybe_send_keepalive(last_io_in, 0); |
| } |
} |
| stats.total_read += n; |
stats.total_read += n; |
|
Line 815 static char *perform_io(size_t needed, int flags)
|
Line 821 static char *perform_io(size_t needed, int flags)
|
| msgs2stderr = 1; |
msgs2stderr = 1; |
| iobuf.out_fd = -2; |
iobuf.out_fd = -2; |
| iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0; |
iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0; |
| rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i()); | rsyserr(FERROR_SOCKET, errno, "write error"); |
| drain_multiplex_messages(); |
drain_multiplex_messages(); |
| exit_cleanup(RERR_SOCKETIO); |
exit_cleanup(RERR_SOCKETIO); |
| } |
} |
| } |
} |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { |
| rprintf(FINFO, "[%s] %s sent=%ld\n", |
rprintf(FINFO, "[%s] %s sent=%ld\n", |
| who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n); |
who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n); |
| } |
} |
|
Line 918 void noop_io_until_death(void)
|
Line 924 void noop_io_until_death(void)
|
| if (!iobuf.in.buf || !iobuf.out.buf || iobuf.in_fd < 0 || iobuf.out_fd < 0 || kluge_around_eof) |
if (!iobuf.in.buf || !iobuf.out.buf || iobuf.in_fd < 0 || iobuf.out_fd < 0 || kluge_around_eof) |
| return; |
return; |
| |
|
| |
/* If we're talking to a daemon over a socket, don't short-circuit this logic */ |
| |
if (msgs2stderr && daemon_connection >= 0) |
| |
return; |
| |
|
| kluge_around_eof = 2; |
kluge_around_eof = 2; |
| /* Setting an I/O timeout ensures that if something inexplicably weird |
/* Setting an I/O timeout ensures that if something inexplicably weird |
| * happens, we won't hang around forever. */ |
* happens, we won't hang around forever. */ |
|
Line 933 int send_msg(enum msgcode code, const char *buf, size_
|
Line 943 int send_msg(enum msgcode code, const char *buf, size_
|
| { |
{ |
| char *hdr; |
char *hdr; |
| size_t needed, pos; |
size_t needed, pos; |
| BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr || code != MSG_INFO); | BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr == 1 || code != MSG_INFO); |
| |
|
| if (!OUT_MULTIPLEXED) |
if (!OUT_MULTIPLEXED) |
| return 0; |
return 0; |
|
Line 954 int send_msg(enum msgcode code, const char *buf, size_
|
Line 964 int send_msg(enum msgcode code, const char *buf, size_
|
| } else |
} else |
| #endif |
#endif |
| needed = len + 4 + 3; |
needed = len + 4 + 3; |
| if (iobuf.msg.len + needed > iobuf.msg.size) | if (iobuf.msg.len + needed > iobuf.msg.size) { |
| perform_io(needed, PIO_NEED_MSGROOM); | if (!am_receiver) |
| | perform_io(needed, PIO_NEED_MSGROOM); |
| | else { /* We allow the receiver to increase their iobuf.msg size to avoid a deadlock. */ |
| | size_t old_size = iobuf.msg.size; |
| | restore_iobuf_size(&iobuf.msg); |
| | realloc_xbuf(&iobuf.msg, iobuf.msg.size * 2); |
| | if (iobuf.msg.pos + iobuf.msg.len > old_size) |
| | memcpy(iobuf.msg.buf + old_size, iobuf.msg.buf, iobuf.msg.pos + iobuf.msg.len - old_size); |
| | } |
| | } |
| |
|
| pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */ |
pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */ |
| if (pos >= iobuf.msg.size) |
if (pos >= iobuf.msg.size) |
|
Line 1050 static void got_flist_entry_status(enum festatus statu
|
Line 1069 static void got_flist_entry_status(enum festatus statu
|
| if (inc_recurse) |
if (inc_recurse) |
| flist->in_progress++; |
flist->in_progress++; |
| } |
} |
| |
} else if (checksum_files & CSF_UPDATE) { |
| |
struct file_struct *file = flist->files[ndx - flist->ndx_start]; |
| |
set_cached_checksum(flist, file); |
| } |
} |
| #endif |
#endif |
| break; |
break; |
|
Line 1113 static void check_for_d_option_error(const char *msg)
|
Line 1135 static void check_for_d_option_error(const char *msg)
|
| } |
} |
| |
|
| if (saw_d) { |
if (saw_d) { |
| rprintf(FWARNING, | rprintf(FWARNING, "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); |
| "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); | |
| } |
} |
| } |
} |
| |
|
|
Line 1176 int read_line(int fd, char *buf, size_t bufsiz, int fl
|
Line 1197 int read_line(int fd, char *buf, size_t bufsiz, int fl
|
| |
|
| #ifdef ICONV_OPTION |
#ifdef ICONV_OPTION |
| if (flags & RL_CONVERT && iconv_buf.size < bufsiz) |
if (flags & RL_CONVERT && iconv_buf.size < bufsiz) |
| realloc_xbuf(&iconv_buf, bufsiz + 1024); | realloc_xbuf(&iconv_buf, ROUND_UP_1024(bufsiz) + 1024); |
| #endif |
#endif |
| |
|
| start: |
start: |
|
Line 1234 void read_args(int f_in, char *mod_name, char *buf, si
|
Line 1255 void read_args(int f_in, char *mod_name, char *buf, si
|
| rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0); |
rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0); |
| #endif |
#endif |
| |
|
| if (!(argv = new_array(char *, maxargs))) | argv = new_array(char *, maxargs); |
| out_of_memory("read_args"); | |
| if (mod_name && !protect_args) |
if (mod_name && !protect_args) |
| argv[argc++] = "rsyncd"; |
argv[argc++] = "rsyncd"; |
| |
|
|
Line 1248 void read_args(int f_in, char *mod_name, char *buf, si
|
Line 1268 void read_args(int f_in, char *mod_name, char *buf, si
|
| |
|
| if (argc == maxargs-1) { |
if (argc == maxargs-1) { |
| maxargs += MAX_ARGS; |
maxargs += MAX_ARGS; |
| if (!(argv = realloc_array(argv, char *, maxargs))) | argv = realloc_array(argv, char *, maxargs); |
| out_of_memory("read_args"); | |
| } |
} |
| |
|
| if (dot_pos) { |
if (dot_pos) { |
|
Line 1257 void read_args(int f_in, char *mod_name, char *buf, si
|
Line 1276 void read_args(int f_in, char *mod_name, char *buf, si
|
| int len = strlen(buf); |
int len = strlen(buf); |
| if (request_len) |
if (request_len) |
| request_p[0][request_len++] = ' '; |
request_p[0][request_len++] = ' '; |
| if (!(*request_p = realloc_array(*request_p, char, request_len + len + 1))) | *request_p = realloc_array(*request_p, char, request_len + len + 1); |
| out_of_memory("read_args"); | |
| memcpy(*request_p + request_len, buf, len + 1); |
memcpy(*request_p + request_len, buf, len + 1); |
| request_len += len; |
request_len += len; |
| } |
} |
|
Line 1267 void read_args(int f_in, char *mod_name, char *buf, si
|
Line 1285 void read_args(int f_in, char *mod_name, char *buf, si
|
| else |
else |
| glob_expand(buf, &argv, &argc, &maxargs); |
glob_expand(buf, &argv, &argc, &maxargs); |
| } else { |
} else { |
| if (!(p = strdup(buf))) | p = strdup(buf); |
| out_of_memory("read_args"); | |
| argv[argc++] = p; |
argv[argc++] = p; |
| if (*p == '.' && p[1] == '\0') |
if (*p == '.' && p[1] == '\0') |
| dot_pos = argc; |
dot_pos = argc; |
|
Line 1284 void read_args(int f_in, char *mod_name, char *buf, si
|
Line 1301 void read_args(int f_in, char *mod_name, char *buf, si
|
| |
|
| BOOL io_start_buffering_out(int f_out) |
BOOL io_start_buffering_out(int f_out) |
| { |
{ |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out); |
rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out); |
| |
|
| if (iobuf.out.buf) { |
if (iobuf.out.buf) { |
|
Line 1303 BOOL io_start_buffering_out(int f_out)
|
Line 1320 BOOL io_start_buffering_out(int f_out)
|
| |
|
| BOOL io_start_buffering_in(int f_in) |
BOOL io_start_buffering_in(int f_in) |
| { |
{ |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in); |
rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in); |
| |
|
| if (iobuf.in.buf) { |
if (iobuf.in.buf) { |
|
Line 1322 BOOL io_start_buffering_in(int f_in)
|
Line 1339 BOOL io_start_buffering_in(int f_in)
|
| |
|
| void io_end_buffering_in(BOOL free_buffers) |
void io_end_buffering_in(BOOL free_buffers) |
| { |
{ |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { |
| rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n", |
rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n", |
| who_am_i(), free_buffers ? "FREE" : "KEEP"); |
who_am_i(), free_buffers ? "FREE" : "KEEP"); |
| } |
} |
|
Line 1337 void io_end_buffering_in(BOOL free_buffers)
|
Line 1354 void io_end_buffering_in(BOOL free_buffers)
|
| |
|
| void io_end_buffering_out(BOOL free_buffers) |
void io_end_buffering_out(BOOL free_buffers) |
| { |
{ |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) { | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { |
| rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n", |
rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n", |
| who_am_i(), free_buffers ? "FREE" : "KEEP"); |
who_am_i(), free_buffers ? "FREE" : "KEEP"); |
| } |
} |
|
Line 1425 static void read_a_msg(void)
|
Line 1442 static void read_a_msg(void)
|
| msg_bytes = tag & 0xFFFFFF; |
msg_bytes = tag & 0xFFFFFF; |
| tag = (tag >> 24) - MPLEX_BASE; |
tag = (tag >> 24) - MPLEX_BASE; |
| |
|
| if (DEBUG_GTE(IO, 1) && msgs2stderr) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) |
| rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes); |
rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes); |
| |
|
| switch (tag) { |
switch (tag) { |
|
Line 1481 static void read_a_msg(void)
|
Line 1498 static void read_a_msg(void)
|
| if (am_sender) |
if (am_sender) |
| maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH); |
maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH); |
| break; |
break; |
| |
case MSG_CHECKSUM: |
| |
/* This receives some checksum info that we want to make a note of |
| |
* (which allows a single process to do all the writing to the db). */ |
| |
if (msg_bytes != MSG_CHECKSUM_LEN) |
| |
goto overflow; |
| |
raw_read_buf(data, MSG_CHECKSUM_LEN); |
| |
if (am_generator && same_db) { |
| |
iobuf.in_multiplexed = 1; |
| |
send_msg(MSG_CHECKSUM, data, MSG_CHECKSUM_LEN, 0); |
| |
} if (am_receiver || (am_sender && !local_server)) |
| |
goto unexpected; |
| |
else { |
| |
/* The received data is a set of numbers followed by the checksum. */ |
| |
STRUCT_STAT st; |
| |
st.st_dev = IVAL64(data, 0); |
| |
st.st_ino = IVAL64(data, 8); |
| |
st.st_size = IVAL64(data, 16); |
| |
st.st_mtime = IVAL64(data, 24); |
| |
st.st_ctime = IVAL64(data, 32); |
| |
#if MSG_CHECKSUM_LONGS != 5 |
| |
#error Fix the parsing of checksum long values |
| |
#endif |
| |
iobuf.in_multiplexed = 1; |
| |
db_set_checksum(IVAL(data, MSG_CHECKSUM_LONGS*8), &st, data + MSG_CHECKSUM_LONGS*8 + 4); |
| |
} |
| |
break; |
| case MSG_DELETED: |
case MSG_DELETED: |
| if (msg_bytes >= sizeof data) |
if (msg_bytes >= sizeof data) |
| goto overflow; |
goto overflow; |
|
Line 1632 static void read_a_msg(void)
|
Line 1675 static void read_a_msg(void)
|
| * with a duplicate exit message. */ |
* with a duplicate exit message. */ |
| _exit_cleanup(val, __FILE__, 0 - __LINE__); |
_exit_cleanup(val, __FILE__, 0 - __LINE__); |
| default: |
default: |
| |
unexpected: |
| rprintf(FERROR, "unexpected tag %d [%s%s]\n", |
rprintf(FERROR, "unexpected tag %d [%s%s]\n", |
| tag, who_am_i(), inc_recurse ? "/inc" : ""); |
tag, who_am_i(), inc_recurse ? "/inc" : ""); |
| exit_cleanup(RERR_STREAMIO); |
exit_cleanup(RERR_STREAMIO); |
|
Line 1982 static void sleep_for_bwlimit(int bytes_written)
|
Line 2026 static void sleep_for_bwlimit(int bytes_written)
|
| total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); |
total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); |
| } |
} |
| |
|
| void io_flush(int flush_it_all) | void io_flush(int flush_type) |
| { |
{ |
| if (iobuf.out.len > iobuf.out_empty_len) { |
if (iobuf.out.len > iobuf.out_empty_len) { |
| if (flush_it_all) /* FULL_FLUSH: flush everything in the output buffers */ | if (flush_type == FULL_FLUSH) /* flush everything in the output buffers */ |
| perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM); |
perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM); |
| else /* NORMAL_FLUSH: flush at least 1 byte */ | else if (flush_type == NORMAL_FLUSH) /* flush at least 1 byte */ |
| perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM); |
perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM); |
| |
/* MSG_FLUSH: flush iobuf.msg only */ |
| } |
} |
| if (iobuf.msg.len) |
if (iobuf.msg.len) |
| perform_io(iobuf.msg.size, PIO_NEED_MSGROOM); |
perform_io(iobuf.msg.size, PIO_NEED_MSGROOM); |
|
Line 2013 void write_varint(int f, int32 x)
|
Line 2058 void write_varint(int f, int32 x)
|
| { |
{ |
| char b[5]; |
char b[5]; |
| uchar bit; |
uchar bit; |
| int cnt = 4; | int cnt; |
| |
|
| SIVAL(b, 1, x); |
SIVAL(b, 1, x); |
| |
|
| while (cnt > 1 && b[cnt] == 0) | for (cnt = 4; cnt > 1 && b[cnt] == 0; cnt--) {} |
| cnt--; | |
| bit = ((uchar)1<<(7-cnt+1)); |
bit = ((uchar)1<<(7-cnt+1)); |
| |
|
| if (CVAL(b, cnt) >= bit) { |
if (CVAL(b, cnt) >= bit) { |
| cnt++; |
cnt++; |
| *b = ~(bit-1); |
*b = ~(bit-1); |
| } else if (cnt > 1) |
} else if (cnt > 1) |
| *b = b[cnt] | ~(bit*2-1); |
*b = b[cnt] | ~(bit*2-1); |
| else |
else |
| *b = b[cnt]; | *b = b[1]; |
| |
|
| write_buf(f, b, cnt); |
write_buf(f, b, cnt); |
| } |
} |
|
Line 2296 void io_start_multiplex_out(int fd)
|
Line 2341 void io_start_multiplex_out(int fd)
|
| { |
{ |
| io_flush(FULL_FLUSH); |
io_flush(FULL_FLUSH); |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd); |
rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd); |
| |
|
| if (!iobuf.msg.buf) |
if (!iobuf.msg.buf) |
|
Line 2313 void io_start_multiplex_out(int fd)
|
Line 2358 void io_start_multiplex_out(int fd)
|
| /* Setup for multiplexing a MSG_* stream with the data stream. */ |
/* Setup for multiplexing a MSG_* stream with the data stream. */ |
| void io_start_multiplex_in(int fd) |
void io_start_multiplex_in(int fd) |
| { |
{ |
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd); |
rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd); |
| |
|
| iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */ |
iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */ |
|
Line 2324 int io_end_multiplex_in(int mode)
|
Line 2369 int io_end_multiplex_in(int mode)
|
| { |
{ |
| int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1; |
int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1; |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode); |
rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode); |
| |
|
| iobuf.in_multiplexed = 0; |
iobuf.in_multiplexed = 0; |
|
Line 2342 int io_end_multiplex_out(int mode)
|
Line 2387 int io_end_multiplex_out(int mode)
|
| { |
{ |
| int ret = iobuf.out_empty_len ? iobuf.out_fd : -1; |
int ret = iobuf.out_empty_len ? iobuf.out_fd : -1; |
| |
|
| if (msgs2stderr && DEBUG_GTE(IO, 2)) | if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) |
| rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode); |
rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode); |
| |
|
| if (mode != MPLX_TO_BUFFERED) |
if (mode != MPLX_TO_BUFFERED) |
|
Line 2368 void start_write_batch(int fd)
|
Line 2413 void start_write_batch(int fd)
|
| * is involved. */ |
* is involved. */ |
| write_int(batch_fd, protocol_version); |
write_int(batch_fd, protocol_version); |
| if (protocol_version >= 30) |
if (protocol_version >= 30) |
| write_byte(batch_fd, compat_flags); | write_varint(batch_fd, compat_flags); |
| write_int(batch_fd, checksum_seed); |
write_int(batch_fd, checksum_seed); |
| |
|
| if (am_sender) |
if (am_sender) |