use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, UNIX_EPOCH};
use std::{fs, iter};

use crate::addressing::Address;
use crate::database::constants::{
    HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{resolve_path_cached, ResolveCache, UNode, UPath};
use crate::database::inner::models;
use crate::database::{
    file_set_valid, file_update_mtime, insert_entry, insert_file, retrieve_all_files, DbPool,
    UPEND_SUBDIR,
};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{Job, JobContainer, JobId, State};
use anyhow::{Error, Result};
use chrono::prelude::*;
use diesel::Connection;
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use serde_json::Value;
use walkdir::WalkDir;

const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: EntryValue::Value(Value::from(BLOB_TYPE)),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}

fn initialize_types(pool: &DbPool) -> Result<()> {
    // BLOB_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);

    Ok(())
}

pub async fn rescan_vault(
    pool: DbPool,
    directory: PathBuf,
    job_container: Arc<RwLock<JobContainer>>,
) {
    let job_id = job_container
        .write()
        .unwrap()
        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
        .unwrap();

    let job_container_rescan = job_container.clone();
    let result =
        actix_web::web::block(move || _rescan_vault(pool, directory, job_container_rescan, job_id))
            .await;

    if result.is_err() {
        let err = result.err().unwrap();
        error!("Update did not succeed! {:?}", err);

        job_container
            .write()
            .unwrap()
            .update_state(&job_id, State::Failed)
            .unwrap();
    }
}

type UpdatePathResult = Result<UpdatePathOutcome>;

#[derive(Debug)]
enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
    Failed(PathBuf, Error),
}

fn _rescan_vault<T: AsRef<Path>>(
    pool: DbPool,
    directory: T,
    job_container: Arc<RwLock<JobContainer>>,
    job_id: JobId,
) -> Result<Vec<UpdatePathOutcome>> {
    let start = Instant::now();
    info!("Vault rescan started.");

    // Initialize types, etc...
debug!("Initializing DB types."); initialize_types(&pool)?; // Disable syncing in SQLite for the duration of the import debug!("Disabling SQLite synchronous mode"); pool.get()?.execute("PRAGMA synchronous = OFF;")?; // Walk through the vault, find all paths debug!("Traversing vault directory"); let absolute_dir_path = fs::canonicalize(&directory)?; let path_entries: Vec = WalkDir::new(&directory) .follow_links(true) .into_iter() .filter_map(|e| e.ok()) .map(|e| fs::canonicalize(e.into_path()).unwrap()) .filter(|e| e.is_file()) .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR))) .collect(); // Prepare for processing let rw_pool = Arc::new(RwLock::new(pool.clone())); let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?)); // Actual processing let count = RwLock::new(0_usize); let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); let total = path_entries.len() as f32; let path_outcomes: Vec = path_entries .into_par_iter() .map(|path| { let result = _process_directory_entry( &rw_pool, &resolve_cache, path.clone(), &absolute_dir_path, &existing_files, ); let mut cnt = count.write().unwrap(); *cnt += 1; job_container .write() .unwrap() .update_progress(&job_id, *cnt as f32 / total * 100.0) .unwrap(); match result { Ok(result) => result, Err(error) => UpdatePathOutcome::Failed(path, error), } }) .collect(); let existing_files = existing_files.read().unwrap(); let connection = pool.get()?; let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { let trans_result = connection.transaction::<_, Error, _>(|| { file_set_valid(&connection, file.id, false)?; // remove_object(&connection, )? Ok(()) }); match trans_result { Ok(_) => { info!("Removed: {:?}", file.path); UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) } Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error), } }); let mut failed: Vec<(&PathBuf, &Error)> = vec![]; let mut created = 0; let mut unchanged = 0; let mut deleted = 0; for outcome in &path_outcomes { match outcome { UpdatePathOutcome::Added(_) => created += 1, UpdatePathOutcome::Unchanged(_) => unchanged += 1, UpdatePathOutcome::Removed(_) => deleted += 1, UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), } } if !failed.is_empty() { warn!( "{} path updates failed! ({})", failed.len(), failed .iter() .map(|(path, error)| format!("{:?}: {}", path, error)) .collect::>() .join(", ") ) } // Re-enable SQLite syncing debug!("Re-enabling synchronous mode."); pool.get()?.execute("PRAGMA synchronous = NORMAL;")?; info!( "Finished updating {} ({} created, {} deleted, {} left unchanged). 
Took {}s.", directory.as_ref().display(), created, deleted, unchanged, start.elapsed().as_secs() ); Ok(path_outcomes.into_iter().chain(cleanup_results).collect()) } fn _process_directory_entry>( db_pool: &Arc>, resolve_cache: &Arc>, path: PathBuf, directory_path: &P, existing_files: &Arc>>, ) -> UpdatePathResult { debug!("Processing: {:?}", path); // Prepare the data let connection = &db_pool.write().unwrap().get()?; let existing_files = Arc::clone(existing_files); let normalized_path = path.strip_prefix(&directory_path)?; let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!"); let mut file_hash: Option = None; // Get size & mtime for quick comparison let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } let mtime = metadata .modified() .map(|t| { NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) }) .ok(); // Check if the path entry for this file already exists in database let existing_files_read = existing_files.read().unwrap(); let maybe_existing_file = existing_files_read .iter() .find(|file| file.path == normalized_path_str); if let Some(existing_file) = maybe_existing_file { let existing_file = existing_file.clone(); drop(existing_files_read); if size == existing_file.size { let same_mtime = mtime.is_some() && mtime == existing_file.mtime; let mut same_hash = false; if !same_mtime { file_hash = Some(path.hash()?); same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; } if same_mtime || same_hash { if mtime != existing_file.mtime { file_update_mtime(connection, existing_file.id, mtime)?; } if !existing_file.valid { file_set_valid(connection, existing_file.id, true)?; } let mut existing_files_write = existing_files.write().unwrap(); let maybe_existing_file = existing_files_write .iter() .enumerate() .find(|(_, file)| file.path == normalized_path_str) .map(|(idx, _)| idx); if let Some(idx) = maybe_existing_file { existing_files_write.swap_remove(idx); debug!("Unchanged: {:?}", path); return Ok(UpdatePathOutcome::Unchanged(path)); } } } } else { drop(existing_files_read); } // If not, add it! 
    if file_hash.is_none() {
        file_hash = Some(path.hash()?);
    }
    let file_hash = file_hash.unwrap();

    let new_file = models::NewFile {
        path: normalized_path_str.to_string(),
        hash: (file_hash.clone()).0,
        added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
        size,
        mtime,
    };

    // Insert metadata
    let type_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: String::from(IS_OF_TYPE_ATTR),
        value: EntryValue::Address(BLOB_TYPE_ADDR.clone()),
    };

    let size_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: FILE_SIZE_KEY.to_string(),
        value: EntryValue::Value(Value::from(size)),
    };

    let mime_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: FILE_MIME_KEY.to_string(),
        value: EntryValue::Value(Value::String(tree_magic::from_filepath(&path))),
    };

    // Finally, add the appropriate entries w/r/t virtual filesystem location
    let components = normalized_path.components().collect::<Vec<Component>>();
    let (filename, dir_path) = components.split_last().unwrap();

    let upath = UPath(
        iter::once(UNode::new("NATIVE").unwrap())
            .chain(dir_path.iter().map(|component| {
                UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
            }))
            .collect(),
    );
    let resolved_path = resolve_path_cached(connection, &upath, true, resolve_cache)?;
    let parent_dir = resolved_path.last().unwrap();

    connection.transaction::<_, Error, _>(|| {
        insert_file(connection, new_file)?;

        insert_entry(connection, type_entry)?;
        insert_entry(connection, size_entry)?;
        insert_entry(connection, mime_entry)?;

        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: EntryValue::Address(Address::Hash(file_hash.clone())),
        };
        let dir_has_entry_addr = insert_entry(connection, dir_has_entry)?;

        let name_entry = Entry {
            entity: dir_has_entry_addr,
            attribute: ALIAS_KEY.to_string(),
            value: EntryValue::Value(Value::String(
                filename.as_os_str().to_string_lossy().to_string(),
            )),
        };
        insert_entry(connection, name_entry)?;

        info!("Added: {:?}", path);
        Ok(UpdatePathOutcome::Added(path.clone()))
    })
}

#[cfg(test)]
mod test {
    use crate::database::open_upend;
    use crate::util;

    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempdir::TempDir;

    #[test]
    fn test_rescan() {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new("upend-test").unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
Briefly.").unwrap(); let file_path = temp_dir.path().join("hello-world.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Hello, World!").unwrap(); let file_path = temp_dir.path().join("empty"); File::create(file_path).unwrap(); // Initialize database let open_result = open_upend(&temp_dir, None, true).unwrap(); let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default())); let job_id = job_container .write() .unwrap() .add_job(util::jobs::Job::new("RESCAN", "TEST JOB")) .unwrap(); // Initial scan let rescan_result = _rescan_vault( open_result.pool.clone(), temp_dir.as_ref().to_path_buf(), job_container.clone(), job_id, ); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); assert_eq!(rescan_result.len(), 3); rescan_result .into_iter() .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_)))); // Modification-less rescan let rescan_result = _rescan_vault( open_result.pool.clone(), temp_dir.as_ref().to_path_buf(), job_container.clone(), job_id, ); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); assert_eq!(rescan_result.len(), 3); rescan_result .into_iter() .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_)))); // Remove a file std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap(); let rescan_result = _rescan_vault( open_result.pool, temp_dir.as_ref().to_path_buf(), job_container, job_id, ); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); assert_eq!(rescan_result.len(), 3); assert_eq!( 2, rescan_result .iter() .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_))) .count() ); assert_eq!( 1, rescan_result .iter() .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_))) .count() ); } }