Compare commits

4 Commits: main...feat/vault

Author | SHA1 | Date |
---|---|---|
Tomáš Mládek | 54897f1468 | |
Tomáš Mládek | 0ced93adcc | |
Tomáš Mládek | 2c4a7f32e5 | |
Tomáš Mládek | a7b0a5c00a | |

@@ -1,2 +0,0 @@
-- This file should undo anything in `up.sql`
DROP TABLE vaults;

@@ -1,5 +0,0 @@
-- Your SQL goes here
CREATE TABLE vaults (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    path VARCHAR NOT NULL
)

@@ -0,0 +1 @@
DROP TABLE files;

@@ -0,0 +1,13 @@
CREATE TABLE files
(
    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    hash BLOB NOT NULL,
    path VARCHAR NOT NULL,
    valid BOOLEAN NOT NULL DEFAULT TRUE,
    added DATETIME NOT NULL,
    size BIGINT NOT NULL,
    mtime DATETIME NULL
);

CREATE INDEX files_hash ON files (hash);
CREATE INDEX files_valid ON files (valid);

@@ -1,4 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE meta;
DROP TABLE files;
DROP TABLE data;

@@ -9,20 +9,6 @@ CREATE TABLE meta
INSERT INTO meta (key, value)
VALUES ('VERSION', '0');

CREATE TABLE files
(
    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
    hash BLOB NOT NULL,
    path VARCHAR NOT NULL,
    valid BOOLEAN NOT NULL DEFAULT TRUE,
    added DATETIME NOT NULL,
    size BIGINT NOT NULL,
    mtime DATETIME NULL
);

CREATE INDEX files_hash ON files (hash);
CREATE INDEX files_valid ON files (valid);

CREATE TABLE data
(
    identity BLOB PRIMARY KEY NOT NULL,

@@ -1,43 +1,5 @@
use std::path::PathBuf;

use chrono::NaiveDateTime;
use serde::Serialize;

use super::schema::{data, files, meta};
use crate::util::hash::Hash;

#[derive(Queryable, Serialize, Clone, Debug)]
pub struct File {
    pub id: i32,
    pub hash: Hash,
    pub path: String,
    pub valid: bool,
    pub added: NaiveDateTime,
    pub size: i64,
    pub mtime: Option<NaiveDateTime>,
}

// todo - remove, try_from the actual model, impl queryable...
#[derive(Serialize, Clone, Debug)]
pub struct OutFile {
    pub id: i32,
    pub hash: Hash,
    pub path: PathBuf,
    pub valid: bool,
    pub added: NaiveDateTime,
    pub size: i64,
    pub mtime: Option<NaiveDateTime>,
}

#[derive(Insertable, Debug)]
#[table_name = "files"]
pub struct NewFile {
    pub hash: Vec<u8>,
    pub path: String,
    pub added: NaiveDateTime,
    pub size: i64,
    pub mtime: Option<NaiveDateTime>,
}
use super::schema::{data, meta};

#[derive(Queryable, Insertable, Serialize, Debug)]
#[table_name = "data"]

@@ -9,19 +9,6 @@ table! {
        immutable -> Bool,
    }
}

table! {
    files (id) {
        id -> Integer,
        hash -> Binary,
        path -> Text,
        valid -> Bool,
        added -> Timestamp,
        size -> BigInt,
        mtime -> Nullable<Timestamp>,
    }
}

table! {
    meta (id) {
        id -> Integer,

@@ -30,4 +17,4 @@ table! {
    }
}

allow_tables_to_appear_in_same_query!(data, files, meta,);
allow_tables_to_appear_in_same_query!(data, meta,);

@@ -3,6 +3,8 @@
#[macro_use]
mod macros;

pub mod stores;

pub mod constants;
pub mod engine;
pub mod entry;

@@ -22,13 +24,12 @@ use crate::database::lang::Query;
use crate::util::hash::Hash;
use crate::util::LoggerSink;
use anyhow::{anyhow, Result};
use chrono::NaiveDateTime;
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, PooledConnection};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::sqlite::SqliteConnection;
use hierarchies::initialize_hier;
use std::convert::{TryFrom, TryInto};
use std::convert::TryFrom;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};

@@ -78,8 +79,7 @@ pub struct OpenResult {
pub struct UpEndDatabase {
    pool: DbPool,
    lock: Arc<RwLock<()>>,
    pub vault_path: Arc<PathBuf>,
    pub db_path: Arc<PathBuf>,
    vault_path: Arc<PathBuf>
}

pub const UPEND_SUBDIR: &str = ".upend";

@@ -122,7 +122,6 @@ impl UpEndDatabase {
            pool,
            lock: Arc::new(RwLock::new(())),
            vault_path: Arc::new(dirpath.as_ref().canonicalize()?),
            db_path: Arc::new(upend_path),
        };
        let connection = db.connection().unwrap();

@@ -167,7 +166,6 @@ impl UpEndDatabase {
        Ok(UpEndConnection {
            conn: self.pool.get()?,
            lock: self.lock.clone(),
            vault_path: self.vault_path.clone(),
        })
    }
}

@@ -175,7 +173,6 @@ impl UpEndDatabase {
pub struct UpEndConnection {
    conn: PooledConnection<ConnectionManager<SqliteConnection>>,
    lock: Arc<RwLock<()>>,
    vault_path: Arc<PathBuf>,
}

impl UpEndConnection {

@@ -214,94 +211,6 @@ impl UpEndConnection {
            .map(|mv| mv.value.clone())
    }

    pub fn insert_file(&self, file: models::NewFile) -> Result<u32> {
        use crate::database::inner::schema::files;

        debug!(
            "Inserting {} ({})...",
            &file.path,
            Address::Hash(Hash((&file.hash).clone()))
        );

        let _lock = self.lock.write().unwrap();

        diesel::insert_into(files::table)
            .values(&file)
            .execute(&self.conn)?;

        Ok(files::dsl::files
            .filter(files::dsl::valid.eq(true))
            .filter(files::dsl::hash.eq(file.hash))
            .count()
            .first::<i64>(&self.conn)?
            .try_into()
            .unwrap())
    }

    pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> {
        use crate::database::inner::schema::files::dsl::*;

        let _lock = self.lock.read().unwrap();

        let matches = files
            .filter(valid.eq(true))
            .filter(hash.eq(&obj_hash.0))
            .load::<models::File>(&self.conn)?;

        let matches = matches
            .into_iter()
            .map(|f| models::OutFile {
                id: f.id,
                hash: f.hash,
                path: self.vault_path.join(PathBuf::from(f.path)),
                valid: f.valid,
                added: f.added,
                size: f.size,
                mtime: f.mtime,
            })
            .collect();

        Ok(matches)
    }

    pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
        use crate::database::inner::schema::files::dsl::*;
        let _lock = self.lock.read().unwrap();
        let matches = files.load::<models::File>(&self.conn)?;
        Ok(matches)
    }

    pub fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
        use crate::database::inner::schema::files::dsl::*;

        debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);

        let _lock = self.lock.write().unwrap();

        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(mtime.eq(m_time))
            .execute(&self.conn)?)
    }

    pub fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
        use crate::database::inner::schema::files::dsl::*;

        debug!("Setting file ID {} to valid = {}", file_id, is_valid);

        let _lock = self.lock.write().unwrap();

        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(valid.eq(is_valid))
            .execute(&self.conn)?)
    }

    pub fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
        Ok(path
            .canonicalize()?
            .strip_prefix(self.vault_path.as_path())?
            .to_path_buf())
    }

    pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
        use crate::database::inner::schema::data::dsl::*;

@@ -0,0 +1,51 @@
use std::path::PathBuf;

use crate::util::hash::Hash;
use chrono::NaiveDateTime;
use diesel::Queryable;
use serde::Serialize;

table! {
    files (id) {
        id -> Integer,
        hash -> Binary,
        path -> Text,
        valid -> Bool,
        added -> Timestamp,
        size -> BigInt,
        mtime -> Nullable<Timestamp>,
    }
}

#[derive(Queryable, Serialize, Clone, Debug)]
pub struct File {
    pub id: i32,
    pub hash: Hash,
    pub path: String,
    pub valid: bool,
    pub added: NaiveDateTime,
    pub size: i64,
    pub mtime: Option<NaiveDateTime>,
}

// todo - remove, try_from the actual model, impl queryable...
#[derive(Serialize, Clone, Debug)]
pub struct OutFile {
    pub id: i32,
    pub hash: Hash,
    pub path: PathBuf,
    pub valid: bool,
    pub added: NaiveDateTime,
    pub size: i64,
    pub mtime: Option<NaiveDateTime>,
}

#[derive(Insertable, Debug)]
#[table_name = "files"]
pub struct NewFile {
    pub hash: Vec<u8>,
    pub path: String,
    pub added: NaiveDateTime,
    pub size: i64,
    pub mtime: Option<NaiveDateTime>,
}

@@ -0,0 +1,805 @@
use self::db::files;

use super::{Blob, StoreError, UpStore, UpdatePathOutcome};
use crate::addressing::Address;
use crate::database::constants::{
    ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
    TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
    resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{b58_encode, Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::r2d2::{ConnectionManager, ManageConnection};
use diesel::ExpressionMethods;
use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection};
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use std::borrow::Borrow;
use std::convert::{TryFrom, TryInto};
use std::path::PathBuf;
use std::path::{Component, Path};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use walkdir::WalkDir;

mod db;

const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: BLOB_TYPE.into(),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}

struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);

impl Drop for PragmaSynchronousGuard<'_> {
    fn drop(&mut self) {
        debug!("Re-enabling synchronous mode.");
        let res = self.0.execute("PRAGMA synchronous = NORMAL;");
        if let Err(err) = res {
            error!(
                "Error setting synchronous mode back to NORMAL! Data loss possible! {}",
                err
            );
        }
    }
}

pub struct FsStore {
    path: PathBuf,
    manager: ConnectionManager<SqliteConnection>,
    lock: Arc<RwLock<()>>,
}

impl FsStore {
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<FsStore> {
        let path = path.as_ref().to_path_buf().canonicalize()?;
        let manager = ConnectionManager::<SqliteConnection>::new(
            path.join(UPEND_SUBDIR)
                .join("upend_vault.sqlite3")
                .to_str()
                .unwrap(),
        );
        let connection = manager.connect()?;

        // while diesel doesn't support multiple embedded migrations...
        connection.execute(
            r#"
            CREATE TABLE IF NOT EXISTS files
            (
                id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
                hash BLOB NOT NULL,
                path VARCHAR NOT NULL,
                valid BOOLEAN NOT NULL DEFAULT TRUE,
                added DATETIME NOT NULL,
                size BIGINT NOT NULL,
                mtime DATETIME NULL
            );

            CREATE INDEX IF NOT EXISTS files_hash ON files (hash);
            CREATE INDEX IF NOT EXISTS files_valid ON files (valid);
        "#,
        )?;

        let lock = Arc::new(RwLock::new(()));

        Ok(FsStore {
            path,
            manager,
            lock,
        })
    }

    fn rescan_vault<D: Borrow<UpEndDatabase>>(
        &self,
        db: D,
        job_handle: JobHandle,
        quick_check: bool,
        disable_synchronous: bool,
    ) -> Result<Vec<UpdatePathOutcome>> {
        let start = Instant::now();
        info!("Vault rescan started.");

        let db = db.borrow();
        let connection = db.connection()?;

        // Initialize types, etc...
        debug!("Initializing DB types.");
        connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
        upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;

        // Disable syncing in SQLite for the duration of the import
        let mut _guard: Option<PragmaSynchronousGuard> = None;
        if disable_synchronous {
            debug!("Disabling SQLite synchronous mode");
            connection.execute("PRAGMA synchronous = OFF;")?;
            _guard = Some(PragmaSynchronousGuard(&connection));
        }

        // Walk through the vault, find all paths
        debug!("Traversing vault directory");
        let absolute_dir_path = fs::canonicalize(&*db.vault_path)?;
        let path_entries: Vec<PathBuf> = WalkDir::new(&*db.vault_path)
            .follow_links(true)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter_map(|e| fs::canonicalize(e.into_path()).ok())
            .filter(|e| e.is_file())
            .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
            .collect();

        // Prepare for processing
        let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?));

        // Actual processing
        let count = RwLock::new(0_usize);
        let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
        let total = path_entries.len() as f32;
        let shared_job_handle = Arc::new(Mutex::new(job_handle));
        let path_outcomes: Vec<UpdatePathOutcome> = path_entries
            .into_par_iter()
            .map(|path| {
                let result = self.process_directory_entry(
                    db.connection().unwrap(),
                    &resolve_cache,
                    path.clone(),
                    &existing_files,
                    quick_check,
                );

                let mut cnt = count.write().unwrap();
                *cnt += 1;

                let _ = shared_job_handle
                    .lock()
                    .unwrap()
                    .update_progress(*cnt as f32 / total * 100.0);

                match result {
                    Ok(result) => result,
                    Err(error) => {
                        error!("Failed to update {:?} ({})", path, error);
                        UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string()))
                    }
                }
            })
            .collect();

        debug!("Processing done, cleaning up...");

        let existing_files = existing_files.read().unwrap();

        let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
            let trans_result = connection.transaction::<_, Error, _>(|| {
                self.file_set_valid(file.id, false)?;
                // remove_object(&connection, )?
                Ok(())
            });

            match trans_result {
                Ok(_) => {
                    info!("Removed: {:?}", file.path);
                    UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
                }
                Err(error) => UpdatePathOutcome::Failed(
                    PathBuf::from(file.path.clone()),
                    StoreError::Unknown(error.to_string()),
                ),
            }
        });

        // Re-enable SQLite syncing
        drop(_guard);

        // Reporting
        let all_outcomes = path_outcomes
            .into_iter()
            .chain(cleanup_results)
            .collect::<Vec<UpdatePathOutcome>>();

        let mut failed: Vec<(&PathBuf, &StoreError)> = vec![];
        let mut created = 0;
        let mut unchanged = 0;
        let mut deleted = 0;

        for outcome in &all_outcomes {
            match outcome {
                UpdatePathOutcome::Added(_) => created += 1,
                UpdatePathOutcome::Unchanged(_) => unchanged += 1,
                UpdatePathOutcome::Removed(_) => deleted += 1,
                UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
            }
        }

        if !failed.is_empty() {
            warn!(
                "{} path updates failed! ({})",
                failed.len(),
                failed
                    .iter()
                    .map(|(path, error)| format!("{:?}: {}", path, error))
                    .collect::<Vec<String>>()
                    .join(", ")
            )
        }

        info!(
            "Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
            db.vault_path,
            created,
            deleted,
            unchanged,
            start.elapsed().as_secs()
        );

        Ok(all_outcomes)
    }

    fn process_directory_entry(
        &self,
        connection: UpEndConnection,
        resolve_cache: &Arc<Mutex<ResolveCache>>,
        path: PathBuf,
        existing_files: &Arc<RwLock<Vec<db::File>>>,
        quick_check: bool,
    ) -> Result<UpdatePathOutcome> {
        debug!("Processing: {:?}", path);

        // Prepare the data
        let existing_files = Arc::clone(existing_files);

        let normalized_path = self.normalize_path(&path)?;
        let normalized_path_str = normalized_path
            .to_str()
            .ok_or(anyhow!("Path not valid unicode!"))?;

        let mut file_hash: Option<Hash> = None;

        // Get size & mtime for quick comparison
        let metadata = fs::metadata(&path)?;
        let size = metadata.len() as i64;
        if size < 0 {
            panic!("File {} too large?!", path.display());
        }
        let mtime = metadata
            .modified()
            .map(|t| {
                NaiveDateTime::from_timestamp(
                    t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
                    0,
                )
            })
            .ok();

        // Check if the path entry for this file already exists in database
        let existing_files_read = existing_files.read().unwrap();
        let maybe_existing_file = existing_files_read
            .iter()
            .find(|file| file.path == normalized_path_str);

        if let Some(existing_file) = maybe_existing_file {
            let existing_file = existing_file.clone();
            drop(existing_files_read);

            if !quick_check || size == existing_file.size {
                let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
                let mut same_hash = false;

                if !quick_check || !same_mtime {
                    file_hash = Some(path.hash()?);
                    same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
                }

                if same_mtime || same_hash {
                    if mtime != existing_file.mtime {
                        self.file_update_mtime(existing_file.id, mtime)?;
                    }

                    if !existing_file.valid {
                        self.file_set_valid(existing_file.id, true)?;
                    }

                    let mut existing_files_write = existing_files.write().unwrap();
                    let maybe_existing_file = existing_files_write
                        .iter()
                        .enumerate()
                        .find(|(_, file)| file.path == normalized_path_str)
                        .map(|(idx, _)| idx);

                    if let Some(idx) = maybe_existing_file {
                        existing_files_write.swap_remove(idx);
                        debug!("Unchanged: {:?}", path);
                        return Ok(UpdatePathOutcome::Unchanged(path));
                    }
                }
            }
        } else {
            drop(existing_files_read);
        }

        // If not, add it!
        if file_hash.is_none() {
            file_hash = Some(path.hash()?);
        }
        let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());

        self.insert_file_with_metadata(
            &connection,
            &normalized_path,
            file_hash.unwrap(),
            size,
            mtime,
            mime_type,
            Some(resolve_cache),
        )
        .map(|_| {
            info!("Added: {:?}", path);
            UpdatePathOutcome::Added(path.clone())
        })
    }

    fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
        let normalized_path = self.normalize_path(path)?;
        let metadata = fs::metadata(&path)?;
        let size = metadata.len() as i64;
        let mtime = metadata
            .modified()
            .map(|t| {
                NaiveDateTime::from_timestamp(
                    t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
                    0,
                )
            })
            .ok();
        let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());

        self.insert_file_with_metadata(
            connection,
            &normalized_path,
            hash,
            size,
            mtime,
            mime_type,
            None,
        )
    }

    fn insert_file_with_metadata(
        &self,
        connection: &UpEndConnection,
        normalized_path: &Path,
        hash: Hash,
        size: i64,
        mtime: Option<NaiveDateTime>,
        mime_type: Option<String>,
        resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
    ) -> Result<Address> {
        let new_file = db::NewFile {
            path: normalized_path
                .to_str()
                .ok_or(anyhow!("Path not UTF-8?!"))?
                .to_string(),
            hash: (hash.clone()).0,
            added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
            size,
            mtime,
        };

        let blob_address = Address::Hash(hash);

        // Metadata
        let type_entry = Entry {
            entity: blob_address.clone(),
            attribute: String::from(IS_OF_TYPE_ATTR),
            value: BLOB_TYPE_ADDR.clone().into(),
        };

        let size_entry = Entry {
            entity: blob_address.clone(),
            attribute: FILE_SIZE_KEY.to_string(),
            value: (size as f64).into(),
        };

        let mime_entry = mime_type.map(|mime_type| Entry {
            entity: blob_address.clone(),
            attribute: FILE_MIME_KEY.to_string(),
            value: mime_type.into(),
        });

        let added_entry = Entry {
            entity: blob_address.clone(),
            attribute: ADDED_ATTR.to_string(),
            value: (SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs() as f64)
                .into(),
        };

        // Add the appropriate entries w/r/t virtual filesystem location
        let components = normalized_path.components().collect::<Vec<Component>>();
        let (filename, dir_path) = components.split_last().unwrap();

        let upath = UHierPath(
            iter::once(UNode::new("NATIVE").unwrap())
                .chain(dir_path.iter().map(|component| {
                    UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
                }))
                .collect(),
        );
        let resolved_path = match resolve_cache {
            Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
            None => resolve_path(connection, &upath, true)?,
        };
        let parent_dir = resolved_path.last().unwrap();

        // Insert all
        let file_count = self.insert_file(new_file)?;

        connection.insert_entry_immutable(type_entry)?;
        connection.insert_entry_immutable(size_entry)?;
        if file_count == 1 {
            connection.insert_entry_immutable(added_entry)?;
        }
        if let Some(mime_entry) = mime_entry {
            connection.insert_entry(mime_entry)?;
        }

        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: blob_address.clone().into(),
        };
        let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;

        let label_entry = Entry {
            entity: blob_address.clone(),
            attribute: LABEL_ATTR.to_string(),
            value: filename.as_os_str().to_string_lossy().to_string().into(),
        };
        let label_entry_addr = connection.insert_entry(label_entry)?;

        let alias_entry = Entry {
            entity: dir_has_entry_addr,
            attribute: ALIAS_KEY.to_string(),
            value: label_entry_addr.into(),
        };
        connection.insert_entry(alias_entry)?;

        Ok(blob_address)
    }

    pub fn insert_file(&self, file: db::NewFile) -> Result<u32> {
        debug!(
            "Inserting {} ({})...",
            &file.path,
            Address::Hash(Hash((&file.hash).clone()))
        );

        let _lock = self.lock.write().unwrap();
        let conn = self.manager.connect()?;

        diesel::insert_into(files::table)
            .values(&file)
            .execute(&conn)?;

        Ok(files::dsl::files
            .filter(files::dsl::valid.eq(true))
            .filter(files::dsl::hash.eq(file.hash))
            .count()
            .first::<i64>(&conn)?
            .try_into()
            .unwrap())
    }

    fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<db::OutFile>> {
        use self::db::files::dsl::*;

        let _lock = self.lock.read().unwrap();
        let conn = self.manager.connect()?;

        let matches = files
            .filter(valid.eq(true))
            .filter(hash.eq(&obj_hash.0))
            .load::<db::File>(&conn)?;

        let matches = matches
            .into_iter()
            .map(|f| db::OutFile {
                id: f.id,
                hash: f.hash,
                path: self.path.join(PathBuf::from(f.path)),
                valid: f.valid,
                added: f.added,
                size: f.size,
                mtime: f.mtime,
            })
            .collect();

        Ok(matches)
    }

    fn retrieve_all_files(&self) -> Result<Vec<db::File>> {
        use self::db::files::dsl::*;

        let _lock = self.lock.read().unwrap();
        let conn = self.manager.connect()?;

        let matches = files.load::<db::File>(&conn)?;
        Ok(matches)
    }

    fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
        use self::db::files::dsl::*;
        debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);

        let _lock = self.lock.write().unwrap();
        let conn = self.manager.connect()?;

        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(mtime.eq(m_time))
            .execute(&conn)?)
    }

    fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
        use self::db::files::dsl::*;
        debug!("Setting file ID {} to valid = {}", file_id, is_valid);

        let _lock = self.lock.write().unwrap();
        let conn = self.manager.connect()?;

        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(valid.eq(is_valid))
            .execute(&conn)?)
    }

    fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
        Ok(path
            .canonicalize()?
            .strip_prefix(self.path.as_path())?
            .to_path_buf())
    }
}

impl From<db::OutFile> for Blob {
    fn from(of: db::OutFile) -> Self {
        Blob { file_path: of.path }
    }
}

impl From<db::File> for Blob {
    fn from(f: db::File) -> Self {
        Blob {
            file_path: PathBuf::from(f.path),
        }
    }
}

impl UpStore for FsStore {
    fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result<Vec<Blob>, super::StoreError> {
        Ok(self
            .retrieve_file(hash)
            .map_err(|e| StoreError::Unknown(e.to_string()))?
            .into_iter()
            .map(Blob::from)
            .collect())
    }

    fn retrieve_all(&self) -> Result<Vec<Blob>, super::StoreError> {
        Ok(self
            .retrieve_all_files()
            .map_err(|e| StoreError::Unknown(e.to_string()))?
            .into_iter()
            .map(Blob::from)
            .collect())
    }

    fn store(
        &self,
        connection: UpEndConnection,
        blob: Blob,
        name_hint: Option<String>,
    ) -> Result<Hash, super::StoreError> {
        let file_path = blob.get_file_path();
        let hash = file_path
            .hash()
            .map_err(|e| StoreError::Unknown(e.to_string()))?;

        let existing_files = self.retrieve(&hash)?;

        if existing_files.is_empty() {
            let address = Address::Hash(hash.clone());
            let addr_str = b58_encode(
                address
                    .encode()
                    .map_err(|e| StoreError::Unknown(e.to_string()))?,
            );

            let final_name = if let Some(name_hint) = name_hint {
                format!("{addr_str}_{name_hint}")
            } else {
                addr_str
            };

            let final_path = self.path.join(&final_name);

            fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;

            self.add_file(&connection, &final_path, hash.clone())
                .map_err(|e| StoreError::Unknown(e.to_string()))?;
        }

        Ok(hash)
    }

    fn update(
        &self,
        db: &UpEndDatabase,
        mut job_container: JobContainer,
    ) -> Result<Vec<UpdatePathOutcome>, StoreError> {
        let job_result = job_container.add_job("REIMPORT", "Scaning vault directory...");

        match job_result {
            Ok(job_handle) => {
                let result = self.rescan_vault(db, job_handle, true, false);

                if let Err(err) = &result {
                    error!("Update did not succeed! {:?}", err);
                }

                result.map_err(|err| StoreError::Unknown(err.to_string()))
            }
            Err(err) => Err(StoreError::Unknown(err.to_string())),
        }
    }
}

#[cfg(test)]
mod test {
    use crate::database::UpEndDatabase;
    use crate::util::jobs::JobContainer;

    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::TempDir;

    use std::sync::Once;
    use tracing_subscriber::filter::EnvFilter;

    static INIT: Once = Once::new();

    pub fn initialize() {
        INIT.call_once(|| {
            tracing_subscriber::fmt()
                .with_env_filter(EnvFilter::builder().from_env_lossy())
                .init();
        })
    }

    #[test]
    fn test_update() {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new().unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        // Initialize database
        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let store = FsStore::from_path(&temp_dir).unwrap();
        let job_container = JobContainer::new();

        // Store scan
        let rescan_result = store.update(&open_result.db, job_container.clone());
        assert!(rescan_result.is_ok());
    }

    #[test]
    fn test_rescan_quick() {
        initialize();
        _test_rescan(true)
    }

    #[test]
    fn test_rescan_full() {
        initialize();
        _test_rescan(false)
    }

    fn _test_rescan(quick: bool) {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new().unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        // Initialize database
        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let store = FsStore::from_path(&temp_dir).unwrap();
        let mut job_container = JobContainer::new();

        // Initial scan
        let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
        let rescan_result = store.rescan_vault(&open_result.db, job, quick, true);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));

        // Modification-less rescan
        let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
        let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));

        // Remove a file
        std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();

        let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
        let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        assert_eq!(
            2,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
                .count()
        );
        assert_eq!(
            1,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
                .count()
        );
    }
}

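Aside: the `PragmaSynchronousGuard` introduced in the listing above relies on Rust's drop semantics to restore `PRAGMA synchronous = NORMAL` even when the rescan bails out early. A minimal, self-contained sketch of that pattern follows; `ModeGuard` and `bulk_import` are stand-ins for illustration, not code from this branch.

```rust
// Hypothetical stand-alone illustration of the drop-guard pattern.
struct ModeGuard;

impl Drop for ModeGuard {
    fn drop(&mut self) {
        // In FsStore::rescan_vault this is where `PRAGMA synchronous = NORMAL;`
        // is re-issued; failures are only logged, since panicking in Drop aborts.
        println!("synchronous mode restored");
    }
}

fn bulk_import() -> Result<(), String> {
    let _guard = ModeGuard; // corresponds to `PRAGMA synchronous = OFF;`
    // ... bulk work; an early `?` or `return Err(...)` still runs the Drop ...
    Err("simulated failure".to_string())
}

fn main() {
    // Prints "synchronous mode restored" despite the error.
    let _ = bulk_import();
}
```
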
@@ -0,0 +1,66 @@
use std::path::{Path, PathBuf};

use super::{UpEndConnection, UpEndDatabase};
use crate::util::{hash::Hash, jobs::JobContainer};

pub mod fs;

#[derive(Debug, Clone)]
pub enum StoreError {
    Unknown(String),
}

impl std::fmt::Display for StoreError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "STORE ERROR: {}",
            match self {
                StoreError::Unknown(err) => err,
            }
        )
    }
}

impl std::error::Error for StoreError {}

type Result<T> = std::result::Result<T, StoreError>;

pub struct Blob {
    file_path: PathBuf,
}

impl Blob {
    pub fn from_filepath<P: AsRef<Path>>(path: P) -> Blob {
        Blob {
            file_path: PathBuf::from(path.as_ref()),
        }
    }

    pub fn get_file_path(&self) -> &Path {
        self.file_path.as_path()
    }
}

#[derive(Debug)]
pub enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
    Failed(PathBuf, StoreError),
}

pub trait UpStore {
    fn retrieve(&self, hash: &Hash) -> Result<Vec<Blob>>;
    fn retrieve_all(&self) -> Result<Vec<Blob>>;
    fn store(
        &self,
        connection: UpEndConnection,
        blob: Blob,
        name_hint: Option<String>,
    ) -> Result<Hash>;
    fn update(
        &self,
        database: &UpEndDatabase,
        job_container: JobContainer,
    ) -> Result<Vec<UpdatePathOutcome>>;
}

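For orientation, a rough sketch of what a second `UpStore` backend could look like next to `FsStore`. The types below are simplified stand-ins for the crate's `Hash`, `Blob`, and `StoreError` (only the read-side methods are shown), so the snippet compiles on its own but is not code from this branch.

```rust
// Hypothetical, self-contained sketch of an in-memory UpStore-style backend.
use std::collections::HashMap;
use std::path::PathBuf;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct Hash(Vec<u8>); // stand-in for crate::util::hash::Hash

#[derive(Debug, Clone)]
enum StoreError {
    Unknown(String),
}

struct Blob {
    file_path: PathBuf,
}

type Result<T> = std::result::Result<T, StoreError>;

// Read-side subset of the trait introduced in the diff above.
trait UpStore {
    fn retrieve(&self, hash: &Hash) -> Result<Vec<Blob>>;
    fn retrieve_all(&self) -> Result<Vec<Blob>>;
}

// A minimal backend mapping each content hash to the paths known for it.
struct MemStore {
    files: HashMap<Hash, Vec<PathBuf>>,
}

impl UpStore for MemStore {
    fn retrieve(&self, hash: &Hash) -> Result<Vec<Blob>> {
        Ok(self
            .files
            .get(hash)
            .cloned()
            .unwrap_or_default()
            .into_iter()
            .map(|file_path| Blob { file_path })
            .collect())
    }

    fn retrieve_all(&self) -> Result<Vec<Blob>> {
        Ok(self
            .files
            .values()
            .flatten()
            .cloned()
            .map(|file_path| Blob { file_path })
            .collect())
    }
}
```
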
@@ -1,12 +1,14 @@
use std::sync::Arc;

use super::Extractor;
use crate::{
    addressing::Address,
    database::{
        constants,
        entry::{Entry, EntryValue},
        stores::{fs::FILE_MIME_KEY, UpStore},
        UpEndConnection,
    },
    filesystem::FILE_MIME_KEY,
    util::jobs::{JobContainer, JobState},
};
use anyhow::{anyhow, Result};

@@ -18,6 +20,7 @@ impl Extractor for ID3Extractor {
        &self,
        address: &Address,
        connection: &UpEndConnection,
        store: Arc<Box<dyn UpStore + Send + Sync>>,
        mut job_container: JobContainer,
    ) -> Result<Vec<Entry>> {
        if let Address::Hash(hash) = address {

@@ -34,14 +37,15 @@ impl Extractor for ID3Extractor {
            return Ok(vec![]);
        }

        let files = connection.retrieve_file(hash)?;
        let files = store.retrieve(hash)?;

        if let Some(file) = files.get(0) {
            let file_path = file.get_file_path();
            let mut job_handle = job_container.add_job(
                None,
                &format!(
                    r#"Getting ID3 info from "{:}""#,
                    file.path
                    file_path
                        .components()
                        .last()
                        .unwrap()

@@ -50,7 +54,7 @@ impl Extractor for ID3Extractor {
            ),
        )?;

        let tags = id3::Tag::read_from_path(&file.path)?;
        let tags = id3::Tag::read_from_path(&file_path)?;

        let result: Vec<Entry> = tags
            .frames()

@@ -1,6 +1,6 @@
use crate::{
    addressing::Address,
    database::{entry::Entry, UpEndConnection, UpEndDatabase},
    database::{entry::Entry, stores::UpStore, UpEndConnection, UpEndDatabase},
    util::jobs::JobContainer,
};
use anyhow::Result;

@@ -25,6 +25,7 @@ pub trait Extractor {
        &self,
        address: &Address,
        connection: &UpEndConnection,
        store: Arc<Box<dyn UpStore + Send + Sync>>,
        job_container: JobContainer,
    ) -> Result<Vec<Entry>>;

@@ -36,10 +37,11 @@ pub trait Extractor {
        &self,
        address: &Address,
        connection: &UpEndConnection,
        store: Arc<Box<dyn UpStore + Send + Sync>>,
        job_container: JobContainer,
    ) -> Result<usize> {
        if self.is_needed(address, connection)? {
            let entries = self.get(address, connection, job_container)?;
            let entries = self.get(address, connection, store, job_container)?;

            connection.transaction(|| {
                let len = entries.len();

@@ -56,6 +58,7 @@ pub trait Extractor {

pub fn extract_all<D: Borrow<UpEndDatabase>>(
    db: D,
    store: Arc<Box<dyn UpStore + Send + Sync>>,
    mut job_container: JobContainer,
) -> Result<usize> {
    info!("Extracting metadata for all addresses.");

@@ -72,7 +75,7 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
        .par_iter()
        .map(|address| {
            let connection = db.connection()?;
            let extract_result = extract(address, &connection, job_container.clone());
            let extract_result = extract(address, &connection, store.clone(), job_container.clone());

            let mut cnt = count.write().unwrap();
            *cnt += 1;

@@ -99,6 +102,7 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
pub fn extract(
    address: &Address,
    connection: &UpEndConnection,
    store: Arc<Box<dyn UpStore + Send + Sync>>,
    job_container: JobContainer,
) -> Result<usize> {
    let mut entry_count = 0;

@@ -106,18 +110,20 @@ pub fn extract(

    #[cfg(feature = "extractors-web")]
    {
        entry_count += web::WebExtractor.insert_info(address, connection, job_container.clone())?;
        entry_count +=
            web::WebExtractor.insert_info(address, connection, store.clone(), job_container.clone())?;
    }

    #[cfg(feature = "extractors-audio")]
    {
        entry_count +=
            audio::ID3Extractor.insert_info(address, connection, job_container.clone())?;
            audio::ID3Extractor.insert_info(address, connection, store.clone(), job_container.clone())?;
    }

    #[cfg(feature = "extractors-photo")]
    {
        entry_count += photo::ExifExtractor.insert_info(address, connection, job_container)?;
        entry_count +=
            photo::ExifExtractor.insert_info(address, connection, store.clone(), job_container)?;
    }

    trace!("Extracting metadata for {address:?} - got {entry_count} entries.");

@@ -1,12 +1,14 @@
use std::sync::Arc;

use super::Extractor;
use crate::{
    addressing::Address,
    database::{
        constants,
        entry::{Entry, EntryValue},
        stores::{fs::{FILE_MIME_KEY}, UpStore},
        UpEndConnection,
    },
    filesystem::FILE_MIME_KEY,
    util::jobs::{JobContainer, JobState},
};
use anyhow::{anyhow, Result};

@@ -21,6 +23,7 @@ impl Extractor for ExifExtractor {
        &self,
        address: &Address,
        connection: &UpEndConnection,
        store: Arc<Box<dyn UpStore + Send + Sync>>,
        mut job_container: JobContainer,
    ) -> Result<Vec<Entry>> {
        if let Address::Hash(hash) = address {

@@ -37,14 +40,15 @@ impl Extractor for ExifExtractor {
            return Ok(vec![]);
        }

        let files = connection.retrieve_file(hash)?;
        let files = store.retrieve(hash)?;

        if let Some(file) = files.get(0) {
            let file_path = file.get_file_path();
            let mut job_handle = job_container.add_job(
                None,
                &format!(
                    r#"Getting EXIF info from "{:}""#,
                    file.path
                    file_path
                        .components()
                        .last()
                        .unwrap()

@@ -53,7 +57,7 @@ impl Extractor for ExifExtractor {
            ),
        )?;

        let file = std::fs::File::open(&file.path)?;
        let file = std::fs::File::open(&file_path)?;
        let mut bufreader = std::io::BufReader::new(&file);
        let exifreader = exif::Reader::new();
        let exif = exifreader.read_from_container(&mut bufreader)?;

@@ -1,7 +1,9 @@
use std::sync::Arc;

use super::Extractor;
use crate::{
    addressing::Address,
    database::{entry::Entry, UpEndConnection},
    database::{entry::Entry, stores::UpStore, UpEndConnection},
    util::jobs::{JobContainer, JobState},
};
use anyhow::anyhow;

@@ -15,9 +17,10 @@ impl Extractor for WebExtractor {
    fn get(
        &self,
        address: &Address,
        _: &UpEndConnection,
        _connection: &UpEndConnection,
        _store: Arc<Box<dyn UpStore + Send + Sync>>,
        mut job_container: JobContainer,
    ) -> anyhow::Result<Vec<Entry>> {
    ) -> Result<Vec<Entry>> {
        if let Address::Url(url) = address {
            let mut job_handle =
                job_container.add_job(None, &format!("Getting info about {url:?}"))?;

@@ -81,10 +84,12 @@ impl Extractor for WebExtractor {

#[cfg(test)]
mod test {
    use crate::util::jobs::JobContainer;

    use crate::{database::stores::fs::FsStore, util::jobs::JobContainer};

    use super::*;
    use anyhow::Result;
    use std::sync::Arc;
    use tempfile::TempDir;

    #[test]

@@ -92,12 +97,14 @@ mod test {
        let temp_dir = TempDir::new().unwrap();
        let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?;
        let connection = open_result.db.connection()?;
        let store =
            Arc::new(Box::new(FsStore::from_path(&temp_dir)?) as Box<dyn UpStore + Sync + Send>);
        let job_container = JobContainer::new();

        let address = Address::Url("https://upend.dev".into());
        assert!(WebExtractor.is_needed(&address, &connection)?);

        WebExtractor.insert_info(&address, &connection, job_container)?;
        WebExtractor.insert_info(&address, &connection, store, job_container)?;

        assert!(!WebExtractor.is_needed(&address, &connection)?);

@@ -1,573 +0,0 @@
use std::borrow::Borrow;
use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};

use crate::addressing::Address;
use crate::database::constants::{
    ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
    TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
    resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::inner::models;
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use walkdir::WalkDir;

const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: BLOB_TYPE.into(),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}

fn initialize_types(connection: &UpEndConnection) -> Result<()> {
    // BLOB_TYPE
    connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;

    Ok(())
}

pub fn rescan_vault<D: Borrow<UpEndDatabase>>(
    db: D,
    mut job_container: JobContainer,
    quick_check: bool,
    disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
    let job_result = job_container.add_job("REIMPORT", "Scaning vault directory...");

    match job_result {
        Ok(job_handle) => {
            let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous);

            if let Err(err) = &result {
                error!("Update did not succeed! {:?}", err);
            }

            result
        }
        Err(err) => Err(err),
    }
}
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);

impl Drop for PragmaSynchronousGuard<'_> {
    fn drop(&mut self) {
        debug!("Re-enabling synchronous mode.");
        let res = self.0.execute("PRAGMA synchronous = NORMAL;");
        if let Err(err) = res {
            error!(
                "Error setting synchronous mode back to NORMAL! Data loss possible! {}",
                err
            );
        }
    }
}

type UpdatePathResult = Result<UpdatePathOutcome>;

#[derive(Debug)]
pub enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
    Failed(PathBuf, Error),
}

fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
    db: D,
    job_handle: JobHandle,
    quick_check: bool,
    disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
    let start = Instant::now();
    info!("Vault rescan started.");

    let db = db.borrow();
    let connection = db.connection()?;

    // Initialize types, etc...
    debug!("Initializing DB types.");
    initialize_types(&connection)?;

    // Disable syncing in SQLite for the duration of the import
    let mut _guard: Option<PragmaSynchronousGuard> = None;
    if disable_synchronous {
        debug!("Disabling SQLite synchronous mode");
        connection.execute("PRAGMA synchronous = OFF;")?;
        _guard = Some(PragmaSynchronousGuard(&connection));
    }

    // Walk through the vault, find all paths
    debug!("Traversing vault directory");
    let absolute_dir_path = fs::canonicalize(&*db.vault_path)?;
    let path_entries: Vec<PathBuf> = WalkDir::new(&*db.vault_path)
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter_map(|e| fs::canonicalize(e.into_path()).ok())
        .filter(|e| e.is_file())
        .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
        .collect();

    // Prepare for processing
    let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?));

    // Actual processing
    let count = RwLock::new(0_usize);
    let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
    let total = path_entries.len() as f32;
    let shared_job_handle = Arc::new(Mutex::new(job_handle));
    let path_outcomes: Vec<UpdatePathOutcome> = path_entries
        .into_par_iter()
        .map(|path| {
            let result = process_directory_entry(
                db.connection().unwrap(),
                &resolve_cache,
                path.clone(),
                &existing_files,
                quick_check,
            );

            let mut cnt = count.write().unwrap();
            *cnt += 1;

            let _ = shared_job_handle
                .lock()
                .unwrap()
                .update_progress(*cnt as f32 / total * 100.0);

            match result {
                Ok(result) => result,
                Err(error) => {
                    error!("Failed to update {:?} ({})", path, error);
                    UpdatePathOutcome::Failed(path, error)
                }
            }
        })
        .collect();

    debug!("Processing done, cleaning up...");

    let existing_files = existing_files.read().unwrap();

    let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
        let trans_result = connection.transaction::<_, Error, _>(|| {
            connection.file_set_valid(file.id, false)?;
            // remove_object(&connection, )?
            Ok(())
        });

        match trans_result {
            Ok(_) => {
                info!("Removed: {:?}", file.path);
                UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
            }
            Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error),
        }
    });

    // Re-enable SQLite syncing
    drop(_guard);

    // Reporting
    let all_outcomes = path_outcomes
        .into_iter()
        .chain(cleanup_results)
        .collect::<Vec<UpdatePathOutcome>>();

    let mut failed: Vec<(&PathBuf, &Error)> = vec![];
    let mut created = 0;
    let mut unchanged = 0;
    let mut deleted = 0;

    for outcome in &all_outcomes {
        match outcome {
            UpdatePathOutcome::Added(_) => created += 1,
            UpdatePathOutcome::Unchanged(_) => unchanged += 1,
            UpdatePathOutcome::Removed(_) => deleted += 1,
            UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
        }
    }

    if !failed.is_empty() {
        warn!(
            "{} path updates failed! ({})",
            failed.len(),
            failed
                .iter()
                .map(|(path, error)| format!("{:?}: {}", path, error))
                .collect::<Vec<String>>()
                .join(", ")
        )
    }

    info!(
        "Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
        db.vault_path,
        created,
        deleted,
        unchanged,
        start.elapsed().as_secs()
    );

    Ok(all_outcomes)
}

fn process_directory_entry(
    connection: UpEndConnection,
    resolve_cache: &Arc<Mutex<ResolveCache>>,
    path: PathBuf,
    existing_files: &Arc<RwLock<Vec<models::File>>>,
    quick_check: bool,
) -> UpdatePathResult {
    debug!("Processing: {:?}", path);

    // Prepare the data
    let existing_files = Arc::clone(existing_files);

    let normalized_path = connection.normalize_path(&path)?;
    let normalized_path_str = normalized_path
        .to_str()
        .ok_or(anyhow!("Path not valid unicode!"))?;

    let mut file_hash: Option<Hash> = None;

    // Get size & mtime for quick comparison
    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    if size < 0 {
        panic!("File {} too large?!", path.display());
    }
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();

    // Check if the path entry for this file already exists in database
    let existing_files_read = existing_files.read().unwrap();
    let maybe_existing_file = existing_files_read
        .iter()
        .find(|file| file.path == normalized_path_str);

    if let Some(existing_file) = maybe_existing_file {
        let existing_file = existing_file.clone();
        drop(existing_files_read);

        if !quick_check || size == existing_file.size {
            let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
            let mut same_hash = false;

            if !quick_check || !same_mtime {
                file_hash = Some(path.hash()?);
                same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
            }

            if same_mtime || same_hash {
                if mtime != existing_file.mtime {
                    connection.file_update_mtime(existing_file.id, mtime)?;
                }

                if !existing_file.valid {
                    connection.file_set_valid(existing_file.id, true)?;
                }

                let mut existing_files_write = existing_files.write().unwrap();
                let maybe_existing_file = existing_files_write
                    .iter()
                    .enumerate()
                    .find(|(_, file)| file.path == normalized_path_str)
                    .map(|(idx, _)| idx);

                if let Some(idx) = maybe_existing_file {
                    existing_files_write.swap_remove(idx);
                    debug!("Unchanged: {:?}", path);
                    return Ok(UpdatePathOutcome::Unchanged(path));
                }
            }
        }
    } else {
        drop(existing_files_read);
    }

    // If not, add it!
    if file_hash.is_none() {
        file_hash = Some(path.hash()?);
    }
    let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());

    insert_file_with_metadata(
        &connection,
        &normalized_path,
        file_hash.unwrap(),
        size,
        mtime,
        mime_type,
        Some(resolve_cache),
    )
    .map(|_| {
        info!("Added: {:?}", path);
        UpdatePathOutcome::Added(path.clone())
    })
}

pub fn add_file(connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
    let normalized_path = connection.normalize_path(path)?;
    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();
    let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());

    insert_file_with_metadata(
        connection,
        &normalized_path,
        hash,
        size,
        mtime,
        mime_type,
        None,
    )
}

fn insert_file_with_metadata(
|
||||
connection: &UpEndConnection,
|
||||
normalized_path: &Path,
|
||||
hash: Hash,
|
||||
size: i64,
|
||||
mtime: Option<NaiveDateTime>,
|
||||
mime_type: Option<String>,
|
||||
resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
|
||||
) -> Result<Address> {
|
||||
let new_file = models::NewFile {
|
||||
path: normalized_path
|
||||
.to_str()
|
||||
.ok_or(anyhow!("Path not UTF-8?!"))?
|
||||
.to_string(),
|
||||
hash: (hash.clone()).0,
|
||||
added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
|
||||
size,
|
||||
mtime,
|
||||
};
|
||||
|
||||
let blob_address = Address::Hash(hash);
|
||||
|
||||
// Metadata
|
||||
let type_entry = Entry {
|
||||
entity: blob_address.clone(),
|
||||
attribute: String::from(IS_OF_TYPE_ATTR),
|
||||
value: BLOB_TYPE_ADDR.clone().into(),
|
||||
};
|
||||
|
||||
let size_entry = Entry {
|
||||
entity: blob_address.clone(),
|
||||
attribute: FILE_SIZE_KEY.to_string(),
|
||||
value: (size as f64).into(),
|
||||
};
|
||||
|
||||
let mime_entry = mime_type.map(|mime_type| Entry {
|
||||
entity: blob_address.clone(),
|
||||
attribute: FILE_MIME_KEY.to_string(),
|
||||
value: mime_type.into(),
|
||||
});
|
||||
|
||||
let added_entry = Entry {
|
||||
entity: blob_address.clone(),
|
||||
attribute: ADDED_ATTR.to_string(),
|
||||
value: (SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs() as f64)
|
||||
.into(),
|
||||
};
|
||||
|
||||
// Add the appropriate entries w/r/t virtual filesystem location
|
||||
let components = normalized_path.components().collect::<Vec<Component>>();
|
||||
let (filename, dir_path) = components.split_last().unwrap();
|
||||
|
||||
let upath = UHierPath(
|
||||
iter::once(UNode::new("NATIVE").unwrap())
|
||||
.chain(dir_path.iter().map(|component| {
|
||||
UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
|
||||
}))
|
||||
.collect(),
|
||||
);
|
||||
let resolved_path = match resolve_cache {
|
||||
Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
|
||||
None => resolve_path(connection, &upath, true)?,
|
||||
};
|
||||
let parent_dir = resolved_path.last().unwrap();
|
||||
|
||||
// Insert all
|
||||
connection.transaction::<_, Error, _>(|| {
|
||||
let file_count = connection.insert_file(new_file)?;
|
||||
|
||||
connection.insert_entry_immutable(type_entry)?;
|
||||
connection.insert_entry_immutable(size_entry)?;
|
||||
if file_count == 1 {
|
||||
connection.insert_entry_immutable(added_entry)?;
|
||||
}
|
||||
if let Some(mime_entry) = mime_entry {
|
||||
connection.insert_entry(mime_entry)?;
|
||||
}
|
||||
|
||||
let dir_has_entry = Entry {
|
||||
entity: parent_dir.clone(),
|
||||
attribute: HIER_HAS_ATTR.to_string(),
|
||||
value: blob_address.clone().into(),
|
||||
};
|
||||
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
|
||||
|
||||
let label_entry = Entry {
|
||||
entity: blob_address.clone(),
|
||||
attribute: LABEL_ATTR.to_string(),
|
||||
value: filename.as_os_str().to_string_lossy().to_string().into(),
|
||||
};
|
||||
let label_entry_addr = connection.insert_entry(label_entry)?;
|
||||
|
||||
let alias_entry = Entry {
|
||||
entity: dir_has_entry_addr,
|
||||
attribute: ALIAS_KEY.to_string(),
|
||||
value: label_entry_addr.into(),
|
||||
};
|
||||
connection.insert_entry(alias_entry)?;
|
||||
|
||||
Ok(blob_address)
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod test {
    use crate::database::UpEndDatabase;
    use crate::util::jobs::JobContainer;

    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::TempDir;

    use std::sync::Once;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter};

    static INIT: Once = Once::new();

    pub fn initialize() {
        INIT.call_once(|| {
            tracing_subscriber::fmt()
                .with_env_filter(
                    EnvFilter::builder()
                        .with_default_directive(LevelFilter::DEBUG.into())
                        .from_env_lossy(),
                )
                .init();
        })
    }

    #[test]
    fn test_rescan_quick() {
        initialize();
        _test_rescan(true)
    }

    #[test]
    fn test_rescan_full() {
        initialize();
        _test_rescan(false)
    }

    fn _test_rescan(quick: bool) {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new().unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        // Initialize database

        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let mut job_container = JobContainer::new();
        let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap();

        // Initial scan
        let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));

        // Modification-less rescan

        let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));

        // Remove a file

        std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();

        let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        assert_eq!(
            2,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
                .count()
        );
        assert_eq!(
            1,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
                .count()
        );
    }
}
47 src/main.rs
@ -11,14 +11,17 @@ use std::path::PathBuf;
use actix_web::{middleware, App, HttpServer};
use anyhow::Result;
use clap::{App as ClapApp, Arg};
use log::{debug, info, warn};
use log::{info, warn};
use rand::{thread_rng, Rng};
use std::sync::Arc;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};

use crate::{
    common::{get_static_dir, PKG_VERSION},
    database::UpEndDatabase,
    database::{
        stores::{fs::FsStore, UpStore},
        UpEndDatabase,
    },
    util::{exec::block_background, jobs::JobContainer},
};

@ -26,7 +29,6 @@ mod addressing;
mod common;
mod database;
mod extractors;
mod filesystem;
mod previews;
mod routes;
mod util;

@ -125,6 +127,9 @@ fn main() -> Result<()> {
        .expect("failed to open database!");

    let upend = Arc::new(open_result.db);
    let store =
        Arc::new(Box::new(FsStore::from_path(vault_path.clone()).unwrap())
            as Box<dyn UpStore + Send + Sync>);

    let ui_path = get_static_dir("webui");
    if ui_path.is_err() {

@ -137,22 +142,22 @@ fn main() -> Result<()> {
    let ui_enabled = ui_path.is_ok() && !matches.is_present("NO_UI");
    let browser_enabled = desktop_enabled && !matches.is_present("NO_BROWSER");

    let preview_path = upend.db_path.join("previews");
    let preview_dir = tempfile::tempdir().unwrap();
    #[cfg(feature = "previews")]
    let preview_store = Some(Arc::new(crate::previews::PreviewStore::new(
        preview_path.clone(),
        upend.clone(),
        preview_dir.path(),
        store.clone(),
    )));

    if matches.is_present("CLEAN") {
        info!("Cleaning temporary directories...");
        if preview_path.exists() {
            std::fs::remove_dir_all(&preview_path).unwrap();
            debug!("Removed {preview_path:?}");
        } else {
            debug!("No preview path exists, continuing...");
        }
    }
    // if matches.is_present("CLEAN") {
    //     info!("Cleaning temporary directories...");
    //     if preview_path.exists() {
    //         std::fs::remove_dir_all(&preview_path).unwrap();
    //         debug!("Removed {preview_path:?}");
    //     } else {
    //         debug!("No preview path exists, continuing...");
    //     }
    // }

    #[cfg(not(feature = "previews"))]
    let preview_store = None;

@ -193,6 +198,7 @@ fn main() -> Result<()> {
                .into_owned()
            }),
        ),
        store,
        job_container: job_container.clone(),
        preview_store,
        desktop_enabled,

@ -226,7 +232,6 @@ fn main() -> Result<()> {
            .service(routes::list_hier)
            .service(routes::list_hier_roots)
            .service(routes::store_info)
            .service(routes::get_file)
            .service(routes::get_jobs)
            .service(routes::get_info);

@ -257,14 +262,10 @@ fn main() -> Result<()> {

    if !matches.is_present("NO_INITIAL_UPDATE") {
        info!("Running initial update...");
        let new = open_result.new;
        // let new = open_result.new;
        block_background::<_, _, anyhow::Error>(move || {
            let _ = if new {
                filesystem::rescan_vault(upend.clone(), job_container.clone(), false, true)
            } else {
                filesystem::rescan_vault(upend.clone(), job_container.clone(), true, false)
            };
            let _ = extractors::extract_all(upend, job_container);
            let _ = state.store.update(&upend, job_container.clone());
            let _ = extractors::extract_all(upend, state.store, job_container);
            Ok(())
        })
    }

@ -1,6 +1,7 @@
use crate::database::stores::UpStore;
use crate::util::hash::b58_encode;
use crate::util::hash::Hash;
use crate::util::jobs::{JobContainer, JobState};
use crate::{database::UpEndDatabase, util::hash::b58_encode};
use anyhow::{anyhow, Result};
use log::{debug, trace};

@ -27,17 +28,17 @@ pub trait Previewable {
}
pub struct PreviewStore {
    path: PathBuf,
    db: Arc<UpEndDatabase>,
    store: Arc<Box<dyn UpStore + Send + Sync>>,

    locks: Mutex<HashMap<Hash, Arc<Mutex<PathBuf>>>>,
}

#[cfg(feature = "previews")]
impl PreviewStore {
    pub fn new<P: AsRef<Path>>(path: P, db: Arc<UpEndDatabase>) -> Self {
    pub fn new<P: AsRef<Path>>(path: P, store: Arc<Box<dyn UpStore + Send + Sync>>) -> Self {
        PreviewStore {
            path: PathBuf::from(path.as_ref()),
            db,
            store,
            locks: Mutex::new(HashMap::new()),
        }
    }

@ -71,12 +72,12 @@ impl PreviewStore {
            Ok(Some(thumbpath.clone()))
        } else {
            trace!("Calculating preview for {hash:?}...");
            let connection = self.db.connection()?;
            let files = connection.retrieve_file(&hash)?;
            let files = self.store.retrieve(&hash)?;
            if let Some(file) = files.get(0) {
                let file_path = file.get_file_path();
                let mut job_handle = job_container.add_job(
                    None,
                    &format!("Creating preview for {:?}", file.path.file_name().unwrap()),
                    &format!("Creating preview for {:?}", file_path.file_name().unwrap()),
                )?;

                let mime_type = mime_type.into();

@ -84,18 +85,18 @@ impl PreviewStore {
                let mime_type: Option<String> = if mime_type.is_some() {
                    mime_type
                } else {
                    tree_magic_mini::from_filepath(&file.path).map(|m| m.into())
                    tree_magic_mini::from_filepath(file_path).map(|m| m.into())
                };

                let preview = match mime_type {
                    Some(tm) if tm.starts_with("text") => TextPath(&file.path).get_thumbnail(),
                    Some(tm) if tm.starts_with("text") => TextPath(file_path).get_thumbnail(),
                    Some(tm) if tm.starts_with("video") || tm == "application/x-matroska" => {
                        VideoPath(&file.path).get_thumbnail()
                        VideoPath(file_path).get_thumbnail()
                    }
                    Some(tm) if tm.starts_with("audio") || tm == "application/x-riff" => {
                        AudioPath(&file.path).get_thumbnail()
                        AudioPath(file_path).get_thumbnail()
                    }
                    Some(tm) if tm.starts_with("image") => ImagePath(&file.path).get_thumbnail(),
                    Some(tm) if tm.starts_with("image") => ImagePath(file_path).get_thumbnail(),
                    Some(unknown) => Err(anyhow!("No capability for {:?} thumbnails.", unknown)),
                    _ => Err(anyhow!("Unknown file type, or file doesn't exist.")),
                };

188 src/routes.rs
@ -3,12 +3,12 @@ use crate::database::constants::{ADDED_ATTR, LABEL_ATTR};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{list_roots, resolve_path, UHierPath};
use crate::database::lang::Query;
use crate::database::stores::{Blob, UpStore};
use crate::database::UpEndDatabase;
use crate::extractors::{self};
use crate::filesystem::add_file;
use crate::previews::PreviewStore;
use crate::util::exec::block_background;
use crate::util::hash::{b58_decode, b58_encode, Hashable};
use crate::util::hash::{b58_decode, b58_encode};
use crate::util::jobs::JobContainer;
use actix_files::NamedFile;
use actix_multipart::Multipart;

@ -26,13 +26,11 @@ use futures_util::TryStreamExt;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, io};
use tempfile::NamedTempFile;
use uuid::Uuid;

@ -42,6 +40,7 @@ use is_executable::IsExecutable;
#[derive(Clone)]
pub struct State {
    pub upend: Arc<UpEndDatabase>,
    pub store: Arc<Box<dyn UpStore + Sync + Send>>,
    pub vault_name: Option<String>,
    pub job_container: JobContainer,
    pub preview_store: Option<Arc<PreviewStore>>,

@ -140,13 +139,13 @@ pub async fn get_raw(
    }

    // Then, check the files
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let _hash = hash.clone();
    let files = web::block(move || connection.retrieve_file(_hash.as_ref()))
    let _store = state.store.clone();
    let blobs = web::block(move || _store.retrieve(_hash.as_ref()))
        .await
        .map_err(ErrorInternalServerError)?;
    if let Some(file) = files.get(0) {
        let file_path = state.upend.vault_path.join(&file.path);
    if let Some(blob) = blobs.get(0) {
        let file_path = blob.get_file_path();

        if query.native.is_none() {
            Ok(Either::A(

@ -185,12 +184,9 @@ pub async fn get_raw(
                http::header::WARNING.to_string(),
            );

            file_path
                .parent()
                .ok_or_else(|| {
                    ErrorInternalServerError("No parent to open as fallback.")
                })?
                .to_path_buf()
            file_path.parent().ok_or_else(|| {
                ErrorInternalServerError("No parent to open as fallback.")
            })?
        };
        opener::open(path).map_err(error::ErrorServiceUnavailable)?;
        Ok(Either::B(response.finish()))

@ -441,9 +437,10 @@ pub async fn put_object(

            let _address = address.clone();
            let _job_container = state.job_container.clone();
            let _store = state.store.clone();
            block_background::<_, _, anyhow::Error>(move || {
                let extract_result =
                    extractors::extract(&_address, &connection, _job_container);
                    extractors::extract(&_address, &connection, _store, _job_container);
                if let Ok(entry_count) = extract_result {
                    debug!("Added {entry_count} extracted entries for {_address:?}");
                } else {
@ -493,40 +490,17 @@ pub async fn put_object(
            while let Some(chunk) = field.try_next().await? {
                file = web::block(move || file.write_all(&chunk).map(|_| file)).await?;
            }
            let path = PathBuf::from(file.path());
            let hash = web::block(move || path.hash()).await?;
            let address = Address::Hash(hash.clone());

            let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
            let _store = state.store.clone();
            let _filename = filename.clone();
            let hash = web::block(move || {
                _store.store(connection, Blob::from_filepath(file.path()), _filename)
            })
            .await
            .map_err(ErrorInternalServerError)?;

            let _hash = hash.clone();
            let existing_files = web::block(move || connection.retrieve_file(&_hash))
                .await
                .map_err(ErrorInternalServerError)?;

            if existing_files.is_empty() {
                let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?);
                let final_name = if let Some(ref filename) = filename {
                    format!("{addr_str}_{filename}")
                } else {
                    addr_str
                };

                let final_path = state.upend.vault_path.join(&final_name);

                let (_, tmp_path) = file.keep().map_err(ErrorInternalServerError)?;
                let final_path = web::block::<_, _, io::Error>(move || {
                    fs::copy(&tmp_path, &final_path)?;
                    fs::remove_file(tmp_path)?;
                    Ok(final_path)
                })
                .await?;

                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
                web::block(move || add_file(&connection, &final_path, hash))
                    .await
                    .map_err(ErrorInternalServerError)?;
            }
            let address = Address::Hash(hash);

            if let Some(ref filename) = filename {
                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;

@ -540,10 +514,11 @@ pub async fn put_object(

                let _address = address.clone();
                let _job_container = state.job_container.clone();
                let _store = state.store.clone();
                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
                block_background::<_, _, anyhow::Error>(move || {
                    let extract_result =
                        extractors::extract(&_address, &connection, _job_container);
                        extractors::extract(&_address, &connection, _store, _job_container);
                    if let Ok(entry_count) = extract_result {
                        debug!("Added {entry_count} extracted entries for {_address:?}");
                    } else {
@ -718,27 +693,28 @@ pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Er
    Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
}

#[derive(Deserialize)]
pub struct RescanRequest {
    full: Option<String>,
}
// #[derive(Deserialize)]
// pub struct RescanRequest {
//     full: Option<String>,
// }

#[post("/api/refresh")]
pub async fn api_refresh(
    req: HttpRequest,
    state: web::Data<State>,
    web::Query(query): web::Query<RescanRequest>,
    // web::Query(query): web::Query<RescanRequest>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    block_background::<_, _, anyhow::Error>(move || {
        let _ = crate::filesystem::rescan_vault(
        let _ = state
            .store
            .update(&state.upend, state.job_container.clone());
        let _ = crate::extractors::extract_all(
            state.upend.clone(),
            state.store.clone(),
            state.job_container.clone(),
            query.full.is_none(),
            false,
        );
        let _ = crate::extractors::extract_all(state.upend.clone(), state.job_container.clone());
        Ok(())
    });
    Ok(HttpResponse::Ok().finish())
@ -746,68 +722,48 @@ pub async fn api_refresh(

#[get("/api/store")]
pub async fn store_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let files = web::block(move || connection.retrieve_all_files())
        .await
        .map_err(ErrorInternalServerError)?;
    let mut files_by_hash = HashMap::new();
    for file in &files {
        if !files_by_hash.contains_key(&file.hash) {
            files_by_hash.insert(&file.hash, vec![]);
        }
        files_by_hash.get_mut(&file.hash).unwrap().push(file);
    }
    // let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    // let files = web::block(move || connection.retrieve_all_files())
    //     .await
    //     .map_err(ErrorInternalServerError)?;
    // let mut files_by_hash = HashMap::new();
    // for file in &files {
    //     if !files_by_hash.contains_key(&file.hash) {
    //         files_by_hash.insert(&file.hash, vec![]);
    //     }
    //     files_by_hash.get_mut(&file.hash).unwrap().push(file);
    // }

    for paths in files_by_hash.values_mut() {
        paths.sort_unstable_by_key(|f| !f.valid);
    }
    // for paths in files_by_hash.values_mut() {
    //     paths.sort_unstable_by_key(|f| !f.valid);
    // }

    let mut blobs = files_by_hash
        .iter()
        .map(|(hash, files)| {
            json!({
                "hash": hash,
                "size": files[0].size,
                "paths": files.iter().map(|f| json!({
                    "added": f.added,
                    "valid": f.valid,
                    "path": f.path
                })).collect::<serde_json::Value>()
            })
        })
        .collect::<Vec<serde_json::Value>>();
    // let mut blobs = files_by_hash
    //     .iter()
    //     .map(|(hash, files)| {
    //         json!({
    //             "hash": hash,
    //             "size": files[0].size,
    //             "paths": files.iter().map(|f| json!({
    //                 "added": f.added,
    //                 "valid": f.valid,
    //                 "path": f.path
    //             })).collect::<serde_json::Value>()
    //         })
    //     })
    //     .collect::<Vec<serde_json::Value>>();

    blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
    blobs.reverse();
    // blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
    // blobs.reverse();

    Ok(HttpResponse::Ok().json(json!({
        "totals": {
            "count": files_by_hash.len(),
            "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
        },
        "blobs": blobs
    })))
}

#[get("/api/store/{hash}")]
pub async fn get_file(
    state: web::Data<State>,
    hash: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let address =
        Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
            .map_err(ErrorInternalServerError)?;

    if let Address::Hash(hash) = address {
        let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
        let response = web::block(move || connection.retrieve_file(&hash))
            .await
            .map_err(ErrorInternalServerError)?;

        Ok(HttpResponse::Ok().json(response))
    } else {
        Err(ErrorBadRequest("Address does not refer to a file."))
    }
    // Ok(HttpResponse::Ok().json(json!({
    //     "totals": {
    //         "count": files_by_hash.len(),
    //         "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
    //     },
    //     "blobs": blobs
    // })))
    todo!();
}

#[derive(Deserialize)]

@ -838,7 +794,7 @@ pub async fn get_jobs(
pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
    Ok(HttpResponse::Ok().json(json!({
        "name": state.vault_name,
        "location": &*state.upend.vault_path,
        // "location": &*state.store.path,
        "version": crate::common::PKG_VERSION,
        "desktop": state.desktop_enabled
    })))