From a7b0a5c00ae2ee5fbeaa796ab06b37ab610c88c0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Ml=C3=A1dek?=
Date: Fri, 19 Aug 2022 14:04:18 +0200
Subject: [PATCH] feat!: multiple vaults

incomplete, but passes tests
---
 .../2020-08-04-134952_file_hashes/down.sql |   2 -
 .../2020-08-04-134952_file_hashes/up.sql   |   5 -
 migrations/fsvault/00_initial/down.sql     |   1 +
 migrations/fsvault/00_initial/up.sql       |  13 +
 .../down.sql                               |   1 -
 .../up.sql                                 |  14 -
 src/database/inner/models.rs               |  40 +-
 src/database/inner/schema.rs               |  15 +-
 src/database/mod.rs                        |  97 +--
 src/database/stores/fs/db.rs               |  51 ++
 src/database/stores/fs/mod.rs              | 803 ++++++++++++++++++
 src/database/stores/mod.rs                 |  64 ++
 src/extractors/audio.rs                    |  13 +-
 src/extractors/mod.rs                      |  20 +-
 src/extractors/photo.rs                    |  10 +-
 src/extractors/web.rs                      |   6 +-
 src/filesystem.rs                          | 573 -------------
 src/main.rs                                |  43 +-
 src/previews/mod.rs                        |  24 +-
 src/routes.rs                              | 175 ++--
 20 files changed, 1073 insertions(+), 897 deletions(-)
 delete mode 100644 migrations/config/2020-08-04-134952_file_hashes/down.sql
 delete mode 100644 migrations/config/2020-08-04-134952_file_hashes/up.sql
 create mode 100644 migrations/fsvault/00_initial/down.sql
 create mode 100644 migrations/fsvault/00_initial/up.sql
 rename migrations/upend/{00_initial_structure => 00_initial}/down.sql (64%)
 rename migrations/upend/{00_initial_structure => 00_initial}/up.sql (60%)
 create mode 100644 src/database/stores/fs/db.rs
 create mode 100644 src/database/stores/fs/mod.rs
 create mode 100644 src/database/stores/mod.rs
 delete mode 100644 src/filesystem.rs

diff --git a/migrations/config/2020-08-04-134952_file_hashes/down.sql b/migrations/config/2020-08-04-134952_file_hashes/down.sql
deleted file mode 100644
index d05f373..0000000
--- a/migrations/config/2020-08-04-134952_file_hashes/down.sql
+++ /dev/null
@@ -1,2 +0,0 @@
--- This file should undo anything in `up.sql`
-DROP TABLE vaults;
\ No newline at end of file
diff --git a/migrations/config/2020-08-04-134952_file_hashes/up.sql b/migrations/config/2020-08-04-134952_file_hashes/up.sql
deleted file mode 100644
index a9b6c46..0000000
--- a/migrations/config/2020-08-04-134952_file_hashes/up.sql
+++ /dev/null
@@ -1,5 +0,0 @@
--- Your SQL goes here
-CREATE TABLE vaults (
-    id INTEGER PRIMARY KEY AUTOINCREMENT,
-    path VARCHAR NOT NULL
-)
\ No newline at end of file
diff --git a/migrations/fsvault/00_initial/down.sql b/migrations/fsvault/00_initial/down.sql
new file mode 100644
index 0000000..2a94593
--- /dev/null
+++ b/migrations/fsvault/00_initial/down.sql
@@ -0,0 +1 @@
+DROP TABLE files;
\ No newline at end of file
diff --git a/migrations/fsvault/00_initial/up.sql b/migrations/fsvault/00_initial/up.sql
new file mode 100644
index 0000000..773167d
--- /dev/null
+++ b/migrations/fsvault/00_initial/up.sql
@@ -0,0 +1,13 @@
+CREATE TABLE files
+(
+    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
+    hash BLOB NOT NULL,
+    path VARCHAR NOT NULL,
+    valid BOOLEAN NOT NULL DEFAULT TRUE,
+    added DATETIME NOT NULL,
+    size BIGINT NOT NULL,
+    mtime DATETIME NULL
+);
+
+CREATE INDEX files_hash ON files (hash);
+CREATE INDEX files_valid ON files (valid);
\ No newline at end of file
diff --git a/migrations/upend/00_initial_structure/down.sql b/migrations/upend/00_initial/down.sql
similarity index 64%
rename from migrations/upend/00_initial_structure/down.sql
rename to migrations/upend/00_initial/down.sql
index 0f8e6bf..7ef2077 100644
--- a/migrations/upend/00_initial_structure/down.sql
+++ b/migrations/upend/00_initial/down.sql
@@ -1,4 +1,3 @@
 -- This file should undo anything in `up.sql`
 DROP TABLE meta;
-DROP TABLE files;
 DROP TABLE data;
\ No newline at end of file
diff --git a/migrations/upend/00_initial_structure/up.sql b/migrations/upend/00_initial/up.sql
similarity index 60%
rename from migrations/upend/00_initial_structure/up.sql
rename to migrations/upend/00_initial/up.sql
index db897cf..b8f60e6 100644
--- a/migrations/upend/00_initial_structure/up.sql
+++ b/migrations/upend/00_initial/up.sql
@@ -9,20 +9,6 @@ CREATE TABLE meta
 INSERT INTO meta (key, value)
 VALUES ('VERSION', '0');

-CREATE TABLE files
-(
-    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
-    hash BLOB NOT NULL,
-    path VARCHAR NOT NULL,
-    valid BOOLEAN NOT NULL DEFAULT TRUE,
-    added DATETIME NOT NULL,
-    size BIGINT NOT NULL,
-    mtime DATETIME NULL
-);
-
-CREATE INDEX files_hash ON files (hash);
-CREATE INDEX files_valid ON files (valid);
-
 CREATE TABLE data
 (
     identity BLOB PRIMARY KEY NOT NULL,
diff --git a/src/database/inner/models.rs b/src/database/inner/models.rs
index 1da0983..aea3382 100644
--- a/src/database/inner/models.rs
+++ b/src/database/inner/models.rs
@@ -1,43 +1,5 @@
-use std::path::PathBuf;
-
-use chrono::NaiveDateTime;
 use serde::Serialize;
-
-use super::schema::{data, files, meta};
-use crate::util::hash::Hash;
-
-#[derive(Queryable, Serialize, Clone, Debug)]
-pub struct File {
-    pub id: i32,
-    pub hash: Hash,
-    pub path: String,
-    pub valid: bool,
-    pub added: NaiveDateTime,
-    pub size: i64,
-    pub mtime: Option<NaiveDateTime>,
-}
-
-// todo - remove, try_from the actual model, impl queryable...
-#[derive(Serialize, Clone, Debug)]
-pub struct OutFile {
-    pub id: i32,
-    pub hash: Hash,
-    pub path: PathBuf,
-    pub valid: bool,
-    pub added: NaiveDateTime,
-    pub size: i64,
-    pub mtime: Option<NaiveDateTime>,
-}
-
-#[derive(Insertable, Debug)]
-#[table_name = "files"]
-pub struct NewFile {
-    pub hash: Vec<u8>,
-    pub path: String,
-    pub added: NaiveDateTime,
-    pub size: i64,
-    pub mtime: Option<NaiveDateTime>,
-}
+use super::schema::{data, meta};

 #[derive(Queryable, Insertable, Serialize, Debug)]
 #[table_name = "data"]
diff --git a/src/database/inner/schema.rs b/src/database/inner/schema.rs
index 7221c31..ba50579 100644
--- a/src/database/inner/schema.rs
+++ b/src/database/inner/schema.rs
@@ -9,19 +9,6 @@ table! {
         immutable -> Bool,
     }
 }
-
-table! {
-    files (id) {
-        id -> Integer,
-        hash -> Binary,
-        path -> Text,
-        valid -> Bool,
-        added -> Timestamp,
-        size -> BigInt,
-        mtime -> Nullable<Timestamp>,
-    }
-}
-
 table! {
     meta (id) {
         id -> Integer,
@@ -30,4 +17,4 @@ table!
{ } } -allow_tables_to_appear_in_same_query!(data, files, meta,); +allow_tables_to_appear_in_same_query!(data, meta,); diff --git a/src/database/mod.rs b/src/database/mod.rs index 34f948b..994b3b0 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -3,6 +3,8 @@ #[macro_use] mod macros; +pub mod stores; + pub mod constants; pub mod engine; pub mod entry; @@ -22,13 +24,12 @@ use crate::database::lang::Query; use crate::util::hash::Hash; use crate::util::LoggerSink; use anyhow::{anyhow, Result}; -use chrono::NaiveDateTime; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager, PooledConnection}; use diesel::result::{DatabaseErrorKind, Error}; use diesel::sqlite::SqliteConnection; use hierarchies::initialize_hier; -use std::convert::{TryFrom, TryInto}; +use std::convert::TryFrom; use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, RwLock}; @@ -78,8 +79,8 @@ pub struct OpenResult { pub struct UpEndDatabase { pool: DbPool, lock: Arc>, - pub vault_path: Arc, - pub db_path: Arc, + vault_path: Arc, + db_path: Arc, } pub const UPEND_SUBDIR: &str = ".upend"; @@ -214,94 +215,6 @@ impl UpEndConnection { .map(|mv| mv.value.clone()) } - pub fn insert_file(&self, file: models::NewFile) -> Result { - use crate::database::inner::schema::files; - - debug!( - "Inserting {} ({})...", - &file.path, - Address::Hash(Hash((&file.hash).clone())) - ); - - let _lock = self.lock.write().unwrap(); - - diesel::insert_into(files::table) - .values(&file) - .execute(&self.conn)?; - - Ok(files::dsl::files - .filter(files::dsl::valid.eq(true)) - .filter(files::dsl::hash.eq(file.hash)) - .count() - .first::(&self.conn)? - .try_into() - .unwrap()) - } - - pub fn retrieve_file(&self, obj_hash: &Hash) -> Result> { - use crate::database::inner::schema::files::dsl::*; - - let _lock = self.lock.read().unwrap(); - - let matches = files - .filter(valid.eq(true)) - .filter(hash.eq(&obj_hash.0)) - .load::(&self.conn)?; - - let matches = matches - .into_iter() - .map(|f| models::OutFile { - id: f.id, - hash: f.hash, - path: self.vault_path.join(PathBuf::from(f.path)), - valid: f.valid, - added: f.added, - size: f.size, - mtime: f.mtime, - }) - .collect(); - - Ok(matches) - } - - pub fn retrieve_all_files(&self) -> Result> { - use crate::database::inner::schema::files::dsl::*; - let _lock = self.lock.read().unwrap(); - let matches = files.load::(&self.conn)?; - Ok(matches) - } - - pub fn file_update_mtime(&self, file_id: i32, m_time: Option) -> Result { - use crate::database::inner::schema::files::dsl::*; - - debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); - - let _lock = self.lock.write().unwrap(); - - Ok(diesel::update(files.filter(id.eq(file_id))) - .set(mtime.eq(m_time)) - .execute(&self.conn)?) - } - - pub fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result { - use crate::database::inner::schema::files::dsl::*; - - debug!("Setting file ID {} to valid = {}", file_id, is_valid); - - let _lock = self.lock.write().unwrap(); - - Ok(diesel::update(files.filter(id.eq(file_id))) - .set(valid.eq(is_valid)) - .execute(&self.conn)?) - } - - pub fn normalize_path(&self, path: &Path) -> Result { - Ok(path - .canonicalize()? - .strip_prefix(self.vault_path.as_path())? 
- .to_path_buf()) - } - pub fn retrieve_entry(&self, hash: &Hash) -> Result> { use crate::database::inner::schema::data::dsl::*; diff --git a/src/database/stores/fs/db.rs b/src/database/stores/fs/db.rs new file mode 100644 index 0000000..1447092 --- /dev/null +++ b/src/database/stores/fs/db.rs @@ -0,0 +1,51 @@ +use std::path::PathBuf; + +use crate::util::hash::Hash; +use chrono::NaiveDateTime; +use diesel::Queryable; +use serde::Serialize; + +table! { + files (id) { + id -> Integer, + hash -> Binary, + path -> Text, + valid -> Bool, + added -> Timestamp, + size -> BigInt, + mtime -> Nullable, + } +} + +#[derive(Queryable, Serialize, Clone, Debug)] +pub struct File { + pub id: i32, + pub hash: Hash, + pub path: String, + pub valid: bool, + pub added: NaiveDateTime, + pub size: i64, + pub mtime: Option, +} + +// todo - remove, try_from the actual model, impl queryable... +#[derive(Serialize, Clone, Debug)] +pub struct OutFile { + pub id: i32, + pub hash: Hash, + pub path: PathBuf, + pub valid: bool, + pub added: NaiveDateTime, + pub size: i64, + pub mtime: Option, +} + +#[derive(Insertable, Debug)] +#[table_name = "files"] +pub struct NewFile { + pub hash: Vec, + pub path: String, + pub added: NaiveDateTime, + pub size: i64, + pub mtime: Option, +} diff --git a/src/database/stores/fs/mod.rs b/src/database/stores/fs/mod.rs new file mode 100644 index 0000000..1f16305 --- /dev/null +++ b/src/database/stores/fs/mod.rs @@ -0,0 +1,803 @@ +use self::db::files; + +use super::{Blob, StoreError, UpStore, UpdatePathOutcome}; +use crate::addressing::Address; +use crate::database::constants::{ + ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, + TYPE_HAS_ATTR, +}; +use crate::database::entry::{Entry, InvariantEntry}; +use crate::database::hierarchies::{ + resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode, +}; +use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR}; +use crate::util::hash::{b58_encode, Hash, Hashable}; +use crate::util::jobs::{JobContainer, JobHandle}; +use anyhow::{anyhow, Error, Result}; +use chrono::prelude::*; +use diesel::r2d2::{ConnectionManager, ManageConnection}; +use diesel::ExpressionMethods; +use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection}; +use log::{debug, error, info, warn}; +use lru::LruCache; +use rayon::prelude::*; +use std::borrow::Borrow; +use std::convert::{TryFrom, TryInto}; +use std::path::PathBuf; +use std::path::{Component, Path}; +use std::sync::{Arc, Mutex, RwLock}; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use std::{fs, iter}; +use walkdir::WalkDir; + +mod db; + +const BLOB_TYPE: &str = "BLOB"; +const ALIAS_KEY: &str = "ALIAS"; +pub const FILE_MIME_KEY: &str = "FILE_MIME"; +const FILE_MTIME_KEY: &str = "FILE_MTIME"; +const FILE_SIZE_KEY: &str = "FILE_SIZE"; + +lazy_static! { + static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry { + attribute: String::from(TYPE_BASE_ATTR), + value: BLOB_TYPE.into(), + }; + static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap(); +} + +struct PragmaSynchronousGuard<'a>(&'a UpEndConnection); + +impl Drop for PragmaSynchronousGuard<'_> { + fn drop(&mut self) { + debug!("Re-enabling synchronous mode."); + let res = self.0.execute("PRAGMA synchronous = NORMAL;"); + if let Err(err) = res { + error!( + "Error setting synchronous mode back to NORMAL! Data loss possible! 
{}", + err + ); + } + } +} + +pub struct FsStore { + path: PathBuf, + manager: ConnectionManager, +} + +impl FsStore { + pub fn from_path>(path: P) -> Result { + let path = path.as_ref().to_path_buf().canonicalize()?; + let manager = ConnectionManager::::new( + path.join(UPEND_SUBDIR) + .join("upend_vault.sqlite3") + .to_str() + .unwrap(), + ); + let connection = manager.connect()?; + + // while diesel doesn't support multiple embedded migrations... + connection.execute( + r#" + CREATE TABLE IF NOT EXISTS files + ( + id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL, + hash BLOB NOT NULL, + path VARCHAR NOT NULL, + valid BOOLEAN NOT NULL DEFAULT TRUE, + added DATETIME NOT NULL, + size BIGINT NOT NULL, + mtime DATETIME NULL + ); + + CREATE INDEX IF NOT EXISTS files_hash ON files (hash); + CREATE INDEX IF NOT EXISTS files_valid ON files (valid); + + "#, + )?; + + Ok(FsStore { path, manager }) + } + + fn rescan_vault>( + &self, + db: D, + job_handle: JobHandle, + quick_check: bool, + disable_synchronous: bool, + ) -> Result> { + let start = Instant::now(); + info!("Vault rescan started."); + + let db = db.borrow(); + let connection = db.connection()?; + + // Initialize types, etc... + debug!("Initializing DB types."); + connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; + upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; + upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; + + // Disable syncing in SQLite for the duration of the import + let mut _guard: Option = None; + if disable_synchronous { + debug!("Disabling SQLite synchronous mode"); + connection.execute("PRAGMA synchronous = OFF;")?; + _guard = Some(PragmaSynchronousGuard(&connection)); + } + + // Walk through the vault, find all paths + debug!("Traversing vault directory"); + let absolute_dir_path = fs::canonicalize(&*db.vault_path)?; + let path_entries: Vec = WalkDir::new(&*db.vault_path) + .follow_links(true) + .into_iter() + .filter_map(|e| e.ok()) + .filter_map(|e| fs::canonicalize(e.into_path()).ok()) + .filter(|e| e.is_file()) + .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR))) + .collect(); + + // Prepare for processing + let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?)); + + // Actual processing + let count = RwLock::new(0_usize); + let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); + let total = path_entries.len() as f32; + let shared_job_handle = Arc::new(Mutex::new(job_handle)); + let path_outcomes: Vec = path_entries + .into_par_iter() + .map(|path| { + let result = self.process_directory_entry( + db.connection().unwrap(), + &resolve_cache, + path.clone(), + &existing_files, + quick_check, + ); + + let mut cnt = count.write().unwrap(); + *cnt += 1; + + let _ = shared_job_handle + .lock() + .unwrap() + .update_progress(*cnt as f32 / total * 100.0); + + match result { + Ok(result) => result, + Err(error) => { + error!("Failed to update {:?} ({})", path, error); + UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string())) + } + } + }) + .collect(); + + debug!("Processing done, cleaning up..."); + + let existing_files = existing_files.read().unwrap(); + + let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { + let trans_result = 
connection.transaction::<_, Error, _>(|| { + self.file_set_valid(file.id, false)?; + // remove_object(&connection, )? + Ok(()) + }); + + match trans_result { + Ok(_) => { + info!("Removed: {:?}", file.path); + UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) + } + Err(error) => UpdatePathOutcome::Failed( + PathBuf::from(file.path.clone()), + StoreError::Unknown(error.to_string()), + ), + } + }); + + // Re-enable SQLite syncing + drop(_guard); + + // Reporting + let all_outcomes = path_outcomes + .into_iter() + .chain(cleanup_results) + .collect::>(); + + let mut failed: Vec<(&PathBuf, &StoreError)> = vec![]; + let mut created = 0; + let mut unchanged = 0; + let mut deleted = 0; + + for outcome in &all_outcomes { + match outcome { + UpdatePathOutcome::Added(_) => created += 1, + UpdatePathOutcome::Unchanged(_) => unchanged += 1, + UpdatePathOutcome::Removed(_) => deleted += 1, + UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), + } + } + + if !failed.is_empty() { + warn!( + "{} path updates failed! ({})", + failed.len(), + failed + .iter() + .map(|(path, error)| format!("{:?}: {}", path, error)) + .collect::>() + .join(", ") + ) + } + + info!( + "Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.", + db.vault_path, + created, + deleted, + unchanged, + start.elapsed().as_secs() + ); + + Ok(all_outcomes) + } + + fn process_directory_entry( + &self, + connection: UpEndConnection, + resolve_cache: &Arc>, + path: PathBuf, + existing_files: &Arc>>, + quick_check: bool, + ) -> Result { + debug!("Processing: {:?}", path); + + // Prepare the data + let existing_files = Arc::clone(existing_files); + + let normalized_path = self.normalize_path(&path)?; + let normalized_path_str = normalized_path + .to_str() + .ok_or(anyhow!("Path not valid unicode!"))?; + + let mut file_hash: Option = None; + + // Get size & mtime for quick comparison + let metadata = fs::metadata(&path)?; + let size = metadata.len() as i64; + if size < 0 { + panic!("File {} too large?!", path.display()); + } + let mtime = metadata + .modified() + .map(|t| { + NaiveDateTime::from_timestamp( + t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, + 0, + ) + }) + .ok(); + + // Check if the path entry for this file already exists in database + let existing_files_read = existing_files.read().unwrap(); + let maybe_existing_file = existing_files_read + .iter() + .find(|file| file.path == normalized_path_str); + + if let Some(existing_file) = maybe_existing_file { + let existing_file = existing_file.clone(); + drop(existing_files_read); + + if !quick_check || size == existing_file.size { + let same_mtime = mtime.is_some() && mtime == existing_file.mtime; + let mut same_hash = false; + + if !quick_check || !same_mtime { + file_hash = Some(path.hash()?); + same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; + } + + if same_mtime || same_hash { + if mtime != existing_file.mtime { + self.file_update_mtime(existing_file.id, mtime)?; + } + + if !existing_file.valid { + self.file_set_valid(existing_file.id, true)?; + } + + let mut existing_files_write = existing_files.write().unwrap(); + let maybe_existing_file = existing_files_write + .iter() + .enumerate() + .find(|(_, file)| file.path == normalized_path_str) + .map(|(idx, _)| idx); + + if let Some(idx) = maybe_existing_file { + existing_files_write.swap_remove(idx); + debug!("Unchanged: {:?}", path); + return Ok(UpdatePathOutcome::Unchanged(path)); + } + } + } + } else { + drop(existing_files_read); + } + + // If not, add it! 
+ if file_hash.is_none() { + file_hash = Some(path.hash()?); + } + let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string()); + + self.insert_file_with_metadata( + &connection, + &normalized_path, + file_hash.unwrap(), + size, + mtime, + mime_type, + Some(resolve_cache), + ) + .map(|_| { + info!("Added: {:?}", path); + UpdatePathOutcome::Added(path.clone()) + }) + } + + fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result
{ + let normalized_path = self.normalize_path(path)?; + let metadata = fs::metadata(&path)?; + let size = metadata.len() as i64; + let mtime = metadata + .modified() + .map(|t| { + NaiveDateTime::from_timestamp( + t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, + 0, + ) + }) + .ok(); + let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string()); + + self.insert_file_with_metadata( + connection, + &normalized_path, + hash, + size, + mtime, + mime_type, + None, + ) + } + + fn insert_file_with_metadata( + &self, + connection: &UpEndConnection, + normalized_path: &Path, + hash: Hash, + size: i64, + mtime: Option, + mime_type: Option, + resolve_cache: Option<&Arc>>, + ) -> Result
{ + let new_file = db::NewFile { + path: normalized_path + .to_str() + .ok_or(anyhow!("Path not UTF-8?!"))? + .to_string(), + hash: (hash.clone()).0, + added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), + size, + mtime, + }; + + let blob_address = Address::Hash(hash); + + // Metadata + let type_entry = Entry { + entity: blob_address.clone(), + attribute: String::from(IS_OF_TYPE_ATTR), + value: BLOB_TYPE_ADDR.clone().into(), + }; + + let size_entry = Entry { + entity: blob_address.clone(), + attribute: FILE_SIZE_KEY.to_string(), + value: (size as f64).into(), + }; + + let mime_entry = mime_type.map(|mime_type| Entry { + entity: blob_address.clone(), + attribute: FILE_MIME_KEY.to_string(), + value: mime_type.into(), + }); + + let added_entry = Entry { + entity: blob_address.clone(), + attribute: ADDED_ATTR.to_string(), + value: (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() as f64) + .into(), + }; + + // Add the appropriate entries w/r/t virtual filesystem location + let components = normalized_path.components().collect::>(); + let (filename, dir_path) = components.split_last().unwrap(); + + let upath = UHierPath( + iter::once(UNode::new("NATIVE").unwrap()) + .chain(dir_path.iter().map(|component| { + UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap() + })) + .collect(), + ); + let resolved_path = match resolve_cache { + Some(cache) => resolve_path_cached(connection, &upath, true, cache)?, + None => resolve_path(connection, &upath, true)?, + }; + let parent_dir = resolved_path.last().unwrap(); + + // Insert all + let file_count = self.insert_file(new_file)?; + + connection.insert_entry_immutable(type_entry)?; + connection.insert_entry_immutable(size_entry)?; + if file_count == 1 { + connection.insert_entry_immutable(added_entry)?; + } + if let Some(mime_entry) = mime_entry { + connection.insert_entry(mime_entry)?; + } + + let dir_has_entry = Entry { + entity: parent_dir.clone(), + attribute: HIER_HAS_ATTR.to_string(), + value: blob_address.clone().into(), + }; + let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?; + + let label_entry = Entry { + entity: blob_address.clone(), + attribute: LABEL_ATTR.to_string(), + value: filename.as_os_str().to_string_lossy().to_string().into(), + }; + let label_entry_addr = connection.insert_entry(label_entry)?; + + let alias_entry = Entry { + entity: dir_has_entry_addr, + attribute: ALIAS_KEY.to_string(), + value: label_entry_addr.into(), + }; + connection.insert_entry(alias_entry)?; + + Ok(blob_address) + } + + pub fn insert_file(&self, file: db::NewFile) -> Result { + debug!( + "Inserting {} ({})...", + &file.path, + Address::Hash(Hash((&file.hash).clone())) + ); + + // let _lock = self.lock.write().unwrap(); + let conn = self.manager.connect()?; + + diesel::insert_into(files::table) + .values(&file) + .execute(&conn)?; + + Ok(files::dsl::files + .filter(files::dsl::valid.eq(true)) + .filter(files::dsl::hash.eq(file.hash)) + .count() + .first::(&conn)? 
+ .try_into() + .unwrap()) + } + + fn retrieve_file(&self, obj_hash: &Hash) -> Result> { + use self::db::files::dsl::*; + + // let _lock = self.lock.read().unwrap(); + let conn = self.manager.connect()?; + + let matches = files + .filter(valid.eq(true)) + .filter(hash.eq(&obj_hash.0)) + .load::(&conn)?; + + let matches = matches + .into_iter() + .map(|f| db::OutFile { + id: f.id, + hash: f.hash, + path: self.path.join(PathBuf::from(f.path)), + valid: f.valid, + added: f.added, + size: f.size, + mtime: f.mtime, + }) + .collect(); + + Ok(matches) + } + + fn retrieve_all_files(&self) -> Result> { + use self::db::files::dsl::*; + + // let _lock = self.lock.read().unwrap(); + let conn = self.manager.connect()?; + + let matches = files.load::(&conn)?; + Ok(matches) + } + + fn file_update_mtime(&self, file_id: i32, m_time: Option) -> Result { + use self::db::files::dsl::*; + debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time); + + // let _lock = self.lock.write().unwrap(); + let conn = self.manager.connect()?; + + Ok(diesel::update(files.filter(id.eq(file_id))) + .set(mtime.eq(m_time)) + .execute(&conn)?) + } + + fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result { + use self::db::files::dsl::*; + debug!("Setting file ID {} to valid = {}", file_id, is_valid); + + // let _lock = self.lock.write().unwrap(); + let conn = self.manager.connect()?; + + Ok(diesel::update(files.filter(id.eq(file_id))) + .set(valid.eq(is_valid)) + .execute(&conn)?) + } + + fn normalize_path(&self, path: &Path) -> Result { + Ok(path + .canonicalize()? + .strip_prefix(self.path.as_path())? + .to_path_buf()) + } +} + +impl From for Blob { + fn from(of: db::OutFile) -> Self { + Blob { file_path: of.path } + } +} + +impl From for Blob { + fn from(f: db::File) -> Self { + Blob { + file_path: PathBuf::from(f.path), + } + } +} + +impl UpStore for FsStore { + fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result, super::StoreError> { + Ok(self + .retrieve_file(hash) + .map_err(|e| StoreError::Unknown(e.to_string()))? + .into_iter() + .map(Blob::from) + .collect()) + } + + fn retrieve_all(&self) -> Result, super::StoreError> { + Ok(self + .retrieve_all_files() + .map_err(|e| StoreError::Unknown(e.to_string()))? + .into_iter() + .map(Blob::from) + .collect()) + } + + fn store>>( + &self, + connection: UpEndConnection, + blob: Blob, + name_hint: S, + ) -> Result { + let file_path = blob.get_file_path(); + let hash = file_path + .hash() + .map_err(|e| StoreError::Unknown(e.to_string()))?; + + let existing_files = self.retrieve(&hash)?; + + if existing_files.is_empty() { + let address = Address::Hash(hash.clone()); + let addr_str = b58_encode( + address + .encode() + .map_err(|e| StoreError::Unknown(e.to_string()))?, + ); + + let final_name = if let Some(name_hint) = name_hint.into() { + format!("{addr_str}_{name_hint}") + } else { + addr_str + }; + + let final_path = self.path.join(&final_name); + + fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?; + + + self.add_file(&connection, &final_path, hash.clone()) + .map_err(|e| StoreError::Unknown(e.to_string()))?; + } + + Ok(hash) + } + + fn update>( + &self, + db: D, + mut job_container: JobContainer, + ) -> Result, StoreError> { + let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); + + match job_result { + Ok(job_handle) => { + let result = self.rescan_vault(db, job_handle, true, false); + + if let Err(err) = &result { + error!("Update did not succeed! 
{:?}", err); + } + + result.map_err(|err| StoreError::Unknown(err.to_string())) + } + Err(err) => Err(StoreError::Unknown(err.to_string())), + } + } +} + +#[cfg(test)] +mod test { + use crate::database::UpEndDatabase; + use crate::util::jobs::JobContainer; + + use super::*; + use std::fs::File; + use std::io::Write; + use tempfile::TempDir; + + use std::sync::Once; + use tracing_subscriber::filter::{EnvFilter, LevelFilter}; + + static INIT: Once = Once::new(); + + pub fn initialize() { + INIT.call_once(|| { + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::builder() + .with_default_directive(LevelFilter::DEBUG.into()) + .from_env_lossy(), + ) + .init(); + }) + } + + #[test] + fn test_update() { + // Prepare temporary filesystem structure + let temp_dir = TempDir::new().unwrap(); + + let file_path = temp_dir.path().join("my-temporary-note.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); + + let file_path = temp_dir.path().join("hello-world.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Hello, World!").unwrap(); + + let file_path = temp_dir.path().join("empty"); + File::create(file_path).unwrap(); + + // Initialize database + let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap(); + let store = FsStore::from_path(&temp_dir).unwrap(); + let job_container = JobContainer::new(); + + // Store scan + let rescan_result = store.update(&open_result.db, job_container.clone()); + assert!(rescan_result.is_ok()); + } + + #[test] + fn test_rescan_quick() { + initialize(); + _test_rescan(true) + } + + #[test] + fn test_rescan_full() { + initialize(); + _test_rescan(false) + } + + fn _test_rescan(quick: bool) { + // Prepare temporary filesystem structure + let temp_dir = TempDir::new().unwrap(); + + let file_path = temp_dir.path().join("my-temporary-note.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Brian was here. 
Briefly.").unwrap(); + + let file_path = temp_dir.path().join("hello-world.txt"); + let mut tmp_file = File::create(file_path).unwrap(); + writeln!(tmp_file, "Hello, World!").unwrap(); + + let file_path = temp_dir.path().join("empty"); + File::create(file_path).unwrap(); + + // Initialize database + let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap(); + let store = FsStore::from_path(&temp_dir).unwrap(); + let mut job_container = JobContainer::new(); + + // Initial scan + let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); + let rescan_result = store.rescan_vault(&open_result.db, job, quick, true); + + assert!(rescan_result.is_ok()); + let rescan_result = rescan_result.unwrap(); + assert_eq!(rescan_result.len(), 3); + rescan_result + .into_iter() + .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_)))); + + // Modification-less rescan + let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); + let rescan_result = store.rescan_vault(&open_result.db, job, quick, false); + + assert!(rescan_result.is_ok()); + let rescan_result = rescan_result.unwrap(); + assert_eq!(rescan_result.len(), 3); + rescan_result + .into_iter() + .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_)))); + + // Remove a file + std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap(); + + let job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); + let rescan_result = store.rescan_vault(&open_result.db, job, quick, false); + + assert!(rescan_result.is_ok()); + let rescan_result = rescan_result.unwrap(); + assert_eq!(rescan_result.len(), 3); + assert_eq!( + 2, + rescan_result + .iter() + .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_))) + .count() + ); + assert_eq!( + 1, + rescan_result + .iter() + .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_))) + .count() + ); + } +} diff --git a/src/database/stores/mod.rs b/src/database/stores/mod.rs new file mode 100644 index 0000000..66ca8d4 --- /dev/null +++ b/src/database/stores/mod.rs @@ -0,0 +1,64 @@ +use std::{ + borrow::Borrow, + path::{Path, PathBuf}, +}; + +use super::{UpEndDatabase, UpEndConnection}; +use crate::util::{hash::Hash, jobs::JobContainer}; + +pub mod fs; + +#[derive(Debug, Clone)] +pub enum StoreError { + NotFound, + Unknown(String), +} + +impl std::fmt::Display for StoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "STORE ERROR") + } +} + +impl std::error::Error for StoreError {} + +type Result = std::result::Result; + +pub struct Blob { + file_path: PathBuf, +} + +impl Blob { + pub fn from_filepath>(path: P) -> Blob { + Blob { + file_path: PathBuf::from(path.as_ref()), + } + } + + pub fn get_file_path(&self) -> &Path { + self.file_path.as_path() + } +} + +#[derive(Debug)] +pub enum UpdatePathOutcome { + Added(PathBuf), + Unchanged(PathBuf), + Removed(PathBuf), + Failed(PathBuf, StoreError), +} +pub trait UpStore { + fn retrieve(&self, hash: &Hash) -> Result>; + fn retrieve_all(&self) -> Result>; + fn store>>( + &self, + connection: UpEndConnection, + blob: Blob, + name_hint: S, + ) -> Result; + fn update>( + &self, + database: D, + job_container: JobContainer, + ) -> Result>; +} diff --git a/src/extractors/audio.rs b/src/extractors/audio.rs index 67e0e44..62e0b95 100644 --- a/src/extractors/audio.rs +++ b/src/extractors/audio.rs @@ -4,9 +4,12 @@ use crate::{ database::{ constants, entry::{Entry, EntryValue}, + stores::{ + fs::{FsStore, FILE_MIME_KEY}, + UpStore, + }, UpEndConnection, }, - 
filesystem::FILE_MIME_KEY, util::jobs::{JobContainer, JobState}, }; use anyhow::{anyhow, Result}; @@ -18,6 +21,7 @@ impl Extractor for ID3Extractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, mut job_container: JobContainer, ) -> Result> { if let Address::Hash(hash) = address { @@ -34,14 +38,15 @@ impl Extractor for ID3Extractor { return Ok(vec![]); } - let files = connection.retrieve_file(hash)?; + let files = store.retrieve(hash)?; if let Some(file) = files.get(0) { + let file_path = file.get_file_path(); let mut job_handle = job_container.add_job( None, &format!( r#"Getting ID3 info from "{:}""#, - file.path + file_path .components() .last() .unwrap() @@ -50,7 +55,7 @@ impl Extractor for ID3Extractor { ), )?; - let tags = id3::Tag::read_from_path(&file.path)?; + let tags = id3::Tag::read_from_path(&file_path)?; let result: Vec = tags .frames() diff --git a/src/extractors/mod.rs b/src/extractors/mod.rs index 2644e16..72d93bf 100644 --- a/src/extractors/mod.rs +++ b/src/extractors/mod.rs @@ -1,5 +1,6 @@ use crate::{ addressing::Address, + database::stores::fs::FsStore, database::{entry::Entry, UpEndConnection, UpEndDatabase}, util::jobs::JobContainer, }; @@ -25,6 +26,7 @@ pub trait Extractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, job_container: JobContainer, ) -> Result>; @@ -36,10 +38,11 @@ pub trait Extractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, job_container: JobContainer, ) -> Result { if self.is_needed(address, connection)? { - let entries = self.get(address, connection, job_container)?; + let entries = self.get(address, connection, store, job_container)?; connection.transaction(|| { let len = entries.len(); @@ -54,13 +57,15 @@ pub trait Extractor { } } -pub fn extract_all>( +pub fn extract_all, S: Borrow>( db: D, + store: S, mut job_container: JobContainer, ) -> Result { info!("Extracting metadata for all addresses."); let db = db.borrow(); + let store = store.borrow(); let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?; let all_addresses = db.connection()?.get_all_addresses()?; @@ -72,7 +77,7 @@ pub fn extract_all>( .par_iter() .map(|address| { let connection = db.connection()?; - let extract_result = extract(address, &connection, job_container.clone()); + let extract_result = extract(address, &connection, store, job_container.clone()); let mut cnt = count.write().unwrap(); *cnt += 1; @@ -99,6 +104,7 @@ pub fn extract_all>( pub fn extract( address: &Address, connection: &UpEndConnection, + store: &FsStore, job_container: JobContainer, ) -> Result { let mut entry_count = 0; @@ -106,18 +112,20 @@ pub fn extract( #[cfg(feature = "extractors-web")] { - entry_count += web::WebExtractor.insert_info(address, connection, job_container.clone())?; + entry_count += + web::WebExtractor.insert_info(address, connection, store, job_container.clone())?; } #[cfg(feature = "extractors-audio")] { entry_count += - audio::ID3Extractor.insert_info(address, connection, job_container.clone())?; + audio::ID3Extractor.insert_info(address, connection, store, job_container.clone())?; } #[cfg(feature = "extractors-photo")] { - entry_count += photo::ExifExtractor.insert_info(address, connection, job_container)?; + entry_count += + photo::ExifExtractor.insert_info(address, connection, store, job_container)?; } trace!("Extracting metadata for {address:?} - got {entry_count} entries."); diff --git a/src/extractors/photo.rs b/src/extractors/photo.rs index 
c650308..d89b1a9 100644 --- a/src/extractors/photo.rs +++ b/src/extractors/photo.rs @@ -4,9 +4,9 @@ use crate::{ database::{ constants, entry::{Entry, EntryValue}, + stores::{fs::{FILE_MIME_KEY, FsStore}, UpStore}, UpEndConnection, }, - filesystem::FILE_MIME_KEY, util::jobs::{JobContainer, JobState}, }; use anyhow::{anyhow, Result}; @@ -21,6 +21,7 @@ impl Extractor for ExifExtractor { &self, address: &Address, connection: &UpEndConnection, + store: &FsStore, mut job_container: JobContainer, ) -> Result> { if let Address::Hash(hash) = address { @@ -37,14 +38,15 @@ impl Extractor for ExifExtractor { return Ok(vec![]); } - let files = connection.retrieve_file(hash)?; + let files = store.retrieve(hash)?; if let Some(file) = files.get(0) { + let file_path = file.get_file_path(); let mut job_handle = job_container.add_job( None, &format!( r#"Getting EXIF info from "{:}""#, - file.path + file_path .components() .last() .unwrap() @@ -53,7 +55,7 @@ impl Extractor for ExifExtractor { ), )?; - let file = std::fs::File::open(&file.path)?; + let file = std::fs::File::open(&file_path)?; let mut bufreader = std::io::BufReader::new(&file); let exifreader = exif::Reader::new(); let exif = exifreader.read_from_container(&mut bufreader)?; diff --git a/src/extractors/web.rs b/src/extractors/web.rs index 5ea2423..20208a1 100644 --- a/src/extractors/web.rs +++ b/src/extractors/web.rs @@ -1,7 +1,7 @@ use super::Extractor; use crate::{ addressing::Address, - database::{entry::Entry, UpEndConnection}, + database::{entry::Entry, stores::fs::FsStore, UpEndConnection}, util::jobs::{JobContainer, JobState}, }; use anyhow::anyhow; @@ -16,6 +16,7 @@ impl Extractor for WebExtractor { &self, address: &Address, _: &UpEndConnection, + _: &FsStore, mut job_container: JobContainer, ) -> anyhow::Result> { if let Address::Url(url) = address { @@ -92,12 +93,13 @@ mod test { let temp_dir = TempDir::new().unwrap(); let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?; let connection = open_result.db.connection()?; + let store = FsStore::from_path(&temp_dir)?; let job_container = JobContainer::new(); let address = Address::Url("https://upend.dev".into()); assert!(WebExtractor.is_needed(&address, &connection)?); - WebExtractor.insert_info(&address, &connection, job_container)?; + WebExtractor.insert_info(&address, &connection, &store, job_container)?; assert!(!WebExtractor.is_needed(&address, &connection)?); diff --git a/src/filesystem.rs b/src/filesystem.rs deleted file mode 100644 index 49aac00..0000000 --- a/src/filesystem.rs +++ /dev/null @@ -1,573 +0,0 @@ -use std::borrow::Borrow; -use std::convert::TryFrom; -use std::path::{Component, Path, PathBuf}; -use std::sync::{Arc, Mutex, RwLock}; -use std::time::{Instant, SystemTime, UNIX_EPOCH}; -use std::{fs, iter}; - -use crate::addressing::Address; -use crate::database::constants::{ - ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, - TYPE_HAS_ATTR, -}; -use crate::database::entry::{Entry, InvariantEntry}; -use crate::database::hierarchies::{ - resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode, -}; -use crate::database::inner::models; -use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR}; -use crate::util::hash::{Hash, Hashable}; -use crate::util::jobs::{JobContainer, JobHandle}; -use anyhow::{anyhow, Error, Result}; -use chrono::prelude::*; -use log::{debug, error, info, warn}; -use lru::LruCache; -use rayon::prelude::*; -use walkdir::WalkDir; - -const BLOB_TYPE: &str = "BLOB"; -const 
ALIAS_KEY: &str = "ALIAS"; -pub const FILE_MIME_KEY: &str = "FILE_MIME"; -const FILE_MTIME_KEY: &str = "FILE_MTIME"; -const FILE_SIZE_KEY: &str = "FILE_SIZE"; - -lazy_static! { - static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry { - attribute: String::from(TYPE_BASE_ATTR), - value: BLOB_TYPE.into(), - }; - static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap(); -} - -fn initialize_types(connection: &UpEndConnection) -> Result<()> { - // BLOB_TYPE - connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?; - upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?; - upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?; - - Ok(()) -} - -pub fn rescan_vault>( - db: D, - mut job_container: JobContainer, - quick_check: bool, - disable_synchronous: bool, -) -> Result> { - let job_result = job_container.add_job("REIMPORT", "Scaning vault directory..."); - - match job_result { - Ok(job_handle) => { - let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous); - - if let Err(err) = &result { - error!("Update did not succeed! {:?}", err); - } - - result - } - Err(err) => Err(err), - } -} -struct PragmaSynchronousGuard<'a>(&'a UpEndConnection); - -impl Drop for PragmaSynchronousGuard<'_> { - fn drop(&mut self) { - debug!("Re-enabling synchronous mode."); - let res = self.0.execute("PRAGMA synchronous = NORMAL;"); - if let Err(err) = res { - error!( - "Error setting synchronous mode back to NORMAL! Data loss possible! {}", - err - ); - } - } -} - -type UpdatePathResult = Result; - -#[derive(Debug)] -pub enum UpdatePathOutcome { - Added(PathBuf), - Unchanged(PathBuf), - Removed(PathBuf), - Failed(PathBuf, Error), -} - -fn rescan_vault_inner>( - db: D, - job_handle: JobHandle, - quick_check: bool, - disable_synchronous: bool, -) -> Result> { - let start = Instant::now(); - info!("Vault rescan started."); - - let db = db.borrow(); - let connection = db.connection()?; - - // Initialize types, etc... 
- debug!("Initializing DB types."); - initialize_types(&connection)?; - - // Disable syncing in SQLite for the duration of the import - let mut _guard: Option = None; - if disable_synchronous { - debug!("Disabling SQLite synchronous mode"); - connection.execute("PRAGMA synchronous = OFF;")?; - _guard = Some(PragmaSynchronousGuard(&connection)); - } - - // Walk through the vault, find all paths - debug!("Traversing vault directory"); - let absolute_dir_path = fs::canonicalize(&*db.vault_path)?; - let path_entries: Vec = WalkDir::new(&*db.vault_path) - .follow_links(true) - .into_iter() - .filter_map(|e| e.ok()) - .filter_map(|e| fs::canonicalize(e.into_path()).ok()) - .filter(|e| e.is_file()) - .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR))) - .collect(); - - // Prepare for processing - let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?)); - - // Actual processing - let count = RwLock::new(0_usize); - let resolve_cache = Arc::new(Mutex::new(LruCache::new(256))); - let total = path_entries.len() as f32; - let shared_job_handle = Arc::new(Mutex::new(job_handle)); - let path_outcomes: Vec = path_entries - .into_par_iter() - .map(|path| { - let result = process_directory_entry( - db.connection().unwrap(), - &resolve_cache, - path.clone(), - &existing_files, - quick_check, - ); - - let mut cnt = count.write().unwrap(); - *cnt += 1; - - let _ = shared_job_handle - .lock() - .unwrap() - .update_progress(*cnt as f32 / total * 100.0); - - match result { - Ok(result) => result, - Err(error) => { - error!("Failed to update {:?} ({})", path, error); - UpdatePathOutcome::Failed(path, error) - } - } - }) - .collect(); - - debug!("Processing done, cleaning up..."); - - let existing_files = existing_files.read().unwrap(); - - let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| { - let trans_result = connection.transaction::<_, Error, _>(|| { - connection.file_set_valid(file.id, false)?; - // remove_object(&connection, )? - Ok(()) - }); - - match trans_result { - Ok(_) => { - info!("Removed: {:?}", file.path); - UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())) - } - Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error), - } - }); - - // Re-enable SQLite syncing - drop(_guard); - - // Reporting - let all_outcomes = path_outcomes - .into_iter() - .chain(cleanup_results) - .collect::>(); - - let mut failed: Vec<(&PathBuf, &Error)> = vec![]; - let mut created = 0; - let mut unchanged = 0; - let mut deleted = 0; - - for outcome in &all_outcomes { - match outcome { - UpdatePathOutcome::Added(_) => created += 1, - UpdatePathOutcome::Unchanged(_) => unchanged += 1, - UpdatePathOutcome::Removed(_) => deleted += 1, - UpdatePathOutcome::Failed(path, err) => failed.push((path, err)), - } - } - - if !failed.is_empty() { - warn!( - "{} path updates failed! ({})", - failed.len(), - failed - .iter() - .map(|(path, error)| format!("{:?}: {}", path, error)) - .collect::>() - .join(", ") - ) - } - - info!( - "Finished updating {:?} ({} created, {} deleted, {} left unchanged). 
Took {}s.", - db.vault_path, - created, - deleted, - unchanged, - start.elapsed().as_secs() - ); - - Ok(all_outcomes) -} - -fn process_directory_entry( - connection: UpEndConnection, - resolve_cache: &Arc>, - path: PathBuf, - existing_files: &Arc>>, - quick_check: bool, -) -> UpdatePathResult { - debug!("Processing: {:?}", path); - - // Prepare the data - let existing_files = Arc::clone(existing_files); - - let normalized_path = connection.normalize_path(&path)?; - let normalized_path_str = normalized_path - .to_str() - .ok_or(anyhow!("Path not valid unicode!"))?; - - let mut file_hash: Option = None; - - // Get size & mtime for quick comparison - let metadata = fs::metadata(&path)?; - let size = metadata.len() as i64; - if size < 0 { - panic!("File {} too large?!", path.display()); - } - let mtime = metadata - .modified() - .map(|t| { - NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) - }) - .ok(); - - // Check if the path entry for this file already exists in database - let existing_files_read = existing_files.read().unwrap(); - let maybe_existing_file = existing_files_read - .iter() - .find(|file| file.path == normalized_path_str); - - if let Some(existing_file) = maybe_existing_file { - let existing_file = existing_file.clone(); - drop(existing_files_read); - - if !quick_check || size == existing_file.size { - let same_mtime = mtime.is_some() && mtime == existing_file.mtime; - let mut same_hash = false; - - if !quick_check || !same_mtime { - file_hash = Some(path.hash()?); - same_hash = file_hash.as_ref().unwrap() == &existing_file.hash; - } - - if same_mtime || same_hash { - if mtime != existing_file.mtime { - connection.file_update_mtime(existing_file.id, mtime)?; - } - - if !existing_file.valid { - connection.file_set_valid(existing_file.id, true)?; - } - - let mut existing_files_write = existing_files.write().unwrap(); - let maybe_existing_file = existing_files_write - .iter() - .enumerate() - .find(|(_, file)| file.path == normalized_path_str) - .map(|(idx, _)| idx); - - if let Some(idx) = maybe_existing_file { - existing_files_write.swap_remove(idx); - debug!("Unchanged: {:?}", path); - return Ok(UpdatePathOutcome::Unchanged(path)); - } - } - } - } else { - drop(existing_files_read); - } - - // If not, add it! - if file_hash.is_none() { - file_hash = Some(path.hash()?); - } - let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string()); - - insert_file_with_metadata( - &connection, - &normalized_path, - file_hash.unwrap(), - size, - mtime, - mime_type, - Some(resolve_cache), - ) - .map(|_| { - info!("Added: {:?}", path); - UpdatePathOutcome::Added(path.clone()) - }) -} - -pub fn add_file(connection: &UpEndConnection, path: &Path, hash: Hash) -> Result
{ - let normalized_path = connection.normalize_path(path)?; - let metadata = fs::metadata(&path)?; - let size = metadata.len() as i64; - let mtime = metadata - .modified() - .map(|t| { - NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) - }) - .ok(); - let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string()); - - insert_file_with_metadata( - connection, - &normalized_path, - hash, - size, - mtime, - mime_type, - None, - ) -} - -fn insert_file_with_metadata( - connection: &UpEndConnection, - normalized_path: &Path, - hash: Hash, - size: i64, - mtime: Option, - mime_type: Option, - resolve_cache: Option<&Arc>>, -) -> Result
{ - let new_file = models::NewFile { - path: normalized_path - .to_str() - .ok_or(anyhow!("Path not UTF-8?!"))? - .to_string(), - hash: (hash.clone()).0, - added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), - size, - mtime, - }; - - let blob_address = Address::Hash(hash); - - // Metadata - let type_entry = Entry { - entity: blob_address.clone(), - attribute: String::from(IS_OF_TYPE_ATTR), - value: BLOB_TYPE_ADDR.clone().into(), - }; - - let size_entry = Entry { - entity: blob_address.clone(), - attribute: FILE_SIZE_KEY.to_string(), - value: (size as f64).into(), - }; - - let mime_entry = mime_type.map(|mime_type| Entry { - entity: blob_address.clone(), - attribute: FILE_MIME_KEY.to_string(), - value: mime_type.into(), - }); - - let added_entry = Entry { - entity: blob_address.clone(), - attribute: ADDED_ATTR.to_string(), - value: (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs() as f64) - .into(), - }; - - // Add the appropriate entries w/r/t virtual filesystem location - let components = normalized_path.components().collect::>(); - let (filename, dir_path) = components.split_last().unwrap(); - - let upath = UHierPath( - iter::once(UNode::new("NATIVE").unwrap()) - .chain(dir_path.iter().map(|component| { - UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap() - })) - .collect(), - ); - let resolved_path = match resolve_cache { - Some(cache) => resolve_path_cached(connection, &upath, true, cache)?, - None => resolve_path(connection, &upath, true)?, - }; - let parent_dir = resolved_path.last().unwrap(); - - // Insert all - connection.transaction::<_, Error, _>(|| { - let file_count = connection.insert_file(new_file)?; - - connection.insert_entry_immutable(type_entry)?; - connection.insert_entry_immutable(size_entry)?; - if file_count == 1 { - connection.insert_entry_immutable(added_entry)?; - } - if let Some(mime_entry) = mime_entry { - connection.insert_entry(mime_entry)?; - } - - let dir_has_entry = Entry { - entity: parent_dir.clone(), - attribute: HIER_HAS_ATTR.to_string(), - value: blob_address.clone().into(), - }; - let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?; - - let label_entry = Entry { - entity: blob_address.clone(), - attribute: LABEL_ATTR.to_string(), - value: filename.as_os_str().to_string_lossy().to_string().into(), - }; - let label_entry_addr = connection.insert_entry(label_entry)?; - - let alias_entry = Entry { - entity: dir_has_entry_addr, - attribute: ALIAS_KEY.to_string(), - value: label_entry_addr.into(), - }; - connection.insert_entry(alias_entry)?; - - Ok(blob_address) - }) -} - -#[cfg(test)] -mod test { - use crate::database::UpEndDatabase; - use crate::util::jobs::JobContainer; - - use super::*; - use std::fs::File; - use std::io::Write; - use tempfile::TempDir; - - use std::sync::Once; - use tracing_subscriber::filter::{EnvFilter, LevelFilter}; - - static INIT: Once = Once::new(); - - pub fn initialize() { - INIT.call_once(|| { - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::builder() - .with_default_directive(LevelFilter::DEBUG.into()) - .from_env_lossy(), - ) - .init(); - }) - } - - #[test] - fn test_rescan_quick() { - initialize(); - _test_rescan(true) - } - - #[test] - fn test_rescan_full() { - initialize(); - _test_rescan(false) - } - - fn _test_rescan(quick: bool) { - // Prepare temporary filesystem structure - let temp_dir = TempDir::new().unwrap(); - - let file_path = temp_dir.path().join("my-temporary-note.txt"); - let mut tmp_file = File::create(file_path).unwrap(); - 
writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); - - let file_path = temp_dir.path().join("hello-world.txt"); - let mut tmp_file = File::create(file_path).unwrap(); - writeln!(tmp_file, "Hello, World!").unwrap(); - - let file_path = temp_dir.path().join("empty"); - File::create(file_path).unwrap(); - - // Initialize database - - let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap(); - let mut job_container = JobContainer::new(); - let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap(); - - // Initial scan - let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true); - - assert!(rescan_result.is_ok()); - let rescan_result = rescan_result.unwrap(); - assert_eq!(rescan_result.len(), 3); - rescan_result - .into_iter() - .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_)))); - - // Modification-less rescan - - let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false); - - assert!(rescan_result.is_ok()); - let rescan_result = rescan_result.unwrap(); - assert_eq!(rescan_result.len(), 3); - rescan_result - .into_iter() - .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_)))); - - // Remove a file - - std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap(); - - let rescan_result = rescan_vault(&open_result.db, job_container, quick, false); - - assert!(rescan_result.is_ok()); - let rescan_result = rescan_result.unwrap(); - assert_eq!(rescan_result.len(), 3); - assert_eq!( - 2, - rescan_result - .iter() - .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_))) - .count() - ); - assert_eq!( - 1, - rescan_result - .iter() - .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_))) - .count() - ); - } -} diff --git a/src/main.rs b/src/main.rs index dfe91e7..76b2460 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,10 @@ use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use crate::{ common::{get_static_dir, PKG_VERSION}, - database::UpEndDatabase, + database::{ + stores::{fs::FsStore, UpStore}, + UpEndDatabase, + }, util::{exec::block_background, jobs::JobContainer}, }; @@ -26,7 +29,6 @@ mod addressing; mod common; mod database; mod extractors; -mod filesystem; mod previews; mod routes; mod util; @@ -125,6 +127,7 @@ fn main() -> Result<()> { .expect("failed to open database!"); let upend = Arc::new(open_result.db); + let store = Arc::new(FsStore::from_path(vault_path.clone()).unwrap()); let ui_path = get_static_dir("webui"); if ui_path.is_err() { @@ -137,22 +140,22 @@ fn main() -> Result<()> { let ui_enabled = ui_path.is_ok() && !matches.is_present("NO_UI"); let browser_enabled = desktop_enabled && !matches.is_present("NO_BROWSER"); - let preview_path = upend.db_path.join("previews"); + let preview_dir = tempfile::tempdir().unwrap(); #[cfg(feature = "previews")] let preview_store = Some(Arc::new(crate::previews::PreviewStore::new( - preview_path.clone(), - upend.clone(), + preview_dir.path(), + store.clone(), ))); - if matches.is_present("CLEAN") { - info!("Cleaning temporary directories..."); - if preview_path.exists() { - std::fs::remove_dir_all(&preview_path).unwrap(); - debug!("Removed {preview_path:?}"); - } else { - debug!("No preview path exists, continuing..."); - } - } + // if matches.is_present("CLEAN") { + // info!("Cleaning temporary directories..."); + // if preview_path.exists() { + // std::fs::remove_dir_all(&preview_path).unwrap(); + // debug!("Removed {preview_path:?}"); + // } else { + // debug!("No preview path 
exists, continuing..."); + // } + // } #[cfg(not(feature = "previews"))] let preview_store = None; @@ -193,6 +196,7 @@ fn main() -> Result<()> { .into_owned() }), ), + store, job_container: job_container.clone(), preview_store, desktop_enabled, @@ -226,7 +230,6 @@ fn main() -> Result<()> { .service(routes::list_hier) .service(routes::list_hier_roots) .service(routes::store_info) - .service(routes::get_file) .service(routes::get_jobs) .service(routes::get_info); @@ -257,14 +260,10 @@ fn main() -> Result<()> { if !matches.is_present("NO_INITIAL_UPDATE") { info!("Running initial update..."); - let new = open_result.new; + // let new = open_result.new; block_background::<_, _, anyhow::Error>(move || { - let _ = if new { - filesystem::rescan_vault(upend.clone(), job_container.clone(), false, true) - } else { - filesystem::rescan_vault(upend.clone(), job_container.clone(), true, false) - }; - let _ = extractors::extract_all(upend, job_container); + state.store.update(upend.clone(), job_container.clone()); + let _ = extractors::extract_all(upend, state.store, job_container); Ok(()) }) } diff --git a/src/previews/mod.rs b/src/previews/mod.rs index 02d9c18..7364c69 100644 --- a/src/previews/mod.rs +++ b/src/previews/mod.rs @@ -1,3 +1,5 @@ +use crate::database::stores::fs::FsStore; +use crate::database::stores::UpStore; use crate::util::hash::Hash; use crate::util::jobs::{JobContainer, JobState}; use crate::{database::UpEndDatabase, util::hash::b58_encode}; @@ -27,17 +29,17 @@ pub trait Previewable { } pub struct PreviewStore { path: PathBuf, - db: Arc, + store: Arc, locks: Mutex>>>, } #[cfg(feature = "previews")] impl PreviewStore { - pub fn new>(path: P, db: Arc) -> Self { + pub fn new>(path: P, store: Arc) -> Self { PreviewStore { path: PathBuf::from(path.as_ref()), - db, + store, locks: Mutex::new(HashMap::new()), } } @@ -71,12 +73,12 @@ impl PreviewStore { Ok(Some(thumbpath.clone())) } else { trace!("Calculating preview for {hash:?}..."); - let connection = self.db.connection()?; - let files = connection.retrieve_file(&hash)?; + let files = self.store.retrieve(&hash)?; if let Some(file) = files.get(0) { + let file_path = file.get_file_path(); let mut job_handle = job_container.add_job( None, - &format!("Creating preview for {:?}", file.path.file_name().unwrap()), + &format!("Creating preview for {:?}", file_path.file_name().unwrap()), )?; let mime_type = mime_type.into(); @@ -84,18 +86,18 @@ impl PreviewStore { let mime_type: Option = if mime_type.is_some() { mime_type } else { - tree_magic_mini::from_filepath(&file.path).map(|m| m.into()) + tree_magic_mini::from_filepath(&file_path).map(|m| m.into()) }; let preview = match mime_type { - Some(tm) if tm.starts_with("text") => TextPath(&file.path).get_thumbnail(), + Some(tm) if tm.starts_with("text") => TextPath(&file_path).get_thumbnail(), Some(tm) if tm.starts_with("video") || tm == "application/x-matroska" => { - VideoPath(&file.path).get_thumbnail() + VideoPath(&file_path).get_thumbnail() } Some(tm) if tm.starts_with("audio") || tm == "application/x-riff" => { - AudioPath(&file.path).get_thumbnail() + AudioPath(&file_path).get_thumbnail() } - Some(tm) if tm.starts_with("image") => ImagePath(&file.path).get_thumbnail(), + Some(tm) if tm.starts_with("image") => ImagePath(&file_path).get_thumbnail(), Some(unknown) => Err(anyhow!("No capability for {:?} thumbnails.", unknown)), _ => Err(anyhow!("Unknown file type, or file doesn't exist.")), }; diff --git a/src/routes.rs b/src/routes.rs index caf277c..e3512db 100644 --- a/src/routes.rs +++ 
@@ -3,9 +3,10 @@ use crate::database::constants::{ADDED_ATTR, LABEL_ATTR};
 use crate::database::entry::{Entry, EntryValue, InvariantEntry};
 use crate::database::hierarchies::{list_roots, resolve_path, UHierPath};
 use crate::database::lang::Query;
+use crate::database::stores::fs::FsStore;
+use crate::database::stores::{Blob, UpStore};
 use crate::database::UpEndDatabase;
 use crate::extractors::{self};
-use crate::filesystem::add_file;
 use crate::previews::PreviewStore;
 use crate::util::exec::block_background;
 use crate::util::hash::{b58_decode, b58_encode, Hashable};
@@ -26,13 +27,12 @@ use futures_util::TryStreamExt;
 use log::{debug, info, trace, warn};
 use serde::{Deserialize, Serialize};
 use serde_json::json;
+use std::collections::HashMap;
 use std::convert::{TryFrom, TryInto};
-use std::fs;
 use std::io::Write;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
-use std::{collections::HashMap, io};
 use tempfile::NamedTempFile;
 use uuid::Uuid;
 
@@ -42,6 +42,7 @@ use is_executable::IsExecutable;
 #[derive(Clone)]
 pub struct State {
     pub upend: Arc<UpEndDatabase>,
+    pub store: Arc<FsStore>,
     pub vault_name: Option<String>,
     pub job_container: JobContainer,
     pub preview_store: Option<Arc<PreviewStore>>,
@@ -142,11 +143,12 @@ pub async fn get_raw(
     // Then, check the files
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
     let _hash = hash.clone();
-    let files = web::block(move || connection.retrieve_file(_hash.as_ref()))
+    let _store = state.store.clone();
+    let blobs = web::block(move || _store.retrieve(_hash.as_ref()))
        .await
        .map_err(ErrorInternalServerError)?;
-    if let Some(file) = files.get(0) {
-        let file_path = state.upend.vault_path.join(&file.path);
+    if let Some(blob) = blobs.get(0) {
+        let file_path = blob.get_file_path();
 
        if query.native.is_none() {
            Ok(Either::A(
@@ -185,12 +187,9 @@ pub async fn get_raw(
                        http::header::WARNING.to_string(),
                    );
 
-                    file_path
-                        .parent()
-                        .ok_or_else(|| {
-                            ErrorInternalServerError("No parent to open as fallback.")
-                        })?
-                        .to_path_buf()
+                    file_path.parent().ok_or_else(|| {
+                        ErrorInternalServerError("No parent to open as fallback.")
+                    })?
                };
                opener::open(path).map_err(error::ErrorServiceUnavailable)?;
                Ok(Either::B(response.finish()))
@@ -441,9 +440,10 @@ pub async fn put_object(
            let _address = address.clone();
            let _job_container = state.job_container.clone();
+            let _store = state.store.clone();
            block_background::<_, _, anyhow::Error>(move || {
                let extract_result =
-                    extractors::extract(&_address, &connection, _job_container);
+                    extractors::extract(&_address, &connection, &_store, _job_container);
 
                if let Ok(entry_count) = extract_result {
                    debug!("Added {entry_count} extracted entries for {_address:?}");
                } else {
@@ -493,40 +493,17 @@ pub async fn put_object(
            while let Some(chunk) = field.try_next().await? {
                file = web::block(move || file.write_all(&chunk).map(|_| file)).await?;
            }
 
-            let path = PathBuf::from(file.path());
-            let hash = web::block(move || path.hash()).await?;
-            let address = Address::Hash(hash.clone());
            let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+            let _store = state.store.clone();
+            let _filename = filename.clone();
+            let hash = web::block(move || {
+                _store.store(connection, Blob::from_filepath(file.path()), _filename)
+            })
+            .await
+            .map_err(ErrorInternalServerError)?;
 
-            let _hash = hash.clone();
-            let existing_files = web::block(move || connection.retrieve_file(&_hash))
-                .await
-                .map_err(ErrorInternalServerError)?;
-
-            if existing_files.is_empty() {
-                let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?);
-                let final_name = if let Some(ref filename) = filename {
-                    format!("{addr_str}_{filename}")
-                } else {
-                    addr_str
-                };
-
-                let final_path = state.upend.vault_path.join(&final_name);
-
-                let (_, tmp_path) = file.keep().map_err(ErrorInternalServerError)?;
-                let final_path = web::block::<_, _, io::Error>(move || {
-                    fs::copy(&tmp_path, &final_path)?;
-                    fs::remove_file(tmp_path)?;
-                    Ok(final_path)
-                })
-                .await?;
-
-                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
-                web::block(move || add_file(&connection, &final_path, hash))
-                    .await
-                    .map_err(ErrorInternalServerError)?;
-            }
+            let address = Address::Hash(hash);
 
            if let Some(ref filename) = filename {
                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -540,10 +517,11 @@ pub async fn put_object(
        let _address = address.clone();
        let _job_container = state.job_container.clone();
+        let _store = state.store.clone();
        let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
        block_background::<_, _, anyhow::Error>(move || {
            let extract_result =
-                extractors::extract(&_address, &connection, _job_container);
+                extractors::extract(&_address, &connection, &_store, _job_container);
 
            if let Ok(entry_count) = extract_result {
                debug!("Added {entry_count} extracted entries for {_address:?}");
            } else {
@@ -732,13 +710,14 @@ pub async fn api_refresh(
    check_auth(&req, &state)?;
    block_background::<_, _, anyhow::Error>(move || {
-        let _ = crate::filesystem::rescan_vault(
+        let _ = state
+            .store
+            .update(state.upend.clone(), state.job_container.clone());
+        let _ = crate::extractors::extract_all(
            state.upend.clone(),
+            state.store.clone(),
            state.job_container.clone(),
-            query.full.is_none(),
-            false,
        );
-        let _ = crate::extractors::extract_all(state.upend.clone(), state.job_container.clone());
        Ok(())
    });
    Ok(HttpResponse::Ok().finish())
 }
@@ -746,68 +725,48 @@
 
 #[get("/api/store")]
 pub async fn store_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
-    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
-    let files = web::block(move || connection.retrieve_all_files())
-        .await
-        .map_err(ErrorInternalServerError)?;
-    let mut files_by_hash = HashMap::new();
-    for file in &files {
-        if !files_by_hash.contains_key(&file.hash) {
-            files_by_hash.insert(&file.hash, vec![]);
-        }
-        files_by_hash.get_mut(&file.hash).unwrap().push(file);
-    }
+    // let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+    // let files = web::block(move || connection.retrieve_all_files())
+    //     .await
+    //     .map_err(ErrorInternalServerError)?;
+    // let mut files_by_hash = HashMap::new();
+    // for file in &files {
+    //     if !files_by_hash.contains_key(&file.hash) {
+    //         files_by_hash.insert(&file.hash, vec![]);
+    //     }
+    //     files_by_hash.get_mut(&file.hash).unwrap().push(file);
+    // }
 
-    for paths in files_by_hash.values_mut() {
-        paths.sort_unstable_by_key(|f| !f.valid);
-    }
+    // for paths in files_by_hash.values_mut() {
+    //     paths.sort_unstable_by_key(|f| !f.valid);
+    // }
 
-    let mut blobs = files_by_hash
-        .iter()
-        .map(|(hash, files)| {
-            json!({
-                "hash": hash,
-                "size": files[0].size,
-                "paths": files.iter().map(|f| json!({
-                    "added": f.added,
-                    "valid": f.valid,
-                    "path": f.path
-                })).collect::<serde_json::Value>()
-            })
-        })
-        .collect::<Vec<serde_json::Value>>();
+    // let mut blobs = files_by_hash
+    //     .iter()
+    //     .map(|(hash, files)| {
+    //         json!({
+    //             "hash": hash,
+    //             "size": files[0].size,
+    //             "paths": files.iter().map(|f| json!({
+    //                 "added": f.added,
+    //                 "valid": f.valid,
+    //                 "path": f.path
+    //             })).collect::<serde_json::Value>()
+    //         })
+    //     })
+    //     .collect::<Vec<serde_json::Value>>();
 
-    blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
-    blobs.reverse();
+    // blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
+    // blobs.reverse();
 
-    Ok(HttpResponse::Ok().json(json!({
-        "totals": {
-            "count": files_by_hash.len(),
-            "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
-        },
-        "blobs": blobs
-    })))
-}
-
-#[get("/api/store/{hash}")]
-pub async fn get_file(
-    state: web::Data<State>,
-    hash: web::Path<String>,
-) -> Result<HttpResponse, Error> {
-    let address =
-        Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
-            .map_err(ErrorInternalServerError)?;
-
-    if let Address::Hash(hash) = address {
-        let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
-        let response = web::block(move || connection.retrieve_file(&hash))
-            .await
-            .map_err(ErrorInternalServerError)?;
-
-        Ok(HttpResponse::Ok().json(response))
-    } else {
-        Err(ErrorBadRequest("Address does not refer to a file."))
-    }
+    // Ok(HttpResponse::Ok().json(json!({
+    //     "totals": {
+    //         "count": files_by_hash.len(),
+    //         "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
+    //     },
+    //     "blobs": blobs
+    // })))
+    todo!();
 }
 
 #[derive(Deserialize)]
@@ -838,7 +797,7 @@ pub async fn get_jobs(
 pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
     Ok(HttpResponse::Ok().json(json!({
         "name": state.vault_name,
-        "location": &*state.upend.vault_path,
+        // "location": &*state.store.path,
         "version": crate::common::PKG_VERSION,
         "desktop": state.desktop_enabled
     })))
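
The patch only shows the call sites of the new store layer (store.retrieve(&hash), store.store(connection, Blob::from_filepath(..), filename), blob.get_file_path()); the actual trait lives in src/database/stores/mod.rs, which is not reproduced above. For orientation, a minimal Rust sketch of what those call sites imply follows. This is an illustration under stated assumptions, not the real definitions: the placeholder types (Hash, StoreError, UpEndConnection) and every signature detail are assumed, only the method and type names come from the diff.

// Hypothetical sketch only -- NOT the contents of src/database/stores/mod.rs.
use std::path::{Path, PathBuf};

type Hash = Vec<u8>; // placeholder for util::hash::Hash
type StoreError = std::io::Error; // placeholder error type
struct UpEndConnection; // placeholder for the database connection type

struct Blob {
    file_path: PathBuf,
}

impl Blob {
    // Wrap an existing file (e.g. an uploaded temporary file) as a blob.
    fn from_filepath<P: AsRef<Path>>(path: P) -> Blob {
        Blob {
            file_path: path.as_ref().to_path_buf(),
        }
    }

    // Path of the blob on disk, used by previews and raw-file responses.
    fn get_file_path(&self) -> &Path {
        &self.file_path
    }
}

trait UpStore {
    // All blobs currently known for a given content hash.
    fn retrieve(&self, hash: &Hash) -> Result<Vec<Blob>, StoreError>;

    // Persist a blob, register it in the database, and return its content hash.
    fn store(
        &self,
        connection: UpEndConnection,
        blob: Blob,
        name_hint: Option<String>,
    ) -> Result<Hash, StoreError>;
}

The practical effect visible in the diff is that previews/mod.rs and routes.rs no longer join upend.vault_path themselves; they ask the store for a Blob and use get_file_path(), so the FsStore alone decides where blobs live on disk.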