remove DbExecutor, store plain pool in state; refactor into functions
also put loggersink in util.rsfeat/vaults
parent
36e3c4a145
commit
00e0dc288c
252
src/database.rs
252
src/database.rs
|
@ -2,12 +2,10 @@ use crate::addressing::Address;
|
|||
use crate::hash::{decode, hash, Hash, Hashable};
|
||||
use crate::models;
|
||||
use crate::util::LoggerSink;
|
||||
use actix::prelude::*;
|
||||
use actix_derive::Message;
|
||||
use anyhow::{anyhow, Result};
|
||||
use diesel::prelude::*;
|
||||
use diesel::r2d2::{self, ConnectionManager};
|
||||
use diesel::sqlite::SqliteConnection;
|
||||
use diesel::sqlite::{Sqlite, SqliteConnection};
|
||||
use log::debug;
|
||||
use serde::export::Formatter;
|
||||
use serde_json::json;
|
||||
|
@ -135,183 +133,125 @@ impl std::str::FromStr for EntryValue {
|
|||
}
|
||||
}
|
||||
|
||||
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
||||
pub fn insert_file<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
file: models::NewFile,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::files;
|
||||
|
||||
pub struct DbExecutor(pub DbPool);
|
||||
debug!(
|
||||
"Inserting {} ({})...",
|
||||
&file.path,
|
||||
Address::Hash(Hash((&file.hash).clone()))
|
||||
);
|
||||
|
||||
impl Actor for DbExecutor {
|
||||
type Context = SyncContext<Self>;
|
||||
Ok(diesel::insert_into(files::table)
|
||||
.values(file)
|
||||
.execute(connection)?)
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<usize>")]
|
||||
pub struct InsertFile {
|
||||
pub file: models::NewFile,
|
||||
pub fn retrieve_by_hash<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
obj_hash: Hash,
|
||||
) -> Result<Option<String>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let matches = files
|
||||
.filter(valid.eq(true))
|
||||
.filter(hash.eq(obj_hash.0))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches.get(0).map(|f| f.path.clone()))
|
||||
}
|
||||
|
||||
impl Handler<InsertFile> for DbExecutor {
|
||||
type Result = Result<usize>;
|
||||
pub fn lookup_by_filename<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
query: String,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::files;
|
||||
let matches = files
|
||||
.filter(path.like(format!("%{}%", query)))
|
||||
.filter(valid.eq(true))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
debug!(
|
||||
"Inserting {} ({})...",
|
||||
&msg.file.path,
|
||||
Address::Hash(Hash((&msg.file.hash).clone()))
|
||||
);
|
||||
|
||||
Ok(diesel::insert_into(files::table)
|
||||
.values(msg.file)
|
||||
.execute(connection)?)
|
||||
}
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Option<String>>")]
|
||||
pub struct RetrieveByHash {
|
||||
pub hash: Hash,
|
||||
pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_address: Address,
|
||||
) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let matches = data
|
||||
.filter(target.eq(object_address.encode()?))
|
||||
.or_filter(value.eq(EntryValue::Address(object_address).to_str()?))
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.into_iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
impl Handler<RetrieveByHash> for DbExecutor {
|
||||
type Result = Result<Option<String>>;
|
||||
|
||||
fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
let matches = files
|
||||
.filter(valid.eq(true))
|
||||
.filter(hash.eq(msg.hash.0))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches.get(0).map(|f| f.path.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Vec<models::File>>")]
|
||||
pub struct LookupByFilename {
|
||||
pub query: String,
|
||||
}
|
||||
|
||||
impl Handler<LookupByFilename> for DbExecutor {
|
||||
type Result = Result<Vec<models::File>>;
|
||||
|
||||
fn handle(&mut self, msg: LookupByFilename, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
let matches = files
|
||||
.filter(path.like(format!("%{}%", msg.query)))
|
||||
.filter(valid.eq(true))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Vec<Entry>>")]
|
||||
pub struct RetrieveObject {
|
||||
pub target: Address,
|
||||
}
|
||||
|
||||
impl Handler<RetrieveObject> for DbExecutor {
|
||||
type Result = Result<Vec<Entry>>;
|
||||
|
||||
fn handle(&mut self, msg: RetrieveObject, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
let matches = data
|
||||
.filter(target.eq(msg.target.encode()?))
|
||||
.or_filter(value.eq(EntryValue::Address(msg.target).to_str()?))
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.into_iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Vec<Entry>>")]
|
||||
pub struct QueryEntries {
|
||||
pub struct EntryQuery {
|
||||
pub target: Option<Address>,
|
||||
pub key: Option<String>,
|
||||
pub value: Option<EntryValue>,
|
||||
}
|
||||
|
||||
impl Handler<QueryEntries> for DbExecutor {
|
||||
type Result = Result<Vec<Entry>>;
|
||||
pub fn query_entries<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
entry_query: EntryQuery,
|
||||
) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
fn handle(&mut self, msg: QueryEntries, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::data::dsl::*;
|
||||
let mut query = data.into_boxed();
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
let mut query = data.into_boxed();
|
||||
|
||||
if let Some(q_target) = msg.target {
|
||||
query = query.filter(target.eq(q_target.encode()?));
|
||||
}
|
||||
|
||||
if let Some(q_key) = msg.key {
|
||||
query = query.filter(key.eq(q_key));
|
||||
}
|
||||
|
||||
if let Some(q_value) = msg.value {
|
||||
query = query.filter(value.eq(q_value.to_str()?));
|
||||
}
|
||||
|
||||
let matches = query.load::<models::Entry>(connection)?;
|
||||
|
||||
let entries = matches
|
||||
.into_iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
if let Some(q_target) = entry_query.target {
|
||||
query = query.filter(target.eq(q_target.encode()?));
|
||||
}
|
||||
|
||||
if let Some(q_key) = entry_query.key {
|
||||
query = query.filter(key.eq(q_key));
|
||||
}
|
||||
|
||||
if let Some(q_value) = entry_query.value {
|
||||
query = query.filter(value.eq(q_value.to_str()?));
|
||||
}
|
||||
|
||||
let matches = query.load::<models::Entry>(connection)?;
|
||||
|
||||
let entries = matches
|
||||
.into_iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<usize>")]
|
||||
pub struct InsertEntry {
|
||||
pub entry: InnerEntry,
|
||||
}
|
||||
pub fn insert_entry<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
entry: InnerEntry,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::data;
|
||||
|
||||
impl Handler<InsertEntry> for DbExecutor {
|
||||
type Result = Result<usize>;
|
||||
debug!("Inserting: {}", entry);
|
||||
|
||||
fn handle(&mut self, msg: InsertEntry, _: &mut Self::Context) -> Self::Result {
|
||||
use crate::schema::data;
|
||||
let insert_entry = models::Entry {
|
||||
identity: entry.hash()?.0,
|
||||
target: entry.target.encode()?,
|
||||
key: entry.key,
|
||||
value: entry.value.to_str()?,
|
||||
};
|
||||
|
||||
let connection = &self.0.get()?;
|
||||
|
||||
debug!("Inserting: {}", msg.entry);
|
||||
|
||||
let insert_entry = models::Entry {
|
||||
identity: msg.entry.hash()?.0,
|
||||
target: msg.entry.target.encode()?,
|
||||
key: msg.entry.key,
|
||||
value: msg.entry.value.to_str()?,
|
||||
};
|
||||
|
||||
Ok(diesel::insert_into(data::table)
|
||||
.values(insert_entry)
|
||||
.execute(connection)?)
|
||||
}
|
||||
Ok(diesel::insert_into(data::table)
|
||||
.values(insert_entry)
|
||||
.execute(connection)?)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
@ -340,6 +280,8 @@ impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
|||
}
|
||||
}
|
||||
|
||||
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
||||
|
||||
pub struct OpenResult {
|
||||
pub pool: DbPool,
|
||||
pub new: bool,
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::{
|
||||
DbExecutor, Entry, EntryValue, InnerEntry, InsertEntry, QueryEntries, RetrieveObject,
|
||||
insert_entry, insert_file, query_entries, retrieve_object, Entry, EntryQuery, EntryValue,
|
||||
InnerEntry,
|
||||
};
|
||||
use crate::hash::{ComputeHash, HasherWorker};
|
||||
use crate::models;
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::{info, trace, warn};
|
||||
use log::{error, info, trace};
|
||||
use serde::export::Formatter;
|
||||
use serde_json::Value;
|
||||
use std::path::{Component, Path, PathBuf};
|
||||
|
@ -14,6 +15,8 @@ use walkdir::WalkDir;
|
|||
|
||||
use actix::prelude::*;
|
||||
use chrono::prelude::*;
|
||||
use diesel::sqlite::Sqlite;
|
||||
use diesel::Connection;
|
||||
use uuid::Uuid;
|
||||
|
||||
const DIR_KEY: &str = "DIR";
|
||||
|
@ -124,23 +127,25 @@ impl EntryList for Vec<Entry> {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn list_roots(db_executor: &Addr<crate::database::DbExecutor>) -> Result<Vec<Entry>> {
|
||||
let all_directories: Vec<Entry> = db_executor
|
||||
.send(QueryEntries {
|
||||
pub async fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Entry>> {
|
||||
let all_directories: Vec<Entry> = query_entries(
|
||||
connection,
|
||||
EntryQuery {
|
||||
target: None,
|
||||
key: Some(DIR_KEY.to_string()),
|
||||
value: None,
|
||||
})
|
||||
.await??;
|
||||
},
|
||||
)?;
|
||||
|
||||
let directories_with_parents: Vec<Address> = db_executor
|
||||
.send(QueryEntries {
|
||||
let directories_with_parents: Vec<Address> = query_entries(
|
||||
connection,
|
||||
EntryQuery {
|
||||
target: None,
|
||||
key: Some(DIR_HAS_KEY.to_string()),
|
||||
value: None,
|
||||
})
|
||||
.await??
|
||||
.extract_addresses();
|
||||
},
|
||||
)?
|
||||
.extract_addresses();
|
||||
|
||||
Ok(all_directories
|
||||
.into_iter()
|
||||
|
@ -148,34 +153,36 @@ pub async fn list_roots(db_executor: &Addr<crate::database::DbExecutor>) -> Resu
|
|||
.collect())
|
||||
}
|
||||
|
||||
pub async fn list_directory(db_executor: &Addr<DbExecutor>, path: &UPath) -> Result<Vec<Entry>> {
|
||||
pub async fn list_directory<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
path: &UPath,
|
||||
) -> Result<Vec<Entry>> {
|
||||
let entry_addresses = match path.0.len() {
|
||||
0 => list_roots(db_executor)
|
||||
0 => list_roots(connection)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|e| e.target)
|
||||
.collect(),
|
||||
_ => {
|
||||
let resolved_path: Vec<Address> = resolve_path(db_executor, path, false).await?;
|
||||
let resolved_path: Vec<Address> = resolve_path(connection, path, false).await?;
|
||||
let last = resolved_path.last().unwrap();
|
||||
|
||||
db_executor
|
||||
.send(QueryEntries {
|
||||
query_entries(
|
||||
connection,
|
||||
EntryQuery {
|
||||
target: Some(last.clone()),
|
||||
key: Some(DIR_HAS_KEY.to_string()),
|
||||
value: None,
|
||||
})
|
||||
.await??
|
||||
.extract_addresses()
|
||||
},
|
||||
)?
|
||||
.extract_addresses()
|
||||
}
|
||||
};
|
||||
|
||||
let mut result: Vec<Entry> = vec![];
|
||||
for address in entry_addresses {
|
||||
result.extend(
|
||||
db_executor
|
||||
.send(RetrieveObject { target: address })
|
||||
.await??
|
||||
retrieve_object(connection, address)?
|
||||
.into_iter()
|
||||
.filter(|e| [DIR_KEY, FILENAME_KEY, FILE_IDENTITY_KEY].contains(&e.key.as_str()))
|
||||
.collect::<Vec<Entry>>(),
|
||||
|
@ -184,8 +191,8 @@ pub async fn list_directory(db_executor: &Addr<DbExecutor>, path: &UPath) -> Res
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn fetch_or_create_dir(
|
||||
db_executor: &Addr<crate::database::DbExecutor>,
|
||||
pub async fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
parent: Option<Address>,
|
||||
directory: UDirectory,
|
||||
create: bool,
|
||||
|
@ -196,27 +203,29 @@ pub async fn fetch_or_create_dir(
|
|||
}
|
||||
|
||||
let dir_value = EntryValue::Value(Value::String(directory.name));
|
||||
let directories: Vec<Address> = db_executor
|
||||
.send(QueryEntries {
|
||||
let directories: Vec<Address> = query_entries(
|
||||
connection,
|
||||
EntryQuery {
|
||||
target: None,
|
||||
key: Some(String::from(DIR_KEY)),
|
||||
value: Some(dir_value.clone()),
|
||||
})
|
||||
.await??
|
||||
.into_iter()
|
||||
.map(|e: Entry| e.target)
|
||||
.collect();
|
||||
},
|
||||
)?
|
||||
.into_iter()
|
||||
.map(|e: Entry| e.target)
|
||||
.collect();
|
||||
|
||||
let valid_directories: Vec<Address> = match parent.clone() {
|
||||
Some(address) => {
|
||||
let parent_has: Vec<Address> = db_executor
|
||||
.send(QueryEntries {
|
||||
let parent_has: Vec<Address> = query_entries(
|
||||
connection,
|
||||
EntryQuery {
|
||||
target: Some(address),
|
||||
key: Some(String::from(DIR_HAS_KEY)),
|
||||
value: None,
|
||||
})
|
||||
.await??
|
||||
.extract_addresses();
|
||||
},
|
||||
)?
|
||||
.extract_addresses();
|
||||
|
||||
directories
|
||||
.into_iter()
|
||||
|
@ -235,11 +244,7 @@ pub async fn fetch_or_create_dir(
|
|||
key: String::from(DIR_KEY),
|
||||
value: dir_value,
|
||||
};
|
||||
let _ = db_executor
|
||||
.send(InsertEntry {
|
||||
entry: directory_entry,
|
||||
})
|
||||
.await??;
|
||||
let _ = insert_entry(connection, directory_entry)?;
|
||||
|
||||
if parent.is_some() {
|
||||
let has_entry = InnerEntry {
|
||||
|
@ -247,7 +252,7 @@ pub async fn fetch_or_create_dir(
|
|||
key: String::from(DIR_HAS_KEY),
|
||||
value: EntryValue::Address(new_directory_address.clone()),
|
||||
};
|
||||
let _ = db_executor.send(InsertEntry { entry: has_entry }).await??;
|
||||
let _ = insert_entry(connection, has_entry)?;
|
||||
}
|
||||
|
||||
Ok(new_directory_address)
|
||||
|
@ -262,8 +267,8 @@ pub async fn fetch_or_create_dir(
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn resolve_path(
|
||||
db_executor: &Addr<DbExecutor>,
|
||||
pub async fn resolve_path<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
path: &UPath,
|
||||
create: bool,
|
||||
) -> Result<Vec<Address>> {
|
||||
|
@ -273,7 +278,7 @@ pub async fn resolve_path(
|
|||
path_stack.reverse();
|
||||
while !path_stack.is_empty() {
|
||||
let dir_address = fetch_or_create_dir(
|
||||
db_executor,
|
||||
connection,
|
||||
result.last().cloned(),
|
||||
path_stack.pop().unwrap(),
|
||||
create,
|
||||
|
@ -285,9 +290,9 @@ pub async fn resolve_path(
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
async fn _reimport_directory<T: AsRef<Path>>(
|
||||
async fn _reimport_directory<C: Connection<Backend = Sqlite>, T: AsRef<Path>>(
|
||||
connection: &C,
|
||||
directory: T,
|
||||
db_executor: &Addr<crate::database::DbExecutor>,
|
||||
hasher_worker: &Addr<HasherWorker>,
|
||||
) -> Result<()> {
|
||||
let absolute_path = fs::canonicalize(&directory)?;
|
||||
|
@ -328,9 +333,7 @@ async fn _reimport_directory<T: AsRef<Path>>(
|
|||
size,
|
||||
};
|
||||
|
||||
db_executor
|
||||
.send(crate::database::InsertFile { file: new_file })
|
||||
.await??;
|
||||
let _ = insert_file(connection, new_file)?;
|
||||
|
||||
let components = path
|
||||
.strip_prefix(&absolute_path)?
|
||||
|
@ -347,20 +350,15 @@ async fn _reimport_directory<T: AsRef<Path>>(
|
|||
filename.as_os_str().to_string_lossy().to_string(),
|
||||
)),
|
||||
};
|
||||
db_executor
|
||||
.send(crate::database::InsertEntry { entry: name_entry })
|
||||
.await??;
|
||||
let _ = insert_entry(connection, name_entry)?;
|
||||
|
||||
let identity_entry = InnerEntry {
|
||||
target: file_address.clone(),
|
||||
key: FILE_IDENTITY_KEY.to_string(),
|
||||
value: EntryValue::Address(Address::Hash(digest.clone())),
|
||||
};
|
||||
db_executor
|
||||
.send(crate::database::InsertEntry {
|
||||
entry: identity_entry,
|
||||
})
|
||||
.await??;
|
||||
|
||||
let _ = insert_entry(connection, identity_entry)?;
|
||||
|
||||
let upath = UPath(
|
||||
iter::once(UDirectory {
|
||||
|
@ -371,32 +369,28 @@ async fn _reimport_directory<T: AsRef<Path>>(
|
|||
}))
|
||||
.collect(),
|
||||
);
|
||||
let resolved_path = resolve_path(db_executor, &upath, true).await?;
|
||||
let resolved_path = resolve_path(connection, &upath, true).await?;
|
||||
let parent_dir = resolved_path.last().unwrap();
|
||||
let dir_has_entry = InnerEntry {
|
||||
target: parent_dir.clone(),
|
||||
key: DIR_HAS_KEY.to_string(),
|
||||
value: EntryValue::Address(file_address),
|
||||
};
|
||||
db_executor
|
||||
.send(crate::database::InsertEntry {
|
||||
entry: dir_has_entry,
|
||||
})
|
||||
.await??;
|
||||
let _ = insert_entry(connection, dir_has_entry)?;
|
||||
}
|
||||
info!("Finished updating {}.", directory.as_ref().display());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn reimport_directory(
|
||||
pub async fn reimport_directory<C: Connection<Backend = Sqlite>>(
|
||||
connection: C,
|
||||
directory: PathBuf,
|
||||
db_executor: Addr<crate::database::DbExecutor>,
|
||||
hasher_worker: Addr<HasherWorker>,
|
||||
) {
|
||||
let result = _reimport_directory(directory, &db_executor, &hasher_worker).await;
|
||||
let result = _reimport_directory(&connection, directory, &hasher_worker).await;
|
||||
if result.is_err() {
|
||||
warn!("Update did not succeed! {}", result.err().unwrap());
|
||||
error!("Update did not succeed! {}", result.err().unwrap());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
13
src/main.rs
13
src/main.rs
|
@ -9,6 +9,7 @@ use std::path::PathBuf;
|
|||
|
||||
use actix::prelude::*;
|
||||
use actix_web::{middleware, App, HttpServer};
|
||||
use anyhow::Result;
|
||||
use clap::{App as ClapApp, Arg};
|
||||
use log::{info, warn};
|
||||
|
||||
|
@ -23,7 +24,7 @@ mod util;
|
|||
|
||||
const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
||||
fn main() -> std::io::Result<()> {
|
||||
fn main() -> Result<()> {
|
||||
let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info");
|
||||
env_logger::init_from_env(env);
|
||||
|
||||
|
@ -59,8 +60,7 @@ fn main() -> std::io::Result<()> {
|
|||
let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
|
||||
.expect("failed to open database!");
|
||||
|
||||
let pool = open_result.pool;
|
||||
let db_addr = SyncArbiter::start(3, move || database::DbExecutor(pool.clone()));
|
||||
let db_pool = open_result.pool;
|
||||
let hash_addr = SyncArbiter::start(4, || hash::HasherWorker);
|
||||
|
||||
let bind: SocketAddr = matches
|
||||
|
@ -72,7 +72,7 @@ fn main() -> std::io::Result<()> {
|
|||
|
||||
let state = routes::State {
|
||||
directory: vault_path.clone(),
|
||||
db: db_addr.clone(),
|
||||
db_pool: db_pool.clone(),
|
||||
hasher: hash_addr.clone(),
|
||||
};
|
||||
|
||||
|
@ -99,8 +99,9 @@ fn main() -> std::io::Result<()> {
|
|||
|
||||
if open_result.new {
|
||||
info!("The vault has been just created, running initial update...");
|
||||
let connection = db_pool.get()?;
|
||||
actix::spawn(filesystem::reimport_directory(
|
||||
vault_path, db_addr, hash_addr,
|
||||
connection, vault_path, hash_addr,
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -111,5 +112,5 @@ fn main() -> std::io::Result<()> {
|
|||
}
|
||||
}
|
||||
|
||||
sys.run()
|
||||
Ok(sys.run()?)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::Entry;
|
||||
use crate::database::{lookup_by_filename, retrieve_by_hash, retrieve_object, DbPool, Entry};
|
||||
use crate::filesystem::{list_directory, UPath};
|
||||
use crate::hash::{decode, encode};
|
||||
use actix::prelude::*;
|
||||
|
@ -15,7 +15,7 @@ use std::path::PathBuf;
|
|||
#[derive(Clone)]
|
||||
pub struct State {
|
||||
pub directory: PathBuf,
|
||||
pub db: Addr<crate::database::DbExecutor>,
|
||||
pub db_pool: DbPool,
|
||||
pub hasher: Addr<crate::hash::HasherWorker>,
|
||||
}
|
||||
|
||||
|
@ -24,10 +24,8 @@ pub async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result
|
|||
let address = Address::decode(&decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
|
||||
.map_err(ErrorInternalServerError)?;
|
||||
if let Address::Hash(hash) = address {
|
||||
let response = state
|
||||
.db
|
||||
.send(crate::database::RetrieveByHash { hash })
|
||||
.await?;
|
||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
||||
let response = retrieve_by_hash(&connection, hash);
|
||||
|
||||
debug!("{:?}", response);
|
||||
|
||||
|
@ -48,15 +46,12 @@ pub async fn get_object(
|
|||
state: web::Data<State>,
|
||||
address_str: web::Path<String>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let response: Result<Vec<Entry>> = state
|
||||
.db
|
||||
.send(crate::database::RetrieveObject {
|
||||
target: Address::decode(
|
||||
&decode(address_str.into_inner()).map_err(ErrorInternalServerError)?,
|
||||
)
|
||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
||||
let response: Result<Vec<Entry>> = retrieve_object(
|
||||
&connection,
|
||||
Address::decode(&decode(address_str.into_inner()).map_err(ErrorInternalServerError)?)
|
||||
.map_err(ErrorInternalServerError)?,
|
||||
})
|
||||
.await?;
|
||||
);
|
||||
|
||||
debug!("{:?}", response);
|
||||
|
||||
|
@ -72,8 +67,9 @@ pub async fn list_hier(
|
|||
state: web::Data<State>,
|
||||
path: web::Path<String>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
||||
let upath: UPath = path.into_inner().parse().map_err(ErrorBadRequest)?;
|
||||
let entries: Vec<Entry> = list_directory(&state.db, &upath)
|
||||
let entries: Vec<Entry> = list_directory(&connection, &upath)
|
||||
.await
|
||||
.map_err(ErrorNotFound)?; // todo: 500 if actual error occurs
|
||||
|
||||
|
@ -95,19 +91,18 @@ pub async fn get_lookup(
|
|||
state: web::Data<State>,
|
||||
web::Query(info): web::Query<LookupRequest>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let response = state
|
||||
.db
|
||||
.send(crate::database::LookupByFilename { query: info.query })
|
||||
.await?;
|
||||
|
||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
||||
let response = lookup_by_filename(&connection, info.query);
|
||||
Ok(HttpResponse::Ok().json(response.map_err(error::ErrorInternalServerError)?))
|
||||
}
|
||||
|
||||
#[post("/api/refresh")]
|
||||
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
||||
|
||||
actix::spawn(crate::filesystem::reimport_directory(
|
||||
connection,
|
||||
state.directory.clone(),
|
||||
state.db.clone(),
|
||||
state.hasher.clone(),
|
||||
));
|
||||
Ok(HttpResponse::Ok().finish())
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
use log::debug;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct LoggerSink {
|
||||
buffer: Vec<u8>,
|
||||
pub buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl std::io::Write for LoggerSink {
|
||||
|
|
Loading…
Reference in New Issue