use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Instant, UNIX_EPOCH};
use std::{fs, iter};

use crate::addressing::Address;
use crate::database::constants::{
    HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, TYPE_HAS_ATTR,
};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{resolve_path, UNode, UPath};
use crate::database::inner::models;
use crate::database::{
    file_set_valid, insert_entry, insert_file, retrieve_all_files, DbPool, DATABASE_FILENAME,
};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{Job, JobContainer, JobId, State};
use anyhow::{Error, Result};
use chrono::prelude::*;
use diesel::Connection;
use log::{error, info, warn};
use rayon::prelude::*;
use serde_json::Value;
use walkdir::WalkDir;

const BLOB_TYPE: &str = "BLOB";
const ALIAS_KEY: &str = "ALIAS";
const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

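// The invariant entry describing the BLOB type, and its entity address, are computed
// once and reused throughout the module.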
lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_BASE_ATTR),
        value: EntryValue::Value(Value::from(BLOB_TYPE)),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
}

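/// Ensure the built-in BLOB type and the attributes it carries exist in the database.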
fn initialize_types(pool: &DbPool) -> Result<()> {
    // BLOB_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);

    Ok(())
}

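/// Public entry point for a full vault rescan: registers a job, runs the scan on a
/// blocking thread, and marks the job as failed if the scan returns an error.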
pub async fn rescan_vault(
    pool: DbPool,
    directory: PathBuf,
    job_container: Arc<RwLock<JobContainer>>,
) {
    let job_id = job_container
        .write()
        .unwrap()
        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
        .unwrap();

    let job_container_rescan = job_container.clone();
    let result =
        actix_web::web::block(move || _rescan_vault(pool, directory, job_container_rescan, job_id))
            .await;

    if let Err(err) = result {
        error!("Update did not succeed! {:?}", err);

        job_container
            .write()
            .unwrap()
            .update_state(&job_id, State::Failed)
            .unwrap();
    }
}

type UpdatePathResult = Result<UpdatePathOutcome>;

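/// Outcome of processing a single path during a rescan.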
enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
}

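/// Walk the vault directory, process every file in parallel, and invalidate database
/// records for files that are no longer present on disk.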
fn _rescan_vault<T: AsRef<Path>>(
    pool: DbPool,
    directory: T,
    job_container: Arc<RwLock<JobContainer>>,
    job_id: JobId,
) -> Result<Vec<UpdatePathResult>> {
    let start = Instant::now();

    // Initialize types, etc...
    initialize_types(&pool)?;

    // Walk through the vault, find all paths
    let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
        .follow_links(true)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME)
        .map(|e| fs::canonicalize(e.into_path()).unwrap())
        .collect();

    // Prepare for processing
    let rw_pool = Arc::new(RwLock::new(pool.clone()));
    let absolute_path = fs::canonicalize(&directory)?;
    let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));

    // Actual processing
    let count = RwLock::new(0_usize);
    let total = path_entries.len() as f32;
    let path_results: Vec<UpdatePathResult> = path_entries
        .into_par_iter()
        .map(|path| {
            let result = _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;

            let mut cnt = count.write().unwrap();
            *cnt += 1;

            job_container
                .write()
                .unwrap()
                .update_progress(&job_id, *cnt as f32 / total * 100.0)
                .unwrap();

            Ok(result)
        })
        .collect();

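    // Records still marked valid in `existing_files` have no matching unchanged file
    // on disk after this scan; mark them as invalid.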
    let existing_files = existing_files.read().unwrap();

    let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
        let connection = pool.get()?;
        connection.transaction::<_, Error, _>(|| {
            file_set_valid(&connection, file.id, false)?;
            // remove_object(&connection, )?
            Ok(UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())))
        })
    });

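    // Tally the per-path outcomes for the summary log below.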
    let mut failed: Vec<&Error> = vec![];
    let mut created = 0;
    let mut unchanged = 0;
    let mut deleted = 0;

    for result in &path_results {
        match result {
            Ok(result) => match result {
                UpdatePathOutcome::Added(_) => created += 1,
                UpdatePathOutcome::Unchanged(_) => unchanged += 1,
                UpdatePathOutcome::Removed(_) => deleted += 1,
            },
            Err(err) => failed.push(err),
        }
    }

    if !failed.is_empty() {
        warn!(
            "{} path updates failed! ({})",
            failed.len(),
            failed
                .iter()
                .map(|e| e.to_string())
                .collect::<Vec<String>>()
                .join(", ")
        )
    }

    info!(
        "Finished updating {} ({} created, {} deleted, {} left unchanged). Took {}s.",
        directory.as_ref().display(),
        created,
        deleted,
        unchanged,
        start.elapsed().as_secs()
    );

    Ok(path_results.into_iter().chain(cleanup_results).collect())
}

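/// Process a single file: compare it against the files already known to the database,
/// and if it is new or changed, insert its file record, metadata entries, and
/// hierarchy (virtual filesystem) entries.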
fn _process_directory_entry<P: AsRef<Path>>(
    db_pool: &Arc<RwLock<DbPool>>,
    path: PathBuf,
    directory_path: &P,
    existing_files: &Arc<RwLock<Vec<models::File>>>,
) -> UpdatePathResult {
    info!("Processing: {:?}", path);

    // Prepare the data
    let connection = &db_pool.write().unwrap().get()?;
    let existing_files = Arc::clone(existing_files);

    let normalized_path = path.strip_prefix(&directory_path)?;
    let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!");

    let mut file_hash: Option<Hash> = None;

    // Get size & mtime for quick comparison
    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    if size < 0 {
        panic!("File {} too large?!", path.display());
    }
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();

    // Check if the path entry for this file already exists in database
    let existing_files_read = existing_files.read().unwrap();
    let maybe_existing_file = existing_files_read
        .iter()
        .find(|file| file.path == normalized_path_str);

    if let Some(existing_file) = maybe_existing_file {
        let existing_file = existing_file.clone();
        drop(existing_files_read);

        let mut same = size == existing_file.size && mtime == existing_file.mtime;
        if !same {
            file_hash = Some(path.hash()?);
            same = file_hash.as_ref().unwrap() == &existing_file.hash;
        }

        if same {
            if !existing_file.valid {
                file_set_valid(connection, existing_file.id, true)?;
            }

            {
                let mut existing_files_write = existing_files.write().unwrap();
                let maybe_existing_file = existing_files_write
                    .iter()
                    .enumerate()
                    .find(|(_, file)| file.path == normalized_path_str)
                    .map(|(idx, _)| idx);

                if let Some(idx) = maybe_existing_file {
                    existing_files_write.swap_remove(idx);
                    return Ok(UpdatePathOutcome::Unchanged(path));
                }
            }
        }
    } else {
        drop(existing_files_read);
    }

    // If not, add it!
    if file_hash.is_none() {
        file_hash = Some(path.hash()?);
    }
    let file_hash = file_hash.unwrap();

    let new_file = models::NewFile {
        path: normalized_path_str.to_string(),
        hash: (file_hash.clone()).0,
        added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
        size,
        mtime,
    };

    insert_file(connection, new_file)?;

    // Insert metadata
    let type_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: String::from(IS_OF_TYPE_ATTR),
        value: EntryValue::Address(BLOB_TYPE_ADDR.clone()),
    };
    insert_entry(connection, type_entry)?;

    let size_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: FILE_SIZE_KEY.to_string(),
        value: EntryValue::Value(Value::from(size)),
    };
    insert_entry(connection, size_entry)?;

    let mime_entry = Entry {
        entity: Address::Hash(file_hash.clone()),
        attribute: FILE_MIME_KEY.to_string(),
        value: EntryValue::Value(Value::String(tree_magic::from_filepath(&path))),
    };
    insert_entry(connection, mime_entry)?;

    // Finally, add the appropriate entries w/r/t virtual filesystem location
    let components = normalized_path.components().collect::<Vec<Component>>();
    let (filename, dir_path) = components.split_last().unwrap();

    let upath = UPath(
        iter::once(UNode::new("NATIVE").unwrap())
            .chain(dir_path.iter().map(|component| {
                UNode::new(component.as_os_str().to_string_lossy().to_string()).unwrap()
            }))
            .collect(),
    );
    let resolved_path = resolve_path(connection, &upath, true)?;
    let parent_dir = resolved_path.last().unwrap();

    connection.transaction::<_, Error, _>(|| {
        let dir_has_entry = Entry {
            entity: parent_dir.clone(),
            attribute: HIER_HAS_ATTR.to_string(),
            value: EntryValue::Address(Address::Hash(file_hash.clone())),
        };
        let dir_has_entry_addr = insert_entry(connection, dir_has_entry)?;

        let name_entry = Entry {
            entity: dir_has_entry_addr,
            attribute: ALIAS_KEY.to_string(),
            value: EntryValue::Value(Value::String(
                filename.as_os_str().to_string_lossy().to_string(),
            )),
        };
        insert_entry(connection, name_entry)?;

        Ok(UpdatePathOutcome::Added(path.clone()))
    })
}

#[cfg(test)]
mod test {
    use crate::database::open_upend;
    use crate::util;

    use super::*;
    use std::fs::File;
    use std::io::Write;
    use tempdir::TempDir;

    #[test]
    fn test_rescan() {
        // Prepare temporary filesystem structure
        let temp_dir = TempDir::new("upend-test").unwrap();

        let file_path = temp_dir.path().join("my-temporary-note.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Brian was here. Briefly.").unwrap();

        let file_path = temp_dir.path().join("hello-world.txt");
        let mut tmp_file = File::create(file_path).unwrap();
        writeln!(tmp_file, "Hello, World!").unwrap();

        let file_path = temp_dir.path().join("empty");
        File::create(file_path).unwrap();

        // Initialize database
        let open_result = open_upend(&temp_dir, None, true).unwrap();
        let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
        let job_id = job_container
            .write()
            .unwrap()
            .add_job(util::jobs::Job::new("RESCAN", "TEST JOB"))
            .unwrap();

        // Initial scan
        let rescan_result = _rescan_vault(
            open_result.pool.clone(),
            temp_dir.as_ref().to_path_buf(),
            job_container.clone(),
            job_id,
        );

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result.into_iter().for_each(|outcome| {
            assert!(outcome.is_ok());
            let outcome = outcome.unwrap();
            assert!(matches!(outcome, UpdatePathOutcome::Added(_)))
        });

        // Modification-less rescan
        let rescan_result = _rescan_vault(
            open_result.pool.clone(),
            temp_dir.as_ref().to_path_buf(),
            job_container.clone(),
            job_id,
        );

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        rescan_result.into_iter().for_each(|outcome| {
            assert!(outcome.is_ok());
            let outcome = outcome.unwrap();
            assert!(matches!(outcome, UpdatePathOutcome::Unchanged(_)))
        });

        // Remove a file
        std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();

        let rescan_result = _rescan_vault(
            open_result.pool,
            temp_dir.as_ref().to_path_buf(),
            job_container,
            job_id,
        );

        assert!(rescan_result.is_ok());
        let rescan_result = rescan_result.unwrap();
        assert_eq!(rescan_result.len(), 3);
        assert_eq!(
            2,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo.as_ref().unwrap(), UpdatePathOutcome::Unchanged(_)))
                .count()
        );
        assert_eq!(
            1,
            rescan_result
                .iter()
                .filter(|upo| matches!(upo.as_ref().unwrap(), UpdatePathOutcome::Removed(_)))
                .count()
        );
    }
}