// 336 lines, 8.9 KiB, Rust
#![macro_use]
|
|
|
|
#[macro_use]
|
|
mod macros;
|
|
|
|
pub mod constants;
|
|
pub mod entry;
|
|
pub mod hierarchies;
|
|
pub mod inner;
|
|
pub mod lang;
|
|
|
|
use crate::addressing::Address;
|
|
use crate::database::constants::{IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_INVARIANT};
|
|
use crate::database::entry::{Entry, EntryValue};
|
|
use crate::database::inner::models;
|
|
use crate::database::inner::schema::data;
|
|
use crate::database::lang::Query;
|
|
use crate::util::hash::{Hash, Hashable};
|
|
use crate::util::LoggerSink;
|
|
use anyhow::{anyhow, Result};
|
|
use chrono::NaiveDateTime;
|
|
use diesel::debug_query;
|
|
use diesel::prelude::*;
|
|
use diesel::r2d2::{self, ConnectionManager};
|
|
use diesel::result::{DatabaseErrorKind, Error};
|
|
use diesel::sqlite::{Sqlite, SqliteConnection};
|
|
use hierarchies::initialize_hier;
|
|
use log::{debug, trace};
|
|
use std::convert::TryFrom;
|
|
use std::fs;
|
|
use std::path::{Path, PathBuf};
|
|
use std::time::Duration;
|
|
|
|
pub fn insert_file<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
file: models::NewFile,
|
|
) -> Result<usize> {
|
|
use crate::database::inner::schema::files;
|
|
|
|
debug!(
|
|
"Inserting {} ({})...",
|
|
&file.path,
|
|
Address::Hash(Hash((&file.hash).clone()))
|
|
);
|
|
|
|
Ok(diesel::insert_into(files::table)
|
|
.values(file)
|
|
.execute(connection)?)
|
|
}
|
|
|
|
pub fn retrieve_file<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
obj_hash: Hash,
|
|
) -> Result<Vec<models::File>> {
|
|
use crate::database::inner::schema::files::dsl::*;
|
|
|
|
let matches = files
|
|
.filter(valid.eq(true))
|
|
.filter(hash.eq(obj_hash.0))
|
|
.load::<models::File>(connection)?;
|
|
|
|
Ok(matches)
|
|
}
|
|
|
|
pub fn retrieve_all_files<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
) -> Result<Vec<models::File>> {
|
|
use crate::database::inner::schema::files::dsl::*;
|
|
let matches = files.load::<models::File>(connection)?;
|
|
Ok(matches)
|
|
}
|
|
|
|
pub fn get_latest_files<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
count: i64,
|
|
) -> Result<Vec<models::File>> {
|
|
use crate::database::inner::schema::files::dsl::*;
|
|
|
|
let matches = files
|
|
.order_by(added.desc())
|
|
.limit(count)
|
|
.load::<models::File>(connection)?;
|
|
|
|
Ok(matches)
|
|
}
|
|
|
|
pub fn file_update_mtime<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
file_id: i32,
|
|
m_time: Option<NaiveDateTime>,
|
|
) -> Result<usize> {
|
|
use crate::database::inner::schema::files::dsl::*;
|
|
|
|
debug!("Setting file ID {}'s mtime = {:?}", file_id, m_time);
|
|
|
|
Ok(diesel::update(files.filter(id.eq(file_id)))
|
|
.set(mtime.eq(m_time))
|
|
.execute(connection)?)
|
|
}
|
|
|
|
pub fn file_set_valid<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
file_id: i32,
|
|
is_valid: bool,
|
|
) -> Result<usize> {
|
|
use crate::database::inner::schema::files::dsl::*;
|
|
|
|
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
|
|
|
Ok(diesel::update(files.filter(id.eq(file_id)))
|
|
.set(valid.eq(is_valid))
|
|
.execute(connection)?)
|
|
}
|
|
|
|
pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
object_address: Address,
|
|
) -> Result<Vec<Entry>> {
|
|
use crate::database::inner::schema::data::dsl::*;
|
|
|
|
let matches = data
|
|
.filter(entity.eq(object_address.encode()?))
|
|
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?))
|
|
.load::<models::Entry>(connection)?;
|
|
let entries = matches
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.filter_map(Result::ok)
|
|
.collect();
|
|
|
|
Ok(entries)
|
|
}
|
|
|
|
pub fn bulk_retrieve_objects<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
object_addresses: Vec<Address>,
|
|
) -> Result<Vec<Entry>> {
|
|
use crate::database::inner::schema::data::dsl::*;
|
|
|
|
let matches = data
|
|
.filter(
|
|
entity.eq_any(
|
|
object_addresses
|
|
.iter()
|
|
.filter_map(|addr| addr.encode().ok()),
|
|
),
|
|
)
|
|
// .or_filter(value.eq(EntryValue::Address(object_address).to_str()?))
|
|
.load::<models::Entry>(connection)?;
|
|
|
|
let entries = matches
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.filter_map(Result::ok)
|
|
.collect();
|
|
|
|
Ok(entries)
|
|
}
|
|
|
|
pub fn remove_object<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
object_address: Address,
|
|
) -> Result<usize> {
|
|
use crate::database::inner::schema::data::dsl::*;
|
|
|
|
debug!("Deleting {}!", object_address);
|
|
|
|
let matches = data
|
|
.filter(identity.eq(object_address.encode()?))
|
|
.or_filter(entity.eq(object_address.encode()?))
|
|
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?));
|
|
|
|
Ok(diesel::delete(matches).execute(connection)?)
|
|
}
|
|
|
|
pub fn query<C: Connection<Backend = Sqlite>>(connection: &C, query: Query) -> Result<Vec<Entry>> {
|
|
use crate::database::inner::schema::data::dsl::*;
|
|
|
|
trace!("Querying: {:?}", query);
|
|
|
|
let db_query = data.filter(query.to_sqlite_predicates()?);
|
|
|
|
trace!("DB query: {}", debug_query(&db_query));
|
|
|
|
let matches = db_query.load::<models::Entry>(connection)?;
|
|
|
|
let entries = matches
|
|
.iter()
|
|
.map(Entry::try_from)
|
|
.filter_map(Result::ok)
|
|
.collect();
|
|
|
|
Ok(entries)
|
|
}
|
|
|
|
pub fn insert_entry<C: Connection<Backend = Sqlite>>(
|
|
connection: &C,
|
|
entry: Entry,
|
|
) -> Result<Address> {
|
|
debug!("Inserting: {}", entry);
|
|
|
|
let insert_entry = models::Entry::try_from(&entry)?;
|
|
|
|
let entry = Entry::try_from(&insert_entry)?;
|
|
|
|
let result = diesel::insert_into(data::table)
|
|
.values(insert_entry)
|
|
.execute(connection);
|
|
|
|
if let Some(error) = result.err() {
|
|
match error {
|
|
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {}
|
|
_ => return Err(anyhow!(error)),
|
|
}
|
|
}
|
|
|
|
Ok(Address::Hash(entry.hash()?))
|
|
}
|
|
|
|
/// Per-connection SQLite settings, applied via PRAGMA statements when a
/// pooled connection is set up (see `ConnectionOptions::apply`).
#[derive(Debug)]
pub struct ConnectionOptions {
    // When true, `PRAGMA foreign_keys = ON` is issued on the connection.
    pub enable_foreign_keys: bool,
    // When set, `PRAGMA busy_timeout` is issued with this duration (in ms).
    pub busy_timeout: Option<Duration>,
}
|
|
|
|
impl ConnectionOptions {
|
|
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
|
|
if self.enable_foreign_keys {
|
|
trace!("Enabling foreign keys");
|
|
conn.execute("PRAGMA foreign_keys = ON;")?;
|
|
}
|
|
|
|
if let Some(duration) = self.busy_timeout {
|
|
trace!("Setting busy_timeout to {:?}", duration);
|
|
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
|
}
|
|
|
|
trace!("Setting \"synchronous\" to NORMAL");
|
|
conn.execute("PRAGMA synchronous = NORMAL;")?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
// Hook the options into the r2d2 pool: `apply` runs when the pool acquires
// a connection, so every pooled connection carries these PRAGMA settings.
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
    for ConnectionOptions
{
    fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
        self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
    }
}
|
|
|
|
/// Shorthand for the r2d2 connection pool backing the UpEnd database.
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;

/// Outcome of opening (or creating) an UpEnd database via `open_upend`.
pub struct OpenResult {
    // Connection pool for the opened database.
    pub pool: DbPool,
    // True if the database directory did not exist before and was created.
    pub new: bool,
}

// Name of the subdirectory holding UpEnd state within a vault directory.
pub const UPEND_SUBDIR: &str = ".upend";
// Filename of the SQLite database inside that subdirectory.
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
|
|
|
|
pub fn open_upend<P: AsRef<Path>>(
|
|
dirpath: P,
|
|
db_path: Option<PathBuf>,
|
|
reinitialize: bool,
|
|
) -> Result<OpenResult> {
|
|
embed_migrations!("./migrations/upend/");
|
|
|
|
let upend_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(UPEND_SUBDIR));
|
|
|
|
if reinitialize {
|
|
trace!("Reinitializing - removing previous database...");
|
|
let _ = fs::remove_dir_all(&upend_path);
|
|
}
|
|
let new = !upend_path.exists();
|
|
|
|
if new {
|
|
trace!("Creating UpEnd subdirectory...");
|
|
fs::create_dir(&upend_path)?;
|
|
}
|
|
|
|
trace!("Creating pool.");
|
|
let manager = ConnectionManager::<SqliteConnection>::new(
|
|
upend_path.join(DATABASE_FILENAME).to_str().unwrap(),
|
|
);
|
|
let pool = r2d2::Pool::builder()
|
|
.connection_customizer(Box::new(ConnectionOptions {
|
|
enable_foreign_keys: true,
|
|
busy_timeout: Some(Duration::from_secs(30)),
|
|
}))
|
|
.build(manager)?;
|
|
|
|
let enable_wal_mode = true;
|
|
pool.get().unwrap().execute(if enable_wal_mode {
|
|
trace!("Enabling WAL journal mode & truncating WAL log...");
|
|
"PRAGMA journal_mode = WAL;PRAGMA wal_checkpoint(TRUNCATE);"
|
|
} else {
|
|
trace!("Enabling TRUNCATE journal mode");
|
|
"PRAGMA journal_mode = TRUNCATE;"
|
|
})?;
|
|
|
|
trace!("Pool created, running migrations...");
|
|
|
|
embedded_migrations::run_with_output(
|
|
&pool.get()?,
|
|
&mut LoggerSink {
|
|
..Default::default()
|
|
},
|
|
)?;
|
|
|
|
trace!("Initializing types...");
|
|
|
|
initialize_types(&pool)?;
|
|
initialize_hier(&pool)?;
|
|
|
|
Ok(OpenResult { pool, new })
|
|
}
|
|
|
|
/// Seed the database with the base type invariant and the self-describing
/// attributes of the type system (runs on every open; `insert_entry`
/// tolerates duplicates).
fn initialize_types(pool: &DbPool) -> Result<()> {
    insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?;
    upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
    upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_HAS_ATTR);
    Ok(())
}
|
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use tempdir::TempDir;

    /// Smoke test: a fresh database opens (and reinitializes) cleanly.
    #[test]
    fn test_open() -> Result<(), anyhow::Error> {
        let temp_dir = TempDir::new("upend-test").unwrap();
        open_upend(temp_dir, None, true)?;
        Ok(())
    }
}
|