Diff for /embedaddon/rsync/io.c between versions 1.1.1.3 and 1.1.1.4

version 1.1.1.3, 2016/11/01 09:54:32 version 1.1.1.4, 2021/03/17 00:32:36
Line 4 Line 4
  * Copyright (C) 1996-2001 Andrew Tridgell   * Copyright (C) 1996-2001 Andrew Tridgell
  * Copyright (C) 1996 Paul Mackerras   * Copyright (C) 1996 Paul Mackerras
  * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>   * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
 * Copyright (C) 2003-2015 Wayne Davison * Copyright (C) 2003-2020 Wayne Davison
  *   *
  * This program is free software; you can redistribute it and/or modify   * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by   * it under the terms of the GNU General Public License as published by
Line 41  extern int am_server; Line 41  extern int am_server;
 extern int am_sender;  extern int am_sender;
 extern int am_receiver;  extern int am_receiver;
 extern int am_generator;  extern int am_generator;
   extern int local_server;
 extern int msgs2stderr;  extern int msgs2stderr;
 extern int inc_recurse;  extern int inc_recurse;
   extern int same_db;
 extern int io_error;  extern int io_error;
   extern int batch_fd;
 extern int eol_nulls;  extern int eol_nulls;
 extern int flist_eof;  extern int flist_eof;
 extern int file_total;  extern int file_total;
Line 53  extern int read_batch; Line 56  extern int read_batch;
 extern int compat_flags;  extern int compat_flags;
 extern int protect_args;  extern int protect_args;
 extern int checksum_seed;  extern int checksum_seed;
   extern int checksum_files;
   extern int daemon_connection;
 extern int protocol_version;  extern int protocol_version;
 extern int remove_source_files;  extern int remove_source_files;
 extern int preserve_hard_links;  extern int preserve_hard_links;
 extern BOOL extra_flist_sending_enabled;  extern BOOL extra_flist_sending_enabled;
 extern BOOL flush_ok_after_signal;  extern BOOL flush_ok_after_signal;
 extern struct stats stats;  extern struct stats stats;
   extern time_t stop_at_utime;
 extern struct file_list *cur_flist;  extern struct file_list *cur_flist;
 #ifdef ICONV_OPTION  #ifdef ICONV_OPTION
 extern int filesfrom_convert;  extern int filesfrom_convert;
Line 67  extern iconv_t ic_send, ic_recv; Line 73  extern iconv_t ic_send, ic_recv;
   
 int csum_length = SHORT_SUM_LENGTH; /* initial value */  int csum_length = SHORT_SUM_LENGTH; /* initial value */
 int allowed_lull = 0;  int allowed_lull = 0;
 int batch_fd = -1;  
 int msgdone_cnt = 0;  int msgdone_cnt = 0;
 int forward_flist_data = 0;  int forward_flist_data = 0;
 BOOL flist_receiving_enabled = False;  BOOL flist_receiving_enabled = False;
Line 251  static size_t safe_read(int fd, char *buf, size_t len) Line 256  static size_t safe_read(int fd, char *buf, size_t len)
                 cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv);                  cnt = select(fd+1, &r_fds, NULL, &e_fds, &tv);
                 if (cnt <= 0) {                  if (cnt <= 0) {
                         if (cnt < 0 && errno == EBADF) {                          if (cnt < 0 && errno == EBADF) {
                                rsyserr(FERROR, errno, "safe_read select failed [%s]",                                rsyserr(FERROR, errno, "safe_read select failed");
                                        who_am_i()); 
                                 exit_cleanup(RERR_FILEIO);                                  exit_cleanup(RERR_FILEIO);
                         }                          }
                         check_timeout(1, MSK_ALLOW_FLUSH);                          check_timeout(1, MSK_ALLOW_FLUSH);
Line 271  static size_t safe_read(int fd, char *buf, size_t len) Line 275  static size_t safe_read(int fd, char *buf, size_t len)
                         if (n < 0) {                          if (n < 0) {
                                 if (errno == EINTR)                                  if (errno == EINTR)
                                         continue;                                          continue;
                                rsyserr(FERROR, errno, "safe_read failed to read %ld bytes [%s]",                                rsyserr(FERROR, errno, "safe_read failed to read %ld bytes", (long)len);
                                        (long)len, who_am_i()); 
                                 exit_cleanup(RERR_STREAMIO);                                  exit_cleanup(RERR_STREAMIO);
                         }                          }
                         if ((got += (size_t)n) == len)                          if ((got += (size_t)n) == len)
Line 315  static void safe_write(int fd, const char *buf, size_t Line 318  static void safe_write(int fd, const char *buf, size_t
                 if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {                  if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
                   write_failed:                    write_failed:
                         rsyserr(FERROR, errno,                          rsyserr(FERROR, errno,
                                "safe_write failed to write %ld bytes to %s [%s]",                                "safe_write failed to write %ld bytes to %s",
                                (long)len, what_fd_is(fd), who_am_i());                                (long)len, what_fd_is(fd));
                         exit_cleanup(RERR_STREAMIO);                          exit_cleanup(RERR_STREAMIO);
                 }                  }
         } else {          } else {
Line 337  static void safe_write(int fd, const char *buf, size_t Line 340  static void safe_write(int fd, const char *buf, size_t
                 cnt = select(fd + 1, NULL, &w_fds, NULL, &tv);                  cnt = select(fd + 1, NULL, &w_fds, NULL, &tv);
                 if (cnt <= 0) {                  if (cnt <= 0) {
                         if (cnt < 0 && errno == EBADF) {                          if (cnt < 0 && errno == EBADF) {
                                rsyserr(FERROR, errno, "safe_write select failed on %s [%s]",                                rsyserr(FERROR, errno, "safe_write select failed on %s", what_fd_is(fd));
                                        what_fd_is(fd), who_am_i()); 
                                 exit_cleanup(RERR_FILEIO);                                  exit_cleanup(RERR_FILEIO);
                         }                          }
                         if (io_timeout)                          if (io_timeout)
Line 466  void reduce_iobuf_size(xbuf *out, size_t new_size) Line 468  void reduce_iobuf_size(xbuf *out, size_t new_size)
 {  {
         if (new_size < out->size) {          if (new_size < out->size) {
                 /* Avoid weird buffer interactions by only outputting this to stderr. */                  /* Avoid weird buffer interactions by only outputting this to stderr. */
                if (msgs2stderr && DEBUG_GTE(IO, 4)) {                if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) {
                         const char *name = out == &iobuf.out ? "iobuf.out"                          const char *name = out == &iobuf.out ? "iobuf.out"
                                          : out == &iobuf.msg ? "iobuf.msg"                                           : out == &iobuf.msg ? "iobuf.msg"
                                          : NULL;                                           : NULL;
Line 484  void restore_iobuf_size(xbuf *out) Line 486  void restore_iobuf_size(xbuf *out)
         if (IOBUF_WAS_REDUCED(out->size)) {          if (IOBUF_WAS_REDUCED(out->size)) {
                 size_t new_size = IOBUF_RESTORE_SIZE(out->size);                  size_t new_size = IOBUF_RESTORE_SIZE(out->size);
                 /* Avoid weird buffer interactions by only outputting this to stderr. */                  /* Avoid weird buffer interactions by only outputting this to stderr. */
                if (msgs2stderr && DEBUG_GTE(IO, 4)) {                if (msgs2stderr == 1 && DEBUG_GTE(IO, 4)) {
                         const char *name = out == &iobuf.out ? "iobuf.out"                          const char *name = out == &iobuf.out ? "iobuf.out"
                                          : out == &iobuf.msg ? "iobuf.msg"                                           : out == &iobuf.msg ? "iobuf.msg"
                                          : NULL;                                           : NULL;
Line 568  static char *perform_io(size_t needed, int flags) Line 570  static char *perform_io(size_t needed, int flags)
                         exit_cleanup(RERR_PROTOCOL);                          exit_cleanup(RERR_PROTOCOL);
                 }                  }
   
                if (msgs2stderr && DEBUG_GTE(IO, 3)) {                if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) {
                         rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n",                          rprintf(FINFO, "[%s] perform_io(%ld, %sinput)\n",
                                 who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : "");                                  who_am_i(), (long)needed, flags & PIO_CONSUME_INPUT ? "consume&" : "");
                 }                  }
Line 582  static char *perform_io(size_t needed, int flags) Line 584  static char *perform_io(size_t needed, int flags)
                         exit_cleanup(RERR_PROTOCOL);                          exit_cleanup(RERR_PROTOCOL);
                 }                  }
   
                if (msgs2stderr && DEBUG_GTE(IO, 3)) {                if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) {
                         rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n",                          rprintf(FINFO, "[%s] perform_io(%ld, outroom) needs to flush %ld\n",
                                 who_am_i(), (long)needed,                                  who_am_i(), (long)needed,
                                 iobuf.out.len + needed > iobuf.out.size                                  iobuf.out.len + needed > iobuf.out.size
Line 598  static char *perform_io(size_t needed, int flags) Line 600  static char *perform_io(size_t needed, int flags)
                         exit_cleanup(RERR_PROTOCOL);                          exit_cleanup(RERR_PROTOCOL);
                 }                  }
   
                if (msgs2stderr && DEBUG_GTE(IO, 3)) {                if (msgs2stderr == 1 && DEBUG_GTE(IO, 3)) {
                         rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n",                          rprintf(FINFO, "[%s] perform_io(%ld, msgroom) needs to flush %ld\n",
                                 who_am_i(), (long)needed,                                  who_am_i(), (long)needed,
                                 iobuf.msg.len + needed > iobuf.msg.size                                  iobuf.msg.len + needed > iobuf.msg.size
Line 607  static char *perform_io(size_t needed, int flags) Line 609  static char *perform_io(size_t needed, int flags)
                 break;                  break;
   
         case 0:          case 0:
                if (msgs2stderr && DEBUG_GTE(IO, 3))                if (msgs2stderr == 1 && DEBUG_GTE(IO, 3))
                         rprintf(FINFO, "[%s] perform_io(%ld, %d)\n", who_am_i(), (long)needed, flags);                          rprintf(FINFO, "[%s] perform_io(%ld, %d)\n", who_am_i(), (long)needed, flags);
                 break;                  break;
   
Line 665  static char *perform_io(size_t needed, int flags) Line 667  static char *perform_io(size_t needed, int flags)
                                         SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0,                                          SIVAL(iobuf.out.buf + iobuf.raw_data_header_pos, 0,
                                               ((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4);                                                ((MPLEX_BASE + (int)MSG_DATA)<<24) + iobuf.out.len - 4);
   
                                        if (msgs2stderr && DEBUG_GTE(IO, 1)) {                                        if (msgs2stderr == 1 && DEBUG_GTE(IO, 1)) {
                                                 rprintf(FINFO, "[%s] send_msg(%d, %ld)\n",                                                  rprintf(FINFO, "[%s] send_msg(%d, %ld)\n",
                                                         who_am_i(), (int)MSG_DATA, (long)iobuf.out.len - 4);                                                          who_am_i(), (int)MSG_DATA, (long)iobuf.out.len - 4);
                                         }                                          }
Line 785  static char *perform_io(size_t needed, int flags) Line 787  static char *perform_io(size_t needed, int flags)
                                         exit_cleanup(RERR_SOCKETIO);                                          exit_cleanup(RERR_SOCKETIO);
                                 }                                  }
                         }                          }
                        if (msgs2stderr && DEBUG_GTE(IO, 2))                        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                                 rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n);                                  rprintf(FINFO, "[%s] recv=%ld\n", who_am_i(), (long)n);
   
                        if (io_timeout) {                        if (io_timeout || stop_at_utime) {
                                 last_io_in = time(NULL);                                  last_io_in = time(NULL);
                                if (flags & PIO_NEED_INPUT)                                if (stop_at_utime && last_io_in >= stop_at_utime) {
                                         rprintf(FERROR, "stopping at requested limit\n");
                                         exit_cleanup(RERR_TIMEOUT);
                                 }
                                 if (io_timeout && flags & PIO_NEED_INPUT)
                                         maybe_send_keepalive(last_io_in, 0);                                          maybe_send_keepalive(last_io_in, 0);
                         }                          }
                         stats.total_read += n;                          stats.total_read += n;
Line 815  static char *perform_io(size_t needed, int flags) Line 821  static char *perform_io(size_t needed, int flags)
                                         msgs2stderr = 1;                                          msgs2stderr = 1;
                                         iobuf.out_fd = -2;                                          iobuf.out_fd = -2;
                                         iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0;                                          iobuf.out.len = iobuf.msg.len = iobuf.raw_flushing_ends_before = 0;
                                        rsyserr(FERROR_SOCKET, errno, "[%s] write error", who_am_i());                                        rsyserr(FERROR_SOCKET, errno, "write error");
                                         drain_multiplex_messages();                                          drain_multiplex_messages();
                                         exit_cleanup(RERR_SOCKETIO);                                          exit_cleanup(RERR_SOCKETIO);
                                 }                                  }
                         }                          }
                        if (msgs2stderr && DEBUG_GTE(IO, 2)) {                        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) {
                                 rprintf(FINFO, "[%s] %s sent=%ld\n",                                  rprintf(FINFO, "[%s] %s sent=%ld\n",
                                         who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n);                                          who_am_i(), out == &iobuf.out ? "out" : "msg", (long)n);
                         }                          }
Line 918  void noop_io_until_death(void) Line 924  void noop_io_until_death(void)
         if (!iobuf.in.buf || !iobuf.out.buf || iobuf.in_fd < 0 || iobuf.out_fd < 0 || kluge_around_eof)          if (!iobuf.in.buf || !iobuf.out.buf || iobuf.in_fd < 0 || iobuf.out_fd < 0 || kluge_around_eof)
                 return;                  return;
   
           /* If we're talking to a daemon over a socket, don't short-circuit this logic */
           if (msgs2stderr && daemon_connection >= 0)
                   return;
   
         kluge_around_eof = 2;          kluge_around_eof = 2;
         /* Setting an I/O timeout ensures that if something inexplicably weird          /* Setting an I/O timeout ensures that if something inexplicably weird
          * happens, we won't hang around forever. */           * happens, we won't hang around forever. */
Line 933  int send_msg(enum msgcode code, const char *buf, size_ Line 943  int send_msg(enum msgcode code, const char *buf, size_
 {  {
         char *hdr;          char *hdr;
         size_t needed, pos;          size_t needed, pos;
        BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr || code != MSG_INFO);        BOOL want_debug = DEBUG_GTE(IO, 1) && convert >= 0 && (msgs2stderr == 1 || code != MSG_INFO);
   
         if (!OUT_MULTIPLEXED)          if (!OUT_MULTIPLEXED)
                 return 0;                  return 0;
Line 954  int send_msg(enum msgcode code, const char *buf, size_ Line 964  int send_msg(enum msgcode code, const char *buf, size_
         } else          } else
 #endif  #endif
                 needed = len + 4 + 3;                  needed = len + 4 + 3;
        if (iobuf.msg.len + needed > iobuf.msg.size)        if (iobuf.msg.len + needed > iobuf.msg.size) {
                perform_io(needed, PIO_NEED_MSGROOM);                if (!am_receiver)
                         perform_io(needed, PIO_NEED_MSGROOM);
                 else { /* We allow the receiver to increase their iobuf.msg size to avoid a deadlock. */
                         size_t old_size = iobuf.msg.size;
                         restore_iobuf_size(&iobuf.msg);
                         realloc_xbuf(&iobuf.msg, iobuf.msg.size * 2);
                         if (iobuf.msg.pos + iobuf.msg.len > old_size)
                                 memcpy(iobuf.msg.buf + old_size, iobuf.msg.buf, iobuf.msg.pos + iobuf.msg.len - old_size);
                 }
         }
   
         pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */          pos = iobuf.msg.pos + iobuf.msg.len; /* Must be set after any flushing. */
         if (pos >= iobuf.msg.size)          if (pos >= iobuf.msg.size)
Line 1050  static void got_flist_entry_status(enum festatus statu Line 1069  static void got_flist_entry_status(enum festatus statu
                                 if (inc_recurse)                                  if (inc_recurse)
                                         flist->in_progress++;                                          flist->in_progress++;
                         }                          }
                   } else if (checksum_files & CSF_UPDATE) {
                           struct file_struct *file = flist->files[ndx - flist->ndx_start];
                           set_cached_checksum(flist, file);
                 }                  }
 #endif  #endif
                 break;                  break;
Line 1113  static void check_for_d_option_error(const char *msg) Line 1135  static void check_for_d_option_error(const char *msg)
         }          }
   
         if (saw_d) {          if (saw_d) {
                rprintf(FWARNING,                rprintf(FWARNING, "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n");
                    "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n"); 
         }          }
 }  }
   
Line 1176  int read_line(int fd, char *buf, size_t bufsiz, int fl Line 1197  int read_line(int fd, char *buf, size_t bufsiz, int fl
   
 #ifdef ICONV_OPTION  #ifdef ICONV_OPTION
         if (flags & RL_CONVERT && iconv_buf.size < bufsiz)          if (flags & RL_CONVERT && iconv_buf.size < bufsiz)
                realloc_xbuf(&iconv_buf, bufsiz + 1024);                realloc_xbuf(&iconv_buf, ROUND_UP_1024(bufsiz) + 1024);
 #endif  #endif
   
   start:    start:
Line 1234  void read_args(int f_in, char *mod_name, char *buf, si Line 1255  void read_args(int f_in, char *mod_name, char *buf, si
         rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0);          rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0);
 #endif  #endif
   
        if (!(argv = new_array(char *, maxargs)))        argv = new_array(char *, maxargs);
                out_of_memory("read_args"); 
         if (mod_name && !protect_args)          if (mod_name && !protect_args)
                 argv[argc++] = "rsyncd";                  argv[argc++] = "rsyncd";
   
Line 1248  void read_args(int f_in, char *mod_name, char *buf, si Line 1268  void read_args(int f_in, char *mod_name, char *buf, si
   
                 if (argc == maxargs-1) {                  if (argc == maxargs-1) {
                         maxargs += MAX_ARGS;                          maxargs += MAX_ARGS;
                        if (!(argv = realloc_array(argv, char *, maxargs)))                        argv = realloc_array(argv, char *, maxargs);
                                out_of_memory("read_args"); 
                 }                  }
   
                 if (dot_pos) {                  if (dot_pos) {
Line 1257  void read_args(int f_in, char *mod_name, char *buf, si Line 1276  void read_args(int f_in, char *mod_name, char *buf, si
                                 int len = strlen(buf);                                  int len = strlen(buf);
                                 if (request_len)                                  if (request_len)
                                         request_p[0][request_len++] = ' ';                                          request_p[0][request_len++] = ' ';
                                if (!(*request_p = realloc_array(*request_p, char, request_len + len + 1)))                                *request_p = realloc_array(*request_p, char, request_len + len + 1);
                                        out_of_memory("read_args"); 
                                 memcpy(*request_p + request_len, buf, len + 1);                                  memcpy(*request_p + request_len, buf, len + 1);
                                 request_len += len;                                  request_len += len;
                         }                          }
Line 1267  void read_args(int f_in, char *mod_name, char *buf, si Line 1285  void read_args(int f_in, char *mod_name, char *buf, si
                         else                          else
                                 glob_expand(buf, &argv, &argc, &maxargs);                                  glob_expand(buf, &argv, &argc, &maxargs);
                 } else {                  } else {
                        if (!(p = strdup(buf)))                        p = strdup(buf);
                                out_of_memory("read_args"); 
                         argv[argc++] = p;                          argv[argc++] = p;
                         if (*p == '.' && p[1] == '\0')                          if (*p == '.' && p[1] == '\0')
                                 dot_pos = argc;                                  dot_pos = argc;
Line 1284  void read_args(int f_in, char *mod_name, char *buf, si Line 1301  void read_args(int f_in, char *mod_name, char *buf, si
   
 BOOL io_start_buffering_out(int f_out)  BOOL io_start_buffering_out(int f_out)
 {  {
        if (msgs2stderr && DEBUG_GTE(IO, 2))        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                 rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out);                  rprintf(FINFO, "[%s] io_start_buffering_out(%d)\n", who_am_i(), f_out);
   
         if (iobuf.out.buf) {          if (iobuf.out.buf) {
Line 1303  BOOL io_start_buffering_out(int f_out) Line 1320  BOOL io_start_buffering_out(int f_out)
   
 BOOL io_start_buffering_in(int f_in)  BOOL io_start_buffering_in(int f_in)
 {  {
        if (msgs2stderr && DEBUG_GTE(IO, 2))        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                 rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in);                  rprintf(FINFO, "[%s] io_start_buffering_in(%d)\n", who_am_i(), f_in);
   
         if (iobuf.in.buf) {          if (iobuf.in.buf) {
Line 1322  BOOL io_start_buffering_in(int f_in) Line 1339  BOOL io_start_buffering_in(int f_in)
   
 void io_end_buffering_in(BOOL free_buffers)  void io_end_buffering_in(BOOL free_buffers)
 {  {
        if (msgs2stderr && DEBUG_GTE(IO, 2)) {        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) {
                 rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n",                  rprintf(FINFO, "[%s] io_end_buffering_in(IOBUF_%s_BUFS)\n",
                         who_am_i(), free_buffers ? "FREE" : "KEEP");                          who_am_i(), free_buffers ? "FREE" : "KEEP");
         }          }
Line 1337  void io_end_buffering_in(BOOL free_buffers) Line 1354  void io_end_buffering_in(BOOL free_buffers)
   
 void io_end_buffering_out(BOOL free_buffers)  void io_end_buffering_out(BOOL free_buffers)
 {  {
        if (msgs2stderr && DEBUG_GTE(IO, 2)) {        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2)) {
                 rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n",                  rprintf(FINFO, "[%s] io_end_buffering_out(IOBUF_%s_BUFS)\n",
                         who_am_i(), free_buffers ? "FREE" : "KEEP");                          who_am_i(), free_buffers ? "FREE" : "KEEP");
         }          }
Line 1425  static void read_a_msg(void) Line 1442  static void read_a_msg(void)
         msg_bytes = tag & 0xFFFFFF;          msg_bytes = tag & 0xFFFFFF;
         tag = (tag >> 24) - MPLEX_BASE;          tag = (tag >> 24) - MPLEX_BASE;
   
        if (DEBUG_GTE(IO, 1) && msgs2stderr)        if (msgs2stderr == 1 && DEBUG_GTE(IO, 1))
                 rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes);                  rprintf(FINFO, "[%s] got msg=%d, len=%ld\n", who_am_i(), (int)tag, (long)msg_bytes);
   
         switch (tag) {          switch (tag) {
Line 1481  static void read_a_msg(void) Line 1498  static void read_a_msg(void)
                 if (am_sender)                  if (am_sender)
                         maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH);                          maybe_send_keepalive(time(NULL), MSK_ALLOW_FLUSH);
                 break;                  break;
           case MSG_CHECKSUM:
                   /* This receives some checksum info that we want to make a note of
                    * (which allows a single process to do all the writing to the db). */
                   if (msg_bytes != MSG_CHECKSUM_LEN)
                           goto overflow;
                   raw_read_buf(data, MSG_CHECKSUM_LEN);
                   if (am_generator && same_db) {
                           iobuf.in_multiplexed = 1;
                           send_msg(MSG_CHECKSUM, data, MSG_CHECKSUM_LEN, 0);
                   } if (am_receiver || (am_sender && !local_server))
                           goto unexpected;
                   else {
                           /* The received data is a set of numbers followed by the checksum. */
                           STRUCT_STAT st;
                           st.st_dev = IVAL64(data, 0);
                           st.st_ino = IVAL64(data, 8);
                           st.st_size = IVAL64(data, 16);
                           st.st_mtime = IVAL64(data, 24);
                           st.st_ctime = IVAL64(data, 32);
   #if MSG_CHECKSUM_LONGS != 5
   #error Fix the parsing of checksum long values
   #endif
                           iobuf.in_multiplexed = 1;
                           db_set_checksum(IVAL(data, MSG_CHECKSUM_LONGS*8), &st, data + MSG_CHECKSUM_LONGS*8 + 4);
                   }
                   break;
         case MSG_DELETED:          case MSG_DELETED:
                 if (msg_bytes >= sizeof data)                  if (msg_bytes >= sizeof data)
                         goto overflow;                          goto overflow;
Line 1632  static void read_a_msg(void) Line 1675  static void read_a_msg(void)
                  * with a duplicate exit message. */                   * with a duplicate exit message. */
                 _exit_cleanup(val, __FILE__, 0 - __LINE__);                  _exit_cleanup(val, __FILE__, 0 - __LINE__);
         default:          default:
           unexpected:
                 rprintf(FERROR, "unexpected tag %d [%s%s]\n",                  rprintf(FERROR, "unexpected tag %d [%s%s]\n",
                         tag, who_am_i(), inc_recurse ? "/inc" : "");                          tag, who_am_i(), inc_recurse ? "/inc" : "");
                 exit_cleanup(RERR_STREAMIO);                  exit_cleanup(RERR_STREAMIO);
Line 1982  static void sleep_for_bwlimit(int bytes_written) Line 2026  static void sleep_for_bwlimit(int bytes_written)
         total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);          total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
 }  }
   
void io_flush(int flush_it_all)void io_flush(int flush_type)
 {  {
         if (iobuf.out.len > iobuf.out_empty_len) {          if (iobuf.out.len > iobuf.out_empty_len) {
                if (flush_it_all) /* FULL_FLUSH: flush everything in the output buffers */                if (flush_type == FULL_FLUSH)               /* flush everything in the output buffers */
                         perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM);                          perform_io(iobuf.out.size - iobuf.out_empty_len, PIO_NEED_OUTROOM);
                else /* NORMAL_FLUSH: flush at least 1 byte */                else if (flush_type == NORMAL_FLUSH)       /* flush at least 1 byte */
                         perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM);                          perform_io(iobuf.out.size - iobuf.out.len + 1, PIO_NEED_OUTROOM);
                                                           /* MSG_FLUSH: flush iobuf.msg only */
         }          }
         if (iobuf.msg.len)          if (iobuf.msg.len)
                 perform_io(iobuf.msg.size, PIO_NEED_MSGROOM);                  perform_io(iobuf.msg.size, PIO_NEED_MSGROOM);
Line 2013  void write_varint(int f, int32 x) Line 2058  void write_varint(int f, int32 x)
 {  {
         char b[5];          char b[5];
         uchar bit;          uchar bit;
        int cnt = 4;        int cnt;
   
         SIVAL(b, 1, x);          SIVAL(b, 1, x);
   
        while (cnt > 1 && b[cnt] == 0)        for (cnt = 4; cnt > 1 && b[cnt] == 0; cnt--) {}
                cnt--; 
         bit = ((uchar)1<<(7-cnt+1));          bit = ((uchar)1<<(7-cnt+1));
   
         if (CVAL(b, cnt) >= bit) {          if (CVAL(b, cnt) >= bit) {
                 cnt++;                  cnt++;
                 *b = ~(bit-1);                  *b = ~(bit-1);
         } else if (cnt > 1)          } else if (cnt > 1)
                 *b = b[cnt] | ~(bit*2-1);                  *b = b[cnt] | ~(bit*2-1);
         else          else
                *b = b[cnt];                *b = b[1];
   
         write_buf(f, b, cnt);          write_buf(f, b, cnt);
 }  }
Line 2296  void io_start_multiplex_out(int fd) Line 2341  void io_start_multiplex_out(int fd)
 {  {
         io_flush(FULL_FLUSH);          io_flush(FULL_FLUSH);
   
        if (msgs2stderr && DEBUG_GTE(IO, 2))        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                 rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd);                  rprintf(FINFO, "[%s] io_start_multiplex_out(%d)\n", who_am_i(), fd);
   
         if (!iobuf.msg.buf)          if (!iobuf.msg.buf)
Line 2313  void io_start_multiplex_out(int fd) Line 2358  void io_start_multiplex_out(int fd)
 /* Setup for multiplexing a MSG_* stream with the data stream. */  /* Setup for multiplexing a MSG_* stream with the data stream. */
 void io_start_multiplex_in(int fd)  void io_start_multiplex_in(int fd)
 {  {
        if (msgs2stderr && DEBUG_GTE(IO, 2))        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                 rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd);                  rprintf(FINFO, "[%s] io_start_multiplex_in(%d)\n", who_am_i(), fd);
   
         iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */          iobuf.in_multiplexed = 1; /* See also IN_MULTIPLEXED */
Line 2324  int io_end_multiplex_in(int mode) Line 2369  int io_end_multiplex_in(int mode)
 {  {
         int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1;          int ret = iobuf.in_multiplexed ? iobuf.in_fd : -1;
   
        if (msgs2stderr && DEBUG_GTE(IO, 2))        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                 rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode);                  rprintf(FINFO, "[%s] io_end_multiplex_in(mode=%d)\n", who_am_i(), mode);
   
         iobuf.in_multiplexed = 0;          iobuf.in_multiplexed = 0;
Line 2342  int io_end_multiplex_out(int mode) Line 2387  int io_end_multiplex_out(int mode)
 {  {
         int ret = iobuf.out_empty_len ? iobuf.out_fd : -1;          int ret = iobuf.out_empty_len ? iobuf.out_fd : -1;
   
        if (msgs2stderr && DEBUG_GTE(IO, 2))        if (msgs2stderr == 1 && DEBUG_GTE(IO, 2))
                 rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode);                  rprintf(FINFO, "[%s] io_end_multiplex_out(mode=%d)\n", who_am_i(), mode);
   
         if (mode != MPLX_TO_BUFFERED)          if (mode != MPLX_TO_BUFFERED)
Line 2368  void start_write_batch(int fd) Line 2413  void start_write_batch(int fd)
          * is involved. */           * is involved. */
         write_int(batch_fd, protocol_version);          write_int(batch_fd, protocol_version);
         if (protocol_version >= 30)          if (protocol_version >= 30)
                write_byte(batch_fd, compat_flags);                write_varint(batch_fd, compat_flags);
         write_int(batch_fd, checksum_seed);          write_int(batch_fd, checksum_seed);
   
         if (am_sender)          if (am_sender)

Removed from v.1.1.1.3  
changed lines
  Added in v.1.1.1.4


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>