various fixes, fleshing out, directory retrieval / creation
parent
e49d4c9ff5
commit
1d952b34dd
12
Cargo.toml
12
Cargo.toml
|
@ -15,7 +15,7 @@ log = "0.4"
|
|||
anyhow = "1.0"
|
||||
thiserror = "1.0"
|
||||
|
||||
diesel = {version = "1.4", features = ["sqlite", "r2d2", "chrono", "serde_json"]}
|
||||
diesel = { version = "1.4", features = ["sqlite", "r2d2", "chrono", "serde_json"] }
|
||||
diesel_migrations = "1.4"
|
||||
|
||||
actix = "0.9.0"
|
||||
|
@ -24,15 +24,15 @@ actix-rt = "1.0.0"
|
|||
actix-web = "2.0"
|
||||
actix_derive = "0.3.2"
|
||||
|
||||
chrono = {version = "0.4", features = ["serde"]}
|
||||
serde = {version = "1.0", features = ["derive"]}
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
||||
bs58 = "0.3.1"
|
||||
filebuffer = "0.4.0"
|
||||
tiny-keccak = {version = "2.0", features = ["k12"]}
|
||||
unsigned-varint = "0.5.0"
|
||||
uuid = {version = "0.8", features = ["v4"]}
|
||||
tiny-keccak = { version = "2.0", features = ["k12"] }
|
||||
unsigned-varint = { version = "0.5.0", features = ["std"] }
|
||||
uuid = { version = "0.8", features = ["v4"] }
|
||||
walkdir = "2"
|
||||
|
||||
dotenv = "0.15.0"
|
||||
|
|
|
@ -1,2 +1,4 @@
|
|||
-- This file should undo anything in `up.sql`
|
||||
DROP TABLE files;
|
||||
DROP TABLE meta;
|
||||
DROP TABLE files;
|
||||
DROP TABLE data;
|
|
@ -8,7 +8,7 @@ INSERT INTO meta (key, value) VALUES ('version', '0.1.0');
|
|||
|
||||
CREATE TABLE files (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
|
||||
hash VARCHAR NOT NULL,
|
||||
hash BLOB NOT NULL,
|
||||
path VARCHAR NOT NULL,
|
||||
size BIGINT NOT NULL,
|
||||
created DATETIME NOT NULL,
|
||||
|
|
|
@ -8,7 +8,7 @@ use uuid::Uuid;
|
|||
|
||||
use crate::hash::{encode, Hash};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub enum Address {
|
||||
Hash(Hash),
|
||||
UUID(Uuid),
|
||||
|
@ -36,10 +36,8 @@ impl Address {
|
|||
}
|
||||
|
||||
pub fn decode(buffer: &Vec<u8>) -> Result<Self> {
|
||||
let (hash_func_type, rest) =
|
||||
unsigned_varint::decode::u128(buffer).map_err(|_| anyhow!("varint decode error"))?;
|
||||
let (digest_len, rest) =
|
||||
unsigned_varint::decode::usize(rest).map_err(|_| anyhow!("varint decode error"))?;
|
||||
let (hash_func_type, rest) = unsigned_varint::decode::u128(buffer)?;
|
||||
let (digest_len, rest) = unsigned_varint::decode::usize(rest)?;
|
||||
let digest = rest;
|
||||
if digest_len != digest.len() {
|
||||
Err(anyhow!(
|
||||
|
|
121
src/database.rs
121
src/database.rs
|
@ -1,3 +1,4 @@
|
|||
use std::convert::TryFrom;
|
||||
use std::io::{Cursor, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
@ -8,34 +9,47 @@ use anyhow::{anyhow, Result};
|
|||
use diesel::prelude::*;
|
||||
use diesel::r2d2::{self, ConnectionManager};
|
||||
use diesel::sqlite::SqliteConnection;
|
||||
use log::debug;
|
||||
use log::{debug, trace};
|
||||
|
||||
use crate::addressing::Address;
|
||||
use crate::hash::{decode, encode, hash, Hash, Hashable};
|
||||
use crate::models;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Entry {
|
||||
identity: Hash,
|
||||
target: Address,
|
||||
key: String,
|
||||
value: EntryValue,
|
||||
pub identity: Hash,
|
||||
pub target: Address,
|
||||
pub key: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InnerEntry {
|
||||
target: Address,
|
||||
key: String,
|
||||
value: EntryValue,
|
||||
pub target: Address,
|
||||
pub key: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum EntryValue {
|
||||
Value(serde_json::Value),
|
||||
Address(Address),
|
||||
Invalid,
|
||||
}
|
||||
|
||||
impl TryFrom<models::Entry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(e: models::Entry) -> Result<Self, Self::Error> {
|
||||
Ok(Entry {
|
||||
identity: Hash(e.identity),
|
||||
target: Address::decode(&e.target)?,
|
||||
key: e.key,
|
||||
value: e.value.parse().unwrap(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl EntryValue {
|
||||
fn to_str(&self) -> Result<String> {
|
||||
let (type_char, content) = match self {
|
||||
|
@ -96,7 +110,7 @@ pub struct InsertFile {
|
|||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Option<String>>")]
|
||||
pub struct RetrieveByHash {
|
||||
pub hash: String,
|
||||
pub hash: Hash,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
|
@ -107,8 +121,26 @@ pub struct LookupByFilename {
|
|||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Vec<Entry>>")]
|
||||
pub struct RetrieveEntries {
|
||||
pub hash: Vec<u8>,
|
||||
pub struct RetrieveObject {
|
||||
pub target: Address,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Vec<Entry>>")]
|
||||
pub struct QueryEntries {
|
||||
pub target: Option<Address>,
|
||||
pub key: Option<String>,
|
||||
pub value: Option<EntryValue>,
|
||||
}
|
||||
|
||||
impl Default for QueryEntries {
|
||||
fn default() -> Self {
|
||||
QueryEntries {
|
||||
target: None,
|
||||
key: None,
|
||||
value: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
|
@ -125,7 +157,12 @@ impl Handler<InsertFile> for DbExecutor {
|
|||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
debug!("Inserting {:?}...", msg.file);
|
||||
debug!(
|
||||
"Inserting {} ({})...",
|
||||
&msg.file.path,
|
||||
encode(&msg.file.hash)
|
||||
);
|
||||
trace!("{:?}", msg.file);
|
||||
|
||||
Ok(diesel::insert_into(files::table)
|
||||
.values(msg.file)
|
||||
|
@ -142,8 +179,8 @@ impl Handler<RetrieveByHash> for DbExecutor {
|
|||
let connection = &self.0.get()?;
|
||||
|
||||
let matches = files
|
||||
.filter(hash.eq(msg.hash))
|
||||
.filter(valid.eq(true))
|
||||
.filter(hash.eq(msg.hash.0))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches.get(0).map(|f| f.path.clone()))
|
||||
|
@ -167,27 +204,57 @@ impl Handler<LookupByFilename> for DbExecutor {
|
|||
}
|
||||
}
|
||||
|
||||
impl Handler<RetrieveEntries> for DbExecutor {
|
||||
impl Handler<RetrieveObject> for DbExecutor {
|
||||
type Result = Result<Vec<Entry>>;
|
||||
|
||||
fn handle(&mut self, msg: RetrieveEntries, _: &mut Self::Context) -> Self::Result {
|
||||
fn handle(&mut self, msg: RetrieveObject, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
let matches = data
|
||||
.filter(target.eq(msg.hash))
|
||||
.filter(target.eq(msg.target.encode()?))
|
||||
.or_filter(
|
||||
value.eq(EntryValue::Address(Address::Hash(Hash(msg.target.encode()?))).to_str()?),
|
||||
)
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.into_iter()
|
||||
.map(|e| -> Result<Entry> {
|
||||
Ok(Entry {
|
||||
identity: Hash(e.identity),
|
||||
target: Address::decode(&e.target)?,
|
||||
key: e.key,
|
||||
value: e.value.parse().unwrap(),
|
||||
})
|
||||
})
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<QueryEntries> for DbExecutor {
|
||||
type Result = Result<Vec<Entry>>;
|
||||
|
||||
fn handle(&mut self, msg: QueryEntries, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
let mut query = data.into_boxed();
|
||||
|
||||
if let Some(q_target) = msg.target {
|
||||
query = query.filter(target.eq(q_target.encode()?));
|
||||
}
|
||||
|
||||
if let Some(q_key) = msg.key {
|
||||
query = query.filter(key.eq(q_key));
|
||||
}
|
||||
|
||||
if let Some(q_value) = msg.value {
|
||||
query = query.filter(value.eq(q_value.to_str()?));
|
||||
}
|
||||
|
||||
let matches = query.load::<models::Entry>(connection)?;
|
||||
|
||||
let entries = matches
|
||||
.into_iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
|
|
|
@ -1,16 +1,171 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::{Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries};
|
||||
use crate::hash::{ComputeHash, Hash, HasherWorker};
|
||||
use crate::models;
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::{info, warn};
|
||||
use serde::export::Formatter;
|
||||
use serde_json::Value;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use actix::prelude::*;
|
||||
use anyhow::Result;
|
||||
use chrono::prelude::*;
|
||||
use log::{info, warn};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::hash::{encode, ComputeHash, Hash, HasherWorker};
|
||||
use crate::models::NewFile;
|
||||
use actix::prelude::*;
|
||||
use chrono::prelude::*;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn update_directory<T: AsRef<Path>>(
|
||||
const DIR_KEY: &str = "DIR";
|
||||
const DIR_HAS_KEY: &str = "DIR_HAS";
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct UDirectory {
|
||||
name: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct UPath(Vec<UDirectory>);
|
||||
|
||||
const TOP_SEPARATOR: &str = "//";
|
||||
|
||||
impl std::str::FromStr for UPath {
|
||||
type Err = anyhow::Error;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if s.len() == 0 {
|
||||
Ok(UPath(vec![]))
|
||||
} else {
|
||||
match s.find(TOP_SEPARATOR) {
|
||||
Some(head_idx) => {
|
||||
let (head, rest) = s.split_at(head_idx);
|
||||
let mut result: Vec<UDirectory> = Vec::new();
|
||||
result.push(UDirectory {
|
||||
name: String::from(head),
|
||||
});
|
||||
result.append(
|
||||
rest[TOP_SEPARATOR.len()..rest.len()]
|
||||
.split("/")
|
||||
.map(|part| UDirectory {
|
||||
name: String::from(part),
|
||||
})
|
||||
.collect::<Vec<UDirectory>>()
|
||||
.as_mut(),
|
||||
);
|
||||
Ok(UPath(result))
|
||||
}
|
||||
None => Ok(UPath(
|
||||
s.split("/")
|
||||
.map(|part| UDirectory {
|
||||
name: String::from(part),
|
||||
})
|
||||
.collect(),
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for UDirectory {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{}", self.name)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for UPath {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
match self.0.len() {
|
||||
0 => write!(f, ""),
|
||||
1 => write!(f, "{}", self.0.first().unwrap().name),
|
||||
_ => {
|
||||
let (head, tail) = self.0.split_first().unwrap();
|
||||
write!(
|
||||
f,
|
||||
"{}//{}",
|
||||
head.name,
|
||||
tail.iter()
|
||||
.map(|udir| udir.name.clone())
|
||||
.collect::<Vec<String>>()
|
||||
.join("/")
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn fetch_or_create_dir(
|
||||
db_executor: &Addr<crate::database::DbExecutor>,
|
||||
parent: Option<Address>,
|
||||
directory: UDirectory,
|
||||
) -> Result<Address> {
|
||||
let dir_value = EntryValue::Value(Value::String(directory.name));
|
||||
let directories: Vec<Address> = db_executor
|
||||
.send(QueryEntries {
|
||||
target: None,
|
||||
key: Some(String::from(DIR_KEY)),
|
||||
value: Some(dir_value.clone()),
|
||||
})
|
||||
.await??
|
||||
.into_iter()
|
||||
.map(|e: Entry| e.target)
|
||||
.collect();
|
||||
|
||||
let valid_directories: Vec<Address> = match parent.clone() {
|
||||
Some(address) => {
|
||||
let parent_has: Vec<Address> = db_executor
|
||||
.send(QueryEntries {
|
||||
target: Some(address),
|
||||
key: Some(String::from(DIR_HAS_KEY)),
|
||||
value: None,
|
||||
})
|
||||
.await??
|
||||
.into_iter()
|
||||
.map(|e: Entry| e.target)
|
||||
.collect();
|
||||
|
||||
directories
|
||||
.into_iter()
|
||||
.filter(|a| parent_has.contains(a))
|
||||
.collect()
|
||||
}
|
||||
None => directories,
|
||||
};
|
||||
|
||||
match valid_directories.len() {
|
||||
0 => {
|
||||
let new_directory_address = Address::UUID(Uuid::new_v4());
|
||||
let directory_entry = InnerEntry {
|
||||
target: new_directory_address.clone(),
|
||||
key: String::from(DIR_KEY),
|
||||
value: dir_value,
|
||||
};
|
||||
let _ = db_executor
|
||||
.send(InsertEntry {
|
||||
entry: directory_entry,
|
||||
})
|
||||
.await??;
|
||||
|
||||
if parent.is_some() {
|
||||
let has_entry = InnerEntry {
|
||||
target: parent.unwrap(),
|
||||
key: String::from(DIR_HAS_KEY),
|
||||
value: EntryValue::Address(new_directory_address.clone()),
|
||||
};
|
||||
let _ = db_executor.send(InsertEntry { entry: has_entry }).await??;
|
||||
}
|
||||
|
||||
Ok(new_directory_address)
|
||||
}
|
||||
1 => Ok(valid_directories[0].clone()),
|
||||
_ => Err(anyhow!(
|
||||
"Invalid database state - more than one directory matches the query!"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fetch_path(path: &UPath) -> Vec<Entry> {
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
async fn _reimport_directory<T: AsRef<Path>>(
|
||||
directory: T,
|
||||
db_executor: &Addr<crate::database::DbExecutor>,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
|
@ -34,13 +189,13 @@ pub async fn update_directory<T: AsRef<Path>>(
|
|||
|
||||
let digest: Result<Result<Hash>, MailboxError> = hasher_worker.send(msg).await;
|
||||
|
||||
let new_file = NewFile {
|
||||
let new_file = models::NewFile {
|
||||
path: entry
|
||||
.path()
|
||||
.to_str()
|
||||
.expect("path not valid unicode?!")
|
||||
.to_string(),
|
||||
hash: encode(digest??.0),
|
||||
hash: digest??.0,
|
||||
size,
|
||||
created: NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0),
|
||||
};
|
||||
|
@ -54,13 +209,45 @@ pub async fn update_directory<T: AsRef<Path>>(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_directory_bg(
|
||||
pub async fn reimport_directory(
|
||||
directory: PathBuf,
|
||||
db_executor: Addr<crate::database::DbExecutor>,
|
||||
hasher_worker: Addr<HasherWorker>,
|
||||
) {
|
||||
let result = update_directory(directory, &db_executor, &hasher_worker).await;
|
||||
let result = _reimport_directory(directory, &db_executor, &hasher_worker).await;
|
||||
if result.is_err() {
|
||||
warn!("Update did not succeed!");
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::filesystem::{UDirectory, UPath};
|
||||
use anyhow::Result;
|
||||
|
||||
#[test]
|
||||
fn test_path_codec() {
|
||||
let path = UPath(vec![
|
||||
UDirectory {
|
||||
name: "top".to_string(),
|
||||
},
|
||||
UDirectory {
|
||||
name: "foo".to_string(),
|
||||
},
|
||||
UDirectory {
|
||||
name: "bar".to_string(),
|
||||
},
|
||||
UDirectory {
|
||||
name: "baz".to_string(),
|
||||
},
|
||||
]);
|
||||
|
||||
let str_path = path.to_string();
|
||||
assert!(str_path.len() > 0);
|
||||
|
||||
let decoded_path: Result<UPath> = str_path.parse();
|
||||
assert!(decoded_path.is_ok());
|
||||
|
||||
assert_eq!(path, decoded_path.unwrap());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use bs58;
|
|||
use filebuffer::FileBuffer;
|
||||
use tiny_keccak::{Hasher, KangarooTwelve};
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct Hash(pub Vec<u8>);
|
||||
|
||||
// impl Hash {
|
||||
|
|
|
@ -91,7 +91,7 @@ fn main() -> std::io::Result<()> {
|
|||
|
||||
if open_result.new {
|
||||
info!("The vault has been just created, running initial update...");
|
||||
actix::spawn(filesystem::update_directory_bg(
|
||||
actix::spawn(filesystem::reimport_directory(
|
||||
vault_path, db_addr, hash_addr,
|
||||
));
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use super::schema::{data, files};
|
|||
#[derive(Queryable, Serialize, Debug)]
|
||||
pub struct File {
|
||||
pub id: i32,
|
||||
pub hash: String,
|
||||
pub hash: Vec<u8>,
|
||||
pub path: String,
|
||||
pub size: i64,
|
||||
pub created: NaiveDateTime,
|
||||
|
@ -16,7 +16,7 @@ pub struct File {
|
|||
#[derive(Insertable, Debug)]
|
||||
#[table_name = "files"]
|
||||
pub struct NewFile {
|
||||
pub hash: String,
|
||||
pub hash: Vec<u8>,
|
||||
pub path: String,
|
||||
pub size: i64,
|
||||
pub created: NaiveDateTime,
|
||||
|
|
|
@ -3,8 +3,11 @@ use std::path::PathBuf;
|
|||
use actix::prelude::*;
|
||||
use actix_files::NamedFile;
|
||||
use actix_web::{error, get, post, web, Error, HttpResponse};
|
||||
use log::debug;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::hash::{decode, Hash};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct State {
|
||||
pub directory: PathBuf,
|
||||
|
@ -17,10 +20,12 @@ pub async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result
|
|||
let response = state
|
||||
.db
|
||||
.send(crate::database::RetrieveByHash {
|
||||
hash: hash.into_inner(),
|
||||
hash: Hash(decode(hash.into_inner()).map_err(error::ErrorInternalServerError)?),
|
||||
})
|
||||
.await?;
|
||||
|
||||
debug!("{:?}", response);
|
||||
|
||||
match response {
|
||||
Ok(result) => match result {
|
||||
Some(path) => Ok(NamedFile::open(path)?),
|
||||
|
@ -50,9 +55,10 @@ pub async fn get_lookup(
|
|||
|
||||
#[post("/api/refresh")]
|
||||
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||
crate::filesystem::update_directory(&state.directory, &state.db, &state.hasher)
|
||||
.await
|
||||
.map_err(|_| error::ErrorInternalServerError("UPDATE ERROR"))?;
|
||||
|
||||
actix::spawn(crate::filesystem::reimport_directory(
|
||||
state.directory.clone(),
|
||||
state.db.clone(),
|
||||
state.hasher.clone(),
|
||||
));
|
||||
Ok(HttpResponse::Ok().finish())
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ table! {
|
|||
table! {
|
||||
files (id) {
|
||||
id -> Integer,
|
||||
hash -> Text,
|
||||
hash -> Binary,
|
||||
path -> Text,
|
||||
size -> BigInt,
|
||||
created -> Timestamp,
|
||||
|
|
Loading…
Reference in New Issue