refactor database.rs into separate files
This commit is contained in:
parent
9835b50199
commit
754c315cd2
8 changed files with 835 additions and 809 deletions
|
@ -1,4 +1,4 @@
|
|||
use crate::hash::{decode, encode, Hash};
|
||||
use crate::hash::{decode, encode, Hash, Hashable};
|
||||
use anyhow::{anyhow, Result};
|
||||
use serde::de::Visitor;
|
||||
use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
@ -124,6 +124,12 @@ impl std::fmt::Debug for Address {
|
|||
}
|
||||
}
|
||||
|
||||
pub trait Addressable: Hashable {
|
||||
fn address(&self) -> Result<Address> {
|
||||
Ok(Address::Hash(self.hash()?))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use uuid::Uuid;
|
||||
|
|
802
src/database.rs
802
src/database.rs
|
@ -1,802 +0,0 @@
|
|||
#![macro_use]
|
||||
use crate::addressing::Address;
|
||||
use crate::hash::{decode, hash, Hash, Hashable};
|
||||
use crate::models;
|
||||
use crate::schema::data;
|
||||
use crate::util::LoggerSink;
|
||||
use anyhow::{anyhow, Result};
|
||||
use diesel::debug_query;
|
||||
use diesel::expression::operators::{And, Or};
|
||||
use diesel::prelude::*;
|
||||
use diesel::r2d2::{self, ConnectionManager};
|
||||
use diesel::result::{DatabaseErrorKind, Error};
|
||||
use diesel::sql_types::Bool;
|
||||
use diesel::sqlite::{Sqlite, SqliteConnection};
|
||||
use lexpr::value::Value::Symbol;
|
||||
use lexpr::Value;
|
||||
use lexpr::Value::Cons;
|
||||
use log::{debug, trace};
|
||||
use nonempty::NonEmpty;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::borrow::Borrow;
|
||||
use std::convert::TryFrom;
|
||||
use std::fs;
|
||||
use std::io::{Cursor, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
pub const TYPE_TYPE: &str = "TYPE";
|
||||
pub const TYPE_IS_ATTR: &str = "TYPE";
|
||||
pub const TYPE_REQUIRES_ATTR: &str = "TYPE_REQUIRES";
|
||||
pub const TYPE_HAS_ATTR: &str = "TYPE_HAS";
|
||||
pub const TYPE_ID_ATTR: &str = "TYPE_ID";
|
||||
pub const TYPE_INSTANCES_ATTR: &str = "TYPE_INSTANCES";
|
||||
pub const IS_OF_TYPE_ATTR: &str = "IS";
|
||||
|
||||
lazy_static! {
|
||||
static ref TYPE_INVARIANT: InvariantEntry = InvariantEntry {
|
||||
attribute: String::from(TYPE_IS_ATTR),
|
||||
value: EntryValue::Value(serde_json::Value::from(TYPE_TYPE)),
|
||||
};
|
||||
pub static ref TYPE_ADDR: Address = TYPE_INVARIANT.entity().unwrap();
|
||||
}
|
||||
|
||||
#[macro_use]
|
||||
mod macros {
|
||||
macro_rules! upend_insert_val {
|
||||
($db_connection:expr, $entity:expr, $attribute:expr, $value:expr) => {{
|
||||
insert_entry(
|
||||
$db_connection,
|
||||
Entry {
|
||||
entity: $entity.clone(),
|
||||
attribute: String::from($attribute),
|
||||
value: EntryValue::Value(serde_json::Value::from($value)),
|
||||
},
|
||||
)?;
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! upend_insert_addr {
|
||||
($db_connection:expr, $entity:expr, $attribute:expr, $addr:expr) => {{
|
||||
insert_entry(
|
||||
$db_connection,
|
||||
Entry {
|
||||
entity: $entity.clone(),
|
||||
attribute: String::from($attribute),
|
||||
value: EntryValue::Address($addr.clone()),
|
||||
},
|
||||
)?;
|
||||
}};
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InEntry {
|
||||
pub entity: Option<Address>,
|
||||
pub attribute: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Entry {
|
||||
pub entity: Address,
|
||||
pub attribute: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InvariantEntry {
|
||||
pub attribute: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(tag = "t", content = "c")]
|
||||
pub enum EntryValue {
|
||||
Value(serde_json::Value),
|
||||
Address(Address),
|
||||
Invalid,
|
||||
}
|
||||
|
||||
impl TryFrom<&models::Entry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(e: &models::Entry) -> Result<Self, Self::Error> {
|
||||
Ok(Entry {
|
||||
entity: Address::decode(&e.entity)?,
|
||||
attribute: e.attribute.clone(),
|
||||
value: e.value.parse().unwrap(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&Entry> for models::Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(e: &Entry) -> Result<Self, Self::Error> {
|
||||
Ok(models::Entry {
|
||||
identity: e.address()?.encode()?,
|
||||
entity: e.entity.encode()?,
|
||||
attribute: e.attribute.clone(),
|
||||
value: e.value.to_string()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&InvariantEntry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(invariant: &InvariantEntry) -> Result<Self, Self::Error> {
|
||||
Ok(Entry {
|
||||
entity: invariant.entity()?,
|
||||
attribute: invariant.attribute.clone(),
|
||||
value: invariant.value.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InEntry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(in_entry: InEntry) -> Result<Self, Self::Error> {
|
||||
match in_entry.entity {
|
||||
Some(entity) => Ok(Entry {
|
||||
entity,
|
||||
attribute: in_entry.attribute,
|
||||
value: in_entry.value,
|
||||
}),
|
||||
None => Ok(Entry::try_from(&InvariantEntry {
|
||||
attribute: in_entry.attribute,
|
||||
value: in_entry.value,
|
||||
})?),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InvariantEntry {
|
||||
pub fn entity(&self) -> Result<Address> {
|
||||
let mut entity = Cursor::new(vec![0u8; 0]);
|
||||
entity.write_all(self.attribute.as_bytes())?;
|
||||
entity.write_all(self.value.to_string()?.as_bytes())?;
|
||||
Ok(Address::Hash(hash(entity.into_inner())))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Entry {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{} | {} | {}", self.entity, self.attribute, self.value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Hashable for Entry {
|
||||
fn hash(self: &Entry) -> Result<Hash> {
|
||||
let mut result = Cursor::new(vec![0u8; 0]);
|
||||
result.write_all(self.entity.encode()?.as_slice())?;
|
||||
result.write_all(self.attribute.as_bytes())?;
|
||||
result.write_all(self.value.to_string()?.as_bytes())?;
|
||||
Ok(hash(result.get_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Hashable for InvariantEntry {
|
||||
fn hash(&self) -> Result<Hash> {
|
||||
Entry::try_from(self)?.hash()
|
||||
}
|
||||
}
|
||||
|
||||
pub trait Addressable: Hashable {
|
||||
fn address(&self) -> Result<Address> {
|
||||
Ok(Address::Hash(self.hash()?))
|
||||
}
|
||||
}
|
||||
|
||||
impl Addressable for Entry {}
|
||||
impl Addressable for InvariantEntry {}
|
||||
|
||||
impl EntryValue {
|
||||
pub fn to_string(&self) -> Result<String> {
|
||||
let (type_char, content) = match self {
|
||||
EntryValue::Value(value) => ('J', serde_json::to_string(value)?),
|
||||
EntryValue::Address(address) => ('O', address.to_string()),
|
||||
EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")),
|
||||
};
|
||||
|
||||
Ok(format!("{}{}", type_char, content))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EntryValue {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let (entry_type, entry_value) = match self {
|
||||
EntryValue::Address(address) => ("ADDRESS", address.to_string()),
|
||||
EntryValue::Value(value) => (
|
||||
"VALUE",
|
||||
serde_json::to_string(value).unwrap_or_else(|_| String::from("?!?!?!")),
|
||||
),
|
||||
EntryValue::Invalid => ("INVALID", "INVALID".to_string()),
|
||||
};
|
||||
write!(f, "{}: {}", entry_type, entry_value)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for EntryValue {
|
||||
type Err = std::convert::Infallible;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if s.len() < 2 {
|
||||
Ok(EntryValue::Invalid)
|
||||
} else {
|
||||
let (type_char, content) = s.split_at(1);
|
||||
match (type_char, content) {
|
||||
("J", content) => {
|
||||
if let Ok(value) = serde_json::from_str(content) {
|
||||
Ok(EntryValue::Value(value))
|
||||
} else {
|
||||
Ok(EntryValue::Invalid)
|
||||
}
|
||||
}
|
||||
("O", content) => {
|
||||
if let Ok(addr) = decode(content).and_then(|v| Address::decode(&v)) {
|
||||
Ok(EntryValue::Address(addr))
|
||||
} else {
|
||||
Ok(EntryValue::Invalid)
|
||||
}
|
||||
}
|
||||
_ => Ok(EntryValue::Invalid),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_file<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
file: models::NewFile,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::files;
|
||||
|
||||
debug!(
|
||||
"Inserting {} ({})...",
|
||||
&file.path,
|
||||
Address::Hash(Hash((&file.hash).clone()))
|
||||
);
|
||||
|
||||
Ok(diesel::insert_into(files::table)
|
||||
.values(file)
|
||||
.execute(connection)?)
|
||||
}
|
||||
|
||||
pub fn retrieve_file<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
obj_hash: Hash,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let matches = files
|
||||
.filter(valid.eq(true))
|
||||
.filter(hash.eq(obj_hash.0))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub fn retrieve_all_files<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
let matches = files.load::<models::File>(connection)?;
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub fn get_latest_files<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
count: i64,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let matches = files
|
||||
.order_by(added.desc())
|
||||
.limit(count)
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub fn file_set_valid<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
file_id: i32,
|
||||
is_valid: bool,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
||||
|
||||
Ok(diesel::update(files.filter(id.eq(file_id)))
|
||||
.set(valid.eq(is_valid))
|
||||
.execute(connection)?)
|
||||
}
|
||||
|
||||
pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_address: Address,
|
||||
) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let matches = data
|
||||
.filter(entity.eq(object_address.encode()?))
|
||||
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?))
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub fn bulk_retrieve_objects<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_addresses: Vec<Address>,
|
||||
) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let matches = data
|
||||
.filter(
|
||||
entity.eq_any(
|
||||
object_addresses
|
||||
.iter()
|
||||
.filter_map(|addr| addr.encode().ok()),
|
||||
),
|
||||
)
|
||||
// .or_filter(value.eq(EntryValue::Address(object_address).to_str()?))
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub fn remove_object<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_address: Address,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
debug!("Deleting {}!", object_address);
|
||||
|
||||
let matches = data
|
||||
.filter(identity.eq(object_address.encode()?))
|
||||
.or_filter(entity.eq(object_address.encode()?))
|
||||
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?));
|
||||
|
||||
Ok(diesel::delete(matches).execute(connection)?)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryComponent<T>
|
||||
where
|
||||
T: FromStr,
|
||||
{
|
||||
Exact(T),
|
||||
In(Vec<T>),
|
||||
Contains(String),
|
||||
Any,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EntryQuery {
|
||||
pub entity: QueryComponent<Address>,
|
||||
pub attribute: QueryComponent<String>,
|
||||
pub value: QueryComponent<EntryValue>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryPart {
|
||||
Matches(EntryQuery),
|
||||
Type(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryQualifier {
|
||||
And,
|
||||
Or,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MultiQuery {
|
||||
pub qualifier: QueryQualifier,
|
||||
pub queries: NonEmpty<Box<Query>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Query {
|
||||
SingleQuery(QueryPart),
|
||||
MultiQuery(MultiQuery),
|
||||
}
|
||||
|
||||
type Predicate = dyn BoxableExpression<data::table, Sqlite, SqlType = Bool>;
|
||||
|
||||
impl TryFrom<&lexpr::Value> for Query {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(expression: &Value) -> Result<Self> {
|
||||
fn parse_component<T: FromStr>(value: &lexpr::Value) -> Result<QueryComponent<T>>
|
||||
where
|
||||
<T as FromStr>::Err: std::fmt::Debug,
|
||||
{
|
||||
match value {
|
||||
Cons(cons) => {
|
||||
if let Symbol(symbol) = cons.car() {
|
||||
match symbol.borrow() {
|
||||
"in" => {
|
||||
let (cons_vec, _) = cons.clone().into_vec();
|
||||
if let Some(split) = cons_vec.split_first() {
|
||||
let args = split.1;
|
||||
let values: Result<Vec<T>, _> = args.iter().map(|value| {
|
||||
if let lexpr::Value::String(str) = value {
|
||||
match T::from_str(str.borrow()) {
|
||||
Ok(value) => Ok(value),
|
||||
Err(error) => Err(anyhow!(format!("Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",str, error))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("Malformed expression: Inner value list must be comprised of strings."))
|
||||
}
|
||||
}).collect();
|
||||
|
||||
Ok(QueryComponent::In(values?))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Inner value cannot be empty."
|
||||
))
|
||||
}
|
||||
}
|
||||
"contains" => {
|
||||
let (mut cons_vec, _) = cons.clone().into_vec();
|
||||
match cons_vec.len() {
|
||||
2 => {
|
||||
let value = cons_vec.remove(1);
|
||||
if let lexpr::Value::String(str) = value {
|
||||
Ok(QueryComponent::Contains(str.into_string()))
|
||||
} else {
|
||||
Err(anyhow!("Malformed expression: 'Contains' argument must be a string."))
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow!(
|
||||
"Malformed expression: 'Contains' requires a single argument."
|
||||
)),
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow!(format!(
|
||||
"Malformed expression: Unknown symbol {}",
|
||||
symbol
|
||||
))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!(format!(
|
||||
"Malformed expression: Inner value '{:?}' is not a symbol.",
|
||||
value
|
||||
)))
|
||||
}
|
||||
}
|
||||
lexpr::Value::String(str) => match T::from_str(str.borrow()) {
|
||||
Ok(value) => Ok(QueryComponent::Exact(value)),
|
||||
Err(error) => Err(anyhow!(format!(
|
||||
"Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",
|
||||
str, error
|
||||
))),
|
||||
},
|
||||
lexpr::Value::Symbol(symbol) => match symbol.borrow() {
|
||||
"?" => Ok(QueryComponent::Any),
|
||||
_ => Err(anyhow!(format!(
|
||||
"Malformed expression: Unknown symbol {}",
|
||||
symbol
|
||||
))),
|
||||
},
|
||||
_ => Err(anyhow!(
|
||||
"Malformed expression: Inner value not a string, list or '?'."
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
if let Cons(value) = expression {
|
||||
if let Symbol(symbol) = value.car() {
|
||||
match symbol.borrow() {
|
||||
"matches" => {
|
||||
let (cons_vec, _) = value.clone().into_vec();
|
||||
if let [_, entity, attribute, value] = &cons_vec[..] {
|
||||
let entity = parse_component::<Address>(entity)?;
|
||||
let attribute = parse_component::<String>(attribute)?;
|
||||
let value = parse_component::<EntryValue>(value)?;
|
||||
Ok(Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
||||
entity,
|
||||
attribute,
|
||||
value,
|
||||
})))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Wrong number of arguments to 'matches'."
|
||||
))
|
||||
}
|
||||
}
|
||||
"type" => {
|
||||
let (cons_vec, _) = value.clone().into_vec();
|
||||
if let [_, type_name] = &cons_vec[..] {
|
||||
if let lexpr::Value::String(type_name_str) = type_name {
|
||||
Ok(Query::SingleQuery(QueryPart::Type(
|
||||
type_name_str.to_string(),
|
||||
)))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Type must be specified as a string."
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Wrong number of arguments to 'type'."
|
||||
))
|
||||
}
|
||||
}
|
||||
"and" | "or" => {
|
||||
let (cons_vec, _) = value.clone().into_vec();
|
||||
let sub_expressions = &cons_vec[1..];
|
||||
let values: Result<Vec<Box<Query>>> = sub_expressions
|
||||
.iter()
|
||||
.map(|value| Ok(Box::new(Query::try_from(value)?)))
|
||||
.collect();
|
||||
|
||||
if let Some(queries) = NonEmpty::from_vec(values?) {
|
||||
Ok(Query::MultiQuery(MultiQuery {
|
||||
qualifier: match symbol.borrow() {
|
||||
"and" => QueryQualifier::And,
|
||||
_ => QueryQualifier::Or,
|
||||
},
|
||||
queries,
|
||||
}))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: sub-query list cannot be empty.",
|
||||
))
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow!(format!(
|
||||
"Malformed expression: Unknown symbol '{}'.",
|
||||
symbol
|
||||
))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!(format!(
|
||||
"Malformed expression: Value '{:?}' is not a symbol.",
|
||||
value
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("Malformed expression: Not a list."))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Query {
|
||||
fn to_sqlite_predicates(&self) -> Result<Box<Predicate>> {
|
||||
match self {
|
||||
Query::SingleQuery(qp) => {
|
||||
match qp {
|
||||
QueryPart::Matches(eq) => {
|
||||
let mut subqueries: Vec<Box<Predicate>> = vec![];
|
||||
|
||||
match &eq.entity {
|
||||
QueryComponent::Exact(q_entity) => {
|
||||
subqueries.push(Box::new(data::entity.eq(q_entity.encode()?)))
|
||||
}
|
||||
QueryComponent::In(q_entities) => {
|
||||
let entities: Result<Vec<_>, _> =
|
||||
q_entities.iter().map(|t| t.encode()).collect();
|
||||
subqueries.push(Box::new(data::entity.eq_any(entities?)))
|
||||
}
|
||||
QueryComponent::Contains(_) => {
|
||||
return Err(anyhow!("Addresses cannot be queried alike."))
|
||||
}
|
||||
QueryComponent::Any => {}
|
||||
};
|
||||
|
||||
match &eq.attribute {
|
||||
QueryComponent::Exact(q_attribute) => {
|
||||
subqueries.push(Box::new(data::attribute.eq(q_attribute.clone())))
|
||||
}
|
||||
QueryComponent::In(q_attributes) => subqueries
|
||||
.push(Box::new(data::attribute.eq_any(q_attributes.clone()))),
|
||||
QueryComponent::Contains(q_attribute) => subqueries
|
||||
.push(Box::new(data::attribute.like(format!("%{}%", q_attribute)))),
|
||||
QueryComponent::Any => {}
|
||||
};
|
||||
|
||||
match &eq.value {
|
||||
QueryComponent::Exact(q_value) => {
|
||||
subqueries.push(Box::new(data::value.eq(q_value.to_string()?)))
|
||||
}
|
||||
QueryComponent::In(q_values) => {
|
||||
let values: Result<Vec<_>, _> =
|
||||
q_values.iter().map(|v| v.to_string()).collect();
|
||||
subqueries.push(Box::new(data::value.eq_any(values?)))
|
||||
}
|
||||
QueryComponent::Contains(q_value) => subqueries
|
||||
.push(Box::new(data::value.like(format!("%{}%", q_value)))),
|
||||
QueryComponent::Any => {}
|
||||
};
|
||||
|
||||
match subqueries.len() {
|
||||
0 => Ok(Box::new(true.into_sql::<Bool>())),
|
||||
1 => Ok(subqueries.remove(0)),
|
||||
_ => {
|
||||
let mut result: Box<And<Box<Predicate>, Box<Predicate>>> =
|
||||
Box::new(And::new(subqueries.remove(0), subqueries.remove(0)));
|
||||
while !subqueries.is_empty() {
|
||||
result = Box::new(And::new(result, subqueries.remove(0)));
|
||||
}
|
||||
Ok(Box::new(result))
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryPart::Type(_) => unimplemented!("Type queries are not yet implemented."),
|
||||
}
|
||||
}
|
||||
Query::MultiQuery(mq) => {
|
||||
let subqueries: Result<Vec<Box<Predicate>>> = mq
|
||||
.queries
|
||||
.iter()
|
||||
.map(|sq| sq.to_sqlite_predicates())
|
||||
.collect();
|
||||
let mut subqueries: Vec<Box<Predicate>> = subqueries?;
|
||||
match subqueries.len() {
|
||||
0 => Ok(Box::new(true.into_sql::<Bool>())),
|
||||
1 => Ok(subqueries.remove(0)),
|
||||
_ => match mq.qualifier {
|
||||
QueryQualifier::And => {
|
||||
let mut result: Box<And<Box<Predicate>, Box<Predicate>>> =
|
||||
Box::new(And::new(subqueries.remove(0), subqueries.remove(0)));
|
||||
while !subqueries.is_empty() {
|
||||
result = Box::new(And::new(result, subqueries.remove(0)));
|
||||
}
|
||||
Ok(Box::new(result))
|
||||
}
|
||||
QueryQualifier::Or => {
|
||||
let mut result: Box<Or<Box<Predicate>, Box<Predicate>>> =
|
||||
Box::new(Or::new(subqueries.remove(0), subqueries.remove(0)));
|
||||
while !subqueries.is_empty() {
|
||||
result = Box::new(Or::new(result, subqueries.remove(0)));
|
||||
}
|
||||
Ok(Box::new(result))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn query<C: Connection<Backend = Sqlite>>(connection: &C, query: Query) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
trace!("Querying: {:?}", query);
|
||||
|
||||
let db_query = data.filter(query.to_sqlite_predicates()?);
|
||||
|
||||
trace!("DB query: {}", debug_query(&db_query));
|
||||
|
||||
let matches = db_query.load::<models::Entry>(connection)?;
|
||||
|
||||
let entries = matches
|
||||
.iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub fn insert_entry<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
entry: Entry,
|
||||
) -> Result<Address> {
|
||||
debug!("Inserting: {}", entry);
|
||||
|
||||
let insert_entry = models::Entry::try_from(&entry)?;
|
||||
|
||||
let entry = Entry::try_from(&insert_entry)?;
|
||||
|
||||
let result = diesel::insert_into(data::table)
|
||||
.values(insert_entry)
|
||||
.execute(connection);
|
||||
|
||||
if let Some(error) = result.err() {
|
||||
match error {
|
||||
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {}
|
||||
_ => return Err(anyhow!(error)),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Address::Hash(entry.hash()?))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
pub enable_foreign_keys: bool,
|
||||
pub busy_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl ConnectionOptions {
|
||||
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
|
||||
if self.enable_foreign_keys {
|
||||
conn.execute("PRAGMA foreign_keys = ON;")?;
|
||||
}
|
||||
if let Some(duration) = self.busy_timeout {
|
||||
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
||||
for ConnectionOptions
|
||||
{
|
||||
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
|
||||
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
|
||||
}
|
||||
}
|
||||
|
||||
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
||||
|
||||
pub struct OpenResult {
|
||||
pub pool: DbPool,
|
||||
pub new: bool,
|
||||
}
|
||||
|
||||
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
|
||||
|
||||
pub fn open_upend<P: AsRef<Path>>(
|
||||
dirpath: P,
|
||||
db_path: Option<PathBuf>,
|
||||
reinitialize: bool,
|
||||
) -> Result<OpenResult> {
|
||||
embed_migrations!("./migrations/upend/");
|
||||
|
||||
let database_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(DATABASE_FILENAME));
|
||||
|
||||
if reinitialize {
|
||||
let _ = fs::remove_file(&database_path);
|
||||
}
|
||||
let new = !database_path.exists();
|
||||
|
||||
let manager = ConnectionManager::<SqliteConnection>::new(database_path.to_str().unwrap());
|
||||
let pool = r2d2::Pool::builder()
|
||||
.connection_customizer(Box::new(ConnectionOptions {
|
||||
enable_foreign_keys: true,
|
||||
busy_timeout: Some(Duration::from_secs(30)),
|
||||
}))
|
||||
.build(manager)?;
|
||||
|
||||
trace!("Pool created, running migrations...");
|
||||
|
||||
embedded_migrations::run_with_output(
|
||||
&pool.get()?,
|
||||
&mut LoggerSink {
|
||||
..Default::default()
|
||||
},
|
||||
)?;
|
||||
|
||||
trace!("Initializing types...");
|
||||
|
||||
initialize_types(&pool)?;
|
||||
|
||||
Ok(OpenResult { pool, new })
|
||||
}
|
||||
|
||||
fn initialize_types(pool: &DbPool) -> Result<()> {
|
||||
insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?;
|
||||
upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
||||
upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_ID_ATTR, TYPE_IS_ATTR);
|
||||
Ok(())
|
||||
}
|
18
src/database/constants.rs
Normal file
18
src/database/constants.rs
Normal file
|
@ -0,0 +1,18 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::entry::{EntryValue, InvariantEntry};
|
||||
|
||||
pub const TYPE_TYPE: &str = "TYPE";
|
||||
pub const TYPE_IS_ATTR: &str = "TYPE";
|
||||
pub const TYPE_REQUIRES_ATTR: &str = "TYPE_REQUIRES";
|
||||
pub const TYPE_HAS_ATTR: &str = "TYPE_HAS";
|
||||
pub const TYPE_ID_ATTR: &str = "TYPE_ID";
|
||||
pub const TYPE_INSTANCES_ATTR: &str = "TYPE_INSTANCES";
|
||||
pub const IS_OF_TYPE_ATTR: &str = "IS";
|
||||
|
||||
lazy_static! {
|
||||
pub static ref TYPE_INVARIANT: InvariantEntry = InvariantEntry {
|
||||
attribute: String::from(TYPE_IS_ATTR),
|
||||
value: EntryValue::Value(serde_json::Value::from(TYPE_TYPE)),
|
||||
};
|
||||
pub static ref TYPE_ADDR: Address = TYPE_INVARIANT.entity().unwrap();
|
||||
}
|
179
src/database/entry.rs
Normal file
179
src/database/entry.rs
Normal file
|
@ -0,0 +1,179 @@
|
|||
use crate::addressing::{Address, Addressable};
|
||||
use crate::hash::{decode, hash, Hash, Hashable};
|
||||
use crate::models;
|
||||
use anyhow::{anyhow, Result};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::convert::TryFrom;
|
||||
use std::io::{Cursor, Write};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InEntry {
|
||||
pub entity: Option<Address>,
|
||||
pub attribute: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Entry {
|
||||
pub entity: Address,
|
||||
pub attribute: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct InvariantEntry {
|
||||
pub attribute: String,
|
||||
pub value: EntryValue,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(tag = "t", content = "c")]
|
||||
pub enum EntryValue {
|
||||
Value(serde_json::Value),
|
||||
Address(Address),
|
||||
Invalid,
|
||||
}
|
||||
|
||||
impl TryFrom<&models::Entry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(e: &models::Entry) -> Result<Self, Self::Error> {
|
||||
Ok(Entry {
|
||||
entity: Address::decode(&e.entity)?,
|
||||
attribute: e.attribute.clone(),
|
||||
value: e.value.parse().unwrap(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&Entry> for models::Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(e: &Entry) -> Result<Self, Self::Error> {
|
||||
Ok(models::Entry {
|
||||
identity: e.address()?.encode()?,
|
||||
entity: e.entity.encode()?,
|
||||
attribute: e.attribute.clone(),
|
||||
value: e.value.to_string()?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&InvariantEntry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(invariant: &InvariantEntry) -> Result<Self, Self::Error> {
|
||||
Ok(Entry {
|
||||
entity: invariant.entity()?,
|
||||
attribute: invariant.attribute.clone(),
|
||||
value: invariant.value.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<InEntry> for Entry {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(in_entry: InEntry) -> Result<Self, Self::Error> {
|
||||
match in_entry.entity {
|
||||
Some(entity) => Ok(Entry {
|
||||
entity,
|
||||
attribute: in_entry.attribute,
|
||||
value: in_entry.value,
|
||||
}),
|
||||
None => Ok(Entry::try_from(&InvariantEntry {
|
||||
attribute: in_entry.attribute,
|
||||
value: in_entry.value,
|
||||
})?),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl InvariantEntry {
|
||||
pub fn entity(&self) -> Result<Address> {
|
||||
let mut entity = Cursor::new(vec![0u8; 0]);
|
||||
entity.write_all(self.attribute.as_bytes())?;
|
||||
entity.write_all(self.value.to_string()?.as_bytes())?;
|
||||
Ok(Address::Hash(hash(entity.into_inner())))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for Entry {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{} | {} | {}", self.entity, self.attribute, self.value)
|
||||
}
|
||||
}
|
||||
|
||||
impl Hashable for Entry {
|
||||
fn hash(self: &Entry) -> Result<Hash> {
|
||||
let mut result = Cursor::new(vec![0u8; 0]);
|
||||
result.write_all(self.entity.encode()?.as_slice())?;
|
||||
result.write_all(self.attribute.as_bytes())?;
|
||||
result.write_all(self.value.to_string()?.as_bytes())?;
|
||||
Ok(hash(result.get_ref()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Hashable for InvariantEntry {
|
||||
fn hash(&self) -> Result<Hash> {
|
||||
Entry::try_from(self)?.hash()
|
||||
}
|
||||
}
|
||||
|
||||
impl Addressable for Entry {}
|
||||
impl Addressable for InvariantEntry {}
|
||||
|
||||
impl EntryValue {
|
||||
pub fn to_string(&self) -> Result<String> {
|
||||
let (type_char, content) = match self {
|
||||
EntryValue::Value(value) => ('J', serde_json::to_string(value)?),
|
||||
EntryValue::Address(address) => ('O', address.to_string()),
|
||||
EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")),
|
||||
};
|
||||
|
||||
Ok(format!("{}{}", type_char, content))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for EntryValue {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let (entry_type, entry_value) = match self {
|
||||
EntryValue::Address(address) => ("ADDRESS", address.to_string()),
|
||||
EntryValue::Value(value) => (
|
||||
"VALUE",
|
||||
serde_json::to_string(value).unwrap_or_else(|_| String::from("?!?!?!")),
|
||||
),
|
||||
EntryValue::Invalid => ("INVALID", "INVALID".to_string()),
|
||||
};
|
||||
write!(f, "{}: {}", entry_type, entry_value)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::str::FromStr for EntryValue {
|
||||
type Err = std::convert::Infallible;
|
||||
|
||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||
if s.len() < 2 {
|
||||
Ok(EntryValue::Invalid)
|
||||
} else {
|
||||
let (type_char, content) = s.split_at(1);
|
||||
match (type_char, content) {
|
||||
("J", content) => {
|
||||
if let Ok(value) = serde_json::from_str(content) {
|
||||
Ok(EntryValue::Value(value))
|
||||
} else {
|
||||
Ok(EntryValue::Invalid)
|
||||
}
|
||||
}
|
||||
("O", content) => {
|
||||
if let Ok(addr) = decode(content).and_then(|v| Address::decode(&v)) {
|
||||
Ok(EntryValue::Address(addr))
|
||||
} else {
|
||||
Ok(EntryValue::Invalid)
|
||||
}
|
||||
}
|
||||
_ => Ok(EntryValue::Invalid),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
313
src/database/lang.rs
Normal file
313
src/database/lang.rs
Normal file
|
@ -0,0 +1,313 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::entry::EntryValue;
|
||||
use crate::schema::data;
|
||||
use anyhow::{anyhow, Result};
|
||||
use diesel::expression::operators::{And, Or};
|
||||
use diesel::sql_types::Bool;
|
||||
use diesel::sqlite::Sqlite;
|
||||
use diesel::{BoxableExpression, ExpressionMethods, IntoSql, TextExpressionMethods};
|
||||
use nonempty::NonEmpty;
|
||||
use std::borrow::Borrow;
|
||||
use std::convert::TryFrom;
|
||||
use std::str::FromStr;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryComponent<T>
|
||||
where
|
||||
T: FromStr,
|
||||
{
|
||||
Exact(T),
|
||||
In(Vec<T>),
|
||||
Contains(String),
|
||||
Any,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct EntryQuery {
|
||||
pub entity: QueryComponent<Address>,
|
||||
pub attribute: QueryComponent<String>,
|
||||
pub value: QueryComponent<EntryValue>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryPart {
|
||||
Matches(EntryQuery),
|
||||
Type(String),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum QueryQualifier {
|
||||
And,
|
||||
Or,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MultiQuery {
|
||||
pub qualifier: QueryQualifier,
|
||||
pub queries: NonEmpty<Box<Query>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Query {
|
||||
SingleQuery(QueryPart),
|
||||
MultiQuery(MultiQuery),
|
||||
}
|
||||
|
||||
type Predicate = dyn BoxableExpression<data::table, Sqlite, SqlType = Bool>;
|
||||
|
||||
impl TryFrom<&lexpr::Value> for Query {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(expression: &lexpr::Value) -> Result<Self> {
|
||||
fn parse_component<T: FromStr>(value: &lexpr::Value) -> Result<QueryComponent<T>>
|
||||
where
|
||||
<T as FromStr>::Err: std::fmt::Debug,
|
||||
{
|
||||
match value {
|
||||
lexpr::Value::Cons(cons) => {
|
||||
if let lexpr::Value::Symbol(symbol) = cons.car() {
|
||||
match symbol.borrow() {
|
||||
"in" => {
|
||||
let (cons_vec, _) = cons.clone().into_vec();
|
||||
if let Some(split) = cons_vec.split_first() {
|
||||
let args = split.1;
|
||||
let values: Result<Vec<T>, _> = args.iter().map(|value| {
|
||||
if let lexpr::Value::String(str) = value {
|
||||
match T::from_str(str.borrow()) {
|
||||
Ok(value) => Ok(value),
|
||||
Err(error) => Err(anyhow!(format!("Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",str, error))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("Malformed expression: Inner value list must be comprised of strings."))
|
||||
}
|
||||
}).collect();
|
||||
|
||||
Ok(QueryComponent::In(values?))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Inner value cannot be empty."
|
||||
))
|
||||
}
|
||||
}
|
||||
"contains" => {
|
||||
let (mut cons_vec, _) = cons.clone().into_vec();
|
||||
match cons_vec.len() {
|
||||
2 => {
|
||||
let value = cons_vec.remove(1);
|
||||
if let lexpr::Value::String(str) = value {
|
||||
Ok(QueryComponent::Contains(str.into_string()))
|
||||
} else {
|
||||
Err(anyhow!("Malformed expression: 'Contains' argument must be a string."))
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow!(
|
||||
"Malformed expression: 'Contains' requires a single argument."
|
||||
)),
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow!(format!(
|
||||
"Malformed expression: Unknown symbol {}",
|
||||
symbol
|
||||
))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!(format!(
|
||||
"Malformed expression: Inner value '{:?}' is not a symbol.",
|
||||
value
|
||||
)))
|
||||
}
|
||||
}
|
||||
lexpr::Value::String(str) => match T::from_str(str.borrow()) {
|
||||
Ok(value) => Ok(QueryComponent::Exact(value)),
|
||||
Err(error) => Err(anyhow!(format!(
|
||||
"Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",
|
||||
str, error
|
||||
))),
|
||||
},
|
||||
lexpr::Value::Symbol(symbol) => match symbol.borrow() {
|
||||
"?" => Ok(QueryComponent::Any),
|
||||
_ => Err(anyhow!(format!(
|
||||
"Malformed expression: Unknown symbol {}",
|
||||
symbol
|
||||
))),
|
||||
},
|
||||
_ => Err(anyhow!(
|
||||
"Malformed expression: Inner value not a string, list or '?'."
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
if let lexpr::Value::Cons(value) = expression {
|
||||
if let lexpr::Value::Symbol(symbol) = value.car() {
|
||||
match symbol.borrow() {
|
||||
"matches" => {
|
||||
let (cons_vec, _) = value.clone().into_vec();
|
||||
if let [_, entity, attribute, value] = &cons_vec[..] {
|
||||
let entity = parse_component::<Address>(entity)?;
|
||||
let attribute = parse_component::<String>(attribute)?;
|
||||
let value = parse_component::<EntryValue>(value)?;
|
||||
Ok(Query::SingleQuery(QueryPart::Matches(EntryQuery {
|
||||
entity,
|
||||
attribute,
|
||||
value,
|
||||
})))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Wrong number of arguments to 'matches'."
|
||||
))
|
||||
}
|
||||
}
|
||||
"type" => {
|
||||
let (cons_vec, _) = value.clone().into_vec();
|
||||
if let [_, type_name] = &cons_vec[..] {
|
||||
if let lexpr::Value::String(type_name_str) = type_name {
|
||||
Ok(Query::SingleQuery(QueryPart::Type(
|
||||
type_name_str.to_string(),
|
||||
)))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Type must be specified as a string."
|
||||
))
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: Wrong number of arguments to 'type'."
|
||||
))
|
||||
}
|
||||
}
|
||||
"and" | "or" => {
|
||||
let (cons_vec, _) = value.clone().into_vec();
|
||||
let sub_expressions = &cons_vec[1..];
|
||||
let values: Result<Vec<Box<Query>>> = sub_expressions
|
||||
.iter()
|
||||
.map(|value| Ok(Box::new(Query::try_from(value)?)))
|
||||
.collect();
|
||||
|
||||
if let Some(queries) = NonEmpty::from_vec(values?) {
|
||||
Ok(Query::MultiQuery(MultiQuery {
|
||||
qualifier: match symbol.borrow() {
|
||||
"and" => QueryQualifier::And,
|
||||
_ => QueryQualifier::Or,
|
||||
},
|
||||
queries,
|
||||
}))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"Malformed expression: sub-query list cannot be empty.",
|
||||
))
|
||||
}
|
||||
}
|
||||
_ => Err(anyhow!(format!(
|
||||
"Malformed expression: Unknown symbol '{}'.",
|
||||
symbol
|
||||
))),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!(format!(
|
||||
"Malformed expression: Value '{:?}' is not a symbol.",
|
||||
value
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("Malformed expression: Not a list."))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Query {
|
||||
pub(crate) fn to_sqlite_predicates(&self) -> Result<Box<Predicate>> {
|
||||
match self {
|
||||
Query::SingleQuery(qp) => {
|
||||
match qp {
|
||||
QueryPart::Matches(eq) => {
|
||||
let mut subqueries: Vec<Box<Predicate>> = vec![];
|
||||
|
||||
match &eq.entity {
|
||||
QueryComponent::Exact(q_entity) => {
|
||||
subqueries.push(Box::new(data::entity.eq(q_entity.encode()?)))
|
||||
}
|
||||
QueryComponent::In(q_entities) => {
|
||||
let entities: Result<Vec<_>, _> =
|
||||
q_entities.iter().map(|t| t.encode()).collect();
|
||||
subqueries.push(Box::new(data::entity.eq_any(entities?)))
|
||||
}
|
||||
QueryComponent::Contains(_) => {
|
||||
return Err(anyhow!("Addresses cannot be queried alike."))
|
||||
}
|
||||
QueryComponent::Any => {}
|
||||
};
|
||||
|
||||
match &eq.attribute {
|
||||
QueryComponent::Exact(q_attribute) => {
|
||||
subqueries.push(Box::new(data::attribute.eq(q_attribute.clone())))
|
||||
}
|
||||
QueryComponent::In(q_attributes) => subqueries
|
||||
.push(Box::new(data::attribute.eq_any(q_attributes.clone()))),
|
||||
QueryComponent::Contains(q_attribute) => subqueries
|
||||
.push(Box::new(data::attribute.like(format!("%{}%", q_attribute)))),
|
||||
QueryComponent::Any => {}
|
||||
};
|
||||
|
||||
match &eq.value {
|
||||
QueryComponent::Exact(q_value) => {
|
||||
subqueries.push(Box::new(data::value.eq(q_value.to_string()?)))
|
||||
}
|
||||
QueryComponent::In(q_values) => {
|
||||
let values: Result<Vec<_>, _> =
|
||||
q_values.iter().map(|v| v.to_string()).collect();
|
||||
subqueries.push(Box::new(data::value.eq_any(values?)))
|
||||
}
|
||||
QueryComponent::Contains(q_value) => subqueries
|
||||
.push(Box::new(data::value.like(format!("%{}%", q_value)))),
|
||||
QueryComponent::Any => {}
|
||||
};
|
||||
|
||||
match subqueries.len() {
|
||||
0 => Ok(Box::new(true.into_sql::<Bool>())),
|
||||
1 => Ok(subqueries.remove(0)),
|
||||
_ => {
|
||||
let mut result: Box<And<Box<Predicate>, Box<Predicate>>> =
|
||||
Box::new(And::new(subqueries.remove(0), subqueries.remove(0)));
|
||||
while !subqueries.is_empty() {
|
||||
result = Box::new(And::new(result, subqueries.remove(0)));
|
||||
}
|
||||
Ok(Box::new(result))
|
||||
}
|
||||
}
|
||||
}
|
||||
QueryPart::Type(_) => unimplemented!("Type queries are not yet implemented."),
|
||||
}
|
||||
}
|
||||
Query::MultiQuery(mq) => {
|
||||
let subqueries: Result<Vec<Box<Predicate>>> = mq
|
||||
.queries
|
||||
.iter()
|
||||
.map(|sq| sq.to_sqlite_predicates())
|
||||
.collect();
|
||||
let mut subqueries: Vec<Box<Predicate>> = subqueries?;
|
||||
match subqueries.len() {
|
||||
0 => Ok(Box::new(true.into_sql::<Bool>())),
|
||||
1 => Ok(subqueries.remove(0)),
|
||||
_ => match mq.qualifier {
|
||||
QueryQualifier::And => {
|
||||
let mut result: Box<And<Box<Predicate>, Box<Predicate>>> =
|
||||
Box::new(And::new(subqueries.remove(0), subqueries.remove(0)));
|
||||
while !subqueries.is_empty() {
|
||||
result = Box::new(And::new(result, subqueries.remove(0)));
|
||||
}
|
||||
Ok(Box::new(result))
|
||||
}
|
||||
QueryQualifier::Or => {
|
||||
let mut result: Box<Or<Box<Predicate>, Box<Predicate>>> =
|
||||
Box::new(Or::new(subqueries.remove(0), subqueries.remove(0)));
|
||||
while !subqueries.is_empty() {
|
||||
result = Box::new(Or::new(result, subqueries.remove(0)));
|
||||
}
|
||||
Ok(Box::new(result))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
307
src/database/mod.rs
Normal file
307
src/database/mod.rs
Normal file
|
@ -0,0 +1,307 @@
|
|||
#![macro_use]
|
||||
|
||||
pub mod constants;
|
||||
pub mod entry;
|
||||
pub mod lang;
|
||||
|
||||
use crate::addressing::Address;
|
||||
use crate::database::constants::{
|
||||
IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_ID_ATTR, TYPE_INVARIANT, TYPE_IS_ATTR,
|
||||
};
|
||||
use crate::database::entry::{Entry, EntryValue};
|
||||
use crate::database::lang::Query;
|
||||
use crate::hash::{Hash, Hashable};
|
||||
use crate::models;
|
||||
use crate::schema::data;
|
||||
use crate::util::LoggerSink;
|
||||
use anyhow::{anyhow, Result};
|
||||
use diesel::debug_query;
|
||||
use diesel::prelude::*;
|
||||
use diesel::r2d2::{self, ConnectionManager};
|
||||
use diesel::result::{DatabaseErrorKind, Error};
|
||||
use diesel::sqlite::{Sqlite, SqliteConnection};
|
||||
use log::{debug, trace};
|
||||
use std::convert::TryFrom;
|
||||
use std::fs;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
#[macro_use]
|
||||
mod macros {
|
||||
macro_rules! upend_insert_val {
|
||||
($db_connection:expr, $entity:expr, $attribute:expr, $value:expr) => {{
|
||||
insert_entry(
|
||||
$db_connection,
|
||||
Entry {
|
||||
entity: $entity.clone(),
|
||||
attribute: String::from($attribute),
|
||||
value: EntryValue::Value(serde_json::Value::from($value)),
|
||||
},
|
||||
)?;
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! upend_insert_addr {
|
||||
($db_connection:expr, $entity:expr, $attribute:expr, $addr:expr) => {{
|
||||
insert_entry(
|
||||
$db_connection,
|
||||
Entry {
|
||||
entity: $entity.clone(),
|
||||
attribute: String::from($attribute),
|
||||
value: EntryValue::Address($addr.clone()),
|
||||
},
|
||||
)?;
|
||||
}};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert_file<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
file: models::NewFile,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::files;
|
||||
|
||||
debug!(
|
||||
"Inserting {} ({})...",
|
||||
&file.path,
|
||||
Address::Hash(Hash((&file.hash).clone()))
|
||||
);
|
||||
|
||||
Ok(diesel::insert_into(files::table)
|
||||
.values(file)
|
||||
.execute(connection)?)
|
||||
}
|
||||
|
||||
pub fn retrieve_file<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
obj_hash: Hash,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let matches = files
|
||||
.filter(valid.eq(true))
|
||||
.filter(hash.eq(obj_hash.0))
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub fn retrieve_all_files<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
let matches = files.load::<models::File>(connection)?;
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub fn get_latest_files<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
count: i64,
|
||||
) -> Result<Vec<models::File>> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
let matches = files
|
||||
.order_by(added.desc())
|
||||
.limit(count)
|
||||
.load::<models::File>(connection)?;
|
||||
|
||||
Ok(matches)
|
||||
}
|
||||
|
||||
pub fn file_set_valid<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
file_id: i32,
|
||||
is_valid: bool,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::files::dsl::*;
|
||||
|
||||
debug!("Setting file ID {} to valid = {}", file_id, is_valid);
|
||||
|
||||
Ok(diesel::update(files.filter(id.eq(file_id)))
|
||||
.set(valid.eq(is_valid))
|
||||
.execute(connection)?)
|
||||
}
|
||||
|
||||
pub fn retrieve_object<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_address: Address,
|
||||
) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let matches = data
|
||||
.filter(entity.eq(object_address.encode()?))
|
||||
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?))
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub fn bulk_retrieve_objects<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_addresses: Vec<Address>,
|
||||
) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
let matches = data
|
||||
.filter(
|
||||
entity.eq_any(
|
||||
object_addresses
|
||||
.iter()
|
||||
.filter_map(|addr| addr.encode().ok()),
|
||||
),
|
||||
)
|
||||
// .or_filter(value.eq(EntryValue::Address(object_address).to_str()?))
|
||||
.load::<models::Entry>(connection)?;
|
||||
let entries = matches
|
||||
.iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub fn remove_object<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
object_address: Address,
|
||||
) -> Result<usize> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
debug!("Deleting {}!", object_address);
|
||||
|
||||
let matches = data
|
||||
.filter(identity.eq(object_address.encode()?))
|
||||
.or_filter(entity.eq(object_address.encode()?))
|
||||
.or_filter(value.eq(EntryValue::Address(object_address).to_string()?));
|
||||
|
||||
Ok(diesel::delete(matches).execute(connection)?)
|
||||
}
|
||||
|
||||
pub fn query<C: Connection<Backend = Sqlite>>(connection: &C, query: Query) -> Result<Vec<Entry>> {
|
||||
use crate::schema::data::dsl::*;
|
||||
|
||||
trace!("Querying: {:?}", query);
|
||||
|
||||
let db_query = data.filter(query.to_sqlite_predicates()?);
|
||||
|
||||
trace!("DB query: {}", debug_query(&db_query));
|
||||
|
||||
let matches = db_query.load::<models::Entry>(connection)?;
|
||||
|
||||
let entries = matches
|
||||
.iter()
|
||||
.map(Entry::try_from)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
pub fn insert_entry<C: Connection<Backend = Sqlite>>(
|
||||
connection: &C,
|
||||
entry: Entry,
|
||||
) -> Result<Address> {
|
||||
debug!("Inserting: {}", entry);
|
||||
|
||||
let insert_entry = models::Entry::try_from(&entry)?;
|
||||
|
||||
let entry = Entry::try_from(&insert_entry)?;
|
||||
|
||||
let result = diesel::insert_into(data::table)
|
||||
.values(insert_entry)
|
||||
.execute(connection);
|
||||
|
||||
if let Some(error) = result.err() {
|
||||
match error {
|
||||
Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {}
|
||||
_ => return Err(anyhow!(error)),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Address::Hash(entry.hash()?))
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ConnectionOptions {
|
||||
pub enable_foreign_keys: bool,
|
||||
pub busy_timeout: Option<Duration>,
|
||||
}
|
||||
|
||||
impl ConnectionOptions {
|
||||
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
|
||||
if self.enable_foreign_keys {
|
||||
conn.execute("PRAGMA foreign_keys = ON;")?;
|
||||
}
|
||||
if let Some(duration) = self.busy_timeout {
|
||||
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
||||
for ConnectionOptions
|
||||
{
|
||||
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
|
||||
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
|
||||
}
|
||||
}
|
||||
|
||||
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
||||
|
||||
pub struct OpenResult {
|
||||
pub pool: DbPool,
|
||||
pub new: bool,
|
||||
}
|
||||
|
||||
pub const DATABASE_FILENAME: &str = "upend.sqlite3";
|
||||
|
||||
pub fn open_upend<P: AsRef<Path>>(
|
||||
dirpath: P,
|
||||
db_path: Option<PathBuf>,
|
||||
reinitialize: bool,
|
||||
) -> Result<OpenResult> {
|
||||
embed_migrations!("./migrations/upend/");
|
||||
|
||||
let database_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(DATABASE_FILENAME));
|
||||
|
||||
if reinitialize {
|
||||
let _ = fs::remove_file(&database_path);
|
||||
}
|
||||
let new = !database_path.exists();
|
||||
|
||||
let manager = ConnectionManager::<SqliteConnection>::new(database_path.to_str().unwrap());
|
||||
let pool = r2d2::Pool::builder()
|
||||
.connection_customizer(Box::new(ConnectionOptions {
|
||||
enable_foreign_keys: true,
|
||||
busy_timeout: Some(Duration::from_secs(30)),
|
||||
}))
|
||||
.build(manager)?;
|
||||
|
||||
trace!("Pool created, running migrations...");
|
||||
|
||||
embedded_migrations::run_with_output(
|
||||
&pool.get()?,
|
||||
&mut LoggerSink {
|
||||
..Default::default()
|
||||
},
|
||||
)?;
|
||||
|
||||
trace!("Initializing types...");
|
||||
|
||||
initialize_types(&pool)?;
|
||||
|
||||
Ok(OpenResult { pool, new })
|
||||
}
|
||||
|
||||
fn initialize_types(pool: &DbPool) -> Result<()> {
|
||||
insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?;
|
||||
upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR);
|
||||
upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_ID_ATTR, TYPE_IS_ATTR);
|
||||
Ok(())
|
||||
}
|
|
@ -1,9 +1,13 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::constants::{
|
||||
IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_ID_ATTR, TYPE_INSTANCES_ATTR, TYPE_IS_ATTR,
|
||||
TYPE_REQUIRES_ATTR,
|
||||
};
|
||||
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
|
||||
use crate::database::lang::{EntryQuery, Query, QueryComponent, QueryPart};
|
||||
use crate::database::{
|
||||
bulk_retrieve_objects, file_set_valid, insert_entry, insert_file, query, retrieve_all_files,
|
||||
DbPool, Entry, EntryQuery, EntryValue, InvariantEntry, Query, QueryComponent, QueryPart,
|
||||
DATABASE_FILENAME, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_ID_ATTR,
|
||||
TYPE_INSTANCES_ATTR, TYPE_IS_ATTR, TYPE_REQUIRES_ATTR,
|
||||
DbPool, DATABASE_FILENAME,
|
||||
};
|
||||
use crate::hash::Hashable;
|
||||
use crate::jobs::{Job, JobContainer, JobId, State};
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::addressing::{Address, Addressable};
|
||||
use crate::database::entry::{Entry, InEntry};
|
||||
use crate::database::lang::Query;
|
||||
use crate::database::{
|
||||
get_latest_files, insert_entry, query, remove_object, retrieve_file, retrieve_object,
|
||||
Addressable, DbPool, Entry, InEntry, Query,
|
||||
get_latest_files, insert_entry, query, remove_object, retrieve_file, retrieve_object, DbPool,
|
||||
};
|
||||
use crate::filesystem::{list_directory, UPath};
|
||||
use crate::hash::{decode, encode};
|
||||
|
|
Loading…
Reference in a new issue