Compare commits

...

4 Commits

Author SHA1 Message Date
Tomáš Mládek 54897f1468 fix: reenable locks 2022-09-14 22:49:04 +02:00
Tomáš Mládek 0ced93adcc refactor: use trait objects instead of FsStore directly (also fix most clippy hints) 2022-09-14 22:49:04 +02:00
Tomáš Mládek 2c4a7f32e5 chore: no default debug output in tests 2022-09-14 22:49:04 +02:00
Tomáš Mládek a7b0a5c00a feat!: multiple vaults (incomplete, but passes tests) 2022-09-14 22:48:59 +02:00
20 changed files with 1091 additions and 913 deletions
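
Taken together, these commits replace direct use of FsStore (and the file-tracking methods that previously lived on UpEndConnection) with the UpStore trait object introduced in the new stores module below. The following is a minimal sketch of how a caller is wired up after this change, pieced together from the main.rs, stores and extractors hunks in this comparison; the function name and the exact UpEndDatabase::open arguments are illustrative (they mirror the test code), not a verbatim call site.

use std::sync::Arc;

use anyhow::Result;

use crate::database::stores::{fs::FsStore, UpStore};
use crate::database::UpEndDatabase;
use crate::util::jobs::JobContainer;

// Sketch only: ties together the pieces changed in this comparison.
fn open_vault(vault_path: &std::path::Path) -> Result<()> {
    // The database now only holds entries; file bookkeeping has moved out of it.
    let open_result = UpEndDatabase::open(vault_path, None, true)?;
    let upend = Arc::new(open_result.db);

    // Blob storage sits behind a trait object, so routes, extractors and the
    // preview store no longer depend on FsStore directly.
    let store =
        Arc::new(Box::new(FsStore::from_path(vault_path)?) as Box<dyn UpStore + Send + Sync>);

    let job_container = JobContainer::new();

    // Rescan the vault through the trait, then run the extractors against it.
    let _outcomes = store.update(&upend, job_container.clone())?;
    let _entry_count = crate::extractors::extract_all(upend, store, job_container)?;
    Ok(())
}

UpEndConnection keeps only entry-level operations after this change; anything involving paths or blobs goes through the store handle.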

View File

@@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TABLE vaults;

View File

@@ -1,5 +0,0 @@
-- Your SQL goes here
CREATE TABLE vaults (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path VARCHAR NOT NULL
)

View File

@@ -0,0 +1 @@
DROP TABLE files;

View File

@@ -0,0 +1,13 @@
CREATE TABLE files
(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE,
added DATETIME NOT NULL,
size BIGINT NOT NULL,
mtime DATETIME NULL
);
CREATE INDEX files_hash ON files (hash);
CREATE INDEX files_valid ON files (valid);

View File

@@ -1,4 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE meta;
DROP TABLE files;
DROP TABLE data;

View File

@@ -9,20 +9,6 @@ CREATE TABLE meta
INSERT INTO meta (key, value)
VALUES ('VERSION', '0');
CREATE TABLE files
(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE,
added DATETIME NOT NULL,
size BIGINT NOT NULL,
mtime DATETIME NULL
);
CREATE INDEX files_hash ON files (hash);
CREATE INDEX files_valid ON files (valid);
CREATE TABLE data
(
identity BLOB PRIMARY KEY NOT NULL,

View File

@@ -1,43 +1,5 @@
use std::path::PathBuf;
use chrono::NaiveDateTime;
use serde::Serialize;
use super::schema::{data, files, meta};
use crate::util::hash::Hash;
#[derive(Queryable, Serialize, Clone, Debug)]
pub struct File {
pub id: i32,
pub hash: Hash,
pub path: String,
pub valid: bool,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}
// todo - remove, try_from the actual model, impl queryable...
#[derive(Serialize, Clone, Debug)]
pub struct OutFile {
pub id: i32,
pub hash: Hash,
pub path: PathBuf,
pub valid: bool,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}
#[derive(Insertable, Debug)]
#[table_name = "files"]
pub struct NewFile {
pub hash: Vec<u8>,
pub path: String,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}
use super::schema::{data, meta};
#[derive(Queryable, Insertable, Serialize, Debug)]
#[table_name = "data"]

View File

@@ -9,19 +9,6 @@ table! {
immutable -> Bool,
}
}
table! {
files (id) {
id -> Integer,
hash -> Binary,
path -> Text,
valid -> Bool,
added -> Timestamp,
size -> BigInt,
mtime -> Nullable<Timestamp>,
}
}
table! {
meta (id) {
id -> Integer,
@@ -30,4 +17,4 @@ table! {
}
}
allow_tables_to_appear_in_same_query!(data, files, meta,);
allow_tables_to_appear_in_same_query!(data, meta,);

View File

@@ -3,6 +3,8 @@
#[macro_use]
mod macros;
pub mod stores;
pub mod constants;
pub mod engine;
pub mod entry;
@@ -22,13 +24,12 @@ use crate::database::lang::Query;
use crate::util::hash::Hash;
use crate::util::LoggerSink;
use anyhow::{anyhow, Result};
use chrono::NaiveDateTime;
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, PooledConnection};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::sqlite::SqliteConnection;
use hierarchies::initialize_hier;
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
@@ -78,8 +79,7 @@ pub struct OpenResult {
pub struct UpEndDatabase {
pool: DbPool,
lock: Arc<RwLock<()>>,
pub vault_path: Arc<PathBuf>,
pub db_path: Arc<PathBuf>,
vault_path: Arc<PathBuf>
}
pub const UPEND_SUBDIR: &str = ".upend";
@@ -122,7 +122,6 @@ impl UpEndDatabase {
pool,
lock: Arc::new(RwLock::new(())),
vault_path: Arc::new(dirpath.as_ref().canonicalize()?),
db_path: Arc::new(upend_path),
};
let connection = db.connection().unwrap();
@@ -167,7 +166,6 @@ impl UpEndDatabase {
Ok(UpEndConnection {
conn: self.pool.get()?,
lock: self.lock.clone(),
vault_path: self.vault_path.clone(),
})
}
}
@@ -175,7 +173,6 @@ impl UpEndDatabase {
pub struct UpEndConnection {
conn: PooledConnection<ConnectionManager<SqliteConnection>>,
lock: Arc<RwLock<()>>,
vault_path: Arc<PathBuf>,
}
impl UpEndConnection {
@@ -214,94 +211,6 @@ impl UpEndConnection {
.map(|mv| mv.value.clone())
}
pub fn insert_file(&self, file: models::NewFile) -> Result<u32> {
use crate::database::inner::schema::files;
debug!(
"Inserting {} ({})...",
&file.path,
Address::Hash(Hash((&file.hash).clone()))
);
let _lock = self.lock.write().unwrap();
diesel::insert_into(files::table)
.values(&file)
.execute(&self.conn)?;
Ok(files::dsl::files
.filter(files::dsl::valid.eq(true))
.filter(files::dsl::hash.eq(file.hash))
.count()
.first::<i64>(&self.conn)?
.try_into()
.unwrap())
}
pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> {
use crate::database::inner::schema::files::dsl::*;
let _lock = self.lock.read().unwrap();
let matches = files
.filter(valid.eq(true))
.filter(hash.eq(&obj_hash.0))
.load::<models::File>(&self.conn)?;
let matches = matches
.into_iter()
.map(|f| models::OutFile {
id: f.id,
hash: f.hash,
path: self.vault_path.join(PathBuf::from(f.path)),
valid: f.valid,
added: f.added,
size: f.size,
mtime: f.mtime,
})
.collect();
Ok(matches)
}
pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
use crate::database::inner::schema::files::dsl::*;
let _lock = self.lock.read().unwrap();
let matches = files.load::<models::File>(&self.conn)?;
Ok(matches)
}
pub fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
use crate::database::inner::schema::files::dsl::*;
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
let _lock = self.lock.write().unwrap();
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(mtime.eq(m_time))
.execute(&self.conn)?)
}
pub fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
use crate::database::inner::schema::files::dsl::*;
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
let _lock = self.lock.write().unwrap();
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid))
.execute(&self.conn)?)
}
pub fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
Ok(path
.canonicalize()?
.strip_prefix(self.vault_path.as_path())?
.to_path_buf())
}
pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
use crate::database::inner::schema::data::dsl::*;

View File

@@ -0,0 +1,51 @@
use std::path::PathBuf;
use crate::util::hash::Hash;
use chrono::NaiveDateTime;
use diesel::Queryable;
use serde::Serialize;
table! {
files (id) {
id -> Integer,
hash -> Binary,
path -> Text,
valid -> Bool,
added -> Timestamp,
size -> BigInt,
mtime -> Nullable<Timestamp>,
}
}
#[derive(Queryable, Serialize, Clone, Debug)]
pub struct File {
pub id: i32,
pub hash: Hash,
pub path: String,
pub valid: bool,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}
// todo - remove, try_from the actual model, impl queryable...
#[derive(Serialize, Clone, Debug)]
pub struct OutFile {
pub id: i32,
pub hash: Hash,
pub path: PathBuf,
pub valid: bool,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}
#[derive(Insertable, Debug)]
#[table_name = "files"]
pub struct NewFile {
pub hash: Vec<u8>,
pub path: String,
pub added: NaiveDateTime,
pub size: i64,
pub mtime: Option<NaiveDateTime>,
}

View File

@@ -0,0 +1,805 @@
use self::db::files;
use super::{Blob, StoreError, UpStore, UpdatePathOutcome};
use crate::addressing::Address;
use crate::database::constants::{
ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{b58_encode, Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::r2d2::{ConnectionManager, ManageConnection};
use diesel::ExpressionMethods;
use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection};
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use std::borrow::Borrow;
use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::path::{Component, Path};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use walkdir::WalkDir;
mod db;
const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";
lazy_static! {
static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
attribute: String::from(TYPE_BASE_ATTR),
value: BLOB_TYPE.into(),
};
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
impl Drop for PragmaSynchronousGuard<'_> {
fn drop(&mut self) {
debug!("Re-enabling synchronous mode.");
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
if let Err(err) = res {
error!(
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
err
);
}
}
}
pub struct FsStore {
path: PathBuf,
manager: ConnectionManager<SqliteConnection>,
lock: Arc<RwLock<()>>,
}
impl FsStore {
pub fn from_path<P: AsRef<Path>>(path: P) -> Result<FsStore> {
let path = path.as_ref().to_path_buf().canonicalize()?;
let manager = ConnectionManager::<SqliteConnection>::new(
path.join(UPEND_SUBDIR)
.join("upend_vault.sqlite3")
.to_str()
.unwrap(),
);
let connection = manager.connect()?;
// while diesel doesn't support multiple embedded migrations...
connection.execute(
r#"
CREATE TABLE IF NOT EXISTS files
(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE,
added DATETIME NOT NULL,
size BIGINT NOT NULL,
mtime DATETIME NULL
);
CREATE INDEX IF NOT EXISTS files_hash ON files (hash);
CREATE INDEX IF NOT EXISTS files_valid ON files (valid);
"#,
)?;
let lock = Arc::new(RwLock::new(()));
Ok(FsStore {
path,
manager,
lock,
})
}
fn rescan_vault<D: Borrow<UpEndDatabase>>(
&self,
db: D,
job_handle: JobHandle,
quick_check: bool,
disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
let start = Instant::now();
info!("Vault rescan started.");
let db = db.borrow();
let connection = db.connection()?;
// Initialize types, etc...
debug!("Initializing DB types.");
connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
// Disable syncing in SQLite for the duration of the import
let mut _guard: Option<PragmaSynchronousGuard> = None;
if disable_synchronous {
debug!("Disabling SQLite synchronous mode");
connection.execute("PRAGMA synchronous = OFF;")?;
_guard = Some(PragmaSynchronousGuard(&connection));
}
// Walk through the vault, find all paths
debug!("Traversing vault directory");
let absolute_dir_path = fs::canonicalize(&*db.vault_path)?;
let path_entries: Vec<PathBuf> = WalkDir::new(&*db.vault_path)
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
.filter_map(|e| fs::canonicalize(e.into_path()).ok())
.filter(|e| e.is_file())
.filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
.collect();
// Prepare for processing
let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?));
// Actual processing
let count = RwLock::new(0_usize);
let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
let total = path_entries.len() as f32;
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let path_outcomes: Vec<UpdatePathOutcome> = path_entries
.into_par_iter()
.map(|path| {
let result = self.process_directory_entry(
db.connection().unwrap(),
&resolve_cache,
path.clone(),
&existing_files,
quick_check,
);
let mut cnt = count.write().unwrap();
*cnt += 1;
let _ = shared_job_handle
.lock()
.unwrap()
.update_progress(*cnt as f32 / total * 100.0);
match result {
Ok(result) => result,
Err(error) => {
error!("Failed to update {:?} ({})", path, error);
UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string()))
}
}
})
.collect();
debug!("Processing done, cleaning up...");
let existing_files = existing_files.read().unwrap();
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
let trans_result = connection.transaction::<_, Error, _>(|| {
self.file_set_valid(file.id, false)?;
// remove_object(&connection, )?
Ok(())
});
match trans_result {
Ok(_) => {
info!("Removed: {:?}", file.path);
UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
}
Err(error) => UpdatePathOutcome::Failed(
PathBuf::from(file.path.clone()),
StoreError::Unknown(error.to_string()),
),
}
});
// Re-enable SQLite syncing
drop(_guard);
// Reporting
let all_outcomes = path_outcomes
.into_iter()
.chain(cleanup_results)
.collect::<Vec<UpdatePathOutcome>>();
let mut failed: Vec<(&PathBuf, &StoreError)> = vec![];
let mut created = 0;
let mut unchanged = 0;
let mut deleted = 0;
for outcome in &all_outcomes {
match outcome {
UpdatePathOutcome::Added(_) => created += 1,
UpdatePathOutcome::Unchanged(_) => unchanged += 1,
UpdatePathOutcome::Removed(_) => deleted += 1,
UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
}
}
if !failed.is_empty() {
warn!(
"{} path updates failed! ({})",
failed.len(),
failed
.iter()
.map(|(path, error)| format!("{:?}: {}", path, error))
.collect::<Vec<String>>()
.join(", ")
)
}
info!(
"Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
db.vault_path,
created,
deleted,
unchanged,
start.elapsed().as_secs()
);
Ok(all_outcomes)
}
fn process_directory_entry(
&self,
connection: UpEndConnection,
resolve_cache: &Arc<Mutex<ResolveCache>>,
path: PathBuf,
existing_files: &Arc<RwLock<Vec<db::File>>>,
quick_check: bool,
) -> Result<UpdatePathOutcome> {
debug!("Processing: {:?}", path);
// Prepare the data
let existing_files = Arc::clone(existing_files);
let normalized_path = self.normalize_path(&path)?;
let normalized_path_str = normalized_path
.to_str()
.ok_or(anyhow!("Path not valid unicode!"))?;
let mut file_hash: Option<Hash> = None;
// Get size & mtime for quick comparison
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok();
// Check if the path entry for this file already exists in database
let existing_files_read = existing_files.read().unwrap();
let maybe_existing_file = existing_files_read
.iter()
.find(|file| file.path == normalized_path_str);
if let Some(existing_file) = maybe_existing_file {
let existing_file = existing_file.clone();
drop(existing_files_read);
if !quick_check || size == existing_file.size {
let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
let mut same_hash = false;
if !quick_check || !same_mtime {
file_hash = Some(path.hash()?);
same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
}
if same_mtime || same_hash {
if mtime != existing_file.mtime {
self.file_update_mtime(existing_file.id, mtime)?;
}
if !existing_file.valid {
self.file_set_valid(existing_file.id, true)?;
}
let mut existing_files_write = existing_files.write().unwrap();
let maybe_existing_file = existing_files_write
.iter()
.enumerate()
.find(|(_, file)| file.path == normalized_path_str)
.map(|(idx, _)| idx);
if let Some(idx) = maybe_existing_file {
existing_files_write.swap_remove(idx);
debug!("Unchanged: {:?}", path);
return Ok(UpdatePathOutcome::Unchanged(path));
}
}
}
} else {
drop(existing_files_read);
}
// If not, add it!
if file_hash.is_none() {
file_hash = Some(path.hash()?);
}
let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());
self.insert_file_with_metadata(
&connection,
&normalized_path,
file_hash.unwrap(),
size,
mtime,
mime_type,
Some(resolve_cache),
)
.map(|_| {
info!("Added: {:?}", path);
UpdatePathOutcome::Added(path.clone())
})
}
fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
let normalized_path = self.normalize_path(path)?;
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
0,
)
})
.ok();
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
self.insert_file_with_metadata(
connection,
&normalized_path,
hash,
size,
mtime,
mime_type,
None,
)
}
fn insert_file_with_metadata(
&self,
connection: &UpEndConnection,
normalized_path: &Path,
hash: Hash,
size: i64,
mtime: Option<NaiveDateTime>,
mime_type: Option<String>,
resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
) -> Result<Address> {
let new_file = db::NewFile {
path: normalized_path
.to_str()
.ok_or(anyhow!("Path not UTF-8?!"))?
.to_string(),
hash: (hash.clone()).0,
added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
mtime,
};
let blob_address = Address::Hash(hash);
// Metadata
let type_entry = Entry {
entity: blob_address.clone(),
attribute: String::from(IS_OF_TYPE_ATTR),
value: BLOB_TYPE_ADDR.clone().into(),
};
let size_entry = Entry {
entity: blob_address.clone(),
attribute: FILE_SIZE_KEY.to_string(),
value: (size as f64).into(),
};
let mime_entry = mime_type.map(|mime_type| Entry {
entity: blob_address.clone(),
attribute: FILE_MIME_KEY.to_string(),
value: mime_type.into(),
});
let added_entry = Entry {
entity: blob_address.clone(),
attribute: ADDED_ATTR.to_string(),
value: (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as f64)
.into(),
};
// Add the appropriate entries w/r/t virtual filesystem location
let components = normalized_path.components().collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let upath = UHierPath(
iter::once(UNode::new("NATIVE").unwrap())
.chain(dir_path.iter().map(|component| {
UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
}))
.collect(),
);
let resolved_path = match resolve_cache {
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
None => resolve_path(connection, &upath, true)?,
};
let parent_dir = resolved_path.last().unwrap();
// Insert all
let file_count = self.insert_file(new_file)?;
connection.insert_entry_immutable(type_entry)?;
connection.insert_entry_immutable(size_entry)?;
if file_count == 1 {
connection.insert_entry_immutable(added_entry)?;
}
if let Some(mime_entry) = mime_entry {
connection.insert_entry(mime_entry)?;
}
let dir_has_entry = Entry {
entity: parent_dir.clone(),
attribute: HIER_HAS_ATTR.to_string(),
value: blob_address.clone().into(),
};
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
let label_entry = Entry {
entity: blob_address.clone(),
attribute: LABEL_ATTR.to_string(),
value: filename.as_os_str().to_string_lossy().to_string().into(),
};
let label_entry_addr = connection.insert_entry(label_entry)?;
let alias_entry = Entry {
entity: dir_has_entry_addr,
attribute: ALIAS_KEY.to_string(),
value: label_entry_addr.into(),
};
connection.insert_entry(alias_entry)?;
Ok(blob_address)
}
pub fn insert_file(&self, file: db::NewFile) -> Result<u32> {
debug!(
"Inserting {} ({})...",
&file.path,
Address::Hash(Hash((&file.hash).clone()))
);
let _lock = self.lock.write().unwrap();
let conn = self.manager.connect()?;
diesel::insert_into(files::table)
.values(&file)
.execute(&conn)?;
Ok(files::dsl::files
.filter(files::dsl::valid.eq(true))
.filter(files::dsl::hash.eq(file.hash))
.count()
.first::<i64>(&conn)?
.try_into()
.unwrap())
}
fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<db::OutFile>> {
use self::db::files::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.manager.connect()?;
let matches = files
.filter(valid.eq(true))
.filter(hash.eq(&obj_hash.0))
.load::<db::File>(&conn)?;
let matches = matches
.into_iter()
.map(|f| db::OutFile {
id: f.id,
hash: f.hash,
path: self.path.join(PathBuf::from(f.path)),
valid: f.valid,
added: f.added,
size: f.size,
mtime: f.mtime,
})
.collect();
Ok(matches)
}
fn retrieve_all_files(&self) -> Result<Vec<db::File>> {
use self::db::files::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.manager.connect()?;
let matches = files.load::<db::File>(&conn)?;
Ok(matches)
}
fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
use self::db::files::dsl::*;
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
let _lock = self.lock.write().unwrap();
let conn = self.manager.connect()?;
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(mtime.eq(m_time))
.execute(&conn)?)
}
fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
use self::db::files::dsl::*;
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
let _lock = self.lock.write().unwrap();
let conn = self.manager.connect()?;
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid))
.execute(&conn)?)
}
fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
Ok(path
.canonicalize()?
.strip_prefix(self.path.as_path())?
.to_path_buf())
}
}
impl From<db::OutFile> for Blob {
fn from(of: db::OutFile) -> Self {
Blob { file_path: of.path }
}
}
impl From<db::File> for Blob {
fn from(f: db::File) -> Self {
Blob {
file_path: PathBuf::from(f.path),
}
}
}
impl UpStore for FsStore {
fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result<Vec<Blob>, super::StoreError> {
Ok(self
.retrieve_file(hash)
.map_err(|e| StoreError::Unknown(e.to_string()))?
.into_iter()
.map(Blob::from)
.collect())
}
fn retrieve_all(&self) -> Result<Vec<Blob>, super::StoreError> {
Ok(self
.retrieve_all_files()
.map_err(|e| StoreError::Unknown(e.to_string()))?
.into_iter()
.map(Blob::from)
.collect())
}
fn store(
&self,
connection: UpEndConnection,
blob: Blob,
name_hint: Option<String>,
) -> Result<Hash, super::StoreError> {
let file_path = blob.get_file_path();
let hash = file_path
.hash()
.map_err(|e| StoreError::Unknown(e.to_string()))?;
let existing_files = self.retrieve(&hash)?;
if existing_files.is_empty() {
let address = Address::Hash(hash.clone());
let addr_str = b58_encode(
address
.encode()
.map_err(|e| StoreError::Unknown(e.to_string()))?,
);
let final_name = if let Some(name_hint) = name_hint {
format!("{addr_str}_{name_hint}")
} else {
addr_str
};
let final_path = self.path.join(&final_name);
fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
self.add_file(&connection, &final_path, hash.clone())
.map_err(|e| StoreError::Unknown(e.to_string()))?;
}
Ok(hash)
}
fn update(
&self,
db: &UpEndDatabase,
mut job_container: JobContainer,
) -> Result<Vec<UpdatePathOutcome>, StoreError> {
let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
match job_result {
Ok(job_handle) => {
let result = self.rescan_vault(db, job_handle, true, false);
if let Err(err) = &result {
error!("Update did not succeed! {:?}", err);
}
result.map_err(|err| StoreError::Unknown(err.to_string()))
}
Err(err) => Err(StoreError::Unknown(err.to_string())),
}
}
}
#[cfg(test)]
mod test {
use crate::database::UpEndDatabase;
use crate::util::jobs::JobContainer;
use super::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
use std::sync::Once;
use tracing_subscriber::filter::EnvFilter;
static INIT: Once = Once::new();
pub fn initialize() {
INIT.call_once(|| {
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::builder().from_env_lossy())
.init();
})
}
#[test]
fn test_update() {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir.path().join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let file_path = temp_dir.path().join("empty");
File::create(file_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let job_container = JobContainer::new();
// Store scan
let rescan_result = store.update(&open_result.db, job_container.clone());
assert!(rescan_result.is_ok());
}
#[test]
fn test_rescan_quick() {
initialize();
_test_rescan(true)
}
#[test]
fn test_rescan_full() {
initialize();
_test_rescan(false)
}
fn _test_rescan(quick: bool) {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir.path().join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let file_path = temp_dir.path().join("empty");
File::create(file_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
let store = FsStore::from_path(&temp_dir).unwrap();
let mut job_container = JobContainer::new();
// Initial scan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(&open_result.db, job, quick, true);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result
.into_iter()
.for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));
// Modification-less rescan
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result
.into_iter()
.for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));
// Remove a file
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
assert_eq!(
2,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
.count()
);
}
}

View File

@@ -0,0 +1,66 @@
use std::path::{Path, PathBuf};
use super::{UpEndConnection, UpEndDatabase};
use crate::util::{hash::Hash, jobs::JobContainer};
pub mod fs;
#[derive(Debug, Clone)]
pub enum StoreError {
Unknown(String),
}
impl std::fmt::Display for StoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"STORE ERROR: {}",
match self {
StoreError::Unknown(err) => err,
}
)
}
}
impl std::error::Error for StoreError {}
type Result<T> = std::result::Result<T, StoreError>;
pub struct Blob {
file_path: PathBuf,
}
impl Blob {
pub fn from_filepath<P: AsRef<Path>>(path: P) -> Blob {
Blob {
file_path: PathBuf::from(path.as_ref()),
}
}
pub fn get_file_path(&self) -> &Path {
self.file_path.as_path()
}
}
#[derive(Debug)]
pub enum UpdatePathOutcome {
Added(PathBuf),
Unchanged(PathBuf),
Removed(PathBuf),
Failed(PathBuf, StoreError),
}
pub trait UpStore {
fn retrieve(&self, hash: &Hash) -> Result<Vec<Blob>>;
fn retrieve_all(&self) -> Result<Vec<Blob>>;
fn store(
&self,
connection: UpEndConnection,
blob: Blob,
name_hint: Option<String>,
) -> Result<Hash>;
fn update(
&self,
database: &UpEndDatabase,
job_container: JobContainer,
) -> Result<Vec<UpdatePathOutcome>>;
}
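
For orientation, this is roughly how the rest of the codebase talks to the new trait, mirroring the calls made from routes.rs and the extractors; the store and connection values, the example path and the error plumbing are assumptions of this sketch rather than code from the diff.

use crate::database::stores::{Blob, StoreError, UpStore};
use crate::database::UpEndConnection;

fn store_and_lookup(
    store: &(dyn UpStore + Send + Sync),
    connection: UpEndConnection,
) -> Result<(), StoreError> {
    // Ingest a file: the store hashes it, copies it into the vault if the
    // content isn't there yet, and hands back the content hash.
    let hash = store.store(
        connection,
        Blob::from_filepath("example.txt"), // hypothetical input path
        Some("example.txt".to_string()),
    )?;

    // A hash can later resolve to any number of blobs (paths) in the store.
    for blob in store.retrieve(&hash)? {
        println!("stored at {:?}", blob.get_file_path());
    }
    Ok(())
}

FsStore above is the only implementor in this comparison, but PreviewStore, the extractors and the HTTP routes are all written against the trait from here on.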

View File

@@ -1,12 +1,14 @@
use std::sync::Arc;
use super::Extractor;
use crate::{
addressing::Address,
database::{
constants,
entry::{Entry, EntryValue},
stores::{fs::FILE_MIME_KEY, UpStore},
UpEndConnection,
},
filesystem::FILE_MIME_KEY,
util::jobs::{JobContainer, JobState},
};
use anyhow::{anyhow, Result};
@@ -18,6 +20,7 @@ impl Extractor for ID3Extractor {
&self,
address: &Address,
connection: &UpEndConnection,
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
) -> Result<Vec<Entry>> {
if let Address::Hash(hash) = address {
@@ -34,14 +37,15 @@ impl Extractor for ID3Extractor {
return Ok(vec![]);
}
let files = connection.retrieve_file(hash)?;
let files = store.retrieve(hash)?;
if let Some(file) = files.get(0) {
let file_path = file.get_file_path();
let mut job_handle = job_container.add_job(
None,
&format!(
r#"Getting ID3 info from "{:}""#,
file.path
file_path
.components()
.last()
.unwrap()
@@ -50,7 +54,7 @@ impl Extractor for ID3Extractor {
),
)?;
let tags = id3::Tag::read_from_path(&file.path)?;
let tags = id3::Tag::read_from_path(&file_path)?;
let result: Vec<Entry> = tags
.frames()

View File

@@ -1,6 +1,6 @@
use crate::{
addressing::Address,
database::{entry::Entry, UpEndConnection, UpEndDatabase},
database::{entry::Entry, stores::UpStore, UpEndConnection, UpEndDatabase},
util::jobs::JobContainer,
};
use anyhow::Result;
@@ -25,6 +25,7 @@ pub trait Extractor {
&self,
address: &Address,
connection: &UpEndConnection,
store: Arc<Box<dyn UpStore + Send + Sync>>,
job_container: JobContainer,
) -> Result<Vec<Entry>>;
@@ -36,10 +37,11 @@ pub trait Extractor {
&self,
address: &Address,
connection: &UpEndConnection,
store: Arc<Box<dyn UpStore + Send + Sync>>,
job_container: JobContainer,
) -> Result<usize> {
if self.is_needed(address, connection)? {
let entries = self.get(address, connection, job_container)?;
let entries = self.get(address, connection, store, job_container)?;
connection.transaction(|| {
let len = entries.len();
@@ -56,6 +58,7 @@ pub trait Extractor {
pub fn extract_all<D: Borrow<UpEndDatabase>>(
db: D,
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
) -> Result<usize> {
info!("Extracting metadata for all addresses.");
@@ -72,7 +75,7 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
.par_iter()
.map(|address| {
let connection = db.connection()?;
let extract_result = extract(address, &connection, job_container.clone());
let extract_result = extract(address, &connection, store.clone(), job_container.clone());
let mut cnt = count.write().unwrap();
*cnt += 1;
@@ -99,6 +102,7 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
pub fn extract(
address: &Address,
connection: &UpEndConnection,
store: Arc<Box<dyn UpStore + Send + Sync>>,
job_container: JobContainer,
) -> Result<usize> {
let mut entry_count = 0;
@@ -106,18 +110,20 @@ pub fn extract(
#[cfg(feature = "extractors-web")]
{
entry_count += web::WebExtractor.insert_info(address, connection, job_container.clone())?;
entry_count +=
web::WebExtractor.insert_info(address, connection, store.clone(), job_container.clone())?;
}
#[cfg(feature = "extractors-audio")]
{
entry_count +=
audio::ID3Extractor.insert_info(address, connection, job_container.clone())?;
audio::ID3Extractor.insert_info(address, connection, store.clone(), job_container.clone())?;
}
#[cfg(feature = "extractors-photo")]
{
entry_count += photo::ExifExtractor.insert_info(address, connection, job_container)?;
entry_count +=
photo::ExifExtractor.insert_info(address, connection, store.clone(), job_container)?;
}
trace!("Extracting metadata for {address:?} - got {entry_count} entries.");
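
The practical effect of the new store parameter is that extractors resolve blobs through the store rather than through connection.retrieve_file. A hedged sketch of a do-nothing extractor under the new signature follows; the NoopExtractor name is invented for illustration, and is_needed is spelled out in case the trait provides no default for it.

use std::sync::Arc;

use anyhow::Result;

use crate::addressing::Address;
use crate::database::{entry::Entry, stores::UpStore, UpEndConnection};
use crate::extractors::Extractor;
use crate::util::jobs::JobContainer;

struct NoopExtractor;

impl Extractor for NoopExtractor {
    fn get(
        &self,
        address: &Address,
        _connection: &UpEndConnection,
        store: Arc<Box<dyn UpStore + Send + Sync>>,
        _job_container: JobContainer,
    ) -> Result<Vec<Entry>> {
        if let Address::Hash(hash) = address {
            // Blobs are now looked up via the store, not the DB connection.
            let blobs = store.retrieve(hash)?;
            if let Some(blob) = blobs.get(0) {
                let _path = blob.get_file_path();
                // ...read the file and emit entries here...
            }
        }
        Ok(vec![])
    }

    fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
        Ok(true)
    }
}

This mirrors what the ID3 hunk above and the EXIF and web hunks below now do: the store handle is simply threaded through extract and extract_all.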

View File

@@ -1,12 +1,14 @@
use std::sync::Arc;
use super::Extractor;
use crate::{
addressing::Address,
database::{
constants,
entry::{Entry, EntryValue},
stores::{fs::{FILE_MIME_KEY}, UpStore},
UpEndConnection,
},
filesystem::FILE_MIME_KEY,
util::jobs::{JobContainer, JobState},
};
use anyhow::{anyhow, Result};
@@ -21,6 +23,7 @@ impl Extractor for ExifExtractor {
&self,
address: &Address,
connection: &UpEndConnection,
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
) -> Result<Vec<Entry>> {
if let Address::Hash(hash) = address {
@@ -37,14 +40,15 @@ impl Extractor for ExifExtractor {
return Ok(vec![]);
}
let files = connection.retrieve_file(hash)?;
let files = store.retrieve(hash)?;
if let Some(file) = files.get(0) {
let file_path = file.get_file_path();
let mut job_handle = job_container.add_job(
None,
&format!(
r#"Getting EXIF info from "{:}""#,
file.path
file_path
.components()
.last()
.unwrap()
@@ -53,7 +57,7 @@ impl Extractor for ExifExtractor {
),
)?;
let file = std::fs::File::open(&file.path)?;
let file = std::fs::File::open(&file_path)?;
let mut bufreader = std::io::BufReader::new(&file);
let exifreader = exif::Reader::new();
let exif = exifreader.read_from_container(&mut bufreader)?;

View File

@@ -1,7 +1,9 @@
use std::sync::Arc;
use super::Extractor;
use crate::{
addressing::Address,
database::{entry::Entry, UpEndConnection},
database::{entry::Entry, stores::UpStore, UpEndConnection},
util::jobs::{JobContainer, JobState},
};
use anyhow::anyhow;
@@ -15,9 +17,10 @@ impl Extractor for WebExtractor {
fn get(
&self,
address: &Address,
_: &UpEndConnection,
_connection: &UpEndConnection,
_store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
) -> anyhow::Result<Vec<Entry>> {
) -> Result<Vec<Entry>> {
if let Address::Url(url) = address {
let mut job_handle =
job_container.add_job(None, &format!("Getting info about {url:?}"))?;
@@ -81,10 +84,12 @@ impl Extractor for WebExtractor {
#[cfg(test)]
mod test {
use crate::util::jobs::JobContainer;
use crate::{database::stores::fs::FsStore, util::jobs::JobContainer};
use super::*;
use anyhow::Result;
use std::sync::Arc;
use tempfile::TempDir;
#[test]
@@ -92,12 +97,14 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?;
let connection = open_result.db.connection()?;
let store =
Arc::new(Box::new(FsStore::from_path(&temp_dir)?) as Box<dyn UpStore + Sync + Send>);
let job_container = JobContainer::new();
let address = Address::Url("https://upend.dev".into());
assert!(WebExtractor.is_needed(&address, &connection)?);
WebExtractor.insert_info(&address, &connection, job_container)?;
WebExtractor.insert_info(&address, &connection, store, job_container)?;
assert!(!WebExtractor.is_needed(&address, &connection)?);

View File

@@ -1,573 +0,0 @@
use std::borrow::Borrow;
use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use crate::addressing::Address;
use crate::database::constants::{
ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::inner::models;
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use walkdir::WalkDir;
const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";
lazy_static! {
static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
attribute: String::from(TYPE_BASE_ATTR),
value: BLOB_TYPE.into(),
};
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}
fn initialize_types(connection: &UpEndConnection) -> Result<()> {
// BLOB_TYPE
connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
Ok(())
}
pub fn rescan_vault<D: Borrow<UpEndDatabase>>(
db: D,
mut job_container: JobContainer,
quick_check: bool,
disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
match job_result {
Ok(job_handle) => {
let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous);
if let Err(err) = &result {
error!("Update did not succeed! {:?}", err);
}
result
}
Err(err) => Err(err),
}
}
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
impl Drop for PragmaSynchronousGuard<'_> {
fn drop(&mut self) {
debug!("Re-enabling synchronous mode.");
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
if let Err(err) = res {
error!(
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
err
);
}
}
}
type UpdatePathResult = Result<UpdatePathOutcome>;
#[derive(Debug)]
pub enum UpdatePathOutcome {
Added(PathBuf),
Unchanged(PathBuf),
Removed(PathBuf),
Failed(PathBuf, Error),
}
fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
db: D,
job_handle: JobHandle,
quick_check: bool,
disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
let start = Instant::now();
info!("Vault rescan started.");
let db = db.borrow();
let connection = db.connection()?;
// Initialize types, etc...
debug!("Initializing DB types.");
initialize_types(&connection)?;
// Disable syncing in SQLite for the duration of the import
let mut _guard: Option<PragmaSynchronousGuard> = None;
if disable_synchronous {
debug!("Disabling SQLite synchronous mode");
connection.execute("PRAGMA synchronous = OFF;")?;
_guard = Some(PragmaSynchronousGuard(&connection));
}
// Walk through the vault, find all paths
debug!("Traversing vault directory");
let absolute_dir_path = fs::canonicalize(&*db.vault_path)?;
let path_entries: Vec<PathBuf> = WalkDir::new(&*db.vault_path)
.follow_links(true)
.into_iter()
.filter_map(|e| e.ok())
.filter_map(|e| fs::canonicalize(e.into_path()).ok())
.filter(|e| e.is_file())
.filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
.collect();
// Prepare for processing
let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?));
// Actual processing
let count = RwLock::new(0_usize);
let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
let total = path_entries.len() as f32;
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let path_outcomes: Vec<UpdatePathOutcome> = path_entries
.into_par_iter()
.map(|path| {
let result = process_directory_entry(
db.connection().unwrap(),
&resolve_cache,
path.clone(),
&existing_files,
quick_check,
);
let mut cnt = count.write().unwrap();
*cnt += 1;
let _ = shared_job_handle
.lock()
.unwrap()
.update_progress(*cnt as f32 / total * 100.0);
match result {
Ok(result) => result,
Err(error) => {
error!("Failed to update {:?} ({})", path, error);
UpdatePathOutcome::Failed(path, error)
}
}
})
.collect();
debug!("Processing done, cleaning up...");
let existing_files = existing_files.read().unwrap();
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
let trans_result = connection.transaction::<_, Error, _>(|| {
connection.file_set_valid(file.id, false)?;
// remove_object(&connection, )?
Ok(())
});
match trans_result {
Ok(_) => {
info!("Removed: {:?}", file.path);
UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
}
Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error),
}
});
// Re-enable SQLite syncing
drop(_guard);
// Reporting
let all_outcomes = path_outcomes
.into_iter()
.chain(cleanup_results)
.collect::<Vec<UpdatePathOutcome>>();
let mut failed: Vec<(&PathBuf, &Error)> = vec![];
let mut created = 0;
let mut unchanged = 0;
let mut deleted = 0;
for outcome in &all_outcomes {
match outcome {
UpdatePathOutcome::Added(_) => created += 1,
UpdatePathOutcome::Unchanged(_) => unchanged += 1,
UpdatePathOutcome::Removed(_) => deleted += 1,
UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
}
}
if !failed.is_empty() {
warn!(
"{} path updates failed! ({})",
failed.len(),
failed
.iter()
.map(|(path, error)| format!("{:?}: {}", path, error))
.collect::<Vec<String>>()
.join(", ")
)
}
info!(
"Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
db.vault_path,
created,
deleted,
unchanged,
start.elapsed().as_secs()
);
Ok(all_outcomes)
}
fn process_directory_entry(
connection: UpEndConnection,
resolve_cache: &Arc<Mutex<ResolveCache>>,
path: PathBuf,
existing_files: &Arc<RwLock<Vec<models::File>>>,
quick_check: bool,
) -> UpdatePathResult {
debug!("Processing: {:?}", path);
// Prepare the data
let existing_files = Arc::clone(existing_files);
let normalized_path = connection.normalize_path(&path)?;
let normalized_path_str = normalized_path
.to_str()
.ok_or(anyhow!("Path not valid unicode!"))?;
let mut file_hash: Option<Hash> = None;
// Get size & mtime for quick comparison
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
})
.ok();
// Check if the path entry for this file already exists in database
let existing_files_read = existing_files.read().unwrap();
let maybe_existing_file = existing_files_read
.iter()
.find(|file| file.path == normalized_path_str);
if let Some(existing_file) = maybe_existing_file {
let existing_file = existing_file.clone();
drop(existing_files_read);
if !quick_check || size == existing_file.size {
let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
let mut same_hash = false;
if !quick_check || !same_mtime {
file_hash = Some(path.hash()?);
same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
}
if same_mtime || same_hash {
if mtime != existing_file.mtime {
connection.file_update_mtime(existing_file.id, mtime)?;
}
if !existing_file.valid {
connection.file_set_valid(existing_file.id, true)?;
}
let mut existing_files_write = existing_files.write().unwrap();
let maybe_existing_file = existing_files_write
.iter()
.enumerate()
.find(|(_, file)| file.path == normalized_path_str)
.map(|(idx, _)| idx);
if let Some(idx) = maybe_existing_file {
existing_files_write.swap_remove(idx);
debug!("Unchanged: {:?}", path);
return Ok(UpdatePathOutcome::Unchanged(path));
}
}
}
} else {
drop(existing_files_read);
}
// If not, add it!
if file_hash.is_none() {
file_hash = Some(path.hash()?);
}
let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());
insert_file_with_metadata(
&connection,
&normalized_path,
file_hash.unwrap(),
size,
mtime,
mime_type,
Some(resolve_cache),
)
.map(|_| {
info!("Added: {:?}", path);
UpdatePathOutcome::Added(path.clone())
})
}
pub fn add_file(connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
let normalized_path = connection.normalize_path(path)?;
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
})
.ok();
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
insert_file_with_metadata(
connection,
&normalized_path,
hash,
size,
mtime,
mime_type,
None,
)
}
fn insert_file_with_metadata(
connection: &UpEndConnection,
normalized_path: &Path,
hash: Hash,
size: i64,
mtime: Option<NaiveDateTime>,
mime_type: Option<String>,
resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
) -> Result<Address> {
let new_file = models::NewFile {
path: normalized_path
.to_str()
.ok_or(anyhow!("Path not UTF-8?!"))?
.to_string(),
hash: (hash.clone()).0,
added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
mtime,
};
let blob_address = Address::Hash(hash);
// Metadata
let type_entry = Entry {
entity: blob_address.clone(),
attribute: String::from(IS_OF_TYPE_ATTR),
value: BLOB_TYPE_ADDR.clone().into(),
};
let size_entry = Entry {
entity: blob_address.clone(),
attribute: FILE_SIZE_KEY.to_string(),
value: (size as f64).into(),
};
let mime_entry = mime_type.map(|mime_type| Entry {
entity: blob_address.clone(),
attribute: FILE_MIME_KEY.to_string(),
value: mime_type.into(),
});
let added_entry = Entry {
entity: blob_address.clone(),
attribute: ADDED_ATTR.to_string(),
value: (SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as f64)
.into(),
};
// Add the appropriate entries w/r/t virtual filesystem location
let components = normalized_path.components().collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let upath = UHierPath(
iter::once(UNode::new("NATIVE").unwrap())
.chain(dir_path.iter().map(|component| {
UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
}))
.collect(),
);
let resolved_path = match resolve_cache {
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
None => resolve_path(connection, &upath, true)?,
};
let parent_dir = resolved_path.last().unwrap();
// Insert all
connection.transaction::<_, Error, _>(|| {
let file_count = connection.insert_file(new_file)?;
connection.insert_entry_immutable(type_entry)?;
connection.insert_entry_immutable(size_entry)?;
if file_count == 1 {
connection.insert_entry_immutable(added_entry)?;
}
if let Some(mime_entry) = mime_entry {
connection.insert_entry(mime_entry)?;
}
let dir_has_entry = Entry {
entity: parent_dir.clone(),
attribute: HIER_HAS_ATTR.to_string(),
value: blob_address.clone().into(),
};
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
let label_entry = Entry {
entity: blob_address.clone(),
attribute: LABEL_ATTR.to_string(),
value: filename.as_os_str().to_string_lossy().to_string().into(),
};
let label_entry_addr = connection.insert_entry(label_entry)?;
let alias_entry = Entry {
entity: dir_has_entry_addr,
attribute: ALIAS_KEY.to_string(),
value: label_entry_addr.into(),
};
connection.insert_entry(alias_entry)?;
Ok(blob_address)
})
}
#[cfg(test)]
mod test {
use crate::database::UpEndDatabase;
use crate::util::jobs::JobContainer;
use super::*;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
use std::sync::Once;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
static INIT: Once = Once::new();
pub fn initialize() {
INIT.call_once(|| {
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::DEBUG.into())
.from_env_lossy(),
)
.init();
})
}
#[test]
fn test_rescan_quick() {
initialize();
_test_rescan(true)
}
#[test]
fn test_rescan_full() {
initialize();
_test_rescan(false)
}
fn _test_rescan(quick: bool) {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
let file_path = temp_dir.path().join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
let file_path = temp_dir.path().join("empty");
File::create(file_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
let mut job_container = JobContainer::new();
let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
// Initial scan
let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result
.into_iter()
.for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));
// Modification-less rescan
let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
rescan_result
.into_iter()
.for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));
// Remove a file
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
assert_eq!(rescan_result.len(), 3);
assert_eq!(
2,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
.count()
);
assert_eq!(
1,
rescan_result
.iter()
.filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
.count()
);
}
}

View File

@@ -11,14 +11,17 @@ use std::path::PathBuf;
use actix_web::{middleware, App, HttpServer};
use anyhow::Result;
use clap::{App as ClapApp, Arg};
use log::{debug, info, warn};
use log::{info, warn};
use rand::{thread_rng, Rng};
use std::sync::Arc;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use crate::{
common::{get_static_dir, PKG_VERSION},
database::UpEndDatabase,
database::{
stores::{fs::FsStore, UpStore},
UpEndDatabase,
},
util::{exec::block_background, jobs::JobContainer},
};
@@ -26,7 +29,6 @@ mod addressing;
mod common;
mod database;
mod extractors;
mod filesystem;
mod previews;
mod routes;
mod util;
@@ -125,6 +127,9 @@ fn main() -> Result<()> {
.expect("failed to open database!");
let upend = Arc::new(open_result.db);
let store =
Arc::new(Box::new(FsStore::from_path(vault_path.clone()).unwrap())
as Box<dyn UpStore + Send + Sync>);
let ui_path = get_static_dir("webui");
if ui_path.is_err() {
@@ -137,22 +142,22 @@ fn main() -> Result<()> {
let ui_enabled = ui_path.is_ok() && !matches.is_present("NO_UI");
let browser_enabled = desktop_enabled && !matches.is_present("NO_BROWSER");
let preview_path = upend.db_path.join("previews");
let preview_dir = tempfile::tempdir().unwrap();
#[cfg(feature = "previews")]
let preview_store = Some(Arc::new(crate::previews::PreviewStore::new(
preview_path.clone(),
upend.clone(),
preview_dir.path(),
store.clone(),
)));
if matches.is_present("CLEAN") {
info!("Cleaning temporary directories...");
if preview_path.exists() {
std::fs::remove_dir_all(&preview_path).unwrap();
debug!("Removed {preview_path:?}");
} else {
debug!("No preview path exists, continuing...");
}
}
// if matches.is_present("CLEAN") {
// info!("Cleaning temporary directories...");
// if preview_path.exists() {
// std::fs::remove_dir_all(&preview_path).unwrap();
// debug!("Removed {preview_path:?}");
// } else {
// debug!("No preview path exists, continuing...");
// }
// }
#[cfg(not(feature = "previews"))]
let preview_store = None;
@@ -193,6 +198,7 @@ fn main() -> Result<()> {
.into_owned()
}),
),
store,
job_container: job_container.clone(),
preview_store,
desktop_enabled,
@@ -226,7 +232,6 @@ fn main() -> Result<()> {
.service(routes::list_hier)
.service(routes::list_hier_roots)
.service(routes::store_info)
.service(routes::get_file)
.service(routes::get_jobs)
.service(routes::get_info);
@@ -257,14 +262,10 @@ fn main() -> Result<()> {
if !matches.is_present("NO_INITIAL_UPDATE") {
info!("Running initial update...");
let new = open_result.new;
// let new = open_result.new;
block_background::<_, _, anyhow::Error>(move || {
let _ = if new {
filesystem::rescan_vault(upend.clone(), job_container.clone(), false, true)
} else {
filesystem::rescan_vault(upend.clone(), job_container.clone(), true, false)
};
let _ = extractors::extract_all(upend, job_container);
let _ = state.store.update(&upend, job_container.clone());
let _ = extractors::extract_all(upend, state.store, job_container);
Ok(())
})
}

View File

@@ -1,6 +1,7 @@
use crate::database::stores::UpStore;
use crate::util::hash::b58_encode;
use crate::util::hash::Hash;
use crate::util::jobs::{JobContainer, JobState};
use crate::{database::UpEndDatabase, util::hash::b58_encode};
use anyhow::{anyhow, Result};
use log::{debug, trace};
@@ -27,17 +28,17 @@ pub trait Previewable {
}
pub struct PreviewStore {
path: PathBuf,
db: Arc<UpEndDatabase>,
store: Arc<Box<dyn UpStore + Send + Sync>>,
locks: Mutex<HashMap<Hash, Arc<Mutex<PathBuf>>>>,
}
#[cfg(feature = "previews")]
impl PreviewStore {
pub fn new<P: AsRef<Path>>(path: P, db: Arc<UpEndDatabase>) -> Self {
pub fn new<P: AsRef<Path>>(path: P, store: Arc<Box<dyn UpStore + Send + Sync>>) -> Self {
PreviewStore {
path: PathBuf::from(path.as_ref()),
db,
store,
locks: Mutex::new(HashMap::new()),
}
}
@@ -71,12 +72,12 @@ impl PreviewStore {
Ok(Some(thumbpath.clone()))
} else {
trace!("Calculating preview for {hash:?}...");
let connection = self.db.connection()?;
let files = connection.retrieve_file(&hash)?;
let files = self.store.retrieve(&hash)?;
if let Some(file) = files.get(0) {
let file_path = file.get_file_path();
let mut job_handle = job_container.add_job(
None,
&format!("Creating preview for {:?}", file.path.file_name().unwrap()),
&format!("Creating preview for {:?}", file_path.file_name().unwrap()),
)?;
let mime_type = mime_type.into();
@@ -84,18 +85,18 @@ impl PreviewStore {
let mime_type: Option<String> = if mime_type.is_some() {
mime_type
} else {
tree_magic_mini::from_filepath(&file.path).map(|m| m.into())
tree_magic_mini::from_filepath(file_path).map(|m| m.into())
};
let preview = match mime_type {
Some(tm) if tm.starts_with("text") => TextPath(&file.path).get_thumbnail(),
Some(tm) if tm.starts_with("text") => TextPath(file_path).get_thumbnail(),
Some(tm) if tm.starts_with("video") || tm == "application/x-matroska" => {
VideoPath(&file.path).get_thumbnail()
VideoPath(file_path).get_thumbnail()
}
Some(tm) if tm.starts_with("audio") || tm == "application/x-riff" => {
AudioPath(&file.path).get_thumbnail()
AudioPath(file_path).get_thumbnail()
}
Some(tm) if tm.starts_with("image") => ImagePath(&file.path).get_thumbnail(),
Some(tm) if tm.starts_with("image") => ImagePath(file_path).get_thumbnail(),
Some(unknown) => Err(anyhow!("No capability for {:?} thumbnails.", unknown)),
_ => Err(anyhow!("Unknown file type, or file doesn't exist.")),
};

View File

@@ -3,12 +3,12 @@ use crate::database::constants::{ADDED_ATTR, LABEL_ATTR};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{list_roots, resolve_path, UHierPath};
use crate::database::lang::Query;
use crate::database::stores::{Blob, UpStore};
use crate::database::UpEndDatabase;
use crate::extractors::{self};
use crate::filesystem::add_file;
use crate::previews::PreviewStore;
use crate::util::exec::block_background;
use crate::util::hash::{b58_decode, b58_encode, Hashable};
use crate::util::hash::{b58_decode, b58_encode};
use crate::util::jobs::JobContainer;
use actix_files::NamedFile;
use actix_multipart::Multipart;
@@ -26,13 +26,11 @@ use futures_util::TryStreamExt;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, io};
use tempfile::NamedTempFile;
use uuid::Uuid;
@@ -42,6 +40,7 @@ use is_executable::IsExecutable;
#[derive(Clone)]
pub struct State {
pub upend: Arc<UpEndDatabase>,
pub store: Arc<Box<dyn UpStore + Sync + Send>>,
pub vault_name: Option<String>,
pub job_container: JobContainer,
pub preview_store: Option<Arc<PreviewStore>>,
@@ -140,13 +139,13 @@ pub async fn get_raw(
}
// Then, check the files
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _hash = hash.clone();
let files = web::block(move || connection.retrieve_file(_hash.as_ref()))
let _store = state.store.clone();
let blobs = web::block(move || _store.retrieve(_hash.as_ref()))
.await
.map_err(ErrorInternalServerError)?;
if let Some(file) = files.get(0) {
let file_path = state.upend.vault_path.join(&file.path);
if let Some(blob) = blobs.get(0) {
let file_path = blob.get_file_path();
if query.native.is_none() {
Ok(Either::A(
@@ -185,12 +184,9 @@ pub async fn get_raw(
http::header::WARNING.to_string(),
);
file_path
.parent()
.ok_or_else(|| {
ErrorInternalServerError("No parent to open as fallback.")
})?
.to_path_buf()
file_path.parent().ok_or_else(|| {
ErrorInternalServerError("No parent to open as fallback.")
})?
};
opener::open(path).map_err(error::ErrorServiceUnavailable)?;
Ok(Either::B(response.finish()))
@@ -441,9 +437,10 @@ pub async fn put_object(
let _address = address.clone();
let _job_container = state.job_container.clone();
let _store = state.store.clone();
block_background::<_, _, anyhow::Error>(move || {
let extract_result =
extractors::extract(&_address, &connection, _job_container);
extractors::extract(&_address, &connection, _store, _job_container);
if let Ok(entry_count) = extract_result {
debug!("Added {entry_count} extracted entries for {_address:?}");
} else {
@@ -493,40 +490,17 @@ pub async fn put_object(
while let Some(chunk) = field.try_next().await? {
file = web::block(move || file.write_all(&chunk).map(|_| file)).await?;
}
let path = PathBuf::from(file.path());
let hash = web::block(move || path.hash()).await?;
let address = Address::Hash(hash.clone());
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _store = state.store.clone();
let _filename = filename.clone();
let hash = web::block(move || {
_store.store(connection, Blob::from_filepath(file.path()), _filename)
})
.await
.map_err(ErrorInternalServerError)?;
let _hash = hash.clone();
let existing_files = web::block(move || connection.retrieve_file(&_hash))
.await
.map_err(ErrorInternalServerError)?;
if existing_files.is_empty() {
let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?);
let final_name = if let Some(ref filename) = filename {
format!("{addr_str}_{filename}")
} else {
addr_str
};
let final_path = state.upend.vault_path.join(&final_name);
let (_, tmp_path) = file.keep().map_err(ErrorInternalServerError)?;
let final_path = web::block::<_, _, io::Error>(move || {
fs::copy(&tmp_path, &final_path)?;
fs::remove_file(tmp_path)?;
Ok(final_path)
})
.await?;
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
web::block(move || add_file(&connection, &final_path, hash))
.await
.map_err(ErrorInternalServerError)?;
}
let address = Address::Hash(hash);
if let Some(ref filename) = filename {
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -540,10 +514,11 @@ pub async fn put_object(
let _address = address.clone();
let _job_container = state.job_container.clone();
let _store = state.store.clone();
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
block_background::<_, _, anyhow::Error>(move || {
let extract_result =
extractors::extract(&_address, &connection, _job_container);
extractors::extract(&_address, &connection, _store, _job_container);
if let Ok(entry_count) = extract_result {
debug!("Added {entry_count} extracted entries for {_address:?}");
} else {
@@ -718,27 +693,28 @@ pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Er
Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
}
#[derive(Deserialize)]
pub struct RescanRequest {
full: Option<String>,
}
// #[derive(Deserialize)]
// pub struct RescanRequest {
// full: Option<String>,
// }
#[post("/api/refresh")]
pub async fn api_refresh(
req: HttpRequest,
state: web::Data<State>,
web::Query(query): web::Query<RescanRequest>,
// web::Query(query): web::Query<RescanRequest>,
) -> Result<HttpResponse, Error> {
check_auth(&req, &state)?;
block_background::<_, _, anyhow::Error>(move || {
let _ = crate::filesystem::rescan_vault(
let _ = state
.store
.update(&state.upend, state.job_container.clone());
let _ = crate::extractors::extract_all(
state.upend.clone(),
state.store.clone(),
state.job_container.clone(),
query.full.is_none(),
false,
);
let _ = crate::extractors::extract_all(state.upend.clone(), state.job_container.clone());
Ok(())
});
Ok(HttpResponse::Ok().finish())
@@ -746,68 +722,48 @@ pub async fn api_refresh(
#[get("/api/store")]
pub async fn store_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let files = web::block(move || connection.retrieve_all_files())
.await
.map_err(ErrorInternalServerError)?;
let mut files_by_hash = HashMap::new();
for file in &files {
if !files_by_hash.contains_key(&file.hash) {
files_by_hash.insert(&file.hash, vec![]);
}
files_by_hash.get_mut(&file.hash).unwrap().push(file);
}
// let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
// let files = web::block(move || connection.retrieve_all_files())
// .await
// .map_err(ErrorInternalServerError)?;
// let mut files_by_hash = HashMap::new();
// for file in &files {
// if !files_by_hash.contains_key(&file.hash) {
// files_by_hash.insert(&file.hash, vec![]);
// }
// files_by_hash.get_mut(&file.hash).unwrap().push(file);
// }
for paths in files_by_hash.values_mut() {
paths.sort_unstable_by_key(|f| !f.valid);
}
// for paths in files_by_hash.values_mut() {
// paths.sort_unstable_by_key(|f| !f.valid);
// }
let mut blobs = files_by_hash
.iter()
.map(|(hash, files)| {
json!({
"hash": hash,
"size": files[0].size,
"paths": files.iter().map(|f| json!({
"added": f.added,
"valid": f.valid,
"path": f.path
})).collect::<serde_json::Value>()
})
})
.collect::<Vec<serde_json::Value>>();
// let mut blobs = files_by_hash
// .iter()
// .map(|(hash, files)| {
// json!({
// "hash": hash,
// "size": files[0].size,
// "paths": files.iter().map(|f| json!({
// "added": f.added,
// "valid": f.valid,
// "path": f.path
// })).collect::<serde_json::Value>()
// })
// })
// .collect::<Vec<serde_json::Value>>();
blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
blobs.reverse();
// blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
// blobs.reverse();
Ok(HttpResponse::Ok().json(json!({
"totals": {
"count": files_by_hash.len(),
"size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
},
"blobs": blobs
})))
}
#[get("/api/store/{hash}")]
pub async fn get_file(
state: web::Data<State>,
hash: web::Path<String>,
) -> Result<HttpResponse, Error> {
let address =
Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
.map_err(ErrorInternalServerError)?;
if let Address::Hash(hash) = address {
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let response = web::block(move || connection.retrieve_file(&hash))
.await
.map_err(ErrorInternalServerError)?;
Ok(HttpResponse::Ok().json(response))
} else {
Err(ErrorBadRequest("Address does not refer to a file."))
}
// Ok(HttpResponse::Ok().json(json!({
// "totals": {
// "count": files_by_hash.len(),
// "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
// },
// "blobs": blobs
// })))
todo!();
}
#[derive(Deserialize)]
@ -838,7 +794,7 @@ pub async fn get_jobs(
pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
Ok(HttpResponse::Ok().json(json!({
"name": state.vault_name,
"location": &*state.upend.vault_path,
// "location": &*state.store.path,
"version": crate::common::PKG_VERSION,
"desktop": state.desktop_enabled
})))