extern crate diesel;
extern crate diesel_migrations;
extern crate lazy_static;
mod macros;
pub mod common;
pub mod engine;
pub mod entry;
pub mod hierarchies;
pub mod jobs;
pub mod stores;
mod inner;
mod util;
use crate::common::build;
use crate::engine::execute;
use crate::inner::models;
use crate::inner::schema::data;
use crate::util::LoggerSink;
use anyhow::{anyhow, Result};
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};
use diesel::result::{DatabaseErrorKind, Error};
use diesel::sqlite::SqliteConnection;
use hierarchies::initialize_hier;
use shadow_rs::is_release;
use std::convert::TryFrom;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
use tracing::{debug, error, trace, warn};
use upend_base::addressing::{Address, Addressable};
use upend_base::entry::{Entry, EntryValue, ImmutableEntry};
use upend_base::error::UpEndError;
use upend_base::hash::UpMultihash;
use upend_base::lang::Query;
/// Per-connection SQLite settings; `apply` issues them as `PRAGMA` statements.
pub struct ConnectionOptions {
/// When `Some`, `apply` sets `PRAGMA busy_timeout` to this duration in milliseconds.
pub busy_timeout: Option<Duration>,
/// Selects the journal mode used by `apply`: WAL when `true`, TRUNCATE otherwise.
pub enable_wal_mode: bool,
/// Locked for the duration of `apply`, so calls sharing this mutex run one at a time.
pub mutex: Arc<Mutex<()>>,
impl ConnectionOptions {
/// Applies these options to `connection` by executing `PRAGMA` statements.
///
/// Holds `self.mutex` for the whole call. In order, it sets `busy_timeout`
/// (only when configured), selects the journal mode, and sets
/// `synchronous = NORMAL`.
///
/// # Errors
/// The `busy_timeout` and `synchronous` statements propagate failures via `?`.
///
/// # Panics
/// Panics if `self.mutex` is poisoned.
pub fn apply(&self, connection: &SqliteConnection) -> QueryResult<()> {
// Guard lives until the end of the function, covering every PRAGMA below.
let _lock = self.mutex.lock().unwrap();
if let Some(duration) = self.busy_timeout {
debug!("Setting busy_timeout to {:?}", duration);
// The PRAGMA value is given in milliseconds.
connection.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
// WAL branch: switches to WAL, sets `wal_autocheckpoint` to 1000 and runs a
// TRUNCATE checkpoint, all in one statement string. Otherwise TRUNCATE journaling.
connection.execute(if self.enable_wal_mode {
debug!("Enabling WAL journal mode & truncating WAL log...");
"PRAGMA journal_mode = WAL; PRAGMA wal_autocheckpoint = 1000; PRAGMA wal_checkpoint(TRUNCATE);"
} else {
debug!("Enabling TRUNCATE journal mode");
"PRAGMA journal_mode = TRUNCATE;"
debug!(r#"Setting "synchronous" to NORMAL"#);
connection.execute("PRAGMA synchronous = NORMAL;")?;
// r2d2 customization hook: lets `ConnectionOptions` be installed on a pool through
// `connection_customizer`, as `UpEndDatabase::open` does.
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
for ConnectionOptions
/// Hook invoked by the pool with a connection it has acquired.
/// NOTE(review): the body is not visible here — presumably it delegates to
/// `apply` and maps the error into `diesel::r2d2::Error`; confirm.
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
/// Shorthand for an r2d2 pool of Diesel-managed SQLite connections.
type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
/// r2d2 error handler that reports pool errors through `tracing`.
pub struct LoggingHandler {
/// Label attached to every logged error as the `name` field.
pub name: &'static str,
impl diesel::r2d2::HandleError<diesel::r2d2::Error> for LoggingHandler {
/// Logs `error` at ERROR level, tagged with `self.name`.
///
/// # Panics
/// Panics after logging whenever `shadow_rs::is_release()` returns `false`
/// (presumably non-release builds), so the error cannot go unnoticed there.
fn handle_error(&self, error: diesel::r2d2::Error) {
error!(name = self.name, "Database error: {}", error);
if !is_release() {
panic!("Database error! This should not happen! {}", error);
/// Outcome of `UpEndDatabase::open`.
pub struct OpenResult {
/// Handle to the opened database.
pub db: UpEndDatabase,
/// `true` when the UpEnd subdirectory did not exist at the moment `open` checked for it.
pub new: bool,
/// Handle to an UpEnd database; hands out `UpEndConnection`s through `connection`.
pub struct UpEndDatabase {
/// Path of the UpEnd subdirectory (`<dirpath>/.upend`), as computed by `open`.
pub path: PathBuf,
/// Connection pool, shared with every `UpEndConnection` created from this handle.
pool: Arc<DbPool>,
/// Reader/writer lock, shared with every `UpEndConnection` created from this handle.
lock: Arc<RwLock<()>>,
/// Name of the subdirectory that `UpEndDatabase::open` joins onto the directory it is given.
pub const UPEND_SUBDIR: &str = ".upend";
/// File name of the SQLite database.
/// NOTE(review): its use is not visible here — presumably joined onto the
/// `UPEND_SUBDIR` path when the connection manager is created; confirm.
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
impl UpEndDatabase {
/// Opens the UpEnd database located in the `UPEND_SUBDIR` subdirectory of `dirpath`.
///
/// When `reinitialize` is `true`, the subdirectory is removed first (best effort).
/// `OpenResult::new` reports whether the subdirectory was absent when checked,
/// which is also the case after a successful reinitialization.
///
/// # Errors
/// For an existing database, fails if the stored `VERSION` meta value cannot be
/// read or parsed, or if it is greater than this build's major version; in the
/// latter case the error names both versions.
///
/// # Panics
/// Panics if `build::PKG_VERSION_MAJOR` does not parse as an integer.
pub fn open<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
let upend_path = dirpath.as_ref().join(UPEND_SUBDIR);
if reinitialize {
warn!("Reinitializing - removing previous database...");
// Result deliberately ignored (e.g. the directory may not exist yet).
let _ = fs::remove_dir_all(&upend_path);
// Evaluated after the optional removal above.
let new = !upend_path.exists();
if new {
trace!("Creating UpEnd subdirectory...");
trace!("Creating pool.");
let manager = ConnectionManager::<SqliteConnection>::new(
// Connections from this pool get a 30 s busy timeout and WAL journaling.
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {
busy_timeout: Some(Duration::from_secs(30)),
enable_wal_mode: true,
mutex: Arc::new(Mutex::new(())),
.error_handler(Box::new(LoggingHandler { name: "main" }))
trace!("Pool created.");
let db = UpEndDatabase {
path: upend_path,
pool: Arc::new(pool),
lock: Arc::new(RwLock::new(())),
let connection = db.connection()?;
// Only a pre-existing database carries a version that needs checking.
if !new {
let db_major: u64 = connection.get_meta("VERSION")?.parse()?;
if db_major > build::PKG_VERSION_MAJOR.parse().unwrap() {
return Err(anyhow!(
"Incompatible database! Found version {}, but this build only supports up to version {}.",
db_major,
build::PKG_VERSION_MAJOR
));
trace!("Running migrations...");
&mut LoggerSink {
Ok(OpenResult { db, new })
/// Creates an `UpEndConnection` that shares this database's pool and lock.
///
/// Only the two `Arc`s are cloned; no pooled connection is checked out here.
pub fn connection(&self) -> Result<UpEndConnection> {
Ok(UpEndConnection {
pool: self.pool.clone(),
lock: self.lock.clone(),
/// Handle for reading from and writing to the database; created by `UpEndDatabase::connection`.
pub struct UpEndConnection {
/// Shared pool; each operation checks out a connection with `pool.get()`.
pool: Arc<DbPool>,
/// Shared lock: taken with `read()` by the lookup methods and with `write()` by
/// the insert/remove methods.
lock: Arc<RwLock<()>>,
impl UpEndConnection {
/// Intended to run `f` as a database transaction.
///
/// NOTE(review): per the comment at the end of this block, real transactions
/// are disabled for now; the remainder of the body is not visible here, so
/// confirm what is actually executed.
pub fn transaction<T, E, F>(&self, f: F) -> Result<T, E>
F: FnOnce() -> Result<T, E>,
E: From<Error>,
// NOTE(review): `span!` is not among the `tracing` imports at the top of this
// file, and `UpEndConnection` as declared here has no `transaction_lock` field —
// confirm whether the next three lines are live code.
let span = span!(tracing::Level::TRACE, "transaction");
let _span = span.enter();
let _lock = self.transaction_lock.lock().unwrap();
// Disable transactions for now.
/// Looks up the value stored under `key`, using the `meta` schema DSL.
///
/// Holds the shared lock for reading while a pooled connection is in use.
///
/// # Errors
/// Fails if no pooled connection can be obtained, or with
/// `No META "<key>" value found.` when the lookup yields nothing.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn get_meta<S: AsRef<str>>(&self, key: S) -> Result<String> {
use crate::inner::schema::meta::dsl;
let key = key.as_ref();
trace!("Querying META:{key}");
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
// Build the error lazily so a successful lookup does not pay for formatting it.
.ok_or_else(|| anyhow!(r#"No META "{key}" value found."#))
.map(|mv| mv.value.clone())
/// Retrieves the entry identified by `hash`, if there is one.
///
/// Returns `Ok(None)` when nothing matches and `Ok(Some(_))` for exactly one
/// match, converted with `Entry::try_from`. More than one match is treated as
/// an impossible state (see the message in the final arm).
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn retrieve_entry(&self, hash: &UpMultihash) -> Result<Option<Entry>> {
use crate::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let entry = data
match entry.len() {
0 => Ok(None),
// Length was just checked to be 1, so `get(0)` cannot be `None` here.
1 => Ok(Some(Entry::try_from(entry.get(0).unwrap())?)),
_ => {
"Multiple entries returned with the same hash - this should be impossible!"
/// Retrieves the entries related to `object_address`, in two passes.
///
/// A `primary` query result is converted into `entries`; a `secondary` query is
/// then built from encoded entry addresses (`e.address()` followed by
/// `addr.encode()`), and both result sets are returned concatenated, primary first.
///
/// NOTE(review): the filter conditions are not visible here — presumably
/// `primary` selects entries about the object and `secondary` selects entries
/// about those entries; confirm.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn retrieve_object(&self, object_address: &Address) -> Result<Vec<Entry>> {
use crate::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let primary = data
let entries = primary
let secondary = data
.map(|e| e.address())
.map(|addr| addr.encode())
// Collecting into `Result` stops at the first address that fails to encode.
.collect::<Result<Vec<Vec<u8>>, UpEndError>>()?,
let secondary_entries = secondary
Ok([entries, secondary_entries].concat())
/// Deletes data associated with `object_address` and returns a count.
///
/// Holds the shared lock for writing for the duration of the call.
/// NOTE(review): the matching criteria and what exactly the returned `usize`
/// counts are not visible here — presumably the number of deleted rows; confirm.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn remove_object(&self, object_address: Address) -> Result<usize> {
use crate::inner::schema::data::dsl::*;
trace!("Deleting {}!", object_address);
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
let matches = data
/// Runs `query` through `engine::execute` on a pooled connection while holding
/// the shared lock for reading.
///
/// The raw result is then post-processed (conversion not visible here) into the
/// returned `Vec<Entry>`.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn query(&self, query: Query) -> Result<Vec<Entry>> {
trace!("Querying: {:?}", query);
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let entries = execute(&conn, query)?;
let entries = entries
/// Inserts `entry` into the database and returns an `Address`.
///
/// The entry is first converted into its database model (`models::Entry`);
/// a failed conversion is returned as an error.
/// NOTE(review): presumably the returned address is the entry's own — confirm.
pub fn insert_entry(&self, entry: Entry) -> Result<Address> {
trace!("Inserting: {}", entry);
let db_entry = models::Entry::try_from(&entry)?;
/// Variant of `insert_entry` that builds the database model through the
/// `ImmutableEntry` wrapper.
pub fn insert_entry_immutable(&self, entry: Entry) -> Result<Address> {
trace!("Inserting immutably: {}", entry);
// Computed first because `entry` is moved into `ImmutableEntry` on the next line.
let address = entry.address()?;
let db_entry = models::Entry::try_from(&ImmutableEntry(entry))?;
/// Inserts an already-converted model entry into the `data` table.
///
/// Holds the shared lock for writing. A unique-constraint violation is not
/// treated as an error and yields `Ok(0)`; any other database error is
/// returned wrapped in `anyhow`.
///
/// # Panics
/// Panics if the shared lock is poisoned.
fn insert_model_entry(&self, entry: models::Entry) -> Result<usize> {
let _lock = self.lock.write().unwrap();
let conn = self.pool.get()?;
let result = diesel::insert_into(data::table)
match result {
Ok(num) => Ok(num),
Err(error) => match error {
// The row already exists: report zero inserted rows instead of failing.
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => Ok(0),
_ => Err(anyhow!(error)),
// #[deprecated]
/// Returns addresses decoded from the `data` table.
///
/// Buffers that `Address::decode` rejects are silently skipped.
/// NOTE(review): which column(s) are read is not visible here; confirm.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
use crate::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let result = data
.filter_map(|buf| Address::decode(&buf).ok())
/// Returns attribute names read from the `data` table.
///
/// NOTE(review): the query itself is not visible here — presumably the
/// distinct attributes of all entries; confirm.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn get_all_attributes(&self) -> Result<Vec<String>> {
use crate::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let result = data
/// Returns entry statistics as JSON under the `entryCount` key.
///
/// Three counts are gathered while holding the shared lock for reading:
/// `total`, `api`, and an implicit count that is only used to derive
/// `explicit` as `api - implicit`.
///
/// # Errors
/// Fails if no pooled connection can be obtained, if a count query fails, or
/// if a count query yields no value; each missing count has its own message.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn get_stats(&self) -> Result<serde_json::Value> {
use crate::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let total_entry_count = data.count().load::<i64>(&conn)?;
let total_entry_count = total_entry_count
.ok_or_else(|| anyhow!("Couldn't get entry count"))?;
let api_entry_count = data
let api_entry_count = api_entry_count
.ok_or_else(|| anyhow!("Couldn't get API entry count"))?;
let implicit_entry_count = data
let implicit_entry_count = implicit_entry_count
// Distinct message so a failure here is not mistaken for the API count above.
.ok_or_else(|| anyhow!("Couldn't get implicit entry count"))?;
"entryCount": {
"total": total_entry_count,
"api": api_entry_count,
"explicit": api_entry_count - implicit_entry_count
/// Returns the entries considered "explicit", loaded as `models::Entry` rows
/// from the `data` table.
///
/// NOTE(review): the filter defining "explicit" and the conversion to `Entry`
/// are not visible here; confirm against `get_stats`, which derives an
/// explicit count as API minus implicit.
///
/// # Panics
/// Panics if the shared lock is poisoned.
pub fn get_explicit_entries(&self) -> Result<Vec<Entry>> {
use crate::inner::schema::data::dsl::*;
let _lock = self.lock.read().unwrap();
let conn = self.pool.get()?;
let result: Vec<models::Entry> = data
// Tests for opening the database and for querying it through `UpEndConnection`.
mod test {
use upend_base::constants::{ATTR_LABEL, ATTR_IN};
use super::*;
use tempfile::TempDir;
// Opens the same temporary directory three times: on a fresh directory, on the
// now-existing database, and finally with `reinitialize = true`.
fn test_open() {
let tempdir = TempDir::new().unwrap();
// Fresh directory
let result = UpEndDatabase::open(&tempdir, false);
let result = result.unwrap();
// Not new
let result = UpEndDatabase::open(&tempdir, false);
let result = result.unwrap();
// reinitialize true, new again
let result = UpEndDatabase::open(&tempdir, true);
let result = result.unwrap();
// Exercises the query language end to end against a fresh database: entity,
// attribute and value matching, `in` / `contains`, boolean combinators, and a
// join through the `?a` variable.
fn test_query() {
let tempdir = TempDir::new().unwrap();
let result = UpEndDatabase::open(&tempdir, false).unwrap();
let db = result.db;
let connection = db.connection().unwrap();
// First entity with two values: a label and a custom attribute.
let random_entity = Address::Uuid(uuid::Uuid::new_v4());
upend_insert_val!(connection, random_entity, ATTR_LABEL, "FOOBAR").unwrap();
upend_insert_val!(connection, random_entity, "FLAVOUR", "STRANGE").unwrap();
// Match by a single entity: both of its entries.
let query = format!(r#"(matches @{random_entity} ? ?)"#)
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 2);
// Second entity, again with two values.
let other_entity = Address::Uuid(uuid::Uuid::new_v4());
upend_insert_val!(connection, other_entity, ATTR_LABEL, "BAZQUX").unwrap();
upend_insert_val!(connection, other_entity, "CHARGE", "POSITIVE").unwrap();
// Match by a set of entities: all four entries.
let query = format!(r#"(matches (in @{random_entity} @{other_entity}) ? ?)"#)
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 4);
// Match by a set of attributes.
let query = r#"(matches ? (in "FLAVOUR" "CHARGE") ?)"#.parse().unwrap();
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 2);
// Match by attribute plus a set of values.
let query = format!(r#"(matches ? "{ATTR_LABEL}" (in "FOOBAR" "BAZQUX"))"#)
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 2);
// Substring match on the value.
let query = format!(r#"(matches ? "{ATTR_LABEL}" (contains "OOBA"))"#)
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 1);
// `or`: value contains "OOBA", or attribute contains "HARGE".
let query = r#"(or (matches ? ? (contains "OOBA")) (matches ? (contains "HARGE") ?) )"#
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 2);
// `and`: value contains "OOBA" and the attribute is the label.
let query =
format!(r#"(and (matches ? ? (contains "OOBA")) (matches ? "{ATTR_LABEL}" ?) )"#)
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 1);
// Combination involving `not`: only one entry is expected to remain.
let query = format!(
(matches ? ? (contains "OOBA"))
(matches ? (contains "HARGE") ?)
(not (matches ? "{ATTR_LABEL}" ?))
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 1);
// Chain of ATTR_IN links: edge_entity -> random_entity -> other_entity.
let edge_entity = Address::Uuid(uuid::Uuid::new_v4());
upend_insert_addr!(connection, random_entity, ATTR_IN, other_entity).unwrap();
upend_insert_addr!(connection, edge_entity, ATTR_IN, random_entity).unwrap();
// Join through `?a`: expects the single entry linking edge_entity to random_entity.
let query = format!(
(matches ?a "{ATTR_IN}" @{other_entity})
(matches ? "{ATTR_IN}" ?a)
let result = connection.query(query).unwrap();
assert_eq!(result.len(), 1);
assert_eq!(result[0].entity, edge_entity);
assert_eq!(result[0].value, EntryValue::Address(random_entity));