rewrite database module as a struct instead of bare fns
parent
b2768cbad9
commit
ce9e552844
|
@ -2,8 +2,6 @@ use std::convert::TryFrom;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use diesel::sqlite::Sqlite;
|
|
||||||
use diesel::Connection;
|
|
||||||
use log::trace;
|
use log::trace;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
@ -15,7 +13,8 @@ use crate::database::constants::{
|
||||||
};
|
};
|
||||||
use crate::database::entry::{Entry, EntryValue};
|
use crate::database::entry::{Entry, EntryValue};
|
||||||
use crate::database::lang::{EntryQuery, Query, QueryComponent, QueryPart};
|
use crate::database::lang::{EntryQuery, Query, QueryComponent, QueryPart};
|
||||||
use crate::database::{insert_entry, query, DbPool};
|
|
||||||
|
use super::UpEndConnection;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
|
||||||
pub struct UNode(String);
|
pub struct UNode(String);
|
||||||
|
@ -95,9 +94,8 @@ impl PointerEntries for Vec<Entry> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec<Address>> {
|
pub fn list_roots(connection: &UpEndConnection) -> Result<Vec<Address>> {
|
||||||
let all_directories: Vec<Entry> = query(
|
let all_directories: Vec<Entry> = connection.query(
|
||||||
connection,
|
|
||||||
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
||||||
entity: QueryComponent::Any,
|
entity: QueryComponent::Any,
|
||||||
attribute: QueryComponent::Exact(IS_OF_TYPE_ATTR.to_string()),
|
attribute: QueryComponent::Exact(IS_OF_TYPE_ATTR.to_string()),
|
||||||
|
@ -106,8 +104,7 @@ pub fn list_roots<C: Connection<Backend = Sqlite>>(connection: &C) -> Result<Vec
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// TODO: this is horrible
|
// TODO: this is horrible
|
||||||
let directories_with_parents: Vec<Address> = query(
|
let directories_with_parents: Vec<Address> = connection.query(
|
||||||
connection,
|
|
||||||
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
||||||
entity: QueryComponent::Any,
|
entity: QueryComponent::Any,
|
||||||
attribute: QueryComponent::Exact(HIER_HAS_ATTR.to_string()),
|
attribute: QueryComponent::Exact(HIER_HAS_ATTR.to_string()),
|
||||||
|
@ -130,8 +127,8 @@ lazy_static! {
|
||||||
static ref FETCH_CREATE_LOCK: Mutex<()> = Mutex::new(());
|
static ref FETCH_CREATE_LOCK: Mutex<()> = Mutex::new(());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
pub fn fetch_or_create_dir(
|
||||||
connection: &C,
|
connection: &UpEndConnection,
|
||||||
parent: Option<Address>,
|
parent: Option<Address>,
|
||||||
directory: UNode,
|
directory: UNode,
|
||||||
create: bool,
|
create: bool,
|
||||||
|
@ -146,8 +143,7 @@ pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||||
_lock = FETCH_CREATE_LOCK.lock().unwrap();
|
_lock = FETCH_CREATE_LOCK.lock().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
let matching_directories = query(
|
let matching_directories = connection.query(
|
||||||
connection,
|
|
||||||
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
||||||
entity: QueryComponent::Any,
|
entity: QueryComponent::Any,
|
||||||
attribute: QueryComponent::Exact(String::from(LABEL_ATTR)),
|
attribute: QueryComponent::Exact(String::from(LABEL_ATTR)),
|
||||||
|
@ -160,8 +156,7 @@ pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||||
.map(|e: Entry| e.entity);
|
.map(|e: Entry| e.entity);
|
||||||
|
|
||||||
let parent_has: Vec<Address> = match parent.clone() {
|
let parent_has: Vec<Address> = match parent.clone() {
|
||||||
Some(parent) => query(
|
Some(parent) => connection.query(
|
||||||
connection,
|
|
||||||
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
||||||
entity: QueryComponent::Exact(parent),
|
entity: QueryComponent::Exact(parent),
|
||||||
attribute: QueryComponent::Exact(String::from(HIER_HAS_ATTR)),
|
attribute: QueryComponent::Exact(String::from(HIER_HAS_ATTR)),
|
||||||
|
@ -188,14 +183,14 @@ pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||||
attribute: String::from(IS_OF_TYPE_ATTR),
|
attribute: String::from(IS_OF_TYPE_ATTR),
|
||||||
value: EntryValue::Address(HIER_ADDR.clone()),
|
value: EntryValue::Address(HIER_ADDR.clone()),
|
||||||
};
|
};
|
||||||
insert_entry(connection, type_entry)?;
|
connection.insert_entry(type_entry)?;
|
||||||
|
|
||||||
let directory_entry = Entry {
|
let directory_entry = Entry {
|
||||||
entity: new_directory_address.clone(),
|
entity: new_directory_address.clone(),
|
||||||
attribute: String::from(LABEL_ATTR),
|
attribute: String::from(LABEL_ATTR),
|
||||||
value: EntryValue::Value(Value::String(directory.as_ref().clone())),
|
value: EntryValue::Value(Value::String(directory.as_ref().clone())),
|
||||||
};
|
};
|
||||||
insert_entry(connection, directory_entry)?;
|
connection.insert_entry(directory_entry)?;
|
||||||
|
|
||||||
if let Some(parent) = parent {
|
if let Some(parent) = parent {
|
||||||
let has_entry = Entry {
|
let has_entry = Entry {
|
||||||
|
@ -203,7 +198,7 @@ pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||||
attribute: String::from(HIER_HAS_ATTR),
|
attribute: String::from(HIER_HAS_ATTR),
|
||||||
value: EntryValue::Address(new_directory_address.clone()),
|
value: EntryValue::Address(new_directory_address.clone()),
|
||||||
};
|
};
|
||||||
insert_entry(connection, has_entry)?;
|
connection.insert_entry(has_entry)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(new_directory_address)
|
Ok(new_directory_address)
|
||||||
|
@ -219,8 +214,8 @@ pub fn fetch_or_create_dir<C: Connection<Backend = Sqlite>>(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
pub fn resolve_path(
|
||||||
connection: &C,
|
connection: &UpEndConnection,
|
||||||
path: &UHierPath,
|
path: &UHierPath,
|
||||||
create: bool,
|
create: bool,
|
||||||
) -> Result<Vec<Address>> {
|
) -> Result<Vec<Address>> {
|
||||||
|
@ -243,8 +238,8 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
||||||
|
|
||||||
pub type ResolveCache = LruCache<(Option<Address>, UNode), Address>;
|
pub type ResolveCache = LruCache<(Option<Address>, UNode), Address>;
|
||||||
|
|
||||||
pub fn resolve_path_cached<C: Connection<Backend = Sqlite>>(
|
pub fn resolve_path_cached(
|
||||||
connection: &C,
|
connection: &UpEndConnection,
|
||||||
path: &UHierPath,
|
path: &UHierPath,
|
||||||
create: bool,
|
create: bool,
|
||||||
cache: &Arc<Mutex<ResolveCache>>,
|
cache: &Arc<Mutex<ResolveCache>>,
|
||||||
|
@ -272,10 +267,10 @@ pub fn resolve_path_cached<C: Connection<Backend = Sqlite>>(
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn initialize_hier(pool: &DbPool) -> Result<()> {
|
pub fn initialize_hier(connection: &UpEndConnection) -> Result<()> {
|
||||||
insert_entry(&pool.get()?, Entry::try_from(&*HIER_INVARIANT)?)?;
|
connection.insert_entry(Entry::try_from(&*HIER_INVARIANT)?)?;
|
||||||
upend_insert_addr!(&pool.get()?, HIER_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
upend_insert_addr!(&connection, HIER_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
||||||
upend_insert_val!(&pool.get()?, HIER_ADDR, TYPE_HAS_ATTR, HIER_HAS_ATTR);
|
upend_insert_val!(&connection, HIER_ADDR, TYPE_HAS_ATTR, HIER_HAS_ATTR);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,7 +278,7 @@ pub fn initialize_hier(pool: &DbPool) -> Result<()> {
|
||||||
mod tests {
|
mod tests {
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
|
|
||||||
use crate::database::open_upend;
|
use crate::database::UpEndDatabase;
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -331,10 +326,11 @@ mod tests {
|
||||||
fn test_path_manipulation() {
|
fn test_path_manipulation() {
|
||||||
// Initialize database
|
// Initialize database
|
||||||
let temp_dir = TempDir::new("upend-test").unwrap();
|
let temp_dir = TempDir::new("upend-test").unwrap();
|
||||||
let open_result = open_upend(&temp_dir, None, true).unwrap();
|
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
|
||||||
|
let connection = open_result.db.connection().unwrap();
|
||||||
|
|
||||||
let foo_result = fetch_or_create_dir(
|
let foo_result = fetch_or_create_dir(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
None,
|
None,
|
||||||
UNode("foo".to_string()),
|
UNode("foo".to_string()),
|
||||||
true,
|
true,
|
||||||
|
@ -343,7 +339,7 @@ mod tests {
|
||||||
let foo_result = foo_result.unwrap();
|
let foo_result = foo_result.unwrap();
|
||||||
|
|
||||||
let bar_result = fetch_or_create_dir(
|
let bar_result = fetch_or_create_dir(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
None,
|
None,
|
||||||
UNode("bar".to_string()),
|
UNode("bar".to_string()),
|
||||||
true,
|
true,
|
||||||
|
@ -352,7 +348,7 @@ mod tests {
|
||||||
let bar_result = bar_result.unwrap();
|
let bar_result = bar_result.unwrap();
|
||||||
|
|
||||||
let baz_result = fetch_or_create_dir(
|
let baz_result = fetch_or_create_dir(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
Some(bar_result.clone()),
|
Some(bar_result.clone()),
|
||||||
UNode("baz".to_string()),
|
UNode("baz".to_string()),
|
||||||
true,
|
true,
|
||||||
|
@ -360,11 +356,11 @@ mod tests {
|
||||||
assert!(baz_result.is_ok());
|
assert!(baz_result.is_ok());
|
||||||
let baz_result = baz_result.unwrap();
|
let baz_result = baz_result.unwrap();
|
||||||
|
|
||||||
let roots = list_roots(&open_result.pool.get().unwrap());
|
let roots = list_roots(&connection);
|
||||||
assert_eq!(roots.unwrap(), [foo_result, bar_result.clone()]);
|
assert_eq!(roots.unwrap(), [foo_result, bar_result.clone()]);
|
||||||
|
|
||||||
let resolve_result = resolve_path(
|
let resolve_result = resolve_path(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
&"bar/baz".parse().unwrap(),
|
&"bar/baz".parse().unwrap(),
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
|
@ -376,21 +372,21 @@ mod tests {
|
||||||
);
|
);
|
||||||
|
|
||||||
let resolve_result = resolve_path(
|
let resolve_result = resolve_path(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
&"bar/baz/bax".parse().unwrap(),
|
&"bar/baz/bax".parse().unwrap(),
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
assert!(resolve_result.is_err());
|
assert!(resolve_result.is_err());
|
||||||
|
|
||||||
let resolve_result = resolve_path(
|
let resolve_result = resolve_path(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
&"bar/baz/bax".parse().unwrap(),
|
&"bar/baz/bax".parse().unwrap(),
|
||||||
true,
|
true,
|
||||||
);
|
);
|
||||||
assert!(resolve_result.is_ok());
|
assert!(resolve_result.is_ok());
|
||||||
|
|
||||||
let bax_result = fetch_or_create_dir(
|
let bax_result = fetch_or_create_dir(
|
||||||
&open_result.pool.get().unwrap(),
|
&connection,
|
||||||
Some(baz_result.clone()),
|
Some(baz_result.clone()),
|
||||||
UNode("bax".to_string()),
|
UNode("bax".to_string()),
|
||||||
false,
|
false,
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
macro_rules! upend_insert_val {
|
macro_rules! upend_insert_val {
|
||||||
($db_connection:expr, $entity:expr, $attribute:expr, $value:expr) => {{
|
($db_connection:expr, $entity:expr, $attribute:expr, $value:expr) => {{
|
||||||
insert_entry(
|
$db_connection.insert_entry(
|
||||||
$db_connection,
|
|
||||||
Entry {
|
Entry {
|
||||||
entity: $entity.clone(),
|
entity: $entity.clone(),
|
||||||
attribute: String::from($attribute),
|
attribute: String::from($attribute),
|
||||||
|
@ -13,8 +12,7 @@ macro_rules! upend_insert_val {
|
||||||
|
|
||||||
macro_rules! upend_insert_addr {
|
macro_rules! upend_insert_addr {
|
||||||
($db_connection:expr, $entity:expr, $attribute:expr, $addr:expr) => {{
|
($db_connection:expr, $entity:expr, $attribute:expr, $addr:expr) => {{
|
||||||
insert_entry(
|
$db_connection.insert_entry(
|
||||||
$db_connection,
|
|
||||||
Entry {
|
Entry {
|
||||||
entity: $entity.clone(),
|
entity: $entity.clone(),
|
||||||
attribute: String::from($attribute),
|
attribute: String::from($attribute),
|
||||||
|
|
|
@ -21,9 +21,9 @@ use anyhow::{anyhow, Result};
|
||||||
use chrono::NaiveDateTime;
|
use chrono::NaiveDateTime;
|
||||||
use diesel::debug_query;
|
use diesel::debug_query;
|
||||||
use diesel::prelude::*;
|
use diesel::prelude::*;
|
||||||
use diesel::r2d2::{self, ConnectionManager};
|
use diesel::r2d2::{self, ConnectionManager, PooledConnection};
|
||||||
use diesel::result::{DatabaseErrorKind, Error};
|
use diesel::result::{DatabaseErrorKind, Error};
|
||||||
use diesel::sqlite::{Sqlite, SqliteConnection};
|
use diesel::sqlite::SqliteConnection;
|
||||||
use hierarchies::initialize_hier;
|
use hierarchies::initialize_hier;
|
||||||
use log::{debug, trace};
|
use log::{debug, trace};
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
@ -31,184 +31,6 @@ use std::fs;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
pub fn insert_file<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
file: models::NewFile,
|
|
||||||
) -> Result<usize> {
|
|
||||||
use crate::database::inner::schema::files;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"Inserting {} ({})...",
|
|
||||||
&file.path,
|
|
||||||
Address::Hash(Hash((&file.hash).clone()))
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(diesel::insert_into(files::table)
|
|
||||||
.values(file)
|
|
||||||
.execute(connection)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn retrieve_file<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
obj_hash: Hash,
|
|
||||||
) -> Result<Vec<models::File>> {
|
|
||||||
use crate::database::inner::schema::files::dsl::*;
|
|
||||||
|
|
||||||
let matches = files
|
|
||||||
.filter(valid.eq(true))
|
|
||||||
.filter(hash.eq(obj_hash.0))
|
|
||||||
.load::<models::File>(connection)?;
|
|
||||||
|
|
||||||
Ok(matches)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn retrieve_all_files<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
) -> Result<Vec<models::File>> {
|
|
||||||
use crate::database::inner::schema::files::dsl::*;
|
|
||||||
let matches = files.load::<models::File>(connection)?;
|
|
||||||
Ok(matches)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_latest_files<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
count: i64,
|
|
||||||
) -> Result<Vec<models::File>> {
|
|
||||||
use crate::database::inner::schema::files::dsl::*;
|
|
||||||
|
|
||||||
let matches = files
|
|
||||||
.order_by(added.desc())
|
|
||||||
.limit(count)
|
|
||||||
.load::<models::File>(connection)?;
|
|
||||||
|
|
||||||
Ok(matches)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn file_update_mtime<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
file_id: i32,
|
|
||||||
m_time: Option<NaiveDateTime>,
|
|
||||||
) -> Result<usize> {
|
|
||||||
use crate::database::inner::schema::files::dsl::*;
|
|
||||||
|
|
||||||
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
|
|
||||||
|
|
||||||
Ok(diesel::update(files.filter(id.eq(file_id)))
|
|
||||||
.set(mtime.eq(m_time))
|
|
||||||
.execute(connection)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn file_set_valid<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
file_id: i32,
|
|
||||||
is_valid: bool,
|
|
||||||
) -> Result<usize> {
|
|
||||||
use crate::database::inner::schema::files::dsl::*;
|
|
||||||
|
|
||||||
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
|
||||||
|
|
||||||
Ok(diesel::update(files.filter(id.eq(file_id)))
|
|
||||||
.set(valid.eq(is_valid))
|
|
||||||
.execute(connection)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
object_address: Address,
|
|
||||||
) -> Result<Vec<Entry>> {
|
|
||||||
use crate::database::inner::schema::data::dsl::*;
|
|
||||||
|
|
||||||
let primary = data
|
|
||||||
.filter(entity.eq(object_address.encode()?))
|
|
||||||
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?))
|
|
||||||
.load::<models::Entry>(connection)?;
|
|
||||||
|
|
||||||
let entries = primary
|
|
||||||
.iter()
|
|
||||||
.map(Entry::try_from)
|
|
||||||
.collect::<Result<Vec<Entry>>>()?;
|
|
||||||
|
|
||||||
let secondary = data
|
|
||||||
.filter(
|
|
||||||
entity.eq_any(
|
|
||||||
entries
|
|
||||||
.iter()
|
|
||||||
.map(|e| e.address())
|
|
||||||
.filter_map(Result::ok)
|
|
||||||
.map(|addr| addr.encode())
|
|
||||||
.collect::<Result<Vec<Vec<u8>>>>()?,
|
|
||||||
),
|
|
||||||
)
|
|
||||||
.load::<models::Entry>(connection)?;
|
|
||||||
|
|
||||||
let secondary_entries = secondary
|
|
||||||
.iter()
|
|
||||||
.map(Entry::try_from)
|
|
||||||
.collect::<Result<Vec<Entry>>>()?;
|
|
||||||
|
|
||||||
Ok([entries, secondary_entries].concat())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn remove_object<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
object_address: Address,
|
|
||||||
) -> Result<usize> {
|
|
||||||
use crate::database::inner::schema::data::dsl::*;
|
|
||||||
|
|
||||||
debug!("Deleting {}!", object_address);
|
|
||||||
|
|
||||||
let matches = data
|
|
||||||
.filter(identity.eq(object_address.encode()?))
|
|
||||||
.or_filter(entity.eq(object_address.encode()?))
|
|
||||||
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?));
|
|
||||||
|
|
||||||
Ok(diesel::delete(matches).execute(connection)?)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn query<C: Connection<Backend = Sqlite>>(connection: &C, query: Query) -> Result<Vec<Entry>> {
|
|
||||||
use crate::database::inner::schema::data::dsl::*;
|
|
||||||
|
|
||||||
trace!("Querying: {:?}", query);
|
|
||||||
|
|
||||||
let db_query = data.filter(query.to_sqlite_predicates()?);
|
|
||||||
|
|
||||||
trace!("DB query: {}", debug_query(&db_query));
|
|
||||||
|
|
||||||
let matches = db_query.load::<models::Entry>(connection)?;
|
|
||||||
|
|
||||||
let entries = matches
|
|
||||||
.iter()
|
|
||||||
.map(Entry::try_from)
|
|
||||||
.filter_map(Result::ok)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
Ok(entries)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn insert_entry<C: Connection<Backend = Sqlite>>(
|
|
||||||
connection: &C,
|
|
||||||
entry: Entry,
|
|
||||||
) -> Result<Address> {
|
|
||||||
debug!("Inserting: {}", entry);
|
|
||||||
|
|
||||||
let insert_entry = models::Entry::try_from(&entry)?;
|
|
||||||
|
|
||||||
let entry = Entry::try_from(&insert_entry)?;
|
|
||||||
|
|
||||||
let result = diesel::insert_into(data::table)
|
|
||||||
.values(insert_entry)
|
|
||||||
.execute(connection);
|
|
||||||
|
|
||||||
if let Some(error) = result.err() {
|
|
||||||
match error {
|
|
||||||
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {}
|
|
||||||
_ => return Err(anyhow!(error)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Address::Hash(entry.hash()?))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct ConnectionOptions {
|
pub struct ConnectionOptions {
|
||||||
pub enable_foreign_keys: bool,
|
pub enable_foreign_keys: bool,
|
||||||
|
@ -242,78 +64,260 @@ impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
||||||
|
|
||||||
pub struct OpenResult {
|
pub struct OpenResult {
|
||||||
pub pool: DbPool,
|
pub db: UpEndDatabase,
|
||||||
pub new: bool,
|
pub new: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct UpEndDatabase {
|
||||||
|
pool: DbPool,
|
||||||
|
pub vault_path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
pub const UPEND_SUBDIR: &str = ".upend";
|
pub const UPEND_SUBDIR: &str = ".upend";
|
||||||
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
|
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
|
||||||
|
|
||||||
pub fn open_upend<P: AsRef<Path>>(
|
impl UpEndDatabase {
|
||||||
dirpath: P,
|
pub fn open<P: AsRef<Path>>(
|
||||||
db_path: Option<PathBuf>,
|
dirpath: P,
|
||||||
reinitialize: bool,
|
db_path: Option<PathBuf>,
|
||||||
) -> Result<OpenResult> {
|
reinitialize: bool,
|
||||||
embed_migrations!("./migrations/upend/");
|
) -> Result<OpenResult> {
|
||||||
|
embed_migrations!("./migrations/upend/");
|
||||||
|
|
||||||
let upend_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(UPEND_SUBDIR));
|
let upend_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(UPEND_SUBDIR));
|
||||||
|
|
||||||
if reinitialize {
|
if reinitialize {
|
||||||
trace!("Reinitializing - removing previous database...");
|
trace!("Reinitializing - removing previous database...");
|
||||||
let _ = fs::remove_dir_all(&upend_path);
|
let _ = fs::remove_dir_all(&upend_path);
|
||||||
}
|
}
|
||||||
let new = !upend_path.exists();
|
let new = !upend_path.exists();
|
||||||
|
|
||||||
if new {
|
if new {
|
||||||
trace!("Creating UpEnd subdirectory...");
|
trace!("Creating UpEnd subdirectory...");
|
||||||
fs::create_dir(&upend_path)?;
|
fs::create_dir(&upend_path)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
trace!("Creating pool.");
|
||||||
|
let manager = ConnectionManager::<SqliteConnection>::new(
|
||||||
|
upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
|
||||||
|
);
|
||||||
|
let pool = r2d2::Pool::builder()
|
||||||
|
.connection_customizer(Box::new(ConnectionOptions {
|
||||||
|
enable_foreign_keys: true,
|
||||||
|
busy_timeout: Some(Duration::from_secs(30)),
|
||||||
|
}))
|
||||||
|
.build(manager)?;
|
||||||
|
|
||||||
|
let db = UpEndDatabase {
|
||||||
|
pool,
|
||||||
|
vault_path: PathBuf::from(dirpath.as_ref()),
|
||||||
|
};
|
||||||
|
let connection = db.connection().unwrap();
|
||||||
|
|
||||||
|
let enable_wal_mode = true;
|
||||||
|
connection.execute(if enable_wal_mode {
|
||||||
|
trace!("Enabling WAL journal mode & truncating WAL log...");
|
||||||
|
"PRAGMA journal_mode = WAL;PRAGMA wal_checkpoint(TRUNCATE);"
|
||||||
|
} else {
|
||||||
|
trace!("Enabling TRUNCATE journal mode");
|
||||||
|
"PRAGMA journal_mode = TRUNCATE;"
|
||||||
|
})?;
|
||||||
|
|
||||||
|
trace!("Pool created, running migrations...");
|
||||||
|
|
||||||
|
embedded_migrations::run_with_output(
|
||||||
|
&db.pool.get()?,
|
||||||
|
&mut LoggerSink {
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
trace!("Initializing types...");
|
||||||
|
connection.insert_entry(Entry::try_from(&*TYPE_INVARIANT)?)?;
|
||||||
|
upend_insert_addr!(&connection, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
||||||
|
upend_insert_val!(&connection, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR);
|
||||||
|
|
||||||
|
initialize_hier(&connection)?;
|
||||||
|
|
||||||
|
Ok(OpenResult { db, new })
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!("Creating pool.");
|
pub fn connection(self: &Self) -> Result<UpEndConnection> {
|
||||||
let manager = ConnectionManager::<SqliteConnection>::new(
|
Ok(UpEndConnection(self.pool.get()?))
|
||||||
upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
|
}
|
||||||
);
|
|
||||||
let pool = r2d2::Pool::builder()
|
|
||||||
.connection_customizer(Box::new(ConnectionOptions {
|
|
||||||
enable_foreign_keys: true,
|
|
||||||
busy_timeout: Some(Duration::from_secs(30)),
|
|
||||||
}))
|
|
||||||
.build(manager)?;
|
|
||||||
|
|
||||||
let enable_wal_mode = true;
|
|
||||||
pool.get().unwrap().execute(if enable_wal_mode {
|
|
||||||
trace!("Enabling WAL journal mode & truncating WAL log...");
|
|
||||||
"PRAGMA journal_mode = WAL;PRAGMA wal_checkpoint(TRUNCATE);"
|
|
||||||
} else {
|
|
||||||
trace!("Enabling TRUNCATE journal mode");
|
|
||||||
"PRAGMA journal_mode = TRUNCATE;"
|
|
||||||
})?;
|
|
||||||
|
|
||||||
trace!("Pool created, running migrations...");
|
|
||||||
|
|
||||||
embedded_migrations::run_with_output(
|
|
||||||
&pool.get()?,
|
|
||||||
&mut LoggerSink {
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
|
|
||||||
trace!("Initializing types...");
|
|
||||||
|
|
||||||
initialize_types(&pool)?;
|
|
||||||
initialize_hier(&pool)?;
|
|
||||||
|
|
||||||
Ok(OpenResult { pool, new })
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn initialize_types(pool: &DbPool) -> Result<()> {
|
pub struct UpEndConnection(PooledConnection<ConnectionManager<SqliteConnection>>);
|
||||||
insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?;
|
|
||||||
upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
impl UpEndConnection {
|
||||||
upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR);
|
pub fn execute<S: AsRef<str>>(self: &Self, query: S) -> Result<usize, diesel::result::Error> {
|
||||||
Ok(())
|
self.0.execute(query.as_ref())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
|
||||||
|
where
|
||||||
|
F: FnOnce() -> Result<T, E>,
|
||||||
|
E: From<Error>,
|
||||||
|
{
|
||||||
|
self.0.transaction(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_file(self: &Self, file: models::NewFile) -> Result<usize> {
|
||||||
|
use crate::database::inner::schema::files;
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Inserting {} ({})...",
|
||||||
|
&file.path,
|
||||||
|
Address::Hash(Hash((&file.hash).clone()))
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(diesel::insert_into(files::table)
|
||||||
|
.values(file)
|
||||||
|
.execute(&self.0)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn retrieve_file(self: &Self, obj_hash: Hash) -> Result<Vec<models::File>> {
|
||||||
|
use crate::database::inner::schema::files::dsl::*;
|
||||||
|
|
||||||
|
let matches = files
|
||||||
|
.filter(valid.eq(true))
|
||||||
|
.filter(hash.eq(obj_hash.0))
|
||||||
|
.load::<models::File>(&self.0)?;
|
||||||
|
|
||||||
|
Ok(matches)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn retrieve_all_files(self: &Self) -> Result<Vec<models::File>> {
|
||||||
|
use crate::database::inner::schema::files::dsl::*;
|
||||||
|
let matches = files.load::<models::File>(&self.0)?;
|
||||||
|
Ok(matches)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_latest_files(self: &Self, count: i64) -> Result<Vec<models::File>> {
|
||||||
|
use crate::database::inner::schema::files::dsl::*;
|
||||||
|
|
||||||
|
let matches = files
|
||||||
|
.order_by(added.desc())
|
||||||
|
.limit(count)
|
||||||
|
.load::<models::File>(&self.0)?;
|
||||||
|
|
||||||
|
Ok(matches)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn file_update_mtime(
|
||||||
|
self: &Self,
|
||||||
|
file_id: i32,
|
||||||
|
m_time: Option<NaiveDateTime>,
|
||||||
|
) -> Result<usize> {
|
||||||
|
use crate::database::inner::schema::files::dsl::*;
|
||||||
|
|
||||||
|
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
|
||||||
|
|
||||||
|
Ok(diesel::update(files.filter(id.eq(file_id)))
|
||||||
|
.set(mtime.eq(m_time))
|
||||||
|
.execute(&self.0)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn file_set_valid(self: &Self, file_id: i32, is_valid: bool) -> Result<usize> {
|
||||||
|
use crate::database::inner::schema::files::dsl::*;
|
||||||
|
|
||||||
|
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
||||||
|
|
||||||
|
Ok(diesel::update(files.filter(id.eq(file_id)))
|
||||||
|
.set(valid.eq(is_valid))
|
||||||
|
.execute(&self.0)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn retrieve_object(self: &Self, object_address: Address) -> Result<Vec<Entry>> {
|
||||||
|
use crate::database::inner::schema::data::dsl::*;
|
||||||
|
|
||||||
|
let primary = data
|
||||||
|
.filter(entity.eq(object_address.encode()?))
|
||||||
|
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?))
|
||||||
|
.load::<models::Entry>(&self.0)?;
|
||||||
|
|
||||||
|
let entries = primary
|
||||||
|
.iter()
|
||||||
|
.map(Entry::try_from)
|
||||||
|
.collect::<Result<Vec<Entry>>>()?;
|
||||||
|
|
||||||
|
let secondary = data
|
||||||
|
.filter(
|
||||||
|
entity.eq_any(
|
||||||
|
entries
|
||||||
|
.iter()
|
||||||
|
.map(|e| e.address())
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.map(|addr| addr.encode())
|
||||||
|
.collect::<Result<Vec<Vec<u8>>>>()?,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
.load::<models::Entry>(&self.0)?;
|
||||||
|
|
||||||
|
let secondary_entries = secondary
|
||||||
|
.iter()
|
||||||
|
.map(Entry::try_from)
|
||||||
|
.collect::<Result<Vec<Entry>>>()?;
|
||||||
|
|
||||||
|
Ok([entries, secondary_entries].concat())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove_object(self: &Self, object_address: Address) -> Result<usize> {
|
||||||
|
use crate::database::inner::schema::data::dsl::*;
|
||||||
|
|
||||||
|
debug!("Deleting {}!", object_address);
|
||||||
|
|
||||||
|
let matches = data
|
||||||
|
.filter(identity.eq(object_address.encode()?))
|
||||||
|
.or_filter(entity.eq(object_address.encode()?))
|
||||||
|
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?));
|
||||||
|
|
||||||
|
Ok(diesel::delete(matches).execute(&self.0)?)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn query(self: &Self, query: Query) -> Result<Vec<Entry>> {
|
||||||
|
use crate::database::inner::schema::data::dsl::*;
|
||||||
|
|
||||||
|
trace!("Querying: {:?}", query);
|
||||||
|
|
||||||
|
let db_query = data.filter(query.to_sqlite_predicates()?);
|
||||||
|
|
||||||
|
trace!("DB query: {}", debug_query(&db_query));
|
||||||
|
|
||||||
|
let matches = db_query.load::<models::Entry>(&self.0)?;
|
||||||
|
|
||||||
|
let entries = matches
|
||||||
|
.iter()
|
||||||
|
.map(Entry::try_from)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok(entries)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert_entry(self: &Self, entry: Entry) -> Result<Address> {
|
||||||
|
debug!("Inserting: {}", entry);
|
||||||
|
|
||||||
|
let insert_entry = models::Entry::try_from(&entry)?;
|
||||||
|
|
||||||
|
let entry = Entry::try_from(&insert_entry)?;
|
||||||
|
|
||||||
|
let result = diesel::insert_into(data::table)
|
||||||
|
.values(insert_entry)
|
||||||
|
.execute(&self.0);
|
||||||
|
|
||||||
|
if let Some(error) = result.err() {
|
||||||
|
match error {
|
||||||
|
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {}
|
||||||
|
_ => return Err(anyhow!(error)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Address::Hash(entry.hash()?))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -322,7 +326,8 @@ mod test {
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_open() -> Result<(), anyhow::Error> {
|
fn test_open() {
|
||||||
open_upend(TempDir::new("upend-test").unwrap(), None, true).map(|_| ())
|
let result = UpEndDatabase::open(TempDir::new("upend-test").unwrap(), None, false);
|
||||||
|
assert!(result.is_ok());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::borrow::Borrow;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::path::{Component, Path, PathBuf};
|
use std::path::{Component, Path, PathBuf};
|
||||||
use std::sync::{Arc, Mutex, RwLock};
|
use std::sync::{Arc, Mutex, RwLock};
|
||||||
|
@ -9,17 +10,13 @@ use crate::database::constants::{
|
||||||
HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, TYPE_HAS_ATTR,
|
HIER_HAS_ATTR, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_BASE_ATTR, TYPE_HAS_ATTR,
|
||||||
};
|
};
|
||||||
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
|
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
|
||||||
use crate::database::hierarchies::{resolve_path_cached, ResolveCache, UNode, UHierPath};
|
use crate::database::hierarchies::{resolve_path_cached, ResolveCache, UHierPath, UNode};
|
||||||
use crate::database::inner::models;
|
use crate::database::inner::models;
|
||||||
use crate::database::{
|
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
|
||||||
file_set_valid, file_update_mtime, insert_entry, insert_file, retrieve_all_files, DbPool,
|
|
||||||
UPEND_SUBDIR,
|
|
||||||
};
|
|
||||||
use crate::util::hash::{Hash, Hashable};
|
use crate::util::hash::{Hash, Hashable};
|
||||||
use crate::util::jobs::{Job, JobContainer, JobId, State};
|
use crate::util::jobs::{Job, JobContainer, JobId, State};
|
||||||
use anyhow::{Error, Result};
|
use anyhow::{Error, Result};
|
||||||
use chrono::prelude::*;
|
use chrono::prelude::*;
|
||||||
use diesel::Connection;
|
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
|
@ -40,22 +37,18 @@ lazy_static! {
|
||||||
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
|
static ref BLOB_TYPE_ADDR: Address = BLOB_TYPE_INVARIANT.entity().unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
fn initialize_types(pool: &DbPool) -> Result<()> {
|
fn initialize_types(connection: &UpEndConnection) -> Result<()> {
|
||||||
// BLOB_TYPE
|
// BLOB_TYPE
|
||||||
insert_entry(&pool.get()?, Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
|
connection.insert_entry(Entry::try_from(&*BLOB_TYPE_INVARIANT)?)?;
|
||||||
upend_insert_addr!(&pool.get()?, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
upend_insert_addr!(&connection, BLOB_TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
||||||
upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY);
|
upend_insert_val!(&connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MTIME_KEY);
|
||||||
upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY);
|
upend_insert_val!(&connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_SIZE_KEY);
|
||||||
upend_insert_val!(&pool.get()?, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);
|
upend_insert_val!(&connection, BLOB_TYPE_ADDR, TYPE_HAS_ATTR, FILE_MIME_KEY);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn rescan_vault(
|
pub async fn rescan_vault(db: Arc<UpEndDatabase>, job_container: Arc<RwLock<JobContainer>>) {
|
||||||
pool: DbPool,
|
|
||||||
directory: PathBuf,
|
|
||||||
job_container: Arc<RwLock<JobContainer>>,
|
|
||||||
) {
|
|
||||||
let job_id = job_container
|
let job_id = job_container
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -64,8 +57,7 @@ pub async fn rescan_vault(
|
||||||
|
|
||||||
let job_container_rescan = job_container.clone();
|
let job_container_rescan = job_container.clone();
|
||||||
let result =
|
let result =
|
||||||
actix_web::web::block(move || _rescan_vault(pool, directory, job_container_rescan, job_id))
|
actix_web::web::block(move || _rescan_vault(db, job_container_rescan, job_id)).await;
|
||||||
.await;
|
|
||||||
|
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
let err = result.err().unwrap();
|
let err = result.err().unwrap();
|
||||||
|
@ -78,18 +70,12 @@ pub async fn rescan_vault(
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
struct PragmaSynchronousGuard<'a>(&'a DbPool);
|
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
|
||||||
|
|
||||||
impl Drop for PragmaSynchronousGuard<'_> {
|
impl Drop for PragmaSynchronousGuard<'_> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
debug!("Re-enabling synchronous mode.");
|
debug!("Re-enabling synchronous mode.");
|
||||||
let connection = self.0.get();
|
let res = self.0.execute("PRAGMA synchronous = NORMAL;");
|
||||||
let res: Result<_, String> = match connection {
|
|
||||||
Ok(connection) => connection
|
|
||||||
.execute("PRAGMA synchronous = NORMAL;")
|
|
||||||
.map_err(|err| format!("{}", err)),
|
|
||||||
Err(err) => Err(format!("{}", err)),
|
|
||||||
};
|
|
||||||
if let Err(err) = res {
|
if let Err(err) = res {
|
||||||
error!(
|
error!(
|
||||||
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
|
"Error setting synchronous mode back to NORMAL! Data loss possible! {}",
|
||||||
|
@ -109,28 +95,30 @@ enum UpdatePathOutcome {
|
||||||
Failed(PathBuf, Error),
|
Failed(PathBuf, Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _rescan_vault<T: AsRef<Path>>(
|
fn _rescan_vault<D: Borrow<UpEndDatabase>>(
|
||||||
pool: DbPool,
|
db: D,
|
||||||
directory: T,
|
|
||||||
job_container: Arc<RwLock<JobContainer>>,
|
job_container: Arc<RwLock<JobContainer>>,
|
||||||
job_id: JobId,
|
job_id: JobId,
|
||||||
) -> Result<Vec<UpdatePathOutcome>> {
|
) -> Result<Vec<UpdatePathOutcome>> {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
info!("Vault rescan started.");
|
info!("Vault rescan started.");
|
||||||
|
|
||||||
|
let db = db.borrow();
|
||||||
|
let connection = db.connection()?;
|
||||||
|
|
||||||
// Initialize types, etc...
|
// Initialize types, etc...
|
||||||
debug!("Initializing DB types.");
|
debug!("Initializing DB types.");
|
||||||
initialize_types(&pool)?;
|
initialize_types(&connection)?;
|
||||||
|
|
||||||
// Disable syncing in SQLite for the duration of the import
|
// Disable syncing in SQLite for the duration of the import
|
||||||
debug!("Disabling SQLite synchronous mode");
|
debug!("Disabling SQLite synchronous mode");
|
||||||
pool.get()?.execute("PRAGMA synchronous = OFF;")?;
|
connection.execute("PRAGMA synchronous = OFF;")?;
|
||||||
let _guard = PragmaSynchronousGuard(&pool);
|
let _guard = PragmaSynchronousGuard(&connection);
|
||||||
|
|
||||||
// Walk through the vault, find all paths
|
// Walk through the vault, find all paths
|
||||||
debug!("Traversing vault directory");
|
debug!("Traversing vault directory");
|
||||||
let absolute_dir_path = fs::canonicalize(&directory)?;
|
let absolute_dir_path = fs::canonicalize(&db.vault_path)?;
|
||||||
let path_entries: Vec<PathBuf> = WalkDir::new(&directory)
|
let path_entries: Vec<PathBuf> = WalkDir::new(&db.vault_path)
|
||||||
.follow_links(true)
|
.follow_links(true)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|e| e.ok())
|
.filter_map(|e| e.ok())
|
||||||
|
@ -140,8 +128,7 @@ fn _rescan_vault<T: AsRef<Path>>(
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
// Prepare for processing
|
// Prepare for processing
|
||||||
let rw_pool = Arc::new(RwLock::new(pool.clone()));
|
let existing_files = Arc::new(RwLock::new(connection.retrieve_all_files()?));
|
||||||
let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
|
|
||||||
|
|
||||||
// Actual processing
|
// Actual processing
|
||||||
let count = RwLock::new(0_usize);
|
let count = RwLock::new(0_usize);
|
||||||
|
@ -151,7 +138,7 @@ fn _rescan_vault<T: AsRef<Path>>(
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|path| {
|
.map(|path| {
|
||||||
let result = _process_directory_entry(
|
let result = _process_directory_entry(
|
||||||
&rw_pool,
|
db.connection().unwrap(),
|
||||||
&resolve_cache,
|
&resolve_cache,
|
||||||
path.clone(),
|
path.clone(),
|
||||||
&absolute_dir_path,
|
&absolute_dir_path,
|
||||||
|
@ -178,10 +165,9 @@ fn _rescan_vault<T: AsRef<Path>>(
|
||||||
|
|
||||||
let existing_files = existing_files.read().unwrap();
|
let existing_files = existing_files.read().unwrap();
|
||||||
|
|
||||||
let connection = pool.get()?;
|
|
||||||
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
|
let cleanup_results = existing_files.iter().filter(|f| f.valid).map(|file| {
|
||||||
let trans_result = connection.transaction::<_, Error, _>(|| {
|
let trans_result = connection.transaction::<_, Error, _>(|| {
|
||||||
file_set_valid(&connection, file.id, false)?;
|
connection.file_set_valid(file.id, false)?;
|
||||||
// remove_object(&connection, )?
|
// remove_object(&connection, )?
|
||||||
Ok(())
|
Ok(())
|
||||||
});
|
});
|
||||||
|
@ -225,8 +211,8 @@ fn _rescan_vault<T: AsRef<Path>>(
|
||||||
drop(_guard);
|
drop(_guard);
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Finished updating {} ({} created, {} deleted, {} left unchanged). Took {}s.",
|
"Finished updating {:?} ({} created, {} deleted, {} left unchanged). Took {}s.",
|
||||||
directory.as_ref().display(),
|
db.vault_path,
|
||||||
created,
|
created,
|
||||||
deleted,
|
deleted,
|
||||||
unchanged,
|
unchanged,
|
||||||
|
@ -237,7 +223,7 @@ fn _rescan_vault<T: AsRef<Path>>(
|
||||||
}
|
}
|
||||||
|
|
||||||
fn _process_directory_entry<P: AsRef<Path>>(
|
fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
db_pool: &Arc<RwLock<DbPool>>,
|
connection: UpEndConnection,
|
||||||
resolve_cache: &Arc<Mutex<ResolveCache>>,
|
resolve_cache: &Arc<Mutex<ResolveCache>>,
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
directory_path: &P,
|
directory_path: &P,
|
||||||
|
@ -246,7 +232,6 @@ fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
debug!("Processing: {:?}", path);
|
debug!("Processing: {:?}", path);
|
||||||
|
|
||||||
// Prepare the data
|
// Prepare the data
|
||||||
let connection = &db_pool.write().unwrap().get()?;
|
|
||||||
let existing_files = Arc::clone(existing_files);
|
let existing_files = Arc::clone(existing_files);
|
||||||
|
|
||||||
let normalized_path = path.strip_prefix(&directory_path)?;
|
let normalized_path = path.strip_prefix(&directory_path)?;
|
||||||
|
@ -288,11 +273,11 @@ fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
|
|
||||||
if same_mtime || same_hash {
|
if same_mtime || same_hash {
|
||||||
if mtime != existing_file.mtime {
|
if mtime != existing_file.mtime {
|
||||||
file_update_mtime(connection, existing_file.id, mtime)?;
|
connection.file_update_mtime(existing_file.id, mtime)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if !existing_file.valid {
|
if !existing_file.valid {
|
||||||
file_set_valid(connection, existing_file.id, true)?;
|
connection.file_set_valid(existing_file.id, true)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut existing_files_write = existing_files.write().unwrap();
|
let mut existing_files_write = existing_files.write().unwrap();
|
||||||
|
@ -357,22 +342,22 @@ fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
}))
|
}))
|
||||||
.collect(),
|
.collect(),
|
||||||
);
|
);
|
||||||
let resolved_path = resolve_path_cached(connection, &upath, true, resolve_cache)?;
|
let resolved_path = resolve_path_cached(&connection, &upath, true, resolve_cache)?;
|
||||||
let parent_dir = resolved_path.last().unwrap();
|
let parent_dir = resolved_path.last().unwrap();
|
||||||
|
|
||||||
connection.transaction::<_, Error, _>(|| {
|
connection.transaction::<_, Error, _>(|| {
|
||||||
insert_file(connection, new_file)?;
|
connection.insert_file(new_file)?;
|
||||||
|
|
||||||
insert_entry(connection, type_entry)?;
|
connection.insert_entry(type_entry)?;
|
||||||
insert_entry(connection, size_entry)?;
|
connection.insert_entry(size_entry)?;
|
||||||
insert_entry(connection, mime_entry)?;
|
connection.insert_entry(mime_entry)?;
|
||||||
|
|
||||||
let dir_has_entry = Entry {
|
let dir_has_entry = Entry {
|
||||||
entity: parent_dir.clone(),
|
entity: parent_dir.clone(),
|
||||||
attribute: HIER_HAS_ATTR.to_string(),
|
attribute: HIER_HAS_ATTR.to_string(),
|
||||||
value: EntryValue::Address(Address::Hash(file_hash.clone())),
|
value: EntryValue::Address(Address::Hash(file_hash.clone())),
|
||||||
};
|
};
|
||||||
let dir_has_entry_addr = insert_entry(connection, dir_has_entry)?;
|
let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
|
||||||
|
|
||||||
let name_entry = Entry {
|
let name_entry = Entry {
|
||||||
entity: dir_has_entry_addr,
|
entity: dir_has_entry_addr,
|
||||||
|
@ -381,7 +366,7 @@ fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
filename.as_os_str().to_string_lossy().to_string(),
|
filename.as_os_str().to_string_lossy().to_string(),
|
||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
insert_entry(connection, name_entry)?;
|
connection.insert_entry(name_entry)?;
|
||||||
|
|
||||||
info!("Added: {:?}", path);
|
info!("Added: {:?}", path);
|
||||||
Ok(UpdatePathOutcome::Added(path.clone()))
|
Ok(UpdatePathOutcome::Added(path.clone()))
|
||||||
|
@ -390,7 +375,7 @@ fn _process_directory_entry<P: AsRef<Path>>(
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use crate::database::open_upend;
|
use crate::database::UpEndDatabase;
|
||||||
use crate::util;
|
use crate::util;
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
@ -416,7 +401,7 @@ mod test {
|
||||||
|
|
||||||
// Initialize database
|
// Initialize database
|
||||||
|
|
||||||
let open_result = open_upend(&temp_dir, None, true).unwrap();
|
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
|
||||||
let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
|
let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
|
||||||
let job_id = job_container
|
let job_id = job_container
|
||||||
.write()
|
.write()
|
||||||
|
@ -425,12 +410,7 @@ mod test {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
// Initial scan
|
// Initial scan
|
||||||
let rescan_result = _rescan_vault(
|
let rescan_result = _rescan_vault(&open_result.db, job_container.clone(), job_id);
|
||||||
open_result.pool.clone(),
|
|
||||||
temp_dir.as_ref().to_path_buf(),
|
|
||||||
job_container.clone(),
|
|
||||||
job_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
assert!(rescan_result.is_ok());
|
assert!(rescan_result.is_ok());
|
||||||
let rescan_result = rescan_result.unwrap();
|
let rescan_result = rescan_result.unwrap();
|
||||||
|
@ -441,12 +421,7 @@ mod test {
|
||||||
|
|
||||||
// Modification-less rescan
|
// Modification-less rescan
|
||||||
|
|
||||||
let rescan_result = _rescan_vault(
|
let rescan_result = _rescan_vault(&open_result.db, job_container.clone(), job_id);
|
||||||
open_result.pool.clone(),
|
|
||||||
temp_dir.as_ref().to_path_buf(),
|
|
||||||
job_container.clone(),
|
|
||||||
job_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
assert!(rescan_result.is_ok());
|
assert!(rescan_result.is_ok());
|
||||||
let rescan_result = rescan_result.unwrap();
|
let rescan_result = rescan_result.unwrap();
|
||||||
|
@ -459,12 +434,7 @@ mod test {
|
||||||
|
|
||||||
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
|
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
|
||||||
|
|
||||||
let rescan_result = _rescan_vault(
|
let rescan_result = _rescan_vault(&open_result.db, job_container, job_id);
|
||||||
open_result.pool,
|
|
||||||
temp_dir.as_ref().to_path_buf(),
|
|
||||||
job_container,
|
|
||||||
job_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
assert!(rescan_result.is_ok());
|
assert!(rescan_result.is_ok());
|
||||||
let rescan_result = rescan_result.unwrap();
|
let rescan_result = rescan_result.unwrap();
|
||||||
|
|
11
src/main.rs
11
src/main.rs
|
@ -15,6 +15,8 @@ use clap::{App as ClapApp, Arg};
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
|
use crate::database::UpEndDatabase;
|
||||||
|
|
||||||
mod addressing;
|
mod addressing;
|
||||||
mod database;
|
mod database;
|
||||||
mod filesystem;
|
mod filesystem;
|
||||||
|
@ -85,14 +87,14 @@ fn main() -> Result<()> {
|
||||||
|
|
||||||
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
|
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
|
||||||
|
|
||||||
let open_result = database::open_upend(
|
let open_result = UpEndDatabase::open(
|
||||||
&vault_path,
|
&vault_path,
|
||||||
matches.value_of("DB_PATH").map(PathBuf::from),
|
matches.value_of("DB_PATH").map(PathBuf::from),
|
||||||
matches.is_present("REINITIALIZE"),
|
matches.is_present("REINITIALIZE"),
|
||||||
)
|
)
|
||||||
.expect("failed to open database!");
|
.expect("failed to open database!");
|
||||||
|
|
||||||
let db_pool = open_result.pool;
|
let upend = Arc::new(open_result.db);
|
||||||
|
|
||||||
let bind: SocketAddr = matches
|
let bind: SocketAddr = matches
|
||||||
.value_of("BIND")
|
.value_of("BIND")
|
||||||
|
@ -102,6 +104,7 @@ fn main() -> Result<()> {
|
||||||
info!("Starting server at: {}", &bind);
|
info!("Starting server at: {}", &bind);
|
||||||
|
|
||||||
let state = routes::State {
|
let state = routes::State {
|
||||||
|
upend: upend.clone(),
|
||||||
vault_name: Some(
|
vault_name: Some(
|
||||||
matches
|
matches
|
||||||
.value_of("VAULT_NAME")
|
.value_of("VAULT_NAME")
|
||||||
|
@ -115,8 +118,6 @@ fn main() -> Result<()> {
|
||||||
.into_owned()
|
.into_owned()
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
directory: vault_path.clone(),
|
|
||||||
db_pool: db_pool.clone(),
|
|
||||||
job_container: job_container.clone(),
|
job_container: job_container.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -154,7 +155,7 @@ fn main() -> Result<()> {
|
||||||
|
|
||||||
if !matches.is_present("NO_INITIAL_UPDATE") {
|
if !matches.is_present("NO_INITIAL_UPDATE") {
|
||||||
info!("Running initial update...");
|
info!("Running initial update...");
|
||||||
actix::spawn(filesystem::rescan_vault(db_pool, vault_path, job_container));
|
actix::spawn(filesystem::rescan_vault(upend, job_container));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "desktop")]
|
#[cfg(feature = "desktop")]
|
||||||
|
|
|
@ -2,9 +2,7 @@ use crate::addressing::{Address, Addressable};
|
||||||
use crate::database::entry::{Entry, InEntry};
|
use crate::database::entry::{Entry, InEntry};
|
||||||
use crate::database::hierarchies::{list_roots, resolve_path, UHierPath};
|
use crate::database::hierarchies::{list_roots, resolve_path, UHierPath};
|
||||||
use crate::database::lang::Query;
|
use crate::database::lang::Query;
|
||||||
use crate::database::{
|
use crate::database::UpEndDatabase;
|
||||||
get_latest_files, insert_entry, query, remove_object, retrieve_file, retrieve_object, DbPool,
|
|
||||||
};
|
|
||||||
use crate::util::hash::{decode, encode};
|
use crate::util::hash::{decode, encode};
|
||||||
use crate::util::jobs::JobContainer;
|
use crate::util::jobs::JobContainer;
|
||||||
use actix_files::NamedFile;
|
use actix_files::NamedFile;
|
||||||
|
@ -18,7 +16,6 @@ use serde::Deserialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
#[cfg(feature = "desktop")]
|
#[cfg(feature = "desktop")]
|
||||||
|
@ -28,9 +25,8 @@ const VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct State {
|
pub struct State {
|
||||||
|
pub upend: Arc<UpEndDatabase>,
|
||||||
pub vault_name: Option<String>,
|
pub vault_name: Option<String>,
|
||||||
pub directory: PathBuf,
|
|
||||||
pub db_pool: DbPool,
|
|
||||||
pub job_container: Arc<RwLock<JobContainer>>,
|
pub job_container: Arc<RwLock<JobContainer>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,10 +44,12 @@ pub async fn get_raw(
|
||||||
let address = Address::decode(&decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
|
let address = Address::decode(&decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
if let Address::Hash(hash) = address {
|
if let Address::Hash(hash) = address {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let files = retrieve_file(&connection, hash).map_err(ErrorInternalServerError)?;
|
let files = connection
|
||||||
|
.retrieve_file(hash)
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
if let Some(file) = files.get(0) {
|
if let Some(file) = files.get(0) {
|
||||||
let file_path = state.directory.join(&file.path);
|
let file_path = state.upend.vault_path.join(&file.path);
|
||||||
|
|
||||||
if !query.native.is_some() {
|
if !query.native.is_some() {
|
||||||
Ok(Either::A(NamedFile::open(file_path)?))
|
Ok(Either::A(NamedFile::open(file_path)?))
|
||||||
|
@ -73,7 +71,7 @@ pub async fn get_raw(
|
||||||
http::header::ACCESS_CONTROL_EXPOSE_HEADERS,
|
http::header::ACCESS_CONTROL_EXPOSE_HEADERS,
|
||||||
http::header::WARNING.to_string(),
|
http::header::WARNING.to_string(),
|
||||||
);
|
);
|
||||||
|
|
||||||
file_path
|
file_path
|
||||||
.parent()
|
.parent()
|
||||||
.ok_or_else(|| {
|
.ok_or_else(|| {
|
||||||
|
@ -111,10 +109,12 @@ pub async fn get_query(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
web::Query(info): web::Query<QueryRequest>,
|
web::Query(info): web::Query<QueryRequest>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let in_query: Query = info.query.as_str().parse().map_err(ErrorBadRequest)?;
|
let in_query: Query = info.query.as_str().parse().map_err(ErrorBadRequest)?;
|
||||||
let entries = query(&connection, in_query).map_err(ErrorInternalServerError)?;
|
let entries = connection
|
||||||
|
.query(in_query)
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
let mut result: HashMap<String, Entry> = HashMap::new();
|
let mut result: HashMap<String, Entry> = HashMap::new();
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
result.insert(
|
result.insert(
|
||||||
|
@ -152,13 +152,13 @@ pub async fn get_object(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
address_str: web::Path<String>,
|
address_str: web::Path<String>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let result: Vec<Entry> = retrieve_object(
|
let result: Vec<Entry> = connection
|
||||||
&connection,
|
.retrieve_object(
|
||||||
Address::decode(&decode(address_str.into_inner()).map_err(ErrorBadRequest)?)
|
Address::decode(&decode(address_str.into_inner()).map_err(ErrorBadRequest)?)
|
||||||
.map_err(ErrorBadRequest)?,
|
.map_err(ErrorBadRequest)?,
|
||||||
)
|
)
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
debug!("{:?}", result);
|
debug!("{:?}", result);
|
||||||
Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
|
Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
|
||||||
|
@ -171,7 +171,7 @@ pub async fn put_object(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
mut payload: web::Payload,
|
mut payload: web::Payload,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let mut body = web::BytesMut::new();
|
let mut body = web::BytesMut::new();
|
||||||
while let Some(chunk) = payload.next().await {
|
while let Some(chunk) = payload.next().await {
|
||||||
|
@ -186,8 +186,9 @@ pub async fn put_object(
|
||||||
let in_entry = serde_json::from_slice::<InEntry>(&body).map_err(ErrorBadRequest)?;
|
let in_entry = serde_json::from_slice::<InEntry>(&body).map_err(ErrorBadRequest)?;
|
||||||
let entry = Entry::try_from(in_entry).map_err(ErrorInternalServerError)?;
|
let entry = Entry::try_from(in_entry).map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let result_address =
|
let result_address = connection
|
||||||
insert_entry(&connection, entry.clone()).map_err(ErrorInternalServerError)?;
|
.insert_entry(entry.clone())
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(
|
Ok(HttpResponse::Ok().json(
|
||||||
[(
|
[(
|
||||||
|
@ -205,13 +206,13 @@ pub async fn delete_object(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
address_str: web::Path<String>,
|
address_str: web::Path<String>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let _ = remove_object(
|
let _ = connection
|
||||||
&connection,
|
.remove_object(
|
||||||
Address::decode(&decode(address_str.into_inner()).map_err(ErrorBadRequest)?)
|
Address::decode(&decode(address_str.into_inner()).map_err(ErrorBadRequest)?)
|
||||||
.map_err(ErrorInternalServerError)?,
|
.map_err(ErrorInternalServerError)?,
|
||||||
)
|
)
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
@ -221,7 +222,7 @@ pub async fn list_hier(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
path: web::Path<String>,
|
path: web::Path<String>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
if path.is_empty() {
|
if path.is_empty() {
|
||||||
Ok(HttpResponse::MovedPermanently()
|
Ok(HttpResponse::MovedPermanently()
|
||||||
.header("Location", "/api/hier_roots")
|
.header("Location", "/api/hier_roots")
|
||||||
|
@ -243,12 +244,12 @@ pub async fn list_hier(
|
||||||
|
|
||||||
#[get("/api/hier_roots")]
|
#[get("/api/hier_roots")]
|
||||||
pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let result = list_roots(&connection)
|
let result = list_roots(&connection)
|
||||||
.map_err(ErrorInternalServerError)?
|
.map_err(ErrorInternalServerError)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|root| retrieve_object(&connection, root))
|
.map(|root| connection.retrieve_object(root))
|
||||||
.collect::<Result<Vec<Vec<Entry>>>>()
|
.collect::<Result<Vec<Vec<Entry>>>>()
|
||||||
.map_err(ErrorInternalServerError)?
|
.map_err(ErrorInternalServerError)?
|
||||||
.concat();
|
.concat();
|
||||||
|
@ -258,11 +259,8 @@ pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Er
|
||||||
|
|
||||||
#[post("/api/refresh")]
|
#[post("/api/refresh")]
|
||||||
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let _pool = state.db_pool.clone();
|
|
||||||
let _directory = state.directory.clone();
|
|
||||||
actix::spawn(crate::filesystem::rescan_vault(
|
actix::spawn(crate::filesystem::rescan_vault(
|
||||||
_pool,
|
state.upend.clone(),
|
||||||
_directory,
|
|
||||||
state.job_container.clone(),
|
state.job_container.clone(),
|
||||||
));
|
));
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
|
@ -277,8 +275,10 @@ pub async fn get_file(
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
if let Address::Hash(hash) = address {
|
if let Address::Hash(hash) = address {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let response = retrieve_file(&connection, hash).map_err(ErrorInternalServerError)?;
|
let response = connection
|
||||||
|
.retrieve_file(hash)
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(response))
|
Ok(HttpResponse::Ok().json(response))
|
||||||
} else {
|
} else {
|
||||||
|
@ -288,8 +288,10 @@ pub async fn get_file(
|
||||||
|
|
||||||
#[get("/api/files/latest")]
|
#[get("/api/files/latest")]
|
||||||
pub async fn latest_files(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn latest_files(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.db_pool.get().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let files = get_latest_files(&connection, 100).map_err(ErrorInternalServerError)?;
|
let files = connection
|
||||||
|
.get_latest_files(100)
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
Ok(HttpResponse::Ok().json(&files))
|
Ok(HttpResponse::Ok().json(&files))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -303,7 +305,7 @@ pub async fn get_jobs(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
Ok(HttpResponse::Ok().json(json!({
|
Ok(HttpResponse::Ok().json(json!({
|
||||||
"name": state.vault_name,
|
"name": state.vault_name,
|
||||||
"location": state.directory,
|
"location": state.upend.vault_path,
|
||||||
"version": VERSION
|
"version": VERSION
|
||||||
})))
|
})))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue