#![macro_use] #[macro_use] mod macros; pub mod constants; pub mod entry; pub mod inner; pub mod lang; use crate::addressing::Address; use crate::database::constants::{ IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_ID_ATTR, TYPE_INVARIANT, TYPE_IS_ATTR, }; use crate::database::entry::{Entry, EntryValue}; use crate::database::inner::models; use crate::database::inner::schema::data; use crate::database::lang::Query; use crate::util::hash::{Hash, Hashable}; use crate::util::LoggerSink; use anyhow::{anyhow, Result}; use diesel::debug_query; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::result::{DatabaseErrorKind, Error}; use diesel::sqlite::{Sqlite, SqliteConnection}; use log::{debug, trace}; use std::convert::TryFrom; use std::fs; use std::path::{Path, PathBuf}; use std::time::Duration; pub fn insert_file>( connection: &C, file: models::NewFile, ) -> Result { use crate::database::inner::schema::files; debug!( "Inserting {} ({})...", &file.path, Address::Hash(Hash((&file.hash).clone())) ); Ok(diesel::insert_into(files::table) .values(file) .execute(connection)?) } pub fn retrieve_file>( connection: &C, obj_hash: Hash, ) -> Result> { use crate::database::inner::schema::files::dsl::*; let matches = files .filter(valid.eq(true)) .filter(hash.eq(obj_hash.0)) .load::(connection)?; Ok(matches) } pub fn retrieve_all_files>( connection: &C, ) -> Result> { use crate::database::inner::schema::files::dsl::*; let matches = files.load::(connection)?; Ok(matches) } pub fn get_latest_files>( connection: &C, count: i64, ) -> Result> { use crate::database::inner::schema::files::dsl::*; let matches = files .order_by(added.desc()) .limit(count) .load::(connection)?; Ok(matches) } pub fn file_set_valid>( connection: &C, file_id: i32, is_valid: bool, ) -> Result { use crate::database::inner::schema::files::dsl::*; debug!("Setting file ID {} to valid = {}", file_id, is_valid); Ok(diesel::update(files.filter(id.eq(file_id))) .set(valid.eq(is_valid)) .execute(connection)?) } pub fn retrieve_object>( connection: &C, object_address: Address, ) -> Result> { use crate::database::inner::schema::data::dsl::*; let matches = data .filter(entity.eq(object_address.encode()?)) .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)) .load::(connection)?; let entries = matches .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn bulk_retrieve_objects>( connection: &C, object_addresses: Vec
, ) -> Result> { use crate::database::inner::schema::data::dsl::*; let matches = data .filter( entity.eq_any( object_addresses .iter() .filter_map(|addr| addr.encode().ok()), ), ) // .or_filter(value.eq(EntryValue::Address(object_address).to_str()?)) .load::(connection)?; let entries = matches .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn remove_object>( connection: &C, object_address: Address, ) -> Result { use crate::database::inner::schema::data::dsl::*; debug!("Deleting {}!", object_address); let matches = data .filter(identity.eq(object_address.encode()?)) .or_filter(entity.eq(object_address.encode()?)) .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)); Ok(diesel::delete(matches).execute(connection)?) } pub fn query>(connection: &C, query: Query) -> Result> { use crate::database::inner::schema::data::dsl::*; trace!("Querying: {:?}", query); let db_query = data.filter(query.to_sqlite_predicates()?); trace!("DB query: {}", debug_query(&db_query)); let matches = db_query.load::(connection)?; let entries = matches .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn insert_entry>( connection: &C, entry: Entry, ) -> Result
{ debug!("Inserting: {}", entry); let insert_entry = models::Entry::try_from(&entry)?; let entry = Entry::try_from(&insert_entry)?; let result = diesel::insert_into(data::table) .values(insert_entry) .execute(connection); if let Some(error) = result.err() { match error { Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {} _ => return Err(anyhow!(error)), } } Ok(Address::Hash(entry.hash()?)) } #[derive(Debug)] pub struct ConnectionOptions { pub enable_foreign_keys: bool, pub busy_timeout: Option, } impl ConnectionOptions { pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { if self.enable_foreign_keys { conn.execute("PRAGMA foreign_keys = ON;")?; } if let Some(duration) = self.busy_timeout { conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } pub type DbPool = r2d2::Pool>; pub struct OpenResult { pub pool: DbPool, pub new: bool, } pub const DATABASE_FILENAME: &str = "upend.sqlite3"; pub fn open_upend>( dirpath: P, db_path: Option, reinitialize: bool, ) -> Result { embed_migrations!("./migrations/upend/"); let database_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(DATABASE_FILENAME)); if reinitialize { let _ = fs::remove_file(&database_path); } let new = !database_path.exists(); let manager = ConnectionManager::::new(database_path.to_str().unwrap()); let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { enable_foreign_keys: true, busy_timeout: Some(Duration::from_secs(30)), })) .build(manager)?; trace!("Pool created, running migrations..."); embedded_migrations::run_with_output( &pool.get()?, &mut LoggerSink { ..Default::default() }, )?; trace!("Initializing types..."); initialize_types(&pool)?; Ok(OpenResult { pool, new }) } fn initialize_types(pool: &DbPool) -> Result<()> { insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?; upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR); upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_ID_ATTR, TYPE_IS_ATTR); Ok(()) }