use crate::addressing::Address; use crate::database::{ insert_entry, insert_file, query_entries, retrieve_object, Entry, EntryQuery, EntryValue, InnerEntry, }; use crate::hash::{ComputeHash, HasherWorker}; use crate::models; use anyhow::{anyhow, Result}; use log::{error, info, trace}; use serde::export::Formatter; use serde_json::Value; use std::path::{Component, Path, PathBuf}; use std::{fs, iter}; use walkdir::WalkDir; use actix::prelude::*; use chrono::prelude::*; use diesel::sqlite::Sqlite; use diesel::Connection; use uuid::Uuid; const DIR_KEY: &str = "DIR"; const DIR_HAS_KEY: &str = "DIR_HAS"; const FILE_IDENTITY_KEY: &str = "FILE_IS"; const FILENAME_KEY: &str = "FILE_NAME"; #[derive(Debug, Clone, PartialEq)] pub struct UDirectory { name: String, } #[derive(Debug, Clone, PartialEq)] pub struct UPath(Vec); const TOP_SEPARATOR: &str = "//"; impl std::str::FromStr for UPath { type Err = anyhow::Error; fn from_str(string: &str) -> Result { if string.is_empty() { Ok(UPath(vec![])) } else { let result = match string.find(TOP_SEPARATOR) { Some(head_idx) => { let (head, rest) = string.split_at(head_idx); let mut result: Vec = Vec::new(); result.push(UDirectory { name: String::from(head), }); result.append( rest[TOP_SEPARATOR.len()..rest.len()] .trim_end_matches('/') .split('/') .map(|part| UDirectory { name: String::from(part), }) .collect::>() .as_mut(), ); result } None => string .trim_end_matches('/') .split('/') .map(|part| UDirectory { name: String::from(part), }) .collect(), }; for directory in &result { if directory.name.is_empty() { return Err(anyhow!("INVALID PATH: Directory name cannot be empty!")); } } Ok(UPath(result)) } } } impl std::fmt::Display for UDirectory { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.name) } } impl std::fmt::Display for UPath { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self.0.len() { 0 => write!(f, ""), 1 => write!(f, "{}", self.0.first().unwrap().name), _ => { let (head, tail) = self.0.split_first().unwrap(); write!( f, "{}//{}", head.name, tail.iter() .map(|udir| udir.name.clone()) .collect::>() .join("/") ) } } } } trait EntryList { fn extract_addresses(&self) -> Vec
; } impl EntryList for Vec { fn extract_addresses(&self) -> Vec
{ self.iter() .filter_map(|e| { if let EntryValue::Address(address) = &e.value { Some(address.clone()) } else { None } }) .collect() } } pub async fn list_roots>(connection: &C) -> Result> { let all_directories: Vec = query_entries( connection, EntryQuery { target: None, key: Some(DIR_KEY.to_string()), value: None, }, )?; let directories_with_parents: Vec
= query_entries( connection, EntryQuery { target: None, key: Some(DIR_HAS_KEY.to_string()), value: None, }, )? .extract_addresses(); Ok(all_directories .into_iter() .filter(|entry| !directories_with_parents.contains(&entry.target)) .collect()) } pub async fn list_directory>( connection: &C, path: &UPath, ) -> Result> { let entry_addresses = match path.0.len() { 0 => list_roots(connection) .await? .into_iter() .map(|e| e.target) .collect(), _ => { let resolved_path: Vec
= resolve_path(connection, path, false).await?; let last = resolved_path.last().unwrap(); query_entries( connection, EntryQuery { target: Some(last.clone()), key: Some(DIR_HAS_KEY.to_string()), value: None, }, )? .extract_addresses() } }; let mut result: Vec = vec![]; for address in entry_addresses { result.extend( retrieve_object(connection, address)? .into_iter() .filter(|e| [DIR_KEY, FILENAME_KEY, FILE_IDENTITY_KEY].contains(&e.key.as_str())) .collect::>(), ); } Ok(result) } pub async fn fetch_or_create_dir>( connection: &C, parent: Option
, directory: UDirectory, create: bool, ) -> Result
{ match parent.clone() { Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory), None => trace!("FETCHING/CREATING /{:#}", directory), } let dir_value = EntryValue::Value(Value::String(directory.name)); let directories: Vec
= query_entries( connection, EntryQuery { target: None, key: Some(String::from(DIR_KEY)), value: Some(dir_value.clone()), }, )? .into_iter() .map(|e: Entry| e.target) .collect(); let valid_directories: Vec
= match parent.clone() { Some(address) => { let parent_has: Vec
= query_entries( connection, EntryQuery { target: Some(address), key: Some(String::from(DIR_HAS_KEY)), value: None, }, )? .extract_addresses(); directories .into_iter() .filter(|a| parent_has.contains(a)) .collect() } None => directories, }; match valid_directories.len() { 0 => { if create { let new_directory_address = Address::UUID(Uuid::new_v4()); let directory_entry = InnerEntry { target: new_directory_address.clone(), key: String::from(DIR_KEY), value: dir_value, }; let _ = insert_entry(connection, directory_entry)?; if parent.is_some() { let has_entry = InnerEntry { target: parent.unwrap(), key: String::from(DIR_HAS_KEY), value: EntryValue::Address(new_directory_address.clone()), }; let _ = insert_entry(connection, has_entry)?; } Ok(new_directory_address) } else { Err(anyhow!("Directory does not exist.")) } } 1 => Ok(valid_directories[0].clone()), _ => Err(anyhow!( "Invalid database state - more than one directory matches the query!" )), } } pub async fn resolve_path>( connection: &C, path: &UPath, create: bool, ) -> Result> { let mut result: Vec
= vec![]; let mut path_stack = path.0.to_vec(); path_stack.reverse(); while !path_stack.is_empty() { let dir_address = fetch_or_create_dir( connection, result.last().cloned(), path_stack.pop().unwrap(), create, ) .await?; result.push(dir_address); } Ok(result) } pub async fn reimport_directory>( connection: C, directory: PathBuf, hasher_worker: Addr, ) { let result = _reimport_directory(&connection, directory, &hasher_worker).await; if result.is_err() { error!("Update did not succeed! {}", result.err().unwrap()); } } async fn _reimport_directory, T: AsRef>( connection: &C, directory: T, hasher_worker: &Addr, ) -> Result<()> { let path_entries: Result, std::io::Error> = WalkDir::new(&directory) .into_iter() .filter_map(|e| e.ok()) .filter(|e| e.path().is_file()) .map(|e| fs::canonicalize(e.into_path())) .collect(); let absolute_path = fs::canonicalize(&directory)?; for path in path_entries? { _process_directory_entry(connection, path, &absolute_path, &hasher_worker).await?; } info!("Finished updating {}.", directory.as_ref().display()); Ok(()) } async fn _process_directory_entry, P: AsRef>( connection: &C, path: PathBuf, directory_path: &P, hasher_worker: &Addr, ) -> Result<()> { info!("Processing: {:?}", path); let metadata = fs::metadata(&path)?; let size = metadata.len() as i64; if size < 0 { panic!("File {} too large?!", path.display()); } let digest = hasher_worker .send(ComputeHash { path: path.clone() }) .await??; // let existing_file: Option = db_executor // .send(RetrieveByHash { // hash: digest.clone(), // }) // .await??; let new_file = models::NewFile { path: path .strip_prefix(&directory_path)? .to_str() .expect("path not valid unicode?!") .to_string(), hash: (digest.clone()).0, created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0), size, }; let _ = insert_file(connection, new_file)?; let components = path .strip_prefix(&directory_path)? .components() .collect::>(); let (filename, dir_path) = components.split_last().unwrap(); let file_address = Address::UUID(Uuid::new_v4()); let name_entry = InnerEntry { target: file_address.clone(), key: FILENAME_KEY.to_string(), value: EntryValue::Value(Value::String( filename.as_os_str().to_string_lossy().to_string(), )), }; let _ = insert_entry(connection, name_entry)?; let identity_entry = InnerEntry { target: file_address.clone(), key: FILE_IDENTITY_KEY.to_string(), value: EntryValue::Address(Address::Hash(digest.clone())), }; let _ = insert_entry(connection, identity_entry)?; let upath = UPath( iter::once(UDirectory { name: "NATIVE".to_string(), }) .chain(dir_path.iter().map(|component| UDirectory { name: component.as_os_str().to_string_lossy().to_string(), })) .collect(), ); let resolved_path = resolve_path(connection, &upath, true).await?; let parent_dir = resolved_path.last().unwrap(); let dir_has_entry = InnerEntry { target: parent_dir.clone(), key: DIR_HAS_KEY.to_string(), value: EntryValue::Address(file_address), }; let _ = insert_entry(connection, dir_has_entry)?; Ok(()) } #[cfg(test)] mod tests { use crate::filesystem::{UDirectory, UPath}; use anyhow::Result; #[test] fn test_path_codec() { let path = UPath(vec![ UDirectory { name: "top".to_string(), }, UDirectory { name: "foo".to_string(), }, UDirectory { name: "bar".to_string(), }, UDirectory { name: "baz".to_string(), }, ]); let str_path = path.to_string(); assert!(str_path.len() > 0); let decoded_path: Result = str_path.parse(); assert!(decoded_path.is_ok()); assert_eq!(path, decoded_path.unwrap()); } #[test] fn test_validation() { let invalid_path: Result = "a//b/c//d/e/f///g".parse(); assert!(invalid_path.is_err()) } }