use crate::addressing::Address; use crate::hash::{decode, hash, Hash, Hashable}; use crate::models; use crate::schema::data; use crate::util::LoggerSink; use anyhow::{anyhow, Result}; use diesel::debug_query; use diesel::expression::operators::{And, Or}; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::result::{DatabaseErrorKind, Error}; use diesel::sql_types::Bool; use diesel::sqlite::{Sqlite, SqliteConnection}; use lexpr::value::Value::Symbol; use lexpr::Value; use lexpr::Value::Cons; use log::{debug, trace}; use nonempty::NonEmpty; use serde::{Deserialize, Serialize}; use std::borrow::Borrow; use std::convert::TryFrom; use std::fs; use std::io::{Cursor, Write}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; pub const TYPE_ATTR: &str = "TYPE"; pub const TYPE_HAS_ATTR: &str = "TYPE_HAS"; pub const IS_TYPE_ATTR: &str = "IS"; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Entry { pub entity: Address, pub attribute: String, pub value: EntryValue, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct InvariantEntry { pub attribute: String, pub value: EntryValue, } #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(tag = "t", content = "c")] pub enum EntryValue { Value(serde_json::Value), Address(Address), Invalid, } impl TryFrom<&models::Entry> for Entry { type Error = anyhow::Error; fn try_from(e: &models::Entry) -> Result { Ok(Entry { entity: Address::decode(&e.entity)?, attribute: e.attribute.clone(), value: e.value.parse().unwrap(), }) } } impl TryFrom<&Entry> for models::Entry { type Error = anyhow::Error; fn try_from(e: &Entry) -> Result { Ok(models::Entry { identity: e.address()?.encode()?, entity: e.entity.encode()?, attribute: e.attribute.clone(), value: e.value.to_string()?, }) } } impl TryFrom<&InvariantEntry> for Entry { type Error = anyhow::Error; fn try_from(invariant: &InvariantEntry) -> Result { let mut entity = Cursor::new(vec![0u8; 0]); entity.write_all(invariant.attribute.as_bytes())?; entity.write_all(invariant.value.to_string()?.as_bytes())?; Ok(Entry { entity: Address::Hash(Hash(entity.into_inner())), attribute: invariant.attribute.clone(), value: invariant.value.clone(), }) } } impl std::fmt::Display for Entry { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} | {} | {}", self.entity, self.attribute, self.value) } } impl Hashable for Entry { fn hash(self: &Entry) -> Result { let mut result = Cursor::new(vec![0u8; 0]); result.write_all(self.entity.encode()?.as_slice())?; result.write_all(self.attribute.as_bytes())?; result.write_all(self.value.to_string()?.as_bytes())?; Ok(hash(result.get_ref())) } } impl Hashable for InvariantEntry { fn hash(&self) -> Result { Entry::try_from(self)?.hash() } } pub trait Addressable: Hashable { fn address(&self) -> Result
{ Ok(Address::Hash(self.hash()?)) } } impl Addressable for Entry {} impl Addressable for InvariantEntry {} impl EntryValue { pub fn to_string(&self) -> Result { let (type_char, content) = match self { EntryValue::Value(value) => ('J', serde_json::to_string(value)?), EntryValue::Address(address) => ('O', address.to_string()), EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")), }; Ok(format!("{}{}", type_char, content)) } } impl std::fmt::Display for EntryValue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let (entry_type, entry_value) = match self { EntryValue::Address(address) => ("ADDRESS", address.to_string()), EntryValue::Value(value) => ( "VALUE", serde_json::to_string(value).unwrap_or_else(|_| String::from("?!?!?!")), ), EntryValue::Invalid => ("INVALID", "INVALID".to_string()), }; write!(f, "{}: {}", entry_type, entry_value) } } impl std::str::FromStr for EntryValue { type Err = std::convert::Infallible; fn from_str(s: &str) -> Result { if s.len() < 2 { Ok(EntryValue::Invalid) } else { let (type_char, content) = s.split_at(1); match (type_char, content) { ("J", content) => { if let Ok(value) = serde_json::from_str(content) { Ok(EntryValue::Value(value)) } else { Ok(EntryValue::Invalid) } } ("O", content) => { if let Ok(addr) = decode(content).and_then(|v| Address::decode(&v)) { Ok(EntryValue::Address(addr)) } else { Ok(EntryValue::Invalid) } } _ => Ok(EntryValue::Invalid), } } } } pub fn insert_file>( connection: &C, file: models::NewFile, ) -> Result { use crate::schema::files; debug!( "Inserting {} ({})...", &file.path, Address::Hash(Hash((&file.hash).clone())) ); Ok(diesel::insert_into(files::table) .values(file) .execute(connection)?) } pub fn retrieve_all_files>( connection: &C, ) -> Result> { use crate::schema::files::dsl::*; let matches = files.load::(connection)?; Ok(matches) } pub fn get_latest_files>( connection: &C, count: i64, ) -> Result> { use crate::schema::files::dsl::*; let matches = files .order_by(added.desc()) .limit(count) .load::(connection)?; Ok(matches) } pub fn retrieve_file>( connection: &C, obj_hash: Hash, ) -> Result> { use crate::schema::files::dsl::*; let matches = files .filter(valid.eq(true)) .filter(hash.eq(obj_hash.0)) .load::(connection)?; Ok(matches.get(0).map(|f| f.path.clone())) } pub fn file_set_valid>( connection: &C, file_id: i32, is_valid: bool, ) -> Result { use crate::schema::files::dsl::*; debug!("Setting file ID {} to valid = {}", file_id, is_valid); Ok(diesel::update(files.filter(id.eq(file_id))) .set(valid.eq(is_valid)) .execute(connection)?) } pub fn retrieve_object>( connection: &C, object_address: Address, ) -> Result> { use crate::schema::data::dsl::*; let matches = data .filter(entity.eq(object_address.encode()?)) .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)) .load::(connection)?; let entries = matches .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn bulk_retrieve_objects>( connection: &C, object_addresses: Vec
, ) -> Result> { use crate::schema::data::dsl::*; let matches = data .filter( entity.eq_any( object_addresses .iter() .filter_map(|addr| addr.encode().ok()), ), ) // .or_filter(value.eq(EntryValue::Address(object_address).to_str()?)) .load::(connection)?; let entries = matches .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn remove_object>( connection: &C, object_address: Address, ) -> Result { use crate::schema::data::dsl::*; debug!("Deleting {}!", object_address); let matches = data .filter(identity.eq(object_address.encode()?)) .or_filter(entity.eq(object_address.encode()?)) .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)); Ok(diesel::delete(matches).execute(connection)?) } #[derive(Debug)] pub enum QueryComponent where T: FromStr, { Exact(T), In(Vec), Contains(String), Any, } #[derive(Debug)] pub struct EntryQuery { pub entity: QueryComponent
, pub attribute: QueryComponent, pub value: QueryComponent, } #[derive(Debug)] pub enum QueryPart { Matches(EntryQuery), Type(String), } #[derive(Debug)] pub enum QueryQualifier { AND, OR, } #[derive(Debug)] pub struct MultiQuery { pub qualifier: QueryQualifier, pub queries: NonEmpty>, } #[derive(Debug)] pub enum Query { SingleQuery(QueryPart), MultiQuery(MultiQuery), } type Predicate = dyn BoxableExpression; impl TryFrom<&lexpr::Value> for Query { type Error = anyhow::Error; fn try_from(expression: &Value) -> Result { fn parse_component(value: &lexpr::Value) -> Result> where ::Err: std::fmt::Debug, { match value { Cons(cons) => { if let Symbol(symbol) = cons.car() { match symbol.borrow() { "in" => { let (cons_vec, _) = cons.clone().into_vec(); if let Some(split) = cons_vec.split_first() { let args = split.1; let values: Result, _> = args.iter().map(|value| { if let lexpr::Value::String(str) = value { match T::from_str(str.borrow()) { Ok(value) => Ok(value), Err(error) => Err(anyhow!(format!("Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",str, error))), } } else { Err(anyhow!("Malformed expression: Inner value list must be comprised of strings.")) } }).collect(); Ok(QueryComponent::In(values?)) } else { Err(anyhow!( "Malformed expression: Inner value cannot be empty." )) } } "contains" => { let (mut cons_vec, _) = cons.clone().into_vec(); match cons_vec.len() { 2 => { let value = cons_vec.remove(1); if let lexpr::Value::String(str) = value { Ok(QueryComponent::Contains(str.into_string())) } else { Err(anyhow!("Malformed expression: 'Contains' argument must be a string.")) } } _ => Err(anyhow!( "Malformed expression: 'Contains' requires a single argument." )), } } _ => Err(anyhow!(format!( "Malformed expression: Unknown symbol {}", symbol ))), } } else { Err(anyhow!(format!( "Malformed expression: Inner value '{:?}' is not a symbol.", value ))) } } lexpr::Value::String(str) => match T::from_str(str.borrow()) { Ok(value) => Ok(QueryComponent::Exact(value)), Err(error) => Err(anyhow!(format!( "Malformed expression: Conversion of inner value '{}' from string failed: {:#?}", str, error ))), }, lexpr::Value::Symbol(symbol) => match symbol.borrow() { "?" => Ok(QueryComponent::Any), _ => Err(anyhow!(format!( "Malformed expression: Unknown symbol {}", symbol ))), }, _ => Err(anyhow!( "Malformed expression: Inner value not a string, list or '?'." )), } } if let Cons(value) = expression { if let Symbol(symbol) = value.car() { match symbol.borrow() { "matches" => { let (cons_vec, _) = value.clone().into_vec(); if let [_, entity, attribute, value] = &cons_vec[..] { let entity = parse_component::
(entity)?; let attribute = parse_component::(attribute)?; let value = parse_component::(value)?; Ok(Query::SingleQuery(QueryPart::Matches(EntryQuery { entity, attribute, value, }))) } else { Err(anyhow!( "Malformed expression: Wrong number of arguments to 'matches'." )) } } "type" => { let (cons_vec, _) = value.clone().into_vec(); if let [_, type_name] = &cons_vec[..] { if let lexpr::Value::String(type_name_str) = type_name { Ok(Query::SingleQuery(QueryPart::Type( type_name_str.to_string(), ))) } else { Err(anyhow!( "Malformed expression: Type must be specified as a string." )) } } else { Err(anyhow!( "Malformed expression: Wrong number of arguments to 'type'." )) } } "and" | "or" => { let (cons_vec, _) = value.clone().into_vec(); let sub_expressions = &cons_vec[1..]; let values: Result>> = sub_expressions .iter() .map(|value| Ok(Box::new(Query::try_from(value)?))) .collect(); if let Some(queries) = NonEmpty::from_vec(values?) { Ok(Query::MultiQuery(MultiQuery { qualifier: match symbol.borrow() { "and" => QueryQualifier::AND, _ => QueryQualifier::OR, }, queries, })) } else { Err(anyhow!( "Malformed expression: sub-query list cannot be empty.", )) } } _ => Err(anyhow!(format!( "Malformed expression: Unknown symbol '{}'.", symbol ))), } } else { Err(anyhow!(format!( "Malformed expression: Value '{:?}' is not a symbol.", value ))) } } else { Err(anyhow!("Malformed expression: Not a list.")) } } } impl Query { fn to_sqlite_predicates(&self) -> Result> { match self { Query::SingleQuery(qp) => { match qp { QueryPart::Matches(eq) => { let mut subqueries: Vec> = vec![]; match &eq.entity { QueryComponent::Exact(q_entity) => { subqueries.push(Box::new(data::entity.eq(q_entity.encode()?))) } QueryComponent::In(q_entities) => { let entities: Result, _> = q_entities.iter().map(|t| t.encode()).collect(); subqueries.push(Box::new(data::entity.eq_any(entities?))) } QueryComponent::Contains(_) => { return Err(anyhow!("Addresses cannot be queried alike.")) } QueryComponent::Any => {} }; match &eq.attribute { QueryComponent::Exact(q_attribute) => { subqueries.push(Box::new(data::attribute.eq(q_attribute.clone()))) } QueryComponent::In(q_attributes) => subqueries .push(Box::new(data::attribute.eq_any(q_attributes.clone()))), QueryComponent::Contains(q_attribute) => subqueries .push(Box::new(data::attribute.like(format!("%{}%", q_attribute)))), QueryComponent::Any => {} }; match &eq.value { QueryComponent::Exact(q_value) => { subqueries.push(Box::new(data::value.eq(q_value.to_string()?))) } QueryComponent::In(q_values) => { let values: Result, _> = q_values.iter().map(|v| v.to_string()).collect(); subqueries.push(Box::new(data::value.eq_any(values?))) } QueryComponent::Contains(q_value) => subqueries .push(Box::new(data::value.like(format!("%{}%", q_value)))), QueryComponent::Any => {} }; match subqueries.len() { 0 => Ok(Box::new(true.into_sql::())), 1 => Ok(subqueries.remove(0)), _ => { let mut result: Box, Box>> = Box::new(And::new(subqueries.remove(0), subqueries.remove(0))); while !subqueries.is_empty() { result = Box::new(And::new(result, subqueries.remove(0))); } Ok(Box::new(result)) } } } QueryPart::Type(_) => unimplemented!("Type queries are not yet implemented."), } } Query::MultiQuery(mq) => { let subqueries: Result>> = mq .queries .iter() .map(|sq| sq.to_sqlite_predicates()) .collect(); let mut subqueries: Vec> = subqueries?; match subqueries.len() { 0 => Ok(Box::new(true.into_sql::())), 1 => Ok(subqueries.remove(0)), _ => match mq.qualifier { QueryQualifier::AND => { let mut result: Box, Box>> = Box::new(And::new(subqueries.remove(0), subqueries.remove(0))); while !subqueries.is_empty() { result = Box::new(And::new(result, subqueries.remove(0))); } Ok(Box::new(result)) } QueryQualifier::OR => { let mut result: Box, Box>> = Box::new(Or::new(subqueries.remove(0), subqueries.remove(0))); while !subqueries.is_empty() { result = Box::new(Or::new(result, subqueries.remove(0))); } Ok(Box::new(result)) } }, } } } } } pub fn query>(connection: &C, query: Query) -> Result> { use crate::schema::data::dsl::*; let db_query = data.filter(query.to_sqlite_predicates()?); trace!("Querying: {}", debug_query(&db_query)); let matches = db_query.load::(connection)?; let entries = matches .iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } pub fn insert_entry>( connection: &C, entry: Entry, ) -> Result
{ debug!("Inserting: {}", entry); let insert_entry = models::Entry::try_from(&entry)?; let entry = Entry::try_from(&insert_entry)?; let result = diesel::insert_into(data::table) .values(insert_entry) .execute(connection); if let Some(error) = result.err() { match error { Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {} _ => return Err(anyhow!(error)), } } Ok(Address::Hash(entry.hash()?)) } #[derive(Debug)] pub struct ConnectionOptions { pub enable_foreign_keys: bool, pub busy_timeout: Option, } impl ConnectionOptions { pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { if self.enable_foreign_keys { conn.execute("PRAGMA foreign_keys = ON;")?; } if let Some(duration) = self.busy_timeout { conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } pub type DbPool = r2d2::Pool>; pub struct OpenResult { pub pool: DbPool, pub new: bool, } pub const DATABASE_FILENAME: &str = "upend.sqlite3"; pub fn open_upend>(dirpath: P, reinitialize: bool) -> Result { embed_migrations!("./migrations/upend/"); let database_path: PathBuf = dirpath.as_ref().join(DATABASE_FILENAME); if reinitialize { let _ = fs::remove_file(&database_path); } let new = !database_path.exists(); let manager = ConnectionManager::::new(database_path.to_str().unwrap()); let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { enable_foreign_keys: true, busy_timeout: Some(Duration::from_secs(30)), })) .build(manager) .expect("Failed to create pool."); embedded_migrations::run_with_output( &pool.get().unwrap(), &mut LoggerSink { ..Default::default() }, )?; Ok(OpenResult { pool, new }) }