upend/db/src/engine.rs

414 lines
20 KiB
Rust

use std::collections::HashMap;
use super::inner::models::Entry;
use super::inner::schema::data;
use crate::inner::models;
use anyhow::Result;
use diesel::expression::grouped::Grouped;
use diesel::expression::operators::{And, Not, Or};
use diesel::sql_types::Bool;
use diesel::sqlite::Sqlite;
use diesel::IntoSql;
use diesel::RunQueryDsl;
use diesel::{
r2d2::{ConnectionManager, PooledConnection},
SqliteConnection,
};
use diesel::{BoxableExpression, QueryDsl};
use diesel::{ExpressionMethods, TextExpressionMethods};
use upend_base::addressing::Address;
use upend_base::entry::{EntryPart, EntryValue};
use upend_base::lang::{Attribute, Query, QueryComponent, QueryPart, QueryQualifier};
/// Error produced when a query cannot be translated to SQL or evaluated.
///
/// Wraps a human-readable message; the message is what [`std::fmt::Display`] prints.
#[derive(Debug, Clone)]
pub struct QueryExecutionError(String);

impl std::fmt::Display for QueryExecutionError {
    /// Writes the wrapped message verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        f.write_str(message)
    }
}

// No `source()` override: the error carries only its message.
impl std::error::Error for QueryExecutionError {}
pub fn execute(
connection: &PooledConnection<ConnectionManager<SqliteConnection>>,
query: Query,
) -> Result<Vec<Entry>, QueryExecutionError> {
use crate::inner::schema::data::dsl::*;
if let Some(predicates) = to_sqlite_predicates(query.clone())? {
let db_query = data.filter(predicates);
db_query
.load::<models::Entry>(connection)
.map_err(|e| QueryExecutionError(e.to_string()))
} else {
match query {
Query::SingleQuery(_) => Err(QueryExecutionError(
"Forced manual evaluation of an atomic query, this should never happen.".into(),
)),
Query::MultiQuery(mq) => match mq.qualifier {
QueryQualifier::Not => Err(QueryExecutionError(
"Stopped manual evaluation at NOT sub-query due to performance limits. Please \
rework your query."
.into(),
)),
_ => {
if let QueryQualifier::Join = mq.qualifier {
let pattern_queries = mq
.queries
.into_iter()
.map(|q| match *q {
Query::SingleQuery(QueryPart::Matches(pq)) => Some(pq),
_ => None,
})
.collect::<Option<Vec<_>>>()
.ok_or(QueryExecutionError(
"Cannot join on non-atomic queries.".into(),
))?;
let mut vars: HashMap<String, Vec<EntryPart>> = HashMap::new();
let mut subquery_results: Vec<Entry> = vec![];
for query in pattern_queries {
let mut final_query = query.clone();
if let QueryComponent::Variable(Some(var_name)) = &query.entity {
if let Some(entities) = vars.get(var_name) {
final_query.entity = QueryComponent::In(
entities
.iter()
.filter_map(|e| match e {
EntryPart::Entity(a) => Some(a.clone()),
EntryPart::Value(EntryValue::Address(a)) => {
Some(a.clone())
}
_ => None,
})
.collect(),
);
if final_query.entity == QueryComponent::In(vec![]) {
return Ok(vec![]);
}
}
}
if let QueryComponent::Variable(Some(var_name)) = &query.attribute {
if let Some(attributes) = vars.get(var_name) {
final_query.attribute = QueryComponent::In(
attributes
.iter()
.filter_map(|e| {
if let EntryPart::Attribute(a) = e {
Some(a.clone())
} else {
None
}
})
.collect(),
);
if final_query.attribute == QueryComponent::In(vec![]) {
return Ok(vec![]);
}
}
}
if let QueryComponent::Variable(Some(var_name)) = &query.value {
if let Some(values) = vars.get(var_name) {
final_query.value = QueryComponent::In(
values
.iter()
.filter_map(|e| match e {
EntryPart::Entity(a) => {
Some(EntryValue::Address(a.clone()))
}
EntryPart::Attribute(a) => {
Some(EntryValue::Address(Address::Attribute(
a.0.clone(),
)))
}
EntryPart::Value(v) => Some(v.clone()),
_ => None,
})
.collect(),
);
if final_query.value == QueryComponent::In(vec![]) {
return Ok(vec![]);
}
}
}
subquery_results = execute(
connection,
Query::SingleQuery(QueryPart::Matches(final_query)),
)?;
if subquery_results.is_empty() {
return Ok(vec![]);
}
if let QueryComponent::Variable(Some(var_name)) = &query.entity {
vars.insert(
var_name.clone(),
subquery_results
.iter()
.map(|e| {
EntryPart::Entity(
Address::decode(&e.entity)
.map_err(|e| QueryExecutionError(e.to_string()))
.unwrap(),
)
})
.collect(),
);
}
if let QueryComponent::Variable(Some(var_name)) = &query.attribute {
vars.insert(
var_name.clone(),
subquery_results
.iter()
.map(|e| {
EntryPart::Attribute(Attribute(e.attribute.clone()))
})
.collect(),
);
}
if let QueryComponent::Variable(Some(var_name)) = &query.value {
vars.insert(
var_name.clone(),
subquery_results
.iter()
.map(|e| {
if let Some(value_string) = &e.value_str {
if let Ok(value) = value_string.parse() {
return Ok(EntryPart::Value(value));
}
}
if let Some(value_number) = e.value_num {
return Ok(EntryPart::Value(EntryValue::Number(
value_number,
)));
}
Err(QueryExecutionError(
"value-less entries cannot be joined on".into(),
))
})
.collect::<Result<Vec<EntryPart>, _>>()?,
);
}
}
Ok(subquery_results)
} else {
let subquery_results = mq
.queries
.iter()
.map(|q| execute(connection, *q.clone()))
.collect::<Result<Vec<Vec<Entry>>, QueryExecutionError>>()?;
match mq.qualifier {
QueryQualifier::Join | QueryQualifier::Not => unreachable!(),
QueryQualifier::And => Ok(subquery_results
.into_iter()
.reduce(|acc, cur| {
acc.into_iter()
.filter(|e| {
cur.iter()
.map(|e| &e.identity)
.any(|x| x == &e.identity)
})
.collect()
})
.unwrap()), // TODO
QueryQualifier::Or => {
Ok(subquery_results.into_iter().flatten().collect())
}
}
}
}
},
}
}
}
/// A boolean SQL expression over the `data` table for the SQLite backend, used behind a
/// `Box` so differently-shaped predicates can be stored and combined uniformly.
type SqlPredicate = dyn BoxableExpression<data::table, Sqlite, SqlType = Bool>;
/// Result of translating a query to SQL: `Some(predicate)` when the query maps to a single
/// SQL predicate, `None` when it does not (e.g. a multi-pattern `Join`).
type SqlResult = Option<Box<SqlPredicate>>;
/// Translates `query` into a single SQL predicate over the `data` table, if possible.
///
/// Returns `Ok(Some(predicate))` when the whole query can be expressed in SQL and
/// `Ok(None)` when it cannot: a `Join` over two or more sub-queries, or a multi-query that
/// contains such a sub-query.
///
/// A pattern with no constraining component (all variables) becomes a constant `TRUE`.
/// `And`/`Or` combinations of two or more sub-queries are wrapped in parentheses.
///
/// # Errors
///
/// Returns a [`QueryExecutionError`] when
/// * an address or value cannot be encoded for SQL,
/// * a value `In` list is empty or mixes numbers with non-numbers,
/// * `Not` is given more than one sub-query,
/// * a `Key` component is used in the entity position, or
/// * the query is a `Type` query (not implemented).
fn to_sqlite_predicates(query: Query) -> Result<SqlResult, QueryExecutionError> {
    match query {
        Query::SingleQuery(QueryPart::Matches(eq)) => {
            let mut subqueries: Vec<Box<SqlPredicate>> = vec![];

            match &eq.entity {
                QueryComponent::Exact(q_entity) => subqueries.push(Box::new(
                    data::entity.eq(q_entity.encode().map_err(sql_error)?),
                )),
                QueryComponent::In(q_entities) => {
                    let encoded = q_entities
                        .iter()
                        .map(|t| t.encode())
                        .collect::<Result<Vec<_>, _>>()
                        .map_err(sql_error)?;
                    subqueries.push(Box::new(data::entity.eq_any(encoded)))
                }
                QueryComponent::Contains(q_entity) => subqueries.push(Box::new(
                    data::entity_searchable.like(format!("%{}%", q_entity)),
                )),
                QueryComponent::Variable(_) => {}
                QueryComponent::Key(_) => {
                    // NOTE(review): this arm previously pushed the bare function `Box::new`,
                    // which does not type-check. What a key should match against in the
                    // entity position is not defined here, so reject it explicitly rather
                    // than silently ignoring the constraint - confirm intended behavior.
                    return Err(QueryExecutionError(
                        "Key components are not supported in the entity position.".into(),
                    ));
                }
            };

            match &eq.attribute {
                QueryComponent::Exact(q_attribute) => {
                    subqueries.push(Box::new(data::attribute.eq(q_attribute.0.clone())))
                }
                QueryComponent::Key(q_attribute) => {
                    subqueries.push(Box::new(data::attribute.eq(q_attribute.clone())))
                }
                QueryComponent::In(q_attributes) => subqueries.push(Box::new(
                    data::attribute.eq_any(q_attributes.iter().map(|a| &a.0).cloned()),
                )),
                QueryComponent::Contains(q_attribute) => subqueries
                    .push(Box::new(data::attribute.like(format!("%{}%", q_attribute)))),
                QueryComponent::Variable(_) => {}
            };

            match &eq.value {
                // Numbers are matched on `value_num`; everything else on `value_str`.
                QueryComponent::Exact(EntryValue::Number(n)) => {
                    subqueries.push(Box::new(data::value_num.eq(*n)))
                }
                QueryComponent::Exact(q_value) => subqueries.push(Box::new(
                    data::value_str.eq(q_value.to_string().map_err(sql_error)?),
                )),
                QueryComponent::In(q_values) => {
                    let first = q_values.first().ok_or_else(|| {
                        QueryExecutionError(
                            "Malformed expression: Inner value cannot be empty.".into(),
                        )
                    })?;
                    // The first element decides which column the whole list is matched on.
                    if let EntryValue::Number(_) = first {
                        let numbers = q_values
                            .iter()
                            .map(|v| match v {
                                EntryValue::Number(n) => Ok(*n),
                                _ => Err(QueryExecutionError(format!(
                                    "IN queries must not combine numeric and \
                                     string values! ({v} is not a number)"
                                ))),
                            })
                            .collect::<Result<Vec<f64>, QueryExecutionError>>()?;
                        subqueries.push(Box::new(data::value_num.eq_any(numbers)))
                    } else {
                        let strings = q_values
                            .iter()
                            .map(|v| match v {
                                EntryValue::Number(_) => Err(QueryExecutionError(format!(
                                    "IN queries must not combine numeric and \
                                     string values! (Found {v})"
                                ))),
                                _ => v.to_string().map_err(sql_error),
                            })
                            .collect::<Result<Vec<String>, QueryExecutionError>>()?;
                        subqueries.push(Box::new(data::value_str.eq_any(strings)))
                    }
                }
                // NOTE(review): the leading `S` / `K` look like type tags at the start of the
                // stored `value_str` - confirm against the value serialization format.
                QueryComponent::Contains(q_value) => {
                    subqueries.push(Box::new(data::value_str.like(format!("S%{}%", q_value))))
                }
                QueryComponent::Variable(_) => {}
                QueryComponent::Key(key) => {
                    subqueries.push(Box::new(data::value_str.like(format!("K%{}%", key))))
                }
            };

            // No constraints at all means "match every row".
            Ok(Some(all_of(subqueries).unwrap_or_else(always_true)))
        }
        // Report unsupported queries as an error instead of panicking on user input.
        Query::SingleQuery(QueryPart::Type(_)) => Err(QueryExecutionError(
            "Type queries are not yet implemented.".into(),
        )),
        Query::MultiQuery(mq) => {
            // Translate every sub-query first so that translation errors are always reported.
            let translated = mq
                .queries
                .into_iter()
                .map(|sq| to_sqlite_predicates(*sq))
                .collect::<Result<Vec<SqlResult>, QueryExecutionError>>()?;
            // A single untranslatable sub-query makes the whole multi-query untranslatable.
            let mut subqueries: Vec<Box<SqlPredicate>> =
                match translated.into_iter().collect::<Option<Vec<_>>>() {
                    Some(all) => all,
                    None => return Ok(None),
                };

            match subqueries.len() {
                0 => Ok(Some(always_true())),
                1 => {
                    let only = subqueries.remove(0);
                    if let QueryQualifier::Not = mq.qualifier {
                        Ok(Some(Box::new(Not::new(only))))
                    } else {
                        Ok(Some(only))
                    }
                }
                _ => match mq.qualifier {
                    QueryQualifier::Join => Ok(None),
                    // `all_of` / `any_of` are always `Some` here: there are >= 2 predicates.
                    QueryQualifier::And => Ok(all_of(subqueries)
                        .map(|combined| Box::new(Grouped(combined)) as Box<SqlPredicate>)),
                    QueryQualifier::Or => Ok(any_of(subqueries)
                        .map(|combined| Box::new(Grouped(combined)) as Box<SqlPredicate>)),
                    QueryQualifier::Not => {
                        Err(QueryExecutionError("NOT only takes one subquery.".into()))
                    }
                },
            }
        }
    }
}

/// Wraps an encoding failure in the error reported for SQL generation problems.
fn sql_error<E: std::fmt::Display>(e: E) -> QueryExecutionError {
    QueryExecutionError(format!("failed producing sql: {e}"))
}

/// A predicate that is true for every row.
fn always_true() -> Box<SqlPredicate> {
    Box::new(true.into_sql::<Bool>())
}

/// Left-nested `AND` of all predicates; `None` if there are none.
fn all_of(predicates: Vec<Box<SqlPredicate>>) -> Option<Box<SqlPredicate>> {
    predicates
        .into_iter()
        .reduce(|acc, next| Box::new(And::new(acc, next)) as Box<SqlPredicate>)
}

/// Left-nested `OR` of all predicates; `None` if there are none.
fn any_of(predicates: Vec<Box<SqlPredicate>>) -> Option<Box<SqlPredicate>> {
    predicates
        .into_iter()
        .reduce(|acc, next| Box::new(Or::new(acc, next)) as Box<SqlPredicate>)
}