// Database layer: opens the SQLite-backed store, pools connections, and
// reads/writes entries.
//
// NOTE(review): this copy of the file is collapsed onto a few very long lines,
// and angle-bracketed generic arguments appear to have been lost (e.g.
// `Option,`, `Arc>,`, `r2d2::Pool>`, `fn open>(...)`), so it does not parse as
// written. The line comments embedded mid-line also swallow whatever code
// follows them on the same physical line. Restore the line breaks and the
// type parameters before building.
//
// Overview of the items below, in order of appearance:
//
// * `ConnectionOptions` - holds `busy_timeout`, `enable_wal_mode` and a mutex.
//   `apply` takes the mutex, then issues `PRAGMA busy_timeout` (only when a
//   timeout is set), selects the journal mode (WAL plus
//   `wal_autocheckpoint = 1000` and a TRUNCATE checkpoint when
//   `enable_wal_mode` is true, TRUNCATE journal mode otherwise), and finally
//   sets `PRAGMA synchronous = NORMAL`. The `CustomizeConnection` impl calls
//   `apply` from `on_acquire` and wraps a failure in
//   `diesel::r2d2::Error::QueryError`.
// * `LoggingHandler` - pool error handler: logs the error together with its
//   `name`, then panics when `is_release()` is false.
// * `OpenResult` - the opened `UpEndDatabase` plus `new`, which is true when
//   the `.upend` subdirectory did not exist at the time `open` checked (that
//   check happens after the optional `reinitialize` removal).
// * `UpEndDatabase::open` - works inside `<dirpath>/.upend`: optionally removes
//   it first (a removal failure is ignored), creates it when absent, builds
//   the pool over `upend.sqlite3` (30 s busy timeout, WAL mode, error handler
//   named "main"), rejects an existing database whose META "VERSION" is
//   greater than `build::PKG_VERSION_MAJOR`, runs the embedded migrations and
//   calls `initialize_hier`.
//   NOTE(review): `to_str().unwrap()`, `db.connection().unwrap()` and
//   `PKG_VERSION_MAJOR.parse().unwrap()` panic instead of returning an error.
// * `UpEndDatabase::connection` - returns an `UpEndConnection` that shares this
//   database's pool and lock.
// * `UpEndConnection` - the data-access methods take the shared `RwLock`
//   (read for lookups, write for `remove_object` and `insert_model_entry`)
//   and then a pooled connection.
//   - `transaction` - currently just calls `f()`; the exclusive-transaction
//     code is commented out.
//   - `get_meta` - value of the first META row with the given key, or an
//     error when there is none.
//   - `retrieve_entry` - looks rows up by the encoded `Address::Hash(hash)`
//     identity: no row gives `Ok(None)`, one row is converted to an `Entry`,
//     more than one reaches `unreachable!`.
//   - `retrieve_object` - loads rows whose entity is the address or whose
//     `value_str` equals the address rendered as an `EntryValue`, then loads
//     rows whose entity is the address of any of those entries (entries whose
//     `address()` fails are skipped); returns both sets concatenated.
//   - `remove_object` - deletes rows whose identity, entity or `value_str`
//     matches the address and returns what `execute` reports.
//   - `query` - runs `engine::execute`; rows that fail conversion to `Entry`
//     are dropped silently (`filter_map(Result::ok)`).
//   - `insert_entry` / `insert_entry_immutable` - convert to `models::Entry`
//     (the latter through `ImmutableEntry`), insert via `insert_model_entry`,
//     and return the entry's address.
//   - `insert_model_entry` - a unique-constraint violation is reported as
//     `Ok(0)` rather than as an error.
//   - `get_all_addresses` - distinct entities; values that fail
//     `Address::decode` are skipped.
//   - `get_all_attributes` - distinct attributes, ordered by attribute.
//   - `get_stats` - JSON `entryCount` with `total`, `api` (provenance LIKE
//     "API%") and `explicit` (the API count minus the count of rows with
//     provenance LIKE "%IMPLICIT%").
//     NOTE(review): the error text for the implicit count repeats
//     "Couldn't get API entry count" - looks like a copy-paste; confirm.
//   - `get_explicit_entries` - rows with provenance LIKE "API%" and NOT LIKE
//     "%IMPLICIT%", converted to `Entry`.
// * `mod test` - `test_open` checks the `new` flag across open, reopen and
//   reinitialize; `test_query` inserts a few values and checks result counts
//   for several query forms.
#[macro_use] extern crate diesel; #[macro_use] extern crate diesel_migrations; #[macro_use] extern crate lazy_static; #[macro_use] mod macros; pub mod common; pub mod engine; pub mod entry; pub mod hierarchies; pub mod jobs; pub mod stores; mod inner; mod util; use crate::common::build; use crate::engine::execute; use crate::inner::models; use crate::inner::schema::data; use crate::util::LoggerSink; use anyhow::{anyhow, Result}; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::result::{DatabaseErrorKind, Error}; use diesel::sqlite::SqliteConnection; use hierarchies::initialize_hier; use shadow_rs::is_release; use std::convert::TryFrom; use std::fs; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use tracing::{debug, error, trace, warn}; use upend_base::addressing::{Address, Addressable}; use upend_base::entry::{Entry, EntryValue, ImmutableEntry}; use upend_base::error::UpEndError; use upend_base::hash::UpMultihash; use upend_base::lang::Query; #[derive(Debug)] pub struct ConnectionOptions { pub busy_timeout: Option, pub enable_wal_mode: bool, pub mutex: Arc>, } impl ConnectionOptions { pub fn apply(&self, connection: &SqliteConnection) -> QueryResult<()> { let _lock = self.mutex.lock().unwrap(); if let Some(duration) = self.busy_timeout { debug!("Setting busy_timeout to {:?}", duration); connection.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } connection.execute(if self.enable_wal_mode { debug!("Enabling WAL journal mode & truncating WAL log..."); "PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);" } else { debug!("Enabling TRUNCATE journal mode"); "PRAGMA journal_mode = TRUNCATE;" })?; debug!(r#"Setting "synchronous" to NORMAL"#); connection.execute("PRAGMA synchronous = NORMAL;")?; Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), 
diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } type DbPool = r2d2::Pool>; #[derive(Debug)] pub struct LoggingHandler { pub name: &'static str, } impl diesel::r2d2::HandleError for LoggingHandler { fn handle_error(&self, error: diesel::r2d2::Error) { error!(name = self.name, "Database error: {}", error); if !is_release() { panic!("Database error! This should not happen! {}", error); } } } pub struct OpenResult { pub db: UpEndDatabase, pub new: bool, } pub struct UpEndDatabase { pub path: PathBuf, pool: Arc, lock: Arc>, } pub const UPEND_SUBDIR: &str = ".upend"; pub const DATABASE_FILENAME: &str = "upend.sqlite3"; impl UpEndDatabase { pub fn open>(dirpath: P, reinitialize: bool) -> Result { embed_migrations!("./migrations/upend/"); let upend_path = dirpath.as_ref().join(UPEND_SUBDIR); if reinitialize { warn!("Reinitializing - removing previous database..."); let _ = fs::remove_dir_all(&upend_path); } let new = !upend_path.exists(); if new { trace!("Creating UpEnd subdirectory..."); fs::create_dir(&upend_path)?; } trace!("Creating pool."); let manager = ConnectionManager::::new( upend_path.join(DATABASE_FILENAME).to_str().unwrap(), ); let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { busy_timeout: Some(Duration::from_secs(30)), enable_wal_mode: true, mutex: Arc::new(Mutex::new(())), })) .error_handler(Box::new(LoggingHandler { name: "main" })) .build(manager)?; trace!("Pool created."); let db = UpEndDatabase { path: upend_path, pool: Arc::new(pool), lock: Arc::new(RwLock::new(())), }; let connection = db.connection().unwrap(); if !new { let db_major: u64 = connection.get_meta("VERSION")?.parse()?; if db_major > build::PKG_VERSION_MAJOR.parse().unwrap() { return Err(anyhow!("Incompatible database! 
Found version ")); } } trace!("Running migrations..."); embedded_migrations::run_with_output( &db.pool.get()?, &mut LoggerSink { ..Default::default() }, )?; initialize_hier(&connection)?; Ok(OpenResult { db, new }) } pub fn connection(&self) -> Result { Ok(UpEndConnection { pool: self.pool.clone(), lock: self.lock.clone(), }) } } pub struct UpEndConnection { pool: Arc, lock: Arc>, } impl UpEndConnection { pub fn transaction(&self, f: F) -> Result where F: FnOnce() -> Result, E: From, { /* let span = span!(tracing::Level::TRACE, "transaction"); let _span = span.enter(); let _lock = self.transaction_lock.lock().unwrap(); self.conn.exclusive_transaction(f) */ // Disable transactions for now. f() } pub fn get_meta>(&self, key: S) -> Result { use crate::inner::schema::meta::dsl; let key = key.as_ref(); trace!("Querying META:{key}"); let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; dsl::meta .filter(dsl::key.eq(key)) .load::(&conn)? .first() .ok_or(anyhow!(r#"No META "{key}" value found."#)) .map(|mv| mv.value.clone()) } pub fn retrieve_entry(&self, hash: &UpMultihash) -> Result> { use crate::inner::schema::data::dsl::*; let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let entry = data .filter(identity.eq(Address::Hash(hash.clone()).encode()?)) .load::(&conn)?; match entry.len() { 0 => Ok(None), 1 => Ok(Some(Entry::try_from(entry.get(0).unwrap())?)), _ => { unreachable!( "Multiple entries returned with the same hash - this should be impossible!" 
) } } } pub fn retrieve_object(&self, object_address: &Address) -> Result> { use crate::inner::schema::data::dsl::*; let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let primary = data .filter(entity.eq(object_address.encode()?)) .or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string())) .load::(&conn)?; let entries = primary .iter() .map(Entry::try_from) .collect::>>()?; let secondary = data .filter( entity.eq_any( entries .iter() .map(|e| e.address()) .filter_map(Result::ok) .map(|addr| addr.encode()) .collect::>, UpEndError>>()?, ), ) .load::(&conn)?; let secondary_entries = secondary .iter() .map(Entry::try_from) .collect::>>()?; Ok([entries, secondary_entries].concat()) } pub fn remove_object(&self, object_address: Address) -> Result { use crate::inner::schema::data::dsl::*; trace!("Deleting {}!", object_address); let _lock = self.lock.write().unwrap(); let conn = self.pool.get()?; let matches = data .filter(identity.eq(object_address.encode()?)) .or_filter(entity.eq(object_address.encode()?)) .or_filter(value_str.eq(EntryValue::Address(object_address).to_string())); Ok(diesel::delete(matches).execute(&conn)?) } pub fn query(&self, query: Query) -> Result> { trace!("Querying: {:?}", query); let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let entries = execute(&conn, query)?; let entries = entries .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn insert_entry(&self, entry: Entry) -> Result
{ trace!("Inserting: {}", entry); let db_entry = models::Entry::try_from(&entry)?; self.insert_model_entry(db_entry)?; Ok(entry.address()?) } pub fn insert_entry_immutable(&self, entry: Entry) -> Result
{ trace!("Inserting immutably: {}", entry); let address = entry.address()?; let db_entry = models::Entry::try_from(&ImmutableEntry(entry))?; self.insert_model_entry(db_entry)?; Ok(address) } fn insert_model_entry(&self, entry: models::Entry) -> Result { let _lock = self.lock.write().unwrap(); let conn = self.pool.get()?; let result = diesel::insert_into(data::table) .values(&entry) .execute(&conn); match result { Ok(num) => Ok(num), Err(error) => match error { Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => Ok(0), _ => Err(anyhow!(error)), }, } } // #[deprecated] pub fn get_all_addresses(&self) -> Result> { use crate::inner::schema::data::dsl::*; let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let result = data .select(entity) .distinct() .load::>(&conn)? .into_iter() .filter_map(|buf| Address::decode(&buf).ok()) .collect(); Ok(result) } #[deprecated] pub fn get_all_attributes(&self) -> Result> { use crate::inner::schema::data::dsl::*; let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let result = data .select(attribute) .distinct() .order_by(attribute) .load::(&conn)?; Ok(result) } pub fn get_stats(&self) -> Result { use crate::inner::schema::data::dsl::*; let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let total_entry_count = data.count().load::(&conn)?; let total_entry_count = total_entry_count .first() .ok_or(anyhow!("Couldn't get entry count"))?; let api_entry_count = data .filter(provenance.like("API%")) .count() .load::(&conn)?; let api_entry_count = api_entry_count .first() .ok_or(anyhow!("Couldn't get API entry count"))?; let implicit_entry_count = data .filter(provenance.like("%IMPLICIT%")) .count() .load::(&conn)?; let implicit_entry_count = implicit_entry_count .first() .ok_or(anyhow!("Couldn't get API entry count"))?; Ok(serde_json::json!({ "entryCount": { "total": total_entry_count, "api": api_entry_count, "explicit": api_entry_count - implicit_entry_count } })) } #[deprecated] 
pub fn get_explicit_entries(&self) -> Result> { use crate::inner::schema::data::dsl::*; let _lock = self.lock.read().unwrap(); let conn = self.pool.get()?; let result: Vec = data .filter( provenance .like("API%") .and(provenance.not_like("%IMPLICIT%")), ) .load(&conn)?; Ok(result .iter() .map(Entry::try_from) .collect::>>()?) } } #[cfg(test)] mod test { use upend_base::constants::ATTR_LABEL; use super::*; use tempfile::TempDir; #[test] fn test_open() { let tempdir = TempDir::new().unwrap(); let result = UpEndDatabase::open(&tempdir, false); let result = result.unwrap(); assert!(result.new); // Not new let result = UpEndDatabase::open(&tempdir, false); let result = result.unwrap(); assert!(!result.new); // reinitialize true, new again let result = UpEndDatabase::open(&tempdir, true); let result = result.unwrap(); assert!(result.new); } #[test] fn test_query() { let tempdir = TempDir::new().unwrap(); let result = UpEndDatabase::open(&tempdir, false).unwrap(); let db = result.db; let connection = db.connection().unwrap(); let random_entity = Address::Uuid(uuid::Uuid::new_v4()); upend_insert_val!(connection, random_entity, ATTR_LABEL, "FOOBAR").unwrap(); upend_insert_val!(connection, random_entity, "FLAVOUR", "STRANGE").unwrap(); let query = format!(r#"(matches @{random_entity} ? ?)"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let other_entity = Address::Uuid(uuid::Uuid::new_v4()); upend_insert_val!(connection, random_entity, ATTR_LABEL, "BAZQUX").unwrap(); upend_insert_val!(connection, random_entity, "CHARGE", "POSITIVE").unwrap(); let query = format!(r#"(matches (in @{random_entity} @{other_entity}) ? ?)"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 4); let query = r#"(matches ? (in "FLAVOUR" "CHARGE") ?)"#.parse().unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let query = format!(r#"(matches ? 
"{ATTR_LABEL}" (in "FOOBAR" "BAZQUX"))"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let query = format!(r#"(matches ? "{ATTR_LABEL}" (contains "OOBA"))"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); let query = r#"(or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) )"# .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 2); let query = format!(r#"(and (matches ? ? (contains "OOBA")) (matches ? "{ATTR_LABEL}" ?) )"#) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); let query = format!( r#"(and (or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) ) (not (matches ? "{ATTR_LABEL}" ?)) )"# ) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); let query = format!( r#"(join (matches ?a "FLAVOUR" ?) (matches ?a "{ATTR_LABEL}" "FOOBAR") )"# ) .parse() .unwrap(); let result = connection.query(query).unwrap(); assert_eq!(result.len(), 1); assert_eq!(result[0].value, "STRANGE".into()); } }