// upend/src/filesystem.rs

use crate::addressing::Address;
use crate::database::{
    bulk_retrieve_objects, file_set_valid, insert_entry, insert_file, query_entries,
    retrieve_all_files, DbPool, Entry, EntryQuery, EntryValue, InnerEntry, QueryComponent,
    DATABASE_FILENAME,
};
use crate::hash::Hashable;
use crate::models;
use crate::models::File;
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use diesel::sqlite::Sqlite;
use diesel::Connection;
use log::{error, info, trace};
use rayon::prelude::*;
use serde_json::Value;
use std::fmt::Formatter;
use std::path::{Component, Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Instant, UNIX_EPOCH};
use std::{fs, iter};
use uuid::Uuid;
use walkdir::WalkDir;

const DIR_KEY: &str = "DIR";
const DIR_HAS_KEY: &str = "DIR_HAS";
const FILE_IDENTITY_KEY: &str = "FILE_IS";
const FILENAME_KEY: &str = "FILE_NAME";
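
// Together, these four attribute keys make up the filesystem-as-graph schema
// used throughout this module:
//
//   DIR       - marks an entity as a directory; its value is the directory's name
//   DIR_HAS   - links a directory to a child entity (a subdirectory or a file)
//   FILE_IS   - links a file entity to the content-addressed hash of its data
//   FILE_NAME - stores the file entity's name as it appears on disk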
#[derive(Debug, Clone, PartialEq)]
pub struct UDirectory {
    name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct UPath(Vec<UDirectory>);

const TOP_SEPARATOR: &str = "//";

impl std::str::FromStr for UPath {
    type Err = anyhow::Error;

    fn from_str(string: &str) -> Result<Self, Self::Err> {
        if string.is_empty() {
            Ok(UPath(vec![]))
        } else {
            let result = match string.find(TOP_SEPARATOR) {
                Some(head_idx) => {
                    let (head, rest) = string.split_at(head_idx);
                    let mut result = vec![UDirectory {
                        name: String::from(head),
                    }];
                    result.extend(
                        rest[TOP_SEPARATOR.len()..]
                            .trim_end_matches('/')
                            .split('/')
                            .map(|part| UDirectory {
                                name: String::from(part),
                            }),
                    );
                    result
                }
                None => string
                    .trim_end_matches('/')
                    .split('/')
                    .map(|part| UDirectory {
                        name: String::from(part),
                    })
                    .collect(),
            };

            for directory in &result {
                if directory.name.is_empty() {
                    return Err(anyhow!("INVALID PATH: Directory name cannot be empty!"));
                }
            }

            Ok(UPath(result))
        }
    }
}
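
// Example (hypothetical path): "NATIVE//photos/2021" parses into three
// UDirectory segments: "NATIVE", "photos", "2021". The first "//" separates
// the root from the rest; further segments are separated by single slashes,
// and any empty segment (e.g. as produced by "a//b//c" or "a///b") is
// rejected.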
impl std::fmt::Display for UDirectory {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name)
    }
}

impl std::fmt::Display for UPath {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self.0.len() {
            0 => write!(f, ""),
            1 => write!(f, "{}", self.0.first().unwrap().name),
            _ => {
                let (head, tail) = self.0.split_first().unwrap();
                write!(
                    f,
                    "{}//{}",
                    head.name,
                    tail.iter()
                        .map(|udir| udir.name.clone())
                        .collect::<Vec<String>>()
                        .join("/")
                )
            }
        }
    }
}
trait EntryList {
    fn extract_addresses(&self) -> Vec<Address>;
}

impl EntryList for Vec<Entry> {
    fn extract_addresses(&self) -> Vec<Address> {
        self.iter()
            .filter_map(|e| {
                if let EntryValue::Address(address) = &e.value {
                    Some(address.clone())
                } else {
                    None
                }
            })
            .collect()
    }
}
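
/// List all "root" directories, i.e. directory entities that never appear as
/// the value of a DIR_HAS entry and thus have no parent.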
pub fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
    let all_directories: Vec<Entry> = query_entries(
        connection,
        EntryQuery {
            target: QueryComponent::Any,
            key: QueryComponent::Exact(DIR_KEY.to_string()),
            value: QueryComponent::Any,
        },
    )?;

    let directories_with_parents: Vec<Address> = query_entries(
        connection,
        EntryQuery {
            target: QueryComponent::Any,
            key: QueryComponent::Exact(DIR_HAS_KEY.to_string()),
            value: QueryComponent::Any,
        },
    )?
    .extract_addresses();

    Ok(all_directories
        .into_iter()
        .filter(|entry| !directories_with_parents.contains(&entry.target))
        .collect())
}
pub async fn list_directory<C: Connection<Backend = Sqlite>>(
    connection: &C,
    path: &UPath,
) -> Result<Vec<Entry>> {
    let entry_addresses = match path.0.len() {
        0 => list_roots(connection)?
            .into_iter()
            .map(|e| e.target)
            .collect(),
        _ => {
            let resolved_path: Vec<Address> = resolve_path(connection, path, false)?;
            let last = resolved_path.last().unwrap();

            query_entries(
                connection,
                EntryQuery {
                    target: QueryComponent::Exact(last.clone()),
                    key: QueryComponent::Exact(DIR_HAS_KEY.to_string()),
                    value: QueryComponent::Any,
                },
            )?
            .extract_addresses()
        }
    };

    Ok(bulk_retrieve_objects(connection, entry_addresses)?
        .into_iter()
        .filter(|e| [DIR_KEY, FILENAME_KEY, FILE_IDENTITY_KEY].contains(&e.key.as_str()))
        .collect::<Vec<Entry>>())
}
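
/// Resolve a single directory level to its address. A directory matches if a
/// DIR entry with the given name exists and, when `parent` is supplied, the
/// parent also has a DIR_HAS entry pointing at it. With `create` set, a
/// missing directory is created along with the DIR_HAS edge from its parent;
/// more than one match is reported as an invalid database state.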
pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
    connection: &C,
    parent: Option<Address>,
    directory: UDirectory,
    create: bool,
) -> Result<Address> {
    match &parent {
        Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory),
        None => trace!("FETCHING/CREATING /{:#}", directory),
    }

    let dir_value = EntryValue::Value(Value::String(directory.name));

    let directories: Vec<Address> = query_entries(
        connection,
        EntryQuery {
            target: QueryComponent::Any,
            key: QueryComponent::Exact(String::from(DIR_KEY)),
            value: QueryComponent::Exact(dir_value.clone()),
        },
    )?
    .into_iter()
    .map(|e: Entry| e.target)
    .collect();

    let valid_directories: Vec<Address> = match &parent {
        Some(address) => {
            let parent_has: Vec<Address> = query_entries(
                connection,
                EntryQuery {
                    target: QueryComponent::Exact(address.clone()),
                    key: QueryComponent::Exact(String::from(DIR_HAS_KEY)),
                    value: QueryComponent::Any,
                },
            )?
            .extract_addresses();

            directories
                .into_iter()
                .filter(|a| parent_has.contains(a))
                .collect()
        }
        None => directories,
    };

    match valid_directories.len() {
        0 => {
            if create {
                let new_directory_address = Address::UUID(Uuid::new_v4());
                let directory_entry = InnerEntry {
                    target: new_directory_address.clone(),
                    key: String::from(DIR_KEY),
                    value: dir_value,
                };
                insert_entry(connection, directory_entry)?;

                if let Some(parent_addr) = parent {
                    let has_entry = InnerEntry {
                        target: parent_addr,
                        key: String::from(DIR_HAS_KEY),
                        value: EntryValue::Address(new_directory_address.clone()),
                    };
                    insert_entry(connection, has_entry)?;
                }

                Ok(new_directory_address)
            } else {
                Err(anyhow!("Directory does not exist."))
            }
        }
        1 => Ok(valid_directories[0].clone()),
        _ => Err(anyhow!(
            "Invalid database state - more than one directory matches the query!"
        )),
    }
}
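
/// Resolve a full UPath into the chain of directory addresses along it, from
/// root to leaf, creating missing levels when `create` is set. For example
/// (hypothetical path), resolving "NATIVE//photos" yields the address of
/// "NATIVE" followed by the address of "photos", with "photos" verified (or
/// created) as a child of "NATIVE".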
pub fn resolve_path<C: Connection<Backend = Sqlite>>(
    connection: &C,
    path: &UPath,
    create: bool,
) -> Result<Vec<Address>> {
    let mut result: Vec<Address> = vec![];
    let mut path_stack = path.0.to_vec();
    path_stack.reverse();

    while let Some(directory) = path_stack.pop() {
        let dir_address =
            fetch_or_create_dir(connection, result.last().cloned(), directory, create)?;
        result.push(dir_address);
    }

    Ok(result)
}
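
// Asynchronous entry point for a full rescan of a vault directory, offloading
// the blocking work to actix's thread pool. A sketch of a possible call-site
// (hypothetical handler; `State`, its fields and the route are assumptions,
// not part of this module):
//
//   async fn api_refresh(state: web::Data<State>) -> impl Responder {
//       reimport_directory(state.pool.clone(), state.vault_path.clone()).await;
//       HttpResponse::Ok()
//   }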
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
    let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
    if let Err(err) = result {
        error!("Update did not succeed! {}", err);
    }
}
fn _reimport_directory<T: AsRef<Path>>(pool: DbPool, directory: T) -> Result<Vec<Result<()>>> {
    let start = Instant::now();

    let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
        .into_iter()
        .filter_map(|e| e.ok())
        .filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME)
        .map(|e| fs::canonicalize(e.into_path()).unwrap())
        .collect();

    let rw_pool = Arc::new(RwLock::new(pool.clone()));
    let absolute_path = fs::canonicalize(&directory)?;
    let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));

    let path_results: Vec<Result<()>> = path_entries
        .into_par_iter()
        .map(|path| _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files))
        .collect();

    // Files still present in `existing_files` were not re-encountered
    // unchanged on disk; mark their records invalid.
    let cleanup_results: Vec<Result<()>> = existing_files
        .write()
        .unwrap()
        .iter()
        .filter(|f| f.valid)
        .map(|file| {
            let connection = pool.get()?;
            connection.transaction::<_, Error, _>(|| {
                file_set_valid(&connection, file.id, false)?;
                // remove_object(&connection, )?
                Ok(())
            })
        })
        .collect();

    info!(
        "Finished updating {}. Took {}s.",
        directory.as_ref().display(),
        start.elapsed().as_secs()
    );

    Ok(path_results.into_iter().chain(cleanup_results).collect())
}
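
// Process a single file discovered on disk. If a file with the same path,
// size and mtime is already recorded, it is merely revalidated; otherwise the
// file is hashed and inserted anew, and the transaction below produces three
// entries for it (shown schematically; addresses are illustrative):
//
//   (file uuid)   FILE_NAME  "<file name>"
//   (file uuid)   FILE_IS    Address::Hash(<content digest>)
//   (parent dir)  DIR_HAS    Address::UUID(<file uuid>)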
fn _process_directory_entry<P: AsRef<Path>>(
    db_pool: &Arc<RwLock<DbPool>>,
    path: PathBuf,
    directory_path: &P,
    existing_files: &Arc<RwLock<Vec<File>>>,
) -> Result<()> {
    info!("Processing: {:?}", path);

    let normalized_path = path.strip_prefix(&directory_path)?;
    let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!");

    let metadata = fs::metadata(&path)?;
    let size = metadata.len() as i64;
    if size < 0 {
        return Err(anyhow!("File {} too large to process!", path.display()));
    }
    let mtime = metadata
        .modified()
        .map(|t| {
            NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0)
        })
        .ok();

    {
        let mut existing_files = existing_files.write().unwrap();

        let maybe_existing_file = existing_files
            .iter()
            .enumerate()
            .find(|(_, file)| file.path == normalized_path_str);

        // If the file is already known and unchanged, just revalidate it and
        // take it off the list of files awaiting cleanup.
        if let Some((idx, existing_file)) = maybe_existing_file {
            if size == existing_file.size && mtime == existing_file.mtime {
                if !existing_file.valid {
                    file_set_valid(&db_pool.write().unwrap().get()?, existing_file.id, true)?;
                }
                existing_files.remove(idx);
                return Ok(());
            }
        }
    }

    let digest = path.hash()?;

    let new_file = models::NewFile {
        path: normalized_path_str.to_string(),
        hash: digest.clone().0,
        added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
        size,
        mtime,
    };
    insert_file(&db_pool.write().unwrap().get()?, new_file)?;

    let components = normalized_path.components().collect::<Vec<Component>>();
    let (filename, dir_path) = components.split_last().unwrap();

    let upath = UPath(
        iter::once(UDirectory {
            name: "NATIVE".to_string(),
        })
        .chain(dir_path.iter().map(|component| UDirectory {
            name: component.as_os_str().to_string_lossy().to_string(),
        }))
        .collect(),
    );
    let resolved_path = resolve_path(&db_pool.write().unwrap().get()?, &upath, true)?;
    let parent_dir = resolved_path.last().unwrap();

    let pool = db_pool.write().unwrap();
    let connection = pool.get()?;
    connection.transaction::<_, Error, _>(|| {
        let file_address = Address::UUID(Uuid::new_v4());

        let name_entry = InnerEntry {
            target: file_address.clone(),
            key: FILENAME_KEY.to_string(),
            value: EntryValue::Value(Value::String(
                filename.as_os_str().to_string_lossy().to_string(),
            )),
        };
        insert_entry(&connection, name_entry)?;

        let identity_entry = InnerEntry {
            target: file_address.clone(),
            key: FILE_IDENTITY_KEY.to_string(),
            value: EntryValue::Address(Address::Hash(digest.clone())),
        };
        insert_entry(&connection, identity_entry)?;

        let dir_has_entry = InnerEntry {
            target: parent_dir.clone(),
            key: DIR_HAS_KEY.to_string(),
            value: EntryValue::Address(file_address),
        };
        insert_entry(&connection, dir_has_entry)?;

        Ok(())
    })
}
pub fn lookup_by_filename<C: Connection<Backend = Sqlite>>(
    connection: &C,
    filename: String,
) -> Result<Vec<Entry>> {
    let lookup_value = EntryValue::Value(Value::String(filename));

    let entity_addresses = query_entries(
        connection,
        EntryQuery {
            target: QueryComponent::Any,
            key: QueryComponent::In(vec![DIR_KEY.to_string(), FILENAME_KEY.to_string()]), // ?
            value: QueryComponent::ILike(lookup_value),
        },
    )?;

    bulk_retrieve_objects(
        connection,
        entity_addresses.into_iter().map(|e| e.target).collect(),
    )
}
#[cfg(test)]
mod tests {
    use crate::filesystem::{UDirectory, UPath};
    use anyhow::Result;

    #[test]
    fn test_path_codec() {
        let path = UPath(vec![
            UDirectory {
                name: "top".to_string(),
            },
            UDirectory {
                name: "foo".to_string(),
            },
            UDirectory {
                name: "bar".to_string(),
            },
            UDirectory {
                name: "baz".to_string(),
            },
        ]);

        let str_path = path.to_string();
        assert!(!str_path.is_empty());

        let decoded_path: Result<UPath> = str_path.parse();
        assert!(decoded_path.is_ok());
        assert_eq!(path, decoded_path.unwrap());
    }

    #[test]
    fn test_validation() {
        let invalid_path: Result<UPath> = "a//b/c//d/e/f///g".parse();
        assert!(invalid_path.is_err())
    }
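
    // Additional sanity checks for the codec's edge cases (the empty path and
    // a single root-only segment); both follow directly from the FromStr and
    // Display implementations above.
    #[test]
    fn test_empty_and_single_path() {
        let empty: UPath = "".parse().unwrap();
        assert_eq!(empty, UPath(vec![]));
        assert_eq!(empty.to_string(), "");

        let single: UPath = "top".parse().unwrap();
        assert_eq!(
            single,
            UPath(vec![UDirectory {
                name: "top".to_string()
            }])
        );
        assert_eq!(single.to_string(), "top");
    }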
}