use crate::addressing::Address; use crate::database::{ DbExecutor, Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries, RetrieveByHash, }; use crate::hash::{ComputeHash, HasherWorker}; use crate::models; use anyhow::{anyhow, Result}; use log::{info, trace, warn}; use serde::export::Formatter; use serde_json::Value; use std::path::{Component, Path, PathBuf}; use std::{fs, iter}; use walkdir::WalkDir; use actix::prelude::*; use chrono::prelude::*; use uuid::Uuid; const DIR_KEY: &str = "DIR"; const DIR_HAS_KEY: &str = "DIR_HAS"; const FILE_IDENTITY_KEY: &str = "FILE_IS"; const FILENAME_KEY: &str = "FILE_NAME"; #[derive(Debug, Clone, PartialEq)] pub struct UDirectory { name: String, } #[derive(Debug, Clone, PartialEq)] pub struct UPath(Vec); const TOP_SEPARATOR: &str = "//"; impl std::str::FromStr for UPath { type Err = anyhow::Error; fn from_str(s: &str) -> Result { if s.len() == 0 { Ok(UPath(vec![])) } else { match s.find(TOP_SEPARATOR) { Some(head_idx) => { let (head, rest) = s.split_at(head_idx); let mut result: Vec = Vec::new(); result.push(UDirectory { name: String::from(head), }); result.append( rest[TOP_SEPARATOR.len()..rest.len()] .split("/") .map(|part| UDirectory { name: String::from(part), }) .collect::>() .as_mut(), ); Ok(UPath(result)) } None => Ok(UPath( s.split("/") .map(|part| UDirectory { name: String::from(part), }) .collect(), )), } } } } impl std::fmt::Display for UDirectory { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } } impl std::fmt::Display for UPath { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.0.len() { 0 => write!(f, ""), 1 => write!(f, "{}", self.0.first().unwrap().name), _ => { let (head, tail) = self.0.split_first().unwrap(); write!( f, "{}//{}", head.name, tail.iter() .map(|udir| udir.name.clone()) .collect::>() .join("/") ) } } } } pub async fn fetch_or_create_dir( db_executor: &Addr, parent: Option
, directory: UDirectory, ) -> Result
{ match parent.clone() { Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory), None => trace!("FETCHING/CREATING /{:#}", directory), } let dir_value = EntryValue::Value(Value::String(directory.name)); let directories: Vec
= db_executor .send(QueryEntries { target: None, key: Some(String::from(DIR_KEY)), value: Some(dir_value.clone()), }) .await?? .into_iter() .map(|e: Entry| e.target) .collect(); let valid_directories: Vec
= match parent.clone() { Some(address) => { let parent_has: Vec
= db_executor .send(QueryEntries { target: Some(address), key: Some(String::from(DIR_HAS_KEY)), value: None, }) .await?? .into_iter() .filter_map(|e: Entry| { if let EntryValue::Address(address) = e.value { Some(address) } else { None } }) .collect(); let valid = directories .into_iter() .filter(|a| parent_has.contains(a)) .collect(); valid } None => directories, }; match valid_directories.len() { 0 => { let new_directory_address = Address::UUID(Uuid::new_v4()); let directory_entry = InnerEntry { target: new_directory_address.clone(), key: String::from(DIR_KEY), value: dir_value, }; let _ = db_executor .send(InsertEntry { entry: directory_entry, }) .await??; if parent.is_some() { let has_entry = InnerEntry { target: parent.unwrap(), key: String::from(DIR_HAS_KEY), value: EntryValue::Address(new_directory_address.clone()), }; let _ = db_executor.send(InsertEntry { entry: has_entry }).await??; } Ok(new_directory_address) } 1 => Ok(valid_directories[0].clone()), _ => Err(anyhow!( "Invalid database state - more than one directory matches the query!" )), } } pub async fn resolve_path_with_parents( db_executor: &Addr, path: &UPath, ) -> Result> { let mut result: Vec
= vec![]; let mut path_stack = path.0.to_vec(); path_stack.reverse(); while path_stack.len() > 0 { let dir_address = fetch_or_create_dir( db_executor, result.last().cloned(), path_stack.pop().unwrap(), ) .await?; result.push(dir_address); } Ok(result) } async fn _reimport_directory>( directory: T, db_executor: &Addr, hasher_worker: &Addr, ) -> Result<()> { for path in WalkDir::new(&directory) .into_iter() .filter_map(|e| e.ok()) .filter(|e| e.path().is_file()) .map(|e| e.into_path()) { info!("Processing: {:?}", path); let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } let digest = hasher_worker .send(ComputeHash { path: path.to_path_buf(), }) .await??; let existing_file: Option = db_executor .send(RetrieveByHash { hash: digest.clone(), }) .await??; if existing_file.is_none() { let new_file = models::NewFile { path: path.to_str().expect("path not valid unicode?!").to_string(), hash: (digest.clone()).0, created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), size, }; db_executor .send(crate::database::InsertFile { file: new_file }) .await??; } let components = path.components().collect::>(); let (filename, dir_path) = components.split_last().unwrap(); let file_address = Address::UUID(Uuid::new_v4()); let name_entry = InnerEntry { target: file_address.clone(), key: FILENAME_KEY.to_string(), value: EntryValue::Value(Value::String( filename.as_os_str().to_string_lossy().to_string(), )), }; db_executor .send(crate::database::InsertEntry { entry: name_entry }) .await??; let identity_entry = InnerEntry { target: file_address.clone(), key: FILE_IDENTITY_KEY.to_string(), value: EntryValue::Address(Address::Hash(digest.clone())), }; db_executor .send(crate::database::InsertEntry { entry: identity_entry, }) .await??; let upath = UPath( iter::once(UDirectory { name: "NATIVE".to_string(), }) .chain(dir_path.iter().map(|component| UDirectory { name: component.as_os_str().to_string_lossy().to_string(), })) .collect(), ); let resolved_path = resolve_path_with_parents(db_executor, &upath).await?; let parent_dir = resolved_path.last().unwrap(); let dir_has_entry = InnerEntry { target: parent_dir.clone(), key: DIR_HAS_KEY.to_string(), value: EntryValue::Address(file_address), }; db_executor .send(crate::database::InsertEntry { entry: dir_has_entry, }) .await??; } info!("Finished updating {}.", directory.as_ref().display()); Ok(()) } pub async fn reimport_directory( directory: PathBuf, db_executor: Addr, hasher_worker: Addr, ) { let result = _reimport_directory(directory, &db_executor, &hasher_worker).await; if result.is_err() { warn!("Update did not succeed! {}", result.err().unwrap()); } } #[cfg(test)] mod tests { use crate::filesystem::{UDirectory, UPath}; use anyhow::Result; #[test] fn test_path_codec() { let path = UPath(vec![ UDirectory { name: "top".to_string(), }, UDirectory { name: "foo".to_string(), }, UDirectory { name: "bar".to_string(), }, UDirectory { name: "baz".to_string(), }, ]); let str_path = path.to_string(); assert!(str_path.len() > 0); let decoded_path: Result = str_path.parse(); assert!(decoded_path.is_ok()); assert_eq!(path, decoded_path.unwrap()); } }