/* * Routines to access extended file info via DB. * * Copyright (C) 2008-2013 Wayne Davison * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License along * with this program; if not, visit the http://fsf.org website. */ #include "rsync.h" #include "ifuncs.h" #include "itypes.h" #include "inums.h" #ifdef USE_OPENSSL #include "openssl/md4.h" #include "openssl/md5.h" #endif extern int recurse; extern int same_db; extern int am_receiver; extern int am_generator; extern int checksum_type; extern int db_clean, db_check, db_do_md4, db_do_md5, db_update, db_lax, db_init, db_mounts; extern int db_output_name, db_output_sum, db_output_info, db_output_unchanged, db_output_dirs, db_output_msgs; extern int saw_db_output_opt, saw_db_sum_opt; extern char *db_config; #define MOUNT_HELPER_SCRIPT "/usr/sbin/rsyncdb-mountinfo" #if defined HAVE_MYSQL_MYSQL_H && defined HAVE_LIBMYSQLCLIENT #define USE_MYSQL #include #include #endif #if defined HAVE_SQLITE3_H && defined HAVE_LIBSQLITE3 #define USE_SQLITE #include #ifndef HAVE_SQLITE3_OPEN_V2 #define sqlite3_open_v2(dbname, dbhptr, flags, vfs) \ sqlite3_open(dbname, dbhptr) #endif #ifndef HAVE_SQLITE3_PREPARE_V2 #define sqlite3_prepare_v2 sqlite3_prepare #endif #define MAX_LOCK_FAILURES 10 #define LOCK_FAIL_MSLEEP 100 #endif #ifndef USE_OPENSSL #define MD5_CTX md_context #define MD5_Init md5_begin #define MD5_Update md5_update #define MD5_Final(digest, cptr) md5_result(cptr, digest) #endif #define DB_TYPE_NONE 0 #define DB_TYPE_MYSQL 1 #define DB_TYPE_SQLITE 2 int use_db = DB_TYPE_NONE; int select_many_sums = 0; #define PREP_NORM 0 #define PREP_MOUNT 1 static const char *dbhost = NULL, *dbuser = NULL, *dbpass = NULL, *dbname = NULL; static unsigned int dbport = 0; static int transaction_state = -1; static union { #ifdef USE_MYSQL MYSQL *mysql; #endif #ifdef USE_SQLITE sqlite3 *sqlite; #endif void *all; } dbh; #define SEL_DEV 0 #define SEL_SUM 1 #define REP_SUM 2 #define UPD_CTIME 3 #define INS_MOUNT 4 #define UPD_MOUNT 5 /* SQLite only */ #define SEL_MOUNT 6 #define UN_MOUNT 7 #define DEL_SUMS 8 #define INS_PRESENT 9 #define MAX_PREP_CNT 10 #define MAX_BIND_CNT 7 #define MAX_RESULT_BINDS 32 static union { #ifdef USE_MYSQL MYSQL_STMT *mysql; #endif #ifdef USE_SQLITE sqlite3_stmt *sqlite; #endif void *all; } statements[MAX_PREP_CNT]; static int md_num; static enum logcode log_code; #ifdef USE_MYSQL static unsigned int bind_disk_id, bind_mdnum; static int64 bind_devno, bind_ino, bind_size, bind_mtime, bind_ctime; static char bind_sum[MAX_DIGEST_LEN]; static unsigned long result_length[MAX_RESULT_BINDS]; static bool result_is_null[MAX_RESULT_BINDS], result_error[MAX_RESULT_BINDS]; #elif defined USE_SQLITE static int64 bind_mtime; #endif static char bind_thishost[128+1]; static unsigned long bind_thishost_len; static char *mount_helper_script = NULL; static char *error_log; #if defined USE_SQLITE && defined SQLITE_CONFIG_LOG static char bind_mount_uniq[128+1]; static unsigned long bind_mount_uniq_len; static FILE *error_log_fp; #endif #define PTR_SIZE (sizeof (struct file_struct *)) #if defined USE_MYSQL || defined USE_SQLITE static void update_mounts(void); #endif struct name_list { struct name_list *next; char name[1]; } *dirs_list; int db_read_config(enum logcode code, const char *config_file) { char buf[2048], *cp; FILE *fp; int lineno = 0; log_code = code; bind_thishost_len = strlcpy(bind_thishost, "localhost", sizeof bind_thishost); if (!(fp = fopen(config_file, "r"))) { rsyserr(log_code, errno, "unable to open %s", config_file); return 0; } if (DEBUG_GTE(DB, 1)) rprintf(FCLIENT, "[%s] Reading DB config from %s\n", who_am_i(), config_file); while (fgets(buf, sizeof buf, fp)) { lineno++; if ((cp = strchr(buf, '#')) == NULL && (cp = strchr(buf, '\r')) == NULL && (cp = strchr(buf, '\n')) == NULL) cp = buf + strlen(buf); while (cp != buf && isSpace(cp-1)) cp--; *cp = '\0'; if (!*buf) continue; if (!(cp = strchr(buf, ':'))) goto invalid_line; *cp++ = '\0'; while (isSpace(cp)) cp++; if (strcasecmp(buf, "dbhost") == 0) dbhost = strdup(cp); else if (strcasecmp(buf, "dbuser") == 0) dbuser = strdup(cp); else if (strcasecmp(buf, "dbpass") == 0) dbpass = strdup(cp); else if (strcasecmp(buf, "dbname") == 0) dbname = strdup(cp); else if (strcasecmp(buf, "dbport") == 0) dbport = atoi(cp); else if (strcasecmp(buf, "transaction") == 0) transaction_state = atoi(cp) ? 0 : -1; else if (strcasecmp(buf, "mountHelper") == 0) mount_helper_script = strdup(cp); else if (strcasecmp(buf, "errlog") == 0) error_log = strdup(cp); else if (strcasecmp(buf, "thishost") == 0) bind_thishost_len = strlcpy(bind_thishost, cp, sizeof bind_thishost); else if (strcasecmp(buf, "dbtype") == 0) { #ifdef USE_MYSQL if (strcasecmp(cp, "mysql") == 0) { use_db = DB_TYPE_MYSQL; continue; } #endif #ifdef USE_SQLITE if (strcasecmp(cp, "sqlite") == 0) { use_db = DB_TYPE_SQLITE; continue; } #endif rprintf(log_code, "Unsupported dbtype on line #%d in %s.\n", lineno, config_file); use_db = DB_TYPE_NONE; return 0; } else { invalid_line: rprintf(log_code, "Invalid line #%d in %s\n", lineno, config_file); use_db = DB_TYPE_NONE; return 0; } } fclose(fp); if (bind_thishost_len >= (int)sizeof bind_thishost) bind_thishost_len = sizeof bind_thishost - 1; if (!use_db || !dbname) { rprintf(log_code, "Please specify at least dbtype and dbname in %s.\n", config_file); use_db = DB_TYPE_NONE; return 0; } md_num = checksum_type == 5 ? 5 : 4; if (error_log) { if (use_db != DB_TYPE_SQLITE) rprintf(log_code, "Ignoring errlog setting for non-SQLite DB.\n"); #ifndef SQLITE_CONFIG_LOG else rprintf(log_code, "Your sqlite doesn't support SQLITE_CONFIG_LOG.\n"); #endif } if (!mount_helper_script) mount_helper_script = MOUNT_HELPER_SCRIPT; return 1; } #if defined USE_SQLITE && defined SQLITE_CONFIG_LOG static void errorLogCallback(UNUSED(void *pArg), int iErrCode, const char *zMsg) { fprintf(error_log_fp, "[%d] %s (%d)\n", (int)getpid(), zMsg, iErrCode); } #endif static int run_sql(const char *fmt, ...) { va_list ap; char *query; int ok = 0, qlen; va_start(ap, fmt); qlen = vasprintf(&query, fmt, ap); va_end(ap); if (qlen < 0) out_of_memory("run_sql"); if (DEBUG_GTE(DB, 3)) rprintf(FCLIENT, "[%s] SQL being run: %s\n", who_am_i(), query); switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: if (mysql_query(dbh.mysql, query) < 0) { rprintf(FERROR, "Failed to run sql: %s\n", mysql_error(dbh.mysql)); rprintf(FERROR, "%s\n", query); } else ok = 1; break; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { int rc, lock_failures = 0; while (1) { if ((rc = sqlite3_exec(dbh.sqlite, query, NULL, NULL, NULL)) == 0) break; if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED) break; if (++lock_failures > MAX_LOCK_FAILURES) break; msleep(LOCK_FAIL_MSLEEP); } if (rc) { rprintf(FERROR, "[%s] Failed to run sql: %s\n", who_am_i(), sqlite3_errmsg(dbh.sqlite)); rprintf(FERROR, "%s\n", query); } else ok = 1; break; } #endif } free(query); return ok; } #ifdef USE_MYSQL static int prepare_mysql(int ndx, MYSQL_BIND *binds, int bind_cnt, const char *fmt, ...) { va_list ap; char *query; int qlen, param_cnt; MYSQL_STMT *stmt = mysql_stmt_init(dbh.mysql); if (stmt == NULL) out_of_memory("prepare_mysql"); va_start(ap, fmt); qlen = vasprintf(&query, fmt, ap); va_end(ap); if (qlen < 0) out_of_memory("prepare_mysql"); if (DEBUG_GTE(DB, 3)) rprintf(FCLIENT, "[%s] SQL being prepared: %s\n", who_am_i(), query); if (mysql_stmt_prepare(stmt, query, qlen) != 0) { rprintf(log_code, "[%s] Prepare failed: %s\n", who_am_i(), mysql_stmt_error(stmt)); rprintf(log_code, "%s\n", query); free(query); return 0; } if ((param_cnt = mysql_stmt_param_count(stmt)) != bind_cnt) { rprintf(log_code, "[%s] Parameters in statement = %d, bind vars = %d\n", who_am_i(), param_cnt, bind_cnt); rprintf(log_code, "%s\n", query); free(query); return 0; } if (bind_cnt) mysql_stmt_bind_param(stmt, binds); statements[ndx].mysql = stmt; free(query); return 1; } #endif #ifdef USE_MYSQL static int prepare_mysql_queries(int type) { MYSQL_BIND binds[MAX_BIND_CNT]; char *sql; switch (type) { case PREP_NORM: sql="SELECT disk_id" " FROM disk" " WHERE host = ? AND devno = ?"; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_STRING; binds[0].buffer = &bind_thishost; binds[0].buffer_length = bind_thishost_len; binds[1].buffer_type = MYSQL_TYPE_LONGLONG; binds[1].buffer = &bind_devno; if (!prepare_mysql(SEL_DEV, binds, 2, sql)) return 0; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_LONG; binds[0].buffer = &bind_disk_id; binds[1].buffer_type = MYSQL_TYPE_LONGLONG; binds[1].buffer = &bind_ino; if (select_many_sums) { sql="SELECT checksum, sum_type, size, mtime, ctime" " FROM inode_map" " WHERE disk_id = ? AND ino = ?"; if (!prepare_mysql(SEL_SUM, binds, 2, sql)) return 0; } else { sql="SELECT checksum" " FROM inode_map" " WHERE disk_id = ? AND ino = ? AND sum_type = %d" " AND size = ? AND mtime = ? %s"; /* optional: AND ctime = ? */ binds[2].buffer_type = MYSQL_TYPE_LONGLONG; binds[2].buffer = &bind_size; binds[3].buffer_type = MYSQL_TYPE_LONGLONG; binds[3].buffer = &bind_mtime; if (!db_lax) { binds[4].buffer_type = MYSQL_TYPE_LONGLONG; binds[4].buffer = &bind_ctime; } if (!prepare_mysql(SEL_SUM, binds, 4 + !db_lax, sql, md_num, db_lax ? "" : "AND ctime = ?")) return 0; } sql="INSERT INTO inode_map" " SET disk_id = ?, ino = ?, sum_type = ?," " size = ?, mtime = ?, ctime = ?, checksum = ?" " ON DUPLICATE KEY" " UPDATE size = VALUES(size), mtime = VALUES(mtime)," " ctime = VALUES(ctime), checksum = VALUES(checksum)"; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_LONG; binds[0].buffer = &bind_disk_id; binds[1].buffer_type = MYSQL_TYPE_LONGLONG; binds[1].buffer = &bind_ino; binds[2].buffer_type = MYSQL_TYPE_LONG; binds[2].buffer = &bind_mdnum; binds[3].buffer_type = MYSQL_TYPE_LONGLONG; binds[3].buffer = &bind_size; binds[4].buffer_type = MYSQL_TYPE_LONGLONG; binds[4].buffer = &bind_mtime; binds[5].buffer_type = MYSQL_TYPE_LONGLONG; binds[5].buffer = &bind_ctime; binds[6].buffer_type = MYSQL_TYPE_BLOB; binds[6].buffer = &bind_sum; binds[6].buffer_length = MD5_DIGEST_LEN; /* Same as MD4_DIGEST_LEN */ if (!prepare_mysql(REP_SUM, binds, 7, sql)) return 0; sql="UPDATE inode_map" " SET ctime = ?" " WHERE disk_id = ? AND ino = ? AND sum_type = ? AND size = ? AND mtime = ?"; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_LONGLONG; binds[0].buffer = &bind_ctime; binds[1].buffer_type = MYSQL_TYPE_LONG; binds[1].buffer = &bind_disk_id; binds[2].buffer_type = MYSQL_TYPE_LONGLONG; binds[2].buffer = &bind_ino; binds[3].buffer_type = MYSQL_TYPE_LONG; binds[3].buffer = &bind_mdnum; binds[4].buffer_type = MYSQL_TYPE_LONGLONG; binds[4].buffer = &bind_size; binds[5].buffer_type = MYSQL_TYPE_LONGLONG; binds[5].buffer = &bind_mtime; if (!prepare_mysql(UPD_CTIME, binds, 6, sql)) return 0; break; case PREP_MOUNT: sql="INSERT INTO disk" " SET host = ?, last_seen = ?, mount_uniq = ?, devno = ?" " ON DUPLICATE KEY" " UPDATE last_seen = VALUES(last_seen), devno = VALUES(devno)"; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_STRING; binds[0].buffer = &bind_thishost; binds[0].buffer_length = bind_thishost_len; binds[1].buffer_type = MYSQL_TYPE_LONGLONG; binds[1].buffer = &bind_mtime; /* we abuse mtime to hold the last_seen value */ binds[2].buffer_type = MYSQL_TYPE_STRING; binds[2].buffer = &bind_mount_uniq; binds[2].buffer_length = sizeof bind_mount_uniq; binds[2].length = &bind_mount_uniq_len; binds[3].buffer_type = MYSQL_TYPE_LONGLONG; binds[3].buffer = &bind_devno; if (!prepare_mysql(INS_MOUNT, binds, 4, sql)) return 0; sql="SELECT mount_uniq" " FROM disk" " WHERE host = ? AND last_seen < ? AND devno != 0"; /* Reusing first 2 binds from INS_MOUNT */ if (!prepare_mysql(SEL_MOUNT, binds, 2, sql)) return 0; sql="UPDATE disk" " SET devno = 0" " WHERE host = ? AND last_seen < ? AND devno != 0"; /* Reusing binds from SEL_MOUNT */ if (!prepare_mysql(UN_MOUNT, binds, 2, sql)) return 0; break; } return 1; } #endif #ifdef USE_MYSQL static int db_connect_mysql(void) { const char *open_dbname = db_init ? "mysql" : dbname; if (!(dbh.mysql = mysql_init(NULL))) out_of_memory("db_read_config"); if (DEBUG_GTE(DB, 1)) { rprintf(FCLIENT, "[%s] connecting: host=%s user=%s db=%s port=%d\n", who_am_i(), dbhost, dbuser, open_dbname, dbport); } if (!mysql_real_connect(dbh.mysql, dbhost, dbuser, dbpass, open_dbname, dbport, NULL, 0)) { rprintf(log_code, "[%s] Unable to connect to DB: %s\n", who_am_i(), mysql_error(dbh.mysql)); return 0; } if (db_init) { if (db_output_msgs) rprintf(FCLIENT, "Creating DB %s (if it does not exist)\n", dbname); if (!run_sql("CREATE DATABASE IF NOT EXISTS `%s`", dbname) || !run_sql("USE `%s`", dbname)) exit_cleanup(RERR_IPC); if (db_output_msgs) rprintf(FCLIENT, "Dropping old tables (if they exist))\n"); if (!run_sql("DROP TABLE IF EXISTS disk") || !run_sql("DROP TABLE IF EXISTS inode_map")) exit_cleanup(RERR_IPC); if (db_output_msgs) rprintf(FCLIENT, "Creating empty tables ...\n"); if (!run_sql( "CREATE TABLE disk (\n" " disk_id integer unsigned NOT NULL PRIMARY KEY AUTO_INCREMENT,\n" " host varchar(128) NOT NULL default 'localhost',\n" " mount_uniq varchar(128) default NULL,\n" " devno bigint unsigned NOT NULL,\n" /* This is 0 when not mounted */ " last_seen bigint NOT NULL,\n" " UNIQUE KEY mount_lookup (host, mount_uniq),\n" " KEY dev_lookup (devno, host)\n" ")")) exit_cleanup(RERR_IPC); if (!run_sql( "CREATE TABLE inode_map (\n" " disk_id integer unsigned NOT NULL,\n" " ino bigint unsigned NOT NULL,\n" " sum_type tinyint NOT NULL default '0',\n" " size bigint unsigned NOT NULL,\n" " mtime bigint NOT NULL,\n" " ctime bigint NOT NULL,\n" " checksum binary(16) NOT NULL,\n" " PRIMARY KEY (disk_id,ino,sum_type)\n" ")")) exit_cleanup(RERR_IPC); if (!db_mounts) exit_cleanup(0); } if (db_mounts) { if (!prepare_mysql_queries(PREP_MOUNT)) exit_cleanup(RERR_IPC); update_mounts(); exit_cleanup(0); } if (!prepare_mysql_queries(PREP_NORM)) return 0; return 1; } #endif #ifdef USE_SQLITE static int prepare_sqlite(int ndx, const char *fmt, ...) { va_list ap; char *query; int rc, qlen, lock_failures = 0; va_start(ap, fmt); qlen = vasprintf(&query, fmt, ap); va_end(ap); if (qlen < 0) out_of_memory("prepare_sqlite"); if (DEBUG_GTE(DB, 3)) rprintf(FCLIENT, "[%s] SQL being prepared: %s\n", who_am_i(), query); while ((rc = sqlite3_prepare_v2(dbh.sqlite, query, -1, &statements[ndx].sqlite, NULL)) != 0) { if (DEBUG_GTE(DB, 4)) { rprintf(FCLIENT, "[%s] sqlite3_prepare_v2(,%s,,) returned %d\n", who_am_i(), query, rc); } if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED) break; if (++lock_failures > MAX_LOCK_FAILURES) break; msleep(LOCK_FAIL_MSLEEP); } if (rc) { rprintf(log_code, "[%s] Failed to prepare SQL: %s (%d)\n", who_am_i(), sqlite3_errmsg(dbh.sqlite), rc); rprintf(log_code, "%s\n", query); free(query); return 0; } free(query); return 1; } #endif #ifdef USE_SQLITE static int prepare_sqlite_queries(int type) { char *sql; switch (type) { case PREP_NORM: sql="SELECT disk_id" " FROM disk" " WHERE host = ? AND devno = ?"; if (!prepare_sqlite(SEL_DEV, sql)) return 0; if (select_many_sums) { sql="SELECT checksum, sum_type, size, mtime, ctime" " FROM inode_map" " WHERE disk_id = ? AND ino = ?"; if (!prepare_sqlite(SEL_SUM, sql)) return 0; } else { sql="SELECT checksum" " FROM inode_map" " WHERE disk_id = ? AND ino = ? AND sum_type = %d" " AND size = ? AND mtime = ? %s"; if (!prepare_sqlite(SEL_SUM, sql, md_num, db_lax ? "" : "AND ctime = ?")) return 0; } sql="INSERT OR REPLACE INTO inode_map" " (disk_id, ino, sum_type, size, mtime, ctime, checksum)" " VALUES (?, ?, ?, ?, ?, ?, ?)"; if (!prepare_sqlite(REP_SUM, sql)) return 0; sql="UPDATE inode_map" " SET ctime = ?" " WHERE disk_id = ? AND ino = ? AND sum_type = ? AND size = ? AND mtime = ?"; if (!prepare_sqlite(UPD_CTIME, sql)) return 0; break; case PREP_MOUNT: sql="INSERT OR IGNORE INTO disk" " (host, last_seen, mount_uniq, devno)" " VALUES (?, ?, ?, ?)"; if (!prepare_sqlite(INS_MOUNT, sql)) return 0; sql="UPDATE disk" " SET last_seen = ?, devno = ?" " WHERE host = ? AND mount_uniq = ?"; if (!prepare_sqlite(UPD_MOUNT, sql)) return 0; sql="SELECT mount_uniq" " FROM disk" " WHERE host = ? AND last_seen < ? AND devno != 0"; if (!prepare_sqlite(SEL_MOUNT, sql)) return 0; sql="UPDATE disk" " SET devno = 0" " WHERE host = ? AND last_seen < ? AND devno != 0"; if (!prepare_sqlite(UN_MOUNT, sql)) return 0; break; } return 1; } #endif #ifdef USE_SQLITE static int db_connect_sqlite(void) { int lock_failures = 0; int rc; #ifdef SQLITE_CONFIG_LOG if (error_log) { if (DEBUG_GTE(DB, 1)) rprintf(FCLIENT, "[%s] Setting sqlite errlog to %s\n", who_am_i(), error_log); if (!(error_log_fp = fopen(error_log, "a"))) { rsyserr(log_code, errno, "unable to append to logfile %s", error_log); error_log = NULL; } else if (sqlite3_config(SQLITE_CONFIG_LOG, errorLogCallback, NULL) != 0) rprintf(log_code, "Failed to set errorLogCallback: %s\n", sqlite3_errmsg(dbh.sqlite)); } #endif while (1) { int open_flags = SQLITE_OPEN_READWRITE; if (db_init) open_flags |= SQLITE_OPEN_CREATE; if (DEBUG_GTE(DB, 1)) rprintf(FCLIENT, "[%s] opening %s (%d)\n", who_am_i(), dbname, open_flags); if ((rc = sqlite3_open_v2(dbname, &dbh.sqlite, open_flags, NULL)) == 0) { break; } if (DEBUG_GTE(DB, 4)) { rprintf(FCLIENT, "[%s] sqlite3_open_v2(%s,,%d,NULL) returned %d\n", who_am_i(), dbname, open_flags, rc); } if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED) break; if (++lock_failures > MAX_LOCK_FAILURES) break; msleep(LOCK_FAIL_MSLEEP); } if (rc) { rprintf(log_code, "Unable to connect to DB: %s (%d)\n", sqlite3_errmsg(dbh.sqlite), rc); return 0; } if (db_init) { char *sql; if (db_output_msgs) rprintf(FCLIENT, "Dropping old tables (if they exist) ...\n"); if (!run_sql("DROP TABLE IF EXISTS disk") || !run_sql("DROP TABLE IF EXISTS inode_map")) exit_cleanup(RERR_IPC); if (db_output_msgs) rprintf(FCLIENT, "Creating empty tables ...\n"); sql="CREATE TABLE disk (\n" " disk_id integer NOT NULL PRIMARY KEY AUTOINCREMENT,\n" " host varchar(128) NOT NULL default 'localhost',\n" " mount_uniq varchar(128) default NULL,\n" " devno bigint NOT NULL,\n" /* This is 0 when not mounted */ " last_seen bigint NOT NULL,\n" " UNIQUE (host, mount_uniq)\n" ")"; if (!run_sql(sql)) exit_cleanup(RERR_IPC); sql="CREATE TABLE inode_map (\n" " disk_id integer NOT NULL,\n" " ino bigint NOT NULL,\n" " size bigint NOT NULL,\n" " mtime bigint NOT NULL,\n" " ctime bigint NOT NULL,\n" " sum_type tinyint NOT NULL default '0',\n" " checksum binary(16) NOT NULL,\n" " PRIMARY KEY (disk_id,ino,sum_type)\n" ")"; if (!run_sql(sql)) exit_cleanup(RERR_IPC); #if SQLITE_VERSION_NUMBER >= 3007000 /* Using WAL locking makes concurrency much better (requires sqlite 3.7.0). */ sql="PRAGMA journal_mode = wal"; run_sql(sql); /* We don't check this for success. */ #endif if (!db_mounts) exit_cleanup(0); } if (db_mounts) { if (!prepare_sqlite_queries(PREP_MOUNT)) exit_cleanup(RERR_IPC); update_mounts(); exit_cleanup(0); } if (!prepare_sqlite_queries(PREP_NORM)) { db_disconnect(False); return 0; } return 1; } #endif int db_connect(int select_many) { select_many_sums = select_many; switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: if (db_connect_mysql()) return 1; break; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: if (db_connect_sqlite()) return 1; break; #endif } db_disconnect(False); return 0; } void db_disconnect(BOOL commit) { int ndx; if (!dbh.all) return; if (transaction_state > 0) { if (DEBUG_GTE(DB, 1)) { rprintf(FCLIENT, "[%s] %s our DB transaction\n", who_am_i(), commit ? "Committing" : "Rolling back"); } transaction_state = 0; if (commit) run_sql("COMMIT"); else run_sql("ROLLBACK"); } if (DEBUG_GTE(DB, 1)) rprintf(FCLIENT, "[%s] Disconnecting from the DB\n", who_am_i()); for (ndx = 0; ndx < MAX_PREP_CNT; ndx++) { if (statements[ndx].all) { switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: mysql_stmt_close(statements[ndx].mysql); break; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: sqlite3_finalize(statements[ndx].sqlite); break; #endif } statements[ndx].all = NULL; } } switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: mysql_close(dbh.mysql); break; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: sqlite3_close(dbh.sqlite); break; #endif } dbh.all = NULL; use_db = DB_TYPE_NONE; } #ifdef USE_MYSQL static MYSQL_STMT *exec_mysql(int ndx) { MYSQL_STMT *stmt = statements[ndx].mysql; int rc; if ((rc = mysql_stmt_execute(stmt)) == CR_SERVER_LOST) { db_disconnect(False); use_db = DB_TYPE_MYSQL; if (db_connect(select_many_sums)) { stmt = statements[ndx].mysql; rc = mysql_stmt_execute(stmt); } } if (rc != 0) { rprintf(log_code, "SQL execute failed: %s\n", mysql_stmt_error(stmt)); return NULL; } return stmt; } #endif #ifdef USE_MYSQL /* This stores up to max_rows into the values pointed to by the bind data arrays. * If max_rows is > 1, then all the buffer pointers MUST be set to an array long * enough to hold the max count of rows. The buffer pointer will be incremented * to read additional rows (but never past the end). If stmt_ptr is non-NULL, it * will be set to the "stmt" pointer IFF we didn't run out of rows before hitting * the max. In this case, the caller should call mysql_stmt_fetch() to read any * remaining rows (the buffer pointers will point at the final array element) and * then call mysql_stmt_free_result(). If *stmt_ptr is a NULL value, there were * not enough rows to fill the max_rows arrays, and the stmt was already freed. */ static int fetch_mysql(MYSQL_BIND *binds, int bind_cnt, int ndx, int max_rows, MYSQL_STMT **stmt_ptr) { MYSQL_STMT *stmt; int i, rc, rows = 0; if (bind_cnt > MAX_RESULT_BINDS) { fprintf(stderr, "Internal error: MAX_RESULT_BINDS overflow\n"); exit_cleanup(RERR_UNSUPPORTED); } if ((stmt = exec_mysql(ndx)) == NULL) return 0; for (i = 0; i < bind_cnt; i++) { binds[i].is_null = &result_is_null[i]; binds[i].length = &result_length[i]; binds[i].error = &result_error[i]; } mysql_stmt_bind_result(stmt, binds); while (rows < max_rows) { if ((rc = mysql_stmt_fetch(stmt)) != 0) { if (rc != MYSQL_NO_DATA) rprintf(log_code, "SELECT fetch failed: %s\n", mysql_stmt_error(stmt)); break; } if (++rows >= max_rows) break; for (i = 0; i < bind_cnt; i++) { switch (binds[i].buffer_type) { case MYSQL_TYPE_BLOB: case MYSQL_TYPE_STRING: binds[i].buffer += binds[i].buffer_length; break; case MYSQL_TYPE_LONG: binds[i].buffer += sizeof (int); break; case MYSQL_TYPE_LONGLONG: binds[i].buffer += sizeof (int64); break; default: fprintf(stderr, "Unknown MYSQL_TYPE_* in multi-row read: %d.\n", binds[i].buffer_type); exit_cleanup(RERR_UNSUPPORTED); } } } if (!stmt_ptr || rows < max_rows) { mysql_stmt_free_result(stmt); stmt = NULL; } if (stmt_ptr) *stmt_ptr = stmt; return rows; } #endif #if defined USE_MYSQL || defined USE_SQLITE static void update_mounts(void) { char buf[2048], *argv[2]; int f_from, f_to, len; STRUCT_STAT st; int pid, status; if (DEBUG_GTE(DB, 2)) printf("Running %s to grab mount info\n", mount_helper_script); argv[0] = mount_helper_script; argv[1] = NULL; pid = piped_child(argv, &f_from, &f_to); close(f_to); bind_mtime = time(NULL); /* abuse mtime slightly to hold our last_seen value */ /* Strict format has 2 items with one tab as separator: MOUNT_UNIQ\tPATH */ while ((len = read_line(f_from, buf, sizeof buf, 0)) > 0) { char *mount_uniq, *path; if (DEBUG_GTE(DB, 3)) printf("Parsing mount info: %s\n", buf); mount_uniq = strtok(buf, "\t"); path = mount_uniq ? strtok(NULL, "\r\n") : NULL; if (!path) { fprintf(stderr, "Failed to parse line from %s output\n", mount_helper_script); exit_cleanup(RERR_SYNTAX); } if (lstat(path, &st) < 0) { fprintf(stderr, "Failed to lstat(%s): %s\n", path, strerror(errno)); exit_cleanup(RERR_IPC); } bind_mount_uniq_len = strlcpy(bind_mount_uniq, mount_uniq, sizeof bind_mount_uniq); if (bind_mount_uniq_len >= (int)sizeof bind_mount_uniq) bind_mount_uniq_len = sizeof bind_mount_uniq - 1; if (db_output_msgs) { printf("Marking mount \"%s\" (%s) as a recent mount\n", bind_mount_uniq, big_num(st.st_dev)); } switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: bind_devno = st.st_dev; if (exec_mysql(INS_MOUNT) == NULL) { fprintf(stderr, "Failed to update mount info for \"%s\" - %s\n", bind_mount_uniq, mysql_error(dbh.mysql)); exit_cleanup(RERR_IPC); } break; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { int rc, change_cnt; sqlite3_stmt *stmt = statements[INS_MOUNT].sqlite; sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC); sqlite3_bind_int64(stmt, 2, bind_mtime); sqlite3_bind_text(stmt, 3, bind_mount_uniq, bind_mount_uniq_len, SQLITE_STATIC); sqlite3_bind_int64(stmt, 4, st.st_dev); rc = sqlite3_step(stmt); if (rc != SQLITE_DONE) { fprintf(stderr, "Failed to insert mount info for \"%s\" - %s (%d)\n", bind_mount_uniq, sqlite3_errmsg(dbh.sqlite), rc); exit_cleanup(RERR_IPC); } change_cnt = sqlite3_changes(dbh.sqlite); sqlite3_reset(stmt); if (change_cnt == 0) { stmt = statements[UPD_MOUNT].sqlite; sqlite3_bind_int64(stmt, 1, bind_mtime); sqlite3_bind_int64(stmt, 2, st.st_dev); sqlite3_bind_text(stmt, 3, bind_thishost, bind_thishost_len, SQLITE_STATIC); sqlite3_bind_text(stmt, 4, bind_mount_uniq, bind_mount_uniq_len, SQLITE_STATIC); rc = sqlite3_step(stmt); if (rc != SQLITE_DONE) { fprintf(stderr, "Failed to update mount info for \"%s\" - %s (%d)\n", bind_mount_uniq, sqlite3_errmsg(dbh.sqlite), rc); exit_cleanup(RERR_IPC); } sqlite3_reset(stmt); } break; } #endif } } close(f_from); waitpid(pid, &status, 0); switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: { if (db_output_msgs) { MYSQL_BIND binds[1]; MYSQL_STMT *stmt; binds[0].buffer_type = MYSQL_TYPE_BLOB; binds[0].buffer = bind_mount_uniq; binds[0].buffer_length = sizeof bind_mount_uniq; if (fetch_mysql(binds, 1, SEL_MOUNT, 1, &stmt)) { while (1) { printf("Marking mount \"%s\" as unmounted.\n", bind_mount_uniq); if (mysql_stmt_fetch(stmt) != 0) break; } mysql_stmt_free_result(stmt); } } if (exec_mysql(UN_MOUNT) == NULL) { fprintf(stderr, "Failed to update old mount info - %s\n", mysql_error(dbh.mysql)); exit_cleanup(RERR_IPC); } break; } #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { sqlite3_stmt *stmt; int rc; if (db_output_msgs) { stmt = statements[SEL_MOUNT].sqlite; sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC); sqlite3_bind_int64(stmt, 2, bind_mtime); while (1) { if (sqlite3_step(stmt) != SQLITE_ROW) break; printf("Marking mount \"%s\" as unmounted.\n", sqlite3_column_text(stmt, 0)); } sqlite3_reset(stmt); } stmt = statements[UN_MOUNT].sqlite; sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC); sqlite3_bind_int64(stmt, 2, bind_mtime); rc = sqlite3_step(stmt); sqlite3_reset(stmt); if (rc != SQLITE_DONE) { fprintf(stderr, "Failed to update old mount info - %s (%d)\n", sqlite3_errmsg(dbh.sqlite), rc); exit_cleanup(RERR_IPC); } break; } #endif } } #endif static unsigned int get_disk_id(int64 devno) { static unsigned int prior_disk_id = 0; static int64 prior_devno = 0; if (prior_devno == devno && prior_disk_id) { if (DEBUG_GTE(DB, 5)) rprintf(FCLIENT, "get_disk_id(%s,%s) = %d (cached)\n", bind_thishost, big_num(devno), prior_disk_id); return prior_disk_id; } prior_devno = devno; switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: { MYSQL_BIND binds[1]; bind_devno = devno; /* The one changing SEL_DEV input value. */ /* Bind where to put the output. */ binds[0].buffer_type = MYSQL_TYPE_LONG; binds[0].buffer = &prior_disk_id; if (!fetch_mysql(binds, 1, SEL_DEV, 1, NULL)) prior_disk_id = 0; break; } #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { sqlite3_stmt *stmt = statements[SEL_DEV].sqlite; sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC); sqlite3_bind_int64(stmt, 2, devno); if (sqlite3_step(stmt) == SQLITE_ROW) prior_disk_id = sqlite3_column_int(stmt, 0); else prior_disk_id = 0; sqlite3_reset(stmt); break; } #endif } if (DEBUG_GTE(DB, 2)) rprintf(FCLIENT, "get_disk_id(%s,%s) = %d\n", bind_thishost, big_num(devno), prior_disk_id); return prior_disk_id; } int db_get_checksum(const STRUCT_STAT *st_p, char *sum) { unsigned int disk_id = get_disk_id(st_p->st_dev); int ok = 0; if (disk_id == 0) return 0; switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: { MYSQL_BIND binds[1]; bind_disk_id = disk_id; bind_ino = st_p->st_ino; bind_size = st_p->st_size; bind_mtime = st_p->st_mtime; if (!db_lax) bind_ctime = st_p->st_ctime; binds[0].buffer_type = MYSQL_TYPE_BLOB; binds[0].buffer = sum; binds[0].buffer_length = MD5_DIGEST_LEN; ok = fetch_mysql(binds, 1, SEL_SUM, 1, NULL); break; } #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { sqlite3_stmt *stmt = statements[SEL_SUM].sqlite; sqlite3_bind_int(stmt, 1, disk_id); sqlite3_bind_int64(stmt, 2, st_p->st_ino); sqlite3_bind_int64(stmt, 3, st_p->st_size); sqlite3_bind_int64(stmt, 4, st_p->st_mtime); if (!db_lax) sqlite3_bind_int64(stmt, 5, st_p->st_ctime); if (sqlite3_step(stmt) == SQLITE_ROW) { int len = sqlite3_column_bytes(stmt, 0); if (len > MAX_DIGEST_LEN) len = MAX_DIGEST_LEN; memcpy(sum, sqlite3_column_blob(stmt, 0), len); ok = 1; } sqlite3_reset(stmt); break; } #endif } if (DEBUG_GTE(DB, 2)) { if (ok) { rprintf(FCLIENT, "[%s] Found DB checksum for %s,%s,%d: %s\n", who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino), md_num, sum_as_hex(md_num, sum, 0)); } else { rprintf(FCLIENT, "[%s] No DB checksum for %s,%s,%d\n", who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino), md_num); } } return ok; } int db_get_both_checksums(const STRUCT_STAT *st_p, int *right_sum_cnt, int *wrong_sum_cnt, char **sum4, char **sum5) { static char dbsum[MD5_DIGEST_LEN*2]; int rows, j, sum_type[2]; int64 dbsize[2], dbmtime[2], dbctime[2]; unsigned int disk_id = get_disk_id(st_p->st_dev); if (disk_id == 0) return 0; switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: { MYSQL_BIND binds[5]; bind_disk_id = disk_id; bind_ino = st_p->st_ino; binds[0].buffer_type = MYSQL_TYPE_BLOB; binds[0].buffer = dbsum; binds[0].buffer_length = MD5_DIGEST_LEN; binds[1].buffer_type = MYSQL_TYPE_LONG; binds[1].buffer = (char*)sum_type; binds[2].buffer_type = MYSQL_TYPE_LONGLONG; binds[2].buffer = (char*)dbsize; binds[3].buffer_type = MYSQL_TYPE_LONGLONG; binds[3].buffer = (char*)dbmtime; binds[4].buffer_type = MYSQL_TYPE_LONGLONG; binds[4].buffer = (char*)dbctime; rows = fetch_mysql(binds, 5, SEL_SUM, 2, NULL); break; } #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { sqlite3_stmt *stmt = statements[SEL_SUM].sqlite; sqlite3_bind_int(stmt, 1, disk_id); sqlite3_bind_int64(stmt, 2, st_p->st_ino); for (j = 0; j < 2; j++) { int len; if (sqlite3_step(stmt) != SQLITE_ROW) break; len = sqlite3_column_bytes(stmt, 0); if (len > MD5_DIGEST_LEN) len = MD5_DIGEST_LEN; memcpy(dbsum + MD5_DIGEST_LEN*j, sqlite3_column_blob(stmt, 0), len); sum_type[j] = sqlite3_column_int(stmt, 1); dbsize[j] = sqlite3_column_int(stmt, 2); dbmtime[j] = sqlite3_column_int64(stmt, 3); dbctime[j] = sqlite3_column_int64(stmt, 4); } sqlite3_reset(stmt); rows = j; break; } #endif default: return 0; } if (sum4) *sum4 = NULL; if (sum5) *sum5 = NULL; *right_sum_cnt = *wrong_sum_cnt = 0; for (j = 0; j < rows; j++) { if (DEBUG_GTE(DB, 3)) { rprintf(FCLIENT, "DB checksum for %s,%s,%d: %s\n", big_num(st_p->st_dev), big_num(st_p->st_ino), sum_type[j], sum_as_hex(sum_type[j], dbsum + MD5_DIGEST_LEN*j, 0)); } if (sum_type[j] == 4) { if (!sum4) continue; *sum4 = dbsum + MD5_DIGEST_LEN*j; } else { if (!sum5) continue; *sum5 = dbsum + MD5_DIGEST_LEN*j; } if (st_p->st_size == dbsize[j] && st_p->st_mtime == dbmtime[j] && (db_lax || st_p->st_ctime == dbctime[j])) ++*right_sum_cnt; else ++*wrong_sum_cnt; } return rows; } int db_set_checksum(int mdnum, const STRUCT_STAT *st_p, const char *sum) { unsigned int disk_id; const char *errmsg = NULL; int rc = 0; if (am_receiver || (am_generator && same_db)) { /* Forward the setting to a single process. The receiver always * forwards to the generator, and the generator will forward to * the sender ONLY if this is a local transfer. */ char data[MSG_CHECKSUM_LEN]; SIVAL64(data, 0, st_p->st_dev); SIVAL64(data, 8, st_p->st_ino); SIVAL64(data, 16, st_p->st_size); SIVAL64(data, 24, st_p->st_mtime); SIVAL64(data, 32, st_p->st_ctime); #if MSG_CHECKSUM_LONGS != 5 #error Fix the setting of checksum long values #endif SIVAL(data, MSG_CHECKSUM_LONGS*8, mdnum); memcpy(data + MSG_CHECKSUM_LONGS*8 + 4, sum, MAX_DIGEST_LEN); return send_msg(MSG_CHECKSUM, data, sizeof data, 0); } if ((disk_id = get_disk_id(st_p->st_dev)) == 0) return 0; switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: if (transaction_state == 0) { if (!run_sql("BEGIN")) return 0; transaction_state = 1; } bind_disk_id = disk_id; bind_ino = st_p->st_ino; bind_mdnum = mdnum; bind_size = st_p->st_size; bind_mtime = st_p->st_mtime; bind_ctime = st_p->st_ctime; memcpy(bind_sum, sum, MD5_DIGEST_LEN); if (exec_mysql(REP_SUM) == NULL) errmsg = mysql_error(dbh.mysql); break; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { sqlite3_stmt *stmt = statements[REP_SUM].sqlite; int lock_failures = 0; if (transaction_state == 0) { if (!run_sql("BEGIN")) return 0; transaction_state = 1; } sqlite3_bind_int(stmt, 1, disk_id); sqlite3_bind_int64(stmt, 2, st_p->st_ino); sqlite3_bind_int(stmt, 3, mdnum); sqlite3_bind_int64(stmt, 4, st_p->st_size); sqlite3_bind_int64(stmt, 5, st_p->st_mtime); sqlite3_bind_int64(stmt, 6, st_p->st_ctime); sqlite3_bind_blob(stmt, 7, sum, MD5_DIGEST_LEN, SQLITE_TRANSIENT); while (1) { rc = sqlite3_step(stmt); if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED) break; if (++lock_failures > MAX_LOCK_FAILURES) break; sqlite3_reset(stmt); msleep(LOCK_FAIL_MSLEEP); } if (rc != SQLITE_DONE) errmsg = sqlite3_errmsg(dbh.sqlite); sqlite3_reset(stmt); break; } #endif } if (!errmsg) { if (DEBUG_GTE(DB, 2)) { rprintf(FCLIENT, "[%s] Set DB checksum for %s,%s,%d: %s\n", who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino), md_num, sum_as_hex(md_num, sum, 0)); } } else { rprintf(log_code, "[%s] Failed to set checksum for %s,%s,%d: %s (%d) -- closing DB\n", who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino), md_num, errmsg, rc); db_disconnect(False); } return errmsg ? 0 : 1; } /* For a delayed-update copy, we set the checksum on the file when it was * inside the partial-dir. Since renaming the file changes its ctime, we need * to update the ctime to its new value (we can skip this in db_lax mode). */ int db_update_ctime(UNUSED(int mdnum), const STRUCT_STAT *st_p) { unsigned int disk_id = get_disk_id(st_p->st_dev); if (disk_id == 0) return 0; switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: bind_ctime = st_p->st_ctime; bind_disk_id = disk_id; bind_ino = st_p->st_ino; bind_mdnum = mdnum; bind_size = st_p->st_size; bind_mtime = st_p->st_mtime; return exec_mysql(UPD_CTIME) != NULL; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { int rc; sqlite3_stmt *stmt = statements[UPD_CTIME].sqlite; if (stmt == NULL) return 0; sqlite3_bind_int64(stmt, 1, st_p->st_ctime); sqlite3_bind_int(stmt, 2, disk_id); sqlite3_bind_int64(stmt, 3, st_p->st_ino); sqlite3_bind_int(stmt, 4, mdnum); sqlite3_bind_int64(stmt, 5, st_p->st_size); sqlite3_bind_int64(stmt, 6, st_p->st_mtime); rc = sqlite3_step(stmt); sqlite3_reset(stmt); return rc == SQLITE_DONE; } #endif } return 0; } static int db_clean_init(void) { switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: { MYSQL_BIND binds[MAX_BIND_CNT]; char *sql; mysql_query(dbh.mysql, "CREATE TEMPORARY TABLE inode_present (" " disk_id integer unsigned NOT NULL," " ino bigint unsigned NOT NULL," " PRIMARY KEY (disk_id,ino)" ") ENGINE=MEMORY" ); sql="INSERT IGNORE INTO inode_present" " SET disk_id = ?, ino = ?"; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_LONG; binds[0].buffer = &bind_disk_id; binds[1].buffer_type = MYSQL_TYPE_LONGLONG; binds[1].buffer = &bind_ino; if (!prepare_mysql(INS_PRESENT, binds, 2, sql)) exit_cleanup(RERR_SYNTAX); sql="DELETE m.*" " FROM inode_map AS m" " LEFT JOIN inode_present AS p USING(disk_id, ino)" " JOIN disk AS d ON(m.disk_id = d.disk_id)" " WHERE host = ? AND devno != 0 AND p.disk_id IS NULL AND ctime < ?"; memset(binds, 0, sizeof binds); binds[0].buffer_type = MYSQL_TYPE_STRING; binds[0].buffer = &bind_thishost; binds[0].buffer_length = bind_thishost_len; binds[1].buffer_type = MYSQL_TYPE_LONGLONG; binds[1].buffer = &bind_ctime; if (!prepare_mysql(DEL_SUMS, binds, 2, sql)) exit_cleanup(RERR_SYNTAX); return 1; } #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { char *sql; sql="ATTACH DATABASE '' AS aux1;"; /* Private temp DB, probably in-memory */ if (!run_sql(sql)) exit_cleanup(RERR_IPC); sql="CREATE TABLE aux1.inode_present (" " disk_id integer NOT NULL," " ino bigint NOT NULL," " PRIMARY KEY (disk_id,ino)" ")"; if (!run_sql(sql)) exit_cleanup(RERR_IPC); sql="INSERT OR IGNORE INTO aux1.inode_present" " (disk_id, ino)" " VALUES (?, ?)"; if (!prepare_sqlite(INS_PRESENT, sql)) exit_cleanup(RERR_IPC); sql="DELETE FROM inode_map" " WHERE ROWID IN (" " SELECT m.ROWID" " FROM inode_map AS m" " LEFT JOIN aux1.inode_present AS p USING(disk_id, ino)" " JOIN disk AS d ON(m.disk_id = d.disk_id)" " WHERE host = ? AND devno != 0 AND p.disk_id IS NULL AND ctime < ?" " )"; if (!prepare_sqlite(DEL_SUMS, sql)) exit_cleanup(RERR_IPC); transaction_state = -1; /* bug work-around -- force transaction off when cleaning XXX */ return 1; } #endif } return 0; } static int db_note_present(UNUSED(int disk_id), UNUSED(int64 ino)) { switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: bind_disk_id = disk_id; bind_ino = ino; return exec_mysql(INS_PRESENT) != NULL; #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { int rc; sqlite3_stmt *stmt = statements[INS_PRESENT].sqlite; sqlite3_bind_int(stmt, 1, disk_id); sqlite3_bind_int64(stmt, 2, ino); rc = sqlite3_step(stmt); sqlite3_reset(stmt); return rc == SQLITE_DONE; } #endif } return 0; } /* This function requires the user to have populated all disk_id+inode pairs * into the inode_present table. */ static int db_clean_inodes(UNUSED(time_t start_time)) { int del_cnt = 0; /* The extra ctime < start_time check ensures that brand-new checksums that * were added after the start of our cleaning run are not removed. */ switch (use_db) { #ifdef USE_MYSQL case DB_TYPE_MYSQL: { MYSQL_STMT *stmt; bind_ctime = start_time; stmt = exec_mysql(DEL_SUMS); if (stmt != NULL) del_cnt = mysql_affected_rows(dbh.mysql); break; } #endif #ifdef USE_SQLITE case DB_TYPE_SQLITE: { int rc; sqlite3_stmt *stmt = statements[DEL_SUMS].sqlite; sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC); sqlite3_bind_int64(stmt, 2, start_time); rc = sqlite3_step(stmt); if (rc == SQLITE_DONE) del_cnt = sqlite3_changes(dbh.sqlite); sqlite3_reset(stmt); break; } #endif } return del_cnt; } static int abs_path(char *buf, int bufsiz, const char *curdir, const char *dir) { if (*dir == '/') strlcpy(buf, dir, bufsiz); else { int len = snprintf(buf, bufsiz, "%s/%s", curdir, dir); assert(len > 0); /* silence a compiler warning */ } return clean_fname(buf, CFN_DROP_TRAILING_DOT_DIR | CFN_COLLAPSE_DOT_DOT_DIRS); } static struct name_list *new_name(const char *basename, const char *filename) { struct name_list *n; int blen = strlen(basename); int slen = filename ? (int)strlen(filename) : -1; int len = blen + 1 + slen; if (len >= MAXPATHLEN) { if (filename) rprintf(FERROR, "Filename too long: %s/%s\n", basename, filename); else rprintf(FERROR, "Filename too long: %s\n", basename); return NULL; } n = (struct name_list *)new_array(char, sizeof (struct name_list) + len); memcpy(n->name, basename, blen); if (filename) { n->name[blen] = '/'; memcpy(n->name + 1 + blen, filename, slen); } n->name[len] = '\0'; n->next = NULL; return n; } static int name_compare(const void *n1, const void *n2) { struct name_list *p1 = *(struct name_list **)n1; struct name_list *p2 = *(struct name_list **)n2; return strcmp(p1->name, p2->name); } static struct name_list *get_sorted_names(const char *dir) { struct name_list *add, **sortbuf, *names = NULL, *prior_name = NULL; struct dirent *di; int cnt = 0; DIR *d; if (!(d = opendir("."))) { rprintf(FERROR, "Unable to opendir %s: %s\n", dir, strerror(errno)); return NULL; } while ((di = readdir(d)) != NULL) { char *dname = d_name(di); if (dname[0] == '.' && (dname[1] == '\0' || (dname[1] == '.' && dname[2] == '\0'))) continue; if (!(add = new_name(dname, NULL))) continue; if (prior_name) prior_name->next = add; else names = add; prior_name = add; cnt++; } closedir(d); if (cnt) { int j; sortbuf = new_array(struct name_list *, cnt); for (j = 0; j < cnt; j++) { sortbuf[j] = names; names = names->next; } qsort(sortbuf, cnt, PTR_SIZE, name_compare); names = prior_name = NULL; for (j = 0; j < cnt; j++) { add = sortbuf[j]; if (prior_name) prior_name->next = add; else names = add; prior_name = add; } if (prior_name) prior_name->next = NULL; free(sortbuf); } return names; } static inline int sums_ne(const char *sum1, const char *sum2) { return memcmp(sum1, sum2, MD5_DIGEST_LEN) != 0; } /* Returns 1 if there is a checksum change, else 0. */ static int mention_file(const char *dir, const char *name, int right_cnt, int wrong_cnt, const char *dbsum4, const char *dbsum5, const char *sum4, const char *sum5) { char *info_str = wrong_cnt && !right_cnt ? "!i " : " "; char *md4_str = !db_do_md4 ? NULL : !dbsum4 ? "+4 " : !sum4 ? "?4 " : sums_ne(sum4, dbsum4) ? "!4 " : " "; char *md5_str = !db_do_md5 ? NULL : !dbsum5 ? "+5 " : !sum5 ? "?5 " : sums_ne(sum5, dbsum5) ? "!5 " : " "; int chg = *info_str != ' ' || (md4_str && *md4_str != ' ') || (md5_str && *md5_str != ' '); if (chg || db_output_unchanged) { if (db_output_info) { fputs(info_str, stdout); if (md4_str) fputs(md4_str, stdout); if (md5_str) fputs(md5_str, stdout); } if (db_output_sum) { if (db_do_md4) printf("%s ", sum_as_hex(4, sum4, 0)); if (db_do_md5) printf("%s ", sum_as_hex(5, sum5, 0)); } if (db_output_name) { if (db_output_sum) putchar(' '); /* We want 2 spaces, like md5sum. */ if (*dir != '.' || dir[1]) { fputs(dir, stdout); putchar('/'); } puts(name); } } return chg; } NORETURN void run_dbonly(const char **args) { char start_dir[MAXPATHLEN], dirbuf[MAXPATHLEN]; int need_sum_cnt, start_dir_len; struct name_list *prior_dir; struct name_list *names; time_t clean_start = 0; int exit_code = 0; checksum_type = 5; need_sum_cnt = db_do_md4 + db_do_md5; if (!db_read_config(FERROR, db_config) || !db_connect(1)) exit_cleanup(RERR_FILEIO); if (db_clean) { clean_start = time(NULL); db_clean_init(); } if (getcwd(start_dir, sizeof start_dir - 1) == NULL) { rsyserr(FERROR, errno, "getcwd()"); exit_cleanup(RERR_FILESELECT); } start_dir_len = strlen(start_dir); if (args) { prior_dir = NULL; while (*args) { struct name_list *add; if (abs_path(dirbuf, sizeof dirbuf, start_dir, *args++) <= 0) continue; if (!(add = new_name(dirbuf, NULL))) continue; if (prior_dir) prior_dir->next = add; else dirs_list = add; prior_dir = add; } } else dirs_list = new_name(start_dir, NULL); prior_dir = NULL; while (dirs_list) { struct name_list *subdirs, *prior_subdir, *prior_name; const char *dir = dirs_list->name; const char *reldir = dir; if (prior_dir) free((void*)prior_dir); prior_dir = dirs_list; dirs_list = dirs_list->next; if (strncmp(reldir, start_dir, start_dir_len) == 0) { if (reldir[start_dir_len] == '\0') reldir = "."; else if (reldir[start_dir_len] == '/') reldir += start_dir_len + 1; } if (db_output_dirs) printf("... %s/ ...\n", reldir); if (chdir(dir) < 0) { rprintf(FERROR, "Unable to chdir to %s: %s\n", dir, strerror(errno)); continue; } if (!(names = get_sorted_names(dir))) continue; subdirs = prior_subdir = prior_name = NULL; while (names) { STRUCT_STAT st; char *dbsum4, *sum4, sumbuf4[MD5_DIGEST_LEN]; char *dbsum5, *sum5, sumbuf5[MD5_DIGEST_LEN]; int right_sum_cnt, wrong_sum_cnt; const char *name = names->name; unsigned int disk_id; if (prior_name) free((void*)prior_name); prior_name = names; names = names->next; dbsum4 = dbsum5 = sum4 = sum5 = NULL; if (lstat(name, &st) < 0) { rprintf(FERROR, "Failed to lstat(%s): %s\n", name, strerror(errno)); continue; } if (S_ISLNK(st.st_mode)) continue; if (S_ISDIR(st.st_mode)) { /* add optional excluding of things like /^(CVS|\.svn|\.git|\.bzr)$/; */ if (recurse) { struct name_list *add = new_name(dir, name); if (add) { if (prior_subdir) prior_subdir->next = add; else subdirs = add; prior_subdir = add; } } continue; } if (!S_ISREG(st.st_mode)) continue; if (!(disk_id = get_disk_id(st.st_dev))) continue; if (db_clean) { db_note_present(disk_id, st.st_ino); if (!db_update && !db_check) continue; } db_get_both_checksums(&st, &right_sum_cnt, &wrong_sum_cnt, db_do_md4 ? &dbsum4 : NULL, db_do_md5 ? &dbsum5 : NULL); if (!db_check && right_sum_cnt == need_sum_cnt) { mention_file(reldir, name, right_sum_cnt, wrong_sum_cnt, dbsum4, dbsum5, dbsum4, dbsum5); continue; } if (db_update || (db_check && right_sum_cnt) || db_output_sum) { uchar *data; int32 remainder; md_context m4; MD5_CTX m5; struct map_struct *buf; OFF_T off, len = st.st_size; int fd = do_open(name, O_RDONLY, 0); if (fd < 0) { rprintf(FERROR, "ERROR: unable to read %s: %s\n", name, strerror(errno)); continue; } if (db_do_md4) mdfour_begin(&m4); if (db_do_md5) MD5_Init(&m5); buf = map_file(fd, len, MAX_MAP_SIZE, CSUM_CHUNK); for (off = 0; off + CSUM_CHUNK <= len; off += CSUM_CHUNK) { data = (uchar*)map_ptr(buf, off, CSUM_CHUNK); if (db_do_md4) mdfour_update(&m4, data, CSUM_CHUNK); if (db_do_md5) MD5_Update(&m5, data, CSUM_CHUNK); } remainder = (int32)(len - off); data = (uchar*)map_ptr(buf, off, remainder); if (db_do_md4) { mdfour_update(&m4, data, remainder); mdfour_result(&m4, (uchar*)(sum4 = sumbuf4)); } if (db_do_md5) { MD5_Update(&m5, data, remainder); MD5_Final((uchar*)(sum5 = sumbuf5), &m5); } close(fd); unmap_file(buf); } int chg = mention_file(reldir, name, right_sum_cnt, wrong_sum_cnt, dbsum4, dbsum5, sum4, sum5); if (!chg) { /* Only db_check should get here... */ } else if (!db_update) { exit_code = 1; } else { int fail = 0; if (db_do_md4 && !db_set_checksum(4, &st, sum4)) fail = 1; if (db_do_md5 && !db_set_checksum(5, &st, sum5)) fail = 1; if (fail) { fprintf(stderr, "Failed to set checksum on %s/%s\n", reldir, name); exit_cleanup(RERR_FILEIO); } } } if (prior_name) free((void*)prior_name); if (recurse && subdirs) { prior_subdir->next = dirs_list; dirs_list = subdirs; } } if (prior_dir) free((void*)prior_dir); if (db_clean) { int rows = db_clean_inodes(clean_start); if (db_output_msgs) printf("Cleaned out %d old inode%s.\n", rows, rows == 1 ? "" : "s"); } db_disconnect(True); exit(exit_code); }