upend/src/database/stores/fs/mod.rs

use self::db::files;
use super::{Blob, StoreError, UpStore, UpdatePathOutcome};
use crate::addressing::Address;
use crate::database::constants::{
ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{b58_encode, Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::r2d2::{self, ConnectionManager, ManageConnection};
use diesel::ExpressionMethods;
use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection};
use log::{debug, error, info, trace, warn};
use lru::LruCache;
use rayon::prelude::*;
use serde_json::json;
use std::borrow::Borrow;
use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::path::{Component, Path};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use walkdir::WalkDir;
mod db;
const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";
lazy_static! {
static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
attribute: String::from(TYPE_BASE_ATTR),
value: BLOB_TYPE.into(),
};
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}
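/// RAII guard that restores `PRAGMA synchronous = NORMAL` on the wrapped
/// connection when dropped, so that bulk imports can temporarily turn
/// SQLite syncing off without leaving it off on error paths.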
struct PragmaSynchronousGuard<'a>(&'a SqliteConnection);
impl Drop for PragmaSynchronousGuard<'_> {
fn drop(&mut self) {
debug!("Re-enabling synchronous mode.");
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
if let Err(err) = res {
error!(
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
err
);
}
}
}
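/// r2d2 connection customizer that applies `PRAGMA synchronous = NORMAL`
/// to every connection acquired from the pool.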
#[derive(Debug)]
struct ConnectionOptions;
impl ConnectionOptions {
pub fn apply(&self, conn: &SqliteConnection) -> diesel::QueryResult<()> {
debug!(r#"Setting "synchronous" to NORMAL"#);
conn.execute("PRAGMA synchronous = NORMAL;")?;
Ok(())
}
}
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
for ConnectionOptions
{
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
}
}
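/// A filesystem-backed implementation of [`UpStore`]. Blobs are stored as
/// plain files under `path`, while bookkeeping (paths, hashes, sizes, mtimes)
/// lives in an SQLite database inside the vault's `UPEND_SUBDIR` directory,
/// accessed through an r2d2 pool and guarded by an internal `RwLock`.
///
/// Typical usage mirrors the tests below (a sketch only, with `unwrap` in
/// place of real error handling; `vault_dir` stands in for the vault
/// directory):
///
/// ```ignore
/// let open_result = UpEndDatabase::open(&vault_dir, true).unwrap();
/// let store = FsStore::from_path(&vault_dir).unwrap();
/// let outcomes = store.update(&open_result.db, JobContainer::new(), false).unwrap();
/// ```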
pub struct FsStore {
path: PathBuf,
pool: r2d2::Pool<ConnectionManager<SqliteConnection>>,
lock: Arc<RwLock<()>>,
}
impl FsStore {
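/// Opens (or initializes) a store rooted at `path`: canonicalizes the path,
/// ensures the `UPEND_SUBDIR` directory and the `files` table exist, switches
/// the database to WAL mode, and builds the connection pool.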
pub fn from_path<P: AsRef<Path>>(path: P) -> Result<FsStore> {
debug!("Initializing FS store.");
let path = path.as_ref().to_path_buf().canonicalize()?;
let store_dir = path.join(UPEND_SUBDIR);
if !store_dir.exists() {
fs::create_dir(&store_dir)?;
}
let manager = ConnectionManager::<SqliteConnection>::new(
store_dir.join("upend_vault.sqlite3").to_str().unwrap(),
);
// Set up the schema by hand, while diesel doesn't support multiple embedded migrations...
let connection = manager.connect()?;
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS files
(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE,
added DATETIME NOT NULL,
size BIGINT NOT NULL,
mtime DATETIME NULL
);
CREATE INDEX IF NOT EXISTS files_hash ON files (hash);
PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);
"#,
)?;
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {}))
.build(manager)?;
let lock = Arc::new(RwLock::new(()));
debug!("FS store created.");
Ok(FsStore { path, pool, lock })
}
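/// Walks the vault directory (in parallel, via rayon) and reconciles what is
/// on disk with the `files` table: new files are hashed and inserted,
/// unchanged ones are left alone, empty files are skipped, and files that are
/// no longer present are marked invalid and their objects removed. Returns one
/// [`UpdatePathOutcome`] per path. With `quick_check`, a matching size and
/// mtime is enough to consider a file unchanged; with `disable_synchronous`,
/// SQLite syncing is turned off for the duration of the scan.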
fn rescan_vault<D: Borrow<UpEndDatabase>>(
&self,
db: D,
job_handle: JobHandle,
quick_check: bool,
disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
let start = Instant::now();
info!("Vault rescan started.");
let db = db.borrow();
let upconnection = db.connection()?;
let connection = self.pool.get()?;
// Initialize types, etc...
debug!("Initializing DB types.");
upconnection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
upend_insert_addr!(upconnection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
upend_insert_val!(upconnection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
// Disable syncing in SQLite for the duration of the import
let mut _guard: Option<PragmaSynchronousGuard> = None;
if disable_synchronous {
debug!("Disabling SQLite synchronous mode");
connection.execute("PRAGMA synchronous = OFF;")?;
_guard = Some(PragmaSynchronousGuard(&connection));
}
// Walk through the vault, find all paths
debug!("Traversing vault directory");
let absolute_dir_path = fs::canonicalize(&*self.path)?;
let path_entries: Vec<PathBuf> = WalkDir::new(&*self.path)
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
.filter_map(|e| fs::canonicalize(e.into_path()).ok())
.filter(|e| e.is_file())
.filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
.collect();
// Prepare for processing
let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?));
// Actual processing
let count = RwLock::new(0_usize);
let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
let total = path_entries.len() as f32;
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let path_outcomes: Vec<UpdatePathOutcome> = path_entries
.into_par_iter()
.map(|path| {
let result = self.process_directory_entry(
db,
&resolve_cache,
path.clone(),
&existing_files,
quick_check,
);
let mut cnt = count.write().unwrap();
*cnt += 1;
let _ = shared_job_handle
.lock()
.unwrap()
.update_progress(*cnt as f32 / total * 100.0);
match result {
Ok(result) => result,
Err(error) => {
error!("Failed to update {:?} ({})", path, error);
UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string()))
}
}
})
.collect();
debug!("Processing done, cleaning up...");
let existing_files = existing_files.read().unwrap();
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
let trans_result = upconnection.transaction::<_, Error, _>(|| {
self.file_set_valid(file.id, false)?;
upconnection.remove_object(Address::from(file.clone()))?;
Ok(())
});
match trans_result {
Ok(_) => {
info!("Removed: {:?}", file.path);
UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
}
Err(error) => UpdatePathOutcome::Failed(
PathBuf::from(file.path.clone()),
StoreError::Unknown(error.to_string()),
),
}
});
// Re-enable SQLite syncing
drop(_guard);
// Reporting
let all_outcomes = path_outcomes
.into_iter()
.chain(cleanup_results)
.collect::<Vec<UpdatePathOutcome>>();
let mut failed: Vec<(&PathBuf, &StoreError)> = vec![];
let mut created = 0;
let mut unchanged = 0;
let mut skipped = 0;
let mut deleted = 0;
for outcome in &all_outcomes {
match outcome {
UpdatePathOutcome::Added(_) => created += 1,
UpdatePathOutcome::Unchanged(_) => unchanged += 1,
UpdatePathOutcome::Skipped(_) => skipped += 1,
UpdatePathOutcome::Removed(_) => deleted += 1,
UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
}
}
if !failed.is_empty() {
warn!(
"{} path updates failed! ({})",
failed.len(),
failed
.iter()
.map(|(path, error)| format!("{:?}: {}", path, error))
.collect::<Vec<String>>()
.join(", ")
)
}
info!(
"Finished updating {:?} ({} created, {} deleted, {} skipped, {} left unchanged). Took {}s.",
self.path,
created,
deleted,
skipped,
unchanged,
start.elapsed().as_secs()
);
Ok(all_outcomes)
}
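/// Decides the fate of a single path: `Skipped` for empty files, `Unchanged`
/// when size/mtime (or the content hash) match an existing record, and `Added`
/// otherwise, in which case the blob and its metadata entries are inserted.
/// Matched records are removed from `existing_files`, so whatever remains
/// after the scan can be treated as deleted during cleanup.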
fn process_directory_entry<D: Borrow<UpEndDatabase>>(
&self,
db: D,
resolve_cache: &Arc<Mutex<ResolveCache>>,
path: PathBuf,
existing_files: &Arc<RwLock<Vec<db::File>>>,
quick_check: bool,
) -> Result<UpdatePathOutcome> {
debug!("Processing: {:?}", path);
// Prepare the data
let existing_files = Arc::clone(existing_files);
let normalized_path = self.normalize_path(&path)?;
let normalized_path_str = normalized_path
.to_str()
.ok_or(anyhow!("Path not valid unicode!"))?;
let mut file_hash: Option<Hash> = None;
// Get size & mtime for quick comparison
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
// metadata.len() is a u64; a negative value after the cast means the size overflowed i64.
if size < 0 {
panic!("File {} too large?!", path.display());
}
if size == 0 {
return Ok(UpdatePathOutcome::Skipped(path));
}
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok();
// Check if the path entry for this file already exists in database
let existing_files_read = existing_files.read().unwrap();
let maybe_existing_file = existing_files_read
.iter()
.find(|file| file.path == normalized_path_str);
if let Some(existing_file) = maybe_existing_file {
let existing_file = existing_file.clone();
drop(existing_files_read);
if !quick_check || size == existing_file.size {
let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
let mut same_hash = false;
if !quick_check || !same_mtime {
file_hash = Some(path.hash()?);
same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
}
if same_mtime || same_hash {
if mtime != existing_file.mtime {
self.file_update_mtime(existing_file.id, mtime)?;
}
if !existing_file.valid {
self.file_set_valid(existing_file.id, true)?;
}
let mut existing_files_write = existing_files.write().unwrap();
let maybe_existing_file = existing_files_write
.iter()
.enumerate()
.find(|(_, file)| file.path == normalized_path_str)
.map(|(idx, _)| idx);
if let Some(idx) = maybe_existing_file {
existing_files_write.swap_remove(idx);
debug!("Unchanged: {:?}", path);
return Ok(UpdatePathOutcome::Unchanged(path));
}
}
}
} else {
drop(existing_files_read);
}
// If not, add it!
if file_hash.is_none() {
file_hash = Some(path.hash()?);
}
let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());
self.insert_file_with_metadata(
&db.borrow().connection()?,
&normalized_path,
file_hash.unwrap(),
size,
mtime,
mime_type,
Some(resolve_cache),
)
.map(|_| {
info!("Added: {:?}", path);
UpdatePathOutcome::Added(path.clone())
})
}
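/// Registers an already-present file (e.g. one just copied into the vault by
/// `store`) under its vault-relative path, reading size, mtime and MIME type
/// from disk.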
fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
let normalized_path = self.normalize_path(path)?;
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok();
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
self.insert_file_with_metadata(
connection,
&normalized_path,
hash,
size,
mtime,
mime_type,
None,
)
}
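/// Inserts the `files` row and emits the accompanying graph entries for the
/// blob: its type, size, MIME type, an "added" timestamp (only for the first
/// copy of a given hash), its placement under the `NATIVE` hierarchy, a label
/// with the file name, and an alias tying the placement to the label.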
fn insert_file_with_metadata(
&self,
connection: &UpEndConnection,
normalized_path: &Path,
hash: Hash,
size: i64,
mtime: Option<NaiveDateTime>,
mime_type: Option<String>,
resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
) -> Result<Address> {
let new_file = db::NewFile {
path: normalized_path
.to_str()
.ok_or(anyhow!("Path not UTF-8?!"))?
.to_string(),
hash: (hash.clone()).0,
added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
mtime,
};
let blob_address = Address::Hash(hash);
// Metadata
let type_entry = Entry {
entity: blob_address.clone(),
attribute: String::from(IS_OF_TYPE_ATTR),
value: BLOB_TYPE_ADDR.clone().into(),
};
let size_entry = Entry {
entity: blob_address.clone(),
attribute: FILE_SIZE_KEY.to_string(),
value: (size as f64).into(),
};
let mime_entry = mime_type.map(|mime_type| Entry {
entity: blob_address.clone(),
attribute: FILE_MIME_KEY.to_string(),
value: mime_type.into(),
});
let added_entry = Entry {
entity: blob_address.clone(),
attribute: ADDED_ATTR.to_string(),
value: (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as f64)
.into(),
};
// Add the appropriate entries w/r/t virtual filesystem location
let components = normalized_path.components().collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let upath = UHierPath(
iter::once(UNode::new("NATIVE").unwrap())
.chain(dir_path.iter().map(|component| {
UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
}))
.collect(),
);
let resolved_path = match resolve_cache {
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
None => resolve_path(connection, &upath, true)?,
};
let parent_dir = resolved_path.last().unwrap();
// Insert all
let file_count = self.insert_file(new_file)?;
connection.insert_entry_immutable(type_entry)?;
connection.insert_entry_immutable(size_entry)?;
if file_count == 1 {
connection.insert_entry_immutable(added_entry)?;
}
if let Some(mime_entry) = mime_entry {
connection.insert_entry(mime_entry)?;
}
let dir_has_entry = Entry {
entity: parent_dir.clone(),
attribute: HIER_HAS_ATTR.to_string(),
value: blob_address.clone().into(),
};
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
let label_entry = Entry {
entity: blob_address.clone(),
attribute: LABEL_ATTR.to_string(),
value: filename.as_os_str().to_string_lossy().to_string().into(),
};
let label_entry_addr = connection.insert_entry(label_entry)?;
let alias_entry = Entry {
entity: dir_has_entry_addr,
attribute: ALIAS_KEY.to_string(),
value: label_entry_addr.into(),
};
connection.insert_entry(alias_entry)?;
Ok(blob_address)
}
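/// Inserts a row into the `files` table and returns the number of valid rows
/// that now share the same hash.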
pub fn insert_file(&self, file: db::NewFile) -> Result<u32> {
debug!(
"Inserting {} ({})...",
&file.path,
Address::Hash(Hash(file.hash.clone()))
);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
diesel::insert_into(files::table)
.values(&file)
.execute(&conn)?;
Ok(files::dsl::files
.filter(files::dsl::valid.eq(true))
.filter(files::dsl::hash.eq(file.hash))
.count()
.first::<i64>(&conn)?
.try_into()
.unwrap())
}
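/// Returns all valid `files` rows for the given hash, with their paths
/// resolved relative to the vault root.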
fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<db::OutFile>> {
use self::db::files::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let matches = files
.filter(valid.eq(true))
.filter(hash.eq(&obj_hash.0))
.load::<db::File>(&conn)?;
let matches = matches
.into_iter()
.map(|f| db::OutFile {
id: f.id,
hash: f.hash,
path: self.path.join(PathBuf::from(f.path)),
valid: f.valid,
added: f.added,
size: f.size,
mtime: f.mtime,
})
.collect();
Ok(matches)
}
fn retrieve_all_files(&self) -> Result<Vec<db::File>> {
use self::db::files::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let matches = files.load::<db::File>(&conn)?;
Ok(matches)
}
fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
use self::db::files::dsl::*;
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(mtime.eq(m_time))
.execute(&conn)?)
}
fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
use self::db::files::dsl::*;
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid))
.execute(&conn)?)
}
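/// Canonicalizes `path` and strips the vault root, yielding the
/// vault-relative path that is stored in the database.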
fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
Ok(path
.canonicalize()?
.strip_prefix(self.path.as_path())?
.to_path_buf())
}
}
impl From<db::OutFile> for Blob {
fn from(of: db::OutFile) -> Self {
Blob { file_path: of.path }
}
}
impl From<db::File> for Blob {
fn from(f: db::File) -> Self {
Blob {
file_path: PathBuf::from(f.path),
}
}
}
impl UpStore for FsStore {
fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result<Vec<Blob>, super::StoreError> {
Ok(self
.retrieve_file(hash)
.map_err(|e| StoreError::Unknown(e.to_string()))?
.into_iter()
.map(Blob::from)
.collect())
}
fn retrieve_all(&self) -> Result<Vec<Blob>, super::StoreError> {
Ok(self
.retrieve_all_files()
.map_err(|e| StoreError::Unknown(e.to_string()))?
.into_iter()
.map(Blob::from)
.collect())
}
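/// Hashes the blob's contents and, unless a copy with the same hash already
/// exists, copies it into the vault root under a base58-encoded address
/// (optionally suffixed with `name_hint`) and registers it via `add_file`.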
fn store(
&self,
connection: UpEndConnection,
blob: Blob,
name_hint: Option<String>,
) -> Result<Hash, super::StoreError> {
let file_path = blob.get_file_path();
let hash = file_path
.hash()
.map_err(|e| StoreError::Unknown(e.to_string()))?;
let existing_files = self.retrieve(&hash)?;
if existing_files.is_empty() {
let address = Address::Hash(hash.clone());
let addr_str = b58_encode(
address
.encode()
.map_err(|e| StoreError::Unknown(e.to_string()))?,
);
let final_name = if let Some(name_hint) = name_hint {
format!("{addr_str}_{name_hint}")
} else {
addr_str
};
let final_path = self.path.join(&final_name);
fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
self.add_file(&connection, &final_path, hash.clone())
.map_err(|e| StoreError::Unknown(e.to_string()))?;
}
Ok(hash)
}
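/// Entry point for vault (re)scans: registers a "REIMPORT" job and delegates
/// to `rescan_vault`. On the initial scan, quick checks are disabled (every
/// file is hashed) and SQLite synchronous mode is turned off for speed.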
fn update(
&self,
db: &UpEndDatabase,
mut job_container: JobContainer,
initial: bool,
) -> Result<Vec<UpdatePathOutcome>, StoreError> {
trace!(
"Running a vault update of {:?}, initial = {}.",
self.path,
initial
);
let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
match job_result {
Ok(job_handle) => {
let result = self.rescan_vault(db, job_handle, !initial, initial);
if let Err(err) = &result {
error!("Update did not succeed! {:?}", err);
}
result.map_err(|err| StoreError::Unknown(err.to_string()))
}
Err(err) => Err(StoreError::Unknown(err.to_string())),
}
}
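/// Reports storage statistics as JSON: for each blob, its size and the paths
/// referring to it (valid paths first), sorted by size descending, plus
/// overall totals.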
fn stats(&self) -> std::result::Result<serde_json::Value, StoreError> {
let files = self
.retrieve_all_files()
.map_err(|e| StoreError::Unknown(e.to_string()))?;
let mut files_by_hash = std::collections::HashMap::new();
for file in &files {
files_by_hash
.entry(&file.hash)
.or_insert_with(Vec::new)
.push(file);
}
for paths in files_by_hash.values_mut() {
paths.sort_unstable_by_key(|f| !f.valid);
}
let mut blobs = files_by_hash
.iter()
.map(|(hash, files)| {
json!({
"hash": hash,
"size": files[0].size,
"paths": files.iter().map(|f| json!({
"added": f.added,
"valid": f.valid,
"path": f.path
})).collect::<serde_json::Value>()
})
})
.collect::<Vec<serde_json::Value>>();
blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
blobs.reverse();
Ok(json!({
"totals": {
"count": files_by_hash.len(),
"size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
},
"blobs": blobs
}))
}
}
#[cfg(test)]
mod test {
use crate::database::UpEndDatabase;
use crate::util::jobs::JobContainer;
use super::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
use std::sync::Once;
use tracing_subscriber::filter::EnvFilter;
static INIT: Once = Once::new();
pub fn initialize() {
INIT.call_once(|| {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::builder().from_env_lossy())
.init();
})
}
#[test]
fn test_update() {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir.path().join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let file_path = temp_dir.path().join("empty");
File::create(file_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let job_container = JobContainer::new();
// Store scan
let rescan_result = store.update(&open_result.db, job_container.clone(), false);
assert!(rescan_result.is_ok());
}
#[test]
fn test_rescan_quick() {
initialize();
_test_rescan(true)
}
#[test]
fn test_rescan_full() {
initialize();
_test_rescan(false)
}
fn _test_rescan(quick: bool) {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir.path().join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let empty_path = temp_dir.path().join("empty");
File::create(&empty_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let mut job_container = JobContainer::new();
// Initial scan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(&open_result.db, job, quick, true);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result.into_iter().for_each(|outcome| match outcome {
UpdatePathOutcome::Added(_) => (),
UpdatePathOutcome::Skipped(path) => assert_eq!(path, empty_path),
outcome => panic!("Unexpected outcome: {:?}", outcome),
});
// Modification-less rescan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result.into_iter().for_each(|outcome| {
assert!(matches!(
outcome,
UpdatePathOutcome::Unchanged(_) | UpdatePathOutcome::Skipped(_)
))
});
// Remove a file
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Skipped(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
.count()
);
}
}