File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / rsync / db.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Mar 17 00:32:36 2021 UTC (3 years, 2 months ago) by misho
Branches: rsync, MAIN
CVS tags: v3_2_3, HEAD
rsync 3.2.3

/*
 * Routines to access extended file info via DB.
 *
 * Copyright (C) 2008-2013 Wayne Davison
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, visit the http://fsf.org website.
 */

#include "rsync.h"
#include "ifuncs.h"
#include "itypes.h"
#include "inums.h"
#ifdef USE_OPENSSL
#include "openssl/md4.h"
#include "openssl/md5.h"
#endif

extern int recurse;
extern int same_db;
extern int am_receiver;
extern int am_generator;
extern int checksum_type;
extern int db_clean, db_check, db_do_md4, db_do_md5, db_update, db_lax, db_init, db_mounts;
extern int db_output_name, db_output_sum, db_output_info, db_output_unchanged, db_output_dirs, db_output_msgs;
extern int saw_db_output_opt, saw_db_sum_opt;
extern char *db_config;

#define MOUNT_HELPER_SCRIPT "/usr/sbin/rsyncdb-mountinfo"

#if defined HAVE_MYSQL_MYSQL_H && defined HAVE_LIBMYSQLCLIENT
#define USE_MYSQL
#include <mysql/mysql.h>
#include <mysql/errmsg.h>
#endif

#if defined HAVE_SQLITE3_H && defined HAVE_LIBSQLITE3
#define USE_SQLITE
#include <sqlite3.h>
#ifndef HAVE_SQLITE3_OPEN_V2
#define sqlite3_open_v2(dbname, dbhptr, flags, vfs) \
	sqlite3_open(dbname, dbhptr)
#endif
#ifndef HAVE_SQLITE3_PREPARE_V2
#define sqlite3_prepare_v2 sqlite3_prepare
#endif
#define MAX_LOCK_FAILURES 10
#define LOCK_FAIL_MSLEEP 100
#endif

#ifndef USE_OPENSSL
#define MD5_CTX md_context
#define MD5_Init md5_begin
#define MD5_Update md5_update
#define MD5_Final(digest, cptr) md5_result(cptr, digest)
#endif

#define DB_TYPE_NONE 0
#define DB_TYPE_MYSQL 1
#define DB_TYPE_SQLITE 2

int use_db = DB_TYPE_NONE;
int select_many_sums = 0;

#define PREP_NORM 0
#define PREP_MOUNT 1

static const char *dbhost = NULL, *dbuser = NULL, *dbpass = NULL, *dbname = NULL;
static unsigned int dbport = 0;
static int transaction_state = -1;

static union {
#ifdef USE_MYSQL
    MYSQL *mysql;
#endif
#ifdef USE_SQLITE
    sqlite3 *sqlite;
#endif
    void *all;
} dbh;

#define SEL_DEV 0
#define SEL_SUM 1
#define REP_SUM 2
#define UPD_CTIME 3
#define INS_MOUNT 4
#define UPD_MOUNT 5 /* SQLite only */
#define SEL_MOUNT 6
#define UN_MOUNT 7
#define DEL_SUMS 8
#define INS_PRESENT 9
#define MAX_PREP_CNT 10

#define MAX_BIND_CNT 7
#define MAX_RESULT_BINDS 32

static union {
#ifdef USE_MYSQL
    MYSQL_STMT *mysql;
#endif
#ifdef USE_SQLITE
    sqlite3_stmt *sqlite;
#endif
    void *all;
} statements[MAX_PREP_CNT];

static int md_num;
static enum logcode log_code;

#ifdef USE_MYSQL
static unsigned int bind_disk_id, bind_mdnum;
static int64 bind_devno, bind_ino, bind_size, bind_mtime, bind_ctime;
static char bind_sum[MAX_DIGEST_LEN];
static unsigned long result_length[MAX_RESULT_BINDS];
static bool result_is_null[MAX_RESULT_BINDS], result_error[MAX_RESULT_BINDS];
#elif defined USE_SQLITE
static int64 bind_mtime;
#endif
static char bind_thishost[128+1];
static unsigned long bind_thishost_len;
static char *mount_helper_script = NULL;

static char *error_log;
#if defined USE_SQLITE && defined SQLITE_CONFIG_LOG
static char bind_mount_uniq[128+1];
static unsigned long bind_mount_uniq_len;
static FILE *error_log_fp;
#endif

#define PTR_SIZE (sizeof (struct file_struct *))

#if defined USE_MYSQL || defined USE_SQLITE
static void update_mounts(void);
#endif

struct name_list {
	struct name_list *next;
	char name[1];
} *dirs_list;

int db_read_config(enum logcode code, const char *config_file)
{
	char buf[2048], *cp;
	FILE *fp;
	int lineno = 0;

	log_code = code;

	bind_thishost_len = strlcpy(bind_thishost, "localhost", sizeof bind_thishost);

	if (!(fp = fopen(config_file, "r"))) {
		rsyserr(log_code, errno, "unable to open %s", config_file);
		return 0;
	}
	if (DEBUG_GTE(DB, 1))
		rprintf(FCLIENT, "[%s] Reading DB config from %s\n", who_am_i(), config_file);
	while (fgets(buf, sizeof buf, fp)) {
		lineno++;
		if ((cp = strchr(buf, '#')) == NULL
		 && (cp = strchr(buf, '\r')) == NULL
		 && (cp = strchr(buf, '\n')) == NULL)
			cp = buf + strlen(buf);
		while (cp != buf && isSpace(cp-1)) cp--;
		*cp = '\0';

		if (!*buf)
			continue;

		if (!(cp = strchr(buf, ':')))
			goto invalid_line;
		*cp++ = '\0';

		while (isSpace(cp)) cp++;
		if (strcasecmp(buf, "dbhost") == 0)
			dbhost = strdup(cp);
		else if (strcasecmp(buf, "dbuser") == 0)
			dbuser = strdup(cp);
		else if (strcasecmp(buf, "dbpass") == 0)
			dbpass = strdup(cp);
		else if (strcasecmp(buf, "dbname") == 0)
			dbname = strdup(cp);
		else if (strcasecmp(buf, "dbport") == 0)
			dbport = atoi(cp);
		else if (strcasecmp(buf, "transaction") == 0)
			transaction_state = atoi(cp) ? 0 : -1;
		else if (strcasecmp(buf, "mountHelper") == 0)
			mount_helper_script = strdup(cp);
		else if (strcasecmp(buf, "errlog") == 0)
			error_log = strdup(cp);
		else if (strcasecmp(buf, "thishost") == 0)
			bind_thishost_len = strlcpy(bind_thishost, cp, sizeof bind_thishost);
		else if (strcasecmp(buf, "dbtype") == 0) {
#ifdef USE_MYSQL
			if (strcasecmp(cp, "mysql") == 0) {
				use_db = DB_TYPE_MYSQL;
				continue;
			}
#endif
#ifdef USE_SQLITE
			if (strcasecmp(cp, "sqlite") == 0) {
				use_db = DB_TYPE_SQLITE;
				continue;
			}
#endif
			rprintf(log_code,
			    "Unsupported dbtype on line #%d in %s.\n",
			    lineno, config_file);
			use_db = DB_TYPE_NONE;
			return 0;
		} else {
		  invalid_line:
			rprintf(log_code, "Invalid line #%d in %s\n",
				lineno, config_file);
			use_db = DB_TYPE_NONE;
			return 0;
		}
	}
	fclose(fp);

	if (bind_thishost_len >= (int)sizeof bind_thishost)
		bind_thishost_len = sizeof bind_thishost - 1;

	if (!use_db || !dbname) {
		rprintf(log_code, "Please specify at least dbtype and dbname in %s.\n", config_file);
		use_db = DB_TYPE_NONE;
		return 0;
	}

	md_num = checksum_type == 5 ? 5 : 4;

	if (error_log) {
		if (use_db != DB_TYPE_SQLITE)
			rprintf(log_code, "Ignoring errlog setting for non-SQLite DB.\n");
#ifndef SQLITE_CONFIG_LOG
		else
			rprintf(log_code, "Your sqlite doesn't support SQLITE_CONFIG_LOG.\n");
#endif
	}

	if (!mount_helper_script)
		mount_helper_script = MOUNT_HELPER_SCRIPT;

	return 1;
}

#if defined USE_SQLITE && defined SQLITE_CONFIG_LOG
static void errorLogCallback(UNUSED(void *pArg), int iErrCode, const char *zMsg)
{
	fprintf(error_log_fp, "[%d] %s (%d)\n", (int)getpid(), zMsg, iErrCode);
}
#endif

static int run_sql(const char *fmt, ...)
{
	va_list ap;
	char *query;
	int ok = 0, qlen;

	va_start(ap, fmt);
	qlen = vasprintf(&query, fmt, ap);
	va_end(ap);
	if (qlen < 0)
		out_of_memory("run_sql");
	if (DEBUG_GTE(DB, 3))
		rprintf(FCLIENT, "[%s] SQL being run: %s\n", who_am_i(), query);

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL:
		if (mysql_query(dbh.mysql, query) < 0) {
			rprintf(FERROR, "Failed to run sql: %s\n", mysql_error(dbh.mysql));
			rprintf(FERROR, "%s\n", query);
		} else
			ok = 1;
		break;
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		int rc, lock_failures = 0;
		while (1) {
			if ((rc = sqlite3_exec(dbh.sqlite, query, NULL, NULL, NULL)) == 0)
				break;
			if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
				break;
			if (++lock_failures > MAX_LOCK_FAILURES)
				break;
			msleep(LOCK_FAIL_MSLEEP);
		}
		if (rc) {
			rprintf(FERROR, "[%s] Failed to run sql: %s\n", who_am_i(), sqlite3_errmsg(dbh.sqlite));
			rprintf(FERROR, "%s\n", query);
		} else
			ok = 1;
		break;
	    }
#endif
	}

	free(query);

	return ok;
}

#ifdef USE_MYSQL
static int prepare_mysql(int ndx, MYSQL_BIND *binds, int bind_cnt, const char *fmt, ...)
{
	va_list ap;
	char *query;
	int qlen, param_cnt;
	MYSQL_STMT *stmt = mysql_stmt_init(dbh.mysql);

	if (stmt == NULL)
		out_of_memory("prepare_mysql");

	va_start(ap, fmt);
	qlen = vasprintf(&query, fmt, ap);
	va_end(ap);
	if (qlen < 0)
		out_of_memory("prepare_mysql");
	if (DEBUG_GTE(DB, 3))
		rprintf(FCLIENT, "[%s] SQL being prepared: %s\n", who_am_i(), query);

	if (mysql_stmt_prepare(stmt, query, qlen) != 0) {
		rprintf(log_code, "[%s] Prepare failed: %s\n", who_am_i(), mysql_stmt_error(stmt));
		rprintf(log_code, "%s\n", query);
		free(query);
		return 0;
	}

	if ((param_cnt = mysql_stmt_param_count(stmt)) != bind_cnt) {
		rprintf(log_code, "[%s] Parameters in statement = %d, bind vars = %d\n",
			who_am_i(), param_cnt, bind_cnt);
		rprintf(log_code, "%s\n", query);
		free(query);
		return 0;
	}
	if (bind_cnt)
		mysql_stmt_bind_param(stmt, binds);

	statements[ndx].mysql = stmt;
	free(query);

	return 1;
}
#endif

#ifdef USE_MYSQL
static int prepare_mysql_queries(int type)
{
	MYSQL_BIND binds[MAX_BIND_CNT];
	char *sql;

	switch (type) {
	case PREP_NORM:
		sql="SELECT disk_id"
		    " FROM disk"
		    " WHERE host = ? AND devno = ?";
		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_STRING;
		binds[0].buffer = &bind_thishost;
		binds[0].buffer_length = bind_thishost_len;
		binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[1].buffer = &bind_devno;
		if (!prepare_mysql(SEL_DEV, binds, 2, sql))
			return 0;

		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_LONG;
		binds[0].buffer = &bind_disk_id;
		binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[1].buffer = &bind_ino;
		if (select_many_sums) {
			sql="SELECT checksum, sum_type, size, mtime, ctime"
			    " FROM inode_map"
			    " WHERE disk_id = ? AND ino = ?";
			if (!prepare_mysql(SEL_SUM, binds, 2, sql))
				return 0;
		} else {
			sql="SELECT checksum"
			    " FROM inode_map"
			    " WHERE disk_id = ? AND ino = ? AND sum_type = %d"
			    "   AND size = ? AND mtime = ? %s"; /* optional: AND ctime = ? */
			binds[2].buffer_type = MYSQL_TYPE_LONGLONG;
			binds[2].buffer = &bind_size;
			binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
			binds[3].buffer = &bind_mtime;
			if (!db_lax) {
				binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
				binds[4].buffer = &bind_ctime;
			}
			if (!prepare_mysql(SEL_SUM, binds, 4 + !db_lax, sql, md_num, db_lax ? "" : "AND ctime = ?"))
				return 0;
		}

		sql="INSERT INTO inode_map"
		    " SET disk_id = ?, ino = ?, sum_type = ?,"
		    "     size = ?, mtime = ?, ctime = ?, checksum = ?"
		    " ON DUPLICATE KEY"
		    " UPDATE size = VALUES(size), mtime = VALUES(mtime),"
		    "        ctime = VALUES(ctime), checksum = VALUES(checksum)";
		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_LONG;
		binds[0].buffer = &bind_disk_id;
		binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[1].buffer = &bind_ino;
		binds[2].buffer_type = MYSQL_TYPE_LONG;
		binds[2].buffer = &bind_mdnum;
		binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[3].buffer = &bind_size;
		binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[4].buffer = &bind_mtime;
		binds[5].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[5].buffer = &bind_ctime;
		binds[6].buffer_type = MYSQL_TYPE_BLOB;
		binds[6].buffer = &bind_sum;
		binds[6].buffer_length = MD5_DIGEST_LEN; /* Same as MD4_DIGEST_LEN */
		if (!prepare_mysql(REP_SUM, binds, 7, sql))
			return 0;

		sql="UPDATE inode_map"
		    " SET ctime = ?"
		    " WHERE disk_id = ? AND ino = ? AND sum_type = ? AND size = ? AND mtime = ?";
		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[0].buffer = &bind_ctime;
		binds[1].buffer_type = MYSQL_TYPE_LONG;
		binds[1].buffer = &bind_disk_id;
		binds[2].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[2].buffer = &bind_ino;
		binds[3].buffer_type = MYSQL_TYPE_LONG;
		binds[3].buffer = &bind_mdnum;
		binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[4].buffer = &bind_size;
		binds[5].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[5].buffer = &bind_mtime;
		if (!prepare_mysql(UPD_CTIME, binds, 6, sql))
			return 0;
		break;

	case PREP_MOUNT:
		sql="INSERT INTO disk"
		    " SET host = ?, last_seen = ?, mount_uniq = ?, devno = ?"
		    " ON DUPLICATE KEY"
		    " UPDATE last_seen = VALUES(last_seen), devno = VALUES(devno)";
		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_STRING;
		binds[0].buffer = &bind_thishost;
		binds[0].buffer_length = bind_thishost_len;
		binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[1].buffer = &bind_mtime; /* we abuse mtime to hold the last_seen value */
		binds[2].buffer_type = MYSQL_TYPE_STRING;
		binds[2].buffer = &bind_mount_uniq;
		binds[2].buffer_length = sizeof bind_mount_uniq;
		binds[2].length = &bind_mount_uniq_len;
		binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[3].buffer = &bind_devno;
		if (!prepare_mysql(INS_MOUNT, binds, 4, sql))
			return 0;

		sql="SELECT mount_uniq"
		    " FROM disk"
		    " WHERE host = ? AND last_seen < ? AND devno != 0";
		/* Reusing first 2 binds from INS_MOUNT */
		if (!prepare_mysql(SEL_MOUNT, binds, 2, sql))
			return 0;

		sql="UPDATE disk"
		    " SET devno = 0"
		    " WHERE host = ? AND last_seen < ? AND devno != 0";
		/* Reusing binds from SEL_MOUNT */
		if (!prepare_mysql(UN_MOUNT, binds, 2, sql))
			return 0;
		break;
	}

	return 1;
}
#endif

#ifdef USE_MYSQL
static int db_connect_mysql(void)
{
	const char *open_dbname = db_init ? "mysql" : dbname;

	if (!(dbh.mysql = mysql_init(NULL)))
		out_of_memory("db_read_config");

	if (DEBUG_GTE(DB, 1)) {
		rprintf(FCLIENT, "[%s] connecting: host=%s user=%s db=%s port=%d\n",
			who_am_i(), dbhost, dbuser, open_dbname, dbport);
	}
	if (!mysql_real_connect(dbh.mysql, dbhost, dbuser, dbpass, open_dbname, dbport, NULL, 0)) {
		rprintf(log_code, "[%s] Unable to connect to DB: %s\n", who_am_i(), mysql_error(dbh.mysql));
		return 0;
	}

	if (db_init) {
		if (db_output_msgs)
			rprintf(FCLIENT, "Creating DB %s (if it does not exist)\n", dbname);
		if (!run_sql("CREATE DATABASE IF NOT EXISTS `%s`", dbname)
		 || !run_sql("USE `%s`", dbname))
			exit_cleanup(RERR_IPC);

		if (db_output_msgs)
			rprintf(FCLIENT, "Dropping old tables (if they exist))\n");
		if (!run_sql("DROP TABLE IF EXISTS disk")
		 || !run_sql("DROP TABLE IF EXISTS inode_map"))
			exit_cleanup(RERR_IPC);

		if (db_output_msgs)
			rprintf(FCLIENT, "Creating empty tables ...\n");
		if (!run_sql(
		    "CREATE TABLE disk (\n"
		    "  disk_id integer unsigned NOT NULL PRIMARY KEY AUTO_INCREMENT,\n"
		    "  host varchar(128) NOT NULL default 'localhost',\n"
		    "  mount_uniq varchar(128) default NULL,\n"
		    "  devno bigint unsigned NOT NULL,\n" /* This is 0 when not mounted */
		    "  last_seen bigint NOT NULL,\n"
		    "  UNIQUE KEY mount_lookup (host, mount_uniq),\n"
		    "  KEY dev_lookup (devno, host)\n"
		    ")"))
			exit_cleanup(RERR_IPC);

		if (!run_sql(
		    "CREATE TABLE inode_map (\n"
		    "  disk_id integer unsigned NOT NULL,\n"
		    "  ino bigint unsigned NOT NULL,\n"
		    "  sum_type tinyint NOT NULL default '0',\n"
		    "  size bigint unsigned NOT NULL,\n"
		    "  mtime bigint NOT NULL,\n"
		    "  ctime bigint NOT NULL,\n"
		    "  checksum binary(16) NOT NULL,\n"
		    "  PRIMARY KEY (disk_id,ino,sum_type)\n"
		    ")"))
			exit_cleanup(RERR_IPC);

		if (!db_mounts)
			exit_cleanup(0);
	}

	if (db_mounts) {
		if (!prepare_mysql_queries(PREP_MOUNT))
			exit_cleanup(RERR_IPC);
		update_mounts();
		exit_cleanup(0);
	}

	if (!prepare_mysql_queries(PREP_NORM))
		return 0;

	return 1;
}
#endif

#ifdef USE_SQLITE
static int prepare_sqlite(int ndx, const char *fmt, ...)
{
	va_list ap;
	char *query;
	int rc, qlen, lock_failures = 0;

	va_start(ap, fmt);
	qlen = vasprintf(&query, fmt, ap);
	va_end(ap);
	if (qlen < 0)
		out_of_memory("prepare_sqlite");
	if (DEBUG_GTE(DB, 3))
		rprintf(FCLIENT, "[%s] SQL being prepared: %s\n", who_am_i(), query);

	while ((rc = sqlite3_prepare_v2(dbh.sqlite, query, -1, &statements[ndx].sqlite, NULL)) != 0) {
		if (DEBUG_GTE(DB, 4)) {
			rprintf(FCLIENT, "[%s] sqlite3_prepare_v2(,%s,,) returned %d\n",
				who_am_i(), query, rc);
		}
		if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
			break;
		if (++lock_failures > MAX_LOCK_FAILURES)
			break;
		msleep(LOCK_FAIL_MSLEEP);
	}
	if (rc) {
		rprintf(log_code, "[%s] Failed to prepare SQL: %s (%d)\n", who_am_i(), sqlite3_errmsg(dbh.sqlite), rc);
		rprintf(log_code, "%s\n", query);
		free(query);
		return 0;
	}
	free(query);

	return 1;
}
#endif

#ifdef USE_SQLITE
static int prepare_sqlite_queries(int type)
{
	char *sql;

	switch (type) {
	case PREP_NORM:
		sql="SELECT disk_id"
		    " FROM disk"
		    " WHERE host = ? AND devno = ?";
		if (!prepare_sqlite(SEL_DEV, sql))
			return 0;

		if (select_many_sums) {
			sql="SELECT checksum, sum_type, size, mtime, ctime"
			    " FROM inode_map"
			    " WHERE disk_id = ? AND ino = ?";
			if (!prepare_sqlite(SEL_SUM, sql))
				return 0;
		} else {
			sql="SELECT checksum"
			    " FROM inode_map"
			    " WHERE disk_id = ? AND ino = ? AND sum_type = %d"
			    "   AND size = ? AND mtime = ? %s";
			if (!prepare_sqlite(SEL_SUM, sql, md_num, db_lax ? "" : "AND ctime = ?"))
				return 0;
		}

		sql="INSERT OR REPLACE INTO inode_map"
		    " (disk_id, ino, sum_type, size, mtime, ctime, checksum)"
		    " VALUES (?, ?, ?, ?, ?, ?, ?)";
		if (!prepare_sqlite(REP_SUM, sql))
			return 0;

		sql="UPDATE inode_map"
		    " SET ctime = ?"
		    " WHERE disk_id = ? AND ino = ? AND sum_type = ? AND size = ? AND mtime = ?";
		if (!prepare_sqlite(UPD_CTIME, sql))
			return 0;
		break;

	case PREP_MOUNT:
		sql="INSERT OR IGNORE INTO disk"
		    " (host, last_seen, mount_uniq, devno)"
		    " VALUES (?, ?, ?, ?)";
		if (!prepare_sqlite(INS_MOUNT, sql))
			return 0;

		sql="UPDATE disk"
		    " SET last_seen = ?, devno = ?"
		    " WHERE host = ? AND mount_uniq = ?";
		if (!prepare_sqlite(UPD_MOUNT, sql))
			return 0;

		sql="SELECT mount_uniq"
		    " FROM disk"
		    " WHERE host = ? AND last_seen < ? AND devno != 0";
		if (!prepare_sqlite(SEL_MOUNT, sql))
			return 0;

		sql="UPDATE disk"
		    " SET devno = 0"
		    " WHERE host = ? AND last_seen < ? AND devno != 0";
		if (!prepare_sqlite(UN_MOUNT, sql))
			return 0;
		break;
	}

	return 1;
}
#endif

#ifdef USE_SQLITE
static int db_connect_sqlite(void)
{
	int lock_failures = 0;
	int rc;

#ifdef SQLITE_CONFIG_LOG
	if (error_log) {
		if (DEBUG_GTE(DB, 1))
			rprintf(FCLIENT, "[%s] Setting sqlite errlog to %s\n", who_am_i(), error_log);
		if (!(error_log_fp = fopen(error_log, "a"))) {
			rsyserr(log_code, errno, "unable to append to logfile %s", error_log);
			error_log = NULL;
		} else if (sqlite3_config(SQLITE_CONFIG_LOG, errorLogCallback, NULL) != 0)
			rprintf(log_code, "Failed to set errorLogCallback: %s\n", sqlite3_errmsg(dbh.sqlite));
	}
#endif

	while (1) {
		int open_flags = SQLITE_OPEN_READWRITE;
		if (db_init)
			open_flags |= SQLITE_OPEN_CREATE;
		if (DEBUG_GTE(DB, 1))
			rprintf(FCLIENT, "[%s] opening %s (%d)\n", who_am_i(), dbname, open_flags);
		if ((rc = sqlite3_open_v2(dbname, &dbh.sqlite, open_flags, NULL)) == 0) {
			break;
		}
		if (DEBUG_GTE(DB, 4)) {
			rprintf(FCLIENT, "[%s] sqlite3_open_v2(%s,,%d,NULL) returned %d\n",
				who_am_i(), dbname, open_flags, rc);
		}
		if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
			break;
		if (++lock_failures > MAX_LOCK_FAILURES)
			break;
		msleep(LOCK_FAIL_MSLEEP);
	}

	if (rc) {
		rprintf(log_code, "Unable to connect to DB: %s (%d)\n", sqlite3_errmsg(dbh.sqlite), rc);
		return 0;
	}

	if (db_init) {
		char *sql;
		if (db_output_msgs)
			rprintf(FCLIENT, "Dropping old tables (if they exist) ...\n");
		if (!run_sql("DROP TABLE IF EXISTS disk")
		 || !run_sql("DROP TABLE IF EXISTS inode_map"))
			exit_cleanup(RERR_IPC);

		if (db_output_msgs)
			rprintf(FCLIENT, "Creating empty tables ...\n");
		sql="CREATE TABLE disk (\n"
		    "  disk_id integer NOT NULL PRIMARY KEY AUTOINCREMENT,\n"
		    "  host varchar(128) NOT NULL default 'localhost',\n"
		    "  mount_uniq varchar(128) default NULL,\n"
		    "  devno bigint NOT NULL,\n" /* This is 0 when not mounted */
		    "  last_seen bigint NOT NULL,\n"
		    "  UNIQUE (host, mount_uniq)\n"
		    ")";
		if (!run_sql(sql))
			exit_cleanup(RERR_IPC);

		sql="CREATE TABLE inode_map (\n"
		    "  disk_id integer NOT NULL,\n"
		    "  ino bigint NOT NULL,\n"
		    "  size bigint NOT NULL,\n"
		    "  mtime bigint NOT NULL,\n"
		    "  ctime bigint NOT NULL,\n"
		    "  sum_type tinyint NOT NULL default '0',\n"
		    "  checksum binary(16) NOT NULL,\n"
		    "  PRIMARY KEY (disk_id,ino,sum_type)\n"
		    ")";
		if (!run_sql(sql))
			exit_cleanup(RERR_IPC);

#if SQLITE_VERSION_NUMBER >= 3007000
		/* Using WAL locking makes concurrency much better (requires sqlite 3.7.0). */
		sql="PRAGMA journal_mode = wal";
		run_sql(sql); /* We don't check this for success. */
#endif

		if (!db_mounts)
			exit_cleanup(0);
	}

	if (db_mounts) {
		if (!prepare_sqlite_queries(PREP_MOUNT))
			exit_cleanup(RERR_IPC);
		update_mounts();
		exit_cleanup(0);
	}

	if (!prepare_sqlite_queries(PREP_NORM)) {
		db_disconnect(False);
		return 0;
	}

	return 1;
}
#endif

int db_connect(int select_many)
{
	select_many_sums = select_many;

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL:
		if (db_connect_mysql())
			return 1;
		break;
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE:
		if (db_connect_sqlite())
			return 1;
		break;
#endif
	}

	db_disconnect(False);

	return 0;
}

void db_disconnect(BOOL commit)
{
	int ndx;

	if (!dbh.all)
		return;

	if (transaction_state > 0) {
		if (DEBUG_GTE(DB, 1)) {
			rprintf(FCLIENT, "[%s] %s our DB transaction\n",
				who_am_i(), commit ? "Committing" : "Rolling back");
		}
		transaction_state = 0;
		if (commit)
			run_sql("COMMIT");
		else
			run_sql("ROLLBACK");
	}

	if (DEBUG_GTE(DB, 1))
		rprintf(FCLIENT, "[%s] Disconnecting from the DB\n", who_am_i());

	for (ndx = 0; ndx < MAX_PREP_CNT; ndx++) {
		if (statements[ndx].all) {
			switch (use_db) {
#ifdef USE_MYSQL
			case DB_TYPE_MYSQL:
				mysql_stmt_close(statements[ndx].mysql);
				break;
#endif
#ifdef USE_SQLITE
			case DB_TYPE_SQLITE:
				sqlite3_finalize(statements[ndx].sqlite);
				break;
#endif
			}
			statements[ndx].all = NULL;
		}
	}

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL:
		mysql_close(dbh.mysql);
		break;
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE:
		sqlite3_close(dbh.sqlite);
		break;
#endif
	}

	dbh.all = NULL;
	use_db = DB_TYPE_NONE;
}

#ifdef USE_MYSQL
static MYSQL_STMT *exec_mysql(int ndx)
{
	MYSQL_STMT *stmt = statements[ndx].mysql;
	int rc;

	if ((rc = mysql_stmt_execute(stmt)) == CR_SERVER_LOST) {
		db_disconnect(False);
		use_db = DB_TYPE_MYSQL;
		if (db_connect(select_many_sums)) {
			stmt = statements[ndx].mysql;
			rc = mysql_stmt_execute(stmt);
		}
	}
	if (rc != 0) {
		rprintf(log_code, "SQL execute failed: %s\n", mysql_stmt_error(stmt));
		return NULL;
	}

	return stmt;
}
#endif

#ifdef USE_MYSQL
/* This stores up to max_rows into the values pointed to by the bind data arrays.
 * If max_rows is > 1, then all the buffer pointers MUST be set to an array long
 * enough to hold the max count of rows.  The buffer pointer will be incremented
 * to read additional rows (but never past the end).  If stmt_ptr is non-NULL, it
 * will be set to the "stmt" pointer IFF we didn't run out of rows before hitting
 * the max.  In this case, the caller should call mysql_stmt_fetch() to read any
 * remaining rows (the buffer pointers will point at the final array element) and
 * then call mysql_stmt_free_result().  If *stmt_ptr is a NULL value, there were
 * not enough rows to fill the max_rows arrays, and the stmt was already freed. */
static int fetch_mysql(MYSQL_BIND *binds, int bind_cnt, int ndx, int max_rows, MYSQL_STMT **stmt_ptr)
{
	MYSQL_STMT *stmt;
	int i, rc, rows = 0;

	if (bind_cnt > MAX_RESULT_BINDS) {
		fprintf(stderr, "Internal error: MAX_RESULT_BINDS overflow\n");
		exit_cleanup(RERR_UNSUPPORTED);
	}

	if ((stmt = exec_mysql(ndx)) == NULL)
		return 0;

	for (i = 0; i < bind_cnt; i++) {
		binds[i].is_null = &result_is_null[i];
		binds[i].length = &result_length[i];
		binds[i].error = &result_error[i];
	}
	mysql_stmt_bind_result(stmt, binds);

	while (rows < max_rows) {
		if ((rc = mysql_stmt_fetch(stmt)) != 0) {
			if (rc != MYSQL_NO_DATA)
				rprintf(log_code, "SELECT fetch failed: %s\n", mysql_stmt_error(stmt));
			break;
		}
		if (++rows >= max_rows)
			break;
		for (i = 0; i < bind_cnt; i++) {
			switch (binds[i].buffer_type) {
			case MYSQL_TYPE_BLOB:
			case MYSQL_TYPE_STRING:
			    binds[i].buffer += binds[i].buffer_length;
			    break;
			case MYSQL_TYPE_LONG:
			    binds[i].buffer += sizeof (int);
			    break;
			case MYSQL_TYPE_LONGLONG:
			    binds[i].buffer += sizeof (int64);
			    break;
			default:
			    fprintf(stderr, "Unknown MYSQL_TYPE_* in multi-row read: %d.\n", binds[i].buffer_type);
			    exit_cleanup(RERR_UNSUPPORTED);
			}
		}
	}

	if (!stmt_ptr || rows < max_rows) {
		mysql_stmt_free_result(stmt);
		stmt = NULL;
	}
	if (stmt_ptr)
		*stmt_ptr = stmt;

	return rows;
}
#endif

#if defined USE_MYSQL || defined USE_SQLITE
static void update_mounts(void)
{
	char buf[2048], *argv[2];
	int f_from, f_to, len;
	STRUCT_STAT st;
	int pid, status;

	if (DEBUG_GTE(DB, 2))
		printf("Running %s to grab mount info\n", mount_helper_script);
	argv[0] = mount_helper_script;
	argv[1] = NULL;
	pid = piped_child(argv, &f_from, &f_to);
	close(f_to);

	bind_mtime = time(NULL); /* abuse mtime slightly to hold our last_seen value */

	/* Strict format has 2 items with one tab as separator: MOUNT_UNIQ\tPATH */
	while ((len = read_line(f_from, buf, sizeof buf, 0)) > 0) {
		char *mount_uniq, *path;

		if (DEBUG_GTE(DB, 3))
			printf("Parsing mount info: %s\n", buf);
		mount_uniq = strtok(buf, "\t");
		path = mount_uniq ? strtok(NULL, "\r\n") : NULL;
		if (!path) {
			fprintf(stderr, "Failed to parse line from %s output\n", mount_helper_script);
			exit_cleanup(RERR_SYNTAX);
		}

		if (lstat(path, &st) < 0) {
			fprintf(stderr, "Failed to lstat(%s): %s\n", path, strerror(errno));
			exit_cleanup(RERR_IPC);
		}

		bind_mount_uniq_len = strlcpy(bind_mount_uniq, mount_uniq, sizeof bind_mount_uniq);
		if (bind_mount_uniq_len >= (int)sizeof bind_mount_uniq)
			bind_mount_uniq_len = sizeof bind_mount_uniq - 1;

		if (db_output_msgs) {
			printf("Marking mount \"%s\" (%s) as a recent mount\n",
				bind_mount_uniq, big_num(st.st_dev));
		}
		switch (use_db) {
#ifdef USE_MYSQL
		case DB_TYPE_MYSQL:
			bind_devno = st.st_dev;
			if (exec_mysql(INS_MOUNT) == NULL) {
				fprintf(stderr, "Failed to update mount info for \"%s\" - %s\n",
					bind_mount_uniq, mysql_error(dbh.mysql));
				exit_cleanup(RERR_IPC);
			}
			break;
#endif
#ifdef USE_SQLITE
		case DB_TYPE_SQLITE: {
			int rc, change_cnt;
			sqlite3_stmt *stmt = statements[INS_MOUNT].sqlite;
			sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
			sqlite3_bind_int64(stmt, 2, bind_mtime);
			sqlite3_bind_text(stmt, 3, bind_mount_uniq, bind_mount_uniq_len, SQLITE_STATIC);
			sqlite3_bind_int64(stmt, 4, st.st_dev);
			rc = sqlite3_step(stmt);
			if (rc != SQLITE_DONE) {
				fprintf(stderr, "Failed to insert mount info for \"%s\" - %s (%d)\n",
					bind_mount_uniq, sqlite3_errmsg(dbh.sqlite), rc);
				exit_cleanup(RERR_IPC);
			}
			change_cnt = sqlite3_changes(dbh.sqlite);
			sqlite3_reset(stmt);
			if (change_cnt == 0) {
				stmt = statements[UPD_MOUNT].sqlite;
				sqlite3_bind_int64(stmt, 1, bind_mtime);
				sqlite3_bind_int64(stmt, 2, st.st_dev);
				sqlite3_bind_text(stmt, 3, bind_thishost, bind_thishost_len, SQLITE_STATIC);
				sqlite3_bind_text(stmt, 4, bind_mount_uniq, bind_mount_uniq_len, SQLITE_STATIC);
				rc = sqlite3_step(stmt);
				if (rc != SQLITE_DONE) {
					fprintf(stderr, "Failed to update mount info for \"%s\" - %s (%d)\n",
						bind_mount_uniq, sqlite3_errmsg(dbh.sqlite), rc);
					exit_cleanup(RERR_IPC);
				}
				sqlite3_reset(stmt);
			}
			break;
		    }
#endif
		}
	}
	close(f_from);

	waitpid(pid, &status, 0);

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL: {
		if (db_output_msgs) {
			MYSQL_BIND binds[1];
			MYSQL_STMT *stmt;

			binds[0].buffer_type = MYSQL_TYPE_BLOB;
			binds[0].buffer = bind_mount_uniq;
			binds[0].buffer_length = sizeof bind_mount_uniq;
			if (fetch_mysql(binds, 1, SEL_MOUNT, 1, &stmt)) {
				while (1) {
					printf("Marking mount \"%s\" as unmounted.\n", bind_mount_uniq);
					if (mysql_stmt_fetch(stmt) != 0)
						break;
				}
				mysql_stmt_free_result(stmt);
			}
		}

		if (exec_mysql(UN_MOUNT) == NULL) {
			fprintf(stderr, "Failed to update old mount info - %s\n",
				mysql_error(dbh.mysql));
			exit_cleanup(RERR_IPC);
		}
		break;
	    }
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		sqlite3_stmt *stmt;
		int rc;

		if (db_output_msgs) {
			stmt = statements[SEL_MOUNT].sqlite;
			sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
			sqlite3_bind_int64(stmt, 2, bind_mtime);
			while (1) {
				if (sqlite3_step(stmt) != SQLITE_ROW)
					break;
				printf("Marking mount \"%s\" as unmounted.\n", sqlite3_column_text(stmt, 0));
			}
			sqlite3_reset(stmt);
		}

		stmt = statements[UN_MOUNT].sqlite;
		sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
		sqlite3_bind_int64(stmt, 2, bind_mtime);
		rc = sqlite3_step(stmt);
		sqlite3_reset(stmt);
		if (rc != SQLITE_DONE) {
			fprintf(stderr, "Failed to update old mount info - %s (%d)\n",
				sqlite3_errmsg(dbh.sqlite), rc);
			exit_cleanup(RERR_IPC);
		}
		break;
	    }
#endif
	}
}
#endif

static unsigned int get_disk_id(int64 devno)
{
	static unsigned int prior_disk_id = 0;
	static int64 prior_devno = 0;

	if (prior_devno == devno && prior_disk_id) {
		if (DEBUG_GTE(DB, 5))
			rprintf(FCLIENT, "get_disk_id(%s,%s) = %d (cached)\n", bind_thishost, big_num(devno), prior_disk_id);
		return prior_disk_id;
	}
	prior_devno = devno;

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL: {
		MYSQL_BIND binds[1];

		bind_devno = devno; /* The one changing SEL_DEV input value. */

		/* Bind where to put the output. */
		binds[0].buffer_type = MYSQL_TYPE_LONG;
		binds[0].buffer = &prior_disk_id;
		if (!fetch_mysql(binds, 1, SEL_DEV, 1, NULL))
			prior_disk_id = 0;
		break;
	    }
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		sqlite3_stmt *stmt = statements[SEL_DEV].sqlite;
		sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
		sqlite3_bind_int64(stmt, 2, devno);
		if (sqlite3_step(stmt) == SQLITE_ROW)
			prior_disk_id = sqlite3_column_int(stmt, 0);
		else
			prior_disk_id = 0;
		sqlite3_reset(stmt);
		break;
	    }
#endif
	}

	if (DEBUG_GTE(DB, 2))
		rprintf(FCLIENT, "get_disk_id(%s,%s) = %d\n", bind_thishost, big_num(devno), prior_disk_id);
	return prior_disk_id;
}

int db_get_checksum(const STRUCT_STAT *st_p, char *sum)
{
	unsigned int disk_id = get_disk_id(st_p->st_dev);
	int ok = 0;

	if (disk_id == 0)
		return 0;

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL: {
		MYSQL_BIND binds[1];

		bind_disk_id = disk_id;
		bind_ino = st_p->st_ino;
		bind_size = st_p->st_size;
		bind_mtime = st_p->st_mtime;
		if (!db_lax)
			bind_ctime = st_p->st_ctime;

		binds[0].buffer_type = MYSQL_TYPE_BLOB;
		binds[0].buffer = sum;
		binds[0].buffer_length = MD5_DIGEST_LEN;
		ok = fetch_mysql(binds, 1, SEL_SUM, 1, NULL);
		break;
	    }
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		sqlite3_stmt *stmt = statements[SEL_SUM].sqlite;
		sqlite3_bind_int(stmt, 1, disk_id);
		sqlite3_bind_int64(stmt, 2, st_p->st_ino);
		sqlite3_bind_int64(stmt, 3, st_p->st_size);
		sqlite3_bind_int64(stmt, 4, st_p->st_mtime);
		if (!db_lax)
			sqlite3_bind_int64(stmt, 5, st_p->st_ctime);
		if (sqlite3_step(stmt) == SQLITE_ROW) {
			int len = sqlite3_column_bytes(stmt, 0);
			if (len > MAX_DIGEST_LEN)
				len = MAX_DIGEST_LEN;
			memcpy(sum, sqlite3_column_blob(stmt, 0), len);
			ok = 1;
		}
		sqlite3_reset(stmt);
		break;
	    }
#endif
	}

	if (DEBUG_GTE(DB, 2)) {
		if (ok) {
			rprintf(FCLIENT, "[%s] Found DB checksum for %s,%s,%d: %s\n",
				who_am_i(), big_num(st_p->st_dev),
				big_num(st_p->st_ino), md_num, sum_as_hex(md_num, sum, 0));
		} else {
			rprintf(FCLIENT, "[%s] No DB checksum for %s,%s,%d\n",
				who_am_i(), big_num(st_p->st_dev),
				big_num(st_p->st_ino), md_num);
		}
	}

	return ok;
}

int db_get_both_checksums(const STRUCT_STAT *st_p, int *right_sum_cnt, int *wrong_sum_cnt, char **sum4, char **sum5)
{
	static char dbsum[MD5_DIGEST_LEN*2];
	int rows, j, sum_type[2];
	int64 dbsize[2], dbmtime[2], dbctime[2];
	unsigned int disk_id = get_disk_id(st_p->st_dev);

	if (disk_id == 0)
		return 0;

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL: {
		MYSQL_BIND binds[5];

		bind_disk_id = disk_id;
		bind_ino = st_p->st_ino;

		binds[0].buffer_type = MYSQL_TYPE_BLOB;
		binds[0].buffer = dbsum;
		binds[0].buffer_length = MD5_DIGEST_LEN;
		binds[1].buffer_type = MYSQL_TYPE_LONG;
		binds[1].buffer = (char*)sum_type;
		binds[2].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[2].buffer = (char*)dbsize;
		binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[3].buffer = (char*)dbmtime;
		binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[4].buffer = (char*)dbctime;
		rows = fetch_mysql(binds, 5, SEL_SUM, 2, NULL);
		break;
	    }
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		sqlite3_stmt *stmt = statements[SEL_SUM].sqlite;
		sqlite3_bind_int(stmt, 1, disk_id);
		sqlite3_bind_int64(stmt, 2, st_p->st_ino);
		for (j = 0; j < 2; j++) {
			int len;
			if (sqlite3_step(stmt) != SQLITE_ROW)
				break;
			len = sqlite3_column_bytes(stmt, 0);
			if (len > MD5_DIGEST_LEN)
				len = MD5_DIGEST_LEN;
			memcpy(dbsum + MD5_DIGEST_LEN*j, sqlite3_column_blob(stmt, 0), len);
			sum_type[j] = sqlite3_column_int(stmt, 1);
			dbsize[j] = sqlite3_column_int(stmt, 2);
			dbmtime[j] = sqlite3_column_int64(stmt, 3);
			dbctime[j] = sqlite3_column_int64(stmt, 4);
		}
		sqlite3_reset(stmt);
		rows = j;
		break;
	    }
#endif
	default:
		return 0;
	}

	if (sum4)
		*sum4 = NULL;
	if (sum5)
		*sum5 = NULL;
	*right_sum_cnt = *wrong_sum_cnt = 0;
	for (j = 0; j < rows; j++) {
		if (DEBUG_GTE(DB, 3)) {
			rprintf(FCLIENT, "DB checksum for %s,%s,%d: %s\n",
				big_num(st_p->st_dev), big_num(st_p->st_ino), sum_type[j],
				sum_as_hex(sum_type[j], dbsum + MD5_DIGEST_LEN*j, 0));
		}

		if (sum_type[j] == 4) {
			if (!sum4)
				continue;
			*sum4 = dbsum + MD5_DIGEST_LEN*j;
		} else {
			if (!sum5)
				continue;
			*sum5 = dbsum + MD5_DIGEST_LEN*j;
		}
		if (st_p->st_size == dbsize[j] && st_p->st_mtime == dbmtime[j] && (db_lax || st_p->st_ctime == dbctime[j]))
			++*right_sum_cnt;
		else
			++*wrong_sum_cnt;
	}

	return rows;
}

int db_set_checksum(int mdnum, const STRUCT_STAT *st_p, const char *sum)
{
	unsigned int disk_id;
	const char *errmsg = NULL;
	int rc = 0;

	if (am_receiver || (am_generator && same_db)) {
		/* Forward the setting to a single process.  The receiver always
		 * forwards to the generator, and the generator will forward to
		 * the sender ONLY if this is a local transfer. */
		char data[MSG_CHECKSUM_LEN];
		SIVAL64(data, 0, st_p->st_dev);
		SIVAL64(data, 8, st_p->st_ino);
		SIVAL64(data, 16, st_p->st_size);
		SIVAL64(data, 24, st_p->st_mtime);
		SIVAL64(data, 32, st_p->st_ctime);
#if MSG_CHECKSUM_LONGS != 5
#error Fix the setting of checksum long values
#endif
		SIVAL(data, MSG_CHECKSUM_LONGS*8, mdnum);
		memcpy(data + MSG_CHECKSUM_LONGS*8 + 4, sum, MAX_DIGEST_LEN);
		return send_msg(MSG_CHECKSUM, data, sizeof data, 0);
	}

	if ((disk_id = get_disk_id(st_p->st_dev)) == 0)
		return 0;

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL:
		if (transaction_state == 0) {
			if (!run_sql("BEGIN"))
				return 0;
			transaction_state = 1;
		}

		bind_disk_id = disk_id;
		bind_ino = st_p->st_ino;
		bind_mdnum = mdnum;
		bind_size = st_p->st_size;
		bind_mtime = st_p->st_mtime;
		bind_ctime = st_p->st_ctime;
		memcpy(bind_sum, sum, MD5_DIGEST_LEN);
		if (exec_mysql(REP_SUM) == NULL)
			errmsg = mysql_error(dbh.mysql);
		break;
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		sqlite3_stmt *stmt = statements[REP_SUM].sqlite;
		int lock_failures = 0;

		if (transaction_state == 0) {
			if (!run_sql("BEGIN"))
				return 0;
			transaction_state = 1;
		}

		sqlite3_bind_int(stmt, 1, disk_id);
		sqlite3_bind_int64(stmt, 2, st_p->st_ino);
		sqlite3_bind_int(stmt, 3, mdnum);
		sqlite3_bind_int64(stmt, 4, st_p->st_size);
		sqlite3_bind_int64(stmt, 5, st_p->st_mtime);
		sqlite3_bind_int64(stmt, 6, st_p->st_ctime);
		sqlite3_bind_blob(stmt, 7, sum, MD5_DIGEST_LEN, SQLITE_TRANSIENT);
		while (1) {
			rc = sqlite3_step(stmt);
			if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
				break;
			if (++lock_failures > MAX_LOCK_FAILURES)
				break;
			sqlite3_reset(stmt);
			msleep(LOCK_FAIL_MSLEEP);
		}
		if (rc != SQLITE_DONE)
			errmsg = sqlite3_errmsg(dbh.sqlite);
		sqlite3_reset(stmt);
		break;
	    }
#endif
	}

	if (!errmsg) {
		if (DEBUG_GTE(DB, 2)) {
			rprintf(FCLIENT, "[%s] Set DB checksum for %s,%s,%d: %s\n",
				who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino),
				md_num, sum_as_hex(md_num, sum, 0));
		}
	} else {
		rprintf(log_code, "[%s] Failed to set checksum for %s,%s,%d: %s (%d) -- closing DB\n",
			who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino),
			md_num, errmsg, rc);
		db_disconnect(False);
	}

	return errmsg ? 0 : 1;
}

/* For a delayed-update copy, we set the checksum on the file when it was
 * inside the partial-dir.  Since renaming the file changes its ctime, we need
 * to update the ctime to its new value (we can skip this in db_lax mode). */
int db_update_ctime(UNUSED(int mdnum), const STRUCT_STAT *st_p)
{
	unsigned int disk_id = get_disk_id(st_p->st_dev);

	if (disk_id == 0)
		return 0;

	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL:
		bind_ctime = st_p->st_ctime;
		bind_disk_id = disk_id;
		bind_ino = st_p->st_ino;
		bind_mdnum = mdnum;
		bind_size = st_p->st_size;
		bind_mtime = st_p->st_mtime;
		return exec_mysql(UPD_CTIME) != NULL;
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		int rc;

		sqlite3_stmt *stmt = statements[UPD_CTIME].sqlite;
		if (stmt == NULL)
			return 0;
		sqlite3_bind_int64(stmt, 1, st_p->st_ctime);
		sqlite3_bind_int(stmt, 2, disk_id);
		sqlite3_bind_int64(stmt, 3, st_p->st_ino);
		sqlite3_bind_int(stmt, 4, mdnum);
		sqlite3_bind_int64(stmt, 5, st_p->st_size);
		sqlite3_bind_int64(stmt, 6, st_p->st_mtime);
		rc = sqlite3_step(stmt);
		sqlite3_reset(stmt);
		return rc == SQLITE_DONE;
	    }
#endif
	}

	return 0;
}

static int db_clean_init(void)
{
	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL: {
		MYSQL_BIND binds[MAX_BIND_CNT];
		char *sql;

		mysql_query(dbh.mysql,
			"CREATE TEMPORARY TABLE inode_present ("
			" disk_id integer unsigned NOT NULL,"
			" ino bigint unsigned NOT NULL,"
			" PRIMARY KEY (disk_id,ino)"
			") ENGINE=MEMORY"
			);

		sql="INSERT IGNORE INTO inode_present"
		    " SET disk_id = ?, ino = ?";
		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_LONG;
		binds[0].buffer = &bind_disk_id;
		binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[1].buffer = &bind_ino;
		if (!prepare_mysql(INS_PRESENT, binds, 2, sql))
			exit_cleanup(RERR_SYNTAX);

		sql="DELETE m.*"
		    " FROM inode_map AS m"
		    " LEFT JOIN inode_present AS p USING(disk_id, ino)"
		    " JOIN disk AS d ON(m.disk_id = d.disk_id)"
		    " WHERE host = ? AND devno != 0 AND p.disk_id IS NULL AND ctime < ?";
		memset(binds, 0, sizeof binds);
		binds[0].buffer_type = MYSQL_TYPE_STRING;
		binds[0].buffer = &bind_thishost;
		binds[0].buffer_length = bind_thishost_len;
		binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
		binds[1].buffer = &bind_ctime;
		if (!prepare_mysql(DEL_SUMS, binds, 2, sql))
			exit_cleanup(RERR_SYNTAX);

		return 1;
	    }
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		char *sql;
		sql="ATTACH DATABASE '' AS aux1;"; /* Private temp DB, probably in-memory */
		if (!run_sql(sql))
			exit_cleanup(RERR_IPC);

		sql="CREATE TABLE aux1.inode_present ("
		    " disk_id integer NOT NULL,"
		    " ino bigint NOT NULL,"
		    " PRIMARY KEY (disk_id,ino)"
		    ")";
		if (!run_sql(sql))
			exit_cleanup(RERR_IPC);

		sql="INSERT OR IGNORE INTO aux1.inode_present"
		    " (disk_id, ino)"
		    " VALUES (?, ?)";
		if (!prepare_sqlite(INS_PRESENT, sql))
			exit_cleanup(RERR_IPC);

		sql="DELETE FROM inode_map"
		    " WHERE ROWID IN ("
		    "  SELECT m.ROWID"
		    "  FROM inode_map AS m"
		    "  LEFT JOIN aux1.inode_present AS p USING(disk_id, ino)"
		    "  JOIN disk AS d ON(m.disk_id = d.disk_id)"
		    "  WHERE host = ? AND devno != 0 AND p.disk_id IS NULL AND ctime < ?"
		    " )";
		if (!prepare_sqlite(DEL_SUMS, sql))
			exit_cleanup(RERR_IPC);

		transaction_state = -1; /* bug work-around -- force transaction off when cleaning XXX */

		return 1;
	    }
#endif
	}

	return 0;
}

static int db_note_present(UNUSED(int disk_id), UNUSED(int64 ino))
{
	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL:
		bind_disk_id = disk_id;
		bind_ino = ino;
		return exec_mysql(INS_PRESENT) != NULL;
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		int rc;
		sqlite3_stmt *stmt = statements[INS_PRESENT].sqlite;
		sqlite3_bind_int(stmt, 1, disk_id);
		sqlite3_bind_int64(stmt, 2, ino);
		rc = sqlite3_step(stmt);
		sqlite3_reset(stmt);
		return rc == SQLITE_DONE;
	    }
#endif
	}

	return 0;
}

/* This function requires the user to have populated all disk_id+inode pairs
 * into the inode_present table. */
static int db_clean_inodes(UNUSED(time_t start_time))
{
	int del_cnt = 0;

	/* The extra ctime < start_time check ensures that brand-new checksums that
	 * were added after the start of our cleaning run are not removed. */
	switch (use_db) {
#ifdef USE_MYSQL
	case DB_TYPE_MYSQL: {
		MYSQL_STMT *stmt;
		bind_ctime = start_time;
		stmt = exec_mysql(DEL_SUMS);
		if (stmt != NULL)
			del_cnt = mysql_affected_rows(dbh.mysql);
		break;
	    }
#endif
#ifdef USE_SQLITE
	case DB_TYPE_SQLITE: {
		int rc;
		sqlite3_stmt *stmt = statements[DEL_SUMS].sqlite;
		sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
		sqlite3_bind_int64(stmt, 2, start_time);
		rc = sqlite3_step(stmt);
		if (rc == SQLITE_DONE)
			del_cnt = sqlite3_changes(dbh.sqlite);
		sqlite3_reset(stmt);
		break;
	    }
#endif
	}

	return del_cnt;
}

static int abs_path(char *buf, int bufsiz, const char *curdir, const char *dir)
{
	if (*dir == '/')
		strlcpy(buf, dir, bufsiz);
	else {
		int len = snprintf(buf, bufsiz, "%s/%s", curdir, dir);
		assert(len > 0); /* silence a compiler warning */
	}

	return clean_fname(buf, CFN_DROP_TRAILING_DOT_DIR | CFN_COLLAPSE_DOT_DOT_DIRS);
}

static struct name_list *new_name(const char *basename, const char *filename)
{
	struct name_list *n;
	int blen = strlen(basename);
	int slen = filename ? (int)strlen(filename) : -1;
	int len = blen + 1 + slen;

	if (len >= MAXPATHLEN) {
		if (filename)
			rprintf(FERROR, "Filename too long: %s/%s\n", basename, filename);
		else
			rprintf(FERROR, "Filename too long: %s\n", basename);
		return NULL;
	}

	n = (struct name_list *)new_array(char, sizeof (struct name_list) + len);

	memcpy(n->name, basename, blen);
	if (filename) {
		n->name[blen] = '/';
		memcpy(n->name + 1 + blen, filename, slen);
	}
	n->name[len] = '\0';
	n->next = NULL;

	return n;
}

static int name_compare(const void *n1, const void *n2)
{
	struct name_list *p1 = *(struct name_list **)n1;
	struct name_list *p2 = *(struct name_list **)n2;
	return strcmp(p1->name, p2->name);
}

static struct name_list *get_sorted_names(const char *dir)
{
	struct name_list *add, **sortbuf, *names = NULL, *prior_name = NULL;
	struct dirent *di;
	int cnt = 0;
	DIR *d;

	if (!(d = opendir("."))) {
		rprintf(FERROR, "Unable to opendir %s: %s\n", dir, strerror(errno));
		return NULL;
	}
	while ((di = readdir(d)) != NULL) {
		char *dname = d_name(di);
		if (dname[0] == '.' && (dname[1] == '\0' || (dname[1] == '.' && dname[2] == '\0')))
			continue;
		if (!(add = new_name(dname, NULL)))
			continue;
		if (prior_name)
			prior_name->next = add;
		else
			names = add;
		prior_name = add;
		cnt++;
	}
	closedir(d);

	if (cnt) {
		int j;

		sortbuf = new_array(struct name_list *, cnt);
		for (j = 0; j < cnt; j++) {
			sortbuf[j] = names;
			names = names->next;
		}

		qsort(sortbuf, cnt, PTR_SIZE, name_compare);

		names = prior_name = NULL;
		for (j = 0; j < cnt; j++) {
			add = sortbuf[j];
			if (prior_name)
				prior_name->next = add;
			else
				names = add;
			prior_name = add;
		}

		if (prior_name)
			prior_name->next = NULL;
		free(sortbuf);
	}

	return names;
}

static inline int sums_ne(const char *sum1, const char *sum2)
{
	return memcmp(sum1, sum2, MD5_DIGEST_LEN) != 0;
}

/* Returns 1 if there is a checksum change, else 0. */
static int mention_file(const char *dir, const char *name, int right_cnt, int wrong_cnt,
			const char *dbsum4, const char *dbsum5, const char *sum4, const char *sum5)
{
	char *info_str = wrong_cnt && !right_cnt ? "!i " : "   ";
	char *md4_str = !db_do_md4 ? NULL : !dbsum4 ? "+4 " : !sum4 ? "?4 " : sums_ne(sum4, dbsum4) ? "!4 " : "   ";
	char *md5_str = !db_do_md5 ? NULL : !dbsum5 ? "+5 " : !sum5 ? "?5 " : sums_ne(sum5, dbsum5) ? "!5 " : "   ";
	int chg = *info_str != ' ' || (md4_str && *md4_str != ' ') || (md5_str && *md5_str != ' ');
	if (chg || db_output_unchanged) {
		if (db_output_info) {
			fputs(info_str, stdout);
			if (md4_str)
				fputs(md4_str, stdout);
			if (md5_str)
				fputs(md5_str, stdout);
		}
		if (db_output_sum) {
			if (db_do_md4)
				printf("%s ", sum_as_hex(4, sum4, 0));
			if (db_do_md5)
				printf("%s ", sum_as_hex(5, sum5, 0));
		}
		if (db_output_name) {
			if (db_output_sum)
				putchar(' '); /* We want 2 spaces, like md5sum. */
			if (*dir != '.' || dir[1]) {
				fputs(dir, stdout);
				putchar('/');
			}
			puts(name);
		}
	}

	return chg;
}

NORETURN void run_dbonly(const char **args)
{
	char start_dir[MAXPATHLEN], dirbuf[MAXPATHLEN];
	int need_sum_cnt, start_dir_len;
	struct name_list *prior_dir;
	struct name_list *names;
	time_t clean_start = 0;
	int exit_code = 0;

	checksum_type = 5;

	need_sum_cnt = db_do_md4 + db_do_md5;

	if (!db_read_config(FERROR, db_config) || !db_connect(1))
		exit_cleanup(RERR_FILEIO);

	if (db_clean) {
		clean_start = time(NULL);
		db_clean_init();
	}

	if (getcwd(start_dir, sizeof start_dir - 1) == NULL) {
		rsyserr(FERROR, errno, "getcwd()");
		exit_cleanup(RERR_FILESELECT);
	}
	start_dir_len = strlen(start_dir);

	if (args) {
		prior_dir = NULL;
		while (*args) {
			struct name_list *add;
			if (abs_path(dirbuf, sizeof dirbuf, start_dir, *args++) <= 0)
				continue;
			if (!(add = new_name(dirbuf, NULL)))
				continue;
			if (prior_dir)
				prior_dir->next = add;
			else
				dirs_list = add;
			prior_dir = add;
		}
	} else
		dirs_list = new_name(start_dir, NULL);

	prior_dir = NULL;
	while (dirs_list) {
		struct name_list *subdirs, *prior_subdir, *prior_name;
		const char *dir = dirs_list->name;
		const char *reldir = dir;

		if (prior_dir)
			free((void*)prior_dir);
		prior_dir = dirs_list;
		dirs_list = dirs_list->next;

		if (strncmp(reldir, start_dir, start_dir_len) == 0) {
			if (reldir[start_dir_len] == '\0')
				reldir = ".";
			else if (reldir[start_dir_len] == '/')
				reldir += start_dir_len + 1;
		}
		if (db_output_dirs)
			printf("... %s/ ...\n", reldir);

		if (chdir(dir) < 0) {
			rprintf(FERROR, "Unable to chdir to %s: %s\n", dir, strerror(errno));
			continue;
		}
		if (!(names = get_sorted_names(dir)))
			continue;

		subdirs = prior_subdir = prior_name = NULL;
		while (names) {
			STRUCT_STAT st;
			char *dbsum4, *sum4, sumbuf4[MD5_DIGEST_LEN];
			char *dbsum5, *sum5, sumbuf5[MD5_DIGEST_LEN];
			int right_sum_cnt, wrong_sum_cnt;
			const char *name = names->name;
			unsigned int disk_id;

			if (prior_name)
				free((void*)prior_name);
			prior_name = names;
			names = names->next;

			dbsum4 = dbsum5 = sum4 = sum5 = NULL;

			if (lstat(name, &st) < 0) {
				rprintf(FERROR, "Failed to lstat(%s): %s\n", name, strerror(errno));
				continue;
			}
			if (S_ISLNK(st.st_mode))
				continue;
			if (S_ISDIR(st.st_mode)) {
				/* add optional excluding of things like /^(CVS|\.svn|\.git|\.bzr)$/; */
				if (recurse) {
					struct name_list *add = new_name(dir, name);
					if (add) {
						if (prior_subdir)
							prior_subdir->next = add;
						else
							subdirs = add;
						prior_subdir = add;
					}
				}
				continue;
			}
			if (!S_ISREG(st.st_mode))
				continue;

			if (!(disk_id = get_disk_id(st.st_dev)))
				continue;
			if (db_clean) {
				db_note_present(disk_id, st.st_ino);
				if (!db_update && !db_check)
					continue;
			}
			db_get_both_checksums(&st, &right_sum_cnt, &wrong_sum_cnt,
					      db_do_md4 ? &dbsum4 : NULL, db_do_md5 ? &dbsum5 : NULL);

			if (!db_check && right_sum_cnt == need_sum_cnt) {
				mention_file(reldir, name, right_sum_cnt, wrong_sum_cnt, dbsum4, dbsum5, dbsum4, dbsum5);
				continue;
			}

			if (db_update || (db_check && right_sum_cnt) || db_output_sum) {
				uchar *data;
				int32 remainder;
				md_context m4;
				MD5_CTX m5;
				struct map_struct *buf;
				OFF_T off, len = st.st_size;
				int fd = do_open(name, O_RDONLY, 0);

				if (fd < 0) {
					rprintf(FERROR, "ERROR: unable to read %s: %s\n", name, strerror(errno));
					continue;
				}

				if (db_do_md4)
					mdfour_begin(&m4);
				if (db_do_md5)
					MD5_Init(&m5);

				buf = map_file(fd, len, MAX_MAP_SIZE, CSUM_CHUNK);

				for (off = 0; off + CSUM_CHUNK <= len; off += CSUM_CHUNK) {
					data = (uchar*)map_ptr(buf, off, CSUM_CHUNK);
					if (db_do_md4)
						mdfour_update(&m4, data, CSUM_CHUNK);
					if (db_do_md5)
						MD5_Update(&m5, data, CSUM_CHUNK);
				}

				remainder = (int32)(len - off);
				data = (uchar*)map_ptr(buf, off, remainder);
				if (db_do_md4) {
					mdfour_update(&m4, data, remainder);
					mdfour_result(&m4, (uchar*)(sum4 = sumbuf4));
				}
				if (db_do_md5) {
					MD5_Update(&m5, data, remainder);
					MD5_Final((uchar*)(sum5 = sumbuf5), &m5);
				}

				close(fd);
				unmap_file(buf);
			}

			int chg = mention_file(reldir, name, right_sum_cnt, wrong_sum_cnt, dbsum4, dbsum5, sum4, sum5);
			if (!chg) {
				/* Only db_check should get here... */
			} else if (!db_update) {
				exit_code = 1;
			} else {
				int fail = 0;
				if (db_do_md4 && !db_set_checksum(4, &st, sum4))
					fail = 1;
				if (db_do_md5 && !db_set_checksum(5, &st, sum5))
					fail = 1;
				if (fail) {
					fprintf(stderr, "Failed to set checksum on %s/%s\n", reldir, name);
					exit_cleanup(RERR_FILEIO);
				}
			}
		}
		if (prior_name)
			free((void*)prior_name);

		if (recurse && subdirs) {
			prior_subdir->next = dirs_list;
			dirs_list = subdirs;
		}
	}
	if (prior_dir)
		free((void*)prior_dir);

	if (db_clean) {
		int rows = db_clean_inodes(clean_start);
		if (db_output_msgs)
			printf("Cleaned out %d old inode%s.\n", rows, rows == 1 ? "" : "s");
	}

	db_disconnect(True);
	exit(exit_code);
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>