// upend/src/filesystem.rs
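//! Filesystem import for the UpEnd vault: walks the vault directory,
//! hashes files, records them and their metadata in the database, and
//! links them into the virtual (path-based) hierarchy.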

use std::convert::TryFrom;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Instant, UNIX_EPOCH};
use std::{fs, iter};
use crate::addressing::Address;
use crate::database::constants::{
    HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_ID_ATTR, TYPE_INSTANCES_ATTR,
    TYPE_IS_ATTR, TYPE_REQUIRES_ATTR,
};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{resolve_path, UNode, UPath};
use crate::database::inner::models;
use crate::database::{
    file_set_valid, insert_entry, insert_file, retrieve_all_files, DbPool, DATABASE_FILENAME,
};
use crate::util::hash::Hashable;
use crate::util::jobs::{Job, JobContainer, JobId, State};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::Connection;
use log::{error, info, warn};
use once_cell::unsync::Lazy;
use rayon::prelude::*;
use serde_json::Value;
use uuid::Uuid;
use walkdir::WalkDir;
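
// Attributes and type definitions for directories, files, and content blobs
// as they are stored in the UpEnd database.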
const DIR_TYPE: &str = "FS_DIR";
const DIR_KEY: &str = "DIR";

lazy_static! {
    static ref DIR_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_IS_ATTR),
        value: EntryValue::Value(Value::from(DIR_TYPE)),
    };
    static ref DIR_TYPE_ADDR: Address = DIR_TYPE_INVARIANT.entity().unwrap();
}
const BLOB_TYPE: &str = "BLOB";
const FILE_TYPE: &str = "FS_FILE";
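
// Attribute keys recorded for file objects and their content blobs.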
const FILE_IDENTITY_KEY: &str = "FILE_IS";
const FILENAME_KEY: &str = "FILE_NAME";
const FILE_MIME_KEY: &str = "FILE_MIME";
const FILE_MTIME_KEY: &str = "FILE_MTIME";
const FILE_SIZE_KEY: &str = "FILE_SIZE";

lazy_static! {
    static ref BLOB_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_IS_ATTR),
        value: EntryValue::Value(Value::from(BLOB_TYPE)),
    };
    static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();

    static ref FILE_TYPE_INVARIANT: InvariantEntry = InvariantEntry {
        attribute: String::from(TYPE_IS_ATTR),
        value: EntryValue::Value(Value::from(FILE_TYPE)),
    };
    static ref FILE_TYPE_ADDR: Address = FILE_TYPE_INVARIANT.entity().unwrap();
}
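
/// Insert the BLOB, FILE and DIR type entries, along with the attributes
/// they declare, into the database before any files are (re)imported.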
fn initialize_types(pool: &DbPool) -> Result<()> {
    // BLOB_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY);
    upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);

    // FILE_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*FILE_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, FILE_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(
        &pool.get()?,
        FILE_TYPE_ADDR,
        TYPE_INSTANCES_ATTR,
        FILE_IDENTITY_KEY
    );
    upend_insert_val!(&pool.get()?, FILE_TYPE_ADDR, TYPE_ID_ATTR, FILENAME_KEY);
    upend_insert_val!(
        &pool.get()?,
        FILE_TYPE_ADDR,
        TYPE_REQUIRES_ATTR,
        FILE_IDENTITY_KEY
    );
    upend_insert_val!(&pool.get()?, FILE_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);

    // DIR_TYPE
    insert_entry(&pool.get()?, Entry::try_from(&*DIR_TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, DIR_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, DIR_TYPE_ADDR, TYPE_ID_ATTR, DIR_KEY);
    upend_insert_val!(&pool.get()?, DIR_TYPE_ADDR, TYPE_HAS_ATTR, HIER_HAS_ATTR);

    Ok(())
}
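
/// Re-scan the vault directory and update the file database accordingly,
/// reporting progress and failure through the shared [`JobContainer`].
///
/// A minimal usage sketch (assuming a configured `DbPool`, vault path, and
/// job container, e.g. taken from the actix-web application state):
///
/// ```ignore
/// rescan_vault(pool.clone(), vault_path.clone(), job_container.clone()).await;
/// ```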
pub async fn rescan_vault(
    pool: DbPool,
    directory: PathBuf,
    job_container: Arc<RwLock<JobContainer>>,
) {
    let job_id = job_container
        .write()
        .unwrap()
        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
        .unwrap();

    let job_container_rescan = job_container.clone();
    let result =
        actix_web::web::block(move || _rescan_vault(pool, directory, job_container_rescan, job_id))
            .await;

    if let Err(err) = result {
        error!("Update did not succeed! {:?}", err);

        job_container
            .write()
            .unwrap()
            .update_state(&job_id, State::Failed)
            .unwrap();
    }
}
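
/// Result of processing a single path during a vault rescan.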
type UpdatePathResult = Result<UpdatePathOutcome>;
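
/// Outcome of processing a single path: newly added to the database,
/// already present and unchanged, or no longer on disk and marked removed.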
enum UpdatePathOutcome {
    Added(PathBuf),
    Unchanged(PathBuf),
    Removed(PathBuf),
}
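
/// Synchronous worker behind [`rescan_vault`]: walks the vault, processes
/// every file in parallel, and invalidates database records of files that
/// are no longer present on disk.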
fn _rescan_vault<T: AsRef<Path>>(
    pool: DbPool,
    directory: T,
    job_container: Arc<RwLock<JobContainer>>,
    job_id: JobId,
) -> Result<Vec<UpdatePathResult>> {
    let start = Instant::now();

    // Initialize types, etc...
    initialize_types(&pool)?;

    // Walk through the vault, find all paths
    let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME)
        .map(|e| fs::canonicalize(e.into_path()).unwrap())
        .collect();

    // Prepare for processing
    let rw_pool = Arc::new(RwLock::new(pool.clone()));
    let absolute_path = fs::canonicalize(&directory)?;
    let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));

    // Actual processing
    let count = RwLock::new(0_usize);
    let total = path_entries.len() as f32;
    let path_results: Vec<UpdatePathResult> = path_entries
        .into_par_iter()
        .map(|path| {
            let result =
                _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;

            let mut cnt = count.write().unwrap();
            *cnt += 1;

            job_container
                .write()
                .unwrap()
                .update_progress(&job_id, *cnt as f32 / total * 100.0)
                .unwrap();

            Ok(result)
        })
        .collect();

    // Invalidate records of files that were in the database but are no longer on disk
    let cleanup_results: Vec<UpdatePathResult> = existing_files
        .write()
        .unwrap()
        .iter()
        .filter(|f| f.valid)
        .map(|file| {
            let connection = pool.get()?;
            connection.transaction::<_, Error, _>(|| {
                file_set_valid(&connection, file.id, false)?;
                // remove_object(&connection, )?
                Ok(UpdatePathOutcome::Removed(PathBuf::from(file.path.clone())))
            })
        })
        .collect();

    // Tally the outcomes for logging
    let mut failed: Vec<&Error> = vec![];
    let mut created = 0;
    let mut unchanged = 0;
    let mut deleted = 0;

    for result in &path_results {
        match result {
            Ok(result) => match result {
                UpdatePathOutcome::Added(_) => created += 1,
                UpdatePathOutcome::Unchanged(_) => unchanged += 1,
                UpdatePathOutcome::Removed(_) => deleted += 1,
            },
            Err(err) => failed.push(err),
        }
    }

    if !failed.is_empty() {
        warn!(
            "{} path updates failed! ({})",
            failed.len(),
            failed
                .iter()
                .map(|e| e.to_string())
                .collect::<Vec<String>>()
                .join(", ")
        )
    }

    info!(
        "Finished updating {} ({} created, {} deleted, {} left unchanged). Took {}s.",
        directory.as_ref().display(),
        created,
        deleted,
        unchanged,
        start.elapsed().as_secs()
    );

    Ok(path_results
        .into_iter()
        .chain(cleanup_results.into_iter())
        .collect())
}
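
/// Process a single file found in the vault: skip it if it is already known
/// and unchanged, otherwise hash it, record its metadata, and link it into
/// the virtual (path-based) hierarchy.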
fn _process_directory_entry<P: AsRef<Path>>(
    db_pool: &Arc<RwLock<DbPool>>,
    path: PathBuf,
    directory_path: &P,
    existing_files: &Arc<RwLock<Vec<models::File>>>,
) -> UpdatePathResult {
info!("Processing: {:?}", path);
// Prepare the data
let db_pool = Arc::clone(&db_pool);
let existing_files = Arc::clone(&existing_files);
let normalized_path = path.strip_prefix(&directory_path)?;
let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!");
let digest = Lazy::new(|| path.hash());
// Get size & mtime for quick comparison
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", path.display());
}
let mtime = metadata
.modified()
.map(|t| {
NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
})
.ok();
// Check if the path entry for this file already exists in database
{
// Only grab existing_files for the duration of this block
let mut existing_files = existing_files.write().unwrap();
let maybe_existing_file = existing_files
.iter()
.enumerate()
2020-09-30 01:33:36 +02:00
.find(|(_, file)| file.path == normalized_path_str);
if let Some((idx, existing_file)) = maybe_existing_file {
if (size == existing_file.size && mtime == existing_file.mtime)
|| ((*digest).is_ok() && &existing_file.hash == (*digest).as_ref().unwrap())
{
if !existing_file.valid {
file_set_valid(&db_pool.write().unwrap().get()?, existing_file.id, true)?;
}
existing_files.swap_remove(idx);
return Ok(UpdatePathOutcome::Unchanged(path));
}
}
}
// If not, add it!
if let Err(err) = &*digest {
return Err(anyhow!(format!("Error hashing: {}", err)));
}
let digest = (*digest).as_ref().unwrap().clone();
let new_file = models::NewFile {
path: normalized_path_str.to_string(),
hash: (digest.clone()).0,
added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
mtime,
};
2021-03-14 22:16:28 +01:00
insert_file(&db_pool.write().unwrap().get()?, new_file)?;
// Insert metadata
2021-06-04 15:14:58 +02:00
let type_entry = Entry {
entity: Address::Hash(digest.clone()),
attribute: String::from(IS_OF_TYPE_ATTR),
value: EntryValue::Address(BLOB_TYPE_ADDR.clone()),
};
insert_entry(&db_pool.write().unwrap().get()?, type_entry)?;
let size_entry = Entry {
entity: Address::Hash(digest.clone()),
attribute: FILE_SIZE_KEY.to_string(),
value: EntryValue::Value(Value::from(size)),
};
insert_entry(&db_pool.write().unwrap().get()?, size_entry)?;
if let Some(mtime) = mtime {
let mtime_entry = Entry {
entity: Address::Hash(digest.clone()),
attribute: FILE_MTIME_KEY.to_string(),
value: EntryValue::Value(Value::from(mtime.timestamp())),
};
insert_entry(&db_pool.write().unwrap().get()?, mtime_entry)?;
}
let mime_entry = Entry {
entity: Address::Hash(digest.clone()),
attribute: FILE_MIME_KEY.to_string(),
value: EntryValue::Value(Value::String(tree_magic::from_filepath(&path))),
};
insert_entry(&db_pool.write().unwrap().get()?, mime_entry)?;
// Finally, add the appropriate entries w/r/t virtual filesystem location
let components = normalized_path.components().collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let upath = UPath(
iter::once(UNode("NATIVE".to_string()))
.chain(
dir_path
.iter()
.map(|component| UNode(component.as_os_str().to_string_lossy().to_string())),
)
.collect(),
);
let resolved_path = resolve_path(&db_pool.write().unwrap().get()?, &upath, true)?;
let parent_dir = resolved_path.last().unwrap();
let _pool = &db_pool.write().unwrap();
let connection = _pool.get()?;
connection.transaction::<_, Error, _>(|| {
2021-04-24 00:08:17 +02:00
let file_address = Address::Uuid(Uuid::new_v4());
2021-03-14 22:16:28 +01:00
let type_entry = Entry {
entity: file_address.clone(),
attribute: String::from(IS_OF_TYPE_ATTR),
value: EntryValue::Address(FILE_TYPE_ADDR.clone()),
2021-03-14 22:16:28 +01:00
};
insert_entry(&connection, type_entry)?;
let name_entry = Entry {
entity: file_address.clone(),
attribute: FILENAME_KEY.to_string(),
value: EntryValue::Value(Value::String(
filename.as_os_str().to_string_lossy().to_string(),
)),
};
2021-03-14 22:16:28 +01:00
insert_entry(&connection, name_entry)?;
let identity_entry = Entry {
entity: file_address.clone(),
attribute: FILE_IDENTITY_KEY.to_string(),
value: EntryValue::Address(Address::Hash(digest.clone())),
};
2021-03-14 22:16:28 +01:00
insert_entry(&connection, identity_entry)?;
let dir_has_entry = Entry {
entity: parent_dir.clone(),
attribute: HIER_HAS_ATTR.to_string(),
value: EntryValue::Address(file_address),
};
2021-03-14 22:16:28 +01:00
insert_entry(&connection, dir_has_entry)?;
Ok(UpdatePathOutcome::Added(path.clone()))
})
}