/*
* Routines to access extended file info via DB.
*
* Copyright (C) 2008-2013 Wayne Davison
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, visit the http://fsf.org website.
*/
#include "rsync.h"
#include "ifuncs.h"
#include "itypes.h"
#include "inums.h"
#ifdef USE_OPENSSL
#include "openssl/md4.h"
#include "openssl/md5.h"
#endif
extern int recurse;
extern int same_db;
extern int am_receiver;
extern int am_generator;
extern int checksum_type;
extern int db_clean, db_check, db_do_md4, db_do_md5, db_update, db_lax, db_init, db_mounts;
extern int db_output_name, db_output_sum, db_output_info, db_output_unchanged, db_output_dirs, db_output_msgs;
extern int saw_db_output_opt, saw_db_sum_opt;
extern char *db_config;
#define MOUNT_HELPER_SCRIPT "/usr/sbin/rsyncdb-mountinfo"
#if defined HAVE_MYSQL_MYSQL_H && defined HAVE_LIBMYSQLCLIENT
#define USE_MYSQL
#include <mysql/mysql.h>
#include <mysql/errmsg.h>
#endif
#if defined HAVE_SQLITE3_H && defined HAVE_LIBSQLITE3
#define USE_SQLITE
#include <sqlite3.h>
#ifndef HAVE_SQLITE3_OPEN_V2
#define sqlite3_open_v2(dbname, dbhptr, flags, vfs) \
sqlite3_open(dbname, dbhptr)
#endif
#ifndef HAVE_SQLITE3_PREPARE_V2
#define sqlite3_prepare_v2 sqlite3_prepare
#endif
#define MAX_LOCK_FAILURES 10
#define LOCK_FAIL_MSLEEP 100
#endif
#ifndef USE_OPENSSL
#define MD5_CTX md_context
#define MD5_Init md5_begin
#define MD5_Update md5_update
#define MD5_Final(digest, cptr) md5_result(cptr, digest)
#endif
#define DB_TYPE_NONE 0
#define DB_TYPE_MYSQL 1
#define DB_TYPE_SQLITE 2
int use_db = DB_TYPE_NONE;
int select_many_sums = 0;
#define PREP_NORM 0
#define PREP_MOUNT 1
static const char *dbhost = NULL, *dbuser = NULL, *dbpass = NULL, *dbname = NULL;
static unsigned int dbport = 0;
static int transaction_state = -1;
static union {
#ifdef USE_MYSQL
MYSQL *mysql;
#endif
#ifdef USE_SQLITE
sqlite3 *sqlite;
#endif
void *all;
} dbh;
#define SEL_DEV 0
#define SEL_SUM 1
#define REP_SUM 2
#define UPD_CTIME 3
#define INS_MOUNT 4
#define UPD_MOUNT 5 /* SQLite only */
#define SEL_MOUNT 6
#define UN_MOUNT 7
#define DEL_SUMS 8
#define INS_PRESENT 9
#define MAX_PREP_CNT 10
#define MAX_BIND_CNT 7
#define MAX_RESULT_BINDS 32
static union {
#ifdef USE_MYSQL
MYSQL_STMT *mysql;
#endif
#ifdef USE_SQLITE
sqlite3_stmt *sqlite;
#endif
void *all;
} statements[MAX_PREP_CNT];
static int md_num;
static enum logcode log_code;
#ifdef USE_MYSQL
static unsigned int bind_disk_id, bind_mdnum;
static int64 bind_devno, bind_ino, bind_size, bind_mtime, bind_ctime;
static char bind_sum[MAX_DIGEST_LEN];
static unsigned long result_length[MAX_RESULT_BINDS];
static bool result_is_null[MAX_RESULT_BINDS], result_error[MAX_RESULT_BINDS];
#elif defined USE_SQLITE
static int64 bind_mtime;
#endif
static char bind_thishost[128+1];
static unsigned long bind_thishost_len;
static char *mount_helper_script = NULL;
static char *error_log;
#if defined USE_SQLITE && defined SQLITE_CONFIG_LOG
static char bind_mount_uniq[128+1];
static unsigned long bind_mount_uniq_len;
static FILE *error_log_fp;
#endif
#define PTR_SIZE (sizeof (struct file_struct *))
#if defined USE_MYSQL || defined USE_SQLITE
static void update_mounts(void);
#endif
struct name_list {
struct name_list *next;
char name[1];
} *dirs_list;
int db_read_config(enum logcode code, const char *config_file)
{
char buf[2048], *cp;
FILE *fp;
int lineno = 0;
log_code = code;
bind_thishost_len = strlcpy(bind_thishost, "localhost", sizeof bind_thishost);
if (!(fp = fopen(config_file, "r"))) {
rsyserr(log_code, errno, "unable to open %s", config_file);
return 0;
}
if (DEBUG_GTE(DB, 1))
rprintf(FCLIENT, "[%s] Reading DB config from %s\n", who_am_i(), config_file);
while (fgets(buf, sizeof buf, fp)) {
lineno++;
if ((cp = strchr(buf, '#')) == NULL
&& (cp = strchr(buf, '\r')) == NULL
&& (cp = strchr(buf, '\n')) == NULL)
cp = buf + strlen(buf);
while (cp != buf && isSpace(cp-1)) cp--;
*cp = '\0';
if (!*buf)
continue;
if (!(cp = strchr(buf, ':')))
goto invalid_line;
*cp++ = '\0';
while (isSpace(cp)) cp++;
if (strcasecmp(buf, "dbhost") == 0)
dbhost = strdup(cp);
else if (strcasecmp(buf, "dbuser") == 0)
dbuser = strdup(cp);
else if (strcasecmp(buf, "dbpass") == 0)
dbpass = strdup(cp);
else if (strcasecmp(buf, "dbname") == 0)
dbname = strdup(cp);
else if (strcasecmp(buf, "dbport") == 0)
dbport = atoi(cp);
else if (strcasecmp(buf, "transaction") == 0)
transaction_state = atoi(cp) ? 0 : -1;
else if (strcasecmp(buf, "mountHelper") == 0)
mount_helper_script = strdup(cp);
else if (strcasecmp(buf, "errlog") == 0)
error_log = strdup(cp);
else if (strcasecmp(buf, "thishost") == 0)
bind_thishost_len = strlcpy(bind_thishost, cp, sizeof bind_thishost);
else if (strcasecmp(buf, "dbtype") == 0) {
#ifdef USE_MYSQL
if (strcasecmp(cp, "mysql") == 0) {
use_db = DB_TYPE_MYSQL;
continue;
}
#endif
#ifdef USE_SQLITE
if (strcasecmp(cp, "sqlite") == 0) {
use_db = DB_TYPE_SQLITE;
continue;
}
#endif
rprintf(log_code,
"Unsupported dbtype on line #%d in %s.\n",
lineno, config_file);
use_db = DB_TYPE_NONE;
return 0;
} else {
invalid_line:
rprintf(log_code, "Invalid line #%d in %s\n",
lineno, config_file);
use_db = DB_TYPE_NONE;
return 0;
}
}
fclose(fp);
if (bind_thishost_len >= (int)sizeof bind_thishost)
bind_thishost_len = sizeof bind_thishost - 1;
if (!use_db || !dbname) {
rprintf(log_code, "Please specify at least dbtype and dbname in %s.\n", config_file);
use_db = DB_TYPE_NONE;
return 0;
}
md_num = checksum_type == 5 ? 5 : 4;
if (error_log) {
if (use_db != DB_TYPE_SQLITE)
rprintf(log_code, "Ignoring errlog setting for non-SQLite DB.\n");
#ifndef SQLITE_CONFIG_LOG
else
rprintf(log_code, "Your sqlite doesn't support SQLITE_CONFIG_LOG.\n");
#endif
}
if (!mount_helper_script)
mount_helper_script = MOUNT_HELPER_SCRIPT;
return 1;
}
#if defined USE_SQLITE && defined SQLITE_CONFIG_LOG
static void errorLogCallback(UNUSED(void *pArg), int iErrCode, const char *zMsg)
{
fprintf(error_log_fp, "[%d] %s (%d)\n", (int)getpid(), zMsg, iErrCode);
}
#endif
static int run_sql(const char *fmt, ...)
{
va_list ap;
char *query;
int ok = 0, qlen;
va_start(ap, fmt);
qlen = vasprintf(&query, fmt, ap);
va_end(ap);
if (qlen < 0)
out_of_memory("run_sql");
if (DEBUG_GTE(DB, 3))
rprintf(FCLIENT, "[%s] SQL being run: %s\n", who_am_i(), query);
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
if (mysql_query(dbh.mysql, query) < 0) {
rprintf(FERROR, "Failed to run sql: %s\n", mysql_error(dbh.mysql));
rprintf(FERROR, "%s\n", query);
} else
ok = 1;
break;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
int rc, lock_failures = 0;
while (1) {
if ((rc = sqlite3_exec(dbh.sqlite, query, NULL, NULL, NULL)) == 0)
break;
if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
break;
if (++lock_failures > MAX_LOCK_FAILURES)
break;
msleep(LOCK_FAIL_MSLEEP);
}
if (rc) {
rprintf(FERROR, "[%s] Failed to run sql: %s\n", who_am_i(), sqlite3_errmsg(dbh.sqlite));
rprintf(FERROR, "%s\n", query);
} else
ok = 1;
break;
}
#endif
}
free(query);
return ok;
}
#ifdef USE_MYSQL
static int prepare_mysql(int ndx, MYSQL_BIND *binds, int bind_cnt, const char *fmt, ...)
{
va_list ap;
char *query;
int qlen, param_cnt;
MYSQL_STMT *stmt = mysql_stmt_init(dbh.mysql);
if (stmt == NULL)
out_of_memory("prepare_mysql");
va_start(ap, fmt);
qlen = vasprintf(&query, fmt, ap);
va_end(ap);
if (qlen < 0)
out_of_memory("prepare_mysql");
if (DEBUG_GTE(DB, 3))
rprintf(FCLIENT, "[%s] SQL being prepared: %s\n", who_am_i(), query);
if (mysql_stmt_prepare(stmt, query, qlen) != 0) {
rprintf(log_code, "[%s] Prepare failed: %s\n", who_am_i(), mysql_stmt_error(stmt));
rprintf(log_code, "%s\n", query);
free(query);
return 0;
}
if ((param_cnt = mysql_stmt_param_count(stmt)) != bind_cnt) {
rprintf(log_code, "[%s] Parameters in statement = %d, bind vars = %d\n",
who_am_i(), param_cnt, bind_cnt);
rprintf(log_code, "%s\n", query);
free(query);
return 0;
}
if (bind_cnt)
mysql_stmt_bind_param(stmt, binds);
statements[ndx].mysql = stmt;
free(query);
return 1;
}
#endif
#ifdef USE_MYSQL
static int prepare_mysql_queries(int type)
{
MYSQL_BIND binds[MAX_BIND_CNT];
char *sql;
switch (type) {
case PREP_NORM:
sql="SELECT disk_id"
" FROM disk"
" WHERE host = ? AND devno = ?";
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_STRING;
binds[0].buffer = &bind_thishost;
binds[0].buffer_length = bind_thishost_len;
binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
binds[1].buffer = &bind_devno;
if (!prepare_mysql(SEL_DEV, binds, 2, sql))
return 0;
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_LONG;
binds[0].buffer = &bind_disk_id;
binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
binds[1].buffer = &bind_ino;
if (select_many_sums) {
sql="SELECT checksum, sum_type, size, mtime, ctime"
" FROM inode_map"
" WHERE disk_id = ? AND ino = ?";
if (!prepare_mysql(SEL_SUM, binds, 2, sql))
return 0;
} else {
sql="SELECT checksum"
" FROM inode_map"
" WHERE disk_id = ? AND ino = ? AND sum_type = %d"
" AND size = ? AND mtime = ? %s"; /* optional: AND ctime = ? */
binds[2].buffer_type = MYSQL_TYPE_LONGLONG;
binds[2].buffer = &bind_size;
binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
binds[3].buffer = &bind_mtime;
if (!db_lax) {
binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
binds[4].buffer = &bind_ctime;
}
if (!prepare_mysql(SEL_SUM, binds, 4 + !db_lax, sql, md_num, db_lax ? "" : "AND ctime = ?"))
return 0;
}
sql="INSERT INTO inode_map"
" SET disk_id = ?, ino = ?, sum_type = ?,"
" size = ?, mtime = ?, ctime = ?, checksum = ?"
" ON DUPLICATE KEY"
" UPDATE size = VALUES(size), mtime = VALUES(mtime),"
" ctime = VALUES(ctime), checksum = VALUES(checksum)";
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_LONG;
binds[0].buffer = &bind_disk_id;
binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
binds[1].buffer = &bind_ino;
binds[2].buffer_type = MYSQL_TYPE_LONG;
binds[2].buffer = &bind_mdnum;
binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
binds[3].buffer = &bind_size;
binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
binds[4].buffer = &bind_mtime;
binds[5].buffer_type = MYSQL_TYPE_LONGLONG;
binds[5].buffer = &bind_ctime;
binds[6].buffer_type = MYSQL_TYPE_BLOB;
binds[6].buffer = &bind_sum;
binds[6].buffer_length = MD5_DIGEST_LEN; /* Same as MD4_DIGEST_LEN */
if (!prepare_mysql(REP_SUM, binds, 7, sql))
return 0;
sql="UPDATE inode_map"
" SET ctime = ?"
" WHERE disk_id = ? AND ino = ? AND sum_type = ? AND size = ? AND mtime = ?";
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_LONGLONG;
binds[0].buffer = &bind_ctime;
binds[1].buffer_type = MYSQL_TYPE_LONG;
binds[1].buffer = &bind_disk_id;
binds[2].buffer_type = MYSQL_TYPE_LONGLONG;
binds[2].buffer = &bind_ino;
binds[3].buffer_type = MYSQL_TYPE_LONG;
binds[3].buffer = &bind_mdnum;
binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
binds[4].buffer = &bind_size;
binds[5].buffer_type = MYSQL_TYPE_LONGLONG;
binds[5].buffer = &bind_mtime;
if (!prepare_mysql(UPD_CTIME, binds, 6, sql))
return 0;
break;
case PREP_MOUNT:
sql="INSERT INTO disk"
" SET host = ?, last_seen = ?, mount_uniq = ?, devno = ?"
" ON DUPLICATE KEY"
" UPDATE last_seen = VALUES(last_seen), devno = VALUES(devno)";
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_STRING;
binds[0].buffer = &bind_thishost;
binds[0].buffer_length = bind_thishost_len;
binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
binds[1].buffer = &bind_mtime; /* we abuse mtime to hold the last_seen value */
binds[2].buffer_type = MYSQL_TYPE_STRING;
binds[2].buffer = &bind_mount_uniq;
binds[2].buffer_length = sizeof bind_mount_uniq;
binds[2].length = &bind_mount_uniq_len;
binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
binds[3].buffer = &bind_devno;
if (!prepare_mysql(INS_MOUNT, binds, 4, sql))
return 0;
sql="SELECT mount_uniq"
" FROM disk"
" WHERE host = ? AND last_seen < ? AND devno != 0";
/* Reusing first 2 binds from INS_MOUNT */
if (!prepare_mysql(SEL_MOUNT, binds, 2, sql))
return 0;
sql="UPDATE disk"
" SET devno = 0"
" WHERE host = ? AND last_seen < ? AND devno != 0";
/* Reusing binds from SEL_MOUNT */
if (!prepare_mysql(UN_MOUNT, binds, 2, sql))
return 0;
break;
}
return 1;
}
#endif
#ifdef USE_MYSQL
static int db_connect_mysql(void)
{
const char *open_dbname = db_init ? "mysql" : dbname;
if (!(dbh.mysql = mysql_init(NULL)))
out_of_memory("db_read_config");
if (DEBUG_GTE(DB, 1)) {
rprintf(FCLIENT, "[%s] connecting: host=%s user=%s db=%s port=%d\n",
who_am_i(), dbhost, dbuser, open_dbname, dbport);
}
if (!mysql_real_connect(dbh.mysql, dbhost, dbuser, dbpass, open_dbname, dbport, NULL, 0)) {
rprintf(log_code, "[%s] Unable to connect to DB: %s\n", who_am_i(), mysql_error(dbh.mysql));
return 0;
}
if (db_init) {
if (db_output_msgs)
rprintf(FCLIENT, "Creating DB %s (if it does not exist)\n", dbname);
if (!run_sql("CREATE DATABASE IF NOT EXISTS `%s`", dbname)
|| !run_sql("USE `%s`", dbname))
exit_cleanup(RERR_IPC);
if (db_output_msgs)
rprintf(FCLIENT, "Dropping old tables (if they exist))\n");
if (!run_sql("DROP TABLE IF EXISTS disk")
|| !run_sql("DROP TABLE IF EXISTS inode_map"))
exit_cleanup(RERR_IPC);
if (db_output_msgs)
rprintf(FCLIENT, "Creating empty tables ...\n");
if (!run_sql(
"CREATE TABLE disk (\n"
" disk_id integer unsigned NOT NULL PRIMARY KEY AUTO_INCREMENT,\n"
" host varchar(128) NOT NULL default 'localhost',\n"
" mount_uniq varchar(128) default NULL,\n"
" devno bigint unsigned NOT NULL,\n" /* This is 0 when not mounted */
" last_seen bigint NOT NULL,\n"
" UNIQUE KEY mount_lookup (host, mount_uniq),\n"
" KEY dev_lookup (devno, host)\n"
")"))
exit_cleanup(RERR_IPC);
if (!run_sql(
"CREATE TABLE inode_map (\n"
" disk_id integer unsigned NOT NULL,\n"
" ino bigint unsigned NOT NULL,\n"
" sum_type tinyint NOT NULL default '0',\n"
" size bigint unsigned NOT NULL,\n"
" mtime bigint NOT NULL,\n"
" ctime bigint NOT NULL,\n"
" checksum binary(16) NOT NULL,\n"
" PRIMARY KEY (disk_id,ino,sum_type)\n"
")"))
exit_cleanup(RERR_IPC);
if (!db_mounts)
exit_cleanup(0);
}
if (db_mounts) {
if (!prepare_mysql_queries(PREP_MOUNT))
exit_cleanup(RERR_IPC);
update_mounts();
exit_cleanup(0);
}
if (!prepare_mysql_queries(PREP_NORM))
return 0;
return 1;
}
#endif
#ifdef USE_SQLITE
static int prepare_sqlite(int ndx, const char *fmt, ...)
{
va_list ap;
char *query;
int rc, qlen, lock_failures = 0;
va_start(ap, fmt);
qlen = vasprintf(&query, fmt, ap);
va_end(ap);
if (qlen < 0)
out_of_memory("prepare_sqlite");
if (DEBUG_GTE(DB, 3))
rprintf(FCLIENT, "[%s] SQL being prepared: %s\n", who_am_i(), query);
while ((rc = sqlite3_prepare_v2(dbh.sqlite, query, -1, &statements[ndx].sqlite, NULL)) != 0) {
if (DEBUG_GTE(DB, 4)) {
rprintf(FCLIENT, "[%s] sqlite3_prepare_v2(,%s,,) returned %d\n",
who_am_i(), query, rc);
}
if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
break;
if (++lock_failures > MAX_LOCK_FAILURES)
break;
msleep(LOCK_FAIL_MSLEEP);
}
if (rc) {
rprintf(log_code, "[%s] Failed to prepare SQL: %s (%d)\n", who_am_i(), sqlite3_errmsg(dbh.sqlite), rc);
rprintf(log_code, "%s\n", query);
free(query);
return 0;
}
free(query);
return 1;
}
#endif
#ifdef USE_SQLITE
static int prepare_sqlite_queries(int type)
{
char *sql;
switch (type) {
case PREP_NORM:
sql="SELECT disk_id"
" FROM disk"
" WHERE host = ? AND devno = ?";
if (!prepare_sqlite(SEL_DEV, sql))
return 0;
if (select_many_sums) {
sql="SELECT checksum, sum_type, size, mtime, ctime"
" FROM inode_map"
" WHERE disk_id = ? AND ino = ?";
if (!prepare_sqlite(SEL_SUM, sql))
return 0;
} else {
sql="SELECT checksum"
" FROM inode_map"
" WHERE disk_id = ? AND ino = ? AND sum_type = %d"
" AND size = ? AND mtime = ? %s";
if (!prepare_sqlite(SEL_SUM, sql, md_num, db_lax ? "" : "AND ctime = ?"))
return 0;
}
sql="INSERT OR REPLACE INTO inode_map"
" (disk_id, ino, sum_type, size, mtime, ctime, checksum)"
" VALUES (?, ?, ?, ?, ?, ?, ?)";
if (!prepare_sqlite(REP_SUM, sql))
return 0;
sql="UPDATE inode_map"
" SET ctime = ?"
" WHERE disk_id = ? AND ino = ? AND sum_type = ? AND size = ? AND mtime = ?";
if (!prepare_sqlite(UPD_CTIME, sql))
return 0;
break;
case PREP_MOUNT:
sql="INSERT OR IGNORE INTO disk"
" (host, last_seen, mount_uniq, devno)"
" VALUES (?, ?, ?, ?)";
if (!prepare_sqlite(INS_MOUNT, sql))
return 0;
sql="UPDATE disk"
" SET last_seen = ?, devno = ?"
" WHERE host = ? AND mount_uniq = ?";
if (!prepare_sqlite(UPD_MOUNT, sql))
return 0;
sql="SELECT mount_uniq"
" FROM disk"
" WHERE host = ? AND last_seen < ? AND devno != 0";
if (!prepare_sqlite(SEL_MOUNT, sql))
return 0;
sql="UPDATE disk"
" SET devno = 0"
" WHERE host = ? AND last_seen < ? AND devno != 0";
if (!prepare_sqlite(UN_MOUNT, sql))
return 0;
break;
}
return 1;
}
#endif
#ifdef USE_SQLITE
static int db_connect_sqlite(void)
{
int lock_failures = 0;
int rc;
#ifdef SQLITE_CONFIG_LOG
if (error_log) {
if (DEBUG_GTE(DB, 1))
rprintf(FCLIENT, "[%s] Setting sqlite errlog to %s\n", who_am_i(), error_log);
if (!(error_log_fp = fopen(error_log, "a"))) {
rsyserr(log_code, errno, "unable to append to logfile %s", error_log);
error_log = NULL;
} else if (sqlite3_config(SQLITE_CONFIG_LOG, errorLogCallback, NULL) != 0)
rprintf(log_code, "Failed to set errorLogCallback: %s\n", sqlite3_errmsg(dbh.sqlite));
}
#endif
while (1) {
int open_flags = SQLITE_OPEN_READWRITE;
if (db_init)
open_flags |= SQLITE_OPEN_CREATE;
if (DEBUG_GTE(DB, 1))
rprintf(FCLIENT, "[%s] opening %s (%d)\n", who_am_i(), dbname, open_flags);
if ((rc = sqlite3_open_v2(dbname, &dbh.sqlite, open_flags, NULL)) == 0) {
break;
}
if (DEBUG_GTE(DB, 4)) {
rprintf(FCLIENT, "[%s] sqlite3_open_v2(%s,,%d,NULL) returned %d\n",
who_am_i(), dbname, open_flags, rc);
}
if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
break;
if (++lock_failures > MAX_LOCK_FAILURES)
break;
msleep(LOCK_FAIL_MSLEEP);
}
if (rc) {
rprintf(log_code, "Unable to connect to DB: %s (%d)\n", sqlite3_errmsg(dbh.sqlite), rc);
return 0;
}
if (db_init) {
char *sql;
if (db_output_msgs)
rprintf(FCLIENT, "Dropping old tables (if they exist) ...\n");
if (!run_sql("DROP TABLE IF EXISTS disk")
|| !run_sql("DROP TABLE IF EXISTS inode_map"))
exit_cleanup(RERR_IPC);
if (db_output_msgs)
rprintf(FCLIENT, "Creating empty tables ...\n");
sql="CREATE TABLE disk (\n"
" disk_id integer NOT NULL PRIMARY KEY AUTOINCREMENT,\n"
" host varchar(128) NOT NULL default 'localhost',\n"
" mount_uniq varchar(128) default NULL,\n"
" devno bigint NOT NULL,\n" /* This is 0 when not mounted */
" last_seen bigint NOT NULL,\n"
" UNIQUE (host, mount_uniq)\n"
")";
if (!run_sql(sql))
exit_cleanup(RERR_IPC);
sql="CREATE TABLE inode_map (\n"
" disk_id integer NOT NULL,\n"
" ino bigint NOT NULL,\n"
" size bigint NOT NULL,\n"
" mtime bigint NOT NULL,\n"
" ctime bigint NOT NULL,\n"
" sum_type tinyint NOT NULL default '0',\n"
" checksum binary(16) NOT NULL,\n"
" PRIMARY KEY (disk_id,ino,sum_type)\n"
")";
if (!run_sql(sql))
exit_cleanup(RERR_IPC);
#if SQLITE_VERSION_NUMBER >= 3007000
/* Using WAL locking makes concurrency much better (requires sqlite 3.7.0). */
sql="PRAGMA journal_mode = wal";
run_sql(sql); /* We don't check this for success. */
#endif
if (!db_mounts)
exit_cleanup(0);
}
if (db_mounts) {
if (!prepare_sqlite_queries(PREP_MOUNT))
exit_cleanup(RERR_IPC);
update_mounts();
exit_cleanup(0);
}
if (!prepare_sqlite_queries(PREP_NORM)) {
db_disconnect(False);
return 0;
}
return 1;
}
#endif
int db_connect(int select_many)
{
select_many_sums = select_many;
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
if (db_connect_mysql())
return 1;
break;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE:
if (db_connect_sqlite())
return 1;
break;
#endif
}
db_disconnect(False);
return 0;
}
void db_disconnect(BOOL commit)
{
int ndx;
if (!dbh.all)
return;
if (transaction_state > 0) {
if (DEBUG_GTE(DB, 1)) {
rprintf(FCLIENT, "[%s] %s our DB transaction\n",
who_am_i(), commit ? "Committing" : "Rolling back");
}
transaction_state = 0;
if (commit)
run_sql("COMMIT");
else
run_sql("ROLLBACK");
}
if (DEBUG_GTE(DB, 1))
rprintf(FCLIENT, "[%s] Disconnecting from the DB\n", who_am_i());
for (ndx = 0; ndx < MAX_PREP_CNT; ndx++) {
if (statements[ndx].all) {
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
mysql_stmt_close(statements[ndx].mysql);
break;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE:
sqlite3_finalize(statements[ndx].sqlite);
break;
#endif
}
statements[ndx].all = NULL;
}
}
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
mysql_close(dbh.mysql);
break;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE:
sqlite3_close(dbh.sqlite);
break;
#endif
}
dbh.all = NULL;
use_db = DB_TYPE_NONE;
}
#ifdef USE_MYSQL
static MYSQL_STMT *exec_mysql(int ndx)
{
MYSQL_STMT *stmt = statements[ndx].mysql;
int rc;
if ((rc = mysql_stmt_execute(stmt)) == CR_SERVER_LOST) {
db_disconnect(False);
use_db = DB_TYPE_MYSQL;
if (db_connect(select_many_sums)) {
stmt = statements[ndx].mysql;
rc = mysql_stmt_execute(stmt);
}
}
if (rc != 0) {
rprintf(log_code, "SQL execute failed: %s\n", mysql_stmt_error(stmt));
return NULL;
}
return stmt;
}
#endif
#ifdef USE_MYSQL
/* This stores up to max_rows into the values pointed to by the bind data arrays.
* If max_rows is > 1, then all the buffer pointers MUST be set to an array long
* enough to hold the max count of rows. The buffer pointer will be incremented
* to read additional rows (but never past the end). If stmt_ptr is non-NULL, it
* will be set to the "stmt" pointer IFF we didn't run out of rows before hitting
* the max. In this case, the caller should call mysql_stmt_fetch() to read any
* remaining rows (the buffer pointers will point at the final array element) and
* then call mysql_stmt_free_result(). If *stmt_ptr is a NULL value, there were
* not enough rows to fill the max_rows arrays, and the stmt was already freed. */
static int fetch_mysql(MYSQL_BIND *binds, int bind_cnt, int ndx, int max_rows, MYSQL_STMT **stmt_ptr)
{
MYSQL_STMT *stmt;
int i, rc, rows = 0;
if (bind_cnt > MAX_RESULT_BINDS) {
fprintf(stderr, "Internal error: MAX_RESULT_BINDS overflow\n");
exit_cleanup(RERR_UNSUPPORTED);
}
if ((stmt = exec_mysql(ndx)) == NULL)
return 0;
for (i = 0; i < bind_cnt; i++) {
binds[i].is_null = &result_is_null[i];
binds[i].length = &result_length[i];
binds[i].error = &result_error[i];
}
mysql_stmt_bind_result(stmt, binds);
while (rows < max_rows) {
if ((rc = mysql_stmt_fetch(stmt)) != 0) {
if (rc != MYSQL_NO_DATA)
rprintf(log_code, "SELECT fetch failed: %s\n", mysql_stmt_error(stmt));
break;
}
if (++rows >= max_rows)
break;
for (i = 0; i < bind_cnt; i++) {
switch (binds[i].buffer_type) {
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_STRING:
binds[i].buffer += binds[i].buffer_length;
break;
case MYSQL_TYPE_LONG:
binds[i].buffer += sizeof (int);
break;
case MYSQL_TYPE_LONGLONG:
binds[i].buffer += sizeof (int64);
break;
default:
fprintf(stderr, "Unknown MYSQL_TYPE_* in multi-row read: %d.\n", binds[i].buffer_type);
exit_cleanup(RERR_UNSUPPORTED);
}
}
}
if (!stmt_ptr || rows < max_rows) {
mysql_stmt_free_result(stmt);
stmt = NULL;
}
if (stmt_ptr)
*stmt_ptr = stmt;
return rows;
}
#endif
#if defined USE_MYSQL || defined USE_SQLITE
static void update_mounts(void)
{
char buf[2048], *argv[2];
int f_from, f_to, len;
STRUCT_STAT st;
int pid, status;
if (DEBUG_GTE(DB, 2))
printf("Running %s to grab mount info\n", mount_helper_script);
argv[0] = mount_helper_script;
argv[1] = NULL;
pid = piped_child(argv, &f_from, &f_to);
close(f_to);
bind_mtime = time(NULL); /* abuse mtime slightly to hold our last_seen value */
/* Strict format has 2 items with one tab as separator: MOUNT_UNIQ\tPATH */
while ((len = read_line(f_from, buf, sizeof buf, 0)) > 0) {
char *mount_uniq, *path;
if (DEBUG_GTE(DB, 3))
printf("Parsing mount info: %s\n", buf);
mount_uniq = strtok(buf, "\t");
path = mount_uniq ? strtok(NULL, "\r\n") : NULL;
if (!path) {
fprintf(stderr, "Failed to parse line from %s output\n", mount_helper_script);
exit_cleanup(RERR_SYNTAX);
}
if (lstat(path, &st) < 0) {
fprintf(stderr, "Failed to lstat(%s): %s\n", path, strerror(errno));
exit_cleanup(RERR_IPC);
}
bind_mount_uniq_len = strlcpy(bind_mount_uniq, mount_uniq, sizeof bind_mount_uniq);
if (bind_mount_uniq_len >= (int)sizeof bind_mount_uniq)
bind_mount_uniq_len = sizeof bind_mount_uniq - 1;
if (db_output_msgs) {
printf("Marking mount \"%s\" (%s) as a recent mount\n",
bind_mount_uniq, big_num(st.st_dev));
}
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
bind_devno = st.st_dev;
if (exec_mysql(INS_MOUNT) == NULL) {
fprintf(stderr, "Failed to update mount info for \"%s\" - %s\n",
bind_mount_uniq, mysql_error(dbh.mysql));
exit_cleanup(RERR_IPC);
}
break;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
int rc, change_cnt;
sqlite3_stmt *stmt = statements[INS_MOUNT].sqlite;
sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 2, bind_mtime);
sqlite3_bind_text(stmt, 3, bind_mount_uniq, bind_mount_uniq_len, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 4, st.st_dev);
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
fprintf(stderr, "Failed to insert mount info for \"%s\" - %s (%d)\n",
bind_mount_uniq, sqlite3_errmsg(dbh.sqlite), rc);
exit_cleanup(RERR_IPC);
}
change_cnt = sqlite3_changes(dbh.sqlite);
sqlite3_reset(stmt);
if (change_cnt == 0) {
stmt = statements[UPD_MOUNT].sqlite;
sqlite3_bind_int64(stmt, 1, bind_mtime);
sqlite3_bind_int64(stmt, 2, st.st_dev);
sqlite3_bind_text(stmt, 3, bind_thishost, bind_thishost_len, SQLITE_STATIC);
sqlite3_bind_text(stmt, 4, bind_mount_uniq, bind_mount_uniq_len, SQLITE_STATIC);
rc = sqlite3_step(stmt);
if (rc != SQLITE_DONE) {
fprintf(stderr, "Failed to update mount info for \"%s\" - %s (%d)\n",
bind_mount_uniq, sqlite3_errmsg(dbh.sqlite), rc);
exit_cleanup(RERR_IPC);
}
sqlite3_reset(stmt);
}
break;
}
#endif
}
}
close(f_from);
waitpid(pid, &status, 0);
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL: {
if (db_output_msgs) {
MYSQL_BIND binds[1];
MYSQL_STMT *stmt;
binds[0].buffer_type = MYSQL_TYPE_BLOB;
binds[0].buffer = bind_mount_uniq;
binds[0].buffer_length = sizeof bind_mount_uniq;
if (fetch_mysql(binds, 1, SEL_MOUNT, 1, &stmt)) {
while (1) {
printf("Marking mount \"%s\" as unmounted.\n", bind_mount_uniq);
if (mysql_stmt_fetch(stmt) != 0)
break;
}
mysql_stmt_free_result(stmt);
}
}
if (exec_mysql(UN_MOUNT) == NULL) {
fprintf(stderr, "Failed to update old mount info - %s\n",
mysql_error(dbh.mysql));
exit_cleanup(RERR_IPC);
}
break;
}
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
sqlite3_stmt *stmt;
int rc;
if (db_output_msgs) {
stmt = statements[SEL_MOUNT].sqlite;
sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 2, bind_mtime);
while (1) {
if (sqlite3_step(stmt) != SQLITE_ROW)
break;
printf("Marking mount \"%s\" as unmounted.\n", sqlite3_column_text(stmt, 0));
}
sqlite3_reset(stmt);
}
stmt = statements[UN_MOUNT].sqlite;
sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 2, bind_mtime);
rc = sqlite3_step(stmt);
sqlite3_reset(stmt);
if (rc != SQLITE_DONE) {
fprintf(stderr, "Failed to update old mount info - %s (%d)\n",
sqlite3_errmsg(dbh.sqlite), rc);
exit_cleanup(RERR_IPC);
}
break;
}
#endif
}
}
#endif
static unsigned int get_disk_id(int64 devno)
{
static unsigned int prior_disk_id = 0;
static int64 prior_devno = 0;
if (prior_devno == devno && prior_disk_id) {
if (DEBUG_GTE(DB, 5))
rprintf(FCLIENT, "get_disk_id(%s,%s) = %d (cached)\n", bind_thishost, big_num(devno), prior_disk_id);
return prior_disk_id;
}
prior_devno = devno;
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL: {
MYSQL_BIND binds[1];
bind_devno = devno; /* The one changing SEL_DEV input value. */
/* Bind where to put the output. */
binds[0].buffer_type = MYSQL_TYPE_LONG;
binds[0].buffer = &prior_disk_id;
if (!fetch_mysql(binds, 1, SEL_DEV, 1, NULL))
prior_disk_id = 0;
break;
}
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
sqlite3_stmt *stmt = statements[SEL_DEV].sqlite;
sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 2, devno);
if (sqlite3_step(stmt) == SQLITE_ROW)
prior_disk_id = sqlite3_column_int(stmt, 0);
else
prior_disk_id = 0;
sqlite3_reset(stmt);
break;
}
#endif
}
if (DEBUG_GTE(DB, 2))
rprintf(FCLIENT, "get_disk_id(%s,%s) = %d\n", bind_thishost, big_num(devno), prior_disk_id);
return prior_disk_id;
}
int db_get_checksum(const STRUCT_STAT *st_p, char *sum)
{
unsigned int disk_id = get_disk_id(st_p->st_dev);
int ok = 0;
if (disk_id == 0)
return 0;
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL: {
MYSQL_BIND binds[1];
bind_disk_id = disk_id;
bind_ino = st_p->st_ino;
bind_size = st_p->st_size;
bind_mtime = st_p->st_mtime;
if (!db_lax)
bind_ctime = st_p->st_ctime;
binds[0].buffer_type = MYSQL_TYPE_BLOB;
binds[0].buffer = sum;
binds[0].buffer_length = MD5_DIGEST_LEN;
ok = fetch_mysql(binds, 1, SEL_SUM, 1, NULL);
break;
}
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
sqlite3_stmt *stmt = statements[SEL_SUM].sqlite;
sqlite3_bind_int(stmt, 1, disk_id);
sqlite3_bind_int64(stmt, 2, st_p->st_ino);
sqlite3_bind_int64(stmt, 3, st_p->st_size);
sqlite3_bind_int64(stmt, 4, st_p->st_mtime);
if (!db_lax)
sqlite3_bind_int64(stmt, 5, st_p->st_ctime);
if (sqlite3_step(stmt) == SQLITE_ROW) {
int len = sqlite3_column_bytes(stmt, 0);
if (len > MAX_DIGEST_LEN)
len = MAX_DIGEST_LEN;
memcpy(sum, sqlite3_column_blob(stmt, 0), len);
ok = 1;
}
sqlite3_reset(stmt);
break;
}
#endif
}
if (DEBUG_GTE(DB, 2)) {
if (ok) {
rprintf(FCLIENT, "[%s] Found DB checksum for %s,%s,%d: %s\n",
who_am_i(), big_num(st_p->st_dev),
big_num(st_p->st_ino), md_num, sum_as_hex(md_num, sum, 0));
} else {
rprintf(FCLIENT, "[%s] No DB checksum for %s,%s,%d\n",
who_am_i(), big_num(st_p->st_dev),
big_num(st_p->st_ino), md_num);
}
}
return ok;
}
int db_get_both_checksums(const STRUCT_STAT *st_p, int *right_sum_cnt, int *wrong_sum_cnt, char **sum4, char **sum5)
{
static char dbsum[MD5_DIGEST_LEN*2];
int rows, j, sum_type[2];
int64 dbsize[2], dbmtime[2], dbctime[2];
unsigned int disk_id = get_disk_id(st_p->st_dev);
if (disk_id == 0)
return 0;
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL: {
MYSQL_BIND binds[5];
bind_disk_id = disk_id;
bind_ino = st_p->st_ino;
binds[0].buffer_type = MYSQL_TYPE_BLOB;
binds[0].buffer = dbsum;
binds[0].buffer_length = MD5_DIGEST_LEN;
binds[1].buffer_type = MYSQL_TYPE_LONG;
binds[1].buffer = (char*)sum_type;
binds[2].buffer_type = MYSQL_TYPE_LONGLONG;
binds[2].buffer = (char*)dbsize;
binds[3].buffer_type = MYSQL_TYPE_LONGLONG;
binds[3].buffer = (char*)dbmtime;
binds[4].buffer_type = MYSQL_TYPE_LONGLONG;
binds[4].buffer = (char*)dbctime;
rows = fetch_mysql(binds, 5, SEL_SUM, 2, NULL);
break;
}
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
sqlite3_stmt *stmt = statements[SEL_SUM].sqlite;
sqlite3_bind_int(stmt, 1, disk_id);
sqlite3_bind_int64(stmt, 2, st_p->st_ino);
for (j = 0; j < 2; j++) {
int len;
if (sqlite3_step(stmt) != SQLITE_ROW)
break;
len = sqlite3_column_bytes(stmt, 0);
if (len > MD5_DIGEST_LEN)
len = MD5_DIGEST_LEN;
memcpy(dbsum + MD5_DIGEST_LEN*j, sqlite3_column_blob(stmt, 0), len);
sum_type[j] = sqlite3_column_int(stmt, 1);
dbsize[j] = sqlite3_column_int(stmt, 2);
dbmtime[j] = sqlite3_column_int64(stmt, 3);
dbctime[j] = sqlite3_column_int64(stmt, 4);
}
sqlite3_reset(stmt);
rows = j;
break;
}
#endif
default:
return 0;
}
if (sum4)
*sum4 = NULL;
if (sum5)
*sum5 = NULL;
*right_sum_cnt = *wrong_sum_cnt = 0;
for (j = 0; j < rows; j++) {
if (DEBUG_GTE(DB, 3)) {
rprintf(FCLIENT, "DB checksum for %s,%s,%d: %s\n",
big_num(st_p->st_dev), big_num(st_p->st_ino), sum_type[j],
sum_as_hex(sum_type[j], dbsum + MD5_DIGEST_LEN*j, 0));
}
if (sum_type[j] == 4) {
if (!sum4)
continue;
*sum4 = dbsum + MD5_DIGEST_LEN*j;
} else {
if (!sum5)
continue;
*sum5 = dbsum + MD5_DIGEST_LEN*j;
}
if (st_p->st_size == dbsize[j] && st_p->st_mtime == dbmtime[j] && (db_lax || st_p->st_ctime == dbctime[j]))
++*right_sum_cnt;
else
++*wrong_sum_cnt;
}
return rows;
}
int db_set_checksum(int mdnum, const STRUCT_STAT *st_p, const char *sum)
{
unsigned int disk_id;
const char *errmsg = NULL;
int rc = 0;
if (am_receiver || (am_generator && same_db)) {
/* Forward the setting to a single process. The receiver always
* forwards to the generator, and the generator will forward to
* the sender ONLY if this is a local transfer. */
char data[MSG_CHECKSUM_LEN];
SIVAL64(data, 0, st_p->st_dev);
SIVAL64(data, 8, st_p->st_ino);
SIVAL64(data, 16, st_p->st_size);
SIVAL64(data, 24, st_p->st_mtime);
SIVAL64(data, 32, st_p->st_ctime);
#if MSG_CHECKSUM_LONGS != 5
#error Fix the setting of checksum long values
#endif
SIVAL(data, MSG_CHECKSUM_LONGS*8, mdnum);
memcpy(data + MSG_CHECKSUM_LONGS*8 + 4, sum, MAX_DIGEST_LEN);
return send_msg(MSG_CHECKSUM, data, sizeof data, 0);
}
if ((disk_id = get_disk_id(st_p->st_dev)) == 0)
return 0;
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
if (transaction_state == 0) {
if (!run_sql("BEGIN"))
return 0;
transaction_state = 1;
}
bind_disk_id = disk_id;
bind_ino = st_p->st_ino;
bind_mdnum = mdnum;
bind_size = st_p->st_size;
bind_mtime = st_p->st_mtime;
bind_ctime = st_p->st_ctime;
memcpy(bind_sum, sum, MD5_DIGEST_LEN);
if (exec_mysql(REP_SUM) == NULL)
errmsg = mysql_error(dbh.mysql);
break;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
sqlite3_stmt *stmt = statements[REP_SUM].sqlite;
int lock_failures = 0;
if (transaction_state == 0) {
if (!run_sql("BEGIN"))
return 0;
transaction_state = 1;
}
sqlite3_bind_int(stmt, 1, disk_id);
sqlite3_bind_int64(stmt, 2, st_p->st_ino);
sqlite3_bind_int(stmt, 3, mdnum);
sqlite3_bind_int64(stmt, 4, st_p->st_size);
sqlite3_bind_int64(stmt, 5, st_p->st_mtime);
sqlite3_bind_int64(stmt, 6, st_p->st_ctime);
sqlite3_bind_blob(stmt, 7, sum, MD5_DIGEST_LEN, SQLITE_TRANSIENT);
while (1) {
rc = sqlite3_step(stmt);
if (rc != SQLITE_BUSY && rc != SQLITE_LOCKED)
break;
if (++lock_failures > MAX_LOCK_FAILURES)
break;
sqlite3_reset(stmt);
msleep(LOCK_FAIL_MSLEEP);
}
if (rc != SQLITE_DONE)
errmsg = sqlite3_errmsg(dbh.sqlite);
sqlite3_reset(stmt);
break;
}
#endif
}
if (!errmsg) {
if (DEBUG_GTE(DB, 2)) {
rprintf(FCLIENT, "[%s] Set DB checksum for %s,%s,%d: %s\n",
who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino),
md_num, sum_as_hex(md_num, sum, 0));
}
} else {
rprintf(log_code, "[%s] Failed to set checksum for %s,%s,%d: %s (%d) -- closing DB\n",
who_am_i(), big_num(st_p->st_dev), big_num(st_p->st_ino),
md_num, errmsg, rc);
db_disconnect(False);
}
return errmsg ? 0 : 1;
}
/* For a delayed-update copy, we set the checksum on the file when it was
* inside the partial-dir. Since renaming the file changes its ctime, we need
* to update the ctime to its new value (we can skip this in db_lax mode). */
int db_update_ctime(UNUSED(int mdnum), const STRUCT_STAT *st_p)
{
unsigned int disk_id = get_disk_id(st_p->st_dev);
if (disk_id == 0)
return 0;
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
bind_ctime = st_p->st_ctime;
bind_disk_id = disk_id;
bind_ino = st_p->st_ino;
bind_mdnum = mdnum;
bind_size = st_p->st_size;
bind_mtime = st_p->st_mtime;
return exec_mysql(UPD_CTIME) != NULL;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
int rc;
sqlite3_stmt *stmt = statements[UPD_CTIME].sqlite;
if (stmt == NULL)
return 0;
sqlite3_bind_int64(stmt, 1, st_p->st_ctime);
sqlite3_bind_int(stmt, 2, disk_id);
sqlite3_bind_int64(stmt, 3, st_p->st_ino);
sqlite3_bind_int(stmt, 4, mdnum);
sqlite3_bind_int64(stmt, 5, st_p->st_size);
sqlite3_bind_int64(stmt, 6, st_p->st_mtime);
rc = sqlite3_step(stmt);
sqlite3_reset(stmt);
return rc == SQLITE_DONE;
}
#endif
}
return 0;
}
static int db_clean_init(void)
{
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL: {
MYSQL_BIND binds[MAX_BIND_CNT];
char *sql;
mysql_query(dbh.mysql,
"CREATE TEMPORARY TABLE inode_present ("
" disk_id integer unsigned NOT NULL,"
" ino bigint unsigned NOT NULL,"
" PRIMARY KEY (disk_id,ino)"
") ENGINE=MEMORY"
);
sql="INSERT IGNORE INTO inode_present"
" SET disk_id = ?, ino = ?";
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_LONG;
binds[0].buffer = &bind_disk_id;
binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
binds[1].buffer = &bind_ino;
if (!prepare_mysql(INS_PRESENT, binds, 2, sql))
exit_cleanup(RERR_SYNTAX);
sql="DELETE m.*"
" FROM inode_map AS m"
" LEFT JOIN inode_present AS p USING(disk_id, ino)"
" JOIN disk AS d ON(m.disk_id = d.disk_id)"
" WHERE host = ? AND devno != 0 AND p.disk_id IS NULL AND ctime < ?";
memset(binds, 0, sizeof binds);
binds[0].buffer_type = MYSQL_TYPE_STRING;
binds[0].buffer = &bind_thishost;
binds[0].buffer_length = bind_thishost_len;
binds[1].buffer_type = MYSQL_TYPE_LONGLONG;
binds[1].buffer = &bind_ctime;
if (!prepare_mysql(DEL_SUMS, binds, 2, sql))
exit_cleanup(RERR_SYNTAX);
return 1;
}
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
char *sql;
sql="ATTACH DATABASE '' AS aux1;"; /* Private temp DB, probably in-memory */
if (!run_sql(sql))
exit_cleanup(RERR_IPC);
sql="CREATE TABLE aux1.inode_present ("
" disk_id integer NOT NULL,"
" ino bigint NOT NULL,"
" PRIMARY KEY (disk_id,ino)"
")";
if (!run_sql(sql))
exit_cleanup(RERR_IPC);
sql="INSERT OR IGNORE INTO aux1.inode_present"
" (disk_id, ino)"
" VALUES (?, ?)";
if (!prepare_sqlite(INS_PRESENT, sql))
exit_cleanup(RERR_IPC);
sql="DELETE FROM inode_map"
" WHERE ROWID IN ("
" SELECT m.ROWID"
" FROM inode_map AS m"
" LEFT JOIN aux1.inode_present AS p USING(disk_id, ino)"
" JOIN disk AS d ON(m.disk_id = d.disk_id)"
" WHERE host = ? AND devno != 0 AND p.disk_id IS NULL AND ctime < ?"
" )";
if (!prepare_sqlite(DEL_SUMS, sql))
exit_cleanup(RERR_IPC);
transaction_state = -1; /* bug work-around -- force transaction off when cleaning XXX */
return 1;
}
#endif
}
return 0;
}
static int db_note_present(UNUSED(int disk_id), UNUSED(int64 ino))
{
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL:
bind_disk_id = disk_id;
bind_ino = ino;
return exec_mysql(INS_PRESENT) != NULL;
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
int rc;
sqlite3_stmt *stmt = statements[INS_PRESENT].sqlite;
sqlite3_bind_int(stmt, 1, disk_id);
sqlite3_bind_int64(stmt, 2, ino);
rc = sqlite3_step(stmt);
sqlite3_reset(stmt);
return rc == SQLITE_DONE;
}
#endif
}
return 0;
}
/* This function requires the user to have populated all disk_id+inode pairs
* into the inode_present table. */
static int db_clean_inodes(UNUSED(time_t start_time))
{
int del_cnt = 0;
/* The extra ctime < start_time check ensures that brand-new checksums that
* were added after the start of our cleaning run are not removed. */
switch (use_db) {
#ifdef USE_MYSQL
case DB_TYPE_MYSQL: {
MYSQL_STMT *stmt;
bind_ctime = start_time;
stmt = exec_mysql(DEL_SUMS);
if (stmt != NULL)
del_cnt = mysql_affected_rows(dbh.mysql);
break;
}
#endif
#ifdef USE_SQLITE
case DB_TYPE_SQLITE: {
int rc;
sqlite3_stmt *stmt = statements[DEL_SUMS].sqlite;
sqlite3_bind_text(stmt, 1, bind_thishost, bind_thishost_len, SQLITE_STATIC);
sqlite3_bind_int64(stmt, 2, start_time);
rc = sqlite3_step(stmt);
if (rc == SQLITE_DONE)
del_cnt = sqlite3_changes(dbh.sqlite);
sqlite3_reset(stmt);
break;
}
#endif
}
return del_cnt;
}
static int abs_path(char *buf, int bufsiz, const char *curdir, const char *dir)
{
if (*dir == '/')
strlcpy(buf, dir, bufsiz);
else {
int len = snprintf(buf, bufsiz, "%s/%s", curdir, dir);
assert(len > 0); /* silence a compiler warning */
}
return clean_fname(buf, CFN_DROP_TRAILING_DOT_DIR | CFN_COLLAPSE_DOT_DOT_DIRS);
}
static struct name_list *new_name(const char *basename, const char *filename)
{
struct name_list *n;
int blen = strlen(basename);
int slen = filename ? (int)strlen(filename) : -1;
int len = blen + 1 + slen;
if (len >= MAXPATHLEN) {
if (filename)
rprintf(FERROR, "Filename too long: %s/%s\n", basename, filename);
else
rprintf(FERROR, "Filename too long: %s\n", basename);
return NULL;
}
n = (struct name_list *)new_array(char, sizeof (struct name_list) + len);
memcpy(n->name, basename, blen);
if (filename) {
n->name[blen] = '/';
memcpy(n->name + 1 + blen, filename, slen);
}
n->name[len] = '\0';
n->next = NULL;
return n;
}
static int name_compare(const void *n1, const void *n2)
{
struct name_list *p1 = *(struct name_list **)n1;
struct name_list *p2 = *(struct name_list **)n2;
return strcmp(p1->name, p2->name);
}
static struct name_list *get_sorted_names(const char *dir)
{
struct name_list *add, **sortbuf, *names = NULL, *prior_name = NULL;
struct dirent *di;
int cnt = 0;
DIR *d;
if (!(d = opendir("."))) {
rprintf(FERROR, "Unable to opendir %s: %s\n", dir, strerror(errno));
return NULL;
}
while ((di = readdir(d)) != NULL) {
char *dname = d_name(di);
if (dname[0] == '.' && (dname[1] == '\0' || (dname[1] == '.' && dname[2] == '\0')))
continue;
if (!(add = new_name(dname, NULL)))
continue;
if (prior_name)
prior_name->next = add;
else
names = add;
prior_name = add;
cnt++;
}
closedir(d);
if (cnt) {
int j;
sortbuf = new_array(struct name_list *, cnt);
for (j = 0; j < cnt; j++) {
sortbuf[j] = names;
names = names->next;
}
qsort(sortbuf, cnt, PTR_SIZE, name_compare);
names = prior_name = NULL;
for (j = 0; j < cnt; j++) {
add = sortbuf[j];
if (prior_name)
prior_name->next = add;
else
names = add;
prior_name = add;
}
if (prior_name)
prior_name->next = NULL;
free(sortbuf);
}
return names;
}
static inline int sums_ne(const char *sum1, const char *sum2)
{
return memcmp(sum1, sum2, MD5_DIGEST_LEN) != 0;
}
/* Returns 1 if there is a checksum change, else 0. */
static int mention_file(const char *dir, const char *name, int right_cnt, int wrong_cnt,
const char *dbsum4, const char *dbsum5, const char *sum4, const char *sum5)
{
char *info_str = wrong_cnt && !right_cnt ? "!i " : " ";
char *md4_str = !db_do_md4 ? NULL : !dbsum4 ? "+4 " : !sum4 ? "?4 " : sums_ne(sum4, dbsum4) ? "!4 " : " ";
char *md5_str = !db_do_md5 ? NULL : !dbsum5 ? "+5 " : !sum5 ? "?5 " : sums_ne(sum5, dbsum5) ? "!5 " : " ";
int chg = *info_str != ' ' || (md4_str && *md4_str != ' ') || (md5_str && *md5_str != ' ');
if (chg || db_output_unchanged) {
if (db_output_info) {
fputs(info_str, stdout);
if (md4_str)
fputs(md4_str, stdout);
if (md5_str)
fputs(md5_str, stdout);
}
if (db_output_sum) {
if (db_do_md4)
printf("%s ", sum_as_hex(4, sum4, 0));
if (db_do_md5)
printf("%s ", sum_as_hex(5, sum5, 0));
}
if (db_output_name) {
if (db_output_sum)
putchar(' '); /* We want 2 spaces, like md5sum. */
if (*dir != '.' || dir[1]) {
fputs(dir, stdout);
putchar('/');
}
puts(name);
}
}
return chg;
}
NORETURN void run_dbonly(const char **args)
{
char start_dir[MAXPATHLEN], dirbuf[MAXPATHLEN];
int need_sum_cnt, start_dir_len;
struct name_list *prior_dir;
struct name_list *names;
time_t clean_start = 0;
int exit_code = 0;
checksum_type = 5;
need_sum_cnt = db_do_md4 + db_do_md5;
if (!db_read_config(FERROR, db_config) || !db_connect(1))
exit_cleanup(RERR_FILEIO);
if (db_clean) {
clean_start = time(NULL);
db_clean_init();
}
if (getcwd(start_dir, sizeof start_dir - 1) == NULL) {
rsyserr(FERROR, errno, "getcwd()");
exit_cleanup(RERR_FILESELECT);
}
start_dir_len = strlen(start_dir);
if (args) {
prior_dir = NULL;
while (*args) {
struct name_list *add;
if (abs_path(dirbuf, sizeof dirbuf, start_dir, *args++) <= 0)
continue;
if (!(add = new_name(dirbuf, NULL)))
continue;
if (prior_dir)
prior_dir->next = add;
else
dirs_list = add;
prior_dir = add;
}
} else
dirs_list = new_name(start_dir, NULL);
prior_dir = NULL;
while (dirs_list) {
struct name_list *subdirs, *prior_subdir, *prior_name;
const char *dir = dirs_list->name;
const char *reldir = dir;
if (prior_dir)
free((void*)prior_dir);
prior_dir = dirs_list;
dirs_list = dirs_list->next;
if (strncmp(reldir, start_dir, start_dir_len) == 0) {
if (reldir[start_dir_len] == '\0')
reldir = ".";
else if (reldir[start_dir_len] == '/')
reldir += start_dir_len + 1;
}
if (db_output_dirs)
printf("... %s/ ...\n", reldir);
if (chdir(dir) < 0) {
rprintf(FERROR, "Unable to chdir to %s: %s\n", dir, strerror(errno));
continue;
}
if (!(names = get_sorted_names(dir)))
continue;
subdirs = prior_subdir = prior_name = NULL;
while (names) {
STRUCT_STAT st;
char *dbsum4, *sum4, sumbuf4[MD5_DIGEST_LEN];
char *dbsum5, *sum5, sumbuf5[MD5_DIGEST_LEN];
int right_sum_cnt, wrong_sum_cnt;
const char *name = names->name;
unsigned int disk_id;
if (prior_name)
free((void*)prior_name);
prior_name = names;
names = names->next;
dbsum4 = dbsum5 = sum4 = sum5 = NULL;
if (lstat(name, &st) < 0) {
rprintf(FERROR, "Failed to lstat(%s): %s\n", name, strerror(errno));
continue;
}
if (S_ISLNK(st.st_mode))
continue;
if (S_ISDIR(st.st_mode)) {
/* add optional excluding of things like /^(CVS|\.svn|\.git|\.bzr)$/; */
if (recurse) {
struct name_list *add = new_name(dir, name);
if (add) {
if (prior_subdir)
prior_subdir->next = add;
else
subdirs = add;
prior_subdir = add;
}
}
continue;
}
if (!S_ISREG(st.st_mode))
continue;
if (!(disk_id = get_disk_id(st.st_dev)))
continue;
if (db_clean) {
db_note_present(disk_id, st.st_ino);
if (!db_update && !db_check)
continue;
}
db_get_both_checksums(&st, &right_sum_cnt, &wrong_sum_cnt,
db_do_md4 ? &dbsum4 : NULL, db_do_md5 ? &dbsum5 : NULL);
if (!db_check && right_sum_cnt == need_sum_cnt) {
mention_file(reldir, name, right_sum_cnt, wrong_sum_cnt, dbsum4, dbsum5, dbsum4, dbsum5);
continue;
}
if (db_update || (db_check && right_sum_cnt) || db_output_sum) {
uchar *data;
int32 remainder;
md_context m4;
MD5_CTX m5;
struct map_struct *buf;
OFF_T off, len = st.st_size;
int fd = do_open(name, O_RDONLY, 0);
if (fd < 0) {
rprintf(FERROR, "ERROR: unable to read %s: %s\n", name, strerror(errno));
continue;
}
if (db_do_md4)
mdfour_begin(&m4);
if (db_do_md5)
MD5_Init(&m5);
buf = map_file(fd, len, MAX_MAP_SIZE, CSUM_CHUNK);
for (off = 0; off + CSUM_CHUNK <= len; off += CSUM_CHUNK) {
data = (uchar*)map_ptr(buf, off, CSUM_CHUNK);
if (db_do_md4)
mdfour_update(&m4, data, CSUM_CHUNK);
if (db_do_md5)
MD5_Update(&m5, data, CSUM_CHUNK);
}
remainder = (int32)(len - off);
data = (uchar*)map_ptr(buf, off, remainder);
if (db_do_md4) {
mdfour_update(&m4, data, remainder);
mdfour_result(&m4, (uchar*)(sum4 = sumbuf4));
}
if (db_do_md5) {
MD5_Update(&m5, data, remainder);
MD5_Final((uchar*)(sum5 = sumbuf5), &m5);
}
close(fd);
unmap_file(buf);
}
int chg = mention_file(reldir, name, right_sum_cnt, wrong_sum_cnt, dbsum4, dbsum5, sum4, sum5);
if (!chg) {
/* Only db_check should get here... */
} else if (!db_update) {
exit_code = 1;
} else {
int fail = 0;
if (db_do_md4 && !db_set_checksum(4, &st, sum4))
fail = 1;
if (db_do_md5 && !db_set_checksum(5, &st, sum5))
fail = 1;
if (fail) {
fprintf(stderr, "Failed to set checksum on %s/%s\n", reldir, name);
exit_cleanup(RERR_FILEIO);
}
}
}
if (prior_name)
free((void*)prior_name);
if (recurse && subdirs) {
prior_subdir->next = dirs_list;
dirs_list = subdirs;
}
}
if (prior_dir)
free((void*)prior_dir);
if (db_clean) {
int rows = db_clean_inodes(clean_start);
if (db_output_msgs)
printf("Cleaned out %d old inode%s.\n", rows, rows == 1 ? "" : "s");
}
db_disconnect(True);
exit(exit_code);
}
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>