#![macro_use]
#[macro_use]
mod macros;
pub mod constants;
pub mod entry;
pub mod hierarchies;
pub mod inner;
pub mod lang;

use crate::addressing::{Address, Addressable};
use crate::database::constants::{IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_INVARIANT};
use crate::database::entry::{Entry, EntryValue};
use crate::database::inner::models;
use crate::database::inner::schema::data;
use crate::database::lang::Query;
use crate::util::hash::{Hash, Hashable};
use crate::util::LoggerSink;
use anyhow::{anyhow, Result};
use chrono::NaiveDateTime;
use diesel::debug_query;
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, PooledConnection};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::sqlite::SqliteConnection;
use hierarchies::initialize_hier;
use log::{debug, trace};
use std::convert::TryFrom;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

/// Per-connection SQLite settings, applied to every connection handed out by the pool.
#[derive(Debug)]
pub struct ConnectionOptions {
    pub enable_foreign_keys: bool,
    pub busy_timeout: Option<Duration>,
}

impl ConnectionOptions {
    pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
        if self.enable_foreign_keys {
            trace!("Enabling foreign keys");
            conn.execute("PRAGMA foreign_keys = ON;")?;
        }

        if let Some(duration) = self.busy_timeout {
            trace!("Setting busy_timeout to {:?}", duration);
            conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
        }

        trace!("Setting \"synchronous\" to NORMAL");
        conn.execute("PRAGMA synchronous = NORMAL;")?;

        Ok(())
    }
}

impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
    for ConnectionOptions
{
    fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
        self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
    }
}

type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

pub struct OpenResult {
    pub db: UpEndDatabase,
    /// True if the database was created by this `open` call rather than found on disk.
    pub new: bool,
}

pub struct UpEndDatabase {
    pool: DbPool,
    pub vault_path: Arc<PathBuf>,
    pub db_path: Arc<PathBuf>,
}

pub const UPEND_SUBDIR: &str = ".upend";
pub const DATABASE_FILENAME: &str = "upend.sqlite3";

impl UpEndDatabase {
    /// Opens (or, if `reinitialize` is set, wipes and recreates) the database
    /// under `dirpath`, runs migrations, and inserts the base type entries.
    pub fn open<P: AsRef<Path>>(
        dirpath: P,
        db_path: Option<PathBuf>,
        reinitialize: bool,
    ) -> Result<OpenResult> {
        embed_migrations!("./migrations/upend/");

        let upend_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(UPEND_SUBDIR));

        if reinitialize {
            trace!("Reinitializing - removing previous database...");
            let _ = fs::remove_dir_all(&upend_path);
        }

        let new = !upend_path.exists();
        if new {
            trace!("Creating UpEnd subdirectory...");
            fs::create_dir(&upend_path)?;
        }

        trace!("Creating pool.");
        let manager = ConnectionManager::<SqliteConnection>::new(
            upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
        );
        let pool = r2d2::Pool::builder()
            .connection_customizer(Box::new(ConnectionOptions {
                enable_foreign_keys: true,
                busy_timeout: Some(Duration::from_secs(30)),
            }))
            .build(manager)?;

        let db = UpEndDatabase {
            pool,
            vault_path: Arc::new(PathBuf::from(dirpath.as_ref())),
            db_path: Arc::new(upend_path),
        };

        let connection = db.connection().unwrap();

        let enable_wal_mode = true;
        connection.execute(if enable_wal_mode {
            trace!("Enabling WAL journal mode & truncating WAL log...");
            "PRAGMA journal_mode = WAL;PRAGMA wal_checkpoint(TRUNCATE);"
        } else {
            trace!("Enabling TRUNCATE journal mode");
            "PRAGMA journal_mode = TRUNCATE;"
        })?;

        trace!("Pool created, running migrations...");
        embedded_migrations::run_with_output(
            &db.pool.get()?,
            &mut LoggerSink {
                ..Default::default()
            },
        )?;

        trace!("Initializing types...");
        connection.insert_entry(Entry::try_from(&*TYPE_INVARIANT)?)?;
        upend_insert_addr!(connection, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
        upend_insert_val!(connection, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR);
        initialize_hier(&connection)?;

        Ok(OpenResult { db, new })
    }

    pub fn connection(&self) -> Result<UpEndConnection> {
        Ok(UpEndConnection {
            conn: self.pool.get()?,
            vault_path: self.vault_path.clone(),
        })
    }
}
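// A minimal usage sketch (not part of the original file; `"./my-vault"` is a
// hypothetical path), showing how `open` and `connection` fit together:
//
//     let OpenResult { db, new } = UpEndDatabase::open("./my-vault", None, false)?;
//     let connection = db.connection()?;
//     if new {
//         // e.g. kick off an initial scan of the vault directory
//     }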
/// A single pooled connection to the vault database.
pub struct UpEndConnection {
    conn: PooledConnection<ConnectionManager<SqliteConnection>>,
    vault_path: Arc<PathBuf>,
}

impl UpEndConnection {
    pub fn execute<S: AsRef<str>>(&self, query: S) -> QueryResult<usize> {
        self.conn.execute(query.as_ref())
    }

    pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
    where
        F: FnOnce() -> Result<T, E>,
        E: From<Error>,
    {
        self.conn.transaction(f)
    }

    pub fn insert_file(&self, file: models::NewFile) -> Result<usize> {
        use crate::database::inner::schema::files;

        debug!(
            "Inserting {} ({})...",
            &file.path,
            Address::Hash(Hash((&file.hash).clone()))
        );

        Ok(diesel::insert_into(files::table)
            .values(file)
            .execute(&self.conn)?)
    }

    pub fn retrieve_file(&self, obj_hash: Hash) -> Result<Vec<models::OutFile>> {
        use crate::database::inner::schema::files::dsl::*;

        let matches = files
            .filter(valid.eq(true))
            .filter(hash.eq(obj_hash.0))
            .load::<models::File>(&self.conn)?;

        let matches = matches
            .into_iter()
            .map(|f| models::OutFile {
                id: f.id,
                hash: f.hash,
                path: self.vault_path.join(PathBuf::from(f.path)),
                valid: f.valid,
                added: f.added,
                size: f.size,
                mtime: f.mtime,
            })
            .collect();

        Ok(matches)
    }

    pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
        use crate::database::inner::schema::files::dsl::*;
        let matches = files.load::<models::File>(&self.conn)?;
        Ok(matches)
    }

    pub fn get_latest_files(&self, count: i64) -> Result<Vec<models::File>> {
        use crate::database::inner::schema::files::dsl::*;

        let matches = files
            .order_by(added.desc())
            .limit(count)
            .load::<models::File>(&self.conn)?;

        Ok(matches)
    }

    pub fn file_update_mtime(&self, file_id: i32, m_time: Option<NaiveDateTime>) -> Result<usize> {
        use crate::database::inner::schema::files::dsl::*;

        debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);

        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(mtime.eq(m_time))
            .execute(&self.conn)?)
    }

    pub fn file_set_valid(&self, file_id: i32, is_valid: bool) -> Result<usize> {
        use crate::database::inner::schema::files::dsl::*;

        debug!("Setting file ID {} to valid = {}", file_id, is_valid);

        Ok(diesel::update(files.filter(id.eq(file_id)))
            .set(valid.eq(is_valid))
            .execute(&self.conn)?)
    }
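    // A hedged usage sketch (not in the original source): a file round-trip.
    // `new_file` and `file_hash` are hypothetical values; `NewFile`'s exact
    // fields are defined in `inner::models`.
    //
    //     connection.insert_file(new_file)?;
    //     let copies = connection.retrieve_file(file_hash)?;
    //     // `OutFile::path` comes back already joined onto the vault path.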
    pub fn retrieve_object(&self, object_address: Address) -> Result<Vec<Entry>> {
        use crate::database::inner::schema::data::dsl::*;

        // Entries pointing at the address directly, either as entity or as value.
        let primary = data
            .filter(entity.eq(object_address.encode()?))
            .or_filter(value.eq(EntryValue::Address(object_address).to_string()?))
            .load::<models::Entry>(&self.conn)?;

        let entries = primary
            .iter()
            .map(Entry::try_from)
            .collect::<Result<Vec<Entry>>>()?;

        // Entries about the entries found above.
        let secondary = data
            .filter(
                entity.eq_any(
                    entries
                        .iter()
                        .map(|e| e.address())
                        .filter_map(Result::ok)
                        .map(|addr| addr.encode())
                        .collect::<Result<Vec<Vec<u8>>>>()?,
                ),
            )
            .load::<models::Entry>(&self.conn)?;

        let secondary_entries = secondary
            .iter()
            .map(Entry::try_from)
            .collect::<Result<Vec<Entry>>>()?;

        Ok([entries, secondary_entries].concat())
    }

    pub fn remove_object(&self, object_address: Address) -> Result<usize> {
        use crate::database::inner::schema::data::dsl::*;

        debug!("Deleting {}!", object_address);

        let matches = data
            .filter(identity.eq(object_address.encode()?))
            .or_filter(entity.eq(object_address.encode()?))
            .or_filter(value.eq(EntryValue::Address(object_address).to_string()?));

        Ok(diesel::delete(matches).execute(&self.conn)?)
    }

    pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
        use crate::database::inner::schema::data::dsl::*;

        trace!("Querying: {:?}", query);

        let db_query = data.filter(query.to_sqlite_predicates()?);
        trace!("DB query: {}", debug_query(&db_query));

        let matches = db_query.load::<models::Entry>(&self.conn)?;

        let entries = matches
            .iter()
            .map(Entry::try_from)
            .filter_map(Result::ok)
            .collect();

        Ok(entries)
    }
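    // A hedged usage sketch (not in the original source): running a query.
    // How a `Query` is constructed is defined over in `database::lang`;
    // `some_query` is a hypothetical value.
    //
    //     let entries = connection.query(some_query)?;
    //     // Rows that fail conversion into `Entry` are silently dropped
    //     // (see the `filter_map(Result::ok)` in `query` above).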
    /// Inserts an entry, addressed by its hash. Re-inserting an identical
    /// entry is a no-op: unique-constraint violations are swallowed.
    pub fn insert_entry(&self, entry: Entry) -> Result<Address> {
        debug!("Inserting: {}", entry);

        let insert_entry = models::Entry::try_from(&entry)?;
        let entry = Entry::try_from(&insert_entry)?;

        let result = diesel::insert_into(data::table)
            .values(insert_entry)
            .execute(&self.conn);

        if let Some(error) = result.err() {
            match error {
                Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {}
                _ => return Err(anyhow!(error)),
            }
        }

        Ok(Address::Hash(entry.hash()?))
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use tempdir::TempDir;

    #[test]
    fn test_open() {
        let tempdir = TempDir::new("upend-test").unwrap();
        let result = UpEndDatabase::open(&tempdir, None, false);
        assert!(result.is_ok());
        assert!(result.unwrap().new);

        // Not new
        let result = UpEndDatabase::open(&tempdir, None, false);
        assert!(result.is_ok());
        assert!(!result.unwrap().new);

        // reinitialize true, new again
        let result = UpEndDatabase::open(&tempdir, None, true);
        assert!(result.is_ok());
        assert!(result.unwrap().new);
    }
}
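// A hedged additional test sketch (not in the original source): exercises the
// UniqueViolation branch of `insert_entry`. `TYPE_INVARIANT` is already
// inserted by `open`, so a second insertion should be a tolerated no-op.
#[cfg(test)]
mod insert_test {
    use super::*;
    use tempdir::TempDir;

    #[test]
    fn test_insert_entry_is_idempotent() {
        let tempdir = TempDir::new("upend-test").unwrap();
        let db = UpEndDatabase::open(&tempdir, None, false).unwrap().db;
        let connection = db.connection().unwrap();

        // Same entry that `open` has already inserted.
        let entry = Entry::try_from(&*TYPE_INVARIANT).unwrap();
        let result = connection.insert_entry(entry);
        assert!(result.is_ok());
    }
}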