various fixes, fleshing out, directory retrieval / creation

feat/vaults
Tomáš Mládek 2020-09-12 14:27:45 +02:00
parent e49d4c9ff5
commit 1d952b34dd
11 changed files with 322 additions and 62 deletions

View File

@@ -15,7 +15,7 @@ log = "0.4"
anyhow = "1.0"
thiserror = "1.0"
diesel = {version = "1.4", features = ["sqlite", "r2d2", "chrono", "serde_json"]}
diesel = { version = "1.4", features = ["sqlite", "r2d2", "chrono", "serde_json"] }
diesel_migrations = "1.4"
actix = "0.9.0"
@@ -24,15 +24,15 @@ actix-rt = "1.0.0"
actix-web = "2.0"
actix_derive = "0.3.2"
chrono = {version = "0.4", features = ["serde"]}
serde = {version = "1.0", features = ["derive"]}
chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
bs58 = "0.3.1"
filebuffer = "0.4.0"
tiny-keccak = {version = "2.0", features = ["k12"]}
unsigned-varint = "0.5.0"
uuid = {version = "0.8", features = ["v4"]}
tiny-keccak = { version = "2.0", features = ["k12"] }
unsigned-varint = { version = "0.5.0", features = ["std"] }
uuid = { version = "0.8", features = ["v4"] }
walkdir = "2"
dotenv = "0.15.0"
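One manifest change worth flagging: unsigned-varint gains the "std" feature, which makes its decode errors implement std::error::Error. That is presumably what lets the map_err boilerplate disappear from Address::decode further down. A minimal sketch of the pattern, assuming anyhow as the error type like the rest of the crate:

use anyhow::Result;

// With the "std" feature on, unsigned_varint::decode::Error converts into
// anyhow::Error automatically, so a bare `?` replaces the map_err dance.
fn read_header(buffer: &[u8]) -> Result<(u128, &[u8])> {
    let (hash_func_type, rest) = unsigned_varint::decode::u128(buffer)?;
    Ok((hash_func_type, rest))
}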

View File

@@ -1,2 +1,4 @@
-- This file should undo anything in `up.sql`
DROP TABLE files;
DROP TABLE meta;
DROP TABLE files;
DROP TABLE data;

View File

@@ -8,7 +8,7 @@ INSERT INTO meta (key, value) VALUES ('version', '0.1.0');
CREATE TABLE files (
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
hash VARCHAR NOT NULL,
hash BLOB NOT NULL,
path VARCHAR NOT NULL,
size BIGINT NOT NULL,
created DATETIME NOT NULL,
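The hash column switches from VARCHAR to BLOB, so SQLite now stores the raw digest bytes; the bs58 text form only appears at the edges (log lines, URLs), as the later hunks show. A sketch of the display-side conversion, assuming crate::hash::encode accepts any byte slice, as its call sites suggest:

use crate::hash::encode;

// Raw bytes live in the database; bs58 is produced only for human eyes.
fn display_hash(raw_digest: &[u8]) -> String {
    encode(raw_digest)
}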

View File

@@ -8,7 +8,7 @@ use uuid::Uuid;
use crate::hash::{encode, Hash};
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub enum Address {
Hash(Hash),
UUID(Uuid),
@@ -36,10 +36,8 @@
}
pub fn decode(buffer: &Vec<u8>) -> Result<Self> {
let (hash_func_type, rest) =
unsigned_varint::decode::u128(buffer).map_err(|_| anyhow!("varint decode error"))?;
let (digest_len, rest) =
unsigned_varint::decode::usize(rest).map_err(|_| anyhow!("varint decode error"))?;
let (hash_func_type, rest) = unsigned_varint::decode::u128(buffer)?;
let (digest_len, rest) = unsigned_varint::decode::usize(rest)?;
let digest = rest;
if digest_len != digest.len() {
Err(anyhow!(
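With Clone and PartialEq now derived on Address, the encoding can be exercised as a round trip. A sketch, assuming Address::encode is the inverse of the decode above (it is used that way elsewhere in this commit):

use anyhow::Result;
use uuid::Uuid;

// Encode an address to its varint-prefixed byte form and decode it back.
fn roundtrip() -> Result<()> {
    let addr = Address::UUID(Uuid::new_v4());
    let bytes = addr.encode()?;
    assert_eq!(Address::decode(&bytes)?, addr);
    Ok(())
}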

View File

@@ -1,3 +1,4 @@
use std::convert::TryFrom;
use std::io::{Cursor, Write};
use std::path::{Path, PathBuf};
use std::time::Duration;
@@ -8,34 +9,47 @@ use anyhow::{anyhow, Result};
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};
use diesel::sqlite::SqliteConnection;
use log::debug;
use log::{debug, trace};
use crate::addressing::Address;
use crate::hash::{decode, encode, hash, Hash, Hashable};
use crate::models;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct Entry {
identity: Hash,
target: Address,
key: String,
value: EntryValue,
pub identity: Hash,
pub target: Address,
pub key: String,
pub value: EntryValue,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct InnerEntry {
target: Address,
key: String,
value: EntryValue,
pub target: Address,
pub key: String,
pub value: EntryValue,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum EntryValue {
Value(serde_json::Value),
Address(Address),
Invalid,
}
impl TryFrom<models::Entry> for Entry {
type Error = anyhow::Error;
fn try_from(e: models::Entry) -> Result<Self, Self::Error> {
Ok(Entry {
identity: Hash(e.identity),
target: Address::decode(&e.target)?,
key: e.key,
value: e.value.parse().unwrap(),
})
}
}
impl EntryValue {
fn to_str(&self) -> Result<String> {
let (type_char, content) = match self {
@@ -96,7 +110,7 @@ pub struct InsertFile {
#[derive(Message)]
#[rtype(result = "Result<Option<String>>")]
pub struct RetrieveByHash {
pub hash: String,
pub hash: Hash,
}
#[derive(Message)]
@@ -107,8 +121,26 @@ pub struct LookupByFilename {
#[derive(Message)]
#[rtype(result = "Result<Vec<Entry>>")]
pub struct RetrieveEntries {
pub hash: Vec<u8>,
pub struct RetrieveObject {
pub target: Address,
}
#[derive(Message)]
#[rtype(result = "Result<Vec<Entry>>")]
pub struct QueryEntries {
pub target: Option<Address>,
pub key: Option<String>,
pub value: Option<EntryValue>,
}
impl Default for QueryEntries {
fn default() -> Self {
QueryEntries {
target: None,
key: None,
value: None,
}
}
}
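The Default impl is there so call sites can name only the filters they need and fill the rest via struct-update syntax. A sketch, using the DIR key that the filesystem module defines later in this commit:

// Match every entry keyed "DIR", leaving target and value unconstrained.
fn directories_query() -> QueryEntries {
    QueryEntries {
        key: Some(String::from("DIR")),
        ..Default::default()
    }
}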
#[derive(Message)]
@@ -125,7 +157,12 @@ impl Handler<InsertFile> for DbExecutor {
let connection = &self.0.get()?;
debug!("Inserting {:?}...", msg.file);
debug!(
"Inserting {} ({})...",
&msg.file.path,
encode(&msg.file.hash)
);
trace!("{:?}", msg.file);
Ok(diesel::insert_into(files::table)
.values(msg.file)
@@ -142,8 +179,8 @@ impl Handler<RetrieveByHash> for DbExecutor {
let connection = &self.0.get()?;
let matches = files
.filter(hash.eq(msg.hash))
.filter(valid.eq(true))
.filter(hash.eq(msg.hash.0))
.load::<models::File>(connection)?;
Ok(matches.get(0).map(|f| f.path.clone()))
@@ -167,27 +204,57 @@ impl Handler<LookupByFilename> for DbExecutor {
}
}
impl Handler<RetrieveEntries> for DbExecutor {
impl Handler<RetrieveObject> for DbExecutor {
type Result = Result<Vec<Entry>>;
fn handle(&mut self, msg: RetrieveEntries, _: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: RetrieveObject, _: &mut Self::Context) -> Self::Result {
use crate::schema::data::dsl::*;
let connection = &self.0.get()?;
let matches = data
.filter(target.eq(msg.hash))
.filter(target.eq(msg.target.encode()?))
.or_filter(
value.eq(EntryValue::Address(Address::Hash(Hash(msg.target.encode()?))).to_str()?),
)
.load::<models::Entry>(connection)?;
let entries = matches
.into_iter()
.map(|e| -> Result<Entry> {
Ok(Entry {
identity: Hash(e.identity),
target: Address::decode(&e.target)?,
key: e.key,
value: e.value.parse().unwrap(),
})
})
.map(Entry::try_from)
.filter_map(Result::ok)
.collect();
Ok(entries)
}
}
impl Handler<QueryEntries> for DbExecutor {
type Result = Result<Vec<Entry>>;
fn handle(&mut self, msg: QueryEntries, _: &mut Self::Context) -> Self::Result {
use crate::schema::data::dsl::*;
let connection = &self.0.get()?;
let mut query = data.into_boxed();
if let Some(q_target) = msg.target {
query = query.filter(target.eq(q_target.encode()?));
}
if let Some(q_key) = msg.key {
query = query.filter(key.eq(q_key));
}
if let Some(q_value) = msg.value {
query = query.filter(value.eq(q_value.to_str()?));
}
let matches = query.load::<models::Entry>(connection)?;
let entries = matches
.into_iter()
.map(Entry::try_from)
.filter_map(Result::ok)
.collect();
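into_boxed is what makes the conditional filters above possible: it erases the query's concrete type so each if-let can append a WHERE clause at runtime. A call-site sketch, assuming a DbExecutor address as used throughout the crate:

use actix::prelude::*;
use anyhow::Result;

// List entries keyed "DIR_HAS" under one parent, leaving `value` open.
async fn children_of(
    db: &Addr<DbExecutor>,
    parent: Address,
) -> Result<Vec<Entry>> {
    Ok(db
        .send(QueryEntries {
            target: Some(parent),
            key: Some(String::from("DIR_HAS")),
            value: None,
        })
        .await??)
}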

View File

@@ -1,16 +1,171 @@
use crate::addressing::Address;
use crate::database::{Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries};
use crate::hash::{ComputeHash, Hash, HasherWorker};
use crate::models;
use anyhow::{anyhow, Result};
use log::{info, warn};
use serde::export::Formatter;
use serde_json::Value;
use std::fs;
use std::path::{Path, PathBuf};
use actix::prelude::*;
use anyhow::Result;
use chrono::prelude::*;
use log::{info, warn};
use walkdir::WalkDir;
use crate::hash::{encode, ComputeHash, Hash, HasherWorker};
use crate::models::NewFile;
use actix::prelude::*;
use chrono::prelude::*;
use uuid::Uuid;
pub async fn update_directory<T: AsRef<Path>>(
const DIR_KEY: &str = "DIR";
const DIR_HAS_KEY: &str = "DIR_HAS";
#[derive(Debug, PartialEq)]
pub struct UDirectory {
name: String,
}
#[derive(Debug, PartialEq)]
pub struct UPath(Vec<UDirectory>);
const TOP_SEPARATOR: &str = "//";
impl std::str::FromStr for UPath {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() == 0 {
Ok(UPath(vec![]))
} else {
match s.find(TOP_SEPARATOR) {
Some(head_idx) => {
let (head, rest) = s.split_at(head_idx);
let mut result: Vec<UDirectory> = Vec::new();
result.push(UDirectory {
name: String::from(head),
});
result.append(
rest[TOP_SEPARATOR.len()..rest.len()]
.split("/")
.map(|part| UDirectory {
name: String::from(part),
})
.collect::<Vec<UDirectory>>()
.as_mut(),
);
Ok(UPath(result))
}
None => Ok(UPath(
s.split("/")
.map(|part| UDirectory {
name: String::from(part),
})
.collect(),
)),
}
}
}
}
impl std::fmt::Display for UDirectory {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name)
}
}
impl std::fmt::Display for UPath {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self.0.len() {
0 => write!(f, ""),
1 => write!(f, "{}", self.0.first().unwrap().name),
_ => {
let (head, tail) = self.0.split_first().unwrap();
write!(
f,
"{}//{}",
head.name,
tail.iter()
.map(|udir| udir.name.clone())
.collect::<Vec<String>>()
.join("/")
)
}
}
}
}
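Concretely, a path like "vault//sub/dir" splits into three components: everything before the "//" is the head, the rest splits on plain "/". A sketch of a parse, relying on the FromStr and Display impls above (the tuple-field access assumes code within this module):

// Parse a two-separator path and print it back unchanged.
fn parse_example() -> anyhow::Result<()> {
    let path: UPath = "vault//sub/dir".parse()?;
    assert_eq!(path.0.len(), 3);
    assert_eq!(path.to_string(), "vault//sub/dir");
    Ok(())
}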
pub async fn fetch_or_create_dir(
db_executor: &Addr<crate::database::DbExecutor>,
parent: Option<Address>,
directory: UDirectory,
) -> Result<Address> {
let dir_value = EntryValue::Value(Value::String(directory.name));
let directories: Vec<Address> = db_executor
.send(QueryEntries {
target: None,
key: Some(String::from(DIR_KEY)),
value: Some(dir_value.clone()),
})
.await??
.into_iter()
.map(|e: Entry| e.target)
.collect();
let valid_directories: Vec<Address> = match parent.clone() {
Some(address) => {
let parent_has: Vec<Address> = db_executor
.send(QueryEntries {
target: Some(address),
key: Some(String::from(DIR_HAS_KEY)),
value: None,
})
.await??
.into_iter()
.map(|e: Entry| e.target)
.collect();
directories
.into_iter()
.filter(|a| parent_has.contains(a))
.collect()
}
None => directories,
};
match valid_directories.len() {
0 => {
let new_directory_address = Address::UUID(Uuid::new_v4());
let directory_entry = InnerEntry {
target: new_directory_address.clone(),
key: String::from(DIR_KEY),
value: dir_value,
};
let _ = db_executor
.send(InsertEntry {
entry: directory_entry,
})
.await??;
if parent.is_some() {
let has_entry = InnerEntry {
target: parent.unwrap(),
key: String::from(DIR_HAS_KEY),
value: EntryValue::Address(new_directory_address.clone()),
};
let _ = db_executor.send(InsertEntry { entry: has_entry }).await??;
}
Ok(new_directory_address)
}
1 => Ok(valid_directories[0].clone()),
_ => Err(anyhow!(
"Invalid database state - more than one directory matches the query!"
)),
}
}
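fetch_or_create_dir resolves one component under an optional parent, so resolving a whole UPath is a left-to-right walk where each resolved address becomes the next component's parent. A sketch of that loop (the helper name is hypothetical; fetch_path below is still unimplemented):

// Walk a UPath component by component, creating directories as needed.
// Returns None only for the empty path.
async fn fetch_or_create_path(
    db_executor: &Addr<crate::database::DbExecutor>,
    path: &UPath,
) -> Result<Option<Address>> {
    let mut parent: Option<Address> = None;
    for dir in &path.0 {
        let address = fetch_or_create_dir(
            db_executor,
            parent,
            UDirectory {
                name: dir.name.clone(),
            },
        )
        .await?;
        parent = Some(address);
    }
    Ok(parent)
}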
pub fn fetch_path(path: &UPath) -> Vec<Entry> {
unimplemented!();
}
async fn _reimport_directory<T: AsRef<Path>>(
directory: T,
db_executor: &Addr<crate::database::DbExecutor>,
hasher_worker: &Addr<HasherWorker>,
@@ -34,13 +189,13 @@ pub async fn update_directory<T: AsRef<Path>>(
let digest: Result<Result<Hash>, MailboxError> = hasher_worker.send(msg).await;
let new_file = NewFile {
let new_file = models::NewFile {
path: entry
.path()
.to_str()
.expect("path not valid unicode?!")
.to_string(),
hash: encode(digest??.0),
hash: digest??.0,
size,
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
};
@@ -54,13 +209,45 @@
Ok(())
}
pub async fn update_directory_bg(
pub async fn reimport_directory(
directory: PathBuf,
db_executor: Addr<crate::database::DbExecutor>,
hasher_worker: Addr<HasherWorker>,
) {
let result = update_directory(directory, &db_executor, &hasher_worker).await;
let result = _reimport_directory(directory, &db_executor, &hasher_worker).await;
if result.is_err() {
warn!("Update did not succeed!");
}
}
#[cfg(test)]
mod tests {
use crate::filesystem::{UDirectory, UPath};
use anyhow::Result;
#[test]
fn test_path_codec() {
let path = UPath(vec![
UDirectory {
name: "top".to_string(),
},
UDirectory {
name: "foo".to_string(),
},
UDirectory {
name: "bar".to_string(),
},
UDirectory {
name: "baz".to_string(),
},
]);
let str_path = path.to_string();
assert!(str_path.len() > 0);
let decoded_path: Result<UPath> = str_path.parse();
assert!(decoded_path.is_ok());
assert_eq!(path, decoded_path.unwrap());
}
}

View File

@@ -6,7 +6,7 @@ use bs58;
use filebuffer::FileBuffer;
use tiny_keccak::{Hasher, KangarooTwelve};
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq)]
pub struct Hash(pub Vec<u8>);
// impl Hash {

View File

@@ -91,7 +91,7 @@ fn main() -> std::io::Result<()> {
if open_result.new {
info!("The vault has been just created, running initial update...");
actix::spawn(filesystem::update_directory_bg(
actix::spawn(filesystem::reimport_directory(
vault_path, db_addr, hash_addr,
));
}

View File

@@ -6,7 +6,7 @@ use super::schema::{data, files};
#[derive(Queryable, Serialize, Debug)]
pub struct File {
pub id: i32,
pub hash: String,
pub hash: Vec<u8>,
pub path: String,
pub size: i64,
pub created: NaiveDateTime,
@@ -16,7 +16,7 @@ pub struct File {
#[derive(Insertable, Debug)]
#[table_name = "files"]
pub struct NewFile {
pub hash: String,
pub hash: Vec<u8>,
pub path: String,
pub size: i64,
pub created: NaiveDateTime,

View File

@@ -3,8 +3,11 @@ use std::path::PathBuf;
use actix::prelude::*;
use actix_files::NamedFile;
use actix_web::{error, get, post, web, Error, HttpResponse};
use log::debug;
use serde::Deserialize;
use crate::hash::{decode, Hash};
#[derive(Clone)]
pub struct State {
pub directory: PathBuf,
@@ -17,10 +20,12 @@ pub async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result
let response = state
.db
.send(crate::database::RetrieveByHash {
hash: hash.into_inner(),
hash: Hash(decode(hash.into_inner()).map_err(error::ErrorInternalServerError)?),
})
.await?;
debug!("{:?}", response);
match response {
Ok(result) => match result {
Some(path) => Ok(NamedFile::open(path)?),
@@ -50,9 +55,10 @@ pub async fn get_lookup(
#[post("/api/refresh")]
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
crate::filesystem::update_directory(&state.directory, &state.db, &state.hasher)
.await
.map_err(|_| error::ErrorInternalServerError("UPDATE ERROR"))?;
actix::spawn(crate::filesystem::reimport_directory(
state.directory.clone(),
state.db.clone(),
state.hasher.clone(),
));
Ok(HttpResponse::Ok().finish())
}
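Two behavioral changes in this file: get_raw now turns the bs58 text from the URL into raw digest bytes before querying (matching the new BLOB column), and /api/refresh returns immediately, spawning the reimport in the background rather than awaiting it. A sketch of the hash conversion on its own, assuming crate::hash::decode takes the path string as it does above:

use anyhow::Result;

use crate::hash::{decode, Hash};

// bs58 text from the URL becomes the raw bytes stored in SQLite.
fn hash_from_url(text: String) -> Result<Hash> {
    Ok(Hash(decode(text)?))
}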

View File

@@ -10,7 +10,7 @@ table! {
table! {
files (id) {
id -> Integer,
hash -> Text,
hash -> Binary,
path -> Text,
size -> BigInt,
created -> Timestamp,