//! Vault filesystem scanning: walks the vault directory, hashes files, and
//! keeps the database's file and hierarchy entries in sync with what's on disk.

use std::borrow::Borrow;
use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, UNIX_EPOCH};
use std::{fs, iter};

use crate::addressing::Address;
use crate::database::constants::{
    HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{resolve_path_cached, ResolveCache, UHierPath, UNode};
use crate::database::inner::models;
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{Job, JobContainer, JobId, State};
use anyhow::{Error, Result};
use chrono::prelude::*;
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use serde_json::Value;
use walkdir::WalkDir;

const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: EntryValue::Value(Value::from(BLOB_TYPE)),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}

/// Ensures the BLOB type and its attributes exist in the database.
fn initialize_types(connection: &UpEndConnection) -> Result<()> {
    // BLOB_TYPE
    connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
    Ok(())
}

/// Runs a vault re-scan as a blocking task, reporting progress and failure
/// through the shared job container.
pub async fn rescan_vault(
    db: Arc<UpEndDatabase>,
    job_container: Arc<RwLock<JobContainer>>,
    initial: bool,
) {
    let job_id = job_container
        .write()
        .unwrap()
        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
        .unwrap();

    let job_container_rescan = job_container.clone();
    let result =
        actix_web::web::block(move || _rescan_vault(db, job_container_rescan, job_id, initial))
            .await;

    if result.is_err() {
        let err = result.err().unwrap();
        error!("Update did not succeed! {:?}", err);

        job_container
            .write()
            .unwrap()
            .update_state(&job_id, State::Failed)
            .unwrap();
    }
}

/// RAII guard that re-enables SQLite's synchronous mode when dropped.
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);

impl Drop for PragmaSynchronousGuard<'_> {
    fn drop(&mut self) {
        debug!("Re-enabling synchronous mode.");
        let res = self.0.execute("PRAGMA synchronous = NORMAL;");
        if let Err(err) = res {
            error!(
                "Error setting synchronous mode back to NORMAL! Data loss possible! {}",
                err
            );
        }
    }
}

type UpdatePathResult = Result<UpdatePathOutcome>;

#[derive(Debug)]
enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
    Failed(PathBuf, Error),
}

/// Walks the vault directory and updates the database to match its current contents.
fn _rescan_vault<D: Borrow<UpEndDatabase>>(
    db: D,
    job_container: Arc<RwLock<JobContainer>>,
    job_id: JobId,
    initial: bool,
) -> Result<Vec<UpdatePathOutcome>> {
    let start = Instant::now();
    info!("Vault rescan started.");

    let db = db.borrow();
    let connection = db.connection()?;

    // Initialize types, etc...
debug!("Initializing DB types."); initialize_types(&connection)?; // Disable syncing in SQLite for the duration of the import let mut _guard: Option = None; if initial { debug!("Disabling SQLite synchronous mode"); connection.execute("PRAGMA synchronous = OFF;")?; _guard = Some(PragmaSynchronousGuard(&connection)); } // Walk through the vault, find all paths debug!("Traversing vault directory"); let absolute_dir_path = fs::canonicalize(&*db.vault_path)?; let path_entries: Vec = WalkDir::new(&*db.vault_path) .follow_links(true) .into_iter() .filter_map(|e| e.ok()) .map(|e| fs::canonicalize(e.into_path()).unwrap()) .filter(|e| e.is_file()) .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR))) .collect(); // Prepare for processing let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?)); // Actual processing let count = RwLock::new(0_usize); let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); let total = path_entries.len() as f32; let path_outcomes: Vec = path_entries .into_par_iter() .map(|path| { let result = _process_directory_entry( db.connection().unwrap(), &resolve_cache, path.clone(), &absolute_dir_path, &existing_files, ); let mut cnt = count.write().unwrap(); *cnt += 1; job_container .write() .unwrap() .update_progress(&job_id, *cnt as f32 / total * 100.0) .unwrap(); match result { Ok(result) => result, Err(error) => UpdatePathOutcome::Failed(path, error), } }) .collect(); debug!("Processing done, cleaning up..."); let existing_files = existing_files.read().unwrap(); let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { let trans_result = connection.transaction::<_, Error, _>(|| { connection.file_set_valid(file.id, false)?; // remove_object(&connection, )? Ok(()) }); match trans_result { Ok(_) => { info!("Removed: {:?}", file.path); UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) } Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error), } }); // Re-enable SQLite syncing drop(_guard); // Reporting let all_outcomes = path_outcomes .into_iter() .chain(cleanup_results) .collect::>(); let mut failed: Vec<(&PathBuf, &Error)> = vec![]; let mut created = 0; let mut unchanged = 0; let mut deleted = 0; for outcome in &all_outcomes { match outcome { UpdatePathOutcome::Added(_) => created += 1, UpdatePathOutcome::Unchanged(_) => unchanged += 1, UpdatePathOutcome::Removed(_) => deleted += 1, UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), } } if !failed.is_empty() { warn!( "{} path updates failed! ({})", failed.len(), failed .iter() .map(|(path, error)| format!("{:?}: {}", path, error)) .collect::>() .join(", ") ) } info!( "Finished updating {:?} ({} created, {} deleted, {} left unchanged). 
Took {}s.", db.vault_path, created, deleted, unchanged, start.elapsed().as_secs() ); Ok(all_outcomes) } fn _process_directory_entry>( connection: UpEndConnection, resolve_cache: &Arc>, path: PathBuf, directory_path: &P, existing_files: &Arc>>, ) -> UpdatePathResult { debug!("Processing: {:?}", path); // Prepare the data let existing_files = Arc::clone(existing_files); let normalized_path = path.strip_prefix(&directory_path)?; let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!"); let mut file_hash: Option = None; // Get size & mtime for quick comparison let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } let mtime = metadata .modified() .map(|t| { NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) }) .ok(); // Check if the path entry for this file already exists in database let existing_files_read = existing_files.read().unwrap(); let maybe_existing_file = existing_files_read .iter() .find(|file| file.path == normalized_path_str); if let Some(existing_file) = maybe_existing_file { let existing_file = existing_file.clone(); drop(existing_files_read); if size == existing_file.size { let same_mtime = mtime.is_some() && mtime == existing_file.mtime; let mut same_hash = false; if !same_mtime { file_hash = Some(path.hash()?); same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; } if same_mtime || same_hash { if mtime != existing_file.mtime { connection.file_update_mtime(existing_file.id, mtime)?; } if !existing_file.valid { connection.file_set_valid(existing_file.id, true)?; } let mut existing_files_write = existing_files.write().unwrap(); let maybe_existing_file = existing_files_write .iter() .enumerate() .find(|(_, file)| file.path == normalized_path_str) .map(|(idx, _)| idx); if let Some(idx) = maybe_existing_file { existing_files_write.swap_remove(idx); debug!("Unchanged: {:?}", path); return Ok(UpdatePathOutcome::Unchanged(path)); } } } } else { drop(existing_files_read); } // If not, add it! 
    if file_hash.is_none() {
        file_hash = Some(path.hash()?);
    }
    let file_hash = file_hash.unwrap();

    let new_file = models::NewFile {
        path: normalized_path_str.to_string(),
        hash: (file_hash.clone()).0,
        added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
        size,
        mtime,
    };

    // Insert metadata
    let type_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: String::from(IS_OF_TYPE_ATTR),
        value: EntryValue::Address(BLOB_TYPE_ADDR.clone()),
    };

    let size_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: FILE_SIZE_KEY.to_string(),
        value: EntryValue::Value(Value::from(size)),
    };

    let mime_entry = tree_magic_mini::from_filepath(&path).map(|mime_type| Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: FILE_MIME_KEY.to_string(),
        value: EntryValue::Value(Value::String(mime_type.to_string())),
    });

    // Finally, add the appropriate entries w/r/t virtual filesystem location
    let components = normalized_path.components().collect::<Vec<Component>>();
    let (filename, dir_path) = components.split_last().unwrap();

    let upath = UHierPath(
        iter::once(UNode::new("NATIVE").unwrap())
            .chain(dir_path.iter().map(|component| {
                UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
            }))
            .collect(),
    );
    let resolved_path = resolve_path_cached(&connection, &upath, true, resolve_cache)?;
    let parent_dir = resolved_path.last().unwrap();

    connection.transaction::<_, Error, _>(|| {
        connection.insert_file(new_file)?;

        connection.insert_entry_immutable(type_entry)?;
        connection.insert_entry_immutable(size_entry)?;
        if let Some(mime_entry) = mime_entry {
            connection.insert_entry(mime_entry)?;
        }

        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: EntryValue::Address(Address::Hash(file_hash.clone())),
        };
        let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;

        let name_entry = Entry {
            entity: dir_has_entry_addr,
            attribute: ALIAS_KEY.to_string(),
            value: EntryValue::Value(Value::String(
                filename.as_os_str().to_string_lossy().to_string(),
            )),
        };
        connection.insert_entry(name_entry)?;

        info!("Added: {:?}", path);
        Ok(UpdatePathOutcome::Added(path.clone()))
    })
}

#[cfg(test)]
mod test {
    use crate::database::UpEndDatabase;
    use crate::util;

    use super::*;

    use std::fs::File;
    use std::io::Write;
    use tempfile::TempDir;

    #[test]
    fn test_rescan() {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new().unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        // Initialize database
        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
        let job_id = job_container
            .write()
            .unwrap()
            .add_job(util::jobs::Job::new("RESCAN", "TEST JOB"))
            .unwrap();

        // Initial scan
        let rescan_result = _rescan_vault(&open_result.db, job_container.clone(), job_id, true);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));

        // Modification-less rescan
        let rescan_result = _rescan_vault(&open_result.db, job_container.clone(), job_id, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));

        // Remove a file
        std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();

        let rescan_result = _rescan_vault(&open_result.db, job_container, job_id, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        assert_eq!(
            2,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
                .count()
        );
        assert_eq!(
            1,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
                .count()
        );
    }
}