upend/db/src/stores/fs/mod.rs


use self::db::files;
use super::{Blob, StoreError, UpStore, UpdateOptions, UpdatePathOutcome};
use crate::hierarchies::{resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode};
use crate::jobs::{JobContainer, JobHandle};
use crate::util::hash_at_path;
use crate::{
BlobMode, ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR,
};
use anyhow::{anyhow, Result};
use chrono::prelude::*;
use diesel::r2d2::{self, ConnectionManager, ManageConnection};
use diesel::ExpressionMethods;
use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection};
use jwalk::WalkDir;
use lru::LruCache;
use rayon::prelude::*;
use serde_json::json;
use std::borrow::Borrow;
use std::convert::TryInto;
use std::path::PathBuf;
use std::path::{Component, Path};
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use tracing::{error, info, trace, warn};
use upend_base::addressing::Address;
use upend_base::constants::{ATTR_ADDED, ATTR_BY, ATTR_IN, ATTR_LABEL, ATTR_OF, TYPE_HASH_ADDRESS};
use upend_base::entry::Entry;
use upend_base::hash::{b58_encode, UpMultihash};
mod db;
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";
pub struct FsStore {
/// Root directory of the vault on disk.
path: PathBuf,
/// Connection pool for the store's own `upend_vault.sqlite3` metadata database.
pool: r2d2::Pool<ConnectionManager<SqliteConnection>>,
/// Guards multi-statement access to the metadata database.
lock: Arc<RwLock<()>>,
}
impl FsStore {
pub fn from_path<P: AsRef<Path>>(path: P) -> Result<FsStore> {
trace!("Initializing FS store.");
let path = path.as_ref().to_path_buf().canonicalize()?;
let store_dir = path.join(UPEND_SUBDIR);
if !store_dir.exists() {
fs::create_dir(&store_dir)?;
}
let manager = ConnectionManager::<SqliteConnection>::new(
store_dir.join("upend_vault.sqlite3").to_str().unwrap(),
);
// Until diesel supports multiple embedded migrations, set up the schema manually.
let connection = manager.connect()?;
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS files
(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE,
added DATETIME NOT NULL,
size BIGINT NOT NULL,
mtime DATETIME NULL
);
CREATE INDEX IF NOT EXISTS files_hash ON files (hash);
PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);
"#,
)?;
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {
busy_timeout: Some(Duration::from_secs(30)),
enable_wal_mode: true,
mutex: Arc::new(Mutex::new(())),
}))
.error_handler(Box::new(LoggingHandler { name: "fs_store" }))
.build(manager)?;
let lock = Arc::new(RwLock::new(()));
trace!("FS store created.");
Ok(FsStore { path, pool, lock })
}
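// Usage sketch (hedged; mirrors the tests at the bottom of this file): open the
// vault directory as an UpEndDatabase, wrap the same directory in an FsStore, and
// run a full update. `vault_dir` below is a placeholder for any vault directory path.
//
//     let open_result = UpEndDatabase::open(&vault_dir, true)?;
//     let store = FsStore::from_path(&vault_dir)?;
//     let jobs = JobContainer::new();
//     let outcomes = store.update(
//         &open_result.db,
//         jobs,
//         UpdateOptions { initial: true, tree_mode: BlobMode::default() },
//     )?;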
#[tracing::instrument(name = "FS store rescan", skip_all)]
fn rescan_vault<D: Borrow<UpEndDatabase>>(
&self,
db: D,
job_handle: JobHandle,
options: UpdateOptions,
) -> Result<Vec<UpdatePathOutcome>> {
let start = Instant::now();
info!("Vault rescan started.");
let quick_check = options.initial;
let db = db.borrow();
let upconnection = db.connection()?;
// Initialize types, etc...
trace!("Initializing DB types.");
upend_insert_addr!(
upconnection,
Address::Attribute(FILE_SIZE_KEY.parse().unwrap()),
ATTR_OF,
TYPE_HASH_ADDRESS
)?;
upend_insert_addr!(
upconnection,
Address::Attribute(FILE_MIME_KEY.parse().unwrap()),
ATTR_OF,
TYPE_HASH_ADDRESS
)?;
// Walk through the vault, find all paths
trace!("Traversing vault directory");
let absolute_dir_path = fs::canonicalize(&*self.path)?;
let paths: Vec<PathBuf> = WalkDir::new(&*self.path)
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
.filter_map(|e| fs::canonicalize(e.path()).ok())
.filter(|e| e.is_file())
.filter(|e| !e.starts_with(absolute_dir_path.join(UPEND_SUBDIR)))
.collect();
// Prepare for processing
let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?));
// Actual processing
let count = RwLock::new(0_usize);
#[allow(clippy::type_complexity)]
let resolve_cache: Arc<Mutex<LruCache<(Option<Address>, UNode), Address>>> =
Arc::new(Mutex::new(LruCache::new(256)));
let total = paths.len() as f32;
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let path_outcomes: Vec<UpdatePathOutcome> = paths
.into_par_iter()
.map(|path| {
let result = self.process_directory_entry(
db,
path.clone(),
options.tree_mode.clone(),
options.initial,
&existing_files,
&resolve_cache,
quick_check,
);
let mut cnt = count.write().unwrap();
*cnt += 1;
let _ = shared_job_handle
.lock()
.unwrap()
.update_progress(*cnt as f32 / total * 100.0);
match result {
Ok(result) => result,
Err(error) => {
error!("Failed to update {:?} ({})", path, error);
UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string()))
}
}
})
.collect();
trace!("Processing done, cleaning up...");
let existing_files = existing_files.read().unwrap();
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
let trans_result = self.file_set_valid(file.id, false);
match trans_result {
Ok(_) => {
info!("Removed: {:?}", file.path);
UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
}
Err(error) => UpdatePathOutcome::Failed(
PathBuf::from(file.path.clone()),
StoreError::Unknown(error.to_string()),
),
}
});
// Reporting
let all_outcomes = path_outcomes
.into_iter()
.chain(cleanup_results)
.collect::<Vec<UpdatePathOutcome>>();
let mut failed: Vec<(&PathBuf, &StoreError)> = vec![];
let mut created = 0;
let mut unchanged = 0;
let mut skipped = 0;
let mut deleted = 0;
for outcome in &all_outcomes {
match outcome {
UpdatePathOutcome::Added(_) => created += 1,
UpdatePathOutcome::Unchanged(_) => unchanged += 1,
UpdatePathOutcome::Skipped(_) => skipped += 1,
UpdatePathOutcome::Removed(_) => deleted += 1,
UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
}
}
if !failed.is_empty() {
warn!(
"{} path updates failed! ({})",
failed.len(),
failed
.iter()
.map(|(path, error)| format!("{:?}: {}", path, error))
.collect::<Vec<String>>()
.join(", ")
)
}
info!(
"Finished updating {:?} ({} created, {} deleted, {} skipped, {} left unchanged). Took {}s.",
self.path,
created,
deleted,
skipped,
unchanged,
start.elapsed().as_secs()
);
Ok(all_outcomes)
}
#[allow(clippy::too_many_arguments)]
fn process_directory_entry<D: Borrow<UpEndDatabase>>(
&self,
db: D,
path: PathBuf,
mode: BlobMode,
initial: bool,
existing_files: &Arc<RwLock<Vec<db::File>>>,
resolve_cache: &Arc<Mutex<ResolveCache>>,
quick_check: bool,
) -> Result<UpdatePathOutcome> {
trace!("Processing: {:?}", path);
// Prepare the data
let existing_files = Arc::clone(existing_files);
let normalized_path = self.normalize_path(&path)?;
let normalized_path_str = normalized_path
.to_str()
.ok_or(anyhow!("Path not valid unicode!"))?;
let mut file_hash: Option<UpMultihash> = None;
// Get size & mtime for quick comparison
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
if size == 0 {
return Ok(UpdatePathOutcome::Skipped(path));
}
let mtime: Option<NaiveDateTime> = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp_opt(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok()
.flatten();
// Check if the path entry for this file already exists in database
let existing_files_read = existing_files.read().unwrap();
let maybe_existing_file = existing_files_read
.iter()
.find(|file| file.path == normalized_path_str);
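// Change detection: a file counts as unchanged when its mtime matches the stored
// record, or when its content hash does. On an initial scan (`quick_check`), a
// matching size and mtime short-circuits hashing entirely; a full rescan always
// recomputes and compares the hash.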
if let Some(existing_file) = maybe_existing_file {
let existing_file = existing_file.clone();
drop(existing_files_read);
if !quick_check || size == existing_file.size {
let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
let mut same_hash = false;
if !quick_check || !same_mtime {
file_hash = Some(hash_at_path(&path)?);
same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
}
if same_mtime || same_hash {
if mtime != existing_file.mtime {
self.file_update_mtime(existing_file.id, mtime)?;
}
if !existing_file.valid {
self.file_set_valid(existing_file.id, true)?;
}
let mut existing_files_write = existing_files.write().unwrap();
let maybe_existing_file = existing_files_write
.iter()
.enumerate()
.find(|(_, file)| file.path == normalized_path_str)
.map(|(idx, _)| idx);
if let Some(idx) = maybe_existing_file {
existing_files_write.swap_remove(idx);
return if existing_file.valid {
trace!("Unchanged: {:?}", path);
Ok(UpdatePathOutcome::Unchanged(path))
} else {
trace!("Re-added: {:?}", path);
Ok(UpdatePathOutcome::Added(path.clone()))
};
}
}
}
} else {
drop(existing_files_read);
}
// If not, hash it.
if file_hash.is_none() {
file_hash = Some(hash_at_path(&path)?);
}
let file_hash = file_hash.unwrap();
let connection: UpEndConnection = db.borrow().connection()?;
let file_is_known = !connection
.query(
format!(
"(matches @{} \"{}\" ?)",
Address::Hash(file_hash.clone()),
ATTR_IN
)
.parse()?,
)?
.is_empty();
let upath = if !file_is_known || initial {
self.path_to_upath(&path, mode)?
} else {
None
};
self.insert_file_with_metadata(
&db.borrow().connection()?,
&path,
upath,
file_hash,
None,
size,
mtime,
Some(resolve_cache),
)
.map(|_| {
info!("Added: {:?}", path);
UpdatePathOutcome::Added(path.clone())
})
}
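/// Maps a file's on-disk location to its in-vault hierarchy path, depending on the
/// configured `BlobMode`. Illustrative mapping for a vault-relative path
/// `a/b/file.txt` (derived from the branches below and the tests in this file):
///   - `Flat`: `NATIVE/b`
///   - `Mirror`: `NATIVE/a/b`
///   - `Incoming(None)`: `INCOMING`
///   - `Incoming(Some("new files"))`: `new files`
///   - `StoreOnly`: no hierarchy entry (`None`)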
fn path_to_upath(&self, path: &Path, mode: BlobMode) -> Result<Option<UHierPath>> {
match mode {
BlobMode::Flat => {
let normalized_path = self.normalize_path(path).unwrap();
let dirname = normalized_path.parent().and_then(|p| p.components().last());
let upath = UHierPath(if let Some(dirname) = dirname {
vec![
"NATIVE".parse().unwrap(),
UNode::from_str(&dirname.as_os_str().to_string_lossy()).unwrap(),
]
} else {
vec!["NATIVE".parse().unwrap()]
});
Ok(Some(upath))
}
BlobMode::Mirror => {
let normalized_path = self.normalize_path(path).unwrap();
let path = normalized_path.parent().unwrap();
let upath =
iter::once("NATIVE".parse().unwrap())
.chain(path.iter().map(|component| {
UNode::from_str(&component.to_string_lossy()).unwrap()
}))
.collect::<Vec<UNode>>();
Ok(Some(UHierPath(upath)))
}
BlobMode::Incoming(group) => {
let upath = UHierPath(vec![group.unwrap_or("INCOMING".to_string()).parse()?]);
Ok(Some(upath))
}
BlobMode::StoreOnly => Ok(None),
}
}
#[allow(clippy::too_many_arguments)]
fn insert_file_with_metadata(
&self,
connection: &UpEndConnection,
path: &Path,
upath: Option<UHierPath>,
hash: UpMultihash,
name: Option<String>,
size: i64,
mtime: Option<NaiveDateTime>,
resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
) -> Result<Address> {
let normalized_path = self.normalize_path(path)?;
let new_file = db::NewFile {
path: normalized_path
.to_str()
.ok_or(anyhow!("Path not UTF-8?!"))?
.to_string(),
hash: hash.to_bytes(),
added: NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap(),
size,
mtime,
};
let blob_address = Address::Hash(hash);
// Metadata
let size_entry = Entry {
entity: blob_address.clone(),
attribute: FILE_SIZE_KEY.parse().unwrap(),
value: (size as f64).into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
let mime_entry = mime_type.map(|mime_type| Entry {
entity: blob_address.clone(),
attribute: FILE_MIME_KEY.parse().unwrap(),
value: mime_type.into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
});
let added_entry = Entry {
entity: blob_address.clone(),
attribute: ATTR_ADDED.parse().unwrap(),
value: (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as f64)
.into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
let components = normalized_path.components().collect::<Vec<Component>>();
let filename = components.last().unwrap();
let file_count = self.insert_file_record(new_file)?;
connection.insert_entry_immutable(size_entry)?;
if file_count == 1 {
connection.insert_entry_immutable(added_entry)?;
}
if let Some(mime_entry) = mime_entry {
connection.insert_entry(mime_entry)?;
}
let label_entry = Entry {
entity: blob_address.clone(),
attribute: ATTR_LABEL.parse().unwrap(),
value: name
.unwrap_or_else(|| filename.as_os_str().to_string_lossy().to_string())
.into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
let label_entry_addr = connection.insert_entry(label_entry)?;
if let Some(upath) = upath {
let resolved_path = match resolve_cache {
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
None => resolve_path(connection, &upath, true)?,
};
let parent_dir = resolved_path.last().unwrap();
let dir_has_entry = Entry {
entity: blob_address.clone(),
attribute: ATTR_IN.parse().unwrap(),
value: parent_dir.clone().into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
let alias_entry = Entry {
entity: dir_has_entry_addr,
attribute: ATTR_BY.parse().unwrap(),
value: label_entry_addr.into(),
provenance: "SYSTEM INIT".to_string(),
timestamp: chrono::Utc::now().naive_utc(),
};
connection.insert_entry(alias_entry)?;
}
Ok(blob_address)
}
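/// Inserts a file record and returns how many *valid* rows now share this hash;
/// `insert_file_with_metadata` treats a count of 1 as "first copy of this blob"
/// and only then records the `ATTR_ADDED` entry.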
fn insert_file_record(&self, file: db::NewFile) -> Result<u32> {
trace!(
"Inserting {} ({})...",
&file.path,
Address::Hash(UpMultihash::from_bytes(&file.hash)?)
);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
diesel::insert_into(files::table)
.values(&file)
.execute(&conn)?;
Ok(files::dsl::files
.filter(files::dsl::valid.eq(true))
.filter(files::dsl::hash.eq(file.hash))
.count()
.first::<i64>(&conn)?
.try_into()
.unwrap())
}
fn retrieve_file(&self, obj_hash: &UpMultihash) -> Result<Vec<db::OutFile>> {
use self::db::files::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let matches = files
.filter(valid.eq(true))
.filter(hash.eq(&obj_hash.to_bytes()))
.load::<db::File>(&conn)?;
let matches = matches
.into_iter()
.map(|f| db::OutFile {
id: f.id,
hash: f.hash,
path: self.path.join(PathBuf::from(f.path)),
valid: f.valid,
added: f.added,
size: f.size,
mtime: f.mtime,
})
.collect();
Ok(matches)
}
fn retrieve_all_files(&self) -> Result<Vec<db::File>> {
use self::db::files::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let matches = files.load::<db::File>(&conn)?;
Ok(matches)
}
fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
use self::db::files::dsl::*;
trace!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(mtime.eq(m_time))
.execute(&conn)?)
}
fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
use self::db::files::dsl::*;
trace!("Setting file ID {} to valid = {}", file_id, is_valid);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid))
.execute(&conn)?)
}
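/// Strips the vault root from a canonicalized path, yielding the vault-relative
/// path that is stored in the `files` table.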
fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
Ok(path
.canonicalize()?
.strip_prefix(self.path.as_path())?
.to_path_buf())
}
}
impl From<db::OutFile> for Blob {
fn from(of: db::OutFile) -> Self {
Blob { file_path: of.path }
}
}
impl From<db::File> for Blob {
fn from(f: db::File) -> Self {
Blob {
file_path: PathBuf::from(f.path),
}
}
}
impl UpStore for FsStore {
fn retrieve(&self, hash: &UpMultihash) -> Result<Vec<Blob>, super::StoreError> {
Ok(self
.retrieve_file(hash)
.map_err(|e| StoreError::Unknown(e.to_string()))?
.into_iter()
.map(Blob::from)
.collect())
}
fn retrieve_all(&self) -> Result<Vec<Blob>, super::StoreError> {
Ok(self
.retrieve_all_files()
.map_err(|e| StoreError::Unknown(e.to_string()))?
.into_iter()
.map(Blob::from)
.collect())
}
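// `store` hashes the incoming blob first; only when no valid copy of that hash is
// already known does it copy the file into the vault root (named after the
// base58-encoded hash address, optionally suffixed with `name_hint`) and register
// its metadata. The content hash is returned either way.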
fn store(
&self,
connection: &UpEndConnection,
blob: Blob,
name_hint: Option<String>,
blob_mode: Option<BlobMode>,
) -> Result<UpMultihash, super::StoreError> {
let file_path = blob.get_file_path();
let hash = hash_at_path(file_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
let existing_files = self.retrieve(&hash)?;
if existing_files.is_empty() {
let address = Address::Hash(hash.clone());
let addr_str = b58_encode(
address
.encode()
.map_err(|e| StoreError::Unknown(e.to_string()))?,
);
let final_name = if let Some(name_hint) = &name_hint {
format!("{addr_str}_{name_hint}")
} else {
addr_str
};
let final_path = self.path.join(final_name);
fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
let upath = if let Some(bm) = blob_mode {
self.path_to_upath(&final_path, bm)
.map_err(|e| StoreError::Unknown(e.to_string()))?
} else {
None
};
let metadata =
fs::metadata(&final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
let size = metadata.len() as i64;
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp_opt(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok()
.flatten();
self.insert_file_with_metadata(
connection,
&final_path,
upath,
hash.clone(),
name_hint,
size,
mtime,
None,
)
.map_err(|e| StoreError::Unknown(e.to_string()))?;
}
Ok(hash)
}
fn update(
&self,
db: &UpEndDatabase,
mut job_container: JobContainer,
options: UpdateOptions,
) -> Result<Vec<UpdatePathOutcome>, StoreError> {
trace!(
"Running a vault update of {:?}, options = {:?}.",
self.path,
options
);
let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
match job_result {
Ok(job_handle) => {
let result = self.rescan_vault(db, job_handle, options);
if let Err(err) = &result {
error!("Update did not succeed! {:?}", err);
}
result.map_err(|err| StoreError::Unknown(err.to_string()))
}
Err(err) => Err(StoreError::Unknown(err.to_string())),
}
}
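// `stats` groups known files by content hash and reports, per blob, its size and
// every recorded path (valid entries first), with blobs sorted largest-first; the
// rough JSON shape is:
//   { "totals": { "count", "size" }, "blobs": [ { "hash", "size", "paths": [...] } ] }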
fn stats(&self) -> std::result::Result<serde_json::Value, StoreError> {
let files = self
.retrieve_all_files()
.map_err(|e| StoreError::Unknown(e.to_string()))?;
let mut files_by_hash = std::collections::HashMap::new();
for file in &files {
if !files_by_hash.contains_key(&file.hash) {
files_by_hash.insert(&file.hash, vec![]);
}
files_by_hash.get_mut(&file.hash).unwrap().push(file);
}
for paths in files_by_hash.values_mut() {
paths.sort_unstable_by_key(|f| !f.valid);
}
let mut blobs = files_by_hash
.iter()
.map(|(hash, files)| {
json!({
"hash": hash,
"size": files[0].size,
"paths": files.iter().map(|f| json!({
"added": f.added,
"valid": f.valid,
"path": f.path
})).collect::<serde_json::Value>()
})
})
.collect::<Vec<serde_json::Value>>();
blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
blobs.reverse();
Ok(json!({
"totals": {
"count": files_by_hash.len(),
"size": files_by_hash.values().map(|f| f[0].size as u64).sum::<u64>()
},
"blobs": blobs
}))
}
}
#[cfg(test)]
mod test {
use crate::jobs::JobContainer;
use crate::UpEndDatabase;
use super::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
use std::sync::Once;
use tracing_subscriber::filter::EnvFilter;
static INIT: Once = Once::new();
pub fn initialize() {
INIT.call_once(|| {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::builder().from_env_lossy())
.init();
})
}
#[test]
fn test_update() {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let temp_dir_path = temp_dir.path().canonicalize().unwrap();
let file_path = temp_dir_path.join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir_path.join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let file_path = temp_dir_path.join("empty");
File::create(file_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let job_container = JobContainer::new();
// Store scan
let rescan_result = store.update(
&open_result.db,
job_container,
UpdateOptions {
initial: true,
tree_mode: BlobMode::default(),
},
);
assert!(rescan_result.is_ok());
}
#[test]
fn test_rescan_quick() {
initialize();
_test_rescan(true)
}
#[test]
fn test_rescan_full() {
initialize();
_test_rescan(false)
}
fn _test_rescan(quick: bool) {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let temp_dir_path = temp_dir.path().canonicalize().unwrap();
let file_path = temp_dir_path.join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir_path.join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let empty_path = temp_dir_path.join("empty");
File::create(&empty_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let mut job_container = JobContainer::new();
// Initial scan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(
&open_result.db,
job,
UpdateOptions {
initial: quick,
tree_mode: BlobMode::default(),
},
);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result.into_iter().for_each(|outcome| match outcome {
UpdatePathOutcome::Added(_) => (),
UpdatePathOutcome::Skipped(path) => assert_eq!(path, empty_path),
outcome => panic!("Unexpected outcome: {:?}", outcome),
});
// Modification-less rescan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(
&open_result.db,
job,
UpdateOptions {
initial: quick,
tree_mode: BlobMode::default(),
},
);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result.into_iter().for_each(|outcome| {
assert!(matches!(
outcome,
UpdatePathOutcome::Unchanged(_) | UpdatePathOutcome::Skipped(_)
))
});
// Remove a file
std::fs::remove_file(temp_dir_path.join("hello-world.txt")).unwrap();
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(
&open_result.db,
job,
UpdateOptions {
initial: quick,
tree_mode: BlobMode::default(),
},
);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Skipped(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
.count()
);
assert!(store
.retrieve_all_files()
.unwrap()
.iter()
.filter(|f| f.path == "hello-world.txt")
.all(|f| !f.valid));
// Re-add the file
let file_path = temp_dir_path.join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(
&open_result.db,
job,
UpdateOptions {
initial: quick,
tree_mode: BlobMode::default(),
},
);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Skipped(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Added(_)))
.count()
);
assert!(store
.retrieve_all_files()
.unwrap()
.iter()
.filter(|f| f.path == "hello-world.txt")
.all(|f| f.valid));
}
/// Prepare a temporary filesystem structure for testing.
/// Returns the database connection and the `TempDir` handle (which keeps the
/// directory alive for the duration of the test).
/// The structure is as follows:
/// ```text
/// NATIVE
/// ├── nested_directory
/// │   ├── nested_two
/// │   │   ├── nested_three
/// │   │   │   └── foo.txt
/// │   │   └── nested_four
/// │   │       └── baz.txt
/// │   └── nested_three
/// │       └── bar.txt
/// └── in_root.txt
/// ```
fn _prepare_hier_vault(tree_mode: BlobMode) -> (UpEndConnection, TempDir) {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let temp_dir_path = temp_dir.path().canonicalize().unwrap();
let nested_directory_path = temp_dir_path.join("nested_directory");
fs::create_dir(&nested_directory_path).unwrap();
let nested_two_path = nested_directory_path.join("nested_two");
fs::create_dir(&nested_two_path).unwrap();
let nested_three_first_path = nested_directory_path.join("nested_three");
fs::create_dir(&nested_three_first_path).unwrap();
let nested_three_second_path = nested_two_path.join("nested_three");
fs::create_dir(&nested_three_second_path).unwrap();
let nested_four_path = nested_two_path.join("nested_four");
fs::create_dir(&nested_four_path).unwrap();
let file_path = nested_three_second_path.join("foo.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World! I'm foo, and deep.").unwrap();
let file_path = nested_three_first_path.join("bar.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World! I'm bar, and shallower.").unwrap();
let file_path = nested_four_path.join("baz.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World! I'm baz.").unwrap();
let file_path = temp_dir_path.join("in_root.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World! I'm in root.").unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let mut job_container = JobContainer::new();
// Initial scan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
store
.rescan_vault(
&open_result.db,
job,
UpdateOptions {
initial: true,
tree_mode,
},
)
.unwrap();
(open_result.db.connection().unwrap(), temp_dir)
}
fn assert_paths(paths: Vec<&str>, connection: &UpEndConnection) {
paths.iter().for_each(|path| {
let upath: UHierPath = path.parse().unwrap();
assert!(
resolve_path(&connection, &upath, false).is_ok(),
"Failed: {}",
upath
);
});
}
fn test_initial_scan(mode: BlobMode, expected_paths: Vec<&str>) {
let (connection, _vault_dir) = _prepare_hier_vault(mode);
assert_paths(expected_paths, &connection);
}
#[test]
fn test_mirror_mode() {
test_initial_scan(
BlobMode::Mirror,
vec![
"NATIVE",
"NATIVE/nested_directory/nested_two/nested_three/foo.txt",
"NATIVE/nested_directory/nested_two/nested_four/baz.txt",
"NATIVE/nested_directory/nested_three/bar.txt",
"NATIVE/in_root.txt",
],
);
}
#[test]
fn test_flat_mode() {
test_initial_scan(
BlobMode::Flat,
vec![
"NATIVE",
"NATIVE/nested_three/foo.txt",
"NATIVE/nested_four/baz.txt",
"NATIVE/nested_three/bar.txt",
"NATIVE/in_root.txt",
],
);
}
#[test]
fn test_incoming_mode() {
test_initial_scan(
BlobMode::Incoming(None),
vec![
"INCOMING/foo.txt",
"INCOMING/baz.txt",
"INCOMING/bar.txt",
"INCOMING/in_root.txt",
],
);
test_initial_scan(
BlobMode::Incoming(Some("new files".to_string())),
vec![
"new files/foo.txt",
"new files/baz.txt",
"new files/bar.txt",
"new files/in_root.txt",
],
);
}
}