use crate::addressing::Address; use crate::hash::{decode, hash, Hash, Hashable}; use crate::models; use actix::prelude::*; use actix_derive::Message; use anyhow::{anyhow, Result}; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::sqlite::SqliteConnection; use log::debug; use serde::export::Formatter; use serde_json::json; use std::convert::TryFrom; use std::fs; use std::io::{Cursor, Write}; use std::path::{Path, PathBuf}; use std::time::Duration; #[derive(Debug, Clone)] pub struct Entry { pub identity: Hash, pub target: Address, pub key: String, pub value: EntryValue, } #[derive(Debug, Clone)] pub struct InnerEntry { pub target: Address, pub key: String, pub value: EntryValue, } #[derive(Debug, Clone)] pub enum EntryValue { Value(serde_json::Value), Address(Address), Invalid, } impl TryFrom for Entry { type Error = anyhow::Error; fn try_from(e: models::Entry) -> Result { Ok(Entry { identity: Hash(e.identity), target: Address::decode(&e.target)?, key: e.key, value: e.value.parse().unwrap(), }) } } impl Entry { pub fn as_json(&self) -> serde_json::Value { json!({ "target": self.target.to_string(), "key": self.key, "value": match &self.value { EntryValue::Value(value) => ("VALUE", value.clone()), EntryValue::Address(address) => ("ADDR", json!(address.to_string())), EntryValue::Invalid => ("INVALID", json!("INVALID")), } }) } } impl std::fmt::Display for InnerEntry { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{} | {} | {}", self.target, self.key, self.value) } } impl EntryValue { pub fn to_str(&self) -> Result { let (type_char, content) = match self { EntryValue::Value(value) => ('J', serde_json::to_string(value)?), EntryValue::Address(address) => ('O', address.to_string()), EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")), }; Ok(format!("{}{}", type_char, content)) } } // unsafe unwraps! impl std::fmt::Display for EntryValue { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let (entry_type, entry_value) = match self { EntryValue::Address(address) => ("ADDRESS", address.to_string()), EntryValue::Value(value) => ("VALUE", serde_json::to_string(value).unwrap()), EntryValue::Invalid => ("INVALID", "INVALID".to_string()), }; write!(f, "{}: {}", entry_type, entry_value) } } impl std::str::FromStr for EntryValue { type Err = std::convert::Infallible; fn from_str(s: &str) -> Result { if s.len() < 2 { Ok(EntryValue::Invalid) } else { let (type_char, content) = s.split_at(1); match (type_char, content) { ("J", content) => { let value = serde_json::from_str(content); if value.is_ok() { Ok(EntryValue::Value(value.unwrap())) } else { Ok(EntryValue::Invalid) } } ("O", content) => { let addr = decode(content).and_then(|v| Address::decode(&v)); if addr.is_ok() { Ok(EntryValue::Address(addr.unwrap())) } else { Ok(EntryValue::Invalid) } } _ => Ok(EntryValue::Invalid), } } } } pub type DbPool = r2d2::Pool>; pub struct DbExecutor(pub DbPool); impl Actor for DbExecutor { type Context = SyncContext; } #[derive(Message)] #[rtype(result = "Result")] pub struct InsertFile { pub file: models::NewFile, } #[derive(Message)] #[rtype(result = "Result>")] pub struct RetrieveByHash { pub hash: Hash, } #[derive(Message)] #[rtype(result = "Result>")] pub struct LookupByFilename { pub query: String, } #[derive(Message)] #[rtype(result = "Result>")] pub struct RetrieveObject { pub target: Address, } #[derive(Message)] #[rtype(result = "Result>")] pub struct QueryEntries { pub target: Option
, pub key: Option, pub value: Option, } impl Default for QueryEntries { fn default() -> Self { QueryEntries { target: None, key: None, value: None, } } } #[derive(Message)] #[rtype(result = "Result")] pub struct InsertEntry { pub entry: InnerEntry, } impl Handler for DbExecutor { type Result = Result; fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result { use crate::schema::files; let connection = &self.0.get()?; debug!( "Inserting {} ({})...", &msg.file.path, Address::Hash(Hash((&msg.file.hash).clone())) ); Ok(diesel::insert_into(files::table) .values(msg.file) .execute(connection)?) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result { use crate::schema::files::dsl::*; let connection = &self.0.get()?; let matches = files .filter(valid.eq(true)) .filter(hash.eq(msg.hash.0)) .load::(connection)?; Ok(matches.get(0).map(|f| f.path.clone())) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: LookupByFilename, _: &mut Self::Context) -> Self::Result { use crate::schema::files::dsl::*; let connection = &self.0.get()?; let matches = files .filter(path.like(format!("%{}%", msg.query))) .filter(valid.eq(true)) .load::(connection)?; Ok(matches) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: RetrieveObject, _: &mut Self::Context) -> Self::Result { use crate::schema::data::dsl::*; let connection = &self.0.get()?; let matches = data .filter(target.eq(msg.target.encode()?)) .or_filter(value.eq(EntryValue::Address(msg.target.clone()).to_str()?)) .load::(connection)?; let entries = matches .into_iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: QueryEntries, _: &mut Self::Context) -> Self::Result { use crate::schema::data::dsl::*; let connection = &self.0.get()?; let mut query = data.into_boxed(); if let Some(q_target) = msg.target { query = query.filter(target.eq(q_target.encode()?)); } if let Some(q_key) = msg.key { query = query.filter(key.eq(q_key)); } if let Some(q_value) = msg.value { query = query.filter(value.eq(q_value.to_str()?)); } let matches = query.load::(connection)?; let entries = matches .into_iter() .map(Entry::try_from) .filter_map(Result::ok) .collect(); Ok(entries) } } impl Handler for DbExecutor { type Result = Result; fn handle(&mut self, msg: InsertEntry, _: &mut Self::Context) -> Self::Result { use crate::schema::data; let connection = &self.0.get()?; debug!("Inserting: {}", msg.entry); let insert_entry = models::Entry { identity: msg.entry.hash()?.0, target: msg.entry.target.encode()?, key: msg.entry.key, value: msg.entry.value.to_str()?, }; Ok(diesel::insert_into(data::table) .values(insert_entry) .execute(connection)?) } } impl Hashable for InnerEntry { fn hash(self: &InnerEntry) -> Result { let mut result = Cursor::new(vec![0u8; 0]); result.write(self.target.encode()?.as_slice())?; result.write(self.key.as_bytes())?; result.write(self.value.to_str()?.as_bytes())?; Ok(hash(result.get_ref())) } } #[derive(Debug)] pub struct ConnectionOptions { pub enable_foreign_keys: bool, pub busy_timeout: Option, } impl ConnectionOptions { pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { if self.enable_foreign_keys { conn.execute("PRAGMA foreign_keys = ON;")?; } if let Some(duration) = self.busy_timeout { conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } #[derive(Default)] struct LoggerSink { buffer: Vec, } impl std::io::Write for LoggerSink { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.buffer.extend(buf.iter()); if self.buffer.ends_with(b"\n") { self.flush()?; } Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { use std::str; debug!( "{}", str::from_utf8(self.buffer.as_mut()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? .trim() ); Ok(()) } } pub struct OpenResult { pub pool: DbPool, pub new: bool, } const DATABASE_FILENAME: &str = "upend.sqlite3"; pub fn open_upend>(dirpath: P, reinitialize: bool) -> Result { embed_migrations!("./migrations/upend/"); let database_path: PathBuf = dirpath.as_ref().join(DATABASE_FILENAME); if reinitialize { let _ = fs::remove_file(&database_path); } let new = !database_path.exists(); let manager = ConnectionManager::::new(database_path.to_str().unwrap()); let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { enable_foreign_keys: true, busy_timeout: Some(Duration::from_secs(3)), })) .build(manager) .expect("Failed to create pool."); embedded_migrations::run_with_output( &pool.get().unwrap(), &mut LoggerSink { ..Default::default() }, )?; Ok(OpenResult { pool, new }) }