first somewhat working version of import!

feat/vaults
Tomáš Mládek 2020-09-12 22:50:14 +02:00
parent fc7635bf70
commit 762a7914a1
2 changed files with 117 additions and 35 deletions

View File

@ -7,8 +7,10 @@ use unsigned_varint::encode;
use uuid::Uuid;
use crate::hash::{encode, Hash};
use serde::export::Formatter;
use thiserror::private::DisplayAsDisplay;
#[derive(Debug, Clone, PartialEq)]
#[derive(Clone, PartialEq)]
pub enum Address {
Hash(Hash),
UUID(Uuid),
@ -61,6 +63,12 @@ impl std::fmt::Display for Address {
}
}
impl std::fmt::Debug for Address {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_display())
}
}
#[cfg(test)]
mod tests {
use uuid::Uuid;

View File

@ -1,13 +1,15 @@
use crate::addressing::Address;
use crate::database::{Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries};
use crate::hash::{ComputeHash, Hash, HasherWorker};
use crate::database::{
DbExecutor, Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries, RetrieveByHash,
};
use crate::hash::{ComputeHash, HasherWorker};
use crate::models;
use anyhow::{anyhow, Result};
use log::{info, warn};
use log::{info, trace, warn};
use serde::export::Formatter;
use serde_json::Value;
use std::fs;
use std::path::{Path, PathBuf};
use std::path::{Component, Path, PathBuf};
use std::{fs, iter};
use walkdir::WalkDir;
use actix::prelude::*;
@ -16,13 +18,14 @@ use uuid::Uuid;
const DIR_KEY: &str = "DIR";
const DIR_HAS_KEY: &str = "DIR_HAS";
const FILENAME_KEY: &str = "NAME";
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct UDirectory {
name: String,
}
#[derive(Debug, PartialEq)]
#[derive(Debug, Clone, PartialEq)]
pub struct UPath(Vec<UDirectory>);
const TOP_SEPARATOR: &str = "//";
@ -96,6 +99,11 @@ pub async fn fetch_or_create_dir(
parent: Option<Address>,
directory: UDirectory,
) -> Result<Address> {
match parent.clone() {
Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory),
None => trace!("FETCHING/CREATING /{:#}", directory),
}
let dir_value = EntryValue::Value(Value::String(directory.name));
let directories: Vec<Address> = db_executor
.send(QueryEntries {
@ -118,13 +126,21 @@ pub async fn fetch_or_create_dir(
})
.await??
.into_iter()
.map(|e: Entry| e.target)
.filter_map(|e: Entry| {
if let EntryValue::Address(address) = e.value {
Some(address)
} else {
None
}
})
.collect();
directories
let valid = directories
.into_iter()
.filter(|a| parent_has.contains(a))
.collect()
.collect();
valid
}
None => directories,
};
@ -161,8 +177,25 @@ pub async fn fetch_or_create_dir(
}
}
pub fn fetch_path(path: &UPath) -> Vec<Entry> {
unimplemented!();
pub async fn resolve_path_with_parents(
db_executor: &Addr<DbExecutor>,
path: &UPath,
) -> Result<Vec<Address>> {
let mut result: Vec<Address> = vec![];
let mut path_stack = path.0.to_vec();
path_stack.reverse();
while path_stack.len() > 0 {
let dir_address = fetch_or_create_dir(
db_executor,
result.last().cloned(),
path_stack.pop().unwrap(),
)
.await?;
result.push(dir_address);
}
Ok(result)
}
async fn _reimport_directory<T: AsRef<Path>>(
@ -170,39 +203,80 @@ async fn _reimport_directory<T: AsRef<Path>>(
db_executor: &Addr<crate::database::DbExecutor>,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
for entry in WalkDir::new(&directory)
for path in WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
.map(|e| e.into_path())
{
info!("Processing: {}", entry.path().display());
info!("Processing: {:?}", path);
let metadata = fs::metadata(entry.path())?;
let metadata = fs::metadata(&path)?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", entry.path().display());
panic!("File {} too large?!", path.display());
}
let msg = ComputeHash {
path: entry.path().to_path_buf(),
let digest = hasher_worker
.send(ComputeHash {
path: path.to_path_buf(),
})
.await??;
let existing_file: Option<String> = db_executor
.send(RetrieveByHash {
hash: digest.clone(),
})
.await??;
if existing_file.is_none() {
let new_file = models::NewFile {
path: path.to_str().expect("path not valid unicode?!").to_string(),
hash: (digest.clone()).0,
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
size,
};
db_executor
.send(crate::database::InsertFile { file: new_file })
.await??;
}
let components = path.components().collect::<Vec<Component>>();
let (filename, dir_path) = components.split_last().unwrap();
let name_entry = InnerEntry {
target: Address::Hash(digest.clone()),
key: FILENAME_KEY.to_string(),
value: EntryValue::Value(Value::String(
filename.as_os_str().to_string_lossy().to_string(),
)),
};
db_executor
.send(crate::database::InsertEntry { entry: name_entry })
.await??;
let digest: Result<Result<Hash>, MailboxError> = hasher_worker.send(msg).await;
let new_file = models::NewFile {
path: entry
.path()
.to_str()
.expect("path not valid unicode?!")
.to_string(),
hash: digest??.0,
size,
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
let upath = UPath(
iter::once(UDirectory {
name: "NATIVE".to_string(),
})
.chain(dir_path.iter().map(|component| UDirectory {
name: component.as_os_str().to_string_lossy().to_string(),
}))
.collect(),
);
let resolved_path = resolve_path_with_parents(db_executor, &upath).await?;
let parent_dir = resolved_path.last().unwrap();
let dir_has_entry = InnerEntry {
target: parent_dir.clone(),
key: DIR_HAS_KEY.to_string(),
value: EntryValue::Address(Address::Hash(digest)),
};
let _insert_result = db_executor
.send(crate::database::InsertFile { file: new_file })
.await?;
db_executor
.send(crate::database::InsertEntry {
entry: dir_has_entry,
})
.await??;
}
info!("Finished updating {}.", directory.as_ref().display());
@ -216,7 +290,7 @@ pub async fn reimport_directory(
) {
let result = _reimport_directory(directory, &db_executor, &hasher_worker).await;
if result.is_err() {
warn!("Update did not succeed!");
warn!("Update did not succeed! {}", result.err().unwrap());
}
}