// upend/src/database/mod.rs

#![macro_use]
#[macro_use]
mod macros;

pub mod stores;
pub mod constants;
pub mod engine;
pub mod entry;
pub mod hierarchies;
pub mod inner;
pub mod lang;

use crate::addressing::{Address, Addressable};
use crate::common::build;
use crate::database::constants::{
IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_INVARIANT,
};
use crate::database::engine::execute;
use crate::database::entry::{Entry, EntryValue, ImmutableEntry};
use crate::database::inner::models;
use crate::database::inner::schema::data;
use crate::database::lang::Query;
use crate::util::hash::Hash;
use crate::util::LoggerSink;
use anyhow::{anyhow, Result};
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::sqlite::SqliteConnection;
use hierarchies::initialize_hier;
use std::convert::TryFrom;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tracing::{debug, trace};
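
/// Per-connection SQLite settings applied to every connection handed out by the
/// r2d2 pool: foreign-key enforcement, an optional busy timeout, and
/// `synchronous = NORMAL`.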
#[derive(Debug)]
pub struct ConnectionOptions {
    pub enable_foreign_keys: bool,
    pub busy_timeout: Option<Duration>,
}

impl ConnectionOptions {
    pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
        if self.enable_foreign_keys {
            debug!("Enabling foreign keys");
            conn.execute("PRAGMA foreign_keys = ON;")?;
        }

        if let Some(duration) = self.busy_timeout {
            debug!("Setting busy_timeout to {:?}", duration);
            conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
        }

        debug!(r#"Setting "synchronous" to NORMAL"#);
        conn.execute("PRAGMA synchronous = NORMAL;")?;

        Ok(())
    }
}

impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
    for ConnectionOptions
{
    fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
        self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
    }
}

type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
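
/// Returned by [`UpEndDatabase::open`]; `new` is true if the `.upend` directory
/// did not exist and was created by this call (including after a `reinitialize` wipe).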
pub struct OpenResult {
    pub db: UpEndDatabase,
    pub new: bool,
}
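
/// A handle to an UpEnd database on disk, holding the r2d2 SQLite connection pool
/// and the read/write lock shared by all [`UpEndConnection`]s derived from it.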
pub struct UpEndDatabase {
    pub path: PathBuf,
    pool: Arc<DbPool>,
    lock: Arc<RwLock<()>>,
}

pub const UPEND_SUBDIR: &str = ".upend";
pub const DATABASE_FILENAME: &str = "upend.sqlite3";

impl UpEndDatabase {
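    /// Opens the UpEnd database stored in the `.upend` subdirectory of `dirpath`,
    /// creating the directory, running migrations, and initializing the base type
    /// and hierarchy entries if it does not exist yet. With `reinitialize` set,
    /// any existing `.upend` directory is removed first.
    ///
    /// A minimal usage sketch (assuming this crate is usable as a library named
    /// `upend` and that `./data` is a writable directory):
    ///
    /// ```ignore
    /// use upend::database::UpEndDatabase;
    ///
    /// let opened = UpEndDatabase::open("./data", false).expect("could not open database");
    /// println!("newly created: {}", opened.new);
    /// let connection = opened.db.connection().expect("could not get a connection");
    /// ```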
    pub fn open<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
        embed_migrations!("./migrations/upend/");

        let upend_path = dirpath.as_ref().join(UPEND_SUBDIR);

        if reinitialize {
            debug!("Reinitializing - removing previous database...");
            let _ = fs::remove_dir_all(&upend_path);
        }

        let new = !upend_path.exists();
        if new {
            trace!("Creating UpEnd subdirectory...");
            fs::create_dir(&upend_path)?;
        }

        trace!("Creating pool.");
        let manager = ConnectionManager::<SqliteConnection>::new(
            upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
        );
        let pool = r2d2::Pool::builder()
            .connection_customizer(Box::new(ConnectionOptions {
                enable_foreign_keys: true,
                busy_timeout: Some(Duration::from_secs(30)),
            }))
            .build(manager)?;
        trace!("Pool created.");

        let db = UpEndDatabase {
            path: upend_path,
            pool: Arc::new(pool),
            lock: Arc::new(RwLock::new(())),
        };

        let connection = db.connection().unwrap();

        if !new {
            let db_major: u64 = connection.get_meta("VERSION")?.parse()?;
            if db_major > build::PKG_VERSION_MAJOR.parse().unwrap() {
                return Err(anyhow!(
                    "Incompatible database! Found version {}, expected at most {}.",
                    db_major,
                    build::PKG_VERSION_MAJOR
                ));
            }
        }

        trace!("Running initial config.");
        let enable_wal_mode = true;
        connection.execute(if enable_wal_mode {
            debug!("Enabling WAL journal mode & truncating WAL log...");
            "PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
        } else {
            debug!("Enabling TRUNCATE journal mode");
            "PRAGMA journal_mode = TRUNCATE;"
        })?;

        trace!("Running migrations...");
        embedded_migrations::run_with_output(
            &db.pool.get()?,
            &mut LoggerSink {
                ..Default::default()
            },
        )?;

        trace!("Initializing types...");
        connection.insert_entry(Entry::try_from(&*TYPE_INVARIANT)?)?;
        upend_insert_addr!(connection, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
        upend_insert_val!(connection, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR)?;
        upend_insert_val!(connection, TYPE_ADDR, LABEL_ATTR, "UpEnd Type")?;

        initialize_hier(&connection)?;

        Ok(OpenResult { db, new })
    }
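
    /// Returns a new [`UpEndConnection`] sharing this database's connection pool
    /// and lock.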
    pub fn connection(&self) -> Result<UpEndConnection> {
        Ok(UpEndConnection {
            pool: self.pool.clone(),
            lock: self.lock.clone(),
        })
    }
}
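
/// A handle used to query and modify the database; reads take the shared lock,
/// writes take the exclusive lock, and the underlying SQLite connection is
/// checked out of the pool per call.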
pub struct UpEndConnection {
    pool: Arc<DbPool>,
    lock: Arc<RwLock<()>>,
}

impl UpEndConnection {
    pub fn execute<S: AsRef<str>>(&self, query: S) -> Result<usize> {
        let _lock = self.lock.write().unwrap();
        let conn = self.pool.get()?;
        Ok(conn.execute(query.as_ref())?)
    }

    pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
    where
        F: FnOnce() -> Result<T, E>,
        E: From<Error>,
    {
        /*
        let span = span!(tracing::Level::TRACE, "transaction");
        let _span = span.enter();
        let _lock = self.transaction_lock.lock().unwrap();
        self.conn.exclusive_transaction(f)
        */

        // Disable transactions for now.
        f()
    }
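
    /// Looks up a single value from the `meta` table by key (e.g. `"VERSION"`).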
    pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
        use crate::database::inner::schema::meta::dsl;

        let key = key.as_ref();
        debug!("Querying META:{key}");

        let _lock = self.lock.read().unwrap();
        let conn = self.pool.get()?;

        dsl::meta
            .filter(dsl::key.eq(key))
            .load::<models::MetaValue>(&conn)?
            .first()
            .ok_or(anyhow!(r#"No META "{key}" value found."#))
            .map(|mv| mv.value.clone())
    }
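
    /// Retrieves the entry whose identity is the given hash, if any.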
    pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
        use crate::database::inner::schema::data::dsl::*;

        let _lock = self.lock.read().unwrap();
        let conn = self.pool.get()?;

        let entry = data
            .filter(identity.eq(Address::Hash(hash.clone()).encode()?))
            .load::<models::Entry>(&conn)?;

        match entry.len() {
            0 => Ok(None),
            1 => Ok(Some(Entry::try_from(entry.get(0).unwrap())?)),
            _ => {
                unreachable!(
                    "Multiple entries returned with the same hash - this should be impossible!"
                )
            }
        }
    }
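
    /// Retrieves every entry whose entity or value is the given address, together
    /// with the entries attached to those matches (entries whose entity is one of
    /// the matched entries' own addresses).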
    pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
        use crate::database::inner::schema::data::dsl::*;

        let _lock = self.lock.read().unwrap();
        let conn = self.pool.get()?;

        let primary = data
            .filter(entity.eq(object_address.encode()?))
            .or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?))
            .load::<models::Entry>(&conn)?;

        let entries = primary
            .iter()
            .map(Entry::try_from)
            .collect::<Result<Vec<Entry>>>()?;

        let secondary = data
            .filter(
                entity.eq_any(
                    entries
                        .iter()
                        .map(|e| e.address())
                        .filter_map(Result::ok)
                        .map(|addr| addr.encode())
                        .collect::<Result<Vec<Vec<u8>>>>()?,
                ),
            )
            .load::<models::Entry>(&conn)?;

        let secondary_entries = secondary
            .iter()
            .map(Entry::try_from)
            .collect::<Result<Vec<Entry>>>()?;

        Ok([entries, secondary_entries].concat())
    }
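
    /// Deletes every entry whose identity, entity, or value is the given address,
    /// returning the number of rows removed.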
    pub fn remove_object(&self, object_address: Address) -> Result<usize> {
        use crate::database::inner::schema::data::dsl::*;

        debug!("Deleting {}!", object_address);

        let _lock = self.lock.write().unwrap();
        let conn = self.pool.get()?;

        let matches = data
            .filter(identity.eq(object_address.encode()?))
            .or_filter(entity.eq(object_address.encode()?))
            .or_filter(value_str.eq(EntryValue::Address(object_address).to_string()?));

        Ok(diesel::delete(matches).execute(&conn)?)
    }
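
    /// Executes a [`Query`] against the data table; rows that fail to convert into
    /// [`Entry`] values are silently skipped.
    ///
    /// A sketch of typical usage, assuming a query string written in the same
    /// query language as exercised by the tests below:
    ///
    /// ```ignore
    /// let query = r#"(matches ? "FLAVOUR" (contains "STRANGE"))"#.parse()?;
    /// let entries = connection.query(query)?;
    /// ```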
    pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
        trace!("Querying: {:?}", query);

        let _lock = self.lock.read().unwrap();
        let conn = self.pool.get()?;

        let entries = execute(&conn, query)?;
        let entries = entries
            .iter()
            .map(Entry::try_from)
            .filter_map(Result::ok)
            .collect();

        Ok(entries)
    }
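
    /// Inserts an entry and returns its address; duplicate inserts (unique
    /// constraint violations) are treated as successful no-ops by
    /// `insert_model_entry`.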
    pub fn insert_entry(&self, entry: Entry) -> Result<Address> {
        debug!("Inserting: {}", entry);

        let db_entry = models::Entry::try_from(&entry)?;
        self.insert_model_entry(db_entry)?;
        entry.address()
    }

    pub fn insert_entry_immutable(&self, entry: Entry) -> Result<Address> {
        debug!("Inserting immutably: {}", entry);

        let address = entry.address()?;
        let db_entry = models::Entry::try_from(&ImmutableEntry(entry))?;
        self.insert_model_entry(db_entry)?;
        Ok(address)
    }

    fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> {
        let _lock = self.lock.write().unwrap();
        let conn = self.pool.get()?;

        let result = diesel::insert_into(data::table)
            .values(&entry)
            .execute(&conn);

        match result {
            Ok(num) => Ok(num),
            Err(error) => match error {
                Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => Ok(0),
                _ => Err(anyhow!(error)),
            },
        }
    }
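
    /// Lists every distinct entity address present in the data table, skipping any
    /// that fail to decode.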
    // #[deprecated]
    pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
        use crate::database::inner::schema::data::dsl::*;

        let _lock = self.lock.read().unwrap();
        let conn = self.pool.get()?;

        let result = data
            .select(entity)
            .distinct()
            .load::<Vec<u8>>(&conn)?
            .into_iter()
            .filter_map(|buf| Address::decode(&buf).ok())
            .collect();

        Ok(result)
    }

    // #[deprecated]
    pub fn get_all_attributes(&self) -> Result<Vec<String>> {
        use crate::database::inner::schema::data::dsl::*;

        let _lock = self.lock.read().unwrap();
        let conn = self.pool.get()?;

        let result = data
            .select(attribute)
            .distinct()
            .order_by(attribute)
            .load::<String>(&conn)?;

        Ok(result)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_open() {
        let tempdir = TempDir::new().unwrap();
        let result = UpEndDatabase::open(&tempdir, false);
        assert!(result.is_ok());
        assert!(result.unwrap().new);

        // Not new
        let result = UpEndDatabase::open(&tempdir, false);
        assert!(result.is_ok());
        assert!(!result.unwrap().new);

        // reinitialize true, new again
        let result = UpEndDatabase::open(&tempdir, true);
        assert!(result.is_ok());
        assert!(result.unwrap().new);
    }

    #[test]
    fn test_query() {
        let tempdir = TempDir::new().unwrap();
        let result = UpEndDatabase::open(&tempdir, false).unwrap();
        let db = result.db;
        let connection = db.connection().unwrap();

        let random_entity = Address::Uuid(uuid::Uuid::new_v4());
        upend_insert_val!(connection, random_entity, LABEL_ATTR, "FOOBAR").unwrap();
        upend_insert_val!(connection, random_entity, "FLAVOUR", "STRANGE").unwrap();

        let query = format!(r#"(matches @{random_entity} ? ?)"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        let other_entity = Address::Uuid(uuid::Uuid::new_v4());
        upend_insert_val!(connection, random_entity, LABEL_ATTR, "BAZQUX").unwrap();
        upend_insert_val!(connection, random_entity, "CHARGE", "POSITIVE").unwrap();

        let query = format!(r#"(matches (in @{random_entity} @{other_entity}) ? ?)"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 4);

        let query = r#"(matches ? (in "FLAVOUR" "CHARGE") ?)"#.parse().unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        let query = format!(r#"(matches ? "{LABEL_ATTR}" (in "FOOBAR" "BAZQUX"))"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        let query = format!(r#"(matches ? "{LABEL_ATTR}" (contains "OOBA"))"#)
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);

        let query = r#"(or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) )"#
            .parse()
            .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 2);

        let query =
            format!(r#"(and (matches ? ? (contains "OOBA")) (matches ? "{LABEL_ATTR}" ?) )"#)
                .parse()
                .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);

        let query = format!(
            r#"(and
                (or
                    (matches ? ? (contains "OOBA"))
                    (matches ? (contains "HARGE") ?)
                )
                (not (matches ? "{LABEL_ATTR}" ?))
            )"#
        )
        .parse()
        .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);

        let query = format!(
            r#"(join
                (matches ?a "FLAVOUR" ?)
                (matches ?a "{LABEL_ATTR}" "FOOBAR")
            )"#
        )
        .parse()
        .unwrap();
        let result = connection.query(query).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].value, "STRANGE".into());
    }
}