use crate::addressing::Address; use crate::database::{ DbExecutor, Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries, RetrieveByHash, RetrieveObject, }; use crate::hash::{ComputeHash, HasherWorker}; use crate::models; use anyhow::{anyhow, Result}; use log::{info, trace, warn}; use serde::export::Formatter; use serde_json::Value; use std::path::{Component, Path, PathBuf}; use std::{fs, iter}; use walkdir::WalkDir; use actix::prelude::*; use chrono::prelude::*; use uuid::Uuid; const DIR_KEY: &str = "DIR"; const DIR_HAS_KEY: &str = "DIR_HAS"; const FILE_IDENTITY_KEY: &str = "FILE_IS"; const FILENAME_KEY: &str = "FILE_NAME"; #[derive(Debug, Clone, PartialEq)] pub struct UDirectory { name: String, } #[derive(Debug, Clone, PartialEq)] pub struct UPath(Vec); const TOP_SEPARATOR: &str = "//"; impl std::str::FromStr for UPath { type Err = anyhow::Error; fn from_str(string: &str) -> Result { if string.len() == 0 { Ok(UPath(vec![])) } else { let result = match string.find(TOP_SEPARATOR) { Some(head_idx) => { let (head, rest) = string.split_at(head_idx); let mut result: Vec = Vec::new(); result.push(UDirectory { name: String::from(head), }); result.append( rest[TOP_SEPARATOR.len()..rest.len()] .trim_end_matches('/') .split("/") .map(|part| UDirectory { name: String::from(part), }) .collect::>() .as_mut(), ); result } None => string .trim_end_matches('/') .split("/") .map(|part| UDirectory { name: String::from(part), }) .collect(), }; for directory in &result { if directory.name.len() == 0 { return Err(anyhow!("INVALID PATH: Directory name cannot be empty!")); } } Ok(UPath(result)) } } } impl std::fmt::Display for UDirectory { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } } impl std::fmt::Display for UPath { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.0.len() { 0 => write!(f, ""), 1 => write!(f, "{}", self.0.first().unwrap().name), _ => { let (head, tail) = self.0.split_first().unwrap(); write!( f, "{}//{}", head.name, tail.iter() .map(|udir| udir.name.clone()) .collect::>() .join("/") ) } } } } trait EntryList { fn extract_addresses(&self) -> Vec
; } impl EntryList for Vec { fn extract_addresses(&self) -> Vec
{ self.into_iter() .filter_map(|e| { if let EntryValue::Address(address) = &e.value { Some(address.clone()) } else { None } }) .collect() } } pub async fn list_roots(db_executor: &Addr) -> Result> { let all_directories: Vec = db_executor .send(QueryEntries { target: None, key: Some(DIR_KEY.to_string()), value: None, }) .await??; let directories_with_parents: Vec
= db_executor .send(QueryEntries { target: None, key: Some(DIR_HAS_KEY.to_string()), value: None, }) .await?? .extract_addresses(); Ok(all_directories .into_iter() .filter(|entry| !directories_with_parents.contains(&entry.target)) .collect()) } pub async fn list_directory(db_executor: &Addr, path: &UPath) -> Result> { let entry_addresses = match path.0.len() { 0 => list_roots(db_executor) .await? .into_iter() .map(|e| e.target) .collect(), _ => { let resolved_path: Vec
= resolve_path(db_executor, path, false).await?; let last = resolved_path.last().unwrap(); db_executor .send(QueryEntries { target: Some(last.clone()), key: Some(DIR_HAS_KEY.to_string()), value: None, }) .await?? .extract_addresses() } }; let mut result: Vec = vec![]; for address in entry_addresses { result.extend( db_executor .send(RetrieveObject { target: address }) .await?? .into_iter() .filter(|e| [DIR_KEY, FILENAME_KEY, FILE_IDENTITY_KEY].contains(&e.key.as_str())) .collect::>(), ); } Ok(result) } pub async fn fetch_or_create_dir( db_executor: &Addr, parent: Option
, directory: UDirectory, create: bool, ) -> Result
{ match parent.clone() { Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory), None => trace!("FETCHING/CREATING /{:#}", directory), } let dir_value = EntryValue::Value(Value::String(directory.name)); let directories: Vec
= db_executor .send(QueryEntries { target: None, key: Some(String::from(DIR_KEY)), value: Some(dir_value.clone()), }) .await?? .into_iter() .map(|e: Entry| e.target) .collect(); let valid_directories: Vec
= match parent.clone() { Some(address) => { let parent_has: Vec
= db_executor .send(QueryEntries { target: Some(address), key: Some(String::from(DIR_HAS_KEY)), value: None, }) .await?? .extract_addresses(); let valid = directories .into_iter() .filter(|a| parent_has.contains(a)) .collect(); valid } None => directories, }; match valid_directories.len() { 0 => { if create { let new_directory_address = Address::UUID(Uuid::new_v4()); let directory_entry = InnerEntry { target: new_directory_address.clone(), key: String::from(DIR_KEY), value: dir_value, }; let _ = db_executor .send(InsertEntry { entry: directory_entry, }) .await??; if parent.is_some() { let has_entry = InnerEntry { target: parent.unwrap(), key: String::from(DIR_HAS_KEY), value: EntryValue::Address(new_directory_address.clone()), }; let _ = db_executor.send(InsertEntry { entry: has_entry }).await??; } Ok(new_directory_address) } else { Err(anyhow!("Directory does not exist.")) } } 1 => Ok(valid_directories[0].clone()), _ => Err(anyhow!( "Invalid database state - more than one directory matches the query!" )), } } pub async fn resolve_path( db_executor: &Addr, path: &UPath, create: bool, ) -> Result> { let mut result: Vec
= vec![]; let mut path_stack = path.0.to_vec(); path_stack.reverse(); while path_stack.len() > 0 { let dir_address = fetch_or_create_dir( db_executor, result.last().cloned(), path_stack.pop().unwrap(), create, ) .await?; result.push(dir_address); } Ok(result) } async fn _reimport_directory>( directory: T, db_executor: &Addr, hasher_worker: &Addr, ) -> Result<()> { for path in WalkDir::new(&directory) .into_iter() .filter_map(|e| e.ok()) .filter(|e| e.path().is_file()) .map(|e| e.into_path()) { info!("Processing: {:?}", path); let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } let digest = hasher_worker .send(ComputeHash { path: path.to_path_buf(), }) .await??; let existing_file: Option = db_executor .send(RetrieveByHash { hash: digest.clone(), }) .await??; if existing_file.is_none() { let new_file = models::NewFile { path: path.to_str().expect("path not valid unicode?!").to_string(), hash: (digest.clone()).0, created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), size, }; db_executor .send(crate::database::InsertFile { file: new_file }) .await??; } let components = path.components().collect::>(); let (filename, dir_path) = components.split_last().unwrap(); let file_address = Address::UUID(Uuid::new_v4()); let name_entry = InnerEntry { target: file_address.clone(), key: FILENAME_KEY.to_string(), value: EntryValue::Value(Value::String( filename.as_os_str().to_string_lossy().to_string(), )), }; db_executor .send(crate::database::InsertEntry { entry: name_entry }) .await??; let identity_entry = InnerEntry { target: file_address.clone(), key: FILE_IDENTITY_KEY.to_string(), value: EntryValue::Address(Address::Hash(digest.clone())), }; db_executor .send(crate::database::InsertEntry { entry: identity_entry, }) .await??; let upath = UPath( iter::once(UDirectory { name: "NATIVE".to_string(), }) .chain(dir_path.iter().map(|component| UDirectory { name: component.as_os_str().to_string_lossy().to_string(), })) .collect(), ); let resolved_path = resolve_path(db_executor, &upath, true).await?; let parent_dir = resolved_path.last().unwrap(); let dir_has_entry = InnerEntry { target: parent_dir.clone(), key: DIR_HAS_KEY.to_string(), value: EntryValue::Address(file_address), }; db_executor .send(crate::database::InsertEntry { entry: dir_has_entry, }) .await??; } info!("Finished updating {}.", directory.as_ref().display()); Ok(()) } pub async fn reimport_directory( directory: PathBuf, db_executor: Addr, hasher_worker: Addr, ) { let result = _reimport_directory(directory, &db_executor, &hasher_worker).await; if result.is_err() { warn!("Update did not succeed! {}", result.err().unwrap()); } } #[cfg(test)] mod tests { use crate::filesystem::{UDirectory, UPath}; use anyhow::Result; #[test] fn test_path_codec() { let path = UPath(vec![ UDirectory { name: "top".to_string(), }, UDirectory { name: "foo".to_string(), }, UDirectory { name: "bar".to_string(), }, UDirectory { name: "baz".to_string(), }, ]); let str_path = path.to_string(); assert!(str_path.len() > 0); let decoded_path: Result = str_path.parse(); assert!(decoded_path.is_ok()); assert_eq!(path, decoded_path.unwrap()); } #[test] fn test_validation() { let invalid_path: Result = "a//b/c//d/e/f///g".parse(); assert!(invalid_path.is_err()) } }