// upend/src/filesystem.rs

use std::borrow::Borrow;
use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use std::{fs, iter};
use crate::addressing::Address;
use crate::database::constants::{
    ADDED_ATTR, HIER_HAS_ATTR, IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_BASE_ATTR,
    TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, InvariantEntry};
use crate::database::hierarchies::{
    resolve_path, resolve_path_cached, ResolveCache, UHierPath, UNode,
};
use crate::database::inner::models;
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use lazy_static::lazy_static;
use log::{debug, error, info, warn};
use lru::LruCache;
use rayon::prelude::*;
use walkdir::WalkDir;

const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
pub const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: BLOB_TYPE.into(),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}
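
/// Insert the entries that describe the built-in `BLOB` type: its base type
/// invariant, the attributes a blob can carry, and a human-readable label.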
fn initialize_types(connection: &UpEndConnection) -> Result<()> {
    // BLOB_TYPE
    connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY)?;
    upend_insert_val!(connection, BLOB_TYPE_ADDR, LABEL_ATTR, "Data Blob")?;
    Ok(())
}
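
/// Walk the vault directory and synchronize the database with its contents,
/// wrapping the work in a `REIMPORT` job so progress can be reported.
///
/// A minimal usage sketch, mirroring the test module below (the surrounding
/// setup is assumed; see `_test_rescan`):
///
/// ```ignore
/// let open_result = UpEndDatabase::open(&vault_dir, None, true)?;
/// let job_container = JobContainer::new();
/// let outcomes = rescan_vault(&open_result.db, job_container, true, true)?;
/// ```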
pub fn rescan_vault<D: Borrow<UpEndDatabase>>(
    db: D,
    mut job_container: JobContainer,
    quick_check: bool,
    disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
    let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
    match job_result {
        Ok(job_handle) => {
            let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous);
            if let Err(err) = &result {
                error!("Update did not succeed! {:?}", err);
            }
            result
        }
        Err(err) => Err(err),
    }
}
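
/// RAII guard that switches `PRAGMA synchronous` back to `NORMAL` on drop, so
/// an early return or panic cannot leave the connection with syncing disabled.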
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);

impl Drop for PragmaSynchronousGuard<'_> {
    fn drop(&mut self) {
        debug!("Re-enabling synchronous mode.");
        let res = self.0.execute("PRAGMA synchronous = NORMAL;");
        if let Err(err) = res {
            error!(
                "Error setting synchronous mode back to NORMAL! Data loss possible! {}",
                err
            );
        }
    }
}

type UpdatePathResult = Result<UpdatePathOutcome>;
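
/// Outcome of processing a single path during a rescan.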
#[derive(Debug)]
pub enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
    Failed(PathBuf, Error),
}
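
/// The actual rescan: walks the vault directory, processes every file in
/// parallel, invalidates database records for files that have disappeared,
/// and logs a summary of the outcomes.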
fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
    db: D,
    job_handle: JobHandle,
    quick_check: bool,
    disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
    let start = Instant::now();
    info!("Vault rescan started.");

    let db = db.borrow();
    let connection = db.connection()?;

    // Initialize types, etc...
    debug!("Initializing DB types.");
    initialize_types(&connection)?;

    // Disable syncing in SQLite for the duration of the import
    let mut _guard: Option<PragmaSynchronousGuard> = None;
    if disable_synchronous {
        debug!("Disabling SQLite synchronous mode");
        connection.execute("PRAGMA synchronous = OFF;")?;
        _guard = Some(PragmaSynchronousGuard(&connection));
    }

    // Walk through the vault, find all paths
    debug!("Traversing vault directory");
    let absolute_dir_path = fs::canonicalize(&*db.vault_path)?;
    let path_entries: Vec<PathBuf> = WalkDir::new(&*db.vault_path)
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter_map(|e| fs::canonicalize(e.into_path()).ok())
        .filter(|e| e.is_file())
        .filter(|e| !e.starts_with(&absolute_dir_path.join(UPEND_SUBDIR)))
        .collect();

    // Prepare for processing
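    // `existing_files` starts out holding every file the database knows about;
    // each successfully processed path removes its own record below, so whatever
    // remains (and is still marked valid) afterwards no longer exists on disk.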
    let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?));

    // Actual processing
    let count = RwLock::new(0_usize);
    let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
    let total = path_entries.len() as f32;
    let shared_job_handle = Arc::new(Mutex::new(job_handle));

    let path_outcomes: Vec<UpdatePathOutcome> = path_entries
        .into_par_iter()
        .map(|path| {
            let result = process_directory_entry(
                db.connection().unwrap(),
                &resolve_cache,
                path.clone(),
                &existing_files,
                quick_check,
            );

            let mut cnt = count.write().unwrap();
            *cnt += 1;
            let _ = shared_job_handle
                .lock()
                .unwrap()
                .update_progress(*cnt as f32 / total * 100.0);

            match result {
                Ok(result) => result,
                Err(error) => {
                    error!("Failed to update {:?} ({})", path, error);
                    UpdatePathOutcome::Failed(path, error)
                }
            }
        })
        .collect();
debug!("Processing done, cleaning up...");
let existing_files = existing_files.read().unwrap();
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
let trans_result = connection.transaction::<_, Error, _>(|| {
connection.file_set_valid(file.id, false)?;
// remove_object(&connection, )?
Ok(())
});
match trans_result {
Ok(_) => {
info!("Removed: {:?}", file.path);
UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))
}
Err(error) => UpdatePathOutcome::Failed(PathBuf::from(file.path.clone()), error),
}
});
// Re-enable SQLite syncing
drop(_guard);
// Reporting
let all_outcomes = path_outcomes
.into_iter()
.chain(cleanup_results)
.collect::<Vec<UpdatePathOutcome>>();
let mut failed: Vec<(&PathBuf, &Error)> = vec![];
let mut created = 0;
let mut unchanged = 0;
let mut deleted = 0;
for outcome in &all_outcomes {
match outcome {
UpdatePathOutcome::Added(_) => created += 1,
UpdatePathOutcome::Unchanged(_) => unchanged += 1,
UpdatePathOutcome::Removed(_) => deleted += 1,
UpdatePathOutcome::Failed(path, err) => failed.push((path, err)),
}
}
if !failed.is_empty() {
warn!(
"{} path updates failed! ({})",
failed.len(),
failed
.iter()
.map(|(path, error)| format!("{:?}: {}", path, error))
.collect::<Vec<String>>()
.join(", ")
)
}
info!(
"Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
db.vault_path,
created,
deleted,
unchanged,
start.elapsed().as_secs()
);
Ok(all_outcomes)
}
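
/// Process a single file discovered in the vault.
///
/// Depending on `quick_check`, the file is compared against its database
/// record by size, mtime, or content hash; a match revalidates the existing
/// record, anything else gets inserted as a new blob.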
fn process_directory_entry(
    connection: UpEndConnection,
    resolve_cache: &Arc<Mutex<ResolveCache>>,
    path: PathBuf,
    existing_files: &Arc<RwLock<Vec<models::File>>>,
    quick_check: bool,
) -> UpdatePathResult {
    debug!("Processing: {:?}", path);

    // Prepare the data
    let existing_files = Arc::clone(existing_files);

    let normalized_path = connection.normalize_path(&path)?;
    let normalized_path_str = normalized_path
        .to_str()
        .ok_or(anyhow!("Path not valid unicode!"))?;

    let mut file_hash: Option<Hash> = None;

    // Get size & mtime for quick comparison
    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    if size < 0 {
        panic!("File {} too large?!", path.display());
    }
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();

    // Check if the path entry for this file already exists in database
    let existing_files_read = existing_files.read().unwrap();
    let maybe_existing_file = existing_files_read
        .iter()
        .find(|file| file.path == normalized_path_str);

    if let Some(existing_file) = maybe_existing_file {
        let existing_file = existing_file.clone();
        drop(existing_files_read);

        if !quick_check || size == existing_file.size {
            let same_mtime = mtime.is_some() && mtime == existing_file.mtime;
            let mut same_hash = false;
            if !quick_check || !same_mtime {
                file_hash = Some(path.hash()?);
                same_hash = file_hash.as_ref().unwrap() == &existing_file.hash;
            }

            if same_mtime || same_hash {
                if mtime != existing_file.mtime {
                    connection.file_update_mtime(existing_file.id, mtime)?;
                }

                if !existing_file.valid {
                    connection.file_set_valid(existing_file.id, true)?;
                }

                let mut existing_files_write = existing_files.write().unwrap();
                let maybe_existing_file = existing_files_write
                    .iter()
                    .enumerate()
                    .find(|(_, file)| file.path == normalized_path_str)
                    .map(|(idx, _)| idx);

                if let Some(idx) = maybe_existing_file {
                    existing_files_write.swap_remove(idx);
                    debug!("Unchanged: {:?}", path);
                    return Ok(UpdatePathOutcome::Unchanged(path));
                }
            }
        }
    } else {
        drop(existing_files_read);
    }

    // If not, add it!
    if file_hash.is_none() {
        file_hash = Some(path.hash()?);
    }
    let mime_type = tree_magic_mini::from_filepath(&path).map(|s| s.to_string());

    insert_file_with_metadata(
        &connection,
        &normalized_path,
        file_hash.unwrap(),
        size,
        mtime,
        mime_type,
        Some(resolve_cache),
    )
    .map(|_| {
        info!("Added: {:?}", path);
        UpdatePathOutcome::Added(path.clone())
    })
}
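
/// Register a single file with a known content hash, deriving its size,
/// mtime, and MIME type from the filesystem before inserting it.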
pub fn add_file(connection: &UpEndConnection, path: &Path, hash: Hash) -> Result<Address> {
    let normalized_path = connection.normalize_path(path)?;
    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();
    let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());

    insert_file_with_metadata(
        connection,
        &normalized_path,
        hash,
        size,
        mtime,
        mime_type,
        None,
    )
}
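
/// Insert the file row together with its metadata entries (type, size, MIME
/// type, added timestamp, label) and hook the blob into the `NATIVE` hierarchy
/// under its vault-relative path, all within a single transaction.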
fn insert_file_with_metadata(
    connection: &UpEndConnection,
    normalized_path: &Path,
    hash: Hash,
    size: i64,
    mtime: Option<NaiveDateTime>,
    mime_type: Option<String>,
    resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
) -> Result<Address> {
    let new_file = models::NewFile {
        path: normalized_path
            .to_str()
            .ok_or(anyhow!("Path not UTF-8?!"))?
            .to_string(),
        hash: hash.clone().0,
        added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
        size,
        mtime,
    };

    let blob_address = Address::Hash(hash);

    // Metadata
    let type_entry = Entry {
        entity: blob_address.clone(),
        attribute: String::from(IS_OF_TYPE_ATTR),
        value: BLOB_TYPE_ADDR.clone().into(),
    };

    let size_entry = Entry {
        entity: blob_address.clone(),
        attribute: FILE_SIZE_KEY.to_string(),
        value: (size as f64).into(),
    };

    let mime_entry = mime_type.map(|mime_type| Entry {
        entity: blob_address.clone(),
        attribute: FILE_MIME_KEY.to_string(),
        value: mime_type.into(),
    });

    let added_entry = Entry {
        entity: blob_address.clone(),
        attribute: ADDED_ATTR.to_string(),
        value: (SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as f64)
            .into(),
    };
    // Add the appropriate entries w/r/t virtual filesystem location
    let components = normalized_path.components().collect::<Vec<Component>>();
    let (filename, dir_path) = components.split_last().unwrap();
    let upath = UHierPath(
        iter::once(UNode::new("NATIVE").unwrap())
            .chain(dir_path.iter().map(|component| {
                UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
            }))
            .collect(),
    );
    let resolved_path = match resolve_cache {
        Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
        None => resolve_path(connection, &upath, true)?,
    };
    let parent_dir = resolved_path.last().unwrap();

    // Insert all
    connection.transaction::<_, Error, _>(|| {
        let file_count = connection.insert_file(new_file)?;

        connection.insert_entry_immutable(type_entry)?;
        connection.insert_entry_immutable(size_entry)?;
        if file_count == 1 {
            connection.insert_entry_immutable(added_entry)?;
        }
        if let Some(mime_entry) = mime_entry {
            connection.insert_entry(mime_entry)?;
        }

        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: blob_address.clone().into(),
        };
        let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;

        let label_entry = Entry {
            entity: blob_address.clone(),
            attribute: LABEL_ATTR.to_string(),
            value: filename.as_os_str().to_string_lossy().to_string().into(),
        };
        let label_entry_addr = connection.insert_entry(label_entry)?;
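
        // Link the hierarchy edge to the label entry via ALIAS, tying the
        // on-disk filename to the blob's position in the tree.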
        let alias_entry = Entry {
            entity: dir_has_entry_addr,
            attribute: ALIAS_KEY.to_string(),
            value: label_entry_addr.into(),
        };
        connection.insert_entry(alias_entry)?;

        Ok(blob_address)
    })
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::database::UpEndDatabase;
    use crate::util::jobs::JobContainer;
    use std::fs::File;
    use std::io::Write;
    use std::sync::Once;
    use tempfile::TempDir;
    use tracing_subscriber::filter::{EnvFilter, LevelFilter};

    static INIT: Once = Once::new();

    pub fn initialize() {
        INIT.call_once(|| {
            tracing_subscriber::fmt()
                .with_env_filter(
                    EnvFilter::builder()
                        .with_default_directive(LevelFilter::DEBUG.into())
                        .from_env_lossy(),
                )
                .init();
        })
    }

    #[test]
    fn test_rescan_quick() {
        initialize();
        _test_rescan(true)
    }

    #[test]
    fn test_rescan_full() {
        initialize();
        _test_rescan(false)
    }

    fn _test_rescan(quick: bool) {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new().unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        // Initialize database
        let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
        let mut job_container = JobContainer::new();
        let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap();

        // Initial scan
        let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true);
        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Added(_))));

        // Modification-less rescan
        let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false);
        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result
            .into_iter()
            .for_each(|outcome| assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_))));

        // Remove a file
        std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();

        let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);
        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        assert_eq!(
            2,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Unchanged(_)))
                .count()
        );
        assert_eq!(
            1,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo, UpdatePathOutcome::Removed(_)))
                .count()
        );
    }
}