//! SQLite-backed storage layer for UpEnd: connection pooling and PRAGMA
//! setup, schema migrations, and the entry/object query API.
#[macro_use]
|
|
extern crate diesel;
|
|
|
|
#[macro_use]
|
|
extern crate diesel_migrations;
|
|
|
|
#[macro_use]
|
|
extern crate lazy_static;
|
|
|
|
#[macro_use]
|
|
mod macros;
|
|
|
|
pub mod common;
|
|
pub mod engine;
|
|
pub mod entry;
|
|
pub mod hierarchies;
|
|
pub mod jobs;
|
|
pub mod stores;
|
|
|
|
mod inner;
|
|
mod util;
|
|
|
|
use crate::common::build;
|
|
use crate::engine::execute;
|
|
use crate::inner::models;
|
|
use crate::inner::schema::data;
|
|
use crate::util::LoggerSink;
|
|
use anyhow::{anyhow, Result};
|
|
use diesel::prelude::*;
|
|
use diesel::r2d2::{self, ConnectionManager};
|
|
use diesel::result::{DatabaseErrorKind, Error};
|
|
use diesel::sqlite::SqliteConnection;
|
|
use hierarchies::initialize_hier;
|
|
use shadow_rs::is_release;
|
|
use std::convert::TryFrom;
|
|
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
use std::sync::{Arc, Mutex, RwLock};
|
|
use std::time::Duration;
|
|
use tracing::{debug, error, trace, warn};
|
|
use upend_base::addressing::{Address, Addressable};
|
|
use upend_base::entry::{Entry, EntryValue, ImmutableEntry};
|
|
use upend_base::error::UpEndError;
|
|
use upend_base::hash::UpMultihash;
|
|
use upend_base::lang::Query;
|
|
|
|
#[derive(Debug)]
pub struct ConnectionOptions {
    /// SQLite `busy_timeout` applied to each newly acquired connection, if set.
    pub busy_timeout: Option<Duration>,
    /// If `true`, the journal mode is switched to WAL; otherwise TRUNCATE is used.
    pub enable_wal_mode: bool,
    /// Serializes `apply()` so PRAGMA setup never runs on two pooled
    /// connections concurrently.
    pub mutex: Arc<Mutex<()>>,
}
|
|
|
|
impl ConnectionOptions {
|
|
pub fn apply(&self, connection: &SqliteConnection) -> QueryResult<()> {
|
|
let _lock = self.mutex.lock().unwrap();
|
|
|
|
if let Some(duration) = self.busy_timeout {
|
|
debug!("Setting busy_timeout to {:?}", duration);
|
|
connection.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
|
}
|
|
|
|
connection.execute(if self.enable_wal_mode {
|
|
debug!("Enabling WAL journal mode & truncating WAL log...");
|
|
"PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
|
|
} else {
|
|
debug!("Enabling TRUNCATE journal mode");
|
|
"PRAGMA journal_mode = TRUNCATE;"
|
|
})?;
|
|
|
|
debug!(r#"Setting "synchronous" to NORMAL"#);
|
|
connection.execute("PRAGMA synchronous = NORMAL;")?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
|
for ConnectionOptions
|
|
{
|
|
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
|
|
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
|
|
}
|
|
}
|
|
|
|
// An r2d2 pool of SQLite connections, shared via `Arc` between
// `UpEndDatabase` and the `UpEndConnection`s it hands out.
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
|
|
|
#[derive(Debug)]
pub struct LoggingHandler {
    /// Label included in error log lines to identify which pool failed.
    pub name: &'static str,
}
|
|
|
|
impl diesel::r2d2::HandleError<diesel::r2d2::Error> for LoggingHandler {
|
|
fn handle_error(&self, error: diesel::r2d2::Error) {
|
|
error!(name = self.name, "Database error: {}", error);
|
|
if !is_release() {
|
|
panic!("Database error! This should not happen! {}", error);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// The outcome of [`UpEndDatabase::open`]: the opened database plus a flag
/// indicating whether it was freshly created.
pub struct OpenResult {
    pub db: UpEndDatabase,
    /// `true` if no `.upend` subdirectory existed before this call.
    pub new: bool,
}
|
|
|
|
/// Handle to an UpEnd database on disk.
///
/// Cheaply clonable internals: the connection pool and the read/write lock
/// are shared (`Arc`) with every `UpEndConnection` produced by
/// [`UpEndDatabase::connection`].
pub struct UpEndDatabase {
    /// Path of the `.upend` subdirectory holding the SQLite file.
    pub path: PathBuf,
    pool: Arc<DbPool>,
    /// Coarse reader/writer lock taken by `UpEndConnection` methods:
    /// reads take `read()`, inserts/deletes take `write()`.
    lock: Arc<RwLock<()>>,
}
|
|
|
|
/// Name of the subdirectory (inside the vault directory) holding UpEnd state.
pub const UPEND_SUBDIR: &str = ".upend";
/// Filename of the SQLite database within [`UPEND_SUBDIR`].
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
|
|
|
|
impl UpEndDatabase {
|
|
pub fn open<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
|
|
embed_migrations!("./migrations/upend/");
|
|
|
|
let upend_path = dirpath.as_ref().join(UPEND_SUBDIR);
|
|
|
|
if reinitialize {
|
|
warn!("Reinitializing - removing previous database...");
|
|
let _ = fs::remove_dir_all(&upend_path);
|
|
}
|
|
let new = !upend_path.exists();
|
|
|
|
if new {
|
|
trace!("Creating UpEnd subdirectory...");
|
|
fs::create_dir(&upend_path)?;
|
|
}
|
|
|
|
trace!("Creating pool.");
|
|
let manager = ConnectionManager::<SqliteConnection>::new(
|
|
upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
|
|
);
|
|
let pool = r2d2::Pool::builder()
|
|
.connection_customizer(Box::new(ConnectionOptions {
|
|
busy_timeout: Some(Duration::from_secs(30)),
|
|
enable_wal_mode: true,
|
|
mutex: Arc::new(Mutex::new(())),
|
|
}))
|
|
.error_handler(Box::new(LoggingHandler { name: "main" }))
|
|
.build(manager)?;
|
|
trace!("Pool created.");
|
|
|
|
let db = UpEndDatabase {
|
|
path: upend_path,
|
|
pool: Arc::new(pool),
|
|
lock: Arc::new(RwLock::new(())),
|
|
};
|
|
let connection = db.connection().unwrap();
|
|
|
|
if !new {
|
|
let db_major: u64 = connection.get_meta("VERSION")?.parse()?;
|
|
if db_major > build::PKG_VERSION_MAJOR.parse().unwrap() {
|
|
return Err(anyhow!("Incompatible database! Found version "));
|
|
}
|
|
}
|
|
|
|
trace!("Running migrations...");
|
|
|
|
embedded_migrations::run_with_output(
|
|
&db.pool.get()?,
|
|
&mut LoggerSink {
|
|
..Default::default()
|
|
},
|
|
)?;
|
|
|
|
initialize_hier(&connection)?;
|
|
|
|
Ok(OpenResult { db, new })
|
|
}
|
|
|
|
pub fn connection(&self) -> Result<UpEndConnection> {
|
|
Ok(UpEndConnection {
|
|
pool: self.pool.clone(),
|
|
lock: self.lock.clone(),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// A lightweight handle for querying and mutating the database.
///
/// Shares the connection pool and the coarse read/write lock with the
/// `UpEndDatabase` it was created from; each method checks out its own pooled
/// connection.
pub struct UpEndConnection {
    pool: Arc<DbPool>,
    lock: Arc<RwLock<()>>,
}
|
|
|
|
impl UpEndConnection {
|
|
pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
|
|
where
|
|
F: FnOnce() -> Result<T, E>,
|
|
E: From<Error>,
|
|
{
|
|
/*
|
|
let span = span!(tracing::Level::TRACE, "transaction");
|
|
let _span = span.enter();
|
|
let _lock = self.transaction_lock.lock().unwrap();
|
|
self.conn.exclusive_transaction(f)
|
|
*/
|
|
// Disable transactions for now.
|
|
f()
|
|
}
|
|
|
|
pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
|
|
use crate::inner::schema::meta::dsl;
|
|
let key = key.as_ref();
|
|
|
|
trace!("Querying META:{key}");
|
|
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
dsl::meta
|
|
.filter(dsl::key.eq(key))
|
|
.load::<models::MetaValue>(&conn)?
|
|
.first()
|
|
.ok_or(anyhow!(r#"No META "{key}" value found."#))
|
|
.map(|mv| mv.value.clone())
|
|
}
|
|
|
|
pub fn retrieve_entry(&self, hash: &UpMultihash) -> Result<Option<Entry>> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let entry = data
|
|
.filter(identity.eq(Address::Hash(hash.clone()).encode()?))
|
|
.load::<models::Entry>(&conn)?;
|
|
|
|
match entry.len() {
|
|
0 => Ok(None),
|
|
1 => Ok(Some(Entry::try_from(entry.get(0).unwrap())?)),
|
|
_ => {
|
|
unreachable!(
|
|
"Multiple entries returned with the same hash - this should be impossible!"
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let primary = data
|
|
.filter(entity.eq(object_address.encode()?))
|
|
.or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?))
|
|
.load::<models::Entry>(&conn)?;
|
|
|
|
let entries = primary
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.collect::<Result<Vec<Entry>>>()?;
|
|
|
|
let secondary = data
|
|
.filter(
|
|
entity.eq_any(
|
|
entries
|
|
.iter()
|
|
.map(|e| e.address())
|
|
.filter_map(Result::ok)
|
|
.map(|addr| addr.encode())
|
|
.collect::<Result<Vec<Vec<u8>>, UpEndError>>()?,
|
|
),
|
|
)
|
|
.load::<models::Entry>(&conn)?;
|
|
|
|
let secondary_entries = secondary
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.collect::<Result<Vec<Entry>>>()?;
|
|
|
|
Ok([entries, secondary_entries].concat())
|
|
}
|
|
|
|
pub fn remove_object(&self, object_address: Address) -> Result<usize> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
|
|
trace!("Deleting {}!", object_address);
|
|
|
|
let _lock = self.lock.write().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let matches = data
|
|
.filter(identity.eq(object_address.encode()?))
|
|
.or_filter(entity.eq(object_address.encode()?))
|
|
.or_filter(value_str.eq(EntryValue::Address(object_address).to_string()?));
|
|
|
|
Ok(diesel::delete(matches).execute(&conn)?)
|
|
}
|
|
|
|
pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
|
|
trace!("Querying: {:?}", query);
|
|
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let entries = execute(&conn, query)?;
|
|
let entries = entries
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.filter_map(Result::ok)
|
|
.collect();
|
|
|
|
Ok(entries)
|
|
}
|
|
|
|
pub fn insert_entry(&self, entry: Entry) -> Result<Address> {
|
|
trace!("Inserting: {}", entry);
|
|
let db_entry = models::Entry::try_from(&entry)?;
|
|
self.insert_model_entry(db_entry)?;
|
|
Ok(entry.address()?)
|
|
}
|
|
|
|
pub fn insert_entry_immutable(&self, entry: Entry) -> Result<Address> {
|
|
trace!("Inserting immutably: {}", entry);
|
|
let address = entry.address()?;
|
|
let db_entry = models::Entry::try_from(&ImmutableEntry(entry))?;
|
|
self.insert_model_entry(db_entry)?;
|
|
Ok(address)
|
|
}
|
|
|
|
fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> {
|
|
let _lock = self.lock.write().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let result = diesel::insert_into(data::table)
|
|
.values(&entry)
|
|
.execute(&conn);
|
|
|
|
match result {
|
|
Ok(num) => Ok(num),
|
|
Err(error) => match error {
|
|
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => Ok(0),
|
|
_ => Err(anyhow!(error)),
|
|
},
|
|
}
|
|
}
|
|
|
|
// #[deprecated]
|
|
pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let result = data
|
|
.select(entity)
|
|
.distinct()
|
|
.load::<Vec<u8>>(&conn)?
|
|
.into_iter()
|
|
.filter_map(|buf| Address::decode(&buf).ok())
|
|
.collect();
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
#[deprecated]
|
|
pub fn get_all_attributes(&self) -> Result<Vec<String>> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let result = data
|
|
.select(attribute)
|
|
.distinct()
|
|
.order_by(attribute)
|
|
.load::<String>(&conn)?;
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
pub fn get_stats(&self) -> Result<serde_json::Value> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let total_entry_count = data.count().load::<i64>(&conn)?;
|
|
let total_entry_count = total_entry_count
|
|
.first()
|
|
.ok_or(anyhow!("Couldn't get entry count"))?;
|
|
|
|
let api_entry_count = data
|
|
.filter(provenance.like("API%"))
|
|
.count()
|
|
.load::<i64>(&conn)?;
|
|
let api_entry_count = api_entry_count
|
|
.first()
|
|
.ok_or(anyhow!("Couldn't get API entry count"))?;
|
|
|
|
let implicit_entry_count = data
|
|
.filter(provenance.like("%IMPLICIT%"))
|
|
.count()
|
|
.load::<i64>(&conn)?;
|
|
let implicit_entry_count = implicit_entry_count
|
|
.first()
|
|
.ok_or(anyhow!("Couldn't get API entry count"))?;
|
|
|
|
Ok(serde_json::json!({
|
|
"entryCount": {
|
|
"total": total_entry_count,
|
|
"api": api_entry_count,
|
|
"explicit": api_entry_count - implicit_entry_count
|
|
}
|
|
}))
|
|
}
|
|
|
|
#[deprecated]
|
|
pub fn get_explicit_entries(&self) -> Result<Vec<Entry>> {
|
|
use crate::inner::schema::data::dsl::*;
|
|
let _lock = self.lock.read().unwrap();
|
|
let conn = self.pool.get()?;
|
|
|
|
let result: Vec<models::Entry> = data
|
|
.filter(
|
|
provenance
|
|
.like("API%")
|
|
.and(provenance.not_like("%IMPLICIT%")),
|
|
)
|
|
.load(&conn)?;
|
|
|
|
Ok(result
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.collect::<Result<Vec<Entry>>>()?)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod test {
    use upend_base::constants::ATTR_LABEL;

    use super::*;
    use tempfile::TempDir;

    // Verifies the `new` flag reported by `UpEndDatabase::open` across
    // first open, re-open, and reinitialization.
    #[test]
    fn test_open() {
        let tempdir = TempDir::new().unwrap();

        // First open in an empty directory: must be reported as new.
        let result = UpEndDatabase::open(&tempdir, false);
        let result = result.unwrap();
        assert!(result.new);

        // Not new
        let result = UpEndDatabase::open(&tempdir, false);
        let result = result.unwrap();
        assert!(!result.new);

        // reinitialize true, new again
        let result = UpEndDatabase::open(&tempdir, true);
        let result = result.unwrap();
        assert!(result.new);
    }

    // Exercises the query language end-to-end against a fresh database:
    // entity/attribute/value matching, `in`, `contains`, boolean
    // combinators, and variable joins.
    #[test]
    fn test_query() {
        let tempdir = TempDir::new().unwrap();
        let result = UpEndDatabase::open(&tempdir, false).unwrap();
        let db = result.db;

        let connection = db.connection().unwrap();

        // Seed one entity with two attributes.
        let random_entity = Address::Uuid(uuid::Uuid::new_v4());
        upend_insert_val!(connection, random_entity, ATTR_LABEL, "FOOBAR").unwrap();
        upend_insert_val!(connection, random_entity, "FLAVOUR", "STRANGE").unwrap();

        // Wildcard attribute/value match on the entity: both entries.
        let query = format!(r#"(matches @{random_entity} ? ?)"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        // NOTE(review): both inserts below still target `random_entity`;
        // `other_entity` is only used inside the `(in ...)` clause — confirm
        // this is intentional.
        let other_entity = Address::Uuid(uuid::Uuid::new_v4());
        upend_insert_val!(connection, random_entity, ATTR_LABEL, "BAZQUX").unwrap();
        upend_insert_val!(connection, random_entity, "CHARGE", "POSITIVE").unwrap();

        // Entity set match: all four entries belong to random_entity.
        let query = format!(r#"(matches (in @{random_entity} @{other_entity}) ? ?)"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 4);

        // Attribute set match: one FLAVOUR entry + one CHARGE entry.
        let query = r#"(matches ? (in "FLAVOUR" "CHARGE") ?)"#.parse().unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        // Value set match on labels: FOOBAR and BAZQUX.
        let query = format!(r#"(matches ? "{ATTR_LABEL}" (in "FOOBAR" "BAZQUX"))"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        // Substring match on values: only "FOOBAR" contains "OOBA".
        let query = format!(r#"(matches ? "{ATTR_LABEL}" (contains "OOBA"))"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);

        // Disjunction: value contains "OOBA" (FOOBAR) or attribute contains
        // "HARGE" (CHARGE).
        let query = r#"(or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) )"#
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        // Conjunction: only the FOOBAR label entry satisfies both.
        let query =
            format!(r#"(and (matches ? ? (contains "OOBA")) (matches ? "{ATTR_LABEL}" ?) )"#)
                .parse()
                .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);

        // Negation: of the two disjunction matches, drop the label entry,
        // leaving only the CHARGE entry.
        let query = format!(
            r#"(and
            (or
                (matches ? ? (contains "OOBA"))
                (matches ? (contains "HARGE") ?)
            )
            (not (matches ? "{ATTR_LABEL}" ?))
            )"#
        )
        .parse()
        .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);

        // Join on a shared variable `?a`: entities labeled FOOBAR that also
        // have a FLAVOUR — yields the single FLAVOUR=STRANGE entry.
        let query = format!(
            r#"(join
                (matches ?a "FLAVOUR" ?)
                (matches ?a "{ATTR_LABEL}" "FOOBAR")
            )"#
        )
        .parse()
        .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].value, "STRANGE".into());
    }
}
|