// upend/src/database/mod.rs
#![macro_use]
#[macro_use]
mod macros;
pub mod constants;
pub mod entry;
pub mod hierarchies;
pub mod inner;
pub mod lang;
use crate::addressing::{Address, Addressable};
use crate::database::constants::{
IS_OF_TYPE_ATTR, LABEL_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_INVARIANT,
};
use crate::database::entry::{Entry, EntryValue, ImmutableEntry};
use crate::database::inner::models;
use crate::database::inner::schema::data;
use crate::database::lang::Query;
use crate::util::hash::Hash;
use crate::util::LoggerSink;
use anyhow::{anyhow, Result};
use chrono::NaiveDateTime;
use diesel::debug_query;
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager, PooledConnection};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::sqlite::SqliteConnection;
use hierarchies::initialize_hier;
use log::{debug, trace};
use std::convert::{TryFrom, TryInto};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
/// Per-connection SQLite settings, applied each time the r2d2 pool hands out
/// a connection (see the `CustomizeConnection` impl below).
#[derive(Debug)]
pub struct ConnectionOptions {
// When true, `PRAGMA foreign_keys = ON` is executed on acquire.
pub enable_foreign_keys: bool,
// When set, `PRAGMA busy_timeout` is set to this duration (in milliseconds).
pub busy_timeout: Option<Duration>,
}
impl ConnectionOptions {
    /// Applies the configured PRAGMAs to a freshly acquired connection.
    ///
    /// Always sets `synchronous` to NORMAL; foreign keys and the busy
    /// timeout are only configured when the corresponding option is set.
    pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
        if self.enable_foreign_keys {
            trace!("Enabling foreign keys");
            conn.execute("PRAGMA foreign_keys = ON;")?;
        }
        if let Some(timeout) = self.busy_timeout {
            trace!("Setting busy_timeout to {:?}", timeout);
            let pragma = format!("PRAGMA busy_timeout = {};", timeout.as_millis());
            conn.execute(&pragma)?;
        }
        trace!("Setting \"synchronous\" to NORMAL");
        conn.execute("PRAGMA synchronous = NORMAL;")?;
        Ok(())
    }
}
// Hook ConnectionOptions into the r2d2 pool: every connection the pool
// creates gets the PRAGMAs from `apply` before it is handed out.
impl diesel::r2d2::CustomizeConnection&lt;SqliteConnection, diesel::r2d2::Error&gt;
for ConnectionOptions
{
fn on_acquire(&amp;self, conn: &amp;mut SqliteConnection) -&gt; Result&lt;(), diesel::r2d2::Error&gt; {
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
}
}
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
/// Result of `UpEndDatabase::open`: the database handle plus whether the
/// on-disk store was freshly created by this call.
pub struct OpenResult {
pub db: UpEndDatabase,
// True when the `.upend` directory did not exist before this open.
pub new: bool,
}
/// Handle to a vault's UpEnd SQLite database, owning the connection pool.
pub struct UpEndDatabase {
    pool: DbPool,
    /// Canonicalized path of the vault directory this database indexes.
    pub vault_path: Arc<PathBuf>,
    /// Path of the directory containing the SQLite database file.
    pub db_path: Arc<PathBuf>,
}
pub const UPEND_SUBDIR: &str = ".upend";
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
impl UpEndDatabase {
pub fn open<P: AsRef<Path>>(
dirpath: P,
db_path: Option<PathBuf>,
reinitialize: bool,
) -> Result<OpenResult> {
embed_migrations!("./migrations/upend/");
let upend_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(UPEND_SUBDIR));
if reinitialize {
trace!("Reinitializing - removing previous database...");
let _ = fs::remove_dir_all(&upend_path);
}
let new = !upend_path.exists();
if new {
trace!("Creating UpEnd subdirectory...");
fs::create_dir(&upend_path)?;
}
trace!("Creating pool.");
let manager = ConnectionManager::<SqliteConnection>::new(
upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
);
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {
enable_foreign_keys: true,
busy_timeout: Some(Duration::from_secs(30)),
}))
.build(manager)?;
trace!("Pool created.");
let db = UpEndDatabase {
pool,
vault_path: Arc::new(dirpath.as_ref().canonicalize()?),
2022-01-04 21:58:23 +01:00
db_path: Arc::new(upend_path),
};
let connection = db.connection().unwrap();
if !new {
let db_major: u64 = connection.get_meta("VERSION")?.parse()?;
if db_major > crate::common::PKG_VERSION_MAJOR.parse().unwrap() {
return Err(anyhow!("Incompatible database! Found version "));
}
}
trace!("Running initial config.");
let enable_wal_mode = true;
connection.execute(if enable_wal_mode {
trace!("Enabling WAL journal mode & truncating WAL log...");
"PRAGMA journal_mode = WAL;PRAGMA wal_checkpoint(TRUNCATE);"
} else {
trace!("Enabling TRUNCATE journal mode");
"PRAGMA journal_mode = TRUNCATE;"
})?;
trace!("Running migrations...");
embedded_migrations::run_with_output(
&db.pool.get()?,
&mut LoggerSink {
..Default::default()
},
)?;
trace!("Initializing types...");
connection.insert_entry(Entry::try_from(&*TYPE_INVARIANT)?)?;
2022-01-18 16:59:32 +01:00
upend_insert_addr!(connection, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR)?;
upend_insert_val!(connection, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR)?;
2022-01-27 17:42:59 +01:00
upend_insert_val!(connection, TYPE_ADDR, LABEL_ATTR, "UpEnd Type")?;
initialize_hier(&connection)?;
Ok(OpenResult { db, new })
}
pub fn connection(&self) -> Result<UpEndConnection> {
Ok(UpEndConnection {
conn: self.pool.get()?,
vault_path: self.vault_path.clone(),
})
}
}
/// A pooled database connection paired with the vault's root path, which is
/// needed to resolve relative file paths stored in the database.
pub struct UpEndConnection {
conn: PooledConnection&lt;ConnectionManager&lt;SqliteConnection&gt;&gt;,
vault_path: Arc&lt;PathBuf&gt;,
}
impl UpEndConnection {
/// Runs a raw SQL string on this connection, returning the affected row count.
pub fn execute&lt;S: AsRef&lt;str&gt;&gt;(&amp;self, query: S) -&gt; Result&lt;usize, diesel::result::Error&gt; {
self.conn.execute(query.as_ref())
}
/// Executes `f` inside a database transaction; the transaction is rolled
/// back if `f` returns an error.
pub fn transaction&lt;T, E, F&gt;(&amp;self, f: F) -&gt; Result&lt;T, E&gt;
where
F: FnOnce() -&gt; Result&lt;T, E&gt;,
E: From&lt;Error&gt;,
{
self.conn.transaction(f)
}
/// Fetches the value stored under `key` in the `meta` table.
///
/// # Errors
/// Fails when the query errors or no row with `key` exists.
pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
    use crate::database::inner::schema::meta::dsl;
    let key = key.as_ref();
    debug!("Querying META:{key}");
    dsl::meta
        .filter(dsl::key.eq(key))
        .load::<models::MetaValue>(&self.conn)?
        .into_iter()
        .next()
        // Take ownership of the row's value instead of cloning a borrow.
        .map(|mv| mv.value)
        // `ok_or_else` defers building the error message to the failure path.
        .ok_or_else(|| anyhow!("No META \"{key}\" value found."))
}
2022-02-03 10:12:26 +01:00
pub fn insert_file(&self, file: models::NewFile) -> Result<u32> {
use crate::database::inner::schema::files;
debug!(
"Inserting {} ({})...",
&file.path,
Address::Hash(Hash((&file.hash).clone()))
);
2022-02-03 10:12:26 +01:00
diesel::insert_into(files::table)
.values(&file)
.execute(&self.conn)?;
Ok(files::dsl::files
.filter(files::dsl::valid.eq(true))
.filter(files::dsl::hash.eq(file.hash))
.count()
.first::<i64>(&self.conn)?
.try_into()
.unwrap())
}
2022-02-04 20:33:07 +01:00
pub fn retrieve_file(&self, obj_hash: &Hash) -> Result<Vec<models::OutFile>> {
use crate::database::inner::schema::files::dsl::*;
let matches = files
.filter(valid.eq(true))
2022-02-04 20:33:07 +01:00
.filter(hash.eq(&obj_hash.0))
.load::<models::File>(&self.conn)?;
let matches = matches
.into_iter()
.map(|f| models::OutFile {
id: f.id,
hash: f.hash,
path: self.vault_path.join(PathBuf::from(f.path)),
valid: f.valid,
added: f.added,
size: f.size,
mtime: f.mtime,
})
.collect();
Ok(matches)
}
/// Loads every file row in the database, valid or not.
pub fn retrieve_all_files(&self) -> Result<Vec<models::File>> {
    use crate::database::inner::schema::files::dsl::*;
    Ok(files.load::<models::File>(&self.conn)?)
}
/// Sets (or clears, when `m_time` is None) the stored mtime of the file row
/// with id `file_id`; returns the number of rows updated.
pub fn file_update_mtime(&amp;self, file_id: i32, m_time: Option&lt;NaiveDateTime&gt;) -&gt; Result&lt;usize&gt; {
use crate::database::inner::schema::files::dsl::*;
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(mtime.eq(m_time))
.execute(&amp;self.conn)?)
}
/// Marks the file row with id `file_id` as valid or invalid; returns the
/// number of rows updated.
pub fn file_set_valid(&amp;self, file_id: i32, is_valid: bool) -&gt; Result&lt;usize&gt; {
use crate::database::inner::schema::files::dsl::*;
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
Ok(diesel::update(files.filter(id.eq(file_id)))
.set(valid.eq(is_valid))
.execute(&amp;self.conn)?)
}
2021-12-04 21:26:02 +01:00
2022-01-23 14:50:37 +01:00
pub fn normalize_path(&self, path: &Path) -> Result<PathBuf> {
Ok(path
.canonicalize()?
.strip_prefix(self.vault_path.as_path())?
.to_path_buf())
2022-01-23 14:50:37 +01:00
}
pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
2022-01-14 22:04:53 +01:00
use crate::database::inner::schema::data::dsl::*;
let entry = data
.filter(identity.eq(Address::Hash(hash.clone()).encode()?))
2022-01-14 22:04:53 +01:00
.load::<models::Entry>(&self.conn)?;
match entry.len() {
0 => Ok(None),
1 => Ok(Some(Entry::try_from(entry.get(0).unwrap())?)),
_ => {
2022-01-21 17:03:50 +01:00
unreachable!(
"Multiple entries returned with the same hash - this should be impossible!"
)
2022-01-14 22:04:53 +01:00
}
}
}
2022-02-07 20:46:17 +01:00
pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
use crate::database::inner::schema::data::dsl::*;
let primary = data
.filter(entity.eq(object_address.encode()?))
2022-02-07 20:46:17 +01:00
.or_filter(value_str.eq(EntryValue::Address(object_address.clone()).to_string()?))
.load::<models::Entry>(&self.conn)?;
let entries = primary
.iter()
.map(Entry::try_from)
.collect::<Result<Vec<Entry>>>()?;
let secondary = data
.filter(
entity.eq_any(
entries
.iter()
.map(|e| e.address())
.filter_map(Result::ok)
.map(|addr| addr.encode())
.collect::<Result<Vec<Vec<u8>>>>()?,
),
)
.load::<models::Entry>(&self.conn)?;
let secondary_entries = secondary
.iter()
.map(Entry::try_from)
.collect::<Result<Vec<Entry>>>()?;
Ok([entries, secondary_entries].concat())
}
/// Deletes every row referencing `object_address` — as identity, as entity,
/// or as a value — returning the number of rows removed.
pub fn remove_object(&self, object_address: Address) -> Result<usize> {
    use crate::database::inner::schema::data::dsl::*;
    debug!("Deleting {}!", object_address);
    // Encode once; the address is matched against two columns.
    let encoded = object_address.encode()?;
    let matches = data
        .filter(identity.eq(encoded.clone()))
        .or_filter(entity.eq(encoded))
        .or_filter(value_str.eq(EntryValue::Address(object_address).to_string()?));
    Ok(diesel::delete(matches).execute(&self.conn)?)
}
/// Runs a structured `Query` against the data table; rows that cannot be
/// converted into an `Entry` are silently skipped.
pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
    use crate::database::inner::schema::data::dsl::*;
    trace!("Querying: {:?}", query);
    let db_query = data.filter(query.to_sqlite_predicates()?);
    trace!("DB query: {}", debug_query(&db_query));
    let rows = db_query.load::<models::Entry>(&self.conn)?;
    let entries = rows
        .iter()
        .filter_map(|row| Entry::try_from(row).ok())
        .collect();
    Ok(entries)
}
pub fn insert_entry(&self, entry: Entry) -> Result<Address> {
debug!("Inserting: {}", entry);
2022-01-22 17:45:46 +01:00
let db_entry = models::Entry::try_from(&entry)?;
self.insert_model_entry(db_entry)?;
2022-01-22 17:45:46 +01:00
entry.address()
}
pub fn insert_entry_immutable(&self, entry: Entry) -> Result<Address> {
debug!("Inserting immutably: {}", entry);
2022-01-22 17:45:46 +01:00
let address = entry.address()?;
let db_entry = models::Entry::try_from(&ImmutableEntry(entry))?;
self.insert_model_entry(db_entry)?;
2022-01-22 17:45:46 +01:00
Ok(address)
}
fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> {
let result = diesel::insert_into(data::table)
.values(&entry)
.execute(&self.conn);
2022-01-22 17:45:46 +01:00
match result {
Ok(num) => Ok(num),
Err(error) => match error {
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => Ok(0),
_ => Err(anyhow!(error)),
},
}
}
2022-01-04 21:58:23 +01:00
2022-03-02 01:14:46 +01:00
// #[deprecated]
pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
use crate::database::inner::schema::data::dsl::*;
let result = data
.select(entity)
.distinct()
.load::<Vec<u8>>(&self.conn)?
.into_iter()
.filter_map(|buf| Address::decode(&buf).ok())
.collect();
Ok(result)
}
2022-01-04 21:58:23 +01:00
// #[deprecated]
pub fn get_all_attributes(&self) -> Result<Vec<String>> {
use crate::database::inner::schema::data::dsl::*;
let result = data
.select(attribute)
.distinct()
.order_by(attribute)
.load::<String>(&self.conn)?;
Ok(result)
}
}
#[cfg(test)]
mod test {
    use super::*;
    use tempfile::TempDir;

    /// Opening a fresh vault reports `new == true`; reopening it reports
    /// `new == false`; reinitializing makes it new again.
    #[test]
    fn test_open() {
        let tempdir = TempDir::new().unwrap();
        let result = UpEndDatabase::open(&tempdir, None, false);
        assert!(result.is_ok());
        assert!(result.unwrap().new);
        // Not new
        let result = UpEndDatabase::open(&tempdir, None, false);
        assert!(result.is_ok());
        assert!(!result.unwrap().new);
        // reinitialize true, new again
        let result = UpEndDatabase::open(&tempdir, None, true);
        assert!(result.is_ok());
        assert!(result.unwrap().new);
    }
}