diff --git a/src/addressing.rs b/src/addressing.rs index 1dba1f8..064bc6c 100644 --- a/src/addressing.rs +++ b/src/addressing.rs @@ -1,4 +1,4 @@ -use crate::hash::{decode, encode, Hash}; +use crate::hash::{decode, encode, Hash, Hashable}; use anyhow::{anyhow, Result}; use serde::de::Visitor; use serde::{de, ser, Deserialize, Deserializer, Serialize, Serializer}; @@ -124,6 +124,12 @@ impl std::fmt::Debug for Address { } } +pub trait Addressable: Hashable { + fn address(&self) -> Result
{ + Ok(Address::Hash(self.hash()?)) + } +} + #[cfg(test)] mod tests { use uuid::Uuid; diff --git a/src/database.rs b/src/database.rs deleted file mode 100644 index 7c58857..0000000 --- a/src/database.rs +++ /dev/null @@ -1,802 +0,0 @@ -#![macro_use] -use crate::addressing::Address; -use crate::hash::{decode, hash, Hash, Hashable}; -use crate::models; -use crate::schema::data; -use crate::util::LoggerSink; -use anyhow::{anyhow, Result}; -use diesel::debug_query; -use diesel::expression::operators::{And, Or}; -use diesel::prelude::*; -use diesel::r2d2::{self, ConnectionManager}; -use diesel::result::{DatabaseErrorKind, Error}; -use diesel::sql_types::Bool; -use diesel::sqlite::{Sqlite, SqliteConnection}; -use lexpr::value::Value::Symbol; -use lexpr::Value; -use lexpr::Value::Cons; -use log::{debug, trace}; -use nonempty::NonEmpty; -use serde::{Deserialize, Serialize}; -use std::borrow::Borrow; -use std::convert::TryFrom; -use std::fs; -use std::io::{Cursor, Write}; -use std::path::{Path, PathBuf}; -use std::str::FromStr; -use std::time::Duration; - -pub const TYPE_TYPE: &str = "TYPE"; -pub const TYPE_IS_ATTR: &str = "TYPE"; -pub const TYPE_REQUIRES_ATTR: &str = "TYPE_REQUIRES"; -pub const TYPE_HAS_ATTR: &str = "TYPE_HAS"; -pub const TYPE_ID_ATTR: &str = "TYPE_ID"; -pub const TYPE_INSTANCES_ATTR: &str = "TYPE_INSTANCES"; -pub const IS_OF_TYPE_ATTR: &str = "IS"; - -lazy_static! { - static ref TYPE_INVARIANT: InvariantEntry = InvariantEntry { - attribute: String::from(TYPE_IS_ATTR), - value: EntryValue::Value(serde_json::Value::from(TYPE_TYPE)), - }; - pub static ref TYPE_ADDR: Address = TYPE_INVARIANT.entity().unwrap(); -} - -#[macro_use] -mod macros { - macro_rules! upend_insert_val { - ($db_connection:expr, $entity:expr, $attribute:expr, $value:expr) => {{ - insert_entry( - $db_connection, - Entry { - entity: $entity.clone(), - attribute: String::from($attribute), - value: EntryValue::Value(serde_json::Value::from($value)), - }, - )?; - }}; - } - - macro_rules! upend_insert_addr { - ($db_connection:expr, $entity:expr, $attribute:expr, $addr:expr) => {{ - insert_entry( - $db_connection, - Entry { - entity: $entity.clone(), - attribute: String::from($attribute), - value: EntryValue::Address($addr.clone()), - }, - )?; - }}; - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct InEntry { - pub entity: Option
, - pub attribute: String, - pub value: EntryValue, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Entry { - pub entity: Address, - pub attribute: String, - pub value: EntryValue, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct InvariantEntry { - pub attribute: String, - pub value: EntryValue, -} - -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] -#[serde(tag = "t", content = "c")] -pub enum EntryValue { - Value(serde_json::Value), - Address(Address), - Invalid, -} - -impl TryFrom<&models::Entry> for Entry { - type Error = anyhow::Error; - - fn try_from(e: &models::Entry) -> Result { - Ok(Entry { - entity: Address::decode(&e.entity)?, - attribute: e.attribute.clone(), - value: e.value.parse().unwrap(), - }) - } -} - -impl TryFrom<&Entry> for models::Entry { - type Error = anyhow::Error; - - fn try_from(e: &Entry) -> Result { - Ok(models::Entry { - identity: e.address()?.encode()?, - entity: e.entity.encode()?, - attribute: e.attribute.clone(), - value: e.value.to_string()?, - }) - } -} - -impl TryFrom<&InvariantEntry> for Entry { - type Error = anyhow::Error; - - fn try_from(invariant: &InvariantEntry) -> Result { - Ok(Entry { - entity: invariant.entity()?, - attribute: invariant.attribute.clone(), - value: invariant.value.clone(), - }) - } -} - -impl TryFrom for Entry { - type Error = anyhow::Error; - - fn try_from(in_entry: InEntry) -> Result { - match in_entry.entity { - Some(entity) => Ok(Entry { - entity, - attribute: in_entry.attribute, - value: in_entry.value, - }), - None => Ok(Entry::try_from(&InvariantEntry { - attribute: in_entry.attribute, - value: in_entry.value, - })?), - } - } -} - -impl InvariantEntry { - pub fn entity(&self) -> Result
{ - let mut entity = Cursor::new(vec![0u8; 0]); - entity.write_all(self.attribute.as_bytes())?; - entity.write_all(self.value.to_string()?.as_bytes())?; - Ok(Address::Hash(hash(entity.into_inner()))) - } -} - -impl std::fmt::Display for Entry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} | {} | {}", self.entity, self.attribute, self.value) - } -} - -impl Hashable for Entry { - fn hash(self: &Entry) -> Result { - let mut result = Cursor::new(vec![0u8; 0]); - result.write_all(self.entity.encode()?.as_slice())?; - result.write_all(self.attribute.as_bytes())?; - result.write_all(self.value.to_string()?.as_bytes())?; - Ok(hash(result.get_ref())) - } -} - -impl Hashable for InvariantEntry { - fn hash(&self) -> Result { - Entry::try_from(self)?.hash() - } -} - -pub trait Addressable: Hashable { - fn address(&self) -> Result
{ - Ok(Address::Hash(self.hash()?)) - } -} - -impl Addressable for Entry {} -impl Addressable for InvariantEntry {} - -impl EntryValue { - pub fn to_string(&self) -> Result { - let (type_char, content) = match self { - EntryValue::Value(value) => ('J', serde_json::to_string(value)?), - EntryValue::Address(address) => ('O', address.to_string()), - EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")), - }; - - Ok(format!("{}{}", type_char, content)) - } -} - -impl std::fmt::Display for EntryValue { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let (entry_type, entry_value) = match self { - EntryValue::Address(address) => ("ADDRESS", address.to_string()), - EntryValue::Value(value) => ( - "VALUE", - serde_json::to_string(value).unwrap_or_else(|_| String::from("?!?!?!")), - ), - EntryValue::Invalid => ("INVALID", "INVALID".to_string()), - }; - write!(f, "{}: {}", entry_type, entry_value) - } -} - -impl std::str::FromStr for EntryValue { - type Err = std::convert::Infallible; - - fn from_str(s: &str) -> Result { - if s.len() < 2 { - Ok(EntryValue::Invalid) - } else { - let (type_char, content) = s.split_at(1); - match (type_char, content) { - ("J", content) => { - if let Ok(value) = serde_json::from_str(content) { - Ok(EntryValue::Value(value)) - } else { - Ok(EntryValue::Invalid) - } - } - ("O", content) => { - if let Ok(addr) = decode(content).and_then(|v| Address::decode(&v)) { - Ok(EntryValue::Address(addr)) - } else { - Ok(EntryValue::Invalid) - } - } - _ => Ok(EntryValue::Invalid), - } - } - } -} - -pub fn insert_file>( - connection: &C, - file: models::NewFile, -) -> Result { - use crate::schema::files; - - debug!( - "Inserting {} ({})...", - &file.path, - Address::Hash(Hash((&file.hash).clone())) - ); - - Ok(diesel::insert_into(files::table) - .values(file) - .execute(connection)?) -} - -pub fn retrieve_file>( - connection: &C, - obj_hash: Hash, -) -> Result> { - use crate::schema::files::dsl::*; - - let matches = files - .filter(valid.eq(true)) - .filter(hash.eq(obj_hash.0)) - .load::(connection)?; - - Ok(matches) -} - -pub fn retrieve_all_files>( - connection: &C, -) -> Result> { - use crate::schema::files::dsl::*; - let matches = files.load::(connection)?; - Ok(matches) -} - -pub fn get_latest_files>( - connection: &C, - count: i64, -) -> Result> { - use crate::schema::files::dsl::*; - - let matches = files - .order_by(added.desc()) - .limit(count) - .load::(connection)?; - - Ok(matches) -} - -pub fn file_set_valid>( - connection: &C, - file_id: i32, - is_valid: bool, -) -> Result { - use crate::schema::files::dsl::*; - - debug!("Setting file ID {} to valid = {}", file_id, is_valid); - - Ok(diesel::update(files.filter(id.eq(file_id))) - .set(valid.eq(is_valid)) - .execute(connection)?) -} - -pub fn retrieve_object>( - connection: &C, - object_address: Address, -) -> Result> { - use crate::schema::data::dsl::*; - - let matches = data - .filter(entity.eq(object_address.encode()?)) - .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)) - .load::(connection)?; - let entries = matches - .iter() - .map(Entry::try_from) - .filter_map(Result::ok) - .collect(); - - Ok(entries) -} - -pub fn bulk_retrieve_objects>( - connection: &C, - object_addresses: Vec
, -) -> Result> { - use crate::schema::data::dsl::*; - - let matches = data - .filter( - entity.eq_any( - object_addresses - .iter() - .filter_map(|addr| addr.encode().ok()), - ), - ) - // .or_filter(value.eq(EntryValue::Address(object_address).to_str()?)) - .load::(connection)?; - let entries = matches - .iter() - .map(Entry::try_from) - .filter_map(Result::ok) - .collect(); - - Ok(entries) -} - -pub fn remove_object>( - connection: &C, - object_address: Address, -) -> Result { - use crate::schema::data::dsl::*; - - debug!("Deleting {}!", object_address); - - let matches = data - .filter(identity.eq(object_address.encode()?)) - .or_filter(entity.eq(object_address.encode()?)) - .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)); - - Ok(diesel::delete(matches).execute(connection)?) -} - -#[derive(Debug)] -pub enum QueryComponent -where - T: FromStr, -{ - Exact(T), - In(Vec), - Contains(String), - Any, -} - -#[derive(Debug)] -pub struct EntryQuery { - pub entity: QueryComponent
, - pub attribute: QueryComponent, - pub value: QueryComponent, -} - -#[derive(Debug)] -pub enum QueryPart { - Matches(EntryQuery), - Type(String), -} - -#[derive(Debug)] -pub enum QueryQualifier { - And, - Or, -} - -#[derive(Debug)] -pub struct MultiQuery { - pub qualifier: QueryQualifier, - pub queries: NonEmpty>, -} - -#[derive(Debug)] -pub enum Query { - SingleQuery(QueryPart), - MultiQuery(MultiQuery), -} - -type Predicate = dyn BoxableExpression; - -impl TryFrom<&lexpr::Value> for Query { - type Error = anyhow::Error; - - fn try_from(expression: &Value) -> Result { - fn parse_component(value: &lexpr::Value) -> Result> - where - ::Err: std::fmt::Debug, - { - match value { - Cons(cons) => { - if let Symbol(symbol) = cons.car() { - match symbol.borrow() { - "in" => { - let (cons_vec, _) = cons.clone().into_vec(); - if let Some(split) = cons_vec.split_first() { - let args = split.1; - let values: Result, _> = args.iter().map(|value| { - if let lexpr::Value::String(str) = value { - match T::from_str(str.borrow()) { - Ok(value) => Ok(value), - Err(error) => Err(anyhow!(format!("Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",str, error))), - } - } else { - Err(anyhow!("Malformed expression: Inner value list must be comprised of strings.")) - } - }).collect(); - - Ok(QueryComponent::In(values?)) - } else { - Err(anyhow!( - "Malformed expression: Inner value cannot be empty." - )) - } - } - "contains" => { - let (mut cons_vec, _) = cons.clone().into_vec(); - match cons_vec.len() { - 2 => { - let value = cons_vec.remove(1); - if let lexpr::Value::String(str) = value { - Ok(QueryComponent::Contains(str.into_string())) - } else { - Err(anyhow!("Malformed expression: 'Contains' argument must be a string.")) - } - } - _ => Err(anyhow!( - "Malformed expression: 'Contains' requires a single argument." - )), - } - } - _ => Err(anyhow!(format!( - "Malformed expression: Unknown symbol {}", - symbol - ))), - } - } else { - Err(anyhow!(format!( - "Malformed expression: Inner value '{:?}' is not a symbol.", - value - ))) - } - } - lexpr::Value::String(str) => match T::from_str(str.borrow()) { - Ok(value) => Ok(QueryComponent::Exact(value)), - Err(error) => Err(anyhow!(format!( - "Malformed expression: Conversion of inner value '{}' from string failed: {:#?}", - str, error - ))), - }, - lexpr::Value::Symbol(symbol) => match symbol.borrow() { - "?" => Ok(QueryComponent::Any), - _ => Err(anyhow!(format!( - "Malformed expression: Unknown symbol {}", - symbol - ))), - }, - _ => Err(anyhow!( - "Malformed expression: Inner value not a string, list or '?'." - )), - } - } - - if let Cons(value) = expression { - if let Symbol(symbol) = value.car() { - match symbol.borrow() { - "matches" => { - let (cons_vec, _) = value.clone().into_vec(); - if let [_, entity, attribute, value] = &cons_vec[..] { - let entity = parse_component::
(entity)?; - let attribute = parse_component::(attribute)?; - let value = parse_component::(value)?; - Ok(Query::SingleQuery(QueryPart::Matches(EntryQuery { - entity, - attribute, - value, - }))) - } else { - Err(anyhow!( - "Malformed expression: Wrong number of arguments to 'matches'." - )) - } - } - "type" => { - let (cons_vec, _) = value.clone().into_vec(); - if let [_, type_name] = &cons_vec[..] { - if let lexpr::Value::String(type_name_str) = type_name { - Ok(Query::SingleQuery(QueryPart::Type( - type_name_str.to_string(), - ))) - } else { - Err(anyhow!( - "Malformed expression: Type must be specified as a string." - )) - } - } else { - Err(anyhow!( - "Malformed expression: Wrong number of arguments to 'type'." - )) - } - } - "and" | "or" => { - let (cons_vec, _) = value.clone().into_vec(); - let sub_expressions = &cons_vec[1..]; - let values: Result>> = sub_expressions - .iter() - .map(|value| Ok(Box::new(Query::try_from(value)?))) - .collect(); - - if let Some(queries) = NonEmpty::from_vec(values?) { - Ok(Query::MultiQuery(MultiQuery { - qualifier: match symbol.borrow() { - "and" => QueryQualifier::And, - _ => QueryQualifier::Or, - }, - queries, - })) - } else { - Err(anyhow!( - "Malformed expression: sub-query list cannot be empty.", - )) - } - } - _ => Err(anyhow!(format!( - "Malformed expression: Unknown symbol '{}'.", - symbol - ))), - } - } else { - Err(anyhow!(format!( - "Malformed expression: Value '{:?}' is not a symbol.", - value - ))) - } - } else { - Err(anyhow!("Malformed expression: Not a list.")) - } - } -} - -impl Query { - fn to_sqlite_predicates(&self) -> Result> { - match self { - Query::SingleQuery(qp) => { - match qp { - QueryPart::Matches(eq) => { - let mut subqueries: Vec> = vec![]; - - match &eq.entity { - QueryComponent::Exact(q_entity) => { - subqueries.push(Box::new(data::entity.eq(q_entity.encode()?))) - } - QueryComponent::In(q_entities) => { - let entities: Result, _> = - q_entities.iter().map(|t| t.encode()).collect(); - subqueries.push(Box::new(data::entity.eq_any(entities?))) - } - QueryComponent::Contains(_) => { - return Err(anyhow!("Addresses cannot be queried alike.")) - } - QueryComponent::Any => {} - }; - - match &eq.attribute { - QueryComponent::Exact(q_attribute) => { - subqueries.push(Box::new(data::attribute.eq(q_attribute.clone()))) - } - QueryComponent::In(q_attributes) => subqueries - .push(Box::new(data::attribute.eq_any(q_attributes.clone()))), - QueryComponent::Contains(q_attribute) => subqueries - .push(Box::new(data::attribute.like(format!("%{}%", q_attribute)))), - QueryComponent::Any => {} - }; - - match &eq.value { - QueryComponent::Exact(q_value) => { - subqueries.push(Box::new(data::value.eq(q_value.to_string()?))) - } - QueryComponent::In(q_values) => { - let values: Result, _> = - q_values.iter().map(|v| v.to_string()).collect(); - subqueries.push(Box::new(data::value.eq_any(values?))) - } - QueryComponent::Contains(q_value) => subqueries - .push(Box::new(data::value.like(format!("%{}%", q_value)))), - QueryComponent::Any => {} - }; - - match subqueries.len() { - 0 => Ok(Box::new(true.into_sql::())), - 1 => Ok(subqueries.remove(0)), - _ => { - let mut result: Box, Box>> = - Box::new(And::new(subqueries.remove(0), subqueries.remove(0))); - while !subqueries.is_empty() { - result = Box::new(And::new(result, subqueries.remove(0))); - } - Ok(Box::new(result)) - } - } - } - QueryPart::Type(_) => unimplemented!("Type queries are not yet implemented."), - } - } - Query::MultiQuery(mq) => { - let subqueries: Result>> = mq - .queries - .iter() - .map(|sq| sq.to_sqlite_predicates()) - .collect(); - let mut subqueries: Vec> = subqueries?; - match subqueries.len() { - 0 => Ok(Box::new(true.into_sql::())), - 1 => Ok(subqueries.remove(0)), - _ => match mq.qualifier { - QueryQualifier::And => { - let mut result: Box, Box>> = - Box::new(And::new(subqueries.remove(0), subqueries.remove(0))); - while !subqueries.is_empty() { - result = Box::new(And::new(result, subqueries.remove(0))); - } - Ok(Box::new(result)) - } - QueryQualifier::Or => { - let mut result: Box, Box>> = - Box::new(Or::new(subqueries.remove(0), subqueries.remove(0))); - while !subqueries.is_empty() { - result = Box::new(Or::new(result, subqueries.remove(0))); - } - Ok(Box::new(result)) - } - }, - } - } - } - } -} - -pub fn query>(connection: &C, query: Query) -> Result> { - use crate::schema::data::dsl::*; - - trace!("Querying: {:?}", query); - - let db_query = data.filter(query.to_sqlite_predicates()?); - - trace!("DB query: {}", debug_query(&db_query)); - - let matches = db_query.load::(connection)?; - - let entries = matches - .iter() - .map(Entry::try_from) - .filter_map(Result::ok) - .collect(); - - Ok(entries) -} - -pub fn insert_entry>( - connection: &C, - entry: Entry, -) -> Result
{ - debug!("Inserting: {}", entry); - - let insert_entry = models::Entry::try_from(&entry)?; - - let entry = Entry::try_from(&insert_entry)?; - - let result = diesel::insert_into(data::table) - .values(insert_entry) - .execute(connection); - - if let Some(error) = result.err() { - match error { - Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {} - _ => return Err(anyhow!(error)), - } - } - - Ok(Address::Hash(entry.hash()?)) -} - -#[derive(Debug)] -pub struct ConnectionOptions { - pub enable_foreign_keys: bool, - pub busy_timeout: Option, -} - -impl ConnectionOptions { - pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { - if self.enable_foreign_keys { - conn.execute("PRAGMA foreign_keys = ON;")?; - } - if let Some(duration) = self.busy_timeout { - conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; - } - Ok(()) - } -} - -impl diesel::r2d2::CustomizeConnection - for ConnectionOptions -{ - fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { - self.apply(conn).map_err(diesel::r2d2::Error::QueryError) - } -} - -pub type DbPool = r2d2::Pool>; - -pub struct OpenResult { - pub pool: DbPool, - pub new: bool, -} - -pub const DATABASE_FILENAME: &str = "upend.sqlite3"; - -pub fn open_upend>( - dirpath: P, - db_path: Option, - reinitialize: bool, -) -> Result { - embed_migrations!("./migrations/upend/"); - - let database_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(DATABASE_FILENAME)); - - if reinitialize { - let _ = fs::remove_file(&database_path); - } - let new = !database_path.exists(); - - let manager = ConnectionManager::::new(database_path.to_str().unwrap()); - let pool = r2d2::Pool::builder() - .connection_customizer(Box::new(ConnectionOptions { - enable_foreign_keys: true, - busy_timeout: Some(Duration::from_secs(30)), - })) - .build(manager)?; - - trace!("Pool created, running migrations..."); - - embedded_migrations::run_with_output( - &pool.get()?, - &mut LoggerSink { - ..Default::default() - }, - )?; - - trace!("Initializing types..."); - - initialize_types(&pool)?; - - Ok(OpenResult { pool, new }) -} - -fn initialize_types(pool: &DbPool) -> Result<()> { - insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?; - upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR); - upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_ID_ATTR, TYPE_IS_ATTR); - Ok(()) -} diff --git a/src/database/constants.rs b/src/database/constants.rs new file mode 100644 index 0000000..1c0cbdc --- /dev/null +++ b/src/database/constants.rs @@ -0,0 +1,18 @@ +use crate::addressing::Address; +use crate::database::entry::{EntryValue, InvariantEntry}; + +pub const TYPE_TYPE: &str = "TYPE"; +pub const TYPE_IS_ATTR: &str = "TYPE"; +pub const TYPE_REQUIRES_ATTR: &str = "TYPE_REQUIRES"; +pub const TYPE_HAS_ATTR: &str = "TYPE_HAS"; +pub const TYPE_ID_ATTR: &str = "TYPE_ID"; +pub const TYPE_INSTANCES_ATTR: &str = "TYPE_INSTANCES"; +pub const IS_OF_TYPE_ATTR: &str = "IS"; + +lazy_static! { + pub static ref TYPE_INVARIANT: InvariantEntry = InvariantEntry { + attribute: String::from(TYPE_IS_ATTR), + value: EntryValue::Value(serde_json::Value::from(TYPE_TYPE)), + }; + pub static ref TYPE_ADDR: Address = TYPE_INVARIANT.entity().unwrap(); +} diff --git a/src/database/entry.rs b/src/database/entry.rs new file mode 100644 index 0000000..7005e15 --- /dev/null +++ b/src/database/entry.rs @@ -0,0 +1,179 @@ +use crate::addressing::{Address, Addressable}; +use crate::hash::{decode, hash, Hash, Hashable}; +use crate::models; +use anyhow::{anyhow, Result}; +use serde::{Deserialize, Serialize}; +use std::convert::TryFrom; +use std::io::{Cursor, Write}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InEntry { + pub entity: Option
, + pub attribute: String, + pub value: EntryValue, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Entry { + pub entity: Address, + pub attribute: String, + pub value: EntryValue, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct InvariantEntry { + pub attribute: String, + pub value: EntryValue, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "t", content = "c")] +pub enum EntryValue { + Value(serde_json::Value), + Address(Address), + Invalid, +} + +impl TryFrom<&models::Entry> for Entry { + type Error = anyhow::Error; + + fn try_from(e: &models::Entry) -> Result { + Ok(Entry { + entity: Address::decode(&e.entity)?, + attribute: e.attribute.clone(), + value: e.value.parse().unwrap(), + }) + } +} + +impl TryFrom<&Entry> for models::Entry { + type Error = anyhow::Error; + + fn try_from(e: &Entry) -> Result { + Ok(models::Entry { + identity: e.address()?.encode()?, + entity: e.entity.encode()?, + attribute: e.attribute.clone(), + value: e.value.to_string()?, + }) + } +} + +impl TryFrom<&InvariantEntry> for Entry { + type Error = anyhow::Error; + + fn try_from(invariant: &InvariantEntry) -> Result { + Ok(Entry { + entity: invariant.entity()?, + attribute: invariant.attribute.clone(), + value: invariant.value.clone(), + }) + } +} + +impl TryFrom for Entry { + type Error = anyhow::Error; + + fn try_from(in_entry: InEntry) -> Result { + match in_entry.entity { + Some(entity) => Ok(Entry { + entity, + attribute: in_entry.attribute, + value: in_entry.value, + }), + None => Ok(Entry::try_from(&InvariantEntry { + attribute: in_entry.attribute, + value: in_entry.value, + })?), + } + } +} + +impl InvariantEntry { + pub fn entity(&self) -> Result
{ + let mut entity = Cursor::new(vec![0u8; 0]); + entity.write_all(self.attribute.as_bytes())?; + entity.write_all(self.value.to_string()?.as_bytes())?; + Ok(Address::Hash(hash(entity.into_inner()))) + } +} + +impl std::fmt::Display for Entry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} | {} | {}", self.entity, self.attribute, self.value) + } +} + +impl Hashable for Entry { + fn hash(self: &Entry) -> Result { + let mut result = Cursor::new(vec![0u8; 0]); + result.write_all(self.entity.encode()?.as_slice())?; + result.write_all(self.attribute.as_bytes())?; + result.write_all(self.value.to_string()?.as_bytes())?; + Ok(hash(result.get_ref())) + } +} + +impl Hashable for InvariantEntry { + fn hash(&self) -> Result { + Entry::try_from(self)?.hash() + } +} + +impl Addressable for Entry {} +impl Addressable for InvariantEntry {} + +impl EntryValue { + pub fn to_string(&self) -> Result { + let (type_char, content) = match self { + EntryValue::Value(value) => ('J', serde_json::to_string(value)?), + EntryValue::Address(address) => ('O', address.to_string()), + EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")), + }; + + Ok(format!("{}{}", type_char, content)) + } +} + +impl std::fmt::Display for EntryValue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let (entry_type, entry_value) = match self { + EntryValue::Address(address) => ("ADDRESS", address.to_string()), + EntryValue::Value(value) => ( + "VALUE", + serde_json::to_string(value).unwrap_or_else(|_| String::from("?!?!?!")), + ), + EntryValue::Invalid => ("INVALID", "INVALID".to_string()), + }; + write!(f, "{}: {}", entry_type, entry_value) + } +} + +impl std::str::FromStr for EntryValue { + type Err = std::convert::Infallible; + + fn from_str(s: &str) -> Result { + if s.len() < 2 { + Ok(EntryValue::Invalid) + } else { + let (type_char, content) = s.split_at(1); + match (type_char, content) { + ("J", content) => { + if let Ok(value) = serde_json::from_str(content) { + Ok(EntryValue::Value(value)) + } else { + Ok(EntryValue::Invalid) + } + } + ("O", content) => { + if let Ok(addr) = decode(content).and_then(|v| Address::decode(&v)) { + Ok(EntryValue::Address(addr)) + } else { + Ok(EntryValue::Invalid) + } + } + _ => Ok(EntryValue::Invalid), + } + } + } +} diff --git a/src/database/lang.rs b/src/database/lang.rs new file mode 100644 index 0000000..b75e681 --- /dev/null +++ b/src/database/lang.rs @@ -0,0 +1,313 @@ +use crate::addressing::Address; +use crate::database::entry::EntryValue; +use crate::schema::data; +use anyhow::{anyhow, Result}; +use diesel::expression::operators::{And, Or}; +use diesel::sql_types::Bool; +use diesel::sqlite::Sqlite; +use diesel::{BoxableExpression, ExpressionMethods, IntoSql, TextExpressionMethods}; +use nonempty::NonEmpty; +use std::borrow::Borrow; +use std::convert::TryFrom; +use std::str::FromStr; + +#[derive(Debug)] +pub enum QueryComponent +where + T: FromStr, +{ + Exact(T), + In(Vec), + Contains(String), + Any, +} + +#[derive(Debug)] +pub struct EntryQuery { + pub entity: QueryComponent
, + pub attribute: QueryComponent, + pub value: QueryComponent, +} + +#[derive(Debug)] +pub enum QueryPart { + Matches(EntryQuery), + Type(String), +} + +#[derive(Debug)] +pub enum QueryQualifier { + And, + Or, +} + +#[derive(Debug)] +pub struct MultiQuery { + pub qualifier: QueryQualifier, + pub queries: NonEmpty>, +} + +#[derive(Debug)] +pub enum Query { + SingleQuery(QueryPart), + MultiQuery(MultiQuery), +} + +type Predicate = dyn BoxableExpression; + +impl TryFrom<&lexpr::Value> for Query { + type Error = anyhow::Error; + + fn try_from(expression: &lexpr::Value) -> Result { + fn parse_component(value: &lexpr::Value) -> Result> + where + ::Err: std::fmt::Debug, + { + match value { + lexpr::Value::Cons(cons) => { + if let lexpr::Value::Symbol(symbol) = cons.car() { + match symbol.borrow() { + "in" => { + let (cons_vec, _) = cons.clone().into_vec(); + if let Some(split) = cons_vec.split_first() { + let args = split.1; + let values: Result, _> = args.iter().map(|value| { + if let lexpr::Value::String(str) = value { + match T::from_str(str.borrow()) { + Ok(value) => Ok(value), + Err(error) => Err(anyhow!(format!("Malformed expression: Conversion of inner value '{}' from string failed: {:#?}",str, error))), + } + } else { + Err(anyhow!("Malformed expression: Inner value list must be comprised of strings.")) + } + }).collect(); + + Ok(QueryComponent::In(values?)) + } else { + Err(anyhow!( + "Malformed expression: Inner value cannot be empty." + )) + } + } + "contains" => { + let (mut cons_vec, _) = cons.clone().into_vec(); + match cons_vec.len() { + 2 => { + let value = cons_vec.remove(1); + if let lexpr::Value::String(str) = value { + Ok(QueryComponent::Contains(str.into_string())) + } else { + Err(anyhow!("Malformed expression: 'Contains' argument must be a string.")) + } + } + _ => Err(anyhow!( + "Malformed expression: 'Contains' requires a single argument." + )), + } + } + _ => Err(anyhow!(format!( + "Malformed expression: Unknown symbol {}", + symbol + ))), + } + } else { + Err(anyhow!(format!( + "Malformed expression: Inner value '{:?}' is not a symbol.", + value + ))) + } + } + lexpr::Value::String(str) => match T::from_str(str.borrow()) { + Ok(value) => Ok(QueryComponent::Exact(value)), + Err(error) => Err(anyhow!(format!( + "Malformed expression: Conversion of inner value '{}' from string failed: {:#?}", + str, error + ))), + }, + lexpr::Value::Symbol(symbol) => match symbol.borrow() { + "?" => Ok(QueryComponent::Any), + _ => Err(anyhow!(format!( + "Malformed expression: Unknown symbol {}", + symbol + ))), + }, + _ => Err(anyhow!( + "Malformed expression: Inner value not a string, list or '?'." + )), + } + } + + if let lexpr::Value::Cons(value) = expression { + if let lexpr::Value::Symbol(symbol) = value.car() { + match symbol.borrow() { + "matches" => { + let (cons_vec, _) = value.clone().into_vec(); + if let [_, entity, attribute, value] = &cons_vec[..] { + let entity = parse_component::
(entity)?; + let attribute = parse_component::(attribute)?; + let value = parse_component::(value)?; + Ok(Query::SingleQuery(QueryPart::Matches(EntryQuery { + entity, + attribute, + value, + }))) + } else { + Err(anyhow!( + "Malformed expression: Wrong number of arguments to 'matches'." + )) + } + } + "type" => { + let (cons_vec, _) = value.clone().into_vec(); + if let [_, type_name] = &cons_vec[..] { + if let lexpr::Value::String(type_name_str) = type_name { + Ok(Query::SingleQuery(QueryPart::Type( + type_name_str.to_string(), + ))) + } else { + Err(anyhow!( + "Malformed expression: Type must be specified as a string." + )) + } + } else { + Err(anyhow!( + "Malformed expression: Wrong number of arguments to 'type'." + )) + } + } + "and" | "or" => { + let (cons_vec, _) = value.clone().into_vec(); + let sub_expressions = &cons_vec[1..]; + let values: Result>> = sub_expressions + .iter() + .map(|value| Ok(Box::new(Query::try_from(value)?))) + .collect(); + + if let Some(queries) = NonEmpty::from_vec(values?) { + Ok(Query::MultiQuery(MultiQuery { + qualifier: match symbol.borrow() { + "and" => QueryQualifier::And, + _ => QueryQualifier::Or, + }, + queries, + })) + } else { + Err(anyhow!( + "Malformed expression: sub-query list cannot be empty.", + )) + } + } + _ => Err(anyhow!(format!( + "Malformed expression: Unknown symbol '{}'.", + symbol + ))), + } + } else { + Err(anyhow!(format!( + "Malformed expression: Value '{:?}' is not a symbol.", + value + ))) + } + } else { + Err(anyhow!("Malformed expression: Not a list.")) + } + } +} + +impl Query { + pub(crate) fn to_sqlite_predicates(&self) -> Result> { + match self { + Query::SingleQuery(qp) => { + match qp { + QueryPart::Matches(eq) => { + let mut subqueries: Vec> = vec![]; + + match &eq.entity { + QueryComponent::Exact(q_entity) => { + subqueries.push(Box::new(data::entity.eq(q_entity.encode()?))) + } + QueryComponent::In(q_entities) => { + let entities: Result, _> = + q_entities.iter().map(|t| t.encode()).collect(); + subqueries.push(Box::new(data::entity.eq_any(entities?))) + } + QueryComponent::Contains(_) => { + return Err(anyhow!("Addresses cannot be queried alike.")) + } + QueryComponent::Any => {} + }; + + match &eq.attribute { + QueryComponent::Exact(q_attribute) => { + subqueries.push(Box::new(data::attribute.eq(q_attribute.clone()))) + } + QueryComponent::In(q_attributes) => subqueries + .push(Box::new(data::attribute.eq_any(q_attributes.clone()))), + QueryComponent::Contains(q_attribute) => subqueries + .push(Box::new(data::attribute.like(format!("%{}%", q_attribute)))), + QueryComponent::Any => {} + }; + + match &eq.value { + QueryComponent::Exact(q_value) => { + subqueries.push(Box::new(data::value.eq(q_value.to_string()?))) + } + QueryComponent::In(q_values) => { + let values: Result, _> = + q_values.iter().map(|v| v.to_string()).collect(); + subqueries.push(Box::new(data::value.eq_any(values?))) + } + QueryComponent::Contains(q_value) => subqueries + .push(Box::new(data::value.like(format!("%{}%", q_value)))), + QueryComponent::Any => {} + }; + + match subqueries.len() { + 0 => Ok(Box::new(true.into_sql::())), + 1 => Ok(subqueries.remove(0)), + _ => { + let mut result: Box, Box>> = + Box::new(And::new(subqueries.remove(0), subqueries.remove(0))); + while !subqueries.is_empty() { + result = Box::new(And::new(result, subqueries.remove(0))); + } + Ok(Box::new(result)) + } + } + } + QueryPart::Type(_) => unimplemented!("Type queries are not yet implemented."), + } + } + Query::MultiQuery(mq) => { + let subqueries: Result>> = mq + .queries + .iter() + .map(|sq| sq.to_sqlite_predicates()) + .collect(); + let mut subqueries: Vec> = subqueries?; + match subqueries.len() { + 0 => Ok(Box::new(true.into_sql::())), + 1 => Ok(subqueries.remove(0)), + _ => match mq.qualifier { + QueryQualifier::And => { + let mut result: Box, Box>> = + Box::new(And::new(subqueries.remove(0), subqueries.remove(0))); + while !subqueries.is_empty() { + result = Box::new(And::new(result, subqueries.remove(0))); + } + Ok(Box::new(result)) + } + QueryQualifier::Or => { + let mut result: Box, Box>> = + Box::new(Or::new(subqueries.remove(0), subqueries.remove(0))); + while !subqueries.is_empty() { + result = Box::new(Or::new(result, subqueries.remove(0))); + } + Ok(Box::new(result)) + } + }, + } + } + } + } +} diff --git a/src/database/mod.rs b/src/database/mod.rs new file mode 100644 index 0000000..90dffb5 --- /dev/null +++ b/src/database/mod.rs @@ -0,0 +1,307 @@ +#![macro_use] + +pub mod constants; +pub mod entry; +pub mod lang; + +use crate::addressing::Address; +use crate::database::constants::{ + IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_ID_ATTR, TYPE_INVARIANT, TYPE_IS_ATTR, +}; +use crate::database::entry::{Entry, EntryValue}; +use crate::database::lang::Query; +use crate::hash::{Hash, Hashable}; +use crate::models; +use crate::schema::data; +use crate::util::LoggerSink; +use anyhow::{anyhow, Result}; +use diesel::debug_query; +use diesel::prelude::*; +use diesel::r2d2::{self, ConnectionManager}; +use diesel::result::{DatabaseErrorKind, Error}; +use diesel::sqlite::{Sqlite, SqliteConnection}; +use log::{debug, trace}; +use std::convert::TryFrom; +use std::fs; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +#[macro_use] +mod macros { + macro_rules! upend_insert_val { + ($db_connection:expr, $entity:expr, $attribute:expr, $value:expr) => {{ + insert_entry( + $db_connection, + Entry { + entity: $entity.clone(), + attribute: String::from($attribute), + value: EntryValue::Value(serde_json::Value::from($value)), + }, + )?; + }}; + } + + macro_rules! upend_insert_addr { + ($db_connection:expr, $entity:expr, $attribute:expr, $addr:expr) => {{ + insert_entry( + $db_connection, + Entry { + entity: $entity.clone(), + attribute: String::from($attribute), + value: EntryValue::Address($addr.clone()), + }, + )?; + }}; + } +} + +pub fn insert_file>( + connection: &C, + file: models::NewFile, +) -> Result { + use crate::schema::files; + + debug!( + "Inserting {} ({})...", + &file.path, + Address::Hash(Hash((&file.hash).clone())) + ); + + Ok(diesel::insert_into(files::table) + .values(file) + .execute(connection)?) +} + +pub fn retrieve_file>( + connection: &C, + obj_hash: Hash, +) -> Result> { + use crate::schema::files::dsl::*; + + let matches = files + .filter(valid.eq(true)) + .filter(hash.eq(obj_hash.0)) + .load::(connection)?; + + Ok(matches) +} + +pub fn retrieve_all_files>( + connection: &C, +) -> Result> { + use crate::schema::files::dsl::*; + let matches = files.load::(connection)?; + Ok(matches) +} + +pub fn get_latest_files>( + connection: &C, + count: i64, +) -> Result> { + use crate::schema::files::dsl::*; + + let matches = files + .order_by(added.desc()) + .limit(count) + .load::(connection)?; + + Ok(matches) +} + +pub fn file_set_valid>( + connection: &C, + file_id: i32, + is_valid: bool, +) -> Result { + use crate::schema::files::dsl::*; + + debug!("Setting file ID {} to valid = {}", file_id, is_valid); + + Ok(diesel::update(files.filter(id.eq(file_id))) + .set(valid.eq(is_valid)) + .execute(connection)?) +} + +pub fn retrieve_object>( + connection: &C, + object_address: Address, +) -> Result> { + use crate::schema::data::dsl::*; + + let matches = data + .filter(entity.eq(object_address.encode()?)) + .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)) + .load::(connection)?; + let entries = matches + .iter() + .map(Entry::try_from) + .filter_map(Result::ok) + .collect(); + + Ok(entries) +} + +pub fn bulk_retrieve_objects>( + connection: &C, + object_addresses: Vec
, +) -> Result> { + use crate::schema::data::dsl::*; + + let matches = data + .filter( + entity.eq_any( + object_addresses + .iter() + .filter_map(|addr| addr.encode().ok()), + ), + ) + // .or_filter(value.eq(EntryValue::Address(object_address).to_str()?)) + .load::(connection)?; + let entries = matches + .iter() + .map(Entry::try_from) + .filter_map(Result::ok) + .collect(); + + Ok(entries) +} + +pub fn remove_object>( + connection: &C, + object_address: Address, +) -> Result { + use crate::schema::data::dsl::*; + + debug!("Deleting {}!", object_address); + + let matches = data + .filter(identity.eq(object_address.encode()?)) + .or_filter(entity.eq(object_address.encode()?)) + .or_filter(value.eq(EntryValue::Address(object_address).to_string()?)); + + Ok(diesel::delete(matches).execute(connection)?) +} + +pub fn query>(connection: &C, query: Query) -> Result> { + use crate::schema::data::dsl::*; + + trace!("Querying: {:?}", query); + + let db_query = data.filter(query.to_sqlite_predicates()?); + + trace!("DB query: {}", debug_query(&db_query)); + + let matches = db_query.load::(connection)?; + + let entries = matches + .iter() + .map(Entry::try_from) + .filter_map(Result::ok) + .collect(); + + Ok(entries) +} + +pub fn insert_entry>( + connection: &C, + entry: Entry, +) -> Result
{ + debug!("Inserting: {}", entry); + + let insert_entry = models::Entry::try_from(&entry)?; + + let entry = Entry::try_from(&insert_entry)?; + + let result = diesel::insert_into(data::table) + .values(insert_entry) + .execute(connection); + + if let Some(error) = result.err() { + match error { + Error::DatabaseError(DatabaseErrorKind::UniqueViolation, _) => {} + _ => return Err(anyhow!(error)), + } + } + + Ok(Address::Hash(entry.hash()?)) +} + +#[derive(Debug)] +pub struct ConnectionOptions { + pub enable_foreign_keys: bool, + pub busy_timeout: Option, +} + +impl ConnectionOptions { + pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { + if self.enable_foreign_keys { + conn.execute("PRAGMA foreign_keys = ON;")?; + } + if let Some(duration) = self.busy_timeout { + conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; + } + Ok(()) + } +} + +impl diesel::r2d2::CustomizeConnection + for ConnectionOptions +{ + fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { + self.apply(conn).map_err(diesel::r2d2::Error::QueryError) + } +} + +pub type DbPool = r2d2::Pool>; + +pub struct OpenResult { + pub pool: DbPool, + pub new: bool, +} + +pub const DATABASE_FILENAME: &str = "upend.sqlite3"; + +pub fn open_upend>( + dirpath: P, + db_path: Option, + reinitialize: bool, +) -> Result { + embed_migrations!("./migrations/upend/"); + + let database_path = db_path.unwrap_or_else(|| dirpath.as_ref().join(DATABASE_FILENAME)); + + if reinitialize { + let _ = fs::remove_file(&database_path); + } + let new = !database_path.exists(); + + let manager = ConnectionManager::::new(database_path.to_str().unwrap()); + let pool = r2d2::Pool::builder() + .connection_customizer(Box::new(ConnectionOptions { + enable_foreign_keys: true, + busy_timeout: Some(Duration::from_secs(30)), + })) + .build(manager)?; + + trace!("Pool created, running migrations..."); + + embedded_migrations::run_with_output( + &pool.get()?, + &mut LoggerSink { + ..Default::default() + }, + )?; + + trace!("Initializing types..."); + + initialize_types(&pool)?; + + Ok(OpenResult { pool, new }) +} + +fn initialize_types(pool: &DbPool) -> Result<()> { + insert_entry(&pool.get()?, Entry::try_from(&*TYPE_INVARIANT)?)?; + upend_insert_addr!(&pool.get()?, TYPE_ADDR, IS_OF_TYPE_ATTR, TYPE_ADDR); + upend_insert_val!(&pool.get()?, TYPE_ADDR, TYPE_ID_ATTR, TYPE_IS_ATTR); + Ok(()) +} diff --git a/src/filesystem.rs b/src/filesystem.rs index a87928f..bc29af5 100644 --- a/src/filesystem.rs +++ b/src/filesystem.rs @@ -1,9 +1,13 @@ use crate::addressing::Address; +use crate::database::constants::{ + IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_ID_ATTR, TYPE_INSTANCES_ATTR, TYPE_IS_ATTR, + TYPE_REQUIRES_ATTR, +}; +use crate::database::entry::{Entry, EntryValue, InvariantEntry}; +use crate::database::lang::{EntryQuery, Query, QueryComponent, QueryPart}; use crate::database::{ bulk_retrieve_objects, file_set_valid, insert_entry, insert_file, query, retrieve_all_files, - DbPool, Entry, EntryQuery, EntryValue, InvariantEntry, Query, QueryComponent, QueryPart, - DATABASE_FILENAME, IS_OF_TYPE_ATTR, TYPE_ADDR, TYPE_HAS_ATTR, TYPE_ID_ATTR, - TYPE_INSTANCES_ATTR, TYPE_IS_ATTR, TYPE_REQUIRES_ATTR, + DbPool, DATABASE_FILENAME, }; use crate::hash::Hashable; use crate::jobs::{Job, JobContainer, JobId, State}; diff --git a/src/routes.rs b/src/routes.rs index 49a2245..ffed78f 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1,7 +1,8 @@ -use crate::addressing::Address; +use crate::addressing::{Address, Addressable}; +use crate::database::entry::{Entry, InEntry}; +use crate::database::lang::Query; use crate::database::{ - get_latest_files, insert_entry, query, remove_object, retrieve_file, retrieve_object, - Addressable, DbPool, Entry, InEntry, Query, + get_latest_files, insert_entry, query, remove_object, retrieve_file, retrieve_object, DbPool, }; use crate::filesystem::{list_directory, UPath}; use crate::hash::{decode, encode};