use self::db::files; use super::{Blob, StoreError, UpStore, UpdatePathOutcome}; use crate::addressing::Address; use crate::database::constants::{ ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, TYPE_HAS_ATTR, }; use crate::database::entry::{Entry, InvariantEntry}; use crate::database::hierarchies::{ resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode, }; use crate::database::{ ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR, }; use crate::util::hash::{b58_encode, Hash, Hashable}; use crate::util::jobs::{JobContainer, JobHandle}; use anyhow::{anyhow, Error, Result}; use chrono::prelude::*; use diesel::r2d2::{self, ConnectionManager, ManageConnection}; use diesel::ExpressionMethods; use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection}; use lru::LruCache; use rayon::prelude::*; use serde_json::json; use std::borrow::Borrow; use std::convert::{TryFrom, TryInto}; use std::path::PathBuf; use std::path::{Component, Path}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::{fs, iter}; use tracing::{debug, error, info, trace, warn}; use walkdir::WalkDir; mod db; const BLOB_TYPE: &str = "BLOB"; const ALIAS_KEY: &str = "ALIAS"; pub const FILE_MIME_KEY: &str = "FILE_MIME"; const FILE_MTIME_KEY: &str = "FILE_MTIME"; const FILE_SIZE_KEY: &str = "FILE_SIZE"; lazy_static! { static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry { attribute: String::from(TYPE_BASE_ATTR), value: BLOB_TYPE.into(), }; static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap(); } pub struct FsStore { path: PathBuf, pool: r2d2::Pool>, lock: Arc>, } impl FsStore { pub fn from_path>(path: P) -> Result { debug!("Initializing FS store."); let path = path.as_ref().to_path_buf().canonicalize()?; let store_dir = path.join(UPEND_SUBDIR); if !store_dir.exists() { fs::create_dir(&store_dir)?; } let manager = ConnectionManager::::new( store_dir.join("upend_vault.sqlite3").to_str().unwrap(), ); // while diesel doesn't support multiple embedded migrations... let connection = manager.connect()?; connection.execute( r#" CREATE TABLE IF NOT EXISTS files ( id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, hash BLOB NOT NULL, path VARCHAR NOT NULL, valid BOOLEAN NOT NULL DEFAULT TRUE, added DATETIME NOT NULL, size BIGINT NOT NULL, mtime DATETIME NULL ); CREATE INDEX IF NOT EXISTS files_hash ON files (hash); PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE); "#, )?; let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { busy_timeout: Some(Duration::from_secs(30)), enable_wal_mode: true, mutex: Arc::new(Mutex::new(())), })) .error_handler(Box::new(LoggingHandler { name: "fs_store" })) .build(manager)?; let lock = Arc::new(RwLock::new(())); debug!("FS store created."); Ok(FsStore { path, pool, lock }) } #[tracing::instrument(name = "FS store rescan", skip_all)] fn rescan_vault>( &self, db: D, job_handle: JobHandle, quick_check: bool, _disable_synchronous: bool, ) -> Result> { let start = Instant::now(); info!("Vault rescan started."); let db = db.borrow(); let upconnection = db.connection()?; // Initialize types, etc... debug!("Initializing DB types."); upconnection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; upend_insert_addr!(upconnection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; upend_insert_val!(upconnection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; // Walk through the vault, find all paths debug!("Traversing vault directory"); let absolute_dir_path = fs::canonicalize(&*self.path)?; let path_entries: Vec = WalkDir::new(&*self.path) .follow_links(true) .into_iter() .filter_map(|e| e.ok()) .filter_map(|e| fs::canonicalize(e.into_path()).ok()) .filter(|e| e.is_file()) .filter(|e| !e.starts_with(absolute_dir_path.join(UPEND_SUBDIR))) .collect(); // Prepare for processing let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?)); // Actual processing let count = RwLock::new(0_usize); let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); let total = path_entries.len() as f32; let shared_job_handle = Arc::new(Mutex::new(job_handle)); let path_outcomes: Vec = path_entries .into_par_iter() .map(|path| { let result = self.process_directory_entry( db, &resolve_cache, path.clone(), &existing_files, quick_check, ); let mut cnt = count.write().unwrap(); *cnt += 1; let _ = shared_job_handle .lock() .unwrap() .update_progress(*cnt as f32 / total * 100.0); match result { Ok(result) => result, Err(error) => { error!("Failed to update {:?} ({})", path, error); UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string())) } } }) .collect(); debug!("Processing done, cleaning up..."); let existing_files =; let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { let trans_result = upconnection.transaction::<_, Error, _>(|| { self.file_set_valid(, false)?; upconnection.remove_object(Address::from(file.clone()))?; Ok(()) }); match trans_result { Ok(_) => { info!("Removed: {:?}", file.path); UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) } Err(error) => UpdatePathOutcome::Failed( PathBuf::from(file.path.clone()), StoreError::Unknown(error.to_string()), ), } }); // Reporting let all_outcomes = path_outcomes .into_iter() .chain(cleanup_results) .collect::>(); let mut failed: Vec<(&PathBuf, &StoreError)> = vec![]; let mut created = 0; let mut unchanged = 0; let mut skipped = 0; let mut deleted = 0; for outcome in &all_outcomes { match outcome { UpdatePathOutcome::Added(_) => created += 1, UpdatePathOutcome::Unchanged(_) => unchanged += 1, UpdatePathOutcome::Skipped(_) => skipped += 1, UpdatePathOutcome::Removed(_) => deleted += 1, UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), } } if !failed.is_empty() { warn!( "{} path updates failed! ({})", failed.len(), failed .iter() .map(|(path, error)| format!("{:?}: {}", path, error)) .collect::>() .join(", ") ) } info!( "Finished updating {:?} ({} created, {} deleted, {} skipped, {} left unchanged). Took {}s.", self.path, created, deleted, skipped, unchanged, start.elapsed().as_secs() ); Ok(all_outcomes) } fn process_directory_entry>( &self, db: D, resolve_cache: &Arc>, path: PathBuf, existing_files: &Arc>>, quick_check: bool, ) -> Result { debug!("Processing: {:?}", path); // Prepare the data let existing_files = Arc::clone(existing_files); let normalized_path = self.normalize_path(&path)?; let normalized_path_str = normalized_path .to_str() .ok_or(anyhow!("Path not valid unicode!"))?; let mut file_hash: Option = None; // Get size & mtime for quick comparison let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } if size == 0 { return Ok(UpdatePathOutcome::Skipped(path)); } let mtime = metadata .modified() .map(|t| { NaiveDateTime::from_timestamp( t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0, ) }) .ok(); // Check if the path entry for this file already exists in database let existing_files_read =; let maybe_existing_file = existing_files_read .iter() .find(|file| file.path == normalized_path_str); if let Some(existing_file) = maybe_existing_file { let existing_file = existing_file.clone(); drop(existing_files_read); if !quick_check || size == existing_file.size { let same_mtime = mtime.is_some() && mtime == existing_file.mtime; let mut same_hash = false; if !quick_check || !same_mtime { file_hash = Some(path.hash()?); same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; } if same_mtime || same_hash { if mtime != existing_file.mtime { self.file_update_mtime(, mtime)?; } if !existing_file.valid { self.file_set_valid(, true)?; } let mut existing_files_write = existing_files.write().unwrap(); let maybe_existing_file = existing_files_write .iter() .enumerate() .find(|(_, file)| file.path == normalized_path_str) .map(|(idx, _)| idx); if let Some(idx) = maybe_existing_file { existing_files_write.swap_remove(idx); debug!("Unchanged: {:?}", path); return Ok(UpdatePathOutcome::Unchanged(path)); } } } } else { drop(existing_files_read); } // If not, add it! if file_hash.is_none() { file_hash = Some(path.hash()?); } let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string()); self.insert_file_with_metadata( &db.borrow().connection()?, &normalized_path, file_hash.unwrap(), size, mtime, mime_type, Some(resolve_cache), ) .map(|_| { info!("Added: {:?}", path); UpdatePathOutcome::Added(path.clone()) }) } fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result
{ let normalized_path = self.normalize_path(path)?; let metadata = fs::metadata(path)?; let size = metadata.len() as i64; let mtime = metadata .modified() .map(|t| { NaiveDateTime::from_timestamp( t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0, ) }) .ok(); let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string()); self.insert_file_with_metadata( connection, &normalized_path, hash, size, mtime, mime_type, None, ) } fn insert_file_with_metadata( &self, connection: &UpEndConnection, normalized_path: &Path, hash: Hash, size: i64, mtime: Option, mime_type: Option, resolve_cache: Option<&Arc>>, ) -> Result
{ let new_file = db::NewFile { path: normalized_path .to_str() .ok_or(anyhow!("Path not UTF-8?!"))? .to_string(), hash: (hash.clone()).0, added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), size, mtime, }; let blob_address = Address::Hash(hash); // Metadata let type_entry = Entry { entity: blob_address.clone(), attribute: String::from(IS_OF_TYPE_ATTR), value: BLOB_TYPE_ADDR.clone().into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }; let size_entry = Entry { entity: blob_address.clone(), attribute: FILE_SIZE_KEY.to_string(), value: (size as f64).into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }; let mime_entry =|mime_type| Entry { entity: blob_address.clone(), attribute: FILE_MIME_KEY.to_string(), value: mime_type.into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }); let added_entry = Entry { entity: blob_address.clone(), attribute: ADDED_ATTR.to_string(), value: (SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() as f64) .into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }; // Add the appropriate entries w/r/t virtual filesystem location let components = normalized_path.components().collect::>(); let (filename, dir_path) = components.split_last().unwrap(); let upath = UHierPath( iter::once(UNode::new("NATIVE").unwrap()) .chain(dir_path.iter().map(|component| { UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap() })) .collect(), ); let resolved_path = match resolve_cache { Some(cache) => resolve_path_cached(connection, &upath, true, cache)?, None => resolve_path(connection, &upath, true)?, }; let parent_dir = resolved_path.last().unwrap(); // Insert all let file_count = self.insert_file(new_file)?; connection.insert_entry_immutable(type_entry)?; connection.insert_entry_immutable(size_entry)?; if file_count == 1 { connection.insert_entry_immutable(added_entry)?; } if let Some(mime_entry) = mime_entry { connection.insert_entry(mime_entry)?; } let dir_has_entry = Entry { entity: parent_dir.clone(), attribute: HIER_HAS_ATTR.to_string(), value: blob_address.clone().into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }; let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?; let label_entry = Entry { entity: blob_address.clone(), attribute: LABEL_ATTR.to_string(), value: filename.as_os_str().to_string_lossy().to_string().into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }; let label_entry_addr = connection.insert_entry(label_entry)?; let alias_entry = Entry { entity: dir_has_entry_addr, attribute: ALIAS_KEY.to_string(), value: label_entry_addr.into(), provenance: "SYSTEM INIT".to_string(), timestamp: chrono::Utc::now().naive_utc(), }; connection.insert_entry(alias_entry)?; Ok(blob_address) } pub fn insert_file(&self, file: db::NewFile) -> Result { debug!( "Inserting {} ({})...", &file.path, Address::Hash(Hash((file.hash).clone())) ); let _lock = self.lock.write().unwrap(); let conn = self.pool.get()?; diesel::insert_into(files::table) .values(&file) .execute(&conn)?; Ok(files::dsl::files .filter(files::dsl::valid.eq(true)) .filter(files::dsl::hash.eq(file.hash)) .count() .first::(&conn)? .try_into() .unwrap()) } fn retrieve_file(&self, obj_hash: &Hash) -> Result> { use self::db::files::dsl::*; let _lock =; let conn = self.pool.get()?; let matches = files .filter(valid.eq(true)) .filter(hash.eq(&obj_hash.0)) .load::(&conn)?; let matches = matches .into_iter() .map(|f| db::OutFile { id:, hash: f.hash, path: self.path.join(PathBuf::from(f.path)), valid: f.valid, added: f.added, size: f.size, mtime: f.mtime, }) .collect(); Ok(matches) } fn retrieve_all_files(&self) -> Result> { use self::db::files::dsl::*; let _lock =; let conn = self.pool.get()?; let matches = files.load::(&conn)?; Ok(matches) } fn file_update_mtime(&self, file_id: i32, m_time: Option) -> Result { use self::db::files::dsl::*; debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); let _lock = self.lock.write().unwrap(); let conn = self.pool.get()?; Ok(diesel::update(files.filter(id.eq(file_id))) .set(mtime.eq(m_time)) .execute(&conn)?) } fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result { use self::db::files::dsl::*; debug!("Setting file ID {} to valid = {}", file_id, is_valid); let _lock = self.lock.write().unwrap(); let conn = self.pool.get()?; Ok(diesel::update(files.filter(id.eq(file_id))) .set(valid.eq(is_valid)) .execute(&conn)?) } fn normalize_path(&self, path: &Path) -> Result { Ok(path .canonicalize()? .strip_prefix(self.path.as_path())? .to_path_buf()) } } impl From for Blob { fn from(of: db::OutFile) -> Self { Blob { file_path: of.path } } } impl From for Blob { fn from(f: db::File) -> Self { Blob { file_path: PathBuf::from(f.path), } } } impl UpStore for FsStore { fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result, super::StoreError> { Ok(self .retrieve_file(hash) .map_err(|e| StoreError::Unknown(e.to_string()))? .into_iter() .map(Blob::from) .collect()) } fn retrieve_all(&self) -> Result, super::StoreError> { Ok(self .retrieve_all_files() .map_err(|e| StoreError::Unknown(e.to_string()))? .into_iter() .map(Blob::from) .collect()) } fn store( &self, connection: UpEndConnection, blob: Blob, name_hint: Option, ) -> Result { let file_path = blob.get_file_path(); let hash = file_path .hash() .map_err(|e| StoreError::Unknown(e.to_string()))?; let existing_files = self.retrieve(&hash)?; if existing_files.is_empty() { let address = Address::Hash(hash.clone()); let addr_str = b58_encode( address .encode() .map_err(|e| StoreError::Unknown(e.to_string()))?, ); let final_name = if let Some(name_hint) = name_hint { format!("{addr_str}_{name_hint}") } else { addr_str }; let final_path = self.path.join(final_name); fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?; self.add_file(&connection, &final_path, hash.clone()) .map_err(|e| StoreError::Unknown(e.to_string()))?; } Ok(hash) } fn update( &self, db: &UpEndDatabase, mut job_container: JobContainer, initial: bool, ) -> Result, StoreError> { trace!( "Running a vault update of {:?}, initial = {}.", self.path, initial ); let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); match job_result { Ok(job_handle) => { let result = self.rescan_vault(db, job_handle, !initial, initial); if let Err(err) = &result { error!("Update did not succeed! {:?}", err); } result.map_err(|err| StoreError::Unknown(err.to_string())) } Err(err) => Err(StoreError::Unknown(err.to_string())), } } fn stats(&self) -> std::result::Result { let files = self .retrieve_all_files() .map_err(|e| StoreError::Unknown(e.to_string()))?; let mut files_by_hash = std::collections::HashMap::new(); for file in &files { if !files_by_hash.contains_key(&file.hash) { files_by_hash.insert(&file.hash, vec![]); } files_by_hash.get_mut(&file.hash).unwrap().push(file); } for paths in files_by_hash.values_mut() { paths.sort_unstable_by_key(|f| !f.valid); } let mut blobs = files_by_hash .iter() .map(|(hash, files)| { json!({ "hash": hash, "size": files[0].size, "paths": files.iter().map(|f| json!({ "added": f.added, "valid": f.valid, "path": f.path })).collect::() }) }) .collect::>(); blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap()); blobs.reverse(); Ok(json!({ "totals": { "count": files_by_hash.len(), "size": files_by_hash.values().map(|f| f[0].size as u64).sum::() }, "blobs": blobs })) } } #[cfg(test)] mod test { use crate::database::UpEndDatabase; use crate::util::jobs::JobContainer; use super::*; use std::fs::File; use std::io::Write; use tempfile::TempDir; use std::sync::Once; use tracing_subscriber::filter::EnvFilter; static INIT: Once = Once::new(); pub fn initialize() { INIT.call_once(|| { tracing_subscriber::fmt() .with_env_filter(EnvFilter::builder().from_env_lossy()) .init(); }) } #[test] fn test_update() { // Prepare temporary filesystem structure let temp_dir = TempDir::new().unwrap(); let file_path = temp_dir.path().join("my-temporary-note.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); let file_path = temp_dir.path().join("hello-world.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Hello, World!").unwrap(); let file_path = temp_dir.path().join("empty"); File::create(file_path).unwrap(); // Initialize database let open_result = UpEndDatabase::open(&temp_dir, true).unwrap(); let store = FsStore::from_path(&temp_dir).unwrap(); let job_container = JobContainer::new(); // Store scan let rescan_result = store.update(&open_result.db, job_container.clone(), false); assert!(rescan_result.is_ok()); } #[test] fn test_rescan_quick() { initialize(); _test_rescan(true) } #[test] fn test_rescan_full() { initialize(); _test_rescan(false) } fn _test_rescan(quick: bool) { // Prepare temporary filesystem structure let temp_dir = TempDir::new().unwrap(); let file_path = temp_dir.path().join("my-temporary-note.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); let file_path = temp_dir.path().join("hello-world.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Hello, World!").unwrap(); let empty_path = temp_dir.path().join("empty"); File::create(&empty_path).unwrap(); // Initialize database let open_result = UpEndDatabase::open(&temp_dir, true).unwrap(); let store = FsStore::from_path(&temp_dir).unwrap(); let mut job_container = JobContainer::new(); // Initial scan let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); let rescan_result = store.rescan_vault(&open_result.db, job, quick, true); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); assert_eq!(rescan_result.len(), 3); rescan_result.into_iter().for_each(|outcome| match outcome { UpdatePathOutcome::Added(_) => assert!(true), UpdatePathOutcome::Skipped(path) => assert_eq!(path, empty_path), outcome => panic!("Unexpected outcome: {:?}", outcome), }); // Modification-less rescan let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); let rescan_result = store.rescan_vault(&open_result.db, job, quick, false); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); assert_eq!(rescan_result.len(), 3); rescan_result.into_iter().for_each(|outcome| { assert!(matches!( outcome, UpdatePathOutcome::Unchanged(_) | UpdatePathOutcome::Skipped(_) )) }); // Remove a file std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap(); let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); let rescan_result = store.rescan_vault(&open_result.db, job, quick, false); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); assert_eq!(rescan_result.len(), 3); assert_eq!( 1, rescan_result .iter() .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_))) .count() ); assert_eq!( 1, rescan_result .iter() .filter(|upo| matches!(upo, UpdatePathOutcome::Skipped(_))) .count() ); assert_eq!( 1, rescan_result .iter() .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_))) .count() ); } }