remove DbExecutor, store plain pool in state; refactor into functions

also put LoggerSink in util.rs
feat/vaults
Tomáš Mládek 2020-09-15 19:26:47 +02:00
parent 36e3c4a145
commit 00e0dc288c
5 changed files with 186 additions and 252 deletions
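In practice, each call site stops sending a typed message to the DbExecutor actor and instead checks a connection out of the r2d2 pool and calls a plain function. A minimal sketch of the new calling convention, reusing names from the diff below; the wrapper function itself is illustrative, not part of the commit:

    // Before: one actor round-trip per query.
    // let dirs = db_addr
    //     .send(QueryEntries { target: None, key: Some(DIR_KEY.to_string()), value: None })
    //     .await??;

    // After: a pooled connection and a direct call. The pooled connection
    // acts as a diesel Connection (the routes.rs changes below rely on this),
    // so it satisfies the C: Connection<Backend = Sqlite> bound.
    fn list_dirs(db_pool: &DbPool) -> anyhow::Result<Vec<Entry>> {
        let connection = db_pool.get()?;
        query_entries(
            &connection,
            EntryQuery {
                target: None,
                key: Some(DIR_KEY.to_string()),
                value: None,
            },
        )
    }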

src/database.rs

@@ -2,12 +2,10 @@ use crate::addressing::Address;
 use crate::hash::{decode, hash, Hash, Hashable};
 use crate::models;
 use crate::util::LoggerSink;
-use actix::prelude::*;
-use actix_derive::Message;
 use anyhow::{anyhow, Result};
 use diesel::prelude::*;
 use diesel::r2d2::{self, ConnectionManager};
-use diesel::sqlite::SqliteConnection;
+use diesel::sqlite::{Sqlite, SqliteConnection};
 use log::debug;
 use serde::export::Formatter;
 use serde_json::json;
@@ -135,183 +133,125 @@ impl std::str::FromStr for EntryValue {
     }
 }
 
-pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
-
-pub struct DbExecutor(pub DbPool);
-
-impl Actor for DbExecutor {
-    type Context = SyncContext<Self>;
-}
-
-#[derive(Message)]
-#[rtype(result = "Result<usize>")]
-pub struct InsertFile {
-    pub file: models::NewFile,
-}
-
-impl Handler<InsertFile> for DbExecutor {
-    type Result = Result<usize>;
-
-    fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result {
-        use crate::schema::files;
-
-        let connection = &self.0.get()?;
-
-        debug!(
-            "Inserting {} ({})...",
-            &msg.file.path,
-            Address::Hash(Hash((&msg.file.hash).clone()))
-        );
-
-        Ok(diesel::insert_into(files::table)
-            .values(msg.file)
-            .execute(connection)?)
-    }
-}
+pub fn insert_file<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    file: models::NewFile,
+) -> Result<usize> {
+    use crate::schema::files;
+
+    debug!(
+        "Inserting {} ({})...",
+        &file.path,
+        Address::Hash(Hash((&file.hash).clone()))
+    );
+
+    Ok(diesel::insert_into(files::table)
+        .values(file)
+        .execute(connection)?)
+}
 
-#[derive(Message)]
-#[rtype(result = "Result<Option<String>>")]
-pub struct RetrieveByHash {
-    pub hash: Hash,
-}
-
-impl Handler<RetrieveByHash> for DbExecutor {
-    type Result = Result<Option<String>>;
-
-    fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result {
-        use crate::schema::files::dsl::*;
-        let connection = &self.0.get()?;
-        let matches = files
-            .filter(valid.eq(true))
-            .filter(hash.eq(msg.hash.0))
-            .load::<models::File>(connection)?;
-        Ok(matches.get(0).map(|f| f.path.clone()))
-    }
-}
+pub fn retrieve_by_hash<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    obj_hash: Hash,
+) -> Result<Option<String>> {
+    use crate::schema::files::dsl::*;
+    let matches = files
+        .filter(valid.eq(true))
+        .filter(hash.eq(obj_hash.0))
+        .load::<models::File>(connection)?;
+    Ok(matches.get(0).map(|f| f.path.clone()))
+}
 
-#[derive(Message)]
-#[rtype(result = "Result<Vec<models::File>>")]
-pub struct LookupByFilename {
-    pub query: String,
-}
-
-impl Handler<LookupByFilename> for DbExecutor {
-    type Result = Result<Vec<models::File>>;
-
-    fn handle(&mut self, msg: LookupByFilename, _: &mut Self::Context) -> Self::Result {
-        use crate::schema::files::dsl::*;
-        let connection = &self.0.get()?;
-        let matches = files
-            .filter(path.like(format!("%{}%", msg.query)))
-            .filter(valid.eq(true))
-            .load::<models::File>(connection)?;
-        Ok(matches)
-    }
-}
+pub fn lookup_by_filename<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    query: String,
+) -> Result<Vec<models::File>> {
+    use crate::schema::files::dsl::*;
+    let matches = files
+        .filter(path.like(format!("%{}%", query)))
+        .filter(valid.eq(true))
+        .load::<models::File>(connection)?;
+    Ok(matches)
+}
 
-#[derive(Message)]
-#[rtype(result = "Result<Vec<Entry>>")]
-pub struct RetrieveObject {
-    pub target: Address,
-}
-
-impl Handler<RetrieveObject> for DbExecutor {
-    type Result = Result<Vec<Entry>>;
-
-    fn handle(&mut self, msg: RetrieveObject, _: &mut Self::Context) -> Self::Result {
-        use crate::schema::data::dsl::*;
-        let connection = &self.0.get()?;
-        let matches = data
-            .filter(target.eq(msg.target.encode()?))
-            .or_filter(value.eq(EntryValue::Address(msg.target).to_str()?))
-            .load::<models::Entry>(connection)?;
-        let entries = matches
-            .into_iter()
-            .map(Entry::try_from)
-            .filter_map(Result::ok)
-            .collect();
-        Ok(entries)
-    }
-}
+pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    object_address: Address,
+) -> Result<Vec<Entry>> {
+    use crate::schema::data::dsl::*;
+    let matches = data
+        .filter(target.eq(object_address.encode()?))
+        .or_filter(value.eq(EntryValue::Address(object_address).to_str()?))
+        .load::<models::Entry>(connection)?;
+    let entries = matches
+        .into_iter()
+        .map(Entry::try_from)
+        .filter_map(Result::ok)
+        .collect();
+    Ok(entries)
+}
 
-#[derive(Message)]
-#[rtype(result = "Result<Vec<Entry>>")]
-pub struct QueryEntries {
+pub struct EntryQuery {
     pub target: Option<Address>,
     pub key: Option<String>,
     pub value: Option<EntryValue>,
 }
 
-impl Handler<QueryEntries> for DbExecutor {
-    type Result = Result<Vec<Entry>>;
-
-    fn handle(&mut self, msg: QueryEntries, _: &mut Self::Context) -> Self::Result {
-        use crate::schema::data::dsl::*;
-        let connection = &self.0.get()?;
-        let mut query = data.into_boxed();
-        if let Some(q_target) = msg.target {
-            query = query.filter(target.eq(q_target.encode()?));
-        }
-        if let Some(q_key) = msg.key {
-            query = query.filter(key.eq(q_key));
-        }
-        if let Some(q_value) = msg.value {
-            query = query.filter(value.eq(q_value.to_str()?));
-        }
-        let matches = query.load::<models::Entry>(connection)?;
-        let entries = matches
-            .into_iter()
-            .map(Entry::try_from)
-            .filter_map(Result::ok)
-            .collect();
-        Ok(entries)
-    }
-}
+pub fn query_entries<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    entry_query: EntryQuery,
+) -> Result<Vec<Entry>> {
+    use crate::schema::data::dsl::*;
+    let mut query = data.into_boxed();
+    if let Some(q_target) = entry_query.target {
+        query = query.filter(target.eq(q_target.encode()?));
+    }
+    if let Some(q_key) = entry_query.key {
+        query = query.filter(key.eq(q_key));
+    }
+    if let Some(q_value) = entry_query.value {
+        query = query.filter(value.eq(q_value.to_str()?));
+    }
+    let matches = query.load::<models::Entry>(connection)?;
+    let entries = matches
+        .into_iter()
+        .map(Entry::try_from)
+        .filter_map(Result::ok)
+        .collect();
+    Ok(entries)
+}
 
-#[derive(Message)]
-#[rtype(result = "Result<usize>")]
-pub struct InsertEntry {
-    pub entry: InnerEntry,
-}
-
-impl Handler<InsertEntry> for DbExecutor {
-    type Result = Result<usize>;
-
-    fn handle(&mut self, msg: InsertEntry, _: &mut Self::Context) -> Self::Result {
-        use crate::schema::data;
-        let connection = &self.0.get()?;
-        debug!("Inserting: {}", msg.entry);
-        let insert_entry = models::Entry {
-            identity: msg.entry.hash()?.0,
-            target: msg.entry.target.encode()?,
-            key: msg.entry.key,
-            value: msg.entry.value.to_str()?,
-        };
-        Ok(diesel::insert_into(data::table)
-            .values(insert_entry)
-            .execute(connection)?)
-    }
-}
+pub fn insert_entry<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    entry: InnerEntry,
+) -> Result<usize> {
+    use crate::schema::data;
+
+    debug!("Inserting: {}", entry);
+
+    let insert_entry = models::Entry {
+        identity: entry.hash()?.0,
+        target: entry.target.encode()?,
+        key: entry.key,
+        value: entry.value.to_str()?,
+    };
+
+    Ok(diesel::insert_into(data::table)
+        .values(insert_entry)
+        .execute(connection)?)
+}
 
 #[derive(Debug)]
@@ -340,6 +280,8 @@ impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
     }
 }
 
+pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
+
 pub struct OpenResult {
     pub pool: DbPool,
     pub new: bool,
src/filesystem.rs

@@ -1,11 +1,12 @@
 use crate::addressing::Address;
 use crate::database::{
-    DbExecutor, Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries, RetrieveObject,
+    insert_entry, insert_file, query_entries, retrieve_object, Entry, EntryQuery, EntryValue,
+    InnerEntry,
 };
 use crate::hash::{ComputeHash, HasherWorker};
 use crate::models;
 use anyhow::{anyhow, Result};
-use log::{info, trace, warn};
+use log::{error, info, trace};
 use serde::export::Formatter;
 use serde_json::Value;
 use std::path::{Component, Path, PathBuf};
@@ -14,6 +15,8 @@ use walkdir::WalkDir;
 use actix::prelude::*;
 use chrono::prelude::*;
+use diesel::sqlite::Sqlite;
+use diesel::Connection;
 use uuid::Uuid;
 
 const DIR_KEY: &str = "DIR";
@@ -124,23 +127,25 @@ impl EntryList for Vec<Entry> {
     }
 }
 
-pub async fn list_roots(db_executor: &Addr<crate::database::DbExecutor>) -> Result<Vec<Entry>> {
-    let all_directories: Vec<Entry> = db_executor
-        .send(QueryEntries {
+pub async fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
+    let all_directories: Vec<Entry> = query_entries(
+        connection,
+        EntryQuery {
             target: None,
             key: Some(DIR_KEY.to_string()),
             value: None,
-        })
-        .await??;
+        },
+    )?;
 
-    let directories_with_parents: Vec<Address> = db_executor
-        .send(QueryEntries {
+    let directories_with_parents: Vec<Address> = query_entries(
+        connection,
+        EntryQuery {
             target: None,
             key: Some(DIR_HAS_KEY.to_string()),
             value: None,
-        })
-        .await??
-        .extract_addresses();
+        },
+    )?
+    .extract_addresses();
 
     Ok(all_directories
         .into_iter()
@@ -148,34 +153,36 @@ pub async fn list_roots(db_executor: &Addr<crate::database::DbExecutor>) -> Result<Vec<Entry>> {
         .collect())
 }
 
-pub async fn list_directory(db_executor: &Addr<DbExecutor>, path: &UPath) -> Result<Vec<Entry>> {
+pub async fn list_directory<C: Connection<Backend = Sqlite>>(
+    connection: &C,
+    path: &UPath,
+) -> Result<Vec<Entry>> {
     let entry_addresses = match path.0.len() {
-        0 => list_roots(db_executor)
+        0 => list_roots(connection)
             .await?
             .into_iter()
             .map(|e| e.target)
             .collect(),
         _ => {
-            let resolved_path: Vec<Address> = resolve_path(db_executor, path, false).await?;
+            let resolved_path: Vec<Address> = resolve_path(connection, path, false).await?;
             let last = resolved_path.last().unwrap();
 
-            db_executor
-                .send(QueryEntries {
+            query_entries(
+                connection,
+                EntryQuery {
                     target: Some(last.clone()),
                     key: Some(DIR_HAS_KEY.to_string()),
                     value: None,
-                })
-                .await??
-                .extract_addresses()
+                },
+            )?
+            .extract_addresses()
         }
     };
 
     let mut result: Vec<Entry> = vec![];
     for address in entry_addresses {
         result.extend(
-            db_executor
-                .send(RetrieveObject { target: address })
-                .await??
+            retrieve_object(connection, address)?
                 .into_iter()
                 .filter(|e| [DIR_KEY, FILENAME_KEY, FILE_IDENTITY_KEY].contains(&e.key.as_str()))
                 .collect::<Vec<Entry>>(),
@@ -184,8 +191,8 @@ pub async fn list_directory(db_executor: &Addr<DbExecutor>, path: &UPath) -> Result<Vec<Entry>> {
     Ok(result)
 }
 
-pub async fn fetch_or_create_dir(
-    db_executor: &Addr<crate::database::DbExecutor>,
+pub async fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
+    connection: &C,
     parent: Option<Address>,
     directory: UDirectory,
     create: bool,
@@ -196,27 +203,29 @@ pub async fn fetch_or_create_dir(
     }
 
     let dir_value = EntryValue::Value(Value::String(directory.name));
-    let directories: Vec<Address> = db_executor
-        .send(QueryEntries {
+    let directories: Vec<Address> = query_entries(
+        connection,
+        EntryQuery {
             target: None,
             key: Some(String::from(DIR_KEY)),
             value: Some(dir_value.clone()),
-        })
-        .await??
-        .into_iter()
-        .map(|e: Entry| e.target)
-        .collect();
+        },
+    )?
+    .into_iter()
+    .map(|e: Entry| e.target)
+    .collect();
 
     let valid_directories: Vec<Address> = match parent.clone() {
         Some(address) => {
-            let parent_has: Vec<Address> = db_executor
-                .send(QueryEntries {
+            let parent_has: Vec<Address> = query_entries(
+                connection,
+                EntryQuery {
                     target: Some(address),
                     key: Some(String::from(DIR_HAS_KEY)),
                     value: None,
-                })
-                .await??
-                .extract_addresses();
+                },
+            )?
+            .extract_addresses();
 
             directories
                 .into_iter()
@@ -235,11 +244,7 @@
                 key: String::from(DIR_KEY),
                 value: dir_value,
             };
-            let _ = db_executor
-                .send(InsertEntry {
-                    entry: directory_entry,
-                })
-                .await??;
+            let _ = insert_entry(connection, directory_entry)?;
 
             if parent.is_some() {
                 let has_entry = InnerEntry {
@@ -247,7 +252,7 @@ pub async fn fetch_or_create_dir(
                     key: String::from(DIR_HAS_KEY),
                     value: EntryValue::Address(new_directory_address.clone()),
                 };
-                let _ = db_executor.send(InsertEntry { entry: has_entry }).await??;
+                let _ = insert_entry(connection, has_entry)?;
             }
 
             Ok(new_directory_address)
@@ -262,8 +267,8 @@ pub async fn fetch_or_create_dir(
     }
 }
 
-pub async fn resolve_path(
-    db_executor: &Addr<DbExecutor>,
+pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
+    connection: &C,
     path: &UPath,
     create: bool,
 ) -> Result<Vec<Address>> {
@@ -273,7 +278,7 @@ pub async fn resolve_path(
     path_stack.reverse();
     while !path_stack.is_empty() {
         let dir_address = fetch_or_create_dir(
-            db_executor,
+            connection,
             result.last().cloned(),
            path_stack.pop().unwrap(),
             create,
@@ -285,9 +290,9 @@
     Ok(result)
 }
 
-async fn _reimport_directory<T: AsRef<Path>>(
+async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
+    connection: &C,
     directory: T,
-    db_executor: &Addr<crate::database::DbExecutor>,
     hasher_worker: &Addr<HasherWorker>,
 ) -> Result<()> {
     let absolute_path = fs::canonicalize(&directory)?;
@@ -328,9 +333,7 @@
             size,
         };
 
-        db_executor
-            .send(crate::database::InsertFile { file: new_file })
-            .await??;
+        let _ = insert_file(connection, new_file)?;
 
         let components = path
             .strip_prefix(&absolute_path)?
@@ -347,20 +350,15 @@
                 filename.as_os_str().to_string_lossy().to_string(),
             )),
         };
-        db_executor
-            .send(crate::database::InsertEntry { entry: name_entry })
-            .await??;
+        let _ = insert_entry(connection, name_entry)?;
 
         let identity_entry = InnerEntry {
            target: file_address.clone(),
            key: FILE_IDENTITY_KEY.to_string(),
            value: EntryValue::Address(Address::Hash(digest.clone())),
         };
-        db_executor
-            .send(crate::database::InsertEntry {
-                entry: identity_entry,
-            })
-            .await??;
+        let _ = insert_entry(connection, identity_entry)?;
 
         let upath = UPath(
             iter::once(UDirectory {
@@ -371,32 +369,28 @@
             }))
             .collect(),
         );
-        let resolved_path = resolve_path(db_executor, &upath, true).await?;
+        let resolved_path = resolve_path(connection, &upath, true).await?;
         let parent_dir = resolved_path.last().unwrap();
 
         let dir_has_entry = InnerEntry {
             target: parent_dir.clone(),
             key: DIR_HAS_KEY.to_string(),
             value: EntryValue::Address(file_address),
         };
-        db_executor
-            .send(crate::database::InsertEntry {
-                entry: dir_has_entry,
-            })
-            .await??;
+        let _ = insert_entry(connection, dir_has_entry)?;
     }
 
     info!("Finished updating {}.", directory.as_ref().display());
     Ok(())
 }
 
-pub async fn reimport_directory(
+pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
+    connection: C,
     directory: PathBuf,
-    db_executor: Addr<crate::database::DbExecutor>,
     hasher_worker: Addr<HasherWorker>,
 ) {
-    let result = _reimport_directory(directory, &db_executor, &hasher_worker).await;
+    let result = _reimport_directory(&connection, directory, &hasher_worker).await;
     if result.is_err() {
-        warn!("Update did not succeed! {}", result.err().unwrap());
+        error!("Update did not succeed! {}", result.err().unwrap());
     }
 }

src/main.rs

@@ -9,6 +9,7 @@ use std::path::PathBuf;
 use actix::prelude::*;
 use actix_web::{middleware, App, HttpServer};
+use anyhow::Result;
 use clap::{App as ClapApp, Arg};
 use log::{info, warn};
@@ -23,7 +24,7 @@ mod util;
 
 const VERSION: &str = env!("CARGO_PKG_VERSION");
 
-fn main() -> std::io::Result<()> {
+fn main() -> Result<()> {
     let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info");
     env_logger::init_from_env(env);
@@ -59,8 +60,7 @@ fn main() -> std::io::Result<()> {
     let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
         .expect("failed to open database!");
-    let pool = open_result.pool;
-    let db_addr = SyncArbiter::start(3, move || database::DbExecutor(pool.clone()));
+    let db_pool = open_result.pool;
     let hash_addr = SyncArbiter::start(4, || hash::HasherWorker);
 
     let bind: SocketAddr = matches
@@ -72,7 +72,7 @@
     let state = routes::State {
         directory: vault_path.clone(),
-        db: db_addr.clone(),
+        db_pool: db_pool.clone(),
         hasher: hash_addr.clone(),
     };
@@ -99,8 +99,9 @@
     if open_result.new {
         info!("The vault has been just created, running initial update...");
+        let connection = db_pool.get()?;
         actix::spawn(filesystem::reimport_directory(
-            vault_path, db_addr, hash_addr,
+            connection, vault_path, hash_addr,
         ));
     }
@@ -111,5 +112,5 @@
         }
     }
 
-    sys.run()
+    Ok(sys.run()?)
 }

src/routes.rs

@@ -1,5 +1,5 @@
 use crate::addressing::Address;
-use crate::database::Entry;
+use crate::database::{lookup_by_filename, retrieve_by_hash, retrieve_object, DbPool, Entry};
 use crate::filesystem::{list_directory, UPath};
 use crate::hash::{decode, encode};
 use actix::prelude::*;
@@ -15,7 +15,7 @@ use std::path::PathBuf;
 #[derive(Clone)]
 pub struct State {
     pub directory: PathBuf,
-    pub db: Addr<crate::database::DbExecutor>,
+    pub db_pool: DbPool,
     pub hasher: Addr<crate::hash::HasherWorker>,
 }
@@ -24,10 +24,8 @@ pub async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result<HttpResponse, Error> {
     let address = Address::decode(&decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
         .map_err(ErrorInternalServerError)?;
     if let Address::Hash(hash) = address {
-        let response = state
-            .db
-            .send(crate::database::RetrieveByHash { hash })
-            .await?;
+        let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
+        let response = retrieve_by_hash(&connection, hash);
 
         debug!("{:?}", response);
@@ -48,15 +46,12 @@ pub async fn get_object(
     state: web::Data<State>,
     address_str: web::Path<String>,
 ) -> Result<HttpResponse, Error> {
-    let response: Result<Vec<Entry>> = state
-        .db
-        .send(crate::database::RetrieveObject {
-            target: Address::decode(
-                &decode(address_str.into_inner()).map_err(ErrorInternalServerError)?,
-            )
-            .map_err(ErrorInternalServerError)?,
-        })
-        .await?;
+    let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
+    let response: Result<Vec<Entry>> = retrieve_object(
+        &connection,
+        Address::decode(&decode(address_str.into_inner()).map_err(ErrorInternalServerError)?)
+            .map_err(ErrorInternalServerError)?,
+    );
 
     debug!("{:?}", response);
@@ -72,8 +67,9 @@ pub async fn list_hier(
     state: web::Data<State>,
     path: web::Path<String>,
 ) -> Result<HttpResponse, Error> {
+    let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
     let upath: UPath = path.into_inner().parse().map_err(ErrorBadRequest)?;
-    let entries: Vec<Entry> = list_directory(&state.db, &upath)
+    let entries: Vec<Entry> = list_directory(&connection, &upath)
         .await
         .map_err(ErrorNotFound)?; // todo: 500 if actual error occurs
@@ -95,19 +91,18 @@ pub async fn get_lookup(
     state: web::Data<State>,
     web::Query(info): web::Query<LookupRequest>,
 ) -> Result<HttpResponse, Error> {
-    let response = state
-        .db
-        .send(crate::database::LookupByFilename { query: info.query })
-        .await?;
+    let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
+    let response = lookup_by_filename(&connection, info.query);
 
     Ok(HttpResponse::Ok().json(response.map_err(error::ErrorInternalServerError)?))
 }
 
 #[post("/api/refresh")]
 pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
+    let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
     actix::spawn(crate::filesystem::reimport_directory(
+        connection,
         state.directory.clone(),
-        state.db.clone(),
         state.hasher.clone(),
     ));
     Ok(HttpResponse::Ok().finish())

src/util.rs

@@ -1,6 +1,8 @@
 use log::debug;
 
+#[derive(Default)]
 pub struct LoggerSink {
-    buffer: Vec<u8>,
+    pub buffer: Vec<u8>,
 }
 
 impl std::io::Write for LoggerSink {
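With the Default derive and the buffer now public, a LoggerSink can be constructed directly wherever an io::Write is expected; a small usage sketch (assuming the Write impl shown above forwards written lines to the logger):

    use std::io::Write;

    let mut sink = LoggerSink::default();
    writeln!(sink, "this line ends up in the log output")?;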