--- embedaddon/rsync/io.c 2013/10/14 07:51:14 1.1.1.2 +++ embedaddon/rsync/io.c 2021/03/17 00:32:36 1.1.1.4 @@ -4,7 +4,7 @@ * Copyright (C) 1996-2001 Andrew Tridgell * Copyright (C) 1996 Paul Mackerras * Copyright (C) 2001, 2002 Martin Pool - * Copyright (C) 2003-2013 Wayne Davison + * Copyright (C) 2003-2020 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -41,9 +41,12 @@ extern int am_server; extern int am_sender; extern int am_receiver; extern int am_generator; +extern int local_server; extern int msgs2stderr; extern int inc_recurse; +extern int same_db; extern int io_error; +extern int batch_fd; extern int eol_nulls; extern int flist_eof; extern int file_total; @@ -53,12 +56,15 @@ extern int read_batch; extern int compat_flags; extern int protect_args; extern int checksum_seed; +extern int checksum_files; +extern int daemon_connection; extern int protocol_version; extern int remove_source_files; extern int preserve_hard_links; extern BOOL extra_flist_sending_enabled; extern BOOL flush_ok_after_signal; extern struct stats stats; +extern time_t stop_at_utime; extern struct file_list *cur_flist; #ifdef ICONV_OPTION extern int filesfrom_convert; @@ -67,7 +73,6 @@ extern iconv_t ic_send, ic_recv; int csum_length = SHORT_SUM_LENGTH; /* initial value */ int allowed_lull = 0; -int batch_fd = -1; int msgdone_cnt = 0; int forward_flist_data = 0; BOOL flist_receiving_enabled = False; @@ -155,7 +160,7 @@ static void read_a_msg(void); static void drain_multiplex_messages(void); static void sleep_for_bwlimit(int bytes_written); -static void check_timeout(BOOL allow_keepalive) +static void check_timeout(BOOL allow_keepalive, int keepalive_flags) { time_t t, chk; @@ -177,7 +182,7 @@ static void check_timeout(BOOL allow_keepalive) if (allow_keepalive) { /* This may put data into iobuf.msg w/o flushing. */ - maybe_send_keepalive(t, 0); + maybe_send_keepalive(t, keepalive_flags); } if (!last_io_in) @@ -232,28 +237,10 @@ static NORETURN void whine_about_eof(BOOL allow_kluge) * the socket except very early in the transfer. */ static size_t safe_read(int fd, char *buf, size_t len) { - size_t got; - int n; + size_t got = 0; assert(fd != iobuf.in_fd); - n = read(fd, buf, len); - if ((size_t)n == len || n == 0) { - if (DEBUG_GTE(IO, 2)) - rprintf(FINFO, "[%s] safe_read(%d)=%ld\n", who_am_i(), fd, (long)n); - return n; - } - if (n < 0) { - if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { - read_failed: - rsyserr(FERROR, errno, "safe_read failed to read %ld bytes [%s]", - (long)len, who_am_i()); - exit_cleanup(RERR_STREAMIO); - } - got = 0; - } else - got = n; - while (1) { struct timeval tv; fd_set r_fds, e_fds; @@ -269,12 +256,10 @@ static size_t safe_read(int fd, char *buf, size_t len) cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv); if (cnt <= 0) { if (cnt < 0 && errno == EBADF) { - rsyserr(FERROR, errno, "safe_read select failed [%s]", - who_am_i()); + rsyserr(FERROR, errno, "safe_read select failed"); exit_cleanup(RERR_FILEIO); } - if (io_timeout) - maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH); + check_timeout(1, MSK_ALLOW_FLUSH); continue; } @@ -282,7 +267,7 @@ static size_t safe_read(int fd, char *buf, size_t len) rprintf(FINFO, "select exception on fd %d\n", fd); */ if (FD_ISSET(fd, &r_fds)) { - n = read(fd, buf + got, len - got); + int n = read(fd, buf + got, len - got); if (DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] safe_read(%d)=%ld\n", who_am_i(), fd, (long)n); if (n == 0) @@ -290,7 +275,8 @@ static size_t safe_read(int fd, char *buf, size_t len) if (n < 0) { if (errno == EINTR) continue; - goto read_failed; + rsyserr(FERROR, errno, "safe_read failed to read %ld bytes", (long)len); + exit_cleanup(RERR_STREAMIO); } if ((got += (size_t)n) == len) break; @@ -332,8 +318,8 @@ static void safe_write(int fd, const char *buf, size_t if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) { write_failed: rsyserr(FERROR, errno, - "safe_write failed to write %ld bytes to %s [%s]", - (long)len, what_fd_is(fd), who_am_i()); + "safe_write failed to write %ld bytes to %s", + (long)len, what_fd_is(fd)); exit_cleanup(RERR_STREAMIO); } } else { @@ -354,8 +340,7 @@ static void safe_write(int fd, const char *buf, size_t cnt = select(fd + 1, NULL, &w_fds, NULL, &tv); if (cnt <= 0) { if (cnt < 0 && errno == EBADF) { - rsyserr(FERROR, errno, "safe_write select failed on %s [%s]", - what_fd_is(fd), who_am_i()); + rsyserr(FERROR, errno, "safe_write select failed on %s", what_fd_is(fd)); exit_cleanup(RERR_FILEIO); } if (io_timeout) @@ -483,7 +468,7 @@ void reduce_iobuf_size(xbuf *out, size_t new_size) { if (new_size < out->size) { /* Avoid weird buffer interactions by only outputting this to stderr. */ - if (msgs2stderr && DEBUG_GTE(IO, 4)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) { const char *name = out == &iobuf.out ? "iobuf.out" : out == &iobuf.msg ? "iobuf.msg" : NULL; @@ -501,7 +486,7 @@ void restore_iobuf_size(xbuf *out) if (IOBUF_WAS_REDUCED(out->size)) { size_t new_size = IOBUF_RESTORE_SIZE(out->size); /* Avoid weird buffer interactions by only outputting this to stderr. */ - if (msgs2stderr && DEBUG_GTE(IO, 4)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) { const char *name = out == &iobuf.out ? "iobuf.out" : out == &iobuf.msg ? "iobuf.msg" : NULL; @@ -585,7 +570,7 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_PROTOCOL); } - if (msgs2stderr && DEBUG_GTE(IO, 3)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n", who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : ""); } @@ -599,7 +584,7 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_PROTOCOL); } - if (msgs2stderr && DEBUG_GTE(IO, 3)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n", who_am_i(), (long)needed, iobuf.out.len + needed > iobuf.out.size @@ -615,7 +600,7 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_PROTOCOL); } - if (msgs2stderr && DEBUG_GTE(IO, 3)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) { rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n", who_am_i(), (long)needed, iobuf.msg.len + needed > iobuf.msg.size @@ -624,7 +609,7 @@ static char *perform_io(size_t needed, int flags) break; case 0: - if (msgs2stderr && DEBUG_GTE(IO, 3)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) rprintf(FINFO, "[%s] perform_io(%ld, %d)\n", who_am_i(), (long)needed, flags); break; @@ -682,7 +667,7 @@ static char *perform_io(size_t needed, int flags) SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0, ((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4); - if (msgs2stderr && DEBUG_GTE(IO, 1)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) { rprintf(FINFO, "[%s] send_msg(%d, %ld)\n", who_am_i(), (int)MSG_DATA, (long)iobuf.out.len - 4); } @@ -768,7 +753,7 @@ static char *perform_io(size_t needed, int flags) send_extra_file_list(sock_f_out, -1); extra_flist_sending_enabled = !flist_eof; } else - check_timeout((flags & PIO_NEED_INPUT) != 0); + check_timeout((flags & PIO_NEED_INPUT) != 0, 0); FD_ZERO(&r_fds); /* Just in case... */ FD_ZERO(&w_fds); } @@ -802,12 +787,16 @@ static char *perform_io(size_t needed, int flags) exit_cleanup(RERR_SOCKETIO); } } - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n); - if (io_timeout) { + if (io_timeout || stop_at_utime) { last_io_in = time(NULL); - if (flags & PIO_NEED_INPUT) + if (stop_at_utime && last_io_in >= stop_at_utime) { + rprintf(FERROR, "stopping at requested limit\n"); + exit_cleanup(RERR_TIMEOUT); + } + if (io_timeout && flags & PIO_NEED_INPUT) maybe_send_keepalive(last_io_in, 0); } stats.total_read += n; @@ -832,12 +821,12 @@ static char *perform_io(size_t needed, int flags) msgs2stderr = 1; iobuf.out_fd = -2; iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0; - rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i()); + rsyserr(FERROR_SOCKET, errno, "write error"); drain_multiplex_messages(); exit_cleanup(RERR_SOCKETIO); } } - if (msgs2stderr && DEBUG_GTE(IO, 2)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { rprintf(FINFO, "[%s] %s sent=%ld\n", who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n); } @@ -935,6 +924,10 @@ void noop_io_until_death(void) if (!iobuf.in.buf || !iobuf.out.buf || iobuf.in_fd < 0 || iobuf.out_fd < 0 || kluge_around_eof) return; + /* If we're talking to a daemon over a socket, don't short-circuit this logic */ + if (msgs2stderr && daemon_connection >= 0) + return; + kluge_around_eof = 2; /* Setting an I/O timeout ensures that if something inexplicably weird * happens, we won't hang around forever. */ @@ -950,7 +943,7 @@ int send_msg(enum msgcode code, const char *buf, size_ { char *hdr; size_t needed, pos; - BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr || code != MSG_INFO); + BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr == 1 || code != MSG_INFO); if (!OUT_MULTIPLEXED) return 0; @@ -971,8 +964,17 @@ int send_msg(enum msgcode code, const char *buf, size_ } else #endif needed = len + 4 + 3; - if (iobuf.msg.len + needed > iobuf.msg.size) - perform_io(needed, PIO_NEED_MSGROOM); + if (iobuf.msg.len + needed > iobuf.msg.size) { + if (!am_receiver) + perform_io(needed, PIO_NEED_MSGROOM); + else { /* We allow the receiver to increase their iobuf.msg size to avoid a deadlock. */ + size_t old_size = iobuf.msg.size; + restore_iobuf_size(&iobuf.msg); + realloc_xbuf(&iobuf.msg, iobuf.msg.size * 2); + if (iobuf.msg.pos + iobuf.msg.len > old_size) + memcpy(iobuf.msg.buf + old_size, iobuf.msg.buf, iobuf.msg.pos + iobuf.msg.len - old_size); + } + } pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */ if (pos >= iobuf.msg.size) @@ -1067,6 +1069,9 @@ static void got_flist_entry_status(enum festatus statu if (inc_recurse) flist->in_progress++; } + } else if (checksum_files & CSF_UPDATE) { + struct file_struct *file = flist->files[ndx - flist->ndx_start]; + set_cached_checksum(flist, file); } #endif break; @@ -1130,8 +1135,7 @@ static void check_for_d_option_error(const char *msg) } if (saw_d) { - rprintf(FWARNING, - "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); + rprintf(FWARNING, "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); } } @@ -1193,7 +1197,7 @@ int read_line(int fd, char *buf, size_t bufsiz, int fl #ifdef ICONV_OPTION if (flags & RL_CONVERT && iconv_buf.size < bufsiz) - realloc_xbuf(&iconv_buf, bufsiz + 1024); + realloc_xbuf(&iconv_buf, ROUND_UP_1024(bufsiz) + 1024); #endif start: @@ -1251,8 +1255,7 @@ void read_args(int f_in, char *mod_name, char *buf, si rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0); #endif - if (!(argv = new_array(char *, maxargs))) - out_of_memory("read_args"); + argv = new_array(char *, maxargs); if (mod_name && !protect_args) argv[argc++] = "rsyncd"; @@ -1265,8 +1268,7 @@ void read_args(int f_in, char *mod_name, char *buf, si if (argc == maxargs-1) { maxargs += MAX_ARGS; - if (!(argv = realloc_array(argv, char *, maxargs))) - out_of_memory("read_args"); + argv = realloc_array(argv, char *, maxargs); } if (dot_pos) { @@ -1274,8 +1276,7 @@ void read_args(int f_in, char *mod_name, char *buf, si int len = strlen(buf); if (request_len) request_p[0][request_len++] = ' '; - if (!(*request_p = realloc_array(*request_p, char, request_len + len + 1))) - out_of_memory("read_args"); + *request_p = realloc_array(*request_p, char, request_len + len + 1); memcpy(*request_p + request_len, buf, len + 1); request_len += len; } @@ -1284,8 +1285,7 @@ void read_args(int f_in, char *mod_name, char *buf, si else glob_expand(buf, &argv, &argc, &maxargs); } else { - if (!(p = strdup(buf))) - out_of_memory("read_args"); + p = strdup(buf); argv[argc++] = p; if (*p == '.' && p[1] == '\0') dot_pos = argc; @@ -1301,7 +1301,7 @@ void read_args(int f_in, char *mod_name, char *buf, si BOOL io_start_buffering_out(int f_out) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out); if (iobuf.out.buf) { @@ -1320,7 +1320,7 @@ BOOL io_start_buffering_out(int f_out) BOOL io_start_buffering_in(int f_in) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in); if (iobuf.in.buf) { @@ -1339,7 +1339,7 @@ BOOL io_start_buffering_in(int f_in) void io_end_buffering_in(BOOL free_buffers) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n", who_am_i(), free_buffers ? "FREE" : "KEEP"); } @@ -1354,7 +1354,7 @@ void io_end_buffering_in(BOOL free_buffers) void io_end_buffering_out(BOOL free_buffers) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) { + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) { rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n", who_am_i(), free_buffers ? "FREE" : "KEEP"); } @@ -1388,6 +1388,14 @@ void maybe_send_keepalive(time_t now, int flags) if (flags & MSK_ACTIVE_RECEIVER) last_io_in = now; /* Fudge things when we're working hard on the files. */ + /* Early in the transfer (before the receiver forks) the receiving side doesn't + * care if it hasn't sent data in a while as long as it is receiving data (in + * fact, a pre-3.1.0 rsync would die if we tried to send it a keep alive during + * this time). So, if we're an early-receiving proc, just return and let the + * incoming data determine if we timeout. */ + if (!am_sender && !am_receiver && !am_generator) + return; + if (now - last_io_out >= allowed_lull) { /* The receiver is special: it only sends keep-alive messages if it is * actively receiving data. Otherwise, it lets the generator timeout. */ @@ -1434,7 +1442,7 @@ static void read_a_msg(void) msg_bytes = tag & 0xFFFFFF; tag = (tag >> 24) - MPLEX_BASE; - if (DEBUG_GTE(IO, 1) && msgs2stderr) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes); switch (tag) { @@ -1490,6 +1498,32 @@ static void read_a_msg(void) if (am_sender) maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH); break; + case MSG_CHECKSUM: + /* This receives some checksum info that we want to make a note of + * (which allows a single process to do all the writing to the db). */ + if (msg_bytes != MSG_CHECKSUM_LEN) + goto overflow; + raw_read_buf(data, MSG_CHECKSUM_LEN); + if (am_generator && same_db) { + iobuf.in_multiplexed = 1; + send_msg(MSG_CHECKSUM, data, MSG_CHECKSUM_LEN, 0); + } if (am_receiver || (am_sender && !local_server)) + goto unexpected; + else { + /* The received data is a set of numbers followed by the checksum. */ + STRUCT_STAT st; + st.st_dev = IVAL64(data, 0); + st.st_ino = IVAL64(data, 8); + st.st_size = IVAL64(data, 16); + st.st_mtime = IVAL64(data, 24); + st.st_ctime = IVAL64(data, 32); +#if MSG_CHECKSUM_LONGS != 5 +#error Fix the parsing of checksum long values +#endif + iobuf.in_multiplexed = 1; + db_set_checksum(IVAL(data, MSG_CHECKSUM_LONGS*8), &st, data + MSG_CHECKSUM_LONGS*8 + 4); + } + break; case MSG_DELETED: if (msg_bytes >= sizeof data) goto overflow; @@ -1641,6 +1675,7 @@ static void read_a_msg(void) * with a duplicate exit message. */ _exit_cleanup(val, __FILE__, 0 - __LINE__); default: + unexpected: rprintf(FERROR, "unexpected tag %d [%s%s]\n", tag, who_am_i(), inc_recurse ? "/inc" : ""); exit_cleanup(RERR_STREAMIO); @@ -1694,7 +1729,7 @@ void wait_for_receiver(void) rprintf(FINFO, "[%s] receiving flist for dir %d\n", who_am_i(), ndx); } - flist = recv_file_list(iobuf.in_fd); + flist = recv_file_list(iobuf.in_fd, ndx); flist->parent_ndx = ndx; #ifdef SUPPORT_HARD_LINKS if (preserve_hard_links) @@ -1794,7 +1829,7 @@ int64 read_varlong(int f, uchar min_bytes) #if SIZEOF_INT64 < 8 u.x = IVAL(u.b,0); #elif CAREFUL_ALIGNMENT - u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32); + u.x = IVAL64(u.b,0); #endif return u.x; } @@ -1991,13 +2026,14 @@ static void sleep_for_bwlimit(int bytes_written) total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024); } -void io_flush(int flush_it_all) +void io_flush(int flush_type) { if (iobuf.out.len > iobuf.out_empty_len) { - if (flush_it_all) /* FULL_FLUSH: flush everything in the output buffers */ + if (flush_type == FULL_FLUSH) /* flush everything in the output buffers */ perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM); - else /* NORMAL_FLUSH: flush at least 1 byte */ + else if (flush_type == NORMAL_FLUSH) /* flush at least 1 byte */ perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM); + /* MSG_FLUSH: flush iobuf.msg only */ } if (iobuf.msg.len) perform_io(iobuf.msg.size, PIO_NEED_MSGROOM); @@ -2022,20 +2058,20 @@ void write_varint(int f, int32 x) { char b[5]; uchar bit; - int cnt = 4; + int cnt; SIVAL(b, 1, x); - while (cnt > 1 && b[cnt] == 0) - cnt--; + for (cnt = 4; cnt > 1 && b[cnt] == 0; cnt--) {} bit = ((uchar)1<<(7-cnt+1)); + if (CVAL(b, cnt) >= bit) { cnt++; *b = ~(bit-1); } else if (cnt > 1) *b = b[cnt] | ~(bit*2-1); else - *b = b[cnt]; + *b = b[1]; write_buf(f, b, cnt); } @@ -2046,10 +2082,10 @@ void write_varlong(int f, int64 x, uchar min_bytes) uchar bit; int cnt = 8; - SIVAL(b, 1, x); #if SIZEOF_INT64 >= 8 - SIVAL(b, 5, x >> 32); + SIVAL64(b, 1, x); #else + SIVAL(b, 1, x); if (x <= 0x7FFFFFFF && x >= 0) memset(b + 5, 0, 4); else { @@ -2096,6 +2132,19 @@ void write_longint(int f, int64 x) #endif } +void write_bigbuf(int f, const char *buf, size_t len) +{ + size_t half_max = (iobuf.out.size - iobuf.out_empty_len) / 2; + + while (len > half_max + 1024) { + write_buf(f, buf, half_max); + buf += half_max; + len -= half_max; + } + + write_buf(f, buf, len); +} + void write_buf(int f, const char *buf, size_t len) { size_t pos, siz; @@ -2279,7 +2328,7 @@ void io_printf(int fd, const char *format, ...) if (len < 0) exit_cleanup(RERR_PROTOCOL); - if (len > (int)sizeof buf) { + if (len >= (int)sizeof buf) { rprintf(FERROR, "io_printf() was too long for the buffer.\n"); exit_cleanup(RERR_PROTOCOL); } @@ -2292,7 +2341,7 @@ void io_start_multiplex_out(int fd) { io_flush(FULL_FLUSH); - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd); if (!iobuf.msg.buf) @@ -2309,7 +2358,7 @@ void io_start_multiplex_out(int fd) /* Setup for multiplexing a MSG_* stream with the data stream. */ void io_start_multiplex_in(int fd) { - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd); iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */ @@ -2320,7 +2369,7 @@ int io_end_multiplex_in(int mode) { int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1; - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode); iobuf.in_multiplexed = 0; @@ -2338,7 +2387,7 @@ int io_end_multiplex_out(int mode) { int ret = iobuf.out_empty_len ? iobuf.out_fd : -1; - if (msgs2stderr && DEBUG_GTE(IO, 2)) + if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode); if (mode != MPLX_TO_BUFFERED) @@ -2364,7 +2413,7 @@ void start_write_batch(int fd) * is involved. */ write_int(batch_fd, protocol_version); if (protocol_version >= 30) - write_byte(batch_fd, compat_flags); + write_varint(batch_fd, compat_flags); write_int(batch_fd, checksum_seed); if (am_sender)