2022-08-19 14:04:18 +02:00
|
|
|
use self::db::files;
|
|
|
|
|
|
|
|
use super::{Blob, StoreError, UpStore, UpdatePathOutcome};
|
|
|
|
use crate::addressing::Address;
|
|
|
|
use crate::database::constants::{
|
|
|
|
ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
|
|
|
|
TYPE_HAS_ATTR,
|
|
|
|
};
|
|
|
|
use crate::database::entry::{Entry, InvariantEntry};
|
|
|
|
use crate::database::hierarchies::{
|
|
|
|
resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
|
|
|
|
};
|
|
|
|
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
|
|
|
|
use crate::util::hash::{b58_encode, Hash, Hashable};
|
|
|
|
use crate::util::jobs::{JobContainer, JobHandle};
|
|
|
|
use anyhow::{anyhow, Error, Result};
|
|
|
|
use chrono::prelude::*;
|
|
|
|
use diesel::r2d2::{ConnectionManager, ManageConnection};
|
|
|
|
use diesel::ExpressionMethods;
|
|
|
|
use diesel::{Connection, QueryDsl, RunQueryDsl, SqliteConnection};
|
|
|
|
use log::{debug, error, info, warn};
|
|
|
|
use lru::LruCache;
|
|
|
|
use rayon::prelude::*;
|
|
|
|
use std::borrow::Borrow;
|
|
|
|
use std::convert::{TryFrom, TryInto};
|
|
|
|
use std::path::PathBuf;
|
|
|
|
use std::path::{Component, Path};
|
|
|
|
use std::sync::{Arc, Mutex, RwLock};
|
|
|
|
use std::time::{Instant, SystemTime, UNIX_EPOCH};
|
|
|
|
use std::{fs, iter};
|
|
|
|
use walkdir::WalkDir;
|
|
|
|
|
|
|
|
// Schema and models for the store-local file-tracking SQLite database.
mod db;
|
|
|
|
|
|
|
|
/// Type name under which blob entities are registered in the database.
const BLOB_TYPE: &str = "BLOB";
/// Attribute linking a directory-membership entry to its label entry.
const ALIAS_KEY: &str = "ALIAS";
/// Attribute storing a blob's detected MIME type.
pub const FILE_MIME_KEY: &str = "FILE_MIME";
/// Attribute storing a file's last-modified timestamp.
const FILE_MTIME_KEY: &str = "FILE_MTIME";
/// Attribute storing a file's size in bytes.
const FILE_SIZE_KEY: &str = "FILE_SIZE";
|
|
|
|
|
|
|
|
lazy_static! {
    /// The invariant entry that declares the "BLOB" type itself.
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: BLOB_TYPE.into(),
    };
    /// Entity address of the "BLOB" type, derived from the invariant entry.
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}
|
|
|
|
|
|
|
|
/// RAII guard that restores `PRAGMA synchronous = NORMAL` on the wrapped
/// connection when dropped; used after synchronous mode is turned OFF for
/// the duration of a bulk import.
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
|
|
|
|
|
|
|
|
impl Drop for PragmaSynchronousGuard<'_> {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
debug!("Re-enabling synchronous mode.");
|
|
|
|
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
|
|
|
|
if let Err(err) = res {
|
|
|
|
error!(
|
|
|
|
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
|
|
|
|
err
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// A blob store backed by a directory on the local filesystem, with file
/// metadata tracked in a store-local SQLite database.
pub struct FsStore {
    // Canonicalized root directory of the vault on disk.
    path: PathBuf,
    // Connection manager for the store-local SQLite tracking database.
    manager: ConnectionManager<SqliteConnection>,
    // Store-wide lock: write-held for mutations of the tracking DB,
    // read-held for queries.
    lock: Arc<RwLock<()>>,
}
|
|
|
|
|
|
|
|
impl FsStore {
|
|
|
|
pub fn from_path<P: AsRef<Path>>(path: P) -> Result<FsStore> {
|
|
|
|
let path = path.as_ref().to_path_buf().canonicalize()?;
|
|
|
|
let manager = ConnectionManager::<SqliteConnection>::new(
|
|
|
|
path.join(UPEND_SUBDIR)
|
|
|
|
.join("upend_vault.sqlite3")
|
|
|
|
.to_str()
|
|
|
|
.unwrap(),
|
|
|
|
);
|
|
|
|
let connection = manager.connect()?;
|
|
|
|
|
|
|
|
// while diesel doesn't support multiple embedded migrations...
|
|
|
|
connection.execute(
|
|
|
|
r#"
|
|
|
|
CREATE TABLE IF NOT EXISTS files
|
|
|
|
(
|
|
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
|
|
|
hash BLOB NOT NULL,
|
|
|
|
path VARCHAR NOT NULL,
|
|
|
|
valid BOOLEAN NOT NULL DEFAULT TRUE,
|
|
|
|
added DATETIME NOT NULL,
|
|
|
|
size BIGINT NOT NULL,
|
|
|
|
mtime DATETIME NULL
|
|
|
|
);
|
|
|
|
|
|
|
|
CREATE INDEX IF NOT EXISTS files_hash ON files (hash);
|
|
|
|
CREATE INDEX IF NOT EXISTS files_valid ON files (valid);
|
|
|
|
|
|
|
|
"#,
|
|
|
|
)?;
|
|
|
|
|
2022-09-13 19:43:30 +02:00
|
|
|
let lock = Arc::new(RwLock::new(()));
|
|
|
|
|
|
|
|
Ok(FsStore {
|
|
|
|
path,
|
|
|
|
manager,
|
|
|
|
lock,
|
|
|
|
})
|
2022-08-19 14:04:18 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
    /// Walks the entire vault directory, reconciling the on-disk files with
    /// the database: new/changed files are (re-)inserted, unchanged files are
    /// left alone, and tracked files that no longer exist on disk are marked
    /// invalid ("removed").
    ///
    /// * `quick_check` — trust size + mtime and skip content hashing when they
    ///   match the stored record.
    /// * `disable_synchronous` — turn off SQLite's synchronous mode for the
    ///   duration of the scan (restored via `PragmaSynchronousGuard`).
    ///
    /// Returns one `UpdatePathOutcome` per path processed or cleaned up.
    fn rescan_vault<D: Borrow<UpEndDatabase>>(
        &self,
        db: D,
        job_handle: JobHandle,
        quick_check: bool,
        disable_synchronous: bool,
    ) -> Result<Vec<UpdatePathOutcome>> {
        let start = Instant::now();
        info!("Vault rescan started.");

        let db = db.borrow();
        let connection = db.connection()?;

        // Initialize types, etc...
        debug!("Initializing DB types.");
        connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
        upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
        upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;

        // Disable syncing in SQLite for the duration of the import.
        // The guard re-enables it on drop (see `drop(_guard)` below).
        let mut _guard: Option<PragmaSynchronousGuard> = None;
        if disable_synchronous {
            debug!("Disabling SQLite synchronous mode");
            connection.execute("PRAGMA synchronous = OFF;")?;
            _guard = Some(PragmaSynchronousGuard(&connection));
        }

        // Walk through the vault, find all paths; the UpEnd metadata
        // subdirectory itself is excluded from the scan.
        debug!("Traversing vault directory");
        let absolute_dir_path = fs::canonicalize(&*db.vault_path)?;
        let path_entries: Vec<PathBuf> = WalkDir::new(&*db.vault_path)
            .follow_links(true)
            .into_iter()
            .filter_map(|e| e.ok())
            .filter_map(|e| fs::canonicalize(e.into_path()).ok())
            .filter(|e| e.is_file())
            .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
            .collect();

        // Prepare for processing: known file records, shared with the workers.
        // Entries matched on disk get removed; leftovers are treated as deleted.
        let existing_files = Arc::new(RwLock::new(self.retrieve_all_files()?));

        // Actual processing (parallel over all discovered paths via rayon)
        let count = RwLock::new(0_usize);
        let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
        let total = path_entries.len() as f32;
        let shared_job_handle = Arc::new(Mutex::new(job_handle));
        let path_outcomes: Vec<UpdatePathOutcome> = path_entries
            .into_par_iter()
            .map(|path| {
                let result = self.process_directory_entry(
                    db.connection().unwrap(),
                    &resolve_cache,
                    path.clone(),
                    &existing_files,
                    quick_check,
                );

                let mut cnt = count.write().unwrap();
                *cnt += 1;

                // Progress reporting is best-effort; failures are ignored.
                let _ = shared_job_handle
                    .lock()
                    .unwrap()
                    .update_progress(*cnt as f32 / total * 100.0);

                match result {
                    Ok(result) => result,
                    Err(error) => {
                        error!("Failed to update {:?} ({})", path, error);
                        UpdatePathOutcome::Failed(path, StoreError::Unknown(error.to_string()))
                    }
                }
            })
            .collect();

        debug!("Processing done, cleaning up...");

        // Records still listed here were not found on disk; mark them invalid.
        let existing_files = existing_files.read().unwrap();

        let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
            let trans_result = connection.transaction::<_, Error, _>(|| {
                self.file_set_valid(file.id, false)?;
                // remove_object(&connection, )?
                Ok(())
            });

            match trans_result {
                Ok(_) => {
                    info!("Removed: {:?}", file.path);
                    UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
                }
                Err(error) => UpdatePathOutcome::Failed(
                    PathBuf::from(file.path.clone()),
                    StoreError::Unknown(error.to_string()),
                ),
            }
        });

        // Re-enable SQLite syncing (guard drop runs the PRAGMA reset).
        drop(_guard);

        // Reporting
        let all_outcomes = path_outcomes
            .into_iter()
            .chain(cleanup_results)
            .collect::<Vec<UpdatePathOutcome>>();

        let mut failed: Vec<(&PathBuf, &StoreError)> = vec![];
        let mut created = 0;
        let mut unchanged = 0;
        let mut deleted = 0;

        for outcome in &all_outcomes {
            match outcome {
                UpdatePathOutcome::Added(_) => created += 1,
                UpdatePathOutcome::Unchanged(_) => unchanged += 1,
                UpdatePathOutcome::Removed(_) => deleted += 1,
                UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
            }
        }

        if !failed.is_empty() {
            warn!(
                "{} path updates failed! ({})",
                failed.len(),
                failed
                    .iter()
                    .map(|(path, error)| format!("{:?}: {}", path, error))
                    .collect::<Vec<String>>()
                    .join(", ")
            )
        }

        info!(
            "Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
            db.vault_path,
            created,
            deleted,
            unchanged,
            start.elapsed().as_secs()
        );

        Ok(all_outcomes)
    }
|
|
|
|
|
|
|
|
    /// Examines one file discovered during a vault scan and decides whether it
    /// is unchanged or must be (re-)inserted into the database.
    ///
    /// Matching against `existing_files` is by vault-relative path. Records
    /// recognized as unchanged are removed from `existing_files` so that the
    /// caller's cleanup pass can treat the leftovers as deleted.
    ///
    /// With `quick_check`, matching size + mtime is accepted without hashing
    /// the file contents.
    fn process_directory_entry(
        &self,
        connection: UpEndConnection,
        resolve_cache: &Arc<Mutex<ResolveCache>>,
        path: PathBuf,
        existing_files: &Arc<RwLock<Vec<db::File>>>,
        quick_check: bool,
    ) -> Result<UpdatePathOutcome> {
        debug!("Processing: {:?}", path);

        // Prepare the data
        let existing_files = Arc::clone(existing_files);

        let normalized_path = self.normalize_path(&path)?;
        let normalized_path_str = normalized_path
            .to_str()
            .ok_or(anyhow!("Path not valid unicode!"))?;

        // Lazily computed: hashing is expensive and skipped when quick
        // checks already prove the file unchanged.
        let mut file_hash: Option<Hash> = None;

        // Get size & mtime for quick comparison
        let metadata = fs::metadata(&path)?;
        let size = metadata.len() as i64;
        if size < 0 {
            panic!("File {} too large?!", path.display());
        }
        // Seconds-resolution mtime; `modified()` may be unsupported on some
        // platforms, hence the Option.
        let mtime = metadata
            .modified()
            .map(|t| {
                NaiveDateTime::from_timestamp(
                    t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
                    0,
                )
            })
            .ok();

        // Check if the path entry for this file already exists in database
        let existing_files_read = existing_files.read().unwrap();
        let maybe_existing_file = existing_files_read
            .iter()
            .find(|file| file.path == normalized_path_str);

        if let Some(existing_file) = maybe_existing_file {
            // Clone the record and release the read lock before any DB
            // writes / write-lock acquisition below.
            let existing_file = existing_file.clone();
            drop(existing_files_read);

            if !quick_check || size == existing_file.size {
                let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
                let mut same_hash = false;

                // Only hash when quick mode can't already rule the file
                // unchanged via mtime.
                if !quick_check || !same_mtime {
                    file_hash = Some(path.hash()?);
                    same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
                }

                if same_mtime || same_hash {
                    // Content unchanged — refresh bookkeeping if needed.
                    if mtime != existing_file.mtime {
                        self.file_update_mtime(existing_file.id, mtime)?;
                    }

                    if !existing_file.valid {
                        self.file_set_valid(existing_file.id, true)?;
                    }

                    // Remove the matched record so the cleanup pass does not
                    // mark it deleted; the index must be re-found because the
                    // vector may have changed since the read lock was dropped.
                    // swap_remove is O(1); remaining order doesn't matter.
                    let mut existing_files_write = existing_files.write().unwrap();
                    let maybe_existing_file = existing_files_write
                        .iter()
                        .enumerate()
                        .find(|(_, file)| file.path == normalized_path_str)
                        .map(|(idx, _)| idx);

                    if let Some(idx) = maybe_existing_file {
                        existing_files_write.swap_remove(idx);
                        debug!("Unchanged: {:?}", path);
                        return Ok(UpdatePathOutcome::Unchanged(path));
                    }
                }
            }
        } else {
            drop(existing_files_read);
        }

        // If not, add it!
        if file_hash.is_none() {
            file_hash = Some(path.hash()?);
        }
        let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());

        self.insert_file_with_metadata(
            &connection,
            &normalized_path,
            file_hash.unwrap(),
            size,
            mtime,
            mime_type,
            Some(resolve_cache),
        )
        .map(|_| {
            info!("Added: {:?}", path);
            UpdatePathOutcome::Added(path.clone())
        })
    }
|
|
|
|
|
|
|
|
fn add_file(&self, connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
|
|
|
|
let normalized_path = self.normalize_path(path)?;
|
|
|
|
let metadata = fs::metadata(&path)?;
|
|
|
|
let size = metadata.len() as i64;
|
|
|
|
let mtime = metadata
|
|
|
|
.modified()
|
|
|
|
.map(|t| {
|
|
|
|
NaiveDateTime::from_timestamp(
|
|
|
|
t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64,
|
|
|
|
0,
|
|
|
|
)
|
|
|
|
})
|
|
|
|
.ok();
|
|
|
|
let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
|
|
|
|
|
|
|
|
self.insert_file_with_metadata(
|
|
|
|
connection,
|
|
|
|
&normalized_path,
|
|
|
|
hash,
|
|
|
|
size,
|
|
|
|
mtime,
|
|
|
|
mime_type,
|
|
|
|
None,
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
    /// Inserts a file record and all of its derived graph entries: blob type,
    /// size, MIME type, "added" timestamp, plus the virtual-hierarchy entries
    /// (directory membership, label, alias) mirroring its on-disk location.
    ///
    /// Returns the address of the blob entity.
    fn insert_file_with_metadata(
        &self,
        connection: &UpEndConnection,
        normalized_path: &Path,
        hash: Hash,
        size: i64,
        mtime: Option<NaiveDateTime>,
        mime_type: Option<String>,
        resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
    ) -> Result<Address> {
        let new_file = db::NewFile {
            path: normalized_path
                .to_str()
                .ok_or(anyhow!("Path not UTF-8?!"))?
                .to_string(),
            hash: (hash.clone()).0,
            added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
            size,
            mtime,
        };

        let blob_address = Address::Hash(hash);

        // Metadata
        let type_entry = Entry {
            entity: blob_address.clone(),
            attribute: String::from(IS_OF_TYPE_ATTR),
            value: BLOB_TYPE_ADDR.clone().into(),
        };

        let size_entry = Entry {
            entity: blob_address.clone(),
            attribute: FILE_SIZE_KEY.to_string(),
            // NOTE(review): size is stored as f64 — values above 2^53 would
            // lose precision; confirm this is acceptable for the value type.
            value: (size as f64).into(),
        };

        let mime_entry = mime_type.map(|mime_type| Entry {
            entity: blob_address.clone(),
            attribute: FILE_MIME_KEY.to_string(),
            value: mime_type.into(),
        });

        let added_entry = Entry {
            entity: blob_address.clone(),
            attribute: ADDED_ATTR.to_string(),
            value: (SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs() as f64)
                .into(),
        };

        // Add the appropriate entries w/r/t virtual filesystem location
        let components = normalized_path.components().collect::<Vec<Component>>();
        // Panics on an empty path; normalized file paths always have at
        // least a file-name component here.
        let (filename, dir_path) = components.split_last().unwrap();

        // Vault-relative directories are rooted under the "NATIVE" hierarchy.
        let upath = UHierPath(
            iter::once(UNode::new("NATIVE").unwrap())
                .chain(dir_path.iter().map(|component| {
                    UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
                }))
                .collect(),
        );
        let resolved_path = match resolve_cache {
            Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
            None => resolve_path(connection, &upath, true)?,
        };
        let parent_dir = resolved_path.last().unwrap();

        // Insert all
        let file_count = self.insert_file(new_file)?;

        connection.insert_entry_immutable(type_entry)?;
        connection.insert_entry_immutable(size_entry)?;
        // Only the first copy of a given content records the "added" time.
        if file_count == 1 {
            connection.insert_entry_immutable(added_entry)?;
        }
        if let Some(mime_entry) = mime_entry {
            connection.insert_entry(mime_entry)?;
        }

        // Directory-membership entry: parent dir HAS this blob.
        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: blob_address.clone().into(),
        };
        let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;

        let label_entry = Entry {
            entity: blob_address.clone(),
            attribute: LABEL_ATTR.to_string(),
            value: filename.as_os_str().to_string_lossy().to_string().into(),
        };
        let label_entry_addr = connection.insert_entry(label_entry)?;

        // The alias ties the directory-membership entry to its display label.
        let alias_entry = Entry {
            entity: dir_has_entry_addr,
            attribute: ALIAS_KEY.to_string(),
            value: label_entry_addr.into(),
        };
        connection.insert_entry(alias_entry)?;

        Ok(blob_address)
    }
|
|
|
|
|
|
|
|
pub fn insert_file(&self, file: db::NewFile) -> Result<u32> {
|
|
|
|
debug!(
|
|
|
|
"Inserting {} ({})...",
|
|
|
|
&file.path,
|
|
|
|
Address::Hash(Hash((&file.hash).clone()))
|
|
|
|
);
|
|
|
|
|
2022-09-13 19:43:30 +02:00
|
|
|
let _lock = self.lock.write().unwrap();
|
2022-08-19 14:04:18 +02:00
|
|
|
let conn = self.manager.connect()?;
|
|
|
|
|
|
|
|
diesel::insert_into(files::table)
|
|
|
|
.values(&file)
|
|
|
|
.execute(&conn)?;
|
|
|
|
|
|
|
|
Ok(files::dsl::files
|
|
|
|
.filter(files::dsl::valid.eq(true))
|
|
|
|
.filter(files::dsl::hash.eq(file.hash))
|
|
|
|
.count()
|
|
|
|
.first::<i64>(&conn)?
|
|
|
|
.try_into()
|
|
|
|
.unwrap())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<db::OutFile>> {
|
|
|
|
use self::db::files::dsl::*;
|
|
|
|
|
2022-09-13 19:43:30 +02:00
|
|
|
let _lock = self.lock.read().unwrap();
|
2022-08-19 14:04:18 +02:00
|
|
|
let conn = self.manager.connect()?;
|
|
|
|
|
|
|
|
let matches = files
|
|
|
|
.filter(valid.eq(true))
|
|
|
|
.filter(hash.eq(&obj_hash.0))
|
|
|
|
.load::<db::File>(&conn)?;
|
|
|
|
|
|
|
|
let matches = matches
|
|
|
|
.into_iter()
|
|
|
|
.map(|f| db::OutFile {
|
|
|
|
id: f.id,
|
|
|
|
hash: f.hash,
|
|
|
|
path: self.path.join(PathBuf::from(f.path)),
|
|
|
|
valid: f.valid,
|
|
|
|
added: f.added,
|
|
|
|
size: f.size,
|
|
|
|
mtime: f.mtime,
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
Ok(matches)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn retrieve_all_files(&self) -> Result<Vec<db::File>> {
|
|
|
|
use self::db::files::dsl::*;
|
|
|
|
|
2022-09-13 19:43:30 +02:00
|
|
|
let _lock = self.lock.read().unwrap();
|
2022-08-19 14:04:18 +02:00
|
|
|
let conn = self.manager.connect()?;
|
|
|
|
|
|
|
|
let matches = files.load::<db::File>(&conn)?;
|
|
|
|
Ok(matches)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
|
|
|
|
use self::db::files::dsl::*;
|
|
|
|
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
|
|
|
|
|
2022-09-13 19:43:30 +02:00
|
|
|
let _lock = self.lock.write().unwrap();
|
2022-08-19 14:04:18 +02:00
|
|
|
let conn = self.manager.connect()?;
|
|
|
|
|
|
|
|
Ok(diesel::update(files.filter(id.eq(file_id)))
|
|
|
|
.set(mtime.eq(m_time))
|
|
|
|
.execute(&conn)?)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
|
|
|
|
use self::db::files::dsl::*;
|
|
|
|
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
|
|
|
|
2022-09-13 19:43:30 +02:00
|
|
|
let _lock = self.lock.write().unwrap();
|
2022-08-19 14:04:18 +02:00
|
|
|
let conn = self.manager.connect()?;
|
|
|
|
|
|
|
|
Ok(diesel::update(files.filter(id.eq(file_id)))
|
|
|
|
.set(valid.eq(is_valid))
|
|
|
|
.execute(&conn)?)
|
|
|
|
}
|
|
|
|
|
|
|
|
fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
|
|
|
|
Ok(path
|
|
|
|
.canonicalize()?
|
|
|
|
.strip_prefix(self.path.as_path())?
|
|
|
|
.to_path_buf())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<db::OutFile> for Blob {
|
|
|
|
fn from(of: db::OutFile) -> Self {
|
|
|
|
Blob { file_path: of.path }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<db::File> for Blob {
|
|
|
|
fn from(f: db::File) -> Self {
|
|
|
|
Blob {
|
|
|
|
file_path: PathBuf::from(f.path),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl UpStore for FsStore {
|
|
|
|
fn retrieve(&self, hash: &crate::util::hash::Hash) -> Result<Vec<Blob>, super::StoreError> {
|
|
|
|
Ok(self
|
|
|
|
.retrieve_file(hash)
|
|
|
|
.map_err(|e| StoreError::Unknown(e.to_string()))?
|
|
|
|
.into_iter()
|
|
|
|
.map(Blob::from)
|
|
|
|
.collect())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn retrieve_all(&self) -> Result<Vec<Blob>, super::StoreError> {
|
|
|
|
Ok(self
|
|
|
|
.retrieve_all_files()
|
|
|
|
.map_err(|e| StoreError::Unknown(e.to_string()))?
|
|
|
|
.into_iter()
|
|
|
|
.map(Blob::from)
|
|
|
|
.collect())
|
|
|
|
}
|
|
|
|
|
2022-09-13 19:16:22 +02:00
|
|
|
fn store(
|
2022-08-19 14:04:18 +02:00
|
|
|
&self,
|
|
|
|
connection: UpEndConnection,
|
|
|
|
blob: Blob,
|
2022-09-13 19:16:22 +02:00
|
|
|
name_hint: Option<String>,
|
2022-08-19 14:04:18 +02:00
|
|
|
) -> Result<Hash, super::StoreError> {
|
|
|
|
let file_path = blob.get_file_path();
|
|
|
|
let hash = file_path
|
|
|
|
.hash()
|
|
|
|
.map_err(|e| StoreError::Unknown(e.to_string()))?;
|
|
|
|
|
|
|
|
let existing_files = self.retrieve(&hash)?;
|
|
|
|
|
|
|
|
if existing_files.is_empty() {
|
|
|
|
let address = Address::Hash(hash.clone());
|
|
|
|
let addr_str = b58_encode(
|
|
|
|
address
|
|
|
|
.encode()
|
|
|
|
.map_err(|e| StoreError::Unknown(e.to_string()))?,
|
|
|
|
);
|
|
|
|
|
2022-09-13 19:16:22 +02:00
|
|
|
let final_name = if let Some(name_hint) = name_hint {
|
2022-08-19 14:04:18 +02:00
|
|
|
format!("{addr_str}_{name_hint}")
|
|
|
|
} else {
|
|
|
|
addr_str
|
|
|
|
};
|
|
|
|
|
|
|
|
let final_path = self.path.join(&final_name);
|
|
|
|
|
|
|
|
fs::copy(file_path, &final_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
|
|
|
|
|
|
|
|
self.add_file(&connection, &final_path, hash.clone())
|
|
|
|
.map_err(|e| StoreError::Unknown(e.to_string()))?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(hash)
|
|
|
|
}
|
|
|
|
|
2022-09-13 19:16:22 +02:00
|
|
|
fn update(
|
2022-08-19 14:04:18 +02:00
|
|
|
&self,
|
2022-09-13 19:16:22 +02:00
|
|
|
db: &UpEndDatabase,
|
2022-08-19 14:04:18 +02:00
|
|
|
mut job_container: JobContainer,
|
2022-09-15 20:20:23 +02:00
|
|
|
initial: bool,
|
2022-08-19 14:04:18 +02:00
|
|
|
) -> Result<Vec<UpdatePathOutcome>, StoreError> {
|
|
|
|
let job_result = job_container.add_job("REIMPORT", "Scaning vault directory...");
|
|
|
|
|
|
|
|
match job_result {
|
|
|
|
Ok(job_handle) => {
|
2022-09-15 20:20:23 +02:00
|
|
|
let result = self.rescan_vault(db, job_handle, !initial, initial);
|
2022-08-19 14:04:18 +02:00
|
|
|
|
|
|
|
if let Err(err) = &result {
|
|
|
|
error!("Update did not succeed! {:?}", err);
|
|
|
|
}
|
|
|
|
|
|
|
|
result.map_err(|err| StoreError::Unknown(err.to_string()))
|
|
|
|
}
|
|
|
|
Err(err) => Err(StoreError::Unknown(err.to_string())),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
mod test {
    use crate::database::UpEndDatabase;
    use crate::util::jobs::JobContainer;

    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempfile::TempDir;

    use std::sync::Once;
    use tracing_subscriber::filter::EnvFilter;

    static INIT: Once = Once::new();

    /// Initializes tracing output exactly once for the whole test binary.
    pub fn initialize() {
        INIT.call_once(|| {
            tracing_subscriber::fmt()
                .with_env_filter(EnvFilter::builder().from_env_lossy())
                .init();
        })
    }

    /// Creates a temporary vault directory with three known files: two small
    /// text files and one empty file. (Previously duplicated verbatim in
    /// `test_update` and `_test_rescan`.)
    fn create_test_dir() -> TempDir {
        let temp_dir = TempDir::new().unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        temp_dir
    }

    #[test]
    fn test_update() {
        // Prepare temporary filesystem structure
        let temp_dir = create_test_dir();

        // Initialize database
        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let store = FsStore::from_path(&temp_dir).unwrap();
        let job_container = JobContainer::new();

        // Store scan
        let rescan_result = store.update(&open_result.db, job_container.clone(), false);
        assert!(rescan_result.is_ok());
    }

    #[test]
    fn test_rescan_quick() {
        initialize();
        _test_rescan(true)
    }

    #[test]
    fn test_rescan_full() {
        initialize();
        _test_rescan(false)
    }

    fn _test_rescan(quick: bool) {
        // Prepare temporary filesystem structure
        let temp_dir = create_test_dir();

        // Initialize database
        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let store = FsStore::from_path(&temp_dir).unwrap();
        let mut job_container = JobContainer::new();

        // Initial scan — all three files should be newly added.
        let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
        let rescan_result = store.rescan_vault(&open_result.db, job, quick, true);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));

        // Modification-less rescan — everything should be unchanged.
        let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
        let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));

        // Remove a file — rescan should report exactly one removal.
        std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();

        let job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
        let rescan_result = store.rescan_vault(&open_result.db, job, quick, false);

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        assert_eq!(
            2,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
                .count()
        );
        assert_eq!(
            1,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
                .count()
        );
    }
}
|