From 7c9d0717c251fa0c42094ac7d4b27cc4e833e876 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?= Date: Fri, 19 Aug 2022 14:04:18 +0200 Subject: [PATCH 1/6] feat!: multiple vaults incomplete, but passes tests --- .../2020-08-04-134952_file_hashes/down.sql | 2 - .../2020-08-04-134952_file_hashes/up.sql | 5 - migrations/fsvault/00_initial/down.sql | 1 + migrations/fsvault/00_initial/up.sql | 13 + .../down.sql | 1 - .../up.sql | 14 - src/database/inner/models.rs | 40 +- src/database/inner/schema.rs | 15 +- src/database/mod.rs | 102 +-- src/database/stores/fs/db.rs | 51 ++ src/database/stores/fs/mod.rs | 803 ++++++++++++++++++ src/database/stores/mod.rs | 64 ++ src/extractors/audio.rs | 13 +- src/extractors/mod.rs | 20 +- src/extractors/photo.rs | 10 +- src/extractors/web.rs | 6 +- src/filesystem.rs | 573 ------------- src/main.rs | 43 +- src/previews/mod.rs | 24 +- src/routes.rs | 176 ++-- 20 files changed, 1074 insertions(+), 902 deletions(-) delete mode 100644 migrations/config/2020-08-04-134952_file_hashes/down.sql delete mode 100644 migrations/config/2020-08-04-134952_file_hashes/up.sql create mode 100644 migrations/fsvault/00_initial/down.sql create mode 100644 migrations/fsvault/00_initial/up.sql rename migrations/upend/{00_initial_structure => 00_initial}/down.sql (64%) rename migrations/upend/{00_initial_structure => 00_initial}/up.sql (60%) create mode 100644 src/database/stores/fs/db.rs create mode 100644 src/database/stores/fs/mod.rs create mode 100644 src/database/stores/mod.rs delete mode 100644 src/filesystem.rs diff --git a/migrations/config/2020-08-04-134952_file_hashes/down.sql b/migrations/config/2020-08-04-134952_file_hashes/down.sql deleted file mode 100644 index d05f373..0000000 --- a/migrations/config/2020-08-04-134952_file_hashes/down.sql +++ /dev/null @@ -1,2 +0,0 @@ --- This file should undo anything in `up.sql` -DROP TABLE vaults; \ No newline at end of file diff --git a/migrations/config/2020-08-04-134952_file_hashes/up.sql b/migrations/config/2020-08-04-134952_file_hashes/up.sql deleted file mode 100644 index a9b6c46..0000000 --- a/migrations/config/2020-08-04-134952_file_hashes/up.sql +++ /dev/null @@ -1,5 +0,0 @@ --- Your SQL goes here -CREATE TABLE vaults ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - path VARCHAR NOT NULL -) \ No newline at end of file diff --git a/migrations/fsvault/00_initial/down.sql b/migrations/fsvault/00_initial/down.sql new file mode 100644 index 0000000..2a94593 --- /dev/null +++ b/migrations/fsvault/00_initial/down.sql @@ -0,0 +1 @@ +DROP TABLE files; \ No newline at end of file diff --git a/migrations/fsvault/00_initial/up.sql b/migrations/fsvault/00_initial/up.sql new file mode 100644 index 0000000..773167d --- /dev/null +++ b/migrations/fsvault/00_initial/up.sql @@ -0,0 +1,13 @@ +CREATE TABLE files +( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + hash BLOB NOT NULL, + path VARCHAR NOT NULL, + valid BOOLEAN NOT NULL DEFAULT TRUE, + added DATETIME NOT NULL, + size BIGINT NOT NULL, + mtime DATETIME NULL +); + +CREATE INDEX files_hash ON files (hash); +CREATE INDEX files_valid ON files (valid); \ No newline at end of file diff --git a/migrations/upend/00_initial_structure/down.sql b/migrations/upend/00_initial/down.sql similarity index 64% rename from migrations/upend/00_initial_structure/down.sql rename to migrations/upend/00_initial/down.sql index 0f8e6bf..7ef2077 100644 --- a/migrations/upend/00_initial_structure/down.sql +++ b/migrations/upend/00_initial/down.sql @@ -1,4 +1,3 @@ -- This file should undo 
anything in `up.sql` DROP TABLE meta; -DROP TABLE files; DROP TABLE data; \ No newline at end of file diff --git a/migrations/upend/00_initial_structure/up.sql b/migrations/upend/00_initial/up.sql similarity index 60% rename from migrations/upend/00_initial_structure/up.sql rename to migrations/upend/00_initial/up.sql index db897cf..b8f60e6 100644 --- a/migrations/upend/00_initial_structure/up.sql +++ b/migrations/upend/00_initial/up.sql @@ -9,20 +9,6 @@ CREATE TABLE meta INSERT INTO meta (key, value) VALUES ('VERSION', '0'); -CREATE TABLE files -( - id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, - hash BLOB NOT NULL, - path VARCHAR NOT NULL, - valid BOOLEAN NOT NULL DEFAULT TRUE, - added DATETIME NOT NULL, - size BIGINT NOT NULL, - mtime DATETIME NULL -); - -CREATE INDEX files_hash ON files (hash); -CREATE INDEX files_valid ON files (valid); - CREATE TABLE data ( identity BLOB PRIMARY KEY NOT NULL, diff --git a/src/database/inner/models.rs b/src/database/inner/models.rs index 1da0983..aea3382 100644 --- a/src/database/inner/models.rs +++ b/src/database/inner/models.rs @@ -1,43 +1,5 @@ -use std::path::PathBuf; - -use chrono::NaiveDateTime; use serde::Serialize; - -use super::schema::{data, files, meta}; -use crate::util::hash::Hash; - -#[derive(Queryable, Serialize, Clone, Debug)] -pub struct File { - pub id: i32, - pub hash: Hash, - pub path: String, - pub valid: bool, - pub added: NaiveDateTime, - pub size: i64, - pub mtime: Option, -} - -// todo - remove, try_from the actual model, impl queryable... -#[derive(Serialize, Clone, Debug)] -pub struct OutFile { - pub id: i32, - pub hash: Hash, - pub path: PathBuf, - pub valid: bool, - pub added: NaiveDateTime, - pub size: i64, - pub mtime: Option, -} - -#[derive(Insertable, Debug)] -#[table_name = "files"] -pub struct NewFile { - pub hash: Vec, - pub path: String, - pub added: NaiveDateTime, - pub size: i64, - pub mtime: Option, -} +use super::schema::{data, meta}; #[derive(Queryable, Insertable, Serialize, Debug)] #[table_name = "data"] diff --git a/src/database/inner/schema.rs b/src/database/inner/schema.rs index 7221c31..ba50579 100644 --- a/src/database/inner/schema.rs +++ b/src/database/inner/schema.rs @@ -9,19 +9,6 @@ table! { immutable -> Bool, } } - -table! { - files (id) { - id -> Integer, - hash -> Binary, - path -> Text, - valid -> Bool, - added -> Timestamp, - size -> BigInt, - mtime -> Nullable, - } -} - table! { meta (id) { id -> Integer, @@ -30,4 +17,4 @@ table! 
{ } } -allow_tables_to_appear_in_same_query!(data, files, meta,); +allow_tables_to_appear_in_same_query!(data, meta,); diff --git a/src/database/mod.rs b/src/database/mod.rs index cebdc66..c388028 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -3,6 +3,8 @@ #[macro_use] mod macros; +pub mod stores; + pub mod constants; pub mod engine; pub mod entry; @@ -22,13 +24,12 @@ use crate::database::lang::Query; use crate::util::hash::Hash; use crate::util::LoggerSink; use anyhow::{anyhow, Result}; -use chrono::NaiveDateTime; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::result::{DatabaseErrorKind, Error}; use diesel::sqlite::SqliteConnection; use hierarchies::initialize_hier; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; @@ -78,8 +79,8 @@ pub struct OpenResult { pub struct UpEndDatabase { pool: Arc, lock: Arc>, - pub vault_path: Arc, - pub db_path: Arc, + vault_path: Arc, + db_path: Arc, } pub const UPEND_SUBDIR: &str = ".upend"; @@ -217,99 +218,6 @@ impl UpEndConnection { .map(|mv| mv.value.clone()) } - pub fn insert_file(&self, file: models::NewFile) -> Result { - use crate::database::inner::schema::files; - - debug!( - "Inserting {} ({})...", - &file.path, - Address::Hash(Hash((&file.hash).clone())) - ); - - let _lock = self.lock.write().unwrap(); - let conn = self.pool.get()?; - - diesel::insert_into(files::table) - .values(&file) - .execute(&conn)?; - - Ok(files::dsl::files - .filter(files::dsl::valid.eq(true)) - .filter(files::dsl::hash.eq(file.hash)) - .count() - .first::(&conn)? - .try_into() - .unwrap()) - } - - pub fn retrieve_file(&self, obj_hash: &Hash) -> Result> { - use crate::database::inner::schema::files::dsl::*; - - let _lock = self.lock.read().unwrap(); - let conn = self.pool.get()?; - - let matches = files - .filter(valid.eq(true)) - .filter(hash.eq(&obj_hash.0)) - .load::(&conn)?; - - let matches = matches - .into_iter() - .map(|f| models::OutFile { - id: f.id, - hash: f.hash, - path: self.vault_path.join(PathBuf::from(f.path)), - valid: f.valid, - added: f.added, - size: f.size, - mtime: f.mtime, - }) - .collect(); - - Ok(matches) - } - - pub fn retrieve_all_files(&self) -> Result> { - use crate::database::inner::schema::files::dsl::*; - let _lock = self.lock.read().unwrap(); - let conn = self.pool.get()?; - let matches = files.load::(&conn)?; - Ok(matches) - } - - pub fn file_update_mtime(&self, file_id: i32, m_time: Option) -> Result { - use crate::database::inner::schema::files::dsl::*; - - debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); - - let _lock = self.lock.write().unwrap(); - let conn = self.pool.get()?; - - Ok(diesel::update(files.filter(id.eq(file_id))) - .set(mtime.eq(m_time)) - .execute(&conn)?) - } - - pub fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result { - use crate::database::inner::schema::files::dsl::*; - - debug!("Setting file ID {} to valid = {}", file_id, is_valid); - - let _lock = self.lock.write().unwrap(); - let conn = self.pool.get()?; - - Ok(diesel::update(files.filter(id.eq(file_id))) - .set(valid.eq(is_valid)) - .execute(&conn)?) - } - - pub fn normalize_path(&self, path: &Path) -> Result { - Ok(path - .canonicalize()? - .strip_prefix(self.vault_path.as_path())? 
- .to_path_buf()) - } - pub fn retrieve_entry(&self, hash: &Hash) -> Result> { use crate::database::inner::schema::data::dsl::*; diff --git a/src/database/stores/fs/db.rs b/src/database/stores/fs/db.rs new file mode 100644 index 0000000..1447092 --- /dev/null +++ b/src/database/stores/fs/db.rs @@ -0,0 +1,51 @@ +use std::path::PathBuf; + +use crate::util::hash::Hash; +use chrono::NaiveDateTime; +use diesel::Queryable; +use serde::Serialize; + +table! { + files (id) { + id -> Integer, + hash -> Binary, + path -> Text, + valid -> Bool, + added -> Timestamp, + size -> BigInt, + mtime -> Nullable, + } +} + +#[derive(Queryable, Serialize, Clone, Debug)] +pub struct File { + pub id: i32, + pub hash: Hash, + pub path: String, + pub valid: bool, + pub added: NaiveDateTime, + pub size: i64, + pub mtime: Option, +} + +// todo - remove, try_from the actual model, impl queryable... +#[derive(Serialize, Clone, Debug)] +pub struct OutFile { + pub id: i32, + pub hash: Hash, + pub path: PathBuf, + pub valid: bool, + pub added: NaiveDateTime, + pub size: i64, + pub mtime: Option, +} + +#[derive(Insertable, Debug)] +#[table_name = "files"] +pub struct NewFile { + pub hash: Vec, + pub path: String, + pub added: NaiveDateTime, + pub size: i64, + pub mtime: Option, +} diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs new file mode 100644 index 0000000..1f16305 --- /dev/null +++ b/src/database/stores/fs/mod.rs @@ -0,0 +1,803 @@ +use self::db::files; + +use super::{Blob, StoreError, UpStore, UpdatePathOutcome}; +use crate::addressing::Address; +use crate::database::constants::{ + ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, + TYPE_HAS_ATTR, +}; +use crate::database::entry::{Entry, InvariantEntry}; +use crate::database::hierarchies::{ + resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode, +}; +use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR}; +use crate::util::hash::{b58_encode, Hash, Hashable}; +use crate::util::jobs::{JobContainer, JobHandle}; +use anyhow::{anyhow, Error, Result}; +use chrono::prelude::*; +use diesel::r2d2::{ConnectionManager, ManageConnection}; +use diesel::ExpressionMethods; +use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection}; +use log::{debug, error, info, warn}; +use lru::LruCache; +use rayon::prelude::*; +use std::borrow::Borrow; +use std::convert::{TryFrom, TryInto}; +use std::path::PathBuf; +use std::path::{Component, Path}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use std::{fs, iter}; +use walkdir::WalkDir; + +mod db; + +const BLOB_TYPE: &str = "BLOB"; +const ALIAS_KEY: &str = "ALIAS"; +pub const FILE_MIME_KEY: &str = "FILE_MIME"; +const FILE_MTIME_KEY: &str = "FILE_MTIME"; +const FILE_SIZE_KEY: &str = "FILE_SIZE"; + +lazy_static! { + static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry { + attribute: String::from(TYPE_BASE_ATTR), + value: BLOB_TYPE.into(), + }; + static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap(); +} + +struct PragmaSynchronousGuard<'a>(&'a UpEndConnection); + +impl Drop for PragmaSynchronousGuard<'_> { + fn drop(&mut self) { + debug!("Re-enabling synchronous mode."); + let res = self.0.execute("PRAGMA synchronous = NORMAL;"); + if let Err(err) = res { + error!( + "Error setting synchronous mode back to NORMAL! Data loss possible! 
{}", + err + ); + } + } +} + +pub struct FsStore { + path: PathBuf, + manager: ConnectionManager, +} + +impl FsStore { + pub fn from_path>(path: P) -> Result { + let path = path.as_ref().to_path_buf().canonicalize()?; + let manager = ConnectionManager::::new( + path.join(UPEND_SUBDIR) + .join("upend_vault.sqlite3") + .to_str() + .unwrap(), + ); + let connection = manager.connect()?; + + // while diesel doesn't support multiple embedded migrations... + connection.execute( + r#" + CREATE TABLE IF NOT EXISTS files + ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + hash BLOB NOT NULL, + path VARCHAR NOT NULL, + valid BOOLEAN NOT NULL DEFAULT TRUE, + added DATETIME NOT NULL, + size BIGINT NOT NULL, + mtime DATETIME NULL + ); + + CREATE INDEX IF NOT EXISTS files_hash ON files (hash); + CREATE INDEX IF NOT EXISTS files_valid ON files (valid); + + "#, + )?; + + Ok(FsStore { path, manager }) + } + + fn rescan_vault>( + &self, + db: D, + job_handle: JobHandle, + quick_check: bool, + disable_synchronous: bool, + ) -> Result> { + let start = Instant::now(); + info!("Vault rescan started."); + + let db = db.borrow(); + let connection = db.connection()?; + + // Initialize types, etc... + debug!("Initializing DB types."); + connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; + upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; + + // Disable syncing in SQLite for the duration of the import + let mut _guard: Option = None; + if disable_synchronous { + debug!("Disabling SQLite synchronous mode"); + connection.execute("PRAGMA synchronous = OFF;")?; + _guard = Some(PragmaSynchronousGuard(&connection)); + } + + // Walk through the vault, find all paths + debug!("Traversing vault directory"); + let absolute_dir_path = fs::canonicalize(&*db.vault_path)?; + let path_entries: Vec = WalkDir::new(&*db.vault_path) + .follow_links(true) + .into_iter() + .filter_map(|e| e.ok()) + .filter_map(|e| fs::canonicalize(e.into_path()).ok()) + .filter(|e| e.is_file()) + .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR))) + .collect(); + + // Prepare for processing + let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?)); + + // Actual processing + let count = RwLock::new(0_usize); + let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); + let total = path_entries.len() as f32; + let shared_job_handle = Arc::new(Mutex::new(job_handle)); + let path_outcomes: Vec = path_entries + .into_par_iter() + .map(|path| { + let result = self.process_directory_entry( + db.connection().unwrap(), + &resolve_cache, + path.clone(), + &existing_files, + quick_check, + ); + + let mut cnt = count.write().unwrap(); + *cnt += 1; + + let _ = shared_job_handle + .lock() + .unwrap() + .update_progress(*cnt as f32 / total * 100.0); + + match result { + Ok(result) => result, + Err(error) => { + error!("Failed to update {:?} ({})", path, error); + UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string())) + } + } + }) + .collect(); + + debug!("Processing done, cleaning up..."); + + let existing_files = existing_files.read().unwrap(); + + let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { + let trans_result = 
connection.transaction::<_, Error, _>(|| { + self.file_set_valid(file.id, false)?; + // remove_object(&connection, )? + Ok(()) + }); + + match trans_result { + Ok(_) => { + info!("Removed: {:?}", file.path); + UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) + } + Err(error) => UpdatePathOutcome::Failed( + PathBuf::from(file.path.clone()), + StoreError::Unknown(error.to_string()), + ), + } + }); + + // Re-enable SQLite syncing + drop(_guard); + + // Reporting + let all_outcomes = path_outcomes + .into_iter() + .chain(cleanup_results) + .collect::>(); + + let mut failed: Vec<(&PathBuf, &StoreError)> = vec![]; + let mut created = 0; + let mut unchanged = 0; + let mut deleted = 0; + + for outcome in &all_outcomes { + match outcome { + UpdatePathOutcome::Added(_) => created += 1, + UpdatePathOutcome::Unchanged(_) => unchanged += 1, + UpdatePathOutcome::Removed(_) => deleted += 1, + UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), + } + } + + if !failed.is_empty() { + warn!( + "{} path updates failed! ({})", + failed.len(), + failed + .iter() + .map(|(path, error)| format!("{:?}: {}", path, error)) + .collect::>() + .join(", ") + ) + } + + info!( + "Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.", + db.vault_path, + created, + deleted, + unchanged, + start.elapsed().as_secs() + ); + + Ok(all_outcomes) + } + + fn process_directory_entry( + &self, + connection: UpEndConnection, + resolve_cache: &Arc>, + path: PathBuf, + existing_files: &Arc>>, + quick_check: bool, + ) -> Result { + debug!("Processing: {:?}", path); + + // Prepare the data + let existing_files = Arc::clone(existing_files); + + let normalized_path = self.normalize_path(&path)?; + let normalized_path_str = normalized_path + .to_str() + .ok_or(anyhow!("Path not valid unicode!"))?; + + let mut file_hash: Option = None; + + // Get size & mtime for quick comparison + let metadata = fs::metadata(&path)?; + let size = metadata.len() as i64; + if size < 0 { + panic!("File {} too large?!", path.display()); + } + let mtime = metadata + .modified() + .map(|t| { + NaiveDateTime::from_timestamp( + t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, + 0, + ) + }) + .ok(); + + // Check if the path entry for this file already exists in database + let existing_files_read = existing_files.read().unwrap(); + let maybe_existing_file = existing_files_read + .iter() + .find(|file| file.path == normalized_path_str); + + if let Some(existing_file) = maybe_existing_file { + let existing_file = existing_file.clone(); + drop(existing_files_read); + + if !quick_check || size == existing_file.size { + let same_mtime = mtime.is_some() && mtime == existing_file.mtime; + let mut same_hash = false; + + if !quick_check || !same_mtime { + file_hash = Some(path.hash()?); + same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; + } + + if same_mtime || same_hash { + if mtime != existing_file.mtime { + self.file_update_mtime(existing_file.id, mtime)?; + } + + if !existing_file.valid { + self.file_set_valid(existing_file.id, true)?; + } + + let mut existing_files_write = existing_files.write().unwrap(); + let maybe_existing_file = existing_files_write + .iter() + .enumerate() + .find(|(_, file)| file.path == normalized_path_str) + .map(|(idx, _)| idx); + + if let Some(idx) = maybe_existing_file { + existing_files_write.swap_remove(idx); + debug!("Unchanged: {:?}", path); + return Ok(UpdatePathOutcome::Unchanged(path)); + } + } + } + } else { + drop(existing_files_read); + } + + // If not, add it! 
+ if file_hash.is_none() { + file_hash = Some(path.hash()?); + } + let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string()); + + self.insert_file_with_metadata( + &connection, + &normalized_path, + file_hash.unwrap(), + size, + mtime, + mime_type, + Some(resolve_cache), + ) + .map(|_| { + info!("Added: {:?}", path); + UpdatePathOutcome::Added(path.clone()) + }) + } + + fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result
{ + let normalized_path = self.normalize_path(path)?; + let metadata = fs::metadata(&path)?; + let size = metadata.len() as i64; + let mtime = metadata + .modified() + .map(|t| { + NaiveDateTime::from_timestamp( + t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, + 0, + ) + }) + .ok(); + let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string()); + + self.insert_file_with_metadata( + connection, + &normalized_path, + hash, + size, + mtime, + mime_type, + None, + ) + } + + fn insert_file_with_metadata( + &self, + connection: &UpEndConnection, + normalized_path: &Path, + hash: Hash, + size: i64, + mtime: Option, + mime_type: Option, + resolve_cache: Option<&Arc>>, + ) -> Result
{ + let new_file = db::NewFile { + path: normalized_path + .to_str() + .ok_or(anyhow!("Path not UTF-8?!"))? + .to_string(), + hash: (hash.clone()).0, + added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), + size, + mtime, + }; + + let blob_address = Address::Hash(hash); + + // Metadata + let type_entry = Entry { + entity: blob_address.clone(), + attribute: String::from(IS_OF_TYPE_ATTR), + value: BLOB_TYPE_ADDR.clone().into(), + }; + + let size_entry = Entry { + entity: blob_address.clone(), + attribute: FILE_SIZE_KEY.to_string(), + value: (size as f64).into(), + }; + + let mime_entry = mime_type.map(|mime_type| Entry { + entity: blob_address.clone(), + attribute: FILE_MIME_KEY.to_string(), + value: mime_type.into(), + }); + + let added_entry = Entry { + entity: blob_address.clone(), + attribute: ADDED_ATTR.to_string(), + value: (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as f64) + .into(), + }; + + // Add the appropriate entries w/r/t virtual filesystem location + let components = normalized_path.components().collect::>(); + let (filename, dir_path) = components.split_last().unwrap(); + + let upath = UHierPath( + iter::once(UNode::new("NATIVE").unwrap()) + .chain(dir_path.iter().map(|component| { + UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap() + })) + .collect(), + ); + let resolved_path = match resolve_cache { + Some(cache) => resolve_path_cached(connection, &upath, true, cache)?, + None => resolve_path(connection, &upath, true)?, + }; + let parent_dir = resolved_path.last().unwrap(); + + // Insert all + let file_count = self.insert_file(new_file)?; + + connection.insert_entry_immutable(type_entry)?; + connection.insert_entry_immutable(size_entry)?; + if file_count == 1 { + connection.insert_entry_immutable(added_entry)?; + } + if let Some(mime_entry) = mime_entry { + connection.insert_entry(mime_entry)?; + } + + let dir_has_entry = Entry { + entity: parent_dir.clone(), + attribute: HIER_HAS_ATTR.to_string(), + value: blob_address.clone().into(), + }; + let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?; + + let label_entry = Entry { + entity: blob_address.clone(), + attribute: LABEL_ATTR.to_string(), + value: filename.as_os_str().to_string_lossy().to_string().into(), + }; + let label_entry_addr = connection.insert_entry(label_entry)?; + + let alias_entry = Entry { + entity: dir_has_entry_addr, + attribute: ALIAS_KEY.to_string(), + value: label_entry_addr.into(), + }; + connection.insert_entry(alias_entry)?; + + Ok(blob_address) + } + + pub fn insert_file(&self, file: db::NewFile) -> Result { + debug!( + "Inserting {} ({})...", + &file.path, + Address::Hash(Hash((&file.hash).clone())) + ); + + // let _lock = self.lock.write().unwrap(); + let conn = self.manager.connect()?; + + diesel::insert_into(files::table) + .values(&file) + .execute(&conn)?; + + Ok(files::dsl::files + .filter(files::dsl::valid.eq(true)) + .filter(files::dsl::hash.eq(file.hash)) + .count() + .first::(&conn)? 
+ .try_into() + .unwrap()) + } + + fn retrieve_file(&self, obj_hash: &Hash) -> Result> { + use self::db::files::dsl::*; + + // let _lock = self.lock.read().unwrap(); + let conn = self.manager.connect()?; + + let matches = files + .filter(valid.eq(true)) + .filter(hash.eq(&obj_hash.0)) + .load::(&conn)?; + + let matches = matches + .into_iter() + .map(|f| db::OutFile { + id: f.id, + hash: f.hash, + path: self.path.join(PathBuf::from(f.path)), + valid: f.valid, + added: f.added, + size: f.size, + mtime: f.mtime, + }) + .collect(); + + Ok(matches) + } + + fn retrieve_all_files(&self) -> Result> { + use self::db::files::dsl::*; + + // let _lock = self.lock.read().unwrap(); + let conn = self.manager.connect()?; + + let matches = files.load::(&conn)?; + Ok(matches) + } + + fn file_update_mtime(&self, file_id: i32, m_time: Option) -> Result { + use self::db::files::dsl::*; + debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); + + // let _lock = self.lock.write().unwrap(); + let conn = self.manager.connect()?; + + Ok(diesel::update(files.filter(id.eq(file_id))) + .set(mtime.eq(m_time)) + .execute(&conn)?) + } + + fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result { + use self::db::files::dsl::*; + debug!("Setting file ID {} to valid = {}", file_id, is_valid); + + // let _lock = self.lock.write().unwrap(); + let conn = self.manager.connect()?; + + Ok(diesel::update(files.filter(id.eq(file_id))) + .set(valid.eq(is_valid)) + .execute(&conn)?) + } + + fn normalize_path(&self, path: &Path) -> Result { + Ok(path + .canonicalize()? + .strip_prefix(self.path.as_path())? + .to_path_buf()) + } +} + +impl From for Blob { + fn from(of: db::OutFile) -> Self { + Blob { file_path: of.path } + } +} + +impl From for Blob { + fn from(f: db::File) -> Self { + Blob { + file_path: PathBuf::from(f.path), + } + } +} + +impl UpStore for FsStore { + fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result, super::StoreError> { + Ok(self + .retrieve_file(hash) + .map_err(|e| StoreError::Unknown(e.to_string()))? + .into_iter() + .map(Blob::from) + .collect()) + } + + fn retrieve_all(&self) -> Result, super::StoreError> { + Ok(self + .retrieve_all_files() + .map_err(|e| StoreError::Unknown(e.to_string()))? + .into_iter() + .map(Blob::from) + .collect()) + } + + fn store>>( + &self, + connection: UpEndConnection, + blob: Blob, + name_hint: S, + ) -> Result { + let file_path = blob.get_file_path(); + let hash = file_path + .hash() + .map_err(|e| StoreError::Unknown(e.to_string()))?; + + let existing_files = self.retrieve(&hash)?; + + if existing_files.is_empty() { + let address = Address::Hash(hash.clone()); + let addr_str = b58_encode( + address + .encode() + .map_err(|e| StoreError::Unknown(e.to_string()))?, + ); + + let final_name = if let Some(name_hint) = name_hint.into() { + format!("{addr_str}_{name_hint}") + } else { + addr_str + }; + + let final_path = self.path.join(&final_name); + + fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?; + + + self.add_file(&connection, &final_path, hash.clone()) + .map_err(|e| StoreError::Unknown(e.to_string()))?; + } + + Ok(hash) + } + + fn update>( + &self, + db: D, + mut job_container: JobContainer, + ) -> Result, StoreError> { + let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); + + match job_result { + Ok(job_handle) => { + let result = self.rescan_vault(db, job_handle, true, false); + + if let Err(err) = &result { + error!("Update did not succeed! 
{:?}", err); + } + + result.map_err(|err| StoreError::Unknown(err.to_string())) + } + Err(err) => Err(StoreError::Unknown(err.to_string())), + } + } +} + +#[cfg(test)] +mod test { + use crate::database::UpEndDatabase; + use crate::util::jobs::JobContainer; + + use super::*; + use std::fs::File; + use std::io::Write; + use tempfile::TempDir; + + use std::sync::Once; + use tracing_subscriber::filter::{EnvFilter, LevelFilter}; + + static INIT: Once = Once::new(); + + pub fn initialize() { + INIT.call_once(|| { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::DEBUG.into()) + .from_env_lossy(), + ) + .init(); + }) + } + + #[test] + fn test_update() { + // Prepare temporary filesystem structure + let temp_dir = TempDir::new().unwrap(); + + let file_path = temp_dir.path().join("my-temporary-note.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); + + let file_path = temp_dir.path().join("hello-world.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Hello, World!").unwrap(); + + let file_path = temp_dir.path().join("empty"); + File::create(file_path).unwrap(); + + // Initialize database + let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap(); + let store = FsStore::from_path(&temp_dir).unwrap(); + let job_container = JobContainer::new(); + + // Store scan + let rescan_result = store.update(&open_result.db, job_container.clone()); + assert!(rescan_result.is_ok()); + } + + #[test] + fn test_rescan_quick() { + initialize(); + _test_rescan(true) + } + + #[test] + fn test_rescan_full() { + initialize(); + _test_rescan(false) + } + + fn _test_rescan(quick: bool) { + // Prepare temporary filesystem structure + let temp_dir = TempDir::new().unwrap(); + + let file_path = temp_dir.path().join("my-temporary-note.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Brian was here. 
Briefly.").unwrap(); + + let file_path = temp_dir.path().join("hello-world.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Hello, World!").unwrap(); + + let file_path = temp_dir.path().join("empty"); + File::create(file_path).unwrap(); + + // Initialize database + let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap(); + let store = FsStore::from_path(&temp_dir).unwrap(); + let mut job_container = JobContainer::new(); + + // Initial scan + let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); + let rescan_result = store.rescan_vault(&open_result.db, job, quick, true); + + assert!(rescan_result.is_ok()); + let rescan_result = rescan_result.unwrap(); + assert_eq!(rescan_result.len(), 3); + rescan_result + .into_iter() + .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_)))); + + // Modification-less rescan + let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); + let rescan_result = store.rescan_vault(&open_result.db, job, quick, false); + + assert!(rescan_result.is_ok()); + let rescan_result = rescan_result.unwrap(); + assert_eq!(rescan_result.len(), 3); + rescan_result + .into_iter() + .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_)))); + + // Remove a file + std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap(); + + let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); + let rescan_result = store.rescan_vault(&open_result.db, job, quick, false); + + assert!(rescan_result.is_ok()); + let rescan_result = rescan_result.unwrap(); + assert_eq!(rescan_result.len(), 3); + assert_eq!( + 2, + rescan_result + .iter() + .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_))) + .count() + ); + assert_eq!( + 1, + rescan_result + .iter() + .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_))) + .count() + ); + } +} diff --git a/src/database/stores/mod.rs b/src/database/stores/mod.rs new file mode 100644 index 0000000..66ca8d4 --- /dev/null +++ b/src/database/stores/mod.rs @@ -0,0 +1,64 @@ +use std::{ + borrow::Borrow, + path::{Path, PathBuf}, +}; + +use super::{UpEndDatabase, UpEndConnection}; +use crate::util::{hash::Hash, jobs::JobContainer}; + +pub mod fs; + +#[derive(Debug, Clone)] +pub enum StoreError { + NotFound, + Unknown(String), +} + +impl std::fmt::Display for StoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "STORE ERROR") + } +} + +impl std::error::Error for StoreError {} + +type Result = std::result::Result; + +pub struct Blob { + file_path: PathBuf, +} + +impl Blob { + pub fn from_filepath>(path: P) -> Blob { + Blob { + file_path: PathBuf::from(path.as_ref()), + } + } + + pub fn get_file_path(&self) -> &Path { + self.file_path.as_path() + } +} + +#[derive(Debug)] +pub enum UpdatePathOutcome { + Added(PathBuf), + Unchanged(PathBuf), + Removed(PathBuf), + Failed(PathBuf, StoreError), +} +pub trait UpStore { + fn retrieve(&self, hash: &Hash) -> Result>; + fn retrieve_all(&self) -> Result>; + fn store>>( + &self, + connection: UpEndConnection, + blob: Blob, + name_hint: S, + ) -> Result; + fn update>( + &self, + database: D, + job_container: JobContainer, + ) -> Result>; +} diff --git a/src/extractors/audio.rs b/src/extractors/audio.rs index 67e0e44..62e0b95 100644 --- a/src/extractors/audio.rs +++ b/src/extractors/audio.rs @@ -4,9 +4,12 @@ use crate::{ database::{ constants, entry::{Entry, EntryValue}, + stores::{ + fs::{FsStore, FILE_MIME_KEY}, + UpStore, + }, UpEndConnection, }, - 
filesystem::FILE_MIME_KEY, util::jobs::{JobContainer, JobState}, }; use anyhow::{anyhow, Result}; @@ -18,6 +21,7 @@ impl Extractor for ID3Extractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, mut job_container: JobContainer, ) -> Result> { if let Address::Hash(hash) = address { @@ -34,14 +38,15 @@ impl Extractor for ID3Extractor { return Ok(vec![]); } - let files = connection.retrieve_file(hash)?; + let files = store.retrieve(hash)?; if let Some(file) = files.get(0) { + let file_path = file.get_file_path(); let mut job_handle = job_container.add_job( None, &format!( r#"Getting ID3 info from "{:}""#, - file.path + file_path .components() .last() .unwrap() @@ -50,7 +55,7 @@ impl Extractor for ID3Extractor { ), )?; - let tags = id3::Tag::read_from_path(&file.path)?; + let tags = id3::Tag::read_from_path(&file_path)?; let result: Vec = tags .frames() diff --git a/src/extractors/mod.rs b/src/extractors/mod.rs index 2644e16..72d93bf 100644 --- a/src/extractors/mod.rs +++ b/src/extractors/mod.rs @@ -1,5 +1,6 @@ use crate::{ addressing::Address, + database::stores::fs::FsStore, database::{entry::Entry, UpEndConnection, UpEndDatabase}, util::jobs::JobContainer, }; @@ -25,6 +26,7 @@ pub trait Extractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, job_container: JobContainer, ) -> Result>; @@ -36,10 +38,11 @@ pub trait Extractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, job_container: JobContainer, ) -> Result { if self.is_needed(address, connection)? { - let entries = self.get(address, connection, job_container)?; + let entries = self.get(address, connection, store, job_container)?; connection.transaction(|| { let len = entries.len(); @@ -54,13 +57,15 @@ pub trait Extractor { } } -pub fn extract_all>( +pub fn extract_all, S: Borrow>( db: D, + store: S, mut job_container: JobContainer, ) -> Result { info!("Extracting metadata for all addresses."); let db = db.borrow(); + let store = store.borrow(); let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?; let all_addresses = db.connection()?.get_all_addresses()?; @@ -72,7 +77,7 @@ pub fn extract_all>( .par_iter() .map(|address| { let connection = db.connection()?; - let extract_result = extract(address, &connection, job_container.clone()); + let extract_result = extract(address, &connection, store, job_container.clone()); let mut cnt = count.write().unwrap(); *cnt += 1; @@ -99,6 +104,7 @@ pub fn extract_all>( pub fn extract( address: &Address, connection: &UpEndConnection, + store: &FsStore, job_container: JobContainer, ) -> Result { let mut entry_count = 0; @@ -106,18 +112,20 @@ pub fn extract( #[cfg(feature = "extractors-web")] { - entry_count += web::WebExtractor.insert_info(address, connection, job_container.clone())?; + entry_count += + web::WebExtractor.insert_info(address, connection, store, job_container.clone())?; } #[cfg(feature = "extractors-audio")] { entry_count += - audio::ID3Extractor.insert_info(address, connection, job_container.clone())?; + audio::ID3Extractor.insert_info(address, connection, store, job_container.clone())?; } #[cfg(feature = "extractors-photo")] { - entry_count += photo::ExifExtractor.insert_info(address, connection, job_container)?; + entry_count += + photo::ExifExtractor.insert_info(address, connection, store, job_container)?; } trace!("Extracting metadata for {address:?} - got {entry_count} entries."); diff --git a/src/extractors/photo.rs b/src/extractors/photo.rs index 
c650308..d89b1a9 100644 --- a/src/extractors/photo.rs +++ b/src/extractors/photo.rs @@ -4,9 +4,9 @@ use crate::{ database::{ constants, entry::{Entry, EntryValue}, + stores::{fs::{FILE_MIME_KEY, FsStore}, UpStore}, UpEndConnection, }, - filesystem::FILE_MIME_KEY, util::jobs::{JobContainer, JobState}, }; use anyhow::{anyhow, Result}; @@ -21,6 +21,7 @@ impl Extractor for ExifExtractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, mut job_container: JobContainer, ) -> Result> { if let Address::Hash(hash) = address { @@ -37,14 +38,15 @@ impl Extractor for ExifExtractor { return Ok(vec![]); } - let files = connection.retrieve_file(hash)?; + let files = store.retrieve(hash)?; if let Some(file) = files.get(0) { + let file_path = file.get_file_path(); let mut job_handle = job_container.add_job( None, &format!( r#"Getting EXIF info from "{:}""#, - file.path + file_path .components() .last() .unwrap() @@ -53,7 +55,7 @@ impl Extractor for ExifExtractor { ), )?; - let file = std::fs::File::open(&file.path)?; + let file = std::fs::File::open(&file_path)?; let mut bufreader = std::io::BufReader::new(&file); let exifreader = exif::Reader::new(); let exif = exifreader.read_from_container(&mut bufreader)?; diff --git a/src/extractors/web.rs b/src/extractors/web.rs index 5ea2423..20208a1 100644 --- a/src/extractors/web.rs +++ b/src/extractors/web.rs @@ -1,7 +1,7 @@ use super::Extractor; use crate::{ addressing::Address, - database::{entry::Entry, UpEndConnection}, + database::{entry::Entry, stores::fs::FsStore, UpEndConnection}, util::jobs::{JobContainer, JobState}, }; use anyhow::anyhow; @@ -16,6 +16,7 @@ impl Extractor for WebExtractor { &self, address: &Address, _: &UpEndConnection, + _: &FsStore, mut job_container: JobContainer, ) -> anyhow::Result> { if let Address::Url(url) = address { @@ -92,12 +93,13 @@ mod test { let temp_dir = TempDir::new().unwrap(); let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?; let connection = open_result.db.connection()?; + let store = FsStore::from_path(&temp_dir)?; let job_container = JobContainer::new(); let address = Address::Url("https://upend.dev".into()); assert!(WebExtractor.is_needed(&address, &connection)?); - WebExtractor.insert_info(&address, &connection, job_container)?; + WebExtractor.insert_info(&address, &connection, &store, job_container)?; assert!(!WebExtractor.is_needed(&address, &connection)?); diff --git a/src/filesystem.rs b/src/filesystem.rs deleted file mode 100644 index 49aac00..0000000 --- a/src/filesystem.rs +++ /dev/null @@ -1,573 +0,0 @@ -use std::borrow::Borrow; -use std::convert::TryFrom; -use std::path::{Component, Path, PathBuf}; -use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Instant, SystemTime, UNIX_EPOCH}; -use std::{fs, iter}; - -use crate::addressing::Address; -use crate::database::constants::{ - ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, - TYPE_HAS_ATTR, -}; -use crate::database::entry::{Entry, InvariantEntry}; -use crate::database::hierarchies::{ - resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode, -}; -use crate::database::inner::models; -use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR}; -use crate::util::hash::{Hash, Hashable}; -use crate::util::jobs::{JobContainer, JobHandle}; -use anyhow::{anyhow, Error, Result}; -use chrono::prelude::*; -use log::{debug, error, info, warn}; -use lru::LruCache; -use rayon::prelude::*; -use walkdir::WalkDir; - -const BLOB_TYPE: &str = "BLOB"; -const 
ALIAS_KEY: &str = "ALIAS"; -pub const FILE_MIME_KEY: &str = "FILE_MIME"; -const FILE_MTIME_KEY: &str = "FILE_MTIME"; -const FILE_SIZE_KEY: &str = "FILE_SIZE"; - -lazy_static! { - static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry { - attribute: String::from(TYPE_BASE_ATTR), - value: BLOB_TYPE.into(), - }; - static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap(); -} - -fn initialize_types(connection: &UpEndConnection) -> Result<()> { - // BLOB_TYPE - connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; - upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; - - Ok(()) -} - -pub fn rescan_vault>( - db: D, - mut job_container: JobContainer, - quick_check: bool, - disable_synchronous: bool, -) -> Result> { - let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); - - match job_result { - Ok(job_handle) => { - let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous); - - if let Err(err) = &result { - error!("Update did not succeed! {:?}", err); - } - - result - } - Err(err) => Err(err), - } -} -struct PragmaSynchronousGuard<'a>(&'a UpEndConnection); - -impl Drop for PragmaSynchronousGuard<'_> { - fn drop(&mut self) { - debug!("Re-enabling synchronous mode."); - let res = self.0.execute("PRAGMA synchronous = NORMAL;"); - if let Err(err) = res { - error!( - "Error setting synchronous mode back to NORMAL! Data loss possible! {}", - err - ); - } - } -} - -type UpdatePathResult = Result; - -#[derive(Debug)] -pub enum UpdatePathOutcome { - Added(PathBuf), - Unchanged(PathBuf), - Removed(PathBuf), - Failed(PathBuf, Error), -} - -fn rescan_vault_inner>( - db: D, - job_handle: JobHandle, - quick_check: bool, - disable_synchronous: bool, -) -> Result> { - let start = Instant::now(); - info!("Vault rescan started."); - - let db = db.borrow(); - let connection = db.connection()?; - - // Initialize types, etc... 
- debug!("Initializing DB types."); - initialize_types(&connection)?; - - // Disable syncing in SQLite for the duration of the import - let mut _guard: Option = None; - if disable_synchronous { - debug!("Disabling SQLite synchronous mode"); - connection.execute("PRAGMA synchronous = OFF;")?; - _guard = Some(PragmaSynchronousGuard(&connection)); - } - - // Walk through the vault, find all paths - debug!("Traversing vault directory"); - let absolute_dir_path = fs::canonicalize(&*db.vault_path)?; - let path_entries: Vec = WalkDir::new(&*db.vault_path) - .follow_links(true) - .into_iter() - .filter_map(|e| e.ok()) - .filter_map(|e| fs::canonicalize(e.into_path()).ok()) - .filter(|e| e.is_file()) - .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR))) - .collect(); - - // Prepare for processing - let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?)); - - // Actual processing - let count = RwLock::new(0_usize); - let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); - let total = path_entries.len() as f32; - let shared_job_handle = Arc::new(Mutex::new(job_handle)); - let path_outcomes: Vec = path_entries - .into_par_iter() - .map(|path| { - let result = process_directory_entry( - db.connection().unwrap(), - &resolve_cache, - path.clone(), - &existing_files, - quick_check, - ); - - let mut cnt = count.write().unwrap(); - *cnt += 1; - - let _ = shared_job_handle - .lock() - .unwrap() - .update_progress(*cnt as f32 / total * 100.0); - - match result { - Ok(result) => result, - Err(error) => { - error!("Failed to update {:?} ({})", path, error); - UpdatePathOutcome::Failed(path, error) - } - } - }) - .collect(); - - debug!("Processing done, cleaning up..."); - - let existing_files = existing_files.read().unwrap(); - - let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { - let trans_result = connection.transaction::<_, Error, _>(|| { - connection.file_set_valid(file.id, false)?; - // remove_object(&connection, )? - Ok(()) - }); - - match trans_result { - Ok(_) => { - info!("Removed: {:?}", file.path); - UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) - } - Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error), - } - }); - - // Re-enable SQLite syncing - drop(_guard); - - // Reporting - let all_outcomes = path_outcomes - .into_iter() - .chain(cleanup_results) - .collect::>(); - - let mut failed: Vec<(&PathBuf, &Error)> = vec![]; - let mut created = 0; - let mut unchanged = 0; - let mut deleted = 0; - - for outcome in &all_outcomes { - match outcome { - UpdatePathOutcome::Added(_) => created += 1, - UpdatePathOutcome::Unchanged(_) => unchanged += 1, - UpdatePathOutcome::Removed(_) => deleted += 1, - UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), - } - } - - if !failed.is_empty() { - warn!( - "{} path updates failed! ({})", - failed.len(), - failed - .iter() - .map(|(path, error)| format!("{:?}: {}", path, error)) - .collect::>() - .join(", ") - ) - } - - info!( - "Finished updating {:?} ({} created, {} deleted, {} left unchanged). 
Took {}s.", - db.vault_path, - created, - deleted, - unchanged, - start.elapsed().as_secs() - ); - - Ok(all_outcomes) -} - -fn process_directory_entry( - connection: UpEndConnection, - resolve_cache: &Arc>, - path: PathBuf, - existing_files: &Arc>>, - quick_check: bool, -) -> UpdatePathResult { - debug!("Processing: {:?}", path); - - // Prepare the data - let existing_files = Arc::clone(existing_files); - - let normalized_path = connection.normalize_path(&path)?; - let normalized_path_str = normalized_path - .to_str() - .ok_or(anyhow!("Path not valid unicode!"))?; - - let mut file_hash: Option = None; - - // Get size & mtime for quick comparison - let metadata = fs::metadata(&path)?; - let size = metadata.len() as i64; - if size < 0 { - panic!("File {} too large?!", path.display()); - } - let mtime = metadata - .modified() - .map(|t| { - NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) - }) - .ok(); - - // Check if the path entry for this file already exists in database - let existing_files_read = existing_files.read().unwrap(); - let maybe_existing_file = existing_files_read - .iter() - .find(|file| file.path == normalized_path_str); - - if let Some(existing_file) = maybe_existing_file { - let existing_file = existing_file.clone(); - drop(existing_files_read); - - if !quick_check || size == existing_file.size { - let same_mtime = mtime.is_some() && mtime == existing_file.mtime; - let mut same_hash = false; - - if !quick_check || !same_mtime { - file_hash = Some(path.hash()?); - same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; - } - - if same_mtime || same_hash { - if mtime != existing_file.mtime { - connection.file_update_mtime(existing_file.id, mtime)?; - } - - if !existing_file.valid { - connection.file_set_valid(existing_file.id, true)?; - } - - let mut existing_files_write = existing_files.write().unwrap(); - let maybe_existing_file = existing_files_write - .iter() - .enumerate() - .find(|(_, file)| file.path == normalized_path_str) - .map(|(idx, _)| idx); - - if let Some(idx) = maybe_existing_file { - existing_files_write.swap_remove(idx); - debug!("Unchanged: {:?}", path); - return Ok(UpdatePathOutcome::Unchanged(path)); - } - } - } - } else { - drop(existing_files_read); - } - - // If not, add it! - if file_hash.is_none() { - file_hash = Some(path.hash()?); - } - let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string()); - - insert_file_with_metadata( - &connection, - &normalized_path, - file_hash.unwrap(), - size, - mtime, - mime_type, - Some(resolve_cache), - ) - .map(|_| { - info!("Added: {:?}", path); - UpdatePathOutcome::Added(path.clone()) - }) -} - -pub fn add_file(connection: &UpEndConnection, path: &Path, hash: Hash) -> Result
{ - let normalized_path = connection.normalize_path(path)?; - let metadata = fs::metadata(&path)?; - let size = metadata.len() as i64; - let mtime = metadata - .modified() - .map(|t| { - NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) - }) - .ok(); - let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string()); - - insert_file_with_metadata( - connection, - &normalized_path, - hash, - size, - mtime, - mime_type, - None, - ) -} - -fn insert_file_with_metadata( - connection: &UpEndConnection, - normalized_path: &Path, - hash: Hash, - size: i64, - mtime: Option, - mime_type: Option, - resolve_cache: Option<&Arc>>, -) -> Result
{ - let new_file = models::NewFile { - path: normalized_path - .to_str() - .ok_or(anyhow!("Path not UTF-8?!"))? - .to_string(), - hash: (hash.clone()).0, - added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), - size, - mtime, - }; - - let blob_address = Address::Hash(hash); - - // Metadata - let type_entry = Entry { - entity: blob_address.clone(), - attribute: String::from(IS_OF_TYPE_ATTR), - value: BLOB_TYPE_ADDR.clone().into(), - }; - - let size_entry = Entry { - entity: blob_address.clone(), - attribute: FILE_SIZE_KEY.to_string(), - value: (size as f64).into(), - }; - - let mime_entry = mime_type.map(|mime_type| Entry { - entity: blob_address.clone(), - attribute: FILE_MIME_KEY.to_string(), - value: mime_type.into(), - }); - - let added_entry = Entry { - entity: blob_address.clone(), - attribute: ADDED_ATTR.to_string(), - value: (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() as f64) - .into(), - }; - - // Add the appropriate entries w/r/t virtual filesystem location - let components = normalized_path.components().collect::>(); - let (filename, dir_path) = components.split_last().unwrap(); - - let upath = UHierPath( - iter::once(UNode::new("NATIVE").unwrap()) - .chain(dir_path.iter().map(|component| { - UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap() - })) - .collect(), - ); - let resolved_path = match resolve_cache { - Some(cache) => resolve_path_cached(connection, &upath, true, cache)?, - None => resolve_path(connection, &upath, true)?, - }; - let parent_dir = resolved_path.last().unwrap(); - - // Insert all - connection.transaction::<_, Error, _>(|| { - let file_count = connection.insert_file(new_file)?; - - connection.insert_entry_immutable(type_entry)?; - connection.insert_entry_immutable(size_entry)?; - if file_count == 1 { - connection.insert_entry_immutable(added_entry)?; - } - if let Some(mime_entry) = mime_entry { - connection.insert_entry(mime_entry)?; - } - - let dir_has_entry = Entry { - entity: parent_dir.clone(), - attribute: HIER_HAS_ATTR.to_string(), - value: blob_address.clone().into(), - }; - let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?; - - let label_entry = Entry { - entity: blob_address.clone(), - attribute: LABEL_ATTR.to_string(), - value: filename.as_os_str().to_string_lossy().to_string().into(), - }; - let label_entry_addr = connection.insert_entry(label_entry)?; - - let alias_entry = Entry { - entity: dir_has_entry_addr, - attribute: ALIAS_KEY.to_string(), - value: label_entry_addr.into(), - }; - connection.insert_entry(alias_entry)?; - - Ok(blob_address) - }) -} - -#[cfg(test)] -mod test { - use crate::database::UpEndDatabase; - use crate::util::jobs::JobContainer; - - use super::*; - use std::fs::File; - use std::io::Write; - use tempfile::TempDir; - - use std::sync::Once; - use tracing_subscriber::filter::{EnvFilter, LevelFilter}; - - static INIT: Once = Once::new(); - - pub fn initialize() { - INIT.call_once(|| { - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(), - ) - .init(); - }) - } - - #[test] - fn test_rescan_quick() { - initialize(); - _test_rescan(true) - } - - #[test] - fn test_rescan_full() { - initialize(); - _test_rescan(false) - } - - fn _test_rescan(quick: bool) { - // Prepare temporary filesystem structure - let temp_dir = TempDir::new().unwrap(); - - let file_path = temp_dir.path().join("my-temporary-note.txt"); - let mut tmp_file = File::create(file_path).unwrap(); - 
writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); - - let file_path = temp_dir.path().join("hello-world.txt"); - let mut tmp_file = File::create(file_path).unwrap(); - writeln!(tmp_file, "Hello, World!").unwrap(); - - let file_path = temp_dir.path().join("empty"); - File::create(file_path).unwrap(); - - // Initialize database - - let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap(); - let mut job_container = JobContainer::new(); - let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); - - // Initial scan - let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true); - - assert!(rescan_result.is_ok()); - let rescan_result = rescan_result.unwrap(); - assert_eq!(rescan_result.len(), 3); - rescan_result - .into_iter() - .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_)))); - - // Modification-less rescan - - let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false); - - assert!(rescan_result.is_ok()); - let rescan_result = rescan_result.unwrap(); - assert_eq!(rescan_result.len(), 3); - rescan_result - .into_iter() - .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_)))); - - // Remove a file - - std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap(); - - let rescan_result = rescan_vault(&open_result.db, job_container, quick, false); - - assert!(rescan_result.is_ok()); - let rescan_result = rescan_result.unwrap(); - assert_eq!(rescan_result.len(), 3); - assert_eq!( - 2, - rescan_result - .iter() - .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_))) - .count() - ); - assert_eq!( - 1, - rescan_result - .iter() - .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_))) - .count() - ); - } -} diff --git a/src/main.rs b/src/main.rs index 987c704..14cebeb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,10 @@ use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use crate::{ common::{get_static_dir, PKG_VERSION}, - database::UpEndDatabase, + database::{ + stores::{fs::FsStore, UpStore}, + UpEndDatabase, + }, util::{exec::block_background, jobs::JobContainer}, }; @@ -27,7 +30,6 @@ mod addressing; mod common; mod database; mod extractors; -mod filesystem; mod previews; mod routes; mod util; @@ -126,6 +128,7 @@ fn main() -> Result<()> { .expect("failed to open database!"); let upend = Arc::new(open_result.db); + let store = Arc::new(FsStore::from_path(vault_path.clone()).unwrap()); let ui_path = get_static_dir("webui"); if ui_path.is_err() { @@ -138,22 +141,22 @@ fn main() -> Result<()> { let ui_enabled = ui_path.is_ok() && !matches.is_present("NO_UI"); let browser_enabled = desktop_enabled && !matches.is_present("NO_BROWSER"); - let preview_path = upend.db_path.join("previews"); + let preview_dir = tempfile::tempdir().unwrap(); #[cfg(feature = "previews")] let preview_store = Some(Arc::new(crate::previews::PreviewStore::new( - preview_path.clone(), - upend.clone(), + preview_dir.path(), + store.clone(), ))); - if matches.is_present("CLEAN") { - info!("Cleaning temporary directories..."); - if preview_path.exists() { - std::fs::remove_dir_all(&preview_path).unwrap(); - debug!("Removed {preview_path:?}"); - } else { - debug!("No preview path exists, continuing..."); - } - } + // if matches.is_present("CLEAN") { + // info!("Cleaning temporary directories..."); + // if preview_path.exists() { + // std::fs::remove_dir_all(&preview_path).unwrap(); + // debug!("Removed {preview_path:?}"); + // } else { + // debug!("No preview path 
exists, continuing..."); + // } + // } #[cfg(not(feature = "previews"))] let preview_store = None; @@ -194,6 +197,7 @@ fn main() -> Result<()> { .into_owned() }), ), + store, job_container: job_container.clone(), preview_store, desktop_enabled, @@ -235,7 +239,6 @@ fn main() -> Result<()> { .service(routes::list_hier) .service(routes::list_hier_roots) .service(routes::store_info) - .service(routes::get_file) .service(routes::get_jobs) .service(routes::get_info); @@ -266,14 +269,10 @@ fn main() -> Result<()> { if !matches.is_present("NO_INITIAL_UPDATE") { info!("Running initial update..."); - let new = open_result.new; + // let new = open_result.new; block_background::<_, _, anyhow::Error>(move || { - let _ = if new { - filesystem::rescan_vault(upend.clone(), job_container.clone(), false, true) - } else { - filesystem::rescan_vault(upend.clone(), job_container.clone(), true, false) - }; - let _ = extractors::extract_all(upend, job_container); + state.store.update(upend.clone(), job_container.clone()); + let _ = extractors::extract_all(upend, state.store, job_container); Ok(()) }) } diff --git a/src/previews/mod.rs b/src/previews/mod.rs index 02d9c18..7364c69 100644 --- a/src/previews/mod.rs +++ b/src/previews/mod.rs @@ -1,3 +1,5 @@ +use crate::database::stores::fs::FsStore; +use crate::database::stores::UpStore; use crate::util::hash::Hash; use crate::util::jobs::{JobContainer, JobState}; use crate::{database::UpEndDatabase, util::hash::b58_encode}; @@ -27,17 +29,17 @@ pub trait Previewable { } pub struct PreviewStore { path: PathBuf, - db: Arc, + store: Arc, locks: Mutex>>>, } #[cfg(feature = "previews")] impl PreviewStore { - pub fn new>(path: P, db: Arc) -> Self { + pub fn new>(path: P, store: Arc) -> Self { PreviewStore { path: PathBuf::from(path.as_ref()), - db, + store, locks: Mutex::new(HashMap::new()), } } @@ -71,12 +73,12 @@ impl PreviewStore { Ok(Some(thumbpath.clone())) } else { trace!("Calculating preview for {hash:?}..."); - let connection = self.db.connection()?; - let files = connection.retrieve_file(&hash)?; + let files = self.store.retrieve(&hash)?; if let Some(file) = files.get(0) { + let file_path = file.get_file_path(); let mut job_handle = job_container.add_job( None, - &format!("Creating preview for {:?}", file.path.file_name().unwrap()), + &format!("Creating preview for {:?}", file_path.file_name().unwrap()), )?; let mime_type = mime_type.into(); @@ -84,18 +86,18 @@ impl PreviewStore { let mime_type: Option = if mime_type.is_some() { mime_type } else { - tree_magic_mini::from_filepath(&file.path).map(|m| m.into()) + tree_magic_mini::from_filepath(&file_path).map(|m| m.into()) }; let preview = match mime_type { - Some(tm) if tm.starts_with("text") => TextPath(&file.path).get_thumbnail(), + Some(tm) if tm.starts_with("text") => TextPath(&file_path).get_thumbnail(), Some(tm) if tm.starts_with("video") || tm == "application/x-matroska" => { - VideoPath(&file.path).get_thumbnail() + VideoPath(&file_path).get_thumbnail() } Some(tm) if tm.starts_with("audio") || tm == "application/x-riff" => { - AudioPath(&file.path).get_thumbnail() + AudioPath(&file_path).get_thumbnail() } - Some(tm) if tm.starts_with("image") => ImagePath(&file.path).get_thumbnail(), + Some(tm) if tm.starts_with("image") => ImagePath(&file_path).get_thumbnail(), Some(unknown) => Err(anyhow!("No capability for {:?} thumbnails.", unknown)), _ => Err(anyhow!("Unknown file type, or file doesn't exist.")), }; diff --git a/src/routes.rs b/src/routes.rs index 1422fe8..a601696 100644 --- a/src/routes.rs +++ 
b/src/routes.rs @@ -3,9 +3,10 @@ use crate::database::constants::{ADDED_ATTR, LABEL_ATTR}; use crate::database::entry::{Entry, EntryValue, InvariantEntry}; use crate::database::hierarchies::{list_roots, resolve_path, UHierPath}; use crate::database::lang::Query; +use crate::database::stores::fs::FsStore; +use crate::database::stores::{Blob, UpStore}; use crate::database::UpEndDatabase; use crate::extractors::{self}; -use crate::filesystem::add_file; use crate::previews::PreviewStore; use crate::util::exec::block_background; use crate::util::hash::{b58_decode, b58_encode, Hashable}; @@ -26,13 +27,12 @@ use futures_util::TryStreamExt; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use serde_json::json; +use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; -use std::fs; use std::io::Write; use std::path::PathBuf; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, io}; use tempfile::NamedTempFile; use uuid::Uuid; @@ -42,6 +42,7 @@ use is_executable::IsExecutable; #[derive(Clone)] pub struct State { pub upend: Arc, + pub store: Arc, pub vault_name: Option, pub job_container: JobContainer, pub preview_store: Option>, @@ -131,11 +132,13 @@ pub async fn get_raw( let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _hash = hash.clone(); - let files = web::block(move || connection.retrieve_file(_hash.as_ref())) + let _store = state.store.clone(); + let blobs = web::block(move || _store.retrieve(_hash.as_ref())) .await .map_err(ErrorInternalServerError)?; - if let Some(file) = files.get(0) { - let file_path = state.upend.vault_path.join(&file.path); + if let Some(blob) = blobs.get(0) { + let file_path = blob.get_file_path(); + if query.native.is_none() { return Ok(Either::A( NamedFile::open(file_path)? @@ -173,12 +176,9 @@ pub async fn get_raw( http::header::WARNING.to_string(), ); - file_path - .parent() - .ok_or_else(|| { - ErrorInternalServerError("No parent to open as fallback.") - })? - .to_path_buf() + file_path.parent().ok_or_else(|| { + ErrorInternalServerError("No parent to open as fallback.") + })? }; opener::open(path).map_err(error::ErrorServiceUnavailable)?; return Ok(Either::B(response.finish())); @@ -438,9 +438,10 @@ pub async fn put_object( let _address = address.clone(); let _job_container = state.job_container.clone(); + let _store = state.store.clone(); block_background::<_, _, anyhow::Error>(move || { let extract_result = - extractors::extract(&_address, &connection, _job_container); + extractors::extract(&_address, &connection, &_store, _job_container); if let Ok(entry_count) = extract_result { debug!("Added {entry_count} extracted entries for {_address:?}"); } else { @@ -490,40 +491,17 @@ pub async fn put_object( while let Some(chunk) = field.try_next().await? 
{ file = web::block(move || file.write_all(&chunk).map(|_| file)).await?; } - let path = PathBuf::from(file.path()); - let hash = web::block(move || path.hash()).await?; - let address = Address::Hash(hash.clone()); let connection = state.upend.connection().map_err(ErrorInternalServerError)?; + let _store = state.store.clone(); + let _filename = filename.clone(); + let hash = web::block(move || { + _store.store(connection, Blob::from_filepath(file.path()), _filename) + }) + .await + .map_err(ErrorInternalServerError)?; - let _hash = hash.clone(); - let existing_files = web::block(move || connection.retrieve_file(&_hash)) - .await - .map_err(ErrorInternalServerError)?; - - if existing_files.is_empty() { - let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?); - let final_name = if let Some(ref filename) = filename { - format!("{addr_str}_{filename}") - } else { - addr_str - }; - - let final_path = state.upend.vault_path.join(&final_name); - - let (_, tmp_path) = file.keep().map_err(ErrorInternalServerError)?; - let final_path = web::block::<_, _, io::Error>(move || { - fs::copy(&tmp_path, &final_path)?; - fs::remove_file(tmp_path)?; - Ok(final_path) - }) - .await?; - - let connection = state.upend.connection().map_err(ErrorInternalServerError)?; - web::block(move || add_file(&connection, &final_path, hash)) - .await - .map_err(ErrorInternalServerError)?; - } + let address = Address::Hash(hash); if let Some(ref filename) = filename { let connection = state.upend.connection().map_err(ErrorInternalServerError)?; @@ -537,10 +515,11 @@ pub async fn put_object( let _address = address.clone(); let _job_container = state.job_container.clone(); + let _store = state.store.clone(); let connection = state.upend.connection().map_err(ErrorInternalServerError)?; block_background::<_, _, anyhow::Error>(move || { let extract_result = - extractors::extract(&_address, &connection, _job_container); + extractors::extract(&_address, &connection, &_store, _job_container); if let Ok(entry_count) = extract_result { debug!("Added {entry_count} extracted entries for {_address:?}"); } else { @@ -729,13 +708,14 @@ pub async fn api_refresh( check_auth(&req, &state)?; block_background::<_, _, anyhow::Error>(move || { - let _ = crate::filesystem::rescan_vault( + let _ = state + .store + .update(state.upend.clone(), state.job_container.clone()); + let _ = crate::extractors::extract_all( state.upend.clone(), + state.store.clone(), state.job_container.clone(), - query.full.is_none(), - false, ); - let _ = crate::extractors::extract_all(state.upend.clone(), state.job_container.clone()); Ok(()) }); Ok(HttpResponse::Ok().finish()) @@ -743,68 +723,48 @@ pub async fn api_refresh( #[get("/api/store")] pub async fn store_info(state: web::Data) -> Result { - let connection = state.upend.connection().map_err(ErrorInternalServerError)?; - let files = web::block(move || connection.retrieve_all_files()) - .await - .map_err(ErrorInternalServerError)?; - let mut files_by_hash = HashMap::new(); - for file in &files { - if !files_by_hash.contains_key(&file.hash) { - files_by_hash.insert(&file.hash, vec![]); - } - files_by_hash.get_mut(&file.hash).unwrap().push(file); - } + // let connection = state.upend.connection().map_err(ErrorInternalServerError)?; + // let files = web::block(move || connection.retrieve_all_files()) + // .await + // .map_err(ErrorInternalServerError)?; + // let mut files_by_hash = HashMap::new(); + // for file in &files { + // if !files_by_hash.contains_key(&file.hash) { + // 
files_by_hash.insert(&file.hash, vec![]); + // } + // files_by_hash.get_mut(&file.hash).unwrap().push(file); + // } - for paths in files_by_hash.values_mut() { - paths.sort_unstable_by_key(|f| !f.valid); - } + // for paths in files_by_hash.values_mut() { + // paths.sort_unstable_by_key(|f| !f.valid); + // } - let mut blobs = files_by_hash - .iter() - .map(|(hash, files)| { - json!({ - "hash": hash, - "size": files[0].size, - "paths": files.iter().map(|f| json!({ - "added": f.added, - "valid": f.valid, - "path": f.path - })).collect::() - }) - }) - .collect::>(); + // let mut blobs = files_by_hash + // .iter() + // .map(|(hash, files)| { + // json!({ + // "hash": hash, + // "size": files[0].size, + // "paths": files.iter().map(|f| json!({ + // "added": f.added, + // "valid": f.valid, + // "path": f.path + // })).collect::() + // }) + // }) + // .collect::>(); - blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap()); - blobs.reverse(); + // blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap()); + // blobs.reverse(); - Ok(HttpResponse::Ok().json(json!({ - "totals": { - "count": files_by_hash.len(), - "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::() - }, - "blobs": blobs - }))) -} - -#[get("/api/store/{hash}")] -pub async fn get_file( - state: web::Data, - hash: web::Path, -) -> Result { - let address = - Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?) - .map_err(ErrorInternalServerError)?; - - if let Address::Hash(hash) = address { - let connection = state.upend.connection().map_err(ErrorInternalServerError)?; - let response = web::block(move || connection.retrieve_file(&hash)) - .await - .map_err(ErrorInternalServerError)?; - - Ok(HttpResponse::Ok().json(response)) - } else { - Err(ErrorBadRequest("Address does not refer to a file.")) - } + // Ok(HttpResponse::Ok().json(json!({ + // "totals": { + // "count": files_by_hash.len(), + // "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::() + // }, + // "blobs": blobs + // }))) + todo!(); } #[derive(Deserialize)] @@ -835,7 +795,7 @@ pub async fn get_jobs( pub async fn get_info(state: web::Data) -> Result { Ok(HttpResponse::Ok().json(json!({ "name": state.vault_name, - "location": &*state.upend.vault_path, + // "location": &*state.store.path, "version": crate::common::PKG_VERSION, "desktop": state.desktop_enabled }))) From 4a988acdad25541a6050e79d2e002b65195ef720 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?= Date: Tue, 13 Sep 2022 19:16:06 +0200 Subject: [PATCH 2/6] chore: no default debug output in tests --- src/database/stores/fs/mod.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs index 1f16305..85284b9 100644 --- a/src/database/stores/fs/mod.rs +++ b/src/database/stores/fs/mod.rs @@ -680,18 +680,14 @@ mod test { use tempfile::TempDir; use std::sync::Once; - use tracing_subscriber::filter::{EnvFilter, LevelFilter}; + use tracing_subscriber::filter::{EnvFilter}; static INIT: Once = Once::new(); pub fn initialize() { INIT.call_once(|| { tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(), - ) + .with_env_filter(EnvFilter::builder().from_env_lossy()) .init(); }) } From 5152675bad797baf883b4c89edfb0c4a78f110f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?= Date: Tue, 13 Sep 2022 19:16:22 +0200 Subject: [PATCH 3/6] refactor: use trait objects 
instead of FsStore directly also fix most clippy hints --- src/database/mod.rs | 6 +----- src/database/stores/fs/mod.rs | 11 +++++------ src/database/stores/mod.rs | 24 +++++++++++++----------- src/extractors/audio.rs | 9 ++++----- src/extractors/mod.rs | 22 ++++++++++------------ src/extractors/photo.rs | 6 ++++-- src/extractors/web.rs | 19 ++++++++++++------- src/main.rs | 8 +++++--- src/previews/mod.rs | 17 ++++++++--------- src/routes.rs | 23 ++++++++++------------- 10 files changed, 72 insertions(+), 73 deletions(-) diff --git a/src/database/mod.rs b/src/database/mod.rs index c388028..b433943 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -79,8 +79,7 @@ pub struct OpenResult { pub struct UpEndDatabase { pool: Arc, lock: Arc>, - vault_path: Arc, - db_path: Arc, + vault_path: Arc } pub const UPEND_SUBDIR: &str = ".upend"; @@ -123,7 +122,6 @@ impl UpEndDatabase { pool: Arc::new(pool), lock: Arc::new(RwLock::new(())), vault_path: Arc::new(dirpath.as_ref().canonicalize()?), - db_path: Arc::new(upend_path), }; let connection = db.connection().unwrap(); @@ -168,7 +166,6 @@ impl UpEndDatabase { Ok(UpEndConnection { pool: self.pool.clone(), lock: self.lock.clone(), - vault_path: self.vault_path.clone(), }) } } @@ -176,7 +173,6 @@ impl UpEndDatabase { pub struct UpEndConnection { pool: Arc, lock: Arc>, - vault_path: Arc, } impl UpEndConnection { diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs index 85284b9..9ffa94d 100644 --- a/src/database/stores/fs/mod.rs +++ b/src/database/stores/fs/mod.rs @@ -608,11 +608,11 @@ impl UpStore for FsStore { .collect()) } - fn store>>( + fn store( &self, connection: UpEndConnection, blob: Blob, - name_hint: S, + name_hint: Option, ) -> Result { let file_path = blob.get_file_path(); let hash = file_path @@ -629,7 +629,7 @@ impl UpStore for FsStore { .map_err(|e| StoreError::Unknown(e.to_string()))?, ); - let final_name = if let Some(name_hint) = name_hint.into() { + let final_name = if let Some(name_hint) = name_hint { format!("{addr_str}_{name_hint}") } else { addr_str @@ -639,7 +639,6 @@ impl UpStore for FsStore { fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?; - self.add_file(&connection, &final_path, hash.clone()) .map_err(|e| StoreError::Unknown(e.to_string()))?; } @@ -647,9 +646,9 @@ impl UpStore for FsStore { Ok(hash) } - fn update>( + fn update( &self, - db: D, + db: &UpEndDatabase, mut job_container: JobContainer, ) -> Result, StoreError> { let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); diff --git a/src/database/stores/mod.rs b/src/database/stores/mod.rs index 66ca8d4..44e36bd 100644 --- a/src/database/stores/mod.rs +++ b/src/database/stores/mod.rs @@ -1,22 +1,24 @@ -use std::{ - borrow::Borrow, - path::{Path, PathBuf}, -}; +use std::path::{Path, PathBuf}; -use super::{UpEndDatabase, UpEndConnection}; +use super::{UpEndConnection, UpEndDatabase}; use crate::util::{hash::Hash, jobs::JobContainer}; pub mod fs; #[derive(Debug, Clone)] pub enum StoreError { - NotFound, Unknown(String), } impl std::fmt::Display for StoreError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "STORE ERROR") + write!( + f, + "STORE ERROR: {}", + match self { + StoreError::Unknown(err) => err, + } + ) } } @@ -50,15 +52,15 @@ pub enum UpdatePathOutcome { pub trait UpStore { fn retrieve(&self, hash: &Hash) -> Result>; fn retrieve_all(&self) -> Result>; - fn store>>( + fn store( &self, connection: UpEndConnection, blob: Blob, - name_hint: 
S, + name_hint: Option, ) -> Result; - fn update>( + fn update( &self, - database: D, + database: &UpEndDatabase, job_container: JobContainer, ) -> Result>; } diff --git a/src/extractors/audio.rs b/src/extractors/audio.rs index 62e0b95..7480802 100644 --- a/src/extractors/audio.rs +++ b/src/extractors/audio.rs @@ -1,13 +1,12 @@ +use std::sync::Arc; + use super::Extractor; use crate::{ addressing::Address, database::{ constants, entry::{Entry, EntryValue}, - stores::{ - fs::{FsStore, FILE_MIME_KEY}, - UpStore, - }, + stores::{fs::FILE_MIME_KEY, UpStore}, UpEndConnection, }, util::jobs::{JobContainer, JobState}, @@ -21,7 +20,7 @@ impl Extractor for ID3Extractor { &self, address: &Address, connection: &UpEndConnection, - store: &FsStore, + store: Arc>, mut job_container: JobContainer, ) -> Result> { if let Address::Hash(hash) = address { diff --git a/src/extractors/mod.rs b/src/extractors/mod.rs index 72d93bf..a2b938b 100644 --- a/src/extractors/mod.rs +++ b/src/extractors/mod.rs @@ -1,7 +1,6 @@ use crate::{ addressing::Address, - database::stores::fs::FsStore, - database::{entry::Entry, UpEndConnection, UpEndDatabase}, + database::{entry::Entry, stores::UpStore, UpEndConnection, UpEndDatabase}, util::jobs::JobContainer, }; use anyhow::Result; @@ -26,7 +25,7 @@ pub trait Extractor { &self, address: &Address, connection: &UpEndConnection, - store: &FsStore, + store: Arc>, job_container: JobContainer, ) -> Result>; @@ -38,7 +37,7 @@ pub trait Extractor { &self, address: &Address, connection: &UpEndConnection, - store: &FsStore, + store: Arc>, job_container: JobContainer, ) -> Result { if self.is_needed(address, connection)? { @@ -57,15 +56,14 @@ pub trait Extractor { } } -pub fn extract_all, S: Borrow>( +pub fn extract_all>( db: D, - store: S, + store: Arc>, mut job_container: JobContainer, ) -> Result { info!("Extracting metadata for all addresses."); let db = db.borrow(); - let store = store.borrow(); let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?; let all_addresses = db.connection()?.get_all_addresses()?; @@ -77,7 +75,7 @@ pub fn extract_all, S: Borrow>( .par_iter() .map(|address| { let connection = db.connection()?; - let extract_result = extract(address, &connection, store, job_container.clone()); + let extract_result = extract(address, &connection, store.clone(), job_container.clone()); let mut cnt = count.write().unwrap(); *cnt += 1; @@ -104,7 +102,7 @@ pub fn extract_all, S: Borrow>( pub fn extract( address: &Address, connection: &UpEndConnection, - store: &FsStore, + store: Arc>, job_container: JobContainer, ) -> Result { let mut entry_count = 0; @@ -113,19 +111,19 @@ pub fn extract( #[cfg(feature = "extractors-web")] { entry_count += - web::WebExtractor.insert_info(address, connection, store, job_container.clone())?; + web::WebExtractor.insert_info(address, connection, store.clone(), job_container.clone())?; } #[cfg(feature = "extractors-audio")] { entry_count += - audio::ID3Extractor.insert_info(address, connection, store, job_container.clone())?; + audio::ID3Extractor.insert_info(address, connection, store.clone(), job_container.clone())?; } #[cfg(feature = "extractors-photo")] { entry_count += - photo::ExifExtractor.insert_info(address, connection, store, job_container)?; + photo::ExifExtractor.insert_info(address, connection, store.clone(), job_container)?; } trace!("Extracting metadata for {address:?} - got {entry_count} entries."); diff --git a/src/extractors/photo.rs b/src/extractors/photo.rs index d89b1a9..9b2244e 100644 --- 
a/src/extractors/photo.rs +++ b/src/extractors/photo.rs @@ -1,10 +1,12 @@ +use std::sync::Arc; + use super::Extractor; use crate::{ addressing::Address, database::{ constants, entry::{Entry, EntryValue}, - stores::{fs::{FILE_MIME_KEY, FsStore}, UpStore}, + stores::{fs::{FILE_MIME_KEY}, UpStore}, UpEndConnection, }, util::jobs::{JobContainer, JobState}, @@ -21,7 +23,7 @@ impl Extractor for ExifExtractor { &self, address: &Address, connection: &UpEndConnection, - store: &FsStore, + store: Arc>, mut job_container: JobContainer, ) -> Result> { if let Address::Hash(hash) = address { diff --git a/src/extractors/web.rs b/src/extractors/web.rs index 20208a1..26756c1 100644 --- a/src/extractors/web.rs +++ b/src/extractors/web.rs @@ -1,7 +1,9 @@ +use std::sync::Arc; + use super::Extractor; use crate::{ addressing::Address, - database::{entry::Entry, stores::fs::FsStore, UpEndConnection}, + database::{entry::Entry, stores::UpStore, UpEndConnection}, util::jobs::{JobContainer, JobState}, }; use anyhow::anyhow; @@ -15,10 +17,10 @@ impl Extractor for WebExtractor { fn get( &self, address: &Address, - _: &UpEndConnection, - _: &FsStore, + _connection: &UpEndConnection, + _store: Arc>, mut job_container: JobContainer, - ) -> anyhow::Result> { + ) -> Result> { if let Address::Url(url) = address { let mut job_handle = job_container.add_job(None, &format!("Getting info about {url:?}"))?; @@ -82,10 +84,12 @@ impl Extractor for WebExtractor { #[cfg(test)] mod test { - use crate::util::jobs::JobContainer; + + use crate::{database::stores::fs::FsStore, util::jobs::JobContainer}; use super::*; use anyhow::Result; + use std::sync::Arc; use tempfile::TempDir; #[test] @@ -93,13 +97,14 @@ mod test { let temp_dir = TempDir::new().unwrap(); let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?; let connection = open_result.db.connection()?; - let store = FsStore::from_path(&temp_dir)?; + let store = + Arc::new(Box::new(FsStore::from_path(&temp_dir)?) 
as Box); let job_container = JobContainer::new(); let address = Address::Url("https://upend.dev".into()); assert!(WebExtractor.is_needed(&address, &connection)?); - WebExtractor.insert_info(&address, &connection, &store, job_container)?; + WebExtractor.insert_info(&address, &connection, store, job_container)?; assert!(!WebExtractor.is_needed(&address, &connection)?); diff --git a/src/main.rs b/src/main.rs index 14cebeb..63182d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use actix_cors::Cors; use actix_web::{middleware, App, HttpServer}; use anyhow::Result; use clap::{App as ClapApp, Arg}; -use log::{debug, info, warn}; +use log::{info, warn}; use rand::{thread_rng, Rng}; use std::sync::Arc; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; @@ -128,7 +128,9 @@ fn main() -> Result<()> { .expect("failed to open database!"); let upend = Arc::new(open_result.db); - let store = Arc::new(FsStore::from_path(vault_path.clone()).unwrap()); + let store = + Arc::new(Box::new(FsStore::from_path(vault_path.clone()).unwrap()) + as Box); let ui_path = get_static_dir("webui"); if ui_path.is_err() { @@ -271,7 +273,7 @@ fn main() -> Result<()> { info!("Running initial update..."); // let new = open_result.new; block_background::<_, _, anyhow::Error>(move || { - state.store.update(upend.clone(), job_container.clone()); + let _ = state.store.update(&upend, job_container.clone()); let _ = extractors::extract_all(upend, state.store, job_container); Ok(()) }) diff --git a/src/previews/mod.rs b/src/previews/mod.rs index 7364c69..4a4bbdb 100644 --- a/src/previews/mod.rs +++ b/src/previews/mod.rs @@ -1,8 +1,7 @@ -use crate::database::stores::fs::FsStore; use crate::database::stores::UpStore; +use crate::util::hash::b58_encode; use crate::util::hash::Hash; use crate::util::jobs::{JobContainer, JobState}; -use crate::{database::UpEndDatabase, util::hash::b58_encode}; use anyhow::{anyhow, Result}; use log::{debug, trace}; @@ -29,14 +28,14 @@ pub trait Previewable { } pub struct PreviewStore { path: PathBuf, - store: Arc, + store: Arc>, locks: Mutex>>>, } #[cfg(feature = "previews")] impl PreviewStore { - pub fn new>(path: P, store: Arc) -> Self { + pub fn new>(path: P, store: Arc>) -> Self { PreviewStore { path: PathBuf::from(path.as_ref()), store, @@ -86,18 +85,18 @@ impl PreviewStore { let mime_type: Option = if mime_type.is_some() { mime_type } else { - tree_magic_mini::from_filepath(&file_path).map(|m| m.into()) + tree_magic_mini::from_filepath(file_path).map(|m| m.into()) }; let preview = match mime_type { - Some(tm) if tm.starts_with("text") => TextPath(&file_path).get_thumbnail(), + Some(tm) if tm.starts_with("text") => TextPath(file_path).get_thumbnail(), Some(tm) if tm.starts_with("video") || tm == "application/x-matroska" => { - VideoPath(&file_path).get_thumbnail() + VideoPath(file_path).get_thumbnail() } Some(tm) if tm.starts_with("audio") || tm == "application/x-riff" => { - AudioPath(&file_path).get_thumbnail() + AudioPath(file_path).get_thumbnail() } - Some(tm) if tm.starts_with("image") => ImagePath(&file_path).get_thumbnail(), + Some(tm) if tm.starts_with("image") => ImagePath(file_path).get_thumbnail(), Some(unknown) => Err(anyhow!("No capability for {:?} thumbnails.", unknown)), _ => Err(anyhow!("Unknown file type, or file doesn't exist.")), }; diff --git a/src/routes.rs b/src/routes.rs index a601696..f1eb331 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -3,13 +3,12 @@ use crate::database::constants::{ADDED_ATTR, LABEL_ATTR}; use crate::database::entry::{Entry, 
EntryValue, InvariantEntry}; use crate::database::hierarchies::{list_roots, resolve_path, UHierPath}; use crate::database::lang::Query; -use crate::database::stores::fs::FsStore; use crate::database::stores::{Blob, UpStore}; use crate::database::UpEndDatabase; use crate::extractors::{self}; use crate::previews::PreviewStore; use crate::util::exec::block_background; -use crate::util::hash::{b58_decode, b58_encode, Hashable}; +use crate::util::hash::{b58_decode, b58_encode}; use crate::util::jobs::JobContainer; use actix_files::NamedFile; use actix_multipart::Multipart; @@ -30,7 +29,6 @@ use serde_json::json; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::io::Write; -use std::path::PathBuf; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tempfile::NamedTempFile; @@ -42,7 +40,7 @@ use is_executable::IsExecutable; #[derive(Clone)] pub struct State { pub upend: Arc, - pub store: Arc, + pub store: Arc>, pub vault_name: Option, pub job_container: JobContainer, pub preview_store: Option>, @@ -130,7 +128,6 @@ pub async fn get_raw( if let Address::Hash(hash) = address { let hash = Arc::new(hash); - let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _hash = hash.clone(); let _store = state.store.clone(); let blobs = web::block(move || _store.retrieve(_hash.as_ref())) @@ -441,7 +438,7 @@ pub async fn put_object( let _store = state.store.clone(); block_background::<_, _, anyhow::Error>(move || { let extract_result = - extractors::extract(&_address, &connection, &_store, _job_container); + extractors::extract(&_address, &connection, _store, _job_container); if let Ok(entry_count) = extract_result { debug!("Added {entry_count} extracted entries for {_address:?}"); } else { @@ -519,7 +516,7 @@ pub async fn put_object( let connection = state.upend.connection().map_err(ErrorInternalServerError)?; block_background::<_, _, anyhow::Error>(move || { let extract_result = - extractors::extract(&_address, &connection, &_store, _job_container); + extractors::extract(&_address, &connection, _store, _job_container); if let Ok(entry_count) = extract_result { debug!("Added {entry_count} extracted entries for {_address:?}"); } else { @@ -694,23 +691,23 @@ pub async fn list_hier_roots(state: web::Data) -> Result, -} +// #[derive(Deserialize)] +// pub struct RescanRequest { +// full: Option, +// } #[post("/api/refresh")] pub async fn api_refresh( req: HttpRequest, state: web::Data, - web::Query(query): web::Query, + // web::Query(query): web::Query, ) -> Result { check_auth(&req, &state)?; block_background::<_, _, anyhow::Error>(move || { let _ = state .store - .update(state.upend.clone(), state.job_container.clone()); + .update(&state.upend, state.job_container.clone()); let _ = crate::extractors::extract_all( state.upend.clone(), state.store.clone(), From e17431bb3f90885ceb3ffddef75db4cdb7a29382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?= Date: Tue, 13 Sep 2022 19:43:30 +0200 Subject: [PATCH 4/6] fix: reenable locks --- src/database/stores/fs/mod.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs index 9ffa94d..e5ebe08 100644 --- a/src/database/stores/fs/mod.rs +++ b/src/database/stores/fs/mod.rs @@ -64,6 +64,7 @@ impl Drop for PragmaSynchronousGuard<'_> { pub struct FsStore { path: PathBuf, manager: ConnectionManager, + lock: Arc>, } impl FsStore { @@ -97,7 +98,13 @@ impl FsStore { "#, )?; - Ok(FsStore { path, 
manager }) + let lock = Arc::new(RwLock::new(())); + + Ok(FsStore { + path, + manager, + lock, + }) } fn rescan_vault>( @@ -490,7 +497,7 @@ impl FsStore { Address::Hash(Hash((&file.hash).clone())) ); - // let _lock = self.lock.write().unwrap(); + let _lock = self.lock.write().unwrap(); let conn = self.manager.connect()?; diesel::insert_into(files::table) @@ -509,7 +516,7 @@ impl FsStore { fn retrieve_file(&self, obj_hash: &Hash) -> Result> { use self::db::files::dsl::*; - // let _lock = self.lock.read().unwrap(); + let _lock = self.lock.read().unwrap(); let conn = self.manager.connect()?; let matches = files @@ -536,7 +543,7 @@ impl FsStore { fn retrieve_all_files(&self) -> Result> { use self::db::files::dsl::*; - // let _lock = self.lock.read().unwrap(); + let _lock = self.lock.read().unwrap(); let conn = self.manager.connect()?; let matches = files.load::(&conn)?; @@ -547,7 +554,7 @@ impl FsStore { use self::db::files::dsl::*; debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); - // let _lock = self.lock.write().unwrap(); + let _lock = self.lock.write().unwrap(); let conn = self.manager.connect()?; Ok(diesel::update(files.filter(id.eq(file_id))) @@ -559,7 +566,7 @@ impl FsStore { use self::db::files::dsl::*; debug!("Setting file ID {} to valid = {}", file_id, is_valid); - // let _lock = self.lock.write().unwrap(); + let _lock = self.lock.write().unwrap(); let conn = self.manager.connect()?; Ok(diesel::update(files.filter(id.eq(file_id))) @@ -679,7 +686,7 @@ mod test { use tempfile::TempDir; use std::sync::Once; - use tracing_subscriber::filter::{EnvFilter}; + use tracing_subscriber::filter::EnvFilter; static INIT: Once = Once::new(); From 0b0c6f2ec38d9c8a0931b37c0a71e43bace5cb1d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?= Date: Thu, 15 Sep 2022 20:20:23 +0200 Subject: [PATCH 5/6] fix: reenable initial quick vault scan --- src/database/stores/fs/mod.rs | 5 +++-- src/database/stores/mod.rs | 1 + src/main.rs | 2 +- src/routes.rs | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs index e5ebe08..93dd2d7 100644 --- a/src/database/stores/fs/mod.rs +++ b/src/database/stores/fs/mod.rs @@ -657,12 +657,13 @@ impl UpStore for FsStore { &self, db: &UpEndDatabase, mut job_container: JobContainer, + initial: bool, ) -> Result, StoreError> { let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); match job_result { Ok(job_handle) => { - let result = self.rescan_vault(db, job_handle, true, false); + let result = self.rescan_vault(db, job_handle, !initial, initial); if let Err(err) = &result { error!("Update did not succeed! 
{:?}", err); @@ -720,7 +721,7 @@ mod test { let job_container = JobContainer::new(); // Store scan - let rescan_result = store.update(&open_result.db, job_container.clone()); + let rescan_result = store.update(&open_result.db, job_container.clone(), false); assert!(rescan_result.is_ok()); } diff --git a/src/database/stores/mod.rs b/src/database/stores/mod.rs index 44e36bd..9e65b81 100644 --- a/src/database/stores/mod.rs +++ b/src/database/stores/mod.rs @@ -62,5 +62,6 @@ pub trait UpStore { &self, database: &UpEndDatabase, job_container: JobContainer, + initial: bool, ) -> Result>; } diff --git a/src/main.rs b/src/main.rs index 63182d1..72a599a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -273,7 +273,7 @@ fn main() -> Result<()> { info!("Running initial update..."); // let new = open_result.new; block_background::<_, _, anyhow::Error>(move || { - let _ = state.store.update(&upend, job_container.clone()); + let _ = state.store.update(&upend, job_container.clone(), true); let _ = extractors::extract_all(upend, state.store, job_container); Ok(()) }) diff --git a/src/routes.rs b/src/routes.rs index f1eb331..77bb4e7 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -707,7 +707,7 @@ pub async fn api_refresh( block_background::<_, _, anyhow::Error>(move || { let _ = state .store - .update(&state.upend, state.job_container.clone()); + .update(&state.upend, state.job_container.clone(), false); let _ = crate::extractors::extract_all( state.upend.clone(), state.store.clone(), From 7f519d9de8bbc866ba524a74b373463367b3e293 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?= Date: Thu, 15 Sep 2022 20:27:06 +0200 Subject: [PATCH 6/6] perf: implement speed-ups for vault db have a pool; WAL journal mode; PRAGMA SYNCHRONOUS --- src/database/stores/fs/mod.rs | 51 +++++++++++++++++------------------ 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs index 93dd2d7..05e8873 100644 --- a/src/database/stores/fs/mod.rs +++ b/src/database/stores/fs/mod.rs @@ -15,7 +15,7 @@ use crate::util::hash::{b58_encode, Hash, Hashable}; use crate::util::jobs::{JobContainer, JobHandle}; use anyhow::{anyhow, Error, Result}; use chrono::prelude::*; -use diesel::r2d2::{ConnectionManager, ManageConnection}; +use diesel::r2d2::{self, ConnectionManager}; use diesel::ExpressionMethods; use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection}; use log::{debug, error, info, warn}; @@ -46,7 +46,7 @@ lazy_static! { static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap(); } -struct PragmaSynchronousGuard<'a>(&'a UpEndConnection); +struct PragmaSynchronousGuard<'a>(&'a SqliteConnection); impl Drop for PragmaSynchronousGuard<'_> { fn drop(&mut self) { @@ -63,7 +63,7 @@ impl Drop for PragmaSynchronousGuard<'_> { pub struct FsStore { path: PathBuf, - manager: ConnectionManager, + pool: r2d2::Pool>, lock: Arc>, } @@ -76,7 +76,8 @@ impl FsStore { .to_str() .unwrap(), ); - let connection = manager.connect()?; + let pool = r2d2::Pool::builder().build(manager)?; + let connection = pool.get()?; // while diesel doesn't support multiple embedded migrations... 
connection.execute( @@ -95,16 +96,13 @@ impl FsStore { CREATE INDEX IF NOT EXISTS files_hash ON files (hash); CREATE INDEX IF NOT EXISTS files_valid ON files (valid); + PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE); "#, )?; let lock = Arc::new(RwLock::new(())); - Ok(FsStore { - path, - manager, - lock, - }) + Ok(FsStore { path, pool, lock }) } fn rescan_vault>( @@ -118,16 +116,17 @@ impl FsStore { info!("Vault rescan started."); let db = db.borrow(); - let connection = db.connection()?; + let upconnection = db.connection()?; + let connection = self.pool.get()?; // Initialize types, etc... debug!("Initializing DB types."); - connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; - upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; + upconnection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; + upend_insert_addr!(upconnection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; + upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; + upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; + upend_insert_val!(upconnection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; + upend_insert_val!(upconnection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; // Disable syncing in SQLite for the duration of the import let mut _guard: Option = None; @@ -161,7 +160,7 @@ impl FsStore { .into_par_iter() .map(|path| { let result = self.process_directory_entry( - db.connection().unwrap(), + db, &resolve_cache, path.clone(), &existing_files, @@ -191,7 +190,7 @@ impl FsStore { let existing_files = existing_files.read().unwrap(); let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { - let trans_result = connection.transaction::<_, Error, _>(|| { + let trans_result = upconnection.transaction::<_, Error, _>(|| { self.file_set_valid(file.id, false)?; // remove_object(&connection, )? 
Ok(()) @@ -256,9 +255,9 @@ impl FsStore { Ok(all_outcomes) } - fn process_directory_entry( + fn process_directory_entry>( &self, - connection: UpEndConnection, + db: D, resolve_cache: &Arc>, path: PathBuf, existing_files: &Arc>>, @@ -345,7 +344,7 @@ impl FsStore { let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string()); self.insert_file_with_metadata( - &connection, + &db.borrow().connection()?, &normalized_path, file_hash.unwrap(), size, @@ -498,7 +497,7 @@ impl FsStore { ); let _lock = self.lock.write().unwrap(); - let conn = self.manager.connect()?; + let conn = self.pool.get()?; diesel::insert_into(files::table) .values(&file) @@ -517,7 +516,7 @@ impl FsStore { use self::db::files::dsl::*; let _lock = self.lock.read().unwrap(); - let conn = self.manager.connect()?; + let conn = self.pool.get()?; let matches = files .filter(valid.eq(true)) @@ -544,7 +543,7 @@ impl FsStore { use self::db::files::dsl::*; let _lock = self.lock.read().unwrap(); - let conn = self.manager.connect()?; + let conn = self.pool.get()?; let matches = files.load::(&conn)?; Ok(matches) @@ -555,7 +554,7 @@ impl FsStore { debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); let _lock = self.lock.write().unwrap(); - let conn = self.manager.connect()?; + let conn = self.pool.get()?; Ok(diesel::update(files.filter(id.eq(file_id))) .set(mtime.eq(m_time)) @@ -567,7 +566,7 @@ impl FsStore { debug!("Setting file ID {} to valid = {}", file_id, is_valid); let _lock = self.lock.write().unwrap(); - let conn = self.manager.connect()?; + let conn = self.pool.get()?; Ok(diesel::update(files.filter(id.eq(file_id))) .set(valid.eq(is_valid))
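// A minimal usage sketch (not part of the patch; it assumes the store is
// shared as Arc<Box<dyn UpStore + Send + Sync>> and that `vault_path`,
// `upend`, `job_container` and `hash` are the values already built in
// main.rs and routes.rs) of how callers drive the vault once FsStore sits
// behind the UpStore trait object introduced by this series:
//
//     let store: Arc<Box<dyn UpStore + Send + Sync>> =
//         Arc::new(Box::new(FsStore::from_path(vault_path.clone())?)
//             as Box<dyn UpStore + Send + Sync>);
//
//     // Startup passes `initial = true`; the /api/refresh route passes `false`.
//     let _ = store.update(&upend, job_container.clone(), true);
//
//     // Blob lookups now go through the store instead of UpEndConnection.
//     let blobs = store.retrieve(&hash)?;
//     if let Some(blob) = blobs.first() {
//         let _path = blob.get_file_path();
//     }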