fix(db): 🐛 fix join behavior
ci/woodpecker/push/woodpecker Pipeline was successful
Details
ci/woodpecker/push/woodpecker Pipeline was successful
Details
parent
f66857ca3b
commit
bb8d390d9e
128
db/src/engine.rs
128
db/src/engine.rs
|
@ -17,8 +17,9 @@ use diesel::{
|
|||
};
|
||||
use diesel::{BoxableExpression, QueryDsl};
|
||||
use diesel::{ExpressionMethods, TextExpressionMethods};
|
||||
use upend_base::addressing::Address;
|
||||
use upend_base::entry::EntryValue;
|
||||
use upend_base::lang::{PatternQuery, Query, QueryComponent, QueryPart, QueryQualifier};
|
||||
use upend_base::lang::{Query, QueryComponent, QueryPart, QueryQualifier};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct QueryExecutionError(String);
|
||||
|
@ -83,33 +84,81 @@ pub fn execute(
|
|||
.collect::<Option<Vec<_>>>();
|
||||
|
||||
if let Some(pattern_queries) = pattern_queries {
|
||||
let entries = zip(pattern_queries, subquery_results).map(
|
||||
|(query, results)| {
|
||||
results
|
||||
.into_iter()
|
||||
.map(|e| EntryWithVars::new(&query, e))
|
||||
.collect::<Vec<EntryWithVars>>()
|
||||
},
|
||||
);
|
||||
|
||||
let joined = entries
|
||||
.reduce(|acc, cur| {
|
||||
acc.into_iter()
|
||||
.filter(|tested_entry| {
|
||||
tested_entry.vars.iter().any(|(k1, v1)| {
|
||||
cur.iter().any(|other_entry| {
|
||||
other_entry
|
||||
.vars
|
||||
.iter()
|
||||
.any(|(k2, v2)| k1 == k2 && v1 == v2)
|
||||
})
|
||||
let mut vars: HashMap<String, Vec<EntryValue>> = HashMap::new();
|
||||
let mut matching_entries = vec![];
|
||||
for (query, results) in zip(pattern_queries, subquery_results) {
|
||||
matching_entries = results.clone();
|
||||
|
||||
if let QueryComponent::Variable(Some(var_name)) = &query.entity {
|
||||
if let Some(entities) = vars.get(var_name) {
|
||||
matching_entries.retain(|e| {
|
||||
Address::decode(&e.entity).is_ok_and(|e| {
|
||||
entities.contains(&EntryValue::Address(e))
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.unwrap(); // TODO
|
||||
});
|
||||
} else {
|
||||
vars.insert(
|
||||
var_name.clone(),
|
||||
results
|
||||
.iter()
|
||||
.map(|e| Address::decode(&e.entity).map(EntryValue::Address))
|
||||
.collect::<Result<Vec<EntryValue>, _>>().map_err(|e| QueryExecutionError(format!("failed producing sql: {e}")))?,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let QueryComponent::Variable(Some(var_name)) = &query.attribute {
|
||||
if let Some(attributes) = vars.get(var_name) {
|
||||
matching_entries.retain(|e| {
|
||||
attributes.contains(&EntryValue::Address(Address::Attribute(e.attribute.clone())))
|
||||
});
|
||||
} else {
|
||||
vars.insert(
|
||||
var_name.clone(),
|
||||
results
|
||||
.iter()
|
||||
.map(|e| EntryValue::Address(Address::Attribute(e.attribute.clone())))
|
||||
.collect(),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if let QueryComponent::Variable(Some(var_name)) = &query.value {
|
||||
if let Some(values) = vars.get(var_name) {
|
||||
matching_entries.retain(|e| {
|
||||
if let Some(value_string) = &e.value_str {
|
||||
if let Ok(value) = value_string.parse() {
|
||||
return values.contains(&value)
|
||||
}
|
||||
}
|
||||
if let Some(value_number) = e.value_num {
|
||||
return values.contains(&EntryValue::Number(value_number))
|
||||
}
|
||||
false
|
||||
});
|
||||
} else {
|
||||
vars.insert(
|
||||
var_name.clone(),
|
||||
results
|
||||
.iter()
|
||||
.map(|e| {
|
||||
if let Some(value_string) = &e.value_str {
|
||||
if let Ok(value) = value_string.parse() {
|
||||
return Ok(value)
|
||||
}
|
||||
}
|
||||
if let Some(value_number) = e.value_num {
|
||||
return Ok(EntryValue::Number(value_number))
|
||||
}
|
||||
Err(QueryExecutionError("value-less entries cannot be joined on".into()))
|
||||
}).collect::<Result<Vec<EntryValue>, _>>().map_err(|e| QueryExecutionError(format!("failed producing sql: {e}")))?,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(joined.into_iter().map(|ev| ev.entry).collect())
|
||||
Ok(matching_entries)
|
||||
} else {
|
||||
Err(QueryExecutionError(
|
||||
"Cannot join on non-atomic queries.".into(),
|
||||
|
@ -123,35 +172,6 @@ pub fn execute(
|
|||
}
|
||||
}
|
||||
|
||||
struct EntryWithVars {
|
||||
entry: Entry,
|
||||
vars: HashMap<String, String>,
|
||||
}
|
||||
|
||||
impl EntryWithVars {
|
||||
pub fn new(query: &PatternQuery, entry: Entry) -> Self {
|
||||
let mut vars = HashMap::new();
|
||||
|
||||
if let QueryComponent::Variable(Some(var_name)) = &query.entity {
|
||||
vars.insert(
|
||||
var_name.clone(),
|
||||
upend_base::hash::b58_encode(&entry.entity),
|
||||
);
|
||||
}
|
||||
|
||||
if let QueryComponent::Variable(Some(var_name)) = &query.attribute {
|
||||
vars.insert(var_name.clone(), entry.attribute.clone());
|
||||
}
|
||||
|
||||
if let QueryComponent::Variable(Some(var_name)) = &query.value {
|
||||
if let Some(value_str) = &entry.value_str {
|
||||
vars.insert(var_name.clone(), value_str.clone());
|
||||
}
|
||||
}
|
||||
|
||||
EntryWithVars { entry, vars }
|
||||
}
|
||||
}
|
||||
|
||||
type SqlPredicate = dyn BoxableExpression<data::table, Sqlite, SqlType = Bool>;
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ use super::schema::{data, meta};
|
|||
use chrono::NaiveDateTime;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Queryable, Insertable, Serialize, Debug)]
|
||||
#[derive(Queryable, Insertable, Serialize, Debug, Clone)]
|
||||
#[table_name = "data"]
|
||||
pub struct Entry {
|
||||
pub identity: Vec<u8>,
|
||||
|
|
|
@ -432,7 +432,7 @@ impl UpEndConnection {
|
|||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use upend_base::constants::ATTR_LABEL;
|
||||
use upend_base::constants::{ATTR_LABEL, ATTR_IN};
|
||||
|
||||
use super::*;
|
||||
use tempfile::TempDir;
|
||||
|
@ -527,16 +527,21 @@ mod test {
|
|||
let result = connection.query(query).unwrap();
|
||||
assert_eq!(result.len(), 1);
|
||||
|
||||
let edge_entity = Address::Uuid(uuid::Uuid::new_v4());
|
||||
upend_insert_addr!(connection, random_entity, ATTR_IN, other_entity).unwrap();
|
||||
upend_insert_addr!(connection, edge_entity, ATTR_IN, random_entity).unwrap();
|
||||
|
||||
let query = format!(
|
||||
r#"(join
|
||||
(matches ?a "FLAVOUR" ?)
|
||||
(matches ?a "{ATTR_LABEL}" "FOOBAR")
|
||||
(matches ?a "{ATTR_IN}" @{other_entity})
|
||||
(matches ? "{ATTR_IN}" ?a)
|
||||
)"#
|
||||
)
|
||||
.parse()
|
||||
.unwrap();
|
||||
let result = connection.query(query).unwrap();
|
||||
assert_eq!(result.len(), 1);
|
||||
assert_eq!(result[0].value, "STRANGE".into());
|
||||
assert_eq!(result[0].entity, edge_entity);
|
||||
assert_eq!(result[0].value, EntryValue::Address(random_entity));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue