#![macro_use] #[macro_use] mod macros; pub mod constants; pub mod engine; pub mod entry; pub mod hierarchies; pub mod inner; pub mod lang; use crate::addressing::{Address, Addressable}; use crate::database::constants::{ IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_INVARIANT, }; use crate::database::engine::execute; use crate::database::entry::{Entry, EntryValue, ImmutableEntry}; use crate::database::inner::models; use crate::database::inner::schema::data; use crate::database::lang::Query; use crate::util::hash::Hash; use crate::util::LoggerSink; use anyhow::{anyhow, Result}; use chrono::NaiveDateTime; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager, PooledConnection}; use diesel::result::{DatabaseErrorKind, Error}; use diesel::sqlite::SqliteConnection; use hierarchies::initialize_hier; use log::{debug, trace}; use std::convert::{TryFrom, TryInto}; use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; #[derive(Debug)] pub struct ConnectionOptions { pub enable_foreign_keys: bool, pub busy_timeout: Option, } impl ConnectionOptions { pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { if self.enable_foreign_keys { debug!("Enabling foreign keys"); conn.execute("PRAGMA foreign_keys = ON;")?; } if let Some(duration) = self.busy_timeout { debug!("Setting busy_timeout to {:?}", duration); conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } debug!(r#"Setting "synchronous" to NORMAL"#); conn.execute("PRAGMA synchronous = NORMAL;")?; Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } type DbPool = r2d2::Pool>; pub struct OpenResult { pub db: UpEndDatabase, pub new: bool, } pub struct UpEndDatabase { pool: DbPool, pub vault_path: Arc, pub db_path: Arc, } pub const UPEND_SUBDIR: &str = ".upend"; pub 
const DATABASE_FILENAME: &str = "upend.sqlite3";

impl UpEndDatabase {
    /// Open (or create) the UpEnd database for the vault rooted at `dirpath`.
    ///
    /// * `db_path` — optional override for the database directory; defaults
    ///   to `<dirpath>/.upend`.
    /// * `reinitialize` — when `true`, any existing database is removed first.
    ///
    /// Refuses databases written by a newer major version, switches the
    /// journal to WAL, runs embedded migrations, and seeds the built-in type
    /// entries (these inserts are idempotent — see `insert_model_entry`).
    pub fn open<P: AsRef<Path>>(
        dirpath: P,
        db_path: Option<PathBuf>,
        reinitialize: bool,
    ) -> Result<OpenResult> {
        embed_migrations!("./migrations/upend/");
        let upend_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(UPEND_SUBDIR));
        if reinitialize {
            debug!("Reinitializing - removing previous database...");
            // Best-effort removal; the directory may not exist yet.
            let _ = fs::remove_dir_all(&upend_path);
        }
        let new = !upend_path.exists();
        if new {
            trace!("Creating UpEnd subdirectory...");
            fs::create_dir(&upend_path)?;
        }
        trace!("Creating pool.");
        let manager = ConnectionManager::<SqliteConnection>::new(
            upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
        );
        let pool = r2d2::Pool::builder()
            .connection_customizer(Box::new(ConnectionOptions {
                enable_foreign_keys: true,
                busy_timeout: Some(Duration::from_secs(30)),
            }))
            .build(manager)?;
        trace!("Pool created.");
        let db = UpEndDatabase {
            pool,
            vault_path: Arc::new(dirpath.as_ref().canonicalize()?),
            db_path: Arc::new(upend_path),
        };
        // Propagate pool errors instead of panicking (was `.unwrap()`).
        let connection = db.connection()?;
        if !new {
            // Version gate: never open a database created by a newer major.
            let db_major: u64 = connection.get_meta("VERSION")?.parse()?;
            if db_major > crate::common::PKG_VERSION_MAJOR.parse().unwrap() {
                // NOTE(review): message reconstructed — the original format
                // string was truncated by extraction after "Found version ".
                return Err(anyhow!(
                    "Incompatible database! Found version {}, running version {}.",
                    db_major,
                    crate::common::PKG_VERSION_MAJOR
                ));
            }
        }
        trace!("Running initial config.");
        let enable_wal_mode = true;
        connection.execute(if enable_wal_mode {
            debug!("Enabling WAL journal mode & truncating WAL log...");
            "PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
        } else {
            debug!("Enabling TRUNCATE journal mode");
            "PRAGMA journal_mode = TRUNCATE;"
        })?;
        trace!("Running migrations...");
        embedded_migrations::run_with_output(
            &db.pool.get()?,
            &mut LoggerSink {
                ..Default::default()
            },
        )?;
        trace!("Initializing types...");
        connection.insert_entry(Entry::try_from(&*TYPE_INVARIANT)?)?;
        upend_insert_addr!(connection, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
        upend_insert_val!(connection, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR)?;
        upend_insert_val!(connection, TYPE_ADDR, LABEL_ATTR, "UpEnd Type")?;
        initialize_hier(&connection)?;
        Ok(OpenResult { db, new })
    }

    /// Check a connection out of the pool, tagged with the vault root so
    /// file paths can be resolved.
    pub fn connection(&self) -> Result<UpEndConnection> {
        Ok(UpEndConnection {
            conn: self.pool.get()?,
            vault_path: self.vault_path.clone(),
        })
    }
}

/// A pooled connection plus the vault root needed to resolve the
/// vault-relative paths stored in the `files` table.
pub struct UpEndConnection {
    conn: PooledConnection<ConnectionManager<SqliteConnection>>,
    vault_path: Arc<PathBuf>,
}

impl UpEndConnection {
    /// Run a raw SQL statement; returns the number of affected rows.
    pub fn execute<S: AsRef<str>>(&self, query: S) -> Result<usize, Error> {
        self.conn.execute(query.as_ref())
    }

    /// Run `f` inside a database transaction, rolling back on `Err`.
    pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
    where
        F: FnOnce() -> Result<T, E>,
        E: From<Error>,
    {
        self.conn.transaction(f)
    }

    /// Fetch a value from the `meta` table; errors if the key is absent.
    pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
        use crate::database::inner::schema::meta::dsl;
        let key = key.as_ref();
        debug!("Querying META:{key}");
        // NOTE(review): row type restored as `models::MetaValue` — the
        // turbofish was lost in extraction; confirm against inner::models.
        dsl::meta
            .filter(dsl::key.eq(key))
            .load::<models::MetaValue>(&self.conn)?
            .first()
            // `ok_or_else` so the error is only built on the miss path.
            .ok_or_else(|| anyhow!(r#"No META "{key}" value found."#))
            .map(|mv| mv.value.clone())
    }

    /// Insert a file record, then return how many *valid* rows share its
    /// hash (i.e. the number of known copies of this blob in the vault).
    pub fn insert_file(&self, file: models::NewFile) -> Result<u32> {
        use crate::database::inner::schema::files;
        debug!(
            "Inserting {} ({})...",
            &file.path,
            Address::Hash(Hash(file.hash.clone()))
        );
        diesel::insert_into(files::table)
            .values(&file)
            .execute(&self.conn)?;
        Ok(files::dsl::files
            .filter(files::dsl::valid.eq(true))
            .filter(files::dsl::hash.eq(file.hash))
            .count()
            .first::<i64>(&self.conn)?
.try_into()
            // The COUNT(*) is non-negative, so i64 -> u32 only fails past
            // u32::MAX rows; treat that as an invariant violation.
            .unwrap())
    }

    /// Return all valid file rows for `obj_hash`, with stored vault-relative
    /// paths resolved against the vault root.
    pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> {
        use crate::database::inner::schema::files::dsl::*;
        let matches = files
            .filter(valid.eq(true))
            .filter(hash.eq(&obj_hash.0))
            .load::<models::File>(&self.conn)?;
        let matches = matches
            .into_iter()
            .map(|f| models::OutFile {
                id: f.id,
                hash: f.hash,
                // Paths are stored vault-relative; joining makes them absolute.
                path: self.vault_path.join(PathBuf::from(f.path)),
                valid: f.valid,
                added: f.added,
                size: f.size,
                mtime: f.mtime,
            })
            .collect();
        Ok(matches)
    }

    /// Return every file row, valid or not.
    pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
        use crate::database::inner::schema::files::dsl::*;
        let matches = files.load::<models::File>(&self.conn)?;
        Ok(matches)
    }

    /// Update the stored mtime of one file row; returns affected row count.
    pub fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
        use crate::database::inner::schema::files::dsl::*;
        debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(mtime.eq(m_time))
            .execute(&self.conn)?)
    }

    /// Mark one file row (in)valid; returns affected row count.
    pub fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
        use crate::database::inner::schema::files::dsl::*;
        debug!("Setting file ID {} to valid = {}", file_id, is_valid);
        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(valid.eq(is_valid))
            .execute(&self.conn)?)
    }

    /// Canonicalize `path` and strip the vault root, yielding the
    /// vault-relative form used by the `files` table.
    pub fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
        Ok(path
            .canonicalize()?
            .strip_prefix(self.vault_path.as_path())?
            .to_path_buf())
    }

    /// Look up a single entry by its content hash (the `identity` column).
    pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
        use crate::database::inner::schema::data::dsl::*;
        let entry = data
            .filter(identity.eq(Address::Hash(hash.clone()).encode()?))
            .load::<models::Entry>(&self.conn)?;
        match entry.len() {
            0 => Ok(None),
            1 => Ok(Some(Entry::try_from(entry.get(0).unwrap())?)),
            _ => {
                // `identity` is derived from the entry's hash, so more than
                // one row with the same identity indicates corruption.
                unreachable!(
                    "Multiple entries returned with the same hash - this should be impossible!"
)
            }
        }
    }

    /// Retrieve everything pertaining to `object_address`: rows where it is
    /// the entity or appears as a value, plus (one hop out) the rows about
    /// the entities those first-hop entries point at.
    pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
        use crate::database::inner::schema::data::dsl::*;
        let primary = data
            .filter(entity.eq(object_address.encode()?))
            .or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?))
            .load::<models::Entry>(&self.conn)?;
        let entries = primary
            .iter()
            .map(Entry::try_from)
            .collect::<Result<Vec<Entry>>>()?;
        // Second hop: entries whose entity is referenced by the first set.
        // Addresses that fail to resolve or encode are simply skipped.
        let secondary = data
            .filter(
                entity.eq_any(
                    entries
                        .iter()
                        .map(|e| e.address())
                        .filter_map(Result::ok)
                        .map(|addr| addr.encode())
                        .collect::<Result<Vec<Vec<u8>>>>()?,
                ),
            )
            .load::<models::Entry>(&self.conn)?;
        let secondary_entries = secondary
            .iter()
            .map(Entry::try_from)
            .collect::<Result<Vec<Entry>>>()?;
        Ok([entries, secondary_entries].concat())
    }

    /// Delete every row that mentions `object_address` — as identity, as
    /// entity, or as a value. Returns the number of deleted rows.
    pub fn remove_object(&self, object_address: Address) -> Result<usize> {
        use crate::database::inner::schema::data::dsl::*;
        debug!("Deleting {}!", object_address);
        let matches = data
            .filter(identity.eq(object_address.encode()?))
            .or_filter(entity.eq(object_address.encode()?))
            .or_filter(value_str.eq(EntryValue::Address(object_address).to_string()?));
        Ok(diesel::delete(matches).execute(&self.conn)?)
    }

    /// Execute a structured [`Query`]; rows that fail to convert into
    /// [`Entry`] are silently dropped.
    pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
        trace!("Querying: {:?}", query);
        let entries = execute(&self.conn, query)?;
        let entries = entries
            .iter()
            .map(Entry::try_from)
            .filter_map(Result::ok)
            .collect();
        Ok(entries)
    }

    /// Insert an entry and return its (entity-derived) address.
    pub fn insert_entry(&self, entry: Entry) -> Result<Address>
{
        debug!("Inserting: {}", entry);
        let db_entry = models::Entry::try_from(&entry)?;
        self.insert_model_entry(db_entry)?;
        entry.address()
    }

    /// Insert an entry keyed by its *immutable* identity (the hash of the
    /// whole entry, via `ImmutableEntry`) and return the entry's address.
    /// The address is computed before the entry is consumed by the wrapper.
    pub fn insert_entry_immutable(&self, entry: Entry) -> Result<Address>
{ debug!("Inserting immutably: {}", entry); let address = entry.address()?; let db_entry = models::Entry::try_from(&ImmutableEntry(entry))?; self.insert_model_entry(db_entry)?; Ok(address) } fn insert_model_entry(&self, entry: models::Entry) -> Result { let result = diesel::insert_into(data::table) .values(&entry) .execute(&self.conn); match result { Ok(num) => Ok(num), Err(error) => match error { Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => Ok(0), _ => Err(anyhow!(error)), }, } } // #[deprecated] pub fn get_all_addresses(&self) -> Result> { use crate::database::inner::schema::data::dsl::*; let result = data .select(entity) .distinct() .load::>(&self.conn)? .into_iter() .filter_map(|buf| Address::decode(&buf).ok()) .collect(); Ok(result) } // #[deprecated] pub fn get_all_attributes(&self) -> Result> { use crate::database::inner::schema::data::dsl::*; let result = data .select(attribute) .distinct() .order_by(attribute) .load::(&self.conn)?; Ok(result) } } #[cfg(test)] mod test { use super::*; use tempfile::TempDir; #[test] fn test_open() { let tempdir = TempDir::new().unwrap(); let result = UpEndDatabase::open(&tempdir, None, false); assert!(result.is_ok()); assert!(result.unwrap().new); // Not new let result = UpEndDatabase::open(&tempdir, None, false); assert!(result.is_ok()); assert!(!result.unwrap().new); // reinitialize true, new again let result = UpEndDatabase::open(&tempdir, None, true); assert!(result.is_ok()); assert!(result.unwrap().new); } #[test] fn test_query() { let tempdir = TempDir::new().unwrap(); let result = UpEndDatabase::open(&tempdir, None, false).unwrap(); let db = result.db; let connection = db.connection().unwrap(); let random_entity = Address::Uuid(uuid::Uuid::new_v4()); upend_insert_val!(connection, random_entity, LABEL_ATTR, "FOOBAR").unwrap(); upend_insert_val!(connection, random_entity, "FLAVOUR", "STRANGE").unwrap(); let query = format!(r#"(matches @{random_entity} ? 
?)"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let other_entity = Address::Uuid(uuid::Uuid::new_v4()); upend_insert_val!(connection, random_entity, LABEL_ATTR, "BAZQUX").unwrap(); upend_insert_val!(connection, random_entity, "CHARGE", "POSITIVE").unwrap(); let query = format!(r#"(matches (in @{random_entity} @{other_entity}) ? ?)"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 4); let query = r#"(matches ? (in "FLAVOUR" "CHARGE") ?)"#.parse().unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let query = format!(r#"(matches ? "{LABEL_ATTR}" (in "FOOBAR" "BAZQUX"))"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let query = format!(r#"(matches ? "{LABEL_ATTR}" (contains "OOBA"))"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); let query = r#"(or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) )"# .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let query = format!(r#"(and (matches ? ? (contains "OOBA")) (matches ? "{LABEL_ATTR}" ?) )"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); let query = format!( r#"(and (or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) ) (not (matches ? "{LABEL_ATTR}" ?)) )"# ) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); } }