use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Instant, UNIX_EPOCH};
use std::{fs, iter};

use crate::addressing::Address;
use crate::database::constants::{
    HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_ID_ATTR, TYPE_INSTANCES_ATTR,
    TYPE_IS_ATTR, TYPE_REQUIRES_ATTR,
};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{resolve_path, UNode, UPath};
use crate::database::inner::models;
use crate::database::{
    file_set_valid, insert_entry, insert_file, retrieve_all_files, DbPool, DATABASE_FILENAME,
};
use crate::util::hash::Hashable;
use crate::util::jobs::{Job, JobContainer, JobId, State};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::Connection;
use log::{error, info, warn};
use once_cell::unsync::Lazy;
use rayon::prelude::*;
use serde_json::Value;
use uuid::Uuid;
use walkdir::WalkDir;

const DIR_TYPE: &str = "FS_DIR";
const DIR_KEY: &str = "DIR";
lazy_static! {
    static ref DIR_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_IS_ATTR),
        value: EntryValue::Value(Value::from(DIR_TYPE)),
    };
    static ref DIR_TYPE_ADDR: Address = DIR_TYPE_INVARIANT.entity().unwrap();
}

const BLOB_TYPE: &str = "BLOB";
const FILE_TYPE: &str = "FS_FILE";
const FILE_IDENTITY_KEY: &str = "FILE_IS";
const FILENAME_KEY: &str = "FILE_NAME";
const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";
lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_IS_ATTR),
        value: EntryValue::Value(Value::from(BLOB_TYPE)),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
    static ref FILE_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_IS_ATTR),
        value: EntryValue::Value(Value::from(FILE_TYPE)),
    };
    static ref FILE_TYPE_ADDR: Address = FILE_TYPE_INVARIANT.entity().unwrap();
}

fn initialize_types(pool: &DbPool) -> Result<()> {
    // BLOB_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);

    // FILE_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*FILE_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, FILE_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(
        &pool.get()?,
        FILE_TYPE_ADDR,
        TYPE_INSTANCES_ATTR,
        FILE_IDENTITY_KEY
    );
    upend_insert_val!(&pool.get()?, FILE_TYPE_ADDR, TYPE_ID_ATTR, FILENAME_KEY);
    upend_insert_val!(
        &pool.get()?,
        FILE_TYPE_ADDR,
        TYPE_REQUIRES_ATTR,
        FILE_IDENTITY_KEY
    );
    upend_insert_val!(&pool.get()?, FILE_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);

    // DIR_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*DIR_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, DIR_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, DIR_TYPE_ADDR, TYPE_ID_ATTR, DIR_KEY);
    upend_insert_val!(&pool.get()?, DIR_TYPE_ADDR, TYPE_HAS_ATTR, HIER_HAS_ATTR);

    Ok(())
}
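/// Rescan the whole vault directory and reconcile it with the database,
/// reporting progress and failure through the given job container.
///
/// A minimal invocation sketch (assumes an already-constructed `DbPool` and
/// `JobContainer`; the vault path is hypothetical):
///
/// ```ignore
/// let jobs = Arc::new(RwLock::new(job_container));
/// rescan_vault(pool, PathBuf::from("/path/to/vault"), jobs).await;
/// ```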
pub async fn rescan_vault(
    pool: DbPool,
    directory: PathBuf,
    job_container: Arc<RwLock<JobContainer>>,
) {
    let job_id = job_container
        .write()
        .unwrap()
        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
        .unwrap();

    let job_container_rescan = job_container.clone();
    let result =
        actix_web::web::block(move || _rescan_vault(pool, directory, job_container_rescan, job_id))
            .await;

    if result.is_err() {
        let err = result.err().unwrap();
        error!("Update did not succeed! {:?}", err);

        job_container
            .write()
            .unwrap()
            .update_state(&job_id, State::Failed)
            .unwrap();
    }
}

type UpdatePathResult = Result<UpdatePathOutcome>;

enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
}

fn _rescan_vault<T: AsRef<Path>>(
    pool: DbPool,
    directory: T,
    job_container: Arc<RwLock<JobContainer>>,
    job_id: JobId,
) -> Result<Vec<UpdatePathResult>> {
    let start = Instant::now();

    // Initialize types, etc...
    initialize_types(&pool)?;

    // Walk through the vault, find all paths
    let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME)
        .map(|e| fs::canonicalize(e.into_path()).unwrap())
        .collect();

    // Prepare for processing
    let rw_pool = Arc::new(RwLock::new(pool.clone()));
    let absolute_path = fs::canonicalize(&directory)?;
    let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));

    // Actual processing
    let count = RwLock::new(0_usize);
    let total = path_entries.len() as f32;
    let path_results: Vec<UpdatePathResult> = path_entries
        .into_par_iter()
        .map(|path| {
            let result = _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;

            let mut cnt = count.write().unwrap();
            *cnt += 1;

            job_container
                .write()
                .unwrap()
                .update_progress(&job_id, *cnt as f32 / total * 100.0)
                .unwrap();

            Ok(result)
        })
        .collect();

    // Any file that was in the database but wasn't encountered on disk
    // is marked invalid.
    let cleanup_results: Vec<UpdatePathResult> = existing_files
        .write()
        .unwrap()
        .iter()
        .filter(|f| f.valid)
        .map(|file| {
            let connection = pool.get()?;
            connection.transaction::<_, Error, _>(|| {
                file_set_valid(&connection, file.id, false)?;
                // remove_object(&connection, )?
                Ok(UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())))
            })
        })
        .collect();

    let mut failed: Vec<&Error> = vec![];
    let mut created = 0;
    let mut unchanged = 0;
    let mut deleted = 0;

    for result in &path_results {
        match result {
            Ok(result) => match result {
                UpdatePathOutcome::Added(_) => created += 1,
                UpdatePathOutcome::Unchanged(_) => unchanged += 1,
                UpdatePathOutcome::Removed(_) => deleted += 1,
            },
            Err(err) => failed.push(err),
        }
    }

    if !failed.is_empty() {
        warn!(
            "{} path updates failed! ({})",
            failed.len(),
            failed
                .iter()
                .map(|e| e.to_string())
                .collect::<Vec<String>>()
                .join(", ")
        )
    }

    info!(
        "Finished updating {} ({} created, {} deleted, {} left unchanged). Took {}s.",
        directory.as_ref().display(),
        created,
        deleted,
        unchanged,
        start.elapsed().as_secs()
    );

    Ok(path_results
        .into_iter()
        .chain(cleanup_results.into_iter())
        .collect())
}
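/// Process a single file discovered during the rescan: if a path entry with
/// the same size and mtime (or the same hash) already exists, mark it valid
/// if necessary and return `Unchanged`; otherwise hash the file, insert its
/// database row plus its BLOB metadata entries, and register it in the
/// virtual hierarchy, returning `Added`.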
fn _process_directory_entry<P: AsRef<Path>>(
    db_pool: &Arc<RwLock<DbPool>>,
    path: PathBuf,
    directory_path: &P,
    existing_files: &Arc<RwLock<Vec<models::File>>>,
) -> UpdatePathResult {
    info!("Processing: {:?}", path);

    // Prepare the data
    let db_pool = Arc::clone(db_pool);
    let existing_files = Arc::clone(existing_files);

    let normalized_path = path.strip_prefix(&directory_path)?;
    let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!");

    // Hash lazily; the digest is only computed if the quick size/mtime
    // comparison below is inconclusive.
    let digest = Lazy::new(|| path.hash());

    // Get size & mtime for quick comparison
    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    if size < 0 {
        panic!("File {} too large?!", path.display());
    }
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();

    // Check if the path entry for this file already exists in database
    {
        // Only grab existing_files for the duration of this block
        let mut existing_files = existing_files.write().unwrap();

        let maybe_existing_file = existing_files
            .iter()
            .enumerate()
            .find(|(_, file)| file.path == normalized_path_str);

        if let Some((idx, existing_file)) = maybe_existing_file {
            if (size == existing_file.size && mtime == existing_file.mtime)
                || ((*digest).is_ok() && &existing_file.hash == (*digest).as_ref().unwrap())
            {
                if !existing_file.valid {
                    file_set_valid(&db_pool.write().unwrap().get()?, existing_file.id, true)?;
                }
                existing_files.swap_remove(idx);
                return Ok(UpdatePathOutcome::Unchanged(path));
            }
        }
    }

    // If not, add it!
    if let Err(err) = &*digest {
        return Err(anyhow!(format!("Error hashing: {}", err)));
    }

    let digest = (*digest).as_ref().unwrap().clone();

    let new_file = models::NewFile {
        path: normalized_path_str.to_string(),
        hash: (digest.clone()).0,
        added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
        size,
        mtime,
    };

    insert_file(&db_pool.write().unwrap().get()?, new_file)?;

    // Insert metadata
    let type_entry = Entry {
        entity: Address::Hash(digest.clone()),
        attribute: String::from(IS_OF_TYPE_ATTR),
        value: EntryValue::Address(BLOB_TYPE_ADDR.clone()),
    };
    insert_entry(&db_pool.write().unwrap().get()?, type_entry)?;

    let size_entry = Entry {
        entity: Address::Hash(digest.clone()),
        attribute: FILE_SIZE_KEY.to_string(),
        value: EntryValue::Value(Value::from(size)),
    };
    insert_entry(&db_pool.write().unwrap().get()?, size_entry)?;

    if let Some(mtime) = mtime {
        let mtime_entry = Entry {
            entity: Address::Hash(digest.clone()),
            attribute: FILE_MTIME_KEY.to_string(),
            value: EntryValue::Value(Value::from(mtime.timestamp())),
        };
        insert_entry(&db_pool.write().unwrap().get()?, mtime_entry)?;
    }

    let mime_entry = Entry {
        entity: Address::Hash(digest.clone()),
        attribute: FILE_MIME_KEY.to_string(),
        value: EntryValue::Value(Value::String(tree_magic::from_filepath(&path))),
    };
    insert_entry(&db_pool.write().unwrap().get()?, mime_entry)?;
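
    // The block below mirrors the file's on-disk location into the virtual
    // hierarchy. As a sketch of the intent (the path is hypothetical): a vault
    // file at "photos/2021/cat.jpg" yields the UPath "NATIVE/photos/2021",
    // whose nodes are created on demand (`resolve_path` with create == true),
    // and a fresh file entity carrying the FILE_NAME "cat.jpg" is attached to
    // the final node via HIER_HAS_ATTR.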
    // Finally, add the appropriate entries w/r/t virtual filesystem location
    let components = normalized_path.components().collect::<Vec<Component>>();
    let (filename, dir_path) = components.split_last().unwrap();

    let upath = UPath(
        iter::once(UNode::new("NATIVE").unwrap())
            .chain(dir_path.iter().map(|component| {
                UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
            }))
            .collect(),
    );
    let resolved_path = resolve_path(&db_pool.write().unwrap().get()?, &upath, true)?;
    let parent_dir = resolved_path.last().unwrap();

    let _pool = &db_pool.write().unwrap();
    let connection = _pool.get()?;
    connection.transaction::<_, Error, _>(|| {
        let file_address = Address::Uuid(Uuid::new_v4());

        let type_entry = Entry {
            entity: file_address.clone(),
            attribute: String::from(IS_OF_TYPE_ATTR),
            value: EntryValue::Address(FILE_TYPE_ADDR.clone()),
        };
        insert_entry(&connection, type_entry)?;

        let name_entry = Entry {
            entity: file_address.clone(),
            attribute: FILENAME_KEY.to_string(),
            value: EntryValue::Value(Value::String(
                filename.as_os_str().to_string_lossy().to_string(),
            )),
        };
        insert_entry(&connection, name_entry)?;

        let identity_entry = Entry {
            entity: file_address.clone(),
            attribute: FILE_IDENTITY_KEY.to_string(),
            value: EntryValue::Address(Address::Hash(digest.clone())),
        };
        insert_entry(&connection, identity_entry)?;

        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: EntryValue::Address(file_address),
        };
        insert_entry(&connection, dir_has_entry)?;

        Ok(UpdatePathOutcome::Added(path.clone()))
    })
}