use crate::addressing::Address; use crate::database::{ bulk_retrieve_objects, file_set_valid, insert_entry, insert_file, query_entries, retrieve_all_files, DbPool, Entry, EntryQuery, EntryValue, InnerEntry, QueryComponent, DATABASE_FILENAME, }; use crate::hash::Hashable; use crate::models; use crate::models::File; use anyhow::{anyhow, Error, Result}; use chrono::prelude::*; use diesel::sqlite::Sqlite; use diesel::Connection; use log::{error, info, trace}; use rayon::prelude::*; use serde::export::Formatter; use serde_json::Value; use std::path::{Component, Path, PathBuf}; use std::sync::{Arc, RwLock}; use std::time::{Instant, UNIX_EPOCH}; use std::{fs, iter}; use uuid::Uuid; use walkdir::WalkDir; const DIR_KEY: &str = "DIR"; const DIR_HAS_KEY: &str = "DIR_HAS"; const FILE_IDENTITY_KEY: &str = "FILE_IS"; const FILENAME_KEY: &str = "FILE_NAME"; #[derive(Debug, Clone, PartialEq)] pub struct UDirectory { name: String, } #[derive(Debug, Clone, PartialEq)] pub struct UPath(Vec); const TOP_SEPARATOR: &str = "//"; impl std::str::FromStr for UPath { type Err = anyhow::Error; fn from_str(string: &str) -> Result { if string.is_empty() { Ok(UPath(vec![])) } else { let result = match string.find(TOP_SEPARATOR) { Some(head_idx) => { let (head, rest) = string.split_at(head_idx); let mut result: Vec = Vec::new(); result.push(UDirectory { name: String::from(head), }); result.append( rest[TOP_SEPARATOR.len()..rest.len()] .trim_end_matches('/') .split('/') .map(|part| UDirectory { name: String::from(part), }) .collect::>() .as_mut(), ); result } None => string .trim_end_matches('/') .split('/') .map(|part| UDirectory { name: String::from(part), }) .collect(), }; for directory in &result { if directory.name.is_empty() { return Err(anyhow!("INVALID PATH: Directory name cannot be empty!")); } } Ok(UPath(result)) } } } impl std::fmt::Display for UDirectory { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } } impl std::fmt::Display for UPath { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.0.len() { 0 => write!(f, ""), 1 => write!(f, "{}", self.0.first().unwrap().name), _ => { let (head, tail) = self.0.split_first().unwrap(); write!( f, "{}//{}", head.name, tail.iter() .map(|udir| udir.name.clone()) .collect::>() .join("/") ) } } } } trait EntryList { fn extract_addresses(&self) -> Vec
; } impl EntryList for Vec { fn extract_addresses(&self) -> Vec
{ self.iter() .filter_map(|e| { if let EntryValue::Address(address) = &e.value { Some(address.clone()) } else { None } }) .collect() } } pub fn list_roots>(connection: &C) -> Result> { let all_directories: Vec = query_entries( connection, EntryQuery { target: QueryComponent::Any, key: QueryComponent::Exact(DIR_KEY.to_string()), value: QueryComponent::Any, }, )?; let directories_with_parents: Vec
= query_entries( connection, EntryQuery { target: QueryComponent::Any, key: QueryComponent::Exact(DIR_HAS_KEY.to_string()), value: QueryComponent::Any, }, )? .extract_addresses(); Ok(all_directories .into_iter() .filter(|entry| !directories_with_parents.contains(&entry.target)) .collect()) } pub async fn list_directory>( connection: &C, path: &UPath, ) -> Result> { let entry_addresses = match path.0.len() { 0 => list_roots(connection)? .into_iter() .map(|e| e.target) .collect(), _ => { let resolved_path: Vec
= resolve_path(connection, path, false)?; let last = resolved_path.last().unwrap(); query_entries( connection, EntryQuery { target: QueryComponent::Exact(last.clone()), key: QueryComponent::Exact(DIR_HAS_KEY.to_string()), value: QueryComponent::Any, }, )? .extract_addresses() } }; Ok(bulk_retrieve_objects(connection, entry_addresses)? .into_iter() .filter(|e| [DIR_KEY, FILENAME_KEY, FILE_IDENTITY_KEY].contains(&e.key.as_str())) .collect::>()) } pub fn fetch_or_create_dir>( connection: &C, parent: Option
, directory: UDirectory, create: bool, ) -> Result
{ match parent.clone() { Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory), None => trace!("FETCHING/CREATING /{:#}", directory), } let dir_value = EntryValue::Value(Value::String(directory.name)); let directories: Vec
= query_entries( connection, EntryQuery { target: QueryComponent::Any, key: QueryComponent::Exact(String::from(DIR_KEY)), value: QueryComponent::Exact(dir_value.clone()), }, )? .into_iter() .map(|e: Entry| e.target) .collect(); let valid_directories: Vec
= match parent.clone() { Some(address) => { let parent_has: Vec
= query_entries( connection, EntryQuery { target: QueryComponent::Exact(address), key: QueryComponent::Exact(String::from(DIR_HAS_KEY)), value: QueryComponent::Any, }, )? .extract_addresses(); directories .into_iter() .filter(|a| parent_has.contains(a)) .collect() } None => directories, }; match valid_directories.len() { 0 => { if create { let new_directory_address = Address::UUID(Uuid::new_v4()); let directory_entry = InnerEntry { target: new_directory_address.clone(), key: String::from(DIR_KEY), value: dir_value, }; let _ = insert_entry(connection, directory_entry)?; if let Some(parent_addr) = parent { let has_entry = InnerEntry { target: parent_addr, key: String::from(DIR_HAS_KEY), value: EntryValue::Address(new_directory_address.clone()), }; let _ = insert_entry(connection, has_entry)?; } Ok(new_directory_address) } else { Err(anyhow!("Directory does not exist.")) } } 1 => Ok(valid_directories[0].clone()), _ => Err(anyhow!( "Invalid database state - more than one directory matches the query!" )), } } pub fn resolve_path>( connection: &C, path: &UPath, create: bool, ) -> Result> { let mut result: Vec
= vec![]; let mut path_stack = path.0.to_vec(); path_stack.reverse(); while !path_stack.is_empty() { let dir_address = fetch_or_create_dir( connection, result.last().cloned(), path_stack.pop().unwrap(), create, )?; result.push(dir_address); } Ok(result) } pub async fn reimport_directory(pool: DbPool, directory: PathBuf) { let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await; if result.is_err() { let err = result.err().unwrap(); error!("Update did not succeed! {}", err); } } type UpdatePathResult = Result; enum UpdatePathOutcome { Added(PathBuf), Unchanged(PathBuf), Removed(PathBuf), } fn _reimport_directory>( pool: DbPool, directory: T, ) -> Result> { let start = Instant::now(); let path_entries: Vec = WalkDir::new(&directory) .into_iter() .filter_map(|e| e.ok()) .filter(|e| e.path().is_file() && e.file_name() != DATABASE_FILENAME) .map(|e| fs::canonicalize(e.into_path()).unwrap()) .collect(); let rw_pool = Arc::new(RwLock::new(pool.clone())); let absolute_path = fs::canonicalize(&directory)?; let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?)); let path_results: Vec = path_entries .into_par_iter() .map(|path| { Ok(_process_directory_entry( &rw_pool, path, &absolute_path, &existing_files, )?) }) .collect(); let cleanup_results: Vec = existing_files .write() .unwrap() .iter() .filter(|f| f.valid) .map(|file| { let connection = pool.get()?; connection.transaction::<_, Error, _>(|| { let _ = file_set_valid(&connection, file.id, false)?; // remove_object(&connection, )? Ok(UpdatePathOutcome::Removed(PathBuf::from(file.path.clone()))) }) }) .collect(); info!( "Finished updating {}. Took {}s.", directory.as_ref().display(), start.elapsed().as_secs() ); Ok(path_results .into_iter() .chain(cleanup_results.into_iter()) .collect()) } fn _process_directory_entry>( db_pool: &Arc>, path: PathBuf, directory_path: &P, existing_files: &Arc>>, ) -> UpdatePathResult { info!("Processing: {:?}", path); let db_pool = Arc::clone(&db_pool); let existing_files = Arc::clone(&existing_files); let normalized_path = path.strip_prefix(&directory_path)?; let normalized_path_str = normalized_path.to_str().expect("path not valid unicode?!"); let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } let mtime = metadata .modified() .map(|t| { NaiveDateTime::from_timestamp(t.duration_since(UNIX_EPOCH).unwrap().as_secs() as i64, 0) }) .ok(); { let mut existing_files = existing_files.write().unwrap(); let maybe_existing_file = existing_files .iter() .enumerate() .find(|(_, file)| file.path == normalized_path_str); if let Some((idx, existing_file)) = maybe_existing_file { if size == existing_file.size && mtime == existing_file.mtime { if !existing_file.valid { let _ = file_set_valid(&db_pool.write().unwrap().get()?, existing_file.id, true)?; } existing_files.remove(idx); return Ok(UpdatePathOutcome::Unchanged(path)); } } } let digest = path.hash()?; let new_file = models::NewFile { path: normalized_path_str.to_string(), hash: (digest.clone()).0, added: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), size, mtime, }; let _ = insert_file(&db_pool.write().unwrap().get()?, new_file)?; let components = normalized_path.components().collect::>(); let (filename, dir_path) = components.split_last().unwrap(); let upath = UPath( iter::once(UDirectory { name: "NATIVE".to_string(), }) .chain(dir_path.iter().map(|component| UDirectory { name: component.as_os_str().to_string_lossy().to_string(), })) .collect(), ); let resolved_path = resolve_path(&db_pool.write().unwrap().get()?, &upath, true)?; let parent_dir = resolved_path.last().unwrap(); let _pool = &db_pool.write().unwrap(); let connection = _pool.get()?; connection.transaction::<_, Error, _>(|| { let file_address = Address::UUID(Uuid::new_v4()); let name_entry = InnerEntry { target: file_address.clone(), key: FILENAME_KEY.to_string(), value: EntryValue::Value(Value::String( filename.as_os_str().to_string_lossy().to_string(), )), }; let _ = insert_entry(&connection, name_entry)?; let identity_entry = InnerEntry { target: file_address.clone(), key: FILE_IDENTITY_KEY.to_string(), value: EntryValue::Address(Address::Hash(digest.clone())), }; let _ = insert_entry(&connection, identity_entry)?; let dir_has_entry = InnerEntry { target: parent_dir.clone(), key: DIR_HAS_KEY.to_string(), value: EntryValue::Address(file_address), }; let _ = insert_entry(&connection, dir_has_entry)?; Ok(UpdatePathOutcome::Added(path.clone())) }) } pub fn lookup_by_filename>( connection: &C, filename: String, ) -> Result> { let dir_value = EntryValue::Value(Value::String(filename)); let entity_addresses = query_entries( connection, EntryQuery { target: QueryComponent::Any, key: QueryComponent::In(vec![DIR_KEY.to_string(), FILENAME_KEY.to_string()]), // ? value: QueryComponent::ILike(dir_value), }, )?; bulk_retrieve_objects( connection, entity_addresses.into_iter().map(|e| e.target).collect(), ) } #[cfg(test)] mod tests { use crate::filesystem::{UDirectory, UPath}; use anyhow::Result; #[test] fn test_path_codec() { let path = UPath(vec![ UDirectory { name: "top".to_string(), }, UDirectory { name: "foo".to_string(), }, UDirectory { name: "bar".to_string(), }, UDirectory { name: "baz".to_string(), }, ]); let str_path = path.to_string(); assert!(str_path.len() > 0); let decoded_path: Result = str_path.parse(); assert!(decoded_path.is_ok()); assert_eq!(path, decoded_path.unwrap()); } #[test] fn test_validation() { let invalid_path: Result = "a//b/c//d/e/f///g".parse(); assert!(invalid_path.is_err()) } }