Annotation of embedaddon/rsync/io.c, revision 1.1

1.1     ! misho       1: /*
        !             2:  * Socket and pipe I/O utilities used in rsync.
        !             3:  *
        !             4:  * Copyright (C) 1996-2001 Andrew Tridgell
        !             5:  * Copyright (C) 1996 Paul Mackerras
        !             6:  * Copyright (C) 2001, 2002 Martin Pool <mbp@samba.org>
        !             7:  * Copyright (C) 2003-2009 Wayne Davison
        !             8:  *
        !             9:  * This program is free software; you can redistribute it and/or modify
        !            10:  * it under the terms of the GNU General Public License as published by
        !            11:  * the Free Software Foundation; either version 3 of the License, or
        !            12:  * (at your option) any later version.
        !            13:  *
        !            14:  * This program is distributed in the hope that it will be useful,
        !            15:  * but WITHOUT ANY WARRANTY; without even the implied warranty of
        !            16:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        !            17:  * GNU General Public License for more details.
        !            18:  *
        !            19:  * You should have received a copy of the GNU General Public License along
        !            20:  * with this program; if not, visit the http://fsf.org website.
        !            21:  */
        !            22: 
        !            23: /* Rsync provides its own multiplexing system, which is used to send
        !            24:  * stderr and stdout over a single socket.
        !            25:  *
        !            26:  * For historical reasons this is off during the start of the
        !            27:  * connection, but it's switched on quite early using
        !            28:  * io_start_multiplex_out() and io_start_multiplex_in(). */
        !            29: 
        !            30: #include "rsync.h"
        !            31: #include "ifuncs.h"
        !            32: 
        !            33: /** If no timeout is specified then use a 60 second select timeout */
        !            34: #define SELECT_TIMEOUT 60
        !            35: 
        !            36: extern int bwlimit;
        !            37: extern size_t bwlimit_writemax;
        !            38: extern int io_timeout;
        !            39: extern int am_server;
        !            40: extern int am_daemon;
        !            41: extern int am_sender;
        !            42: extern int am_generator;
        !            43: extern int inc_recurse;
        !            44: extern int io_error;
        !            45: extern int eol_nulls;
        !            46: extern int flist_eof;
        !            47: extern int list_only;
        !            48: extern int read_batch;
        !            49: extern int compat_flags;
        !            50: extern int protect_args;
        !            51: extern int checksum_seed;
        !            52: extern int protocol_version;
        !            53: extern int remove_source_files;
        !            54: extern int preserve_hard_links;
        !            55: extern struct stats stats;
        !            56: extern struct file_list *cur_flist;
        !            57: #ifdef ICONV_OPTION
        !            58: extern int filesfrom_convert;
        !            59: extern iconv_t ic_send, ic_recv;
        !            60: #endif
        !            61: 
        !            62: int csum_length = SHORT_SUM_LENGTH; /* initial value */
        !            63: int allowed_lull = 0;
        !            64: int ignore_timeout = 0;
        !            65: int batch_fd = -1;
        !            66: int msgdone_cnt = 0;
        !            67: 
        !            68: /* Ignore an EOF error if non-zero. See whine_about_eof(). */
        !            69: int kluge_around_eof = 0;
        !            70: 
        !            71: int msg_fd_in = -1;
        !            72: int msg_fd_out = -1;
        !            73: int sock_f_in = -1;
        !            74: int sock_f_out = -1;
        !            75: 
        !            76: static int iobuf_f_in = -1;
        !            77: static char *iobuf_in;
        !            78: static size_t iobuf_in_siz;
        !            79: static size_t iobuf_in_ndx;
        !            80: static size_t iobuf_in_remaining;
        !            81: 
        !            82: static int iobuf_f_out = -1;
        !            83: static char *iobuf_out;
        !            84: static int iobuf_out_cnt;
        !            85: 
        !            86: int flist_forward_from = -1;
        !            87: 
        !            88: static int io_multiplexing_out;
        !            89: static int io_multiplexing_in;
        !            90: static time_t last_io_in;
        !            91: static time_t last_io_out;
        !            92: static int no_flush;
        !            93: 
        !            94: static int write_batch_monitor_in = -1;
        !            95: static int write_batch_monitor_out = -1;
        !            96: 
        !            97: static int io_filesfrom_f_in = -1;
        !            98: static int io_filesfrom_f_out = -1;
        !            99: static xbuf ff_buf = EMPTY_XBUF;
        !           100: static char ff_lastchar;
        !           101: #ifdef ICONV_OPTION
        !           102: static xbuf iconv_buf = EMPTY_XBUF;
        !           103: #endif
        !           104: static int defer_forwarding_messages = 0, keep_defer_forwarding = 0;
        !           105: static int select_timeout = SELECT_TIMEOUT;
        !           106: static int active_filecnt = 0;
        !           107: static OFF_T active_bytecnt = 0;
        !           108: static int first_message = 1;
        !           109: 
        !           110: static char int_byte_extra[64] = {
        !           111:        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (00 - 3F)/4 */
        !           112:        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, /* (40 - 7F)/4 */
        !           113:        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, /* (80 - BF)/4 */
        !           114:        2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 4, 4, 5, 6, /* (C0 - FF)/4 */
        !           115: };
        !           116: 
        !           117: #define REMOTE_OPTION_ERROR "rsync: on remote machine: -"
        !           118: #define REMOTE_OPTION_ERROR2 ": unknown option"
        !           119: 
        !           120: enum festatus { FES_SUCCESS, FES_REDO, FES_NO_SEND };
        !           121: 
        !           122: static void check_timeout(void)
        !           123: {
        !           124:        time_t t, chk;
        !           125: 
        !           126:        if (!io_timeout || ignore_timeout)
        !           127:                return;
        !           128: 
        !           129:        t = time(NULL);
        !           130: 
        !           131:        if (!last_io_in)
        !           132:                last_io_in = t;
        !           133: 
        !           134:        chk = MAX(last_io_out, last_io_in);
        !           135:        if (t - chk >= io_timeout) {
        !           136:                if (am_server || am_daemon)
        !           137:                        exit_cleanup(RERR_TIMEOUT);
        !           138:                rprintf(FERROR, "[%s] io timeout after %d seconds -- exiting\n",
        !           139:                        who_am_i(), (int)(t-chk));
        !           140:                exit_cleanup(RERR_TIMEOUT);
        !           141:        }
        !           142: }
        !           143: 
        !           144: static void readfd(int fd, char *buffer, size_t N);
        !           145: static void writefd(int fd, const char *buf, size_t len);
        !           146: static void writefd_unbuffered(int fd, const char *buf, size_t len);
        !           147: static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert);
        !           148: 
        !           149: static flist_ndx_list redo_list, hlink_list;
        !           150: 
        !           151: struct msg_list_item {
        !           152:        struct msg_list_item *next;
        !           153:        char convert;
        !           154:        char buf[1];
        !           155: };
        !           156: 
        !           157: struct msg_list {
        !           158:        struct msg_list_item *head, *tail;
        !           159: };
        !           160: 
        !           161: static struct msg_list msg_queue;
        !           162: 
        !           163: static void got_flist_entry_status(enum festatus status, const char *buf)
        !           164: {
        !           165:        int ndx = IVAL(buf, 0);
        !           166:        struct file_list *flist = flist_for_ndx(ndx, "got_flist_entry_status");
        !           167: 
        !           168:        if (remove_source_files) {
        !           169:                active_filecnt--;
        !           170:                active_bytecnt -= F_LENGTH(flist->files[ndx - flist->ndx_start]);
        !           171:        }
        !           172: 
        !           173:        if (inc_recurse)
        !           174:                flist->in_progress--;
        !           175: 
        !           176:        switch (status) {
        !           177:        case FES_SUCCESS:
        !           178:                if (remove_source_files)
        !           179:                        send_msg(MSG_SUCCESS, buf, 4, 0);
        !           180:                /* FALL THROUGH */
        !           181:        case FES_NO_SEND:
        !           182: #ifdef SUPPORT_HARD_LINKS
        !           183:                if (preserve_hard_links) {
        !           184:                        struct file_struct *file = flist->files[ndx - flist->ndx_start];
        !           185:                        if (F_IS_HLINKED(file)) {
        !           186:                                if (status == FES_NO_SEND)
        !           187:                                        flist_ndx_push(&hlink_list, -2); /* indicates a failure follows */
        !           188:                                flist_ndx_push(&hlink_list, ndx);
        !           189:                                flist->in_progress++;
        !           190:                        }
        !           191:                }
        !           192: #endif
        !           193:                break;
        !           194:        case FES_REDO:
        !           195:                if (read_batch) {
        !           196:                        if (inc_recurse)
        !           197:                                flist->in_progress++;
        !           198:                        break;
        !           199:                }
        !           200:                if (inc_recurse)
        !           201:                        flist->to_redo++;
        !           202:                flist_ndx_push(&redo_list, ndx);
        !           203:                break;
        !           204:        }
        !           205: }
        !           206: 
        !           207: /* Note the fds used for the main socket (which might really be a pipe
        !           208:  * for a local transfer, but we can ignore that). */
        !           209: void io_set_sock_fds(int f_in, int f_out)
        !           210: {
        !           211:        sock_f_in = f_in;
        !           212:        sock_f_out = f_out;
        !           213: }
        !           214: 
        !           215: void set_io_timeout(int secs)
        !           216: {
        !           217:        io_timeout = secs;
        !           218:        allowed_lull = (io_timeout + 1) / 2;
        !           219: 
        !           220:        if (!io_timeout || allowed_lull > SELECT_TIMEOUT)
        !           221:                select_timeout = SELECT_TIMEOUT;
        !           222:        else
        !           223:                select_timeout = allowed_lull;
        !           224: 
        !           225:        if (read_batch)
        !           226:                allowed_lull = 0;
        !           227: }
        !           228: 
        !           229: /* Setup the fd used to receive MSG_* messages.  Only needed during the
        !           230:  * early stages of being a local sender (up through the sending of the
        !           231:  * file list) or when we're the generator (to fetch the messages from
        !           232:  * the receiver). */
        !           233: void set_msg_fd_in(int fd)
        !           234: {
        !           235:        msg_fd_in = fd;
        !           236: }
        !           237: 
        !           238: /* Setup the fd used to send our MSG_* messages.  Only needed when
        !           239:  * we're the receiver (to send our messages to the generator). */
        !           240: void set_msg_fd_out(int fd)
        !           241: {
        !           242:        msg_fd_out = fd;
        !           243:        set_nonblocking(msg_fd_out);
        !           244: }
        !           245: 
        !           246: /* Add a message to the pending MSG_* list. */
        !           247: static void msg_list_add(struct msg_list *lst, int code, const char *buf, int len, int convert)
        !           248: {
        !           249:        struct msg_list_item *m;
        !           250:        int sz = len + 4 + sizeof m[0] - 1;
        !           251: 
        !           252:        if (!(m = (struct msg_list_item *)new_array(char, sz)))
        !           253:                out_of_memory("msg_list_add");
        !           254:        m->next = NULL;
        !           255:        m->convert = convert;
        !           256:        SIVAL(m->buf, 0, ((code+MPLEX_BASE)<<24) | len);
        !           257:        memcpy(m->buf + 4, buf, len);
        !           258:        if (lst->tail)
        !           259:                lst->tail->next = m;
        !           260:        else
        !           261:                lst->head = m;
        !           262:        lst->tail = m;
        !           263: }
        !           264: 
        !           265: static inline int flush_a_msg(int fd)
        !           266: {
        !           267:        struct msg_list_item *m = msg_queue.head;
        !           268:        int len = IVAL(m->buf, 0) & 0xFFFFFF;
        !           269:        int tag = *((uchar*)m->buf+3) - MPLEX_BASE;
        !           270: 
        !           271:        if (!(msg_queue.head = m->next))
        !           272:                msg_queue.tail = NULL;
        !           273: 
        !           274:        defer_forwarding_messages++;
        !           275:        mplex_write(fd, tag, m->buf + 4, len, m->convert);
        !           276:        defer_forwarding_messages--;
        !           277: 
        !           278:        free(m);
        !           279: 
        !           280:        return len;
        !           281: }
        !           282: 
        !           283: static void msg_flush(void)
        !           284: {
        !           285:        if (am_generator) {
        !           286:                while (msg_queue.head && io_multiplexing_out)
        !           287:                        stats.total_written += flush_a_msg(sock_f_out) + 4;
        !           288:        } else {
        !           289:                while (msg_queue.head)
        !           290:                        (void)flush_a_msg(msg_fd_out);
        !           291:        }
        !           292: }
        !           293: 
        !           294: static void check_for_d_option_error(const char *msg)
        !           295: {
        !           296:        static char rsync263_opts[] = "BCDHIKLPRSTWabceghlnopqrtuvxz";
        !           297:        char *colon;
        !           298:        int saw_d = 0;
        !           299: 
        !           300:        if (*msg != 'r'
        !           301:         || strncmp(msg, REMOTE_OPTION_ERROR, sizeof REMOTE_OPTION_ERROR - 1) != 0)
        !           302:                return;
        !           303: 
        !           304:        msg += sizeof REMOTE_OPTION_ERROR - 1;
        !           305:        if (*msg == '-' || (colon = strchr(msg, ':')) == NULL
        !           306:         || strncmp(colon, REMOTE_OPTION_ERROR2, sizeof REMOTE_OPTION_ERROR2 - 1) != 0)
        !           307:                return;
        !           308: 
        !           309:        for ( ; *msg != ':'; msg++) {
        !           310:                if (*msg == 'd')
        !           311:                        saw_d = 1;
        !           312:                else if (*msg == 'e')
        !           313:                        break;
        !           314:                else if (strchr(rsync263_opts, *msg) == NULL)
        !           315:                        return;
        !           316:        }
        !           317: 
        !           318:        if (saw_d) {
        !           319:                rprintf(FWARNING,
        !           320:                    "*** Try using \"--old-d\" if remote rsync is <= 2.6.3 ***\n");
        !           321:        }
        !           322: }
        !           323: 
        !           324: /* Read a message from the MSG_* fd and handle it.  This is called either
        !           325:  * during the early stages of being a local sender (up through the sending
        !           326:  * of the file list) or when we're the generator (to fetch the messages
        !           327:  * from the receiver). */
        !           328: static void read_msg_fd(void)
        !           329: {
        !           330:        char buf[2048];
        !           331:        size_t n;
        !           332:        struct file_list *flist;
        !           333:        int fd = msg_fd_in;
        !           334:        int tag, len;
        !           335: 
        !           336:        /* Temporarily disable msg_fd_in.  This is needed to avoid looping back
        !           337:         * to this routine from writefd_unbuffered(). */
        !           338:        no_flush++;
        !           339:        msg_fd_in = -1;
        !           340:        defer_forwarding_messages++;
        !           341: 
        !           342:        readfd(fd, buf, 4);
        !           343:        tag = IVAL(buf, 0);
        !           344: 
        !           345:        len = tag & 0xFFFFFF;
        !           346:        tag = (tag >> 24) - MPLEX_BASE;
        !           347: 
        !           348:        switch (tag) {
        !           349:        case MSG_DONE:
        !           350:                if (len < 0 || len > 1 || !am_generator) {
        !           351:                  invalid_msg:
        !           352:                        rprintf(FERROR, "invalid message %d:%d [%s%s]\n",
        !           353:                                tag, len, who_am_i(),
        !           354:                                inc_recurse ? "/inc" : "");
        !           355:                        exit_cleanup(RERR_STREAMIO);
        !           356:                }
        !           357:                if (len) {
        !           358:                        readfd(fd, buf, len);
        !           359:                        stats.total_read = read_varlong(fd, 3);
        !           360:                }
        !           361:                msgdone_cnt++;
        !           362:                break;
        !           363:        case MSG_REDO:
        !           364:                if (len != 4 || !am_generator)
        !           365:                        goto invalid_msg;
        !           366:                readfd(fd, buf, 4);
        !           367:                got_flist_entry_status(FES_REDO, buf);
        !           368:                break;
        !           369:        case MSG_FLIST:
        !           370:                if (len != 4 || !am_generator || !inc_recurse)
        !           371:                        goto invalid_msg;
        !           372:                readfd(fd, buf, 4);
        !           373:                /* Read extra file list from receiver. */
        !           374:                assert(iobuf_in != NULL);
        !           375:                assert(iobuf_f_in == fd);
        !           376:                if (verbose > 3) {
        !           377:                        rprintf(FINFO, "[%s] receiving flist for dir %d\n",
        !           378:                                who_am_i(), IVAL(buf,0));
        !           379:                }
        !           380:                flist = recv_file_list(fd);
        !           381:                flist->parent_ndx = IVAL(buf,0);
        !           382: #ifdef SUPPORT_HARD_LINKS
        !           383:                if (preserve_hard_links)
        !           384:                        match_hard_links(flist);
        !           385: #endif
        !           386:                break;
        !           387:        case MSG_FLIST_EOF:
        !           388:                if (len != 0 || !am_generator || !inc_recurse)
        !           389:                        goto invalid_msg;
        !           390:                flist_eof = 1;
        !           391:                break;
        !           392:        case MSG_IO_ERROR:
        !           393:                if (len != 4)
        !           394:                        goto invalid_msg;
        !           395:                readfd(fd, buf, len);
        !           396:                io_error |= IVAL(buf, 0);
        !           397:                break;
        !           398:        case MSG_DELETED:
        !           399:                if (len >= (int)sizeof buf || !am_generator)
        !           400:                        goto invalid_msg;
        !           401:                readfd(fd, buf, len);
        !           402:                send_msg(MSG_DELETED, buf, len, 1);
        !           403:                break;
        !           404:        case MSG_SUCCESS:
        !           405:                if (len != 4 || !am_generator)
        !           406:                        goto invalid_msg;
        !           407:                readfd(fd, buf, 4);
        !           408:                got_flist_entry_status(FES_SUCCESS, buf);
        !           409:                break;
        !           410:        case MSG_NO_SEND:
        !           411:                if (len != 4 || !am_generator)
        !           412:                        goto invalid_msg;
        !           413:                readfd(fd, buf, 4);
        !           414:                got_flist_entry_status(FES_NO_SEND, buf);
        !           415:                break;
        !           416:        case MSG_ERROR_SOCKET:
        !           417:        case MSG_ERROR_UTF8:
        !           418:        case MSG_CLIENT:
        !           419:                if (!am_generator)
        !           420:                        goto invalid_msg;
        !           421:                if (tag == MSG_ERROR_SOCKET)
        !           422:                        io_end_multiplex_out();
        !           423:                /* FALL THROUGH */
        !           424:        case MSG_INFO:
        !           425:        case MSG_ERROR:
        !           426:        case MSG_ERROR_XFER:
        !           427:        case MSG_WARNING:
        !           428:        case MSG_LOG:
        !           429:                while (len) {
        !           430:                        n = len;
        !           431:                        if (n >= sizeof buf)
        !           432:                                n = sizeof buf - 1;
        !           433:                        readfd(fd, buf, n);
        !           434:                        rwrite((enum logcode)tag, buf, n, !am_generator);
        !           435:                        len -= n;
        !           436:                }
        !           437:                break;
        !           438:        default:
        !           439:                rprintf(FERROR, "unknown message %d:%d [%s]\n",
        !           440:                        tag, len, who_am_i());
        !           441:                exit_cleanup(RERR_STREAMIO);
        !           442:        }
        !           443: 
        !           444:        no_flush--;
        !           445:        msg_fd_in = fd;
        !           446:        if (!--defer_forwarding_messages && !no_flush)
        !           447:                msg_flush();
        !           448: }
        !           449: 
        !           450: /* This is used by the generator to limit how many file transfers can
        !           451:  * be active at once when --remove-source-files is specified.  Without
        !           452:  * this, sender-side deletions were mostly happening at the end. */
        !           453: void increment_active_files(int ndx, int itemizing, enum logcode code)
        !           454: {
        !           455:        while (1) {
        !           456:                /* TODO: tune these limits? */
        !           457:                int limit = active_bytecnt >= 128*1024 ? 10 : 50;
        !           458:                if (active_filecnt < limit)
        !           459:                        break;
        !           460:                check_for_finished_files(itemizing, code, 0);
        !           461:                if (active_filecnt < limit)
        !           462:                        break;
        !           463:                if (iobuf_out_cnt)
        !           464:                        io_flush(NORMAL_FLUSH);
        !           465:                else
        !           466:                        read_msg_fd();
        !           467:        }
        !           468: 
        !           469:        active_filecnt++;
        !           470:        active_bytecnt += F_LENGTH(cur_flist->files[ndx - cur_flist->ndx_start]);
        !           471: }
        !           472: 
        !           473: /* Write an message to a multiplexed stream. If this fails, rsync exits. */
        !           474: static void mplex_write(int fd, enum msgcode code, const char *buf, size_t len, int convert)
        !           475: {
        !           476:        char buffer[BIGPATHBUFLEN]; /* Oversized for use by iconv code. */
        !           477:        size_t n = len;
        !           478: 
        !           479: #ifdef ICONV_OPTION
        !           480:        /* We need to convert buf before doing anything else so that we
        !           481:         * can include the (converted) byte length in the message header. */
        !           482:        if (convert && ic_send != (iconv_t)-1) {
        !           483:                xbuf outbuf, inbuf;
        !           484: 
        !           485:                INIT_XBUF(outbuf, buffer + 4, 0, sizeof buffer - 4);
        !           486:                INIT_XBUF(inbuf, (char*)buf, len, -1);
        !           487: 
        !           488:                iconvbufs(ic_send, &inbuf, &outbuf,
        !           489:                          ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE);
        !           490:                if (inbuf.len > 0) {
        !           491:                        rprintf(FERROR, "overflowed conversion buffer in mplex_write");
        !           492:                        exit_cleanup(RERR_UNSUPPORTED);
        !           493:                }
        !           494: 
        !           495:                n = len = outbuf.len;
        !           496:        } else
        !           497: #endif
        !           498:        if (n > 1024 - 4) /* BIGPATHBUFLEN can handle 1024 bytes */
        !           499:                n = 0;    /* We'd rather do 2 writes than too much memcpy(). */
        !           500:        else
        !           501:                memcpy(buffer + 4, buf, n);
        !           502: 
        !           503:        SIVAL(buffer, 0, ((MPLEX_BASE + (int)code)<<24) + len);
        !           504: 
        !           505:        keep_defer_forwarding++; /* defer_forwarding_messages++ on return */
        !           506:        writefd_unbuffered(fd, buffer, n+4);
        !           507:        keep_defer_forwarding--;
        !           508: 
        !           509:        if (len > n)
        !           510:                writefd_unbuffered(fd, buf+n, len-n);
        !           511: 
        !           512:        if (!--defer_forwarding_messages && !no_flush)
        !           513:                msg_flush();
        !           514: }
        !           515: 
        !           516: int send_msg(enum msgcode code, const char *buf, int len, int convert)
        !           517: {
        !           518:        if (msg_fd_out < 0) {
        !           519:                if (!defer_forwarding_messages)
        !           520:                        return io_multiplex_write(code, buf, len, convert);
        !           521:                if (!io_multiplexing_out)
        !           522:                        return 0;
        !           523:                msg_list_add(&msg_queue, code, buf, len, convert);
        !           524:                return 1;
        !           525:        }
        !           526:        if (flist_forward_from >= 0)
        !           527:                msg_list_add(&msg_queue, code, buf, len, convert);
        !           528:        else
        !           529:                mplex_write(msg_fd_out, code, buf, len, convert);
        !           530:        return 1;
        !           531: }
        !           532: 
        !           533: void send_msg_int(enum msgcode code, int num)
        !           534: {
        !           535:        char numbuf[4];
        !           536:        SIVAL(numbuf, 0, num);
        !           537:        send_msg(code, numbuf, 4, 0);
        !           538: }
        !           539: 
        !           540: void wait_for_receiver(void)
        !           541: {
        !           542:        if (io_flush(NORMAL_FLUSH))
        !           543:                return;
        !           544:        read_msg_fd();
        !           545: }
        !           546: 
        !           547: int get_redo_num(void)
        !           548: {
        !           549:        return flist_ndx_pop(&redo_list);
        !           550: }
        !           551: 
        !           552: int get_hlink_num(void)
        !           553: {
        !           554:        return flist_ndx_pop(&hlink_list);
        !           555: }
        !           556: 
        !           557: /**
        !           558:  * When we're the receiver and we have a local --files-from list of names
        !           559:  * that needs to be sent over the socket to the sender, we have to do two
        !           560:  * things at the same time: send the sender a list of what files we're
        !           561:  * processing and read the incoming file+info list from the sender.  We do
        !           562:  * this by augmenting the read_timeout() function to copy this data.  It
        !           563:  * uses ff_buf to read a block of data from f_in (when it is ready, since
        !           564:  * it might be a pipe) and then blast it out f_out (when it is ready to
        !           565:  * receive more data).
        !           566:  */
        !           567: void io_set_filesfrom_fds(int f_in, int f_out)
        !           568: {
        !           569:        io_filesfrom_f_in = f_in;
        !           570:        io_filesfrom_f_out = f_out;
        !           571:        alloc_xbuf(&ff_buf, 2048);
        !           572: #ifdef ICONV_OPTION
        !           573:        if (protect_args)
        !           574:                alloc_xbuf(&iconv_buf, 1024);
        !           575: #endif
        !           576: }
        !           577: 
        !           578: /* It's almost always an error to get an EOF when we're trying to read from the
        !           579:  * network, because the protocol is (for the most part) self-terminating.
        !           580:  *
        !           581:  * There is one case for the receiver when it is at the end of the transfer
        !           582:  * (hanging around reading any keep-alive packets that might come its way): if
        !           583:  * the sender dies before the generator's kill-signal comes through, we can end
        !           584:  * up here needing to loop until the kill-signal arrives.  In this situation,
        !           585:  * kluge_around_eof will be < 0.
        !           586:  *
        !           587:  * There is another case for older protocol versions (< 24) where the module
        !           588:  * listing was not terminated, so we must ignore an EOF error in that case and
        !           589:  * exit.  In this situation, kluge_around_eof will be > 0. */
        !           590: static void whine_about_eof(int fd)
        !           591: {
        !           592:        if (kluge_around_eof && fd == sock_f_in) {
        !           593:                int i;
        !           594:                if (kluge_around_eof > 0)
        !           595:                        exit_cleanup(0);
        !           596:                /* If we're still here after 10 seconds, exit with an error. */
        !           597:                for (i = 10*1000/20; i--; )
        !           598:                        msleep(20);
        !           599:        }
        !           600: 
        !           601:        rprintf(FERROR, RSYNC_NAME ": connection unexpectedly closed "
        !           602:                "(%.0f bytes received so far) [%s]\n",
        !           603:                (double)stats.total_read, who_am_i());
        !           604: 
        !           605:        exit_cleanup(RERR_STREAMIO);
        !           606: }
        !           607: 
        !           608: /**
        !           609:  * Read from a socket with I/O timeout. return the number of bytes
        !           610:  * read. If no bytes can be read then exit, never return a number <= 0.
        !           611:  *
        !           612:  * TODO: If the remote shell connection fails, then current versions
        !           613:  * actually report an "unexpected EOF" error here.  Since it's a
        !           614:  * fairly common mistake to try to use rsh when ssh is required, we
        !           615:  * should trap that: if we fail to read any data at all, we should
        !           616:  * give a better explanation.  We can tell whether the connection has
        !           617:  * started by looking e.g. at whether the remote version is known yet.
        !           618:  */
        !           619: static int read_timeout(int fd, char *buf, size_t len)
        !           620: {
        !           621:        int n, cnt = 0;
        !           622: 
        !           623:        io_flush(FULL_FLUSH);
        !           624: 
        !           625:        while (cnt == 0) {
        !           626:                /* until we manage to read *something* */
        !           627:                fd_set r_fds, w_fds;
        !           628:                struct timeval tv;
        !           629:                int maxfd = fd;
        !           630:                int count;
        !           631: 
        !           632:                FD_ZERO(&r_fds);
        !           633:                FD_ZERO(&w_fds);
        !           634:                FD_SET(fd, &r_fds);
        !           635:                if (io_filesfrom_f_out >= 0) {
        !           636:                        int new_fd;
        !           637:                        if (ff_buf.len == 0) {
        !           638:                                if (io_filesfrom_f_in >= 0) {
        !           639:                                        FD_SET(io_filesfrom_f_in, &r_fds);
        !           640:                                        new_fd = io_filesfrom_f_in;
        !           641:                                } else {
        !           642:                                        io_filesfrom_f_out = -1;
        !           643:                                        new_fd = -1;
        !           644:                                }
        !           645:                        } else {
        !           646:                                FD_SET(io_filesfrom_f_out, &w_fds);
        !           647:                                new_fd = io_filesfrom_f_out;
        !           648:                        }
        !           649:                        if (new_fd > maxfd)
        !           650:                                maxfd = new_fd;
        !           651:                }
        !           652: 
        !           653:                tv.tv_sec = select_timeout;
        !           654:                tv.tv_usec = 0;
        !           655: 
        !           656:                errno = 0;
        !           657: 
        !           658:                count = select(maxfd + 1, &r_fds, &w_fds, NULL, &tv);
        !           659: 
        !           660:                if (count <= 0) {
        !           661:                        if (errno == EBADF) {
        !           662:                                defer_forwarding_messages = 0;
        !           663:                                exit_cleanup(RERR_SOCKETIO);
        !           664:                        }
        !           665:                        check_timeout();
        !           666:                        continue;
        !           667:                }
        !           668: 
        !           669:                if (io_filesfrom_f_out >= 0) {
        !           670:                        if (ff_buf.len) {
        !           671:                                if (FD_ISSET(io_filesfrom_f_out, &w_fds)) {
        !           672:                                        int l = write(io_filesfrom_f_out,
        !           673:                                                      ff_buf.buf + ff_buf.pos,
        !           674:                                                      ff_buf.len);
        !           675:                                        if (l > 0) {
        !           676:                                                if (!(ff_buf.len -= l))
        !           677:                                                        ff_buf.pos = 0;
        !           678:                                                else
        !           679:                                                        ff_buf.pos += l;
        !           680:                                        } else if (errno != EINTR) {
        !           681:                                                /* XXX should we complain? */
        !           682:                                                io_filesfrom_f_out = -1;
        !           683:                                        }
        !           684:                                }
        !           685:                        } else if (io_filesfrom_f_in >= 0) {
        !           686:                                if (FD_ISSET(io_filesfrom_f_in, &r_fds)) {
        !           687: #ifdef ICONV_OPTION
        !           688:                                        xbuf *ibuf = filesfrom_convert ? &iconv_buf : &ff_buf;
        !           689: #else
        !           690:                                        xbuf *ibuf = &ff_buf;
        !           691: #endif
        !           692:                                        int l = read(io_filesfrom_f_in, ibuf->buf, ibuf->size);
        !           693:                                        if (l <= 0) {
        !           694:                                                if (l == 0 || errno != EINTR) {
        !           695:                                                        /* Send end-of-file marker */
        !           696:                                                        memcpy(ff_buf.buf, "\0\0", 2);
        !           697:                                                        ff_buf.len = ff_lastchar? 2 : 1;
        !           698:                                                        ff_buf.pos = 0;
        !           699:                                                        io_filesfrom_f_in = -1;
        !           700:                                                }
        !           701:                                        } else {
        !           702: #ifdef ICONV_OPTION
        !           703:                                                if (filesfrom_convert) {
        !           704:                                                        iconv_buf.pos = 0;
        !           705:                                                        iconv_buf.len = l;
        !           706:                                                        iconvbufs(ic_send, &iconv_buf, &ff_buf,
        !           707:                                                            ICB_EXPAND_OUT|ICB_INCLUDE_BAD|ICB_INCLUDE_INCOMPLETE);
        !           708:                                                        l = ff_buf.len;
        !           709:                                                }
        !           710: #endif
        !           711:                                                if (!eol_nulls) {
        !           712:                                                        char *s = ff_buf.buf + l;
        !           713:                                                        /* Transform CR and/or LF into '\0' */
        !           714:                                                        while (s-- > ff_buf.buf) {
        !           715:                                                                if (*s == '\n' || *s == '\r')
        !           716:                                                                        *s = '\0';
        !           717:                                                        }
        !           718:                                                }
        !           719:                                                if (!ff_lastchar) {
        !           720:                                                        /* Last buf ended with a '\0', so don't
        !           721:                                                         * let this buf start with one. */
        !           722:                                                        while (l && ff_buf.buf[ff_buf.pos] == '\0')
        !           723:                                                                ff_buf.pos++, l--;
        !           724:                                                }
        !           725:                                                if (!l)
        !           726:                                                        ff_buf.pos = 0;
        !           727:                                                else {
        !           728:                                                        char *f = ff_buf.buf + ff_buf.pos;
        !           729:                                                        char *t = f;
        !           730:                                                        char *eob = f + l;
        !           731:                                                        /* Eliminate any multi-'\0' runs. */
        !           732:                                                        while (f != eob) {
        !           733:                                                                if (!(*t++ = *f++)) {
        !           734:                                                                        while (f != eob && !*f)
        !           735:                                                                                f++, l--;
        !           736:                                                                }
        !           737:                                                        }
        !           738:                                                        ff_lastchar = f[-1];
        !           739:                                                }
        !           740:                                                ff_buf.len = l;
        !           741:                                        }
        !           742:                                }
        !           743:                        }
        !           744:                }
        !           745: 
        !           746:                if (!FD_ISSET(fd, &r_fds))
        !           747:                        continue;
        !           748: 
        !           749:                n = read(fd, buf, len);
        !           750: 
        !           751:                if (n <= 0) {
        !           752:                        if (n == 0)
        !           753:                                whine_about_eof(fd); /* Doesn't return. */
        !           754:                        if (errno == EINTR || errno == EWOULDBLOCK
        !           755:                            || errno == EAGAIN)
        !           756:                                continue;
        !           757: 
        !           758:                        /* Don't write errors on a dead socket. */
        !           759:                        if (fd == sock_f_in) {
        !           760:                                io_end_multiplex_out();
        !           761:                                rsyserr(FERROR_SOCKET, errno, "read error");
        !           762:                        } else
        !           763:                                rsyserr(FERROR, errno, "read error");
        !           764:                        exit_cleanup(RERR_STREAMIO);
        !           765:                }
        !           766: 
        !           767:                buf += n;
        !           768:                len -= n;
        !           769:                cnt += n;
        !           770: 
        !           771:                if (fd == sock_f_in && io_timeout)
        !           772:                        last_io_in = time(NULL);
        !           773:        }
        !           774: 
        !           775:        return cnt;
        !           776: }
        !           777: 
        !           778: /* Read a line into the "buf" buffer. */
        !           779: int read_line(int fd, char *buf, size_t bufsiz, int flags)
        !           780: {
        !           781:        char ch, *s, *eob;
        !           782:        int cnt;
        !           783: 
        !           784: #ifdef ICONV_OPTION
        !           785:        if (flags & RL_CONVERT && iconv_buf.size < bufsiz)
        !           786:                realloc_xbuf(&iconv_buf, bufsiz + 1024);
        !           787: #endif
        !           788: 
        !           789:   start:
        !           790: #ifdef ICONV_OPTION
        !           791:        s = flags & RL_CONVERT ? iconv_buf.buf : buf;
        !           792: #else
        !           793:        s = buf;
        !           794: #endif
        !           795:        eob = s + bufsiz - 1;
        !           796:        while (1) {
        !           797:                cnt = read(fd, &ch, 1);
        !           798:                if (cnt < 0 && (errno == EWOULDBLOCK
        !           799:                  || errno == EINTR || errno == EAGAIN)) {
        !           800:                        struct timeval tv;
        !           801:                        fd_set r_fds, e_fds;
        !           802:                        FD_ZERO(&r_fds);
        !           803:                        FD_SET(fd, &r_fds);
        !           804:                        FD_ZERO(&e_fds);
        !           805:                        FD_SET(fd, &e_fds);
        !           806:                        tv.tv_sec = select_timeout;
        !           807:                        tv.tv_usec = 0;
        !           808:                        if (!select(fd+1, &r_fds, NULL, &e_fds, &tv))
        !           809:                                check_timeout();
        !           810:                        /*if (FD_ISSET(fd, &e_fds))
        !           811:                                rprintf(FINFO, "select exception on fd %d\n", fd); */
        !           812:                        continue;
        !           813:                }
        !           814:                if (cnt != 1)
        !           815:                        break;
        !           816:                if (flags & RL_EOL_NULLS ? ch == '\0' : (ch == '\r' || ch == '\n')) {
        !           817:                        /* Skip empty lines if dumping comments. */
        !           818:                        if (flags & RL_DUMP_COMMENTS && s == buf)
        !           819:                                continue;
        !           820:                        break;
        !           821:                }
        !           822:                if (s < eob)
        !           823:                        *s++ = ch;
        !           824:        }
        !           825:        *s = '\0';
        !           826: 
        !           827:        if (flags & RL_DUMP_COMMENTS && (*buf == '#' || *buf == ';'))
        !           828:                goto start;
        !           829: 
        !           830: #ifdef ICONV_OPTION
        !           831:        if (flags & RL_CONVERT) {
        !           832:                xbuf outbuf;
        !           833:                INIT_XBUF(outbuf, buf, 0, bufsiz);
        !           834:                iconv_buf.pos = 0;
        !           835:                iconv_buf.len = s - iconv_buf.buf;
        !           836:                iconvbufs(ic_recv, &iconv_buf, &outbuf,
        !           837:                          ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE);
        !           838:                outbuf.buf[outbuf.len] = '\0';
        !           839:                return outbuf.len;
        !           840:        }
        !           841: #endif
        !           842: 
        !           843:        return s - buf;
        !           844: }
        !           845: 
        !           846: void read_args(int f_in, char *mod_name, char *buf, size_t bufsiz, int rl_nulls,
        !           847:               char ***argv_p, int *argc_p, char **request_p)
        !           848: {
        !           849:        int maxargs = MAX_ARGS;
        !           850:        int dot_pos = 0;
        !           851:        int argc = 0;
        !           852:        char **argv, *p;
        !           853:        int rl_flags = (rl_nulls ? RL_EOL_NULLS : 0);
        !           854: 
        !           855: #ifdef ICONV_OPTION
        !           856:        rl_flags |= (protect_args && ic_recv != (iconv_t)-1 ? RL_CONVERT : 0);
        !           857: #endif
        !           858: 
        !           859:        if (!(argv = new_array(char *, maxargs)))
        !           860:                out_of_memory("read_args");
        !           861:        if (mod_name && !protect_args)
        !           862:                argv[argc++] = "rsyncd";
        !           863: 
        !           864:        while (1) {
        !           865:                if (read_line(f_in, buf, bufsiz, rl_flags) == 0)
        !           866:                        break;
        !           867: 
        !           868:                if (argc == maxargs-1) {
        !           869:                        maxargs += MAX_ARGS;
        !           870:                        if (!(argv = realloc_array(argv, char *, maxargs)))
        !           871:                                out_of_memory("read_args");
        !           872:                }
        !           873: 
        !           874:                if (dot_pos) {
        !           875:                        if (request_p) {
        !           876:                                *request_p = strdup(buf);
        !           877:                                request_p = NULL;
        !           878:                        }
        !           879:                        if (mod_name)
        !           880:                                glob_expand_module(mod_name, buf, &argv, &argc, &maxargs);
        !           881:                        else
        !           882:                                glob_expand(buf, &argv, &argc, &maxargs);
        !           883:                } else {
        !           884:                        if (!(p = strdup(buf)))
        !           885:                                out_of_memory("read_args");
        !           886:                        argv[argc++] = p;
        !           887:                        if (*p == '.' && p[1] == '\0')
        !           888:                                dot_pos = argc;
        !           889:                }
        !           890:        }
        !           891:        argv[argc] = NULL;
        !           892: 
        !           893:        glob_expand(NULL, NULL, NULL, NULL);
        !           894: 
        !           895:        *argc_p = argc;
        !           896:        *argv_p = argv;
        !           897: }
        !           898: 
        !           899: int io_start_buffering_out(int f_out)
        !           900: {
        !           901:        if (iobuf_out) {
        !           902:                assert(f_out == iobuf_f_out);
        !           903:                return 0;
        !           904:        }
        !           905:        if (!(iobuf_out = new_array(char, IO_BUFFER_SIZE)))
        !           906:                out_of_memory("io_start_buffering_out");
        !           907:        iobuf_out_cnt = 0;
        !           908:        iobuf_f_out = f_out;
        !           909:        return 1;
        !           910: }
        !           911: 
        !           912: int io_start_buffering_in(int f_in)
        !           913: {
        !           914:        if (iobuf_in) {
        !           915:                assert(f_in == iobuf_f_in);
        !           916:                return 0;
        !           917:        }
        !           918:        iobuf_in_siz = 2 * IO_BUFFER_SIZE;
        !           919:        if (!(iobuf_in = new_array(char, iobuf_in_siz)))
        !           920:                out_of_memory("io_start_buffering_in");
        !           921:        iobuf_f_in = f_in;
        !           922:        return 1;
        !           923: }
        !           924: 
        !           925: void io_end_buffering_in(void)
        !           926: {
        !           927:        if (!iobuf_in)
        !           928:                return;
        !           929:        free(iobuf_in);
        !           930:        iobuf_in = NULL;
        !           931:        iobuf_in_ndx = 0;
        !           932:        iobuf_in_remaining = 0;
        !           933:        iobuf_f_in = -1;
        !           934: }
        !           935: 
        !           936: void io_end_buffering_out(void)
        !           937: {
        !           938:        if (!iobuf_out)
        !           939:                return;
        !           940:        io_flush(FULL_FLUSH);
        !           941:        free(iobuf_out);
        !           942:        iobuf_out = NULL;
        !           943:        iobuf_f_out = -1;
        !           944: }
        !           945: 
        !           946: void maybe_flush_socket(int important)
        !           947: {
        !           948:        if (iobuf_out && iobuf_out_cnt
        !           949:         && (important || time(NULL) - last_io_out >= 5))
        !           950:                io_flush(NORMAL_FLUSH);
        !           951: }
        !           952: 
        !           953: void maybe_send_keepalive(void)
        !           954: {
        !           955:        if (time(NULL) - last_io_out >= allowed_lull) {
        !           956:                if (!iobuf_out || !iobuf_out_cnt) {
        !           957:                        if (protocol_version < 29)
        !           958:                                send_msg(MSG_DATA, "", 0, 0);
        !           959:                        else if (protocol_version >= 30)
        !           960:                                send_msg(MSG_NOOP, "", 0, 0);
        !           961:                        else {
        !           962:                                write_int(sock_f_out, cur_flist->used);
        !           963:                                write_shortint(sock_f_out, ITEM_IS_NEW);
        !           964:                        }
        !           965:                }
        !           966:                if (iobuf_out)
        !           967:                        io_flush(NORMAL_FLUSH);
        !           968:        }
        !           969: }
        !           970: 
        !           971: void start_flist_forward(int f_in)
        !           972: {
        !           973:        assert(iobuf_out != NULL);
        !           974:        assert(iobuf_f_out == msg_fd_out);
        !           975:        flist_forward_from = f_in;
        !           976:        defer_forwarding_messages++;
        !           977: }
        !           978: 
        !           979: void stop_flist_forward(void)
        !           980: {
        !           981:        flist_forward_from = -1;
        !           982:        defer_forwarding_messages--;
        !           983:        io_flush(FULL_FLUSH);
        !           984: }
        !           985: 
        !           986: /**
        !           987:  * Continue trying to read len bytes - don't return until len has been
        !           988:  * read.
        !           989:  **/
        !           990: static void read_loop(int fd, char *buf, size_t len)
        !           991: {
        !           992:        while (len) {
        !           993:                int n = read_timeout(fd, buf, len);
        !           994: 
        !           995:                buf += n;
        !           996:                len -= n;
        !           997:        }
        !           998: }
        !           999: 
        !          1000: /**
        !          1001:  * Read from the file descriptor handling multiplexing - return number
        !          1002:  * of bytes read.
        !          1003:  *
        !          1004:  * Never returns <= 0.
        !          1005:  */
        !          1006: static int readfd_unbuffered(int fd, char *buf, size_t len)
        !          1007: {
        !          1008:        size_t msg_bytes;
        !          1009:        int tag, cnt = 0;
        !          1010:        char line[BIGPATHBUFLEN];
        !          1011: 
        !          1012:        if (!iobuf_in || fd != iobuf_f_in)
        !          1013:                return read_timeout(fd, buf, len);
        !          1014: 
        !          1015:        if (!io_multiplexing_in && iobuf_in_remaining == 0) {
        !          1016:                iobuf_in_remaining = read_timeout(fd, iobuf_in, iobuf_in_siz);
        !          1017:                iobuf_in_ndx = 0;
        !          1018:        }
        !          1019: 
        !          1020:        while (cnt == 0) {
        !          1021:                if (iobuf_in_remaining) {
        !          1022:                        len = MIN(len, iobuf_in_remaining);
        !          1023:                        memcpy(buf, iobuf_in + iobuf_in_ndx, len);
        !          1024:                        iobuf_in_ndx += len;
        !          1025:                        iobuf_in_remaining -= len;
        !          1026:                        cnt = len;
        !          1027:                        break;
        !          1028:                }
        !          1029: 
        !          1030:                read_loop(fd, line, 4);
        !          1031:                tag = IVAL(line, 0);
        !          1032: 
        !          1033:                msg_bytes = tag & 0xFFFFFF;
        !          1034:                tag = (tag >> 24) - MPLEX_BASE;
        !          1035: 
        !          1036:                switch (tag) {
        !          1037:                case MSG_DATA:
        !          1038:                        if (msg_bytes > iobuf_in_siz) {
        !          1039:                                if (!(iobuf_in = realloc_array(iobuf_in, char,
        !          1040:                                                               msg_bytes)))
        !          1041:                                        out_of_memory("readfd_unbuffered");
        !          1042:                                iobuf_in_siz = msg_bytes;
        !          1043:                        }
        !          1044:                        read_loop(fd, iobuf_in, msg_bytes);
        !          1045:                        iobuf_in_remaining = msg_bytes;
        !          1046:                        iobuf_in_ndx = 0;
        !          1047:                        break;
        !          1048:                case MSG_NOOP:
        !          1049:                        if (msg_bytes != 0)
        !          1050:                                goto invalid_msg;
        !          1051:                        if (am_sender)
        !          1052:                                maybe_send_keepalive();
        !          1053:                        break;
        !          1054:                case MSG_IO_ERROR:
        !          1055:                        if (msg_bytes != 4)
        !          1056:                                goto invalid_msg;
        !          1057:                        read_loop(fd, line, msg_bytes);
        !          1058:                        send_msg_int(MSG_IO_ERROR, IVAL(line, 0));
        !          1059:                        io_error |= IVAL(line, 0);
        !          1060:                        break;
        !          1061:                case MSG_DELETED:
        !          1062:                        if (msg_bytes >= sizeof line)
        !          1063:                                goto overflow;
        !          1064: #ifdef ICONV_OPTION
        !          1065:                        if (ic_recv != (iconv_t)-1) {
        !          1066:                                xbuf outbuf, inbuf;
        !          1067:                                char ibuf[512];
        !          1068:                                int add_null = 0;
        !          1069: 
        !          1070:                                INIT_CONST_XBUF(outbuf, line);
        !          1071:                                INIT_XBUF(inbuf, ibuf, 0, -1);
        !          1072: 
        !          1073:                                while (msg_bytes) {
        !          1074:                                        inbuf.len = msg_bytes > sizeof ibuf
        !          1075:                                                  ? sizeof ibuf : msg_bytes;
        !          1076:                                        read_loop(fd, inbuf.buf, inbuf.len);
        !          1077:                                        if (!(msg_bytes -= inbuf.len)
        !          1078:                                         && !ibuf[inbuf.len-1])
        !          1079:                                                inbuf.len--, add_null = 1;
        !          1080:                                        if (iconvbufs(ic_send, &inbuf, &outbuf,
        !          1081:                                            ICB_INCLUDE_BAD | ICB_INCLUDE_INCOMPLETE) < 0)
        !          1082:                                                goto overflow;
        !          1083:                                }
        !          1084:                                if (add_null) {
        !          1085:                                        if (outbuf.len == outbuf.size)
        !          1086:                                                goto overflow;
        !          1087:                                        outbuf.buf[outbuf.len++] = '\0';
        !          1088:                                }
        !          1089:                                msg_bytes = outbuf.len;
        !          1090:                        } else
        !          1091: #endif
        !          1092:                                read_loop(fd, line, msg_bytes);
        !          1093:                        /* A directory name was sent with the trailing null */
        !          1094:                        if (msg_bytes > 0 && !line[msg_bytes-1])
        !          1095:                                log_delete(line, S_IFDIR);
        !          1096:                        else {
        !          1097:                                line[msg_bytes] = '\0';
        !          1098:                                log_delete(line, S_IFREG);
        !          1099:                        }
        !          1100:                        break;
        !          1101:                case MSG_SUCCESS:
        !          1102:                        if (msg_bytes != 4) {
        !          1103:                          invalid_msg:
        !          1104:                                rprintf(FERROR, "invalid multi-message %d:%ld [%s]\n",
        !          1105:                                        tag, (long)msg_bytes, who_am_i());
        !          1106:                                exit_cleanup(RERR_STREAMIO);
        !          1107:                        }
        !          1108:                        read_loop(fd, line, msg_bytes);
        !          1109:                        successful_send(IVAL(line, 0));
        !          1110:                        break;
        !          1111:                case MSG_NO_SEND:
        !          1112:                        if (msg_bytes != 4)
        !          1113:                                goto invalid_msg;
        !          1114:                        read_loop(fd, line, msg_bytes);
        !          1115:                        send_msg_int(MSG_NO_SEND, IVAL(line, 0));
        !          1116:                        break;
        !          1117:                case MSG_INFO:
        !          1118:                case MSG_ERROR:
        !          1119:                case MSG_ERROR_XFER:
        !          1120:                case MSG_WARNING:
        !          1121:                        if (msg_bytes >= sizeof line) {
        !          1122:                            overflow:
        !          1123:                                rprintf(FERROR,
        !          1124:                                        "multiplexing overflow %d:%ld [%s]\n",
        !          1125:                                        tag, (long)msg_bytes, who_am_i());
        !          1126:                                exit_cleanup(RERR_STREAMIO);
        !          1127:                        }
        !          1128:                        read_loop(fd, line, msg_bytes);
        !          1129:                        rwrite((enum logcode)tag, line, msg_bytes, 1);
        !          1130:                        if (first_message) {
        !          1131:                                if (list_only && !am_sender && tag == 1) {
        !          1132:                                        line[msg_bytes] = '\0';
        !          1133:                                        check_for_d_option_error(line);
        !          1134:                                }
        !          1135:                                first_message = 0;
        !          1136:                        }
        !          1137:                        break;
        !          1138:                default:
        !          1139:                        rprintf(FERROR, "unexpected tag %d [%s]\n",
        !          1140:                                tag, who_am_i());
        !          1141:                        exit_cleanup(RERR_STREAMIO);
        !          1142:                }
        !          1143:        }
        !          1144: 
        !          1145:        if (iobuf_in_remaining == 0)
        !          1146:                io_flush(NORMAL_FLUSH);
        !          1147: 
        !          1148:        return cnt;
        !          1149: }
        !          1150: 
        !          1151: /* Do a buffered read from fd.  Don't return until all N bytes have
        !          1152:  * been read.  If all N can't be read then exit with an error. */
        !          1153: static void readfd(int fd, char *buffer, size_t N)
        !          1154: {
        !          1155:        int  cnt;
        !          1156:        size_t total = 0;
        !          1157: 
        !          1158:        while (total < N) {
        !          1159:                cnt = readfd_unbuffered(fd, buffer + total, N-total);
        !          1160:                total += cnt;
        !          1161:        }
        !          1162: 
        !          1163:        if (fd == write_batch_monitor_in) {
        !          1164:                if ((size_t)write(batch_fd, buffer, total) != total)
        !          1165:                        exit_cleanup(RERR_FILEIO);
        !          1166:        }
        !          1167: 
        !          1168:        if (fd == flist_forward_from)
        !          1169:                writefd(iobuf_f_out, buffer, total);
        !          1170: 
        !          1171:        if (fd == sock_f_in)
        !          1172:                stats.total_read += total;
        !          1173: }
        !          1174: 
        !          1175: unsigned short read_shortint(int f)
        !          1176: {
        !          1177:        char b[2];
        !          1178:        readfd(f, b, 2);
        !          1179:        return (UVAL(b, 1) << 8) + UVAL(b, 0);
        !          1180: }
        !          1181: 
        !          1182: int32 read_int(int f)
        !          1183: {
        !          1184:        char b[4];
        !          1185:        int32 num;
        !          1186: 
        !          1187:        readfd(f, b, 4);
        !          1188:        num = IVAL(b, 0);
        !          1189: #if SIZEOF_INT32 > 4
        !          1190:        if (num & (int32)0x80000000)
        !          1191:                num |= ~(int32)0xffffffff;
        !          1192: #endif
        !          1193:        return num;
        !          1194: }
        !          1195: 
        !          1196: int32 read_varint(int f)
        !          1197: {
        !          1198:        union {
        !          1199:            char b[5];
        !          1200:            int32 x;
        !          1201:        } u;
        !          1202:        uchar ch;
        !          1203:        int extra;
        !          1204: 
        !          1205:        u.x = 0;
        !          1206:        readfd(f, (char*)&ch, 1);
        !          1207:        extra = int_byte_extra[ch / 4];
        !          1208:        if (extra) {
        !          1209:                uchar bit = ((uchar)1<<(8-extra));
        !          1210:                if (extra >= (int)sizeof u.b) {
        !          1211:                        rprintf(FERROR, "Overflow in read_varint()\n");
        !          1212:                        exit_cleanup(RERR_STREAMIO);
        !          1213:                }
        !          1214:                readfd(f, u.b, extra);
        !          1215:                u.b[extra] = ch & (bit-1);
        !          1216:        } else
        !          1217:                u.b[0] = ch;
        !          1218: #if CAREFUL_ALIGNMENT
        !          1219:        u.x = IVAL(u.b,0);
        !          1220: #endif
        !          1221: #if SIZEOF_INT32 > 4
        !          1222:        if (u.x & (int32)0x80000000)
        !          1223:                u.x |= ~(int32)0xffffffff;
        !          1224: #endif
        !          1225:        return u.x;
        !          1226: }
        !          1227: 
        !          1228: int64 read_varlong(int f, uchar min_bytes)
        !          1229: {
        !          1230:        union {
        !          1231:            char b[9];
        !          1232:            int64 x;
        !          1233:        } u;
        !          1234:        char b2[8];
        !          1235:        int extra;
        !          1236: 
        !          1237: #if SIZEOF_INT64 < 8
        !          1238:        memset(u.b, 0, 8);
        !          1239: #else
        !          1240:        u.x = 0;
        !          1241: #endif
        !          1242:        readfd(f, b2, min_bytes);
        !          1243:        memcpy(u.b, b2+1, min_bytes-1);
        !          1244:        extra = int_byte_extra[CVAL(b2, 0) / 4];
        !          1245:        if (extra) {
        !          1246:                uchar bit = ((uchar)1<<(8-extra));
        !          1247:                if (min_bytes + extra > (int)sizeof u.b) {
        !          1248:                        rprintf(FERROR, "Overflow in read_varlong()\n");
        !          1249:                        exit_cleanup(RERR_STREAMIO);
        !          1250:                }
        !          1251:                readfd(f, u.b + min_bytes - 1, extra);
        !          1252:                u.b[min_bytes + extra - 1] = CVAL(b2, 0) & (bit-1);
        !          1253: #if SIZEOF_INT64 < 8
        !          1254:                if (min_bytes + extra > 5 || u.b[4] || CVAL(u.b,3) & 0x80) {
        !          1255:                        rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
        !          1256:                        exit_cleanup(RERR_UNSUPPORTED);
        !          1257:                }
        !          1258: #endif
        !          1259:        } else
        !          1260:                u.b[min_bytes + extra - 1] = CVAL(b2, 0);
        !          1261: #if SIZEOF_INT64 < 8
        !          1262:        u.x = IVAL(u.b,0);
        !          1263: #elif CAREFUL_ALIGNMENT
        !          1264:        u.x = IVAL(u.b,0) | (((int64)IVAL(u.b,4))<<32);
        !          1265: #endif
        !          1266:        return u.x;
        !          1267: }
        !          1268: 
        !          1269: int64 read_longint(int f)
        !          1270: {
        !          1271: #if SIZEOF_INT64 >= 8
        !          1272:        char b[9];
        !          1273: #endif
        !          1274:        int32 num = read_int(f);
        !          1275: 
        !          1276:        if (num != (int32)0xffffffff)
        !          1277:                return num;
        !          1278: 
        !          1279: #if SIZEOF_INT64 < 8
        !          1280:        rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
        !          1281:        exit_cleanup(RERR_UNSUPPORTED);
        !          1282: #else
        !          1283:        readfd(f, b, 8);
        !          1284:        return IVAL(b,0) | (((int64)IVAL(b,4))<<32);
        !          1285: #endif
        !          1286: }
        !          1287: 
        !          1288: void read_buf(int f, char *buf, size_t len)
        !          1289: {
        !          1290:        readfd(f,buf,len);
        !          1291: }
        !          1292: 
        !          1293: void read_sbuf(int f, char *buf, size_t len)
        !          1294: {
        !          1295:        readfd(f, buf, len);
        !          1296:        buf[len] = '\0';
        !          1297: }
        !          1298: 
        !          1299: uchar read_byte(int f)
        !          1300: {
        !          1301:        uchar c;
        !          1302:        readfd(f, (char *)&c, 1);
        !          1303:        return c;
        !          1304: }
        !          1305: 
        !          1306: int read_vstring(int f, char *buf, int bufsize)
        !          1307: {
        !          1308:        int len = read_byte(f);
        !          1309: 
        !          1310:        if (len & 0x80)
        !          1311:                len = (len & ~0x80) * 0x100 + read_byte(f);
        !          1312: 
        !          1313:        if (len >= bufsize) {
        !          1314:                rprintf(FERROR, "over-long vstring received (%d > %d)\n",
        !          1315:                        len, bufsize - 1);
        !          1316:                return -1;
        !          1317:        }
        !          1318: 
        !          1319:        if (len)
        !          1320:                readfd(f, buf, len);
        !          1321:        buf[len] = '\0';
        !          1322:        return len;
        !          1323: }
        !          1324: 
        !          1325: /* Populate a sum_struct with values from the socket.  This is
        !          1326:  * called by both the sender and the receiver. */
        !          1327: void read_sum_head(int f, struct sum_struct *sum)
        !          1328: {
        !          1329:        int32 max_blength = protocol_version < 30 ? OLD_MAX_BLOCK_SIZE : MAX_BLOCK_SIZE;
        !          1330:        sum->count = read_int(f);
        !          1331:        if (sum->count < 0) {
        !          1332:                rprintf(FERROR, "Invalid checksum count %ld [%s]\n",
        !          1333:                        (long)sum->count, who_am_i());
        !          1334:                exit_cleanup(RERR_PROTOCOL);
        !          1335:        }
        !          1336:        sum->blength = read_int(f);
        !          1337:        if (sum->blength < 0 || sum->blength > max_blength) {
        !          1338:                rprintf(FERROR, "Invalid block length %ld [%s]\n",
        !          1339:                        (long)sum->blength, who_am_i());
        !          1340:                exit_cleanup(RERR_PROTOCOL);
        !          1341:        }
        !          1342:        sum->s2length = protocol_version < 27 ? csum_length : (int)read_int(f);
        !          1343:        if (sum->s2length < 0 || sum->s2length > MAX_DIGEST_LEN) {
        !          1344:                rprintf(FERROR, "Invalid checksum length %d [%s]\n",
        !          1345:                        sum->s2length, who_am_i());
        !          1346:                exit_cleanup(RERR_PROTOCOL);
        !          1347:        }
        !          1348:        sum->remainder = read_int(f);
        !          1349:        if (sum->remainder < 0 || sum->remainder > sum->blength) {
        !          1350:                rprintf(FERROR, "Invalid remainder length %ld [%s]\n",
        !          1351:                        (long)sum->remainder, who_am_i());
        !          1352:                exit_cleanup(RERR_PROTOCOL);
        !          1353:        }
        !          1354: }
        !          1355: 
        !          1356: /* Send the values from a sum_struct over the socket.  Set sum to
        !          1357:  * NULL if there are no checksums to send.  This is called by both
        !          1358:  * the generator and the sender. */
        !          1359: void write_sum_head(int f, struct sum_struct *sum)
        !          1360: {
        !          1361:        static struct sum_struct null_sum;
        !          1362: 
        !          1363:        if (sum == NULL)
        !          1364:                sum = &null_sum;
        !          1365: 
        !          1366:        write_int(f, sum->count);
        !          1367:        write_int(f, sum->blength);
        !          1368:        if (protocol_version >= 27)
        !          1369:                write_int(f, sum->s2length);
        !          1370:        write_int(f, sum->remainder);
        !          1371: }
        !          1372: 
        !          1373: /**
        !          1374:  * Sleep after writing to limit I/O bandwidth usage.
        !          1375:  *
        !          1376:  * @todo Rather than sleeping after each write, it might be better to
        !          1377:  * use some kind of averaging.  The current algorithm seems to always
        !          1378:  * use a bit less bandwidth than specified, because it doesn't make up
        !          1379:  * for slow periods.  But arguably this is a feature.  In addition, we
        !          1380:  * ought to take the time used to write the data into account.
        !          1381:  *
        !          1382:  * During some phases of big transfers (file FOO is uptodate) this is
        !          1383:  * called with a small bytes_written every time.  As the kernel has to
        !          1384:  * round small waits up to guarantee that we actually wait at least the
        !          1385:  * requested number of microseconds, this can become grossly inaccurate.
        !          1386:  * We therefore keep track of the bytes we've written over time and only
        !          1387:  * sleep when the accumulated delay is at least 1 tenth of a second.
        !          1388:  **/
        !          1389: static void sleep_for_bwlimit(int bytes_written)
        !          1390: {
        !          1391:        static struct timeval prior_tv;
        !          1392:        static long total_written = 0;
        !          1393:        struct timeval tv, start_tv;
        !          1394:        long elapsed_usec, sleep_usec;
        !          1395: 
        !          1396: #define ONE_SEC        1000000L /* # of microseconds in a second */
        !          1397: 
        !          1398:        if (!bwlimit_writemax)
        !          1399:                return;
        !          1400: 
        !          1401:        total_written += bytes_written;
        !          1402: 
        !          1403:        gettimeofday(&start_tv, NULL);
        !          1404:        if (prior_tv.tv_sec) {
        !          1405:                elapsed_usec = (start_tv.tv_sec - prior_tv.tv_sec) * ONE_SEC
        !          1406:                             + (start_tv.tv_usec - prior_tv.tv_usec);
        !          1407:                total_written -= (int64)elapsed_usec * bwlimit / (ONE_SEC/1024);
        !          1408:                if (total_written < 0)
        !          1409:                        total_written = 0;
        !          1410:        }
        !          1411: 
        !          1412:        sleep_usec = total_written * (ONE_SEC/1024) / bwlimit;
        !          1413:        if (sleep_usec < ONE_SEC / 10) {
        !          1414:                prior_tv = start_tv;
        !          1415:                return;
        !          1416:        }
        !          1417: 
        !          1418:        tv.tv_sec  = sleep_usec / ONE_SEC;
        !          1419:        tv.tv_usec = sleep_usec % ONE_SEC;
        !          1420:        select(0, NULL, NULL, NULL, &tv);
        !          1421: 
        !          1422:        gettimeofday(&prior_tv, NULL);
        !          1423:        elapsed_usec = (prior_tv.tv_sec - start_tv.tv_sec) * ONE_SEC
        !          1424:                     + (prior_tv.tv_usec - start_tv.tv_usec);
        !          1425:        total_written = (sleep_usec - elapsed_usec) * bwlimit / (ONE_SEC/1024);
        !          1426: }
        !          1427: 
        !          1428: static const char *what_fd_is(int fd)
        !          1429: {
        !          1430:        static char buf[20];
        !          1431: 
        !          1432:        if (fd == sock_f_out)
        !          1433:                return "socket";
        !          1434:        else if (fd == msg_fd_out)
        !          1435:                return "message fd";
        !          1436:        else if (fd == batch_fd)
        !          1437:                return "batch file";
        !          1438:        else {
        !          1439:                snprintf(buf, sizeof buf, "fd %d", fd);
        !          1440:                return buf;
        !          1441:        }
        !          1442: }
        !          1443: 
        !          1444: /* Write len bytes to the file descriptor fd, looping as necessary to get
        !          1445:  * the job done and also (in certain circumstances) reading any data on
        !          1446:  * msg_fd_in to avoid deadlock.
        !          1447:  *
        !          1448:  * This function underlies the multiplexing system.  The body of the
        !          1449:  * application never calls this function directly. */
        !          1450: static void writefd_unbuffered(int fd, const char *buf, size_t len)
        !          1451: {
        !          1452:        size_t n, total = 0;
        !          1453:        fd_set w_fds, r_fds, e_fds;
        !          1454:        int maxfd, count, cnt, using_r_fds;
        !          1455:        int defer_inc = 0;
        !          1456:        struct timeval tv;
        !          1457: 
        !          1458:        if (no_flush++)
        !          1459:                defer_forwarding_messages++, defer_inc++;
        !          1460: 
        !          1461:        while (total < len) {
        !          1462:                FD_ZERO(&w_fds);
        !          1463:                FD_SET(fd, &w_fds);
        !          1464:                FD_ZERO(&e_fds);
        !          1465:                FD_SET(fd, &e_fds);
        !          1466:                maxfd = fd;
        !          1467: 
        !          1468:                if (msg_fd_in >= 0) {
        !          1469:                        FD_ZERO(&r_fds);
        !          1470:                        FD_SET(msg_fd_in, &r_fds);
        !          1471:                        if (msg_fd_in > maxfd)
        !          1472:                                maxfd = msg_fd_in;
        !          1473:                        using_r_fds = 1;
        !          1474:                } else
        !          1475:                        using_r_fds = 0;
        !          1476: 
        !          1477:                tv.tv_sec = select_timeout;
        !          1478:                tv.tv_usec = 0;
        !          1479: 
        !          1480:                errno = 0;
        !          1481:                count = select(maxfd + 1, using_r_fds ? &r_fds : NULL,
        !          1482:                               &w_fds, &e_fds, &tv);
        !          1483: 
        !          1484:                if (count <= 0) {
        !          1485:                        if (count < 0 && errno == EBADF)
        !          1486:                                exit_cleanup(RERR_SOCKETIO);
        !          1487:                        check_timeout();
        !          1488:                        continue;
        !          1489:                }
        !          1490: 
        !          1491:                /*if (FD_ISSET(fd, &e_fds))
        !          1492:                        rprintf(FINFO, "select exception on fd %d\n", fd); */
        !          1493: 
        !          1494:                if (using_r_fds && FD_ISSET(msg_fd_in, &r_fds))
        !          1495:                        read_msg_fd();
        !          1496: 
        !          1497:                if (!FD_ISSET(fd, &w_fds))
        !          1498:                        continue;
        !          1499: 
        !          1500:                n = len - total;
        !          1501:                if (bwlimit_writemax && n > bwlimit_writemax)
        !          1502:                        n = bwlimit_writemax;
        !          1503:                cnt = write(fd, buf + total, n);
        !          1504: 
        !          1505:                if (cnt <= 0) {
        !          1506:                        if (cnt < 0) {
        !          1507:                                if (errno == EINTR)
        !          1508:                                        continue;
        !          1509:                                if (errno == EWOULDBLOCK || errno == EAGAIN) {
        !          1510:                                        msleep(1);
        !          1511:                                        continue;
        !          1512:                                }
        !          1513:                        }
        !          1514: 
        !          1515:                        /* Don't try to write errors back across the stream. */
        !          1516:                        if (fd == sock_f_out)
        !          1517:                                io_end_multiplex_out();
        !          1518:                        /* Don't try to write errors down a failing msg pipe. */
        !          1519:                        if (am_server && fd == msg_fd_out)
        !          1520:                                exit_cleanup(RERR_STREAMIO);
        !          1521:                        rsyserr(FERROR, errno,
        !          1522:                                "writefd_unbuffered failed to write %ld bytes to %s [%s]",
        !          1523:                                (long)len, what_fd_is(fd), who_am_i());
        !          1524:                        /* If the other side is sending us error messages, try
        !          1525:                         * to grab any messages they sent before they died. */
        !          1526:                        while (!am_server && fd == sock_f_out && io_multiplexing_in) {
        !          1527:                                char buf[1024];
        !          1528:                                set_io_timeout(30);
        !          1529:                                ignore_timeout = 0;
        !          1530:                                readfd_unbuffered(sock_f_in, buf, sizeof buf);
        !          1531:                        }
        !          1532:                        exit_cleanup(RERR_STREAMIO);
        !          1533:                }
        !          1534: 
        !          1535:                total += cnt;
        !          1536:                defer_forwarding_messages++, defer_inc++;
        !          1537: 
        !          1538:                if (fd == sock_f_out) {
        !          1539:                        if (io_timeout || am_generator)
        !          1540:                                last_io_out = time(NULL);
        !          1541:                        sleep_for_bwlimit(cnt);
        !          1542:                }
        !          1543:        }
        !          1544: 
        !          1545:        no_flush--;
        !          1546:        if (keep_defer_forwarding)
        !          1547:                defer_inc--;
        !          1548:        if (!(defer_forwarding_messages -= defer_inc) && !no_flush)
        !          1549:                msg_flush();
        !          1550: }
        !          1551: 
        !          1552: int io_flush(int flush_it_all)
        !          1553: {
        !          1554:        int flushed_something = 0;
        !          1555: 
        !          1556:        if (no_flush)
        !          1557:                return 0;
        !          1558: 
        !          1559:        if (iobuf_out_cnt) {
        !          1560:                if (io_multiplexing_out)
        !          1561:                        mplex_write(sock_f_out, MSG_DATA, iobuf_out, iobuf_out_cnt, 0);
        !          1562:                else
        !          1563:                        writefd_unbuffered(iobuf_f_out, iobuf_out, iobuf_out_cnt);
        !          1564:                iobuf_out_cnt = 0;
        !          1565:                flushed_something = 1;
        !          1566:        }
        !          1567: 
        !          1568:        if (flush_it_all && !defer_forwarding_messages && msg_queue.head) {
        !          1569:                msg_flush();
        !          1570:                flushed_something = 1;
        !          1571:        }
        !          1572: 
        !          1573:        return flushed_something;
        !          1574: }
        !          1575: 
        !          1576: static void writefd(int fd, const char *buf, size_t len)
        !          1577: {
        !          1578:        if (fd == sock_f_out)
        !          1579:                stats.total_written += len;
        !          1580: 
        !          1581:        if (fd == write_batch_monitor_out)
        !          1582:                writefd_unbuffered(batch_fd, buf, len);
        !          1583: 
        !          1584:        if (!iobuf_out || fd != iobuf_f_out) {
        !          1585:                writefd_unbuffered(fd, buf, len);
        !          1586:                return;
        !          1587:        }
        !          1588: 
        !          1589:        while (len) {
        !          1590:                int n = MIN((int)len, IO_BUFFER_SIZE - iobuf_out_cnt);
        !          1591:                if (n > 0) {
        !          1592:                        memcpy(iobuf_out+iobuf_out_cnt, buf, n);
        !          1593:                        buf += n;
        !          1594:                        len -= n;
        !          1595:                        iobuf_out_cnt += n;
        !          1596:                }
        !          1597: 
        !          1598:                if (iobuf_out_cnt == IO_BUFFER_SIZE)
        !          1599:                        io_flush(NORMAL_FLUSH);
        !          1600:        }
        !          1601: }
        !          1602: 
        !          1603: void write_shortint(int f, unsigned short x)
        !          1604: {
        !          1605:        char b[2];
        !          1606:        b[0] = (char)x;
        !          1607:        b[1] = (char)(x >> 8);
        !          1608:        writefd(f, b, 2);
        !          1609: }
        !          1610: 
        !          1611: void write_int(int f, int32 x)
        !          1612: {
        !          1613:        char b[4];
        !          1614:        SIVAL(b, 0, x);
        !          1615:        writefd(f, b, 4);
        !          1616: }
        !          1617: 
        !          1618: void write_varint(int f, int32 x)
        !          1619: {
        !          1620:        char b[5];
        !          1621:        uchar bit;
        !          1622:        int cnt = 4;
        !          1623: 
        !          1624:        SIVAL(b, 1, x);
        !          1625: 
        !          1626:        while (cnt > 1 && b[cnt] == 0)
        !          1627:                cnt--;
        !          1628:        bit = ((uchar)1<<(7-cnt+1));
        !          1629:        if (CVAL(b, cnt) >= bit) {
        !          1630:                cnt++;
        !          1631:                *b = ~(bit-1);
        !          1632:        } else if (cnt > 1)
        !          1633:                *b = b[cnt] | ~(bit*2-1);
        !          1634:        else
        !          1635:                *b = b[cnt];
        !          1636: 
        !          1637:        writefd(f, b, cnt);
        !          1638: }
        !          1639: 
        !          1640: void write_varlong(int f, int64 x, uchar min_bytes)
        !          1641: {
        !          1642:        char b[9];
        !          1643:        uchar bit;
        !          1644:        int cnt = 8;
        !          1645: 
        !          1646:        SIVAL(b, 1, x);
        !          1647: #if SIZEOF_INT64 >= 8
        !          1648:        SIVAL(b, 5, x >> 32);
        !          1649: #else
        !          1650:        if (x <= 0x7FFFFFFF && x >= 0)
        !          1651:                memset(b + 5, 0, 4);
        !          1652:        else {
        !          1653:                rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
        !          1654:                exit_cleanup(RERR_UNSUPPORTED);
        !          1655:        }
        !          1656: #endif
        !          1657: 
        !          1658:        while (cnt > min_bytes && b[cnt] == 0)
        !          1659:                cnt--;
        !          1660:        bit = ((uchar)1<<(7-cnt+min_bytes));
        !          1661:        if (CVAL(b, cnt) >= bit) {
        !          1662:                cnt++;
        !          1663:                *b = ~(bit-1);
        !          1664:        } else if (cnt > min_bytes)
        !          1665:                *b = b[cnt] | ~(bit*2-1);
        !          1666:        else
        !          1667:                *b = b[cnt];
        !          1668: 
        !          1669:        writefd(f, b, cnt);
        !          1670: }
        !          1671: 
        !          1672: /*
        !          1673:  * Note: int64 may actually be a 32-bit type if ./configure couldn't find any
        !          1674:  * 64-bit types on this platform.
        !          1675:  */
        !          1676: void write_longint(int f, int64 x)
        !          1677: {
        !          1678:        char b[12], * const s = b+4;
        !          1679: 
        !          1680:        SIVAL(s, 0, x);
        !          1681:        if (x <= 0x7FFFFFFF && x >= 0) {
        !          1682:                writefd(f, s, 4);
        !          1683:                return;
        !          1684:        }
        !          1685: 
        !          1686: #if SIZEOF_INT64 < 8
        !          1687:        rprintf(FERROR, "Integer overflow: attempted 64-bit offset\n");
        !          1688:        exit_cleanup(RERR_UNSUPPORTED);
        !          1689: #else
        !          1690:        memset(b, 0xFF, 4);
        !          1691:        SIVAL(s, 4, x >> 32);
        !          1692:        writefd(f, b, 12);
        !          1693: #endif
        !          1694: }
        !          1695: 
        !          1696: void write_buf(int f, const char *buf, size_t len)
        !          1697: {
        !          1698:        writefd(f,buf,len);
        !          1699: }
        !          1700: 
        !          1701: /** Write a string to the connection */
        !          1702: void write_sbuf(int f, const char *buf)
        !          1703: {
        !          1704:        writefd(f, buf, strlen(buf));
        !          1705: }
        !          1706: 
        !          1707: void write_byte(int f, uchar c)
        !          1708: {
        !          1709:        writefd(f, (char *)&c, 1);
        !          1710: }
        !          1711: 
        !          1712: void write_vstring(int f, const char *str, int len)
        !          1713: {
        !          1714:        uchar lenbuf[3], *lb = lenbuf;
        !          1715: 
        !          1716:        if (len > 0x7F) {
        !          1717:                if (len > 0x7FFF) {
        !          1718:                        rprintf(FERROR,
        !          1719:                                "attempting to send over-long vstring (%d > %d)\n",
        !          1720:                                len, 0x7FFF);
        !          1721:                        exit_cleanup(RERR_PROTOCOL);
        !          1722:                }
        !          1723:                *lb++ = len / 0x100 + 0x80;
        !          1724:        }
        !          1725:        *lb = len;
        !          1726: 
        !          1727:        writefd(f, (char*)lenbuf, lb - lenbuf + 1);
        !          1728:        if (len)
        !          1729:                writefd(f, str, len);
        !          1730: }
        !          1731: 
        !          1732: /* Send a file-list index using a byte-reduction method. */
        !          1733: void write_ndx(int f, int32 ndx)
        !          1734: {
        !          1735:        static int32 prev_positive = -1, prev_negative = 1;
        !          1736:        int32 diff, cnt = 0;
        !          1737:        char b[6];
        !          1738: 
        !          1739:        if (protocol_version < 30 || read_batch) {
        !          1740:                write_int(f, ndx);
        !          1741:                return;
        !          1742:        }
        !          1743: 
        !          1744:        /* Send NDX_DONE as a single-byte 0 with no side effects.  Send
        !          1745:         * negative nums as a positive after sending a leading 0xFF. */
        !          1746:        if (ndx >= 0) {
        !          1747:                diff = ndx - prev_positive;
        !          1748:                prev_positive = ndx;
        !          1749:        } else if (ndx == NDX_DONE) {
        !          1750:                *b = 0;
        !          1751:                writefd(f, b, 1);
        !          1752:                return;
        !          1753:        } else {
        !          1754:                b[cnt++] = (char)0xFF;
        !          1755:                ndx = -ndx;
        !          1756:                diff = ndx - prev_negative;
        !          1757:                prev_negative = ndx;
        !          1758:        }
        !          1759: 
        !          1760:        /* A diff of 1 - 253 is sent as a one-byte diff; a diff of 254 - 32767
        !          1761:         * or 0 is sent as a 0xFE + a two-byte diff; otherwise we send 0xFE
        !          1762:         * & all 4 bytes of the (non-negative) num with the high-bit set. */
        !          1763:        if (diff < 0xFE && diff > 0)
        !          1764:                b[cnt++] = (char)diff;
        !          1765:        else if (diff < 0 || diff > 0x7FFF) {
        !          1766:                b[cnt++] = (char)0xFE;
        !          1767:                b[cnt++] = (char)((ndx >> 24) | 0x80);
        !          1768:                b[cnt++] = (char)ndx;
        !          1769:                b[cnt++] = (char)(ndx >> 8);
        !          1770:                b[cnt++] = (char)(ndx >> 16);
        !          1771:        } else {
        !          1772:                b[cnt++] = (char)0xFE;
        !          1773:                b[cnt++] = (char)(diff >> 8);
        !          1774:                b[cnt++] = (char)diff;
        !          1775:        }
        !          1776:        writefd(f, b, cnt);
        !          1777: }
        !          1778: 
        !          1779: /* Receive a file-list index using a byte-reduction method. */
        !          1780: int32 read_ndx(int f)
        !          1781: {
        !          1782:        static int32 prev_positive = -1, prev_negative = 1;
        !          1783:        int32 *prev_ptr, num;
        !          1784:        char b[4];
        !          1785: 
        !          1786:        if (protocol_version < 30)
        !          1787:                return read_int(f);
        !          1788: 
        !          1789:        readfd(f, b, 1);
        !          1790:        if (CVAL(b, 0) == 0xFF) {
        !          1791:                readfd(f, b, 1);
        !          1792:                prev_ptr = &prev_negative;
        !          1793:        } else if (CVAL(b, 0) == 0)
        !          1794:                return NDX_DONE;
        !          1795:        else
        !          1796:                prev_ptr = &prev_positive;
        !          1797:        if (CVAL(b, 0) == 0xFE) {
        !          1798:                readfd(f, b, 2);
        !          1799:                if (CVAL(b, 0) & 0x80) {
        !          1800:                        b[3] = CVAL(b, 0) & ~0x80;
        !          1801:                        b[0] = b[1];
        !          1802:                        readfd(f, b+1, 2);
        !          1803:                        num = IVAL(b, 0);
        !          1804:                } else
        !          1805:                        num = (UVAL(b,0)<<8) + UVAL(b,1) + *prev_ptr;
        !          1806:        } else
        !          1807:                num = UVAL(b, 0) + *prev_ptr;
        !          1808:        *prev_ptr = num;
        !          1809:        if (prev_ptr == &prev_negative)
        !          1810:                num = -num;
        !          1811:        return num;
        !          1812: }
        !          1813: 
        !          1814: /* Read a line of up to bufsiz-1 characters into buf.  Strips
        !          1815:  * the (required) trailing newline and all carriage returns.
        !          1816:  * Returns 1 for success; 0 for I/O error or truncation. */
        !          1817: int read_line_old(int f, char *buf, size_t bufsiz)
        !          1818: {
        !          1819:        bufsiz--; /* leave room for the null */
        !          1820:        while (bufsiz > 0) {
        !          1821:                buf[0] = 0;
        !          1822:                read_buf(f, buf, 1);
        !          1823:                if (buf[0] == 0)
        !          1824:                        return 0;
        !          1825:                if (buf[0] == '\n')
        !          1826:                        break;
        !          1827:                if (buf[0] != '\r') {
        !          1828:                        buf++;
        !          1829:                        bufsiz--;
        !          1830:                }
        !          1831:        }
        !          1832:        *buf = '\0';
        !          1833:        return bufsiz > 0;
        !          1834: }
        !          1835: 
        !          1836: void io_printf(int fd, const char *format, ...)
        !          1837: {
        !          1838:        va_list ap;
        !          1839:        char buf[BIGPATHBUFLEN];
        !          1840:        int len;
        !          1841: 
        !          1842:        va_start(ap, format);
        !          1843:        len = vsnprintf(buf, sizeof buf, format, ap);
        !          1844:        va_end(ap);
        !          1845: 
        !          1846:        if (len < 0)
        !          1847:                exit_cleanup(RERR_STREAMIO);
        !          1848: 
        !          1849:        if (len > (int)sizeof buf) {
        !          1850:                rprintf(FERROR, "io_printf() was too long for the buffer.\n");
        !          1851:                exit_cleanup(RERR_STREAMIO);
        !          1852:        }
        !          1853: 
        !          1854:        write_sbuf(fd, buf);
        !          1855: }
        !          1856: 
        !          1857: /** Setup for multiplexing a MSG_* stream with the data stream. */
        !          1858: void io_start_multiplex_out(void)
        !          1859: {
        !          1860:        io_flush(NORMAL_FLUSH);
        !          1861:        io_start_buffering_out(sock_f_out);
        !          1862:        io_multiplexing_out = 1;
        !          1863: }
        !          1864: 
        !          1865: /** Setup for multiplexing a MSG_* stream with the data stream. */
        !          1866: void io_start_multiplex_in(void)
        !          1867: {
        !          1868:        io_flush(NORMAL_FLUSH);
        !          1869:        io_start_buffering_in(sock_f_in);
        !          1870:        io_multiplexing_in = 1;
        !          1871: }
        !          1872: 
        !          1873: /** Write an message to the multiplexed data stream. */
        !          1874: int io_multiplex_write(enum msgcode code, const char *buf, size_t len, int convert)
        !          1875: {
        !          1876:        if (!io_multiplexing_out)
        !          1877:                return 0;
        !          1878:        io_flush(NORMAL_FLUSH);
        !          1879:        stats.total_written += (len+4);
        !          1880:        mplex_write(sock_f_out, code, buf, len, convert);
        !          1881:        return 1;
        !          1882: }
        !          1883: 
        !          1884: void io_end_multiplex_in(void)
        !          1885: {
        !          1886:        io_multiplexing_in = 0;
        !          1887:        io_end_buffering_in();
        !          1888: }
        !          1889: 
        !          1890: /** Stop output multiplexing. */
        !          1891: void io_end_multiplex_out(void)
        !          1892: {
        !          1893:        io_multiplexing_out = 0;
        !          1894:        io_end_buffering_out();
        !          1895: }
        !          1896: 
        !          1897: void start_write_batch(int fd)
        !          1898: {
        !          1899:        /* Some communication has already taken place, but we don't
        !          1900:         * enable batch writing until here so that we can write a
        !          1901:         * canonical record of the communication even though the
        !          1902:         * actual communication so far depends on whether a daemon
        !          1903:         * is involved. */
        !          1904:        write_int(batch_fd, protocol_version);
        !          1905:        if (protocol_version >= 30)
        !          1906:                write_byte(batch_fd, compat_flags);
        !          1907:        write_int(batch_fd, checksum_seed);
        !          1908: 
        !          1909:        if (am_sender)
        !          1910:                write_batch_monitor_out = fd;
        !          1911:        else
        !          1912:                write_batch_monitor_in = fd;
        !          1913: }
        !          1914: 
        !          1915: void stop_write_batch(void)
        !          1916: {
        !          1917:        write_batch_monitor_out = -1;
        !          1918:        write_batch_monitor_in = -1;
        !          1919: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>