use std::io::{Cursor, Write}; use std::path::{Path, PathBuf}; use std::time::Duration; use actix::prelude::*; use actix_derive::Message; use anyhow::{anyhow, Result}; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::sqlite::SqliteConnection; use log::debug; use crate::addressing::Address; use crate::hash::{decode, encode, hash, Hash, Hashable}; use crate::models; #[derive(Debug)] pub struct Entry { identity: Hash, target: Address, key: String, value: EntryValue, } #[derive(Debug)] pub struct InnerEntry { target: Address, key: String, value: EntryValue, } #[derive(Debug)] pub enum EntryValue { Value(serde_json::Value), Address(Address), Invalid, } impl EntryValue { fn to_str(&self) -> Result { let (type_char, content) = match self { EntryValue::Value(value) => ('J', serde_json::to_string(value)?), EntryValue::Address(address) => ('O', encode(address.encode()?)), EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")), }; Ok(format!("{}{}", type_char, content)) } } impl std::str::FromStr for EntryValue { type Err = std::convert::Infallible; fn from_str(s: &str) -> Result { if s.len() < 2 { Ok(EntryValue::Invalid) } else { let (type_char, content) = s.split_at(1); match (type_char, content) { ("J", content) => { let value = serde_json::from_str(content); if value.is_ok() { Ok(EntryValue::Value(value.unwrap())) } else { Ok(EntryValue::Invalid) } } ("O", content) => { let addr = decode(content).and_then(|v| Address::decode(&v)); if addr.is_ok() { Ok(EntryValue::Address(addr.unwrap())) } else { Ok(EntryValue::Invalid) } } _ => Ok(EntryValue::Invalid), } } } } pub type DbPool = r2d2::Pool>; pub struct DbExecutor(pub DbPool); impl Actor for DbExecutor { type Context = SyncContext; } #[derive(Message)] #[rtype(result = "Result")] pub struct InsertFile { pub file: models::NewFile, } #[derive(Message)] #[rtype(result = "Result>")] pub struct RetrieveByHash { pub hash: String, } #[derive(Message)] #[rtype(result = "Result>")] pub struct LookupByFilename { pub query: String, } #[derive(Message)] #[rtype(result = "Result>")] pub struct RetrieveEntries { pub hash: Vec, } #[derive(Message)] #[rtype(result = "Result")] pub struct InsertEntry { pub entry: InnerEntry, } impl Handler for DbExecutor { type Result = Result; fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result { use crate::schema::files; let connection = &self.0.get()?; debug!("Inserting {:?}...", msg.file); Ok(diesel::insert_into(files::table) .values(msg.file) .execute(connection)?) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result { use crate::schema::files::dsl::*; let connection = &self.0.get()?; let matches = files .filter(hash.eq(msg.hash)) .filter(valid.eq(true)) .load::(connection)?; Ok(matches.get(0).map(|f| f.path.clone())) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: LookupByFilename, _: &mut Self::Context) -> Self::Result { use crate::schema::files::dsl::*; let connection = &self.0.get()?; let matches = files .filter(path.like(format!("%{}%", msg.query))) .filter(valid.eq(true)) .load::(connection)?; Ok(matches) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: RetrieveEntries, _: &mut Self::Context) -> Self::Result { use crate::schema::data::dsl::*; let connection = &self.0.get()?; let matches = data .filter(target.eq(msg.hash)) .load::(connection)?; let entries = matches .into_iter() .map(|e| -> Result { Ok(Entry { identity: Hash(e.identity), target: Address::decode(&e.target)?, key: e.key, value: e.value.parse().unwrap(), }) }) .filter_map(Result::ok) .collect(); Ok(entries) } } impl Handler for DbExecutor { type Result = Result; fn handle(&mut self, msg: InsertEntry, _: &mut Self::Context) -> Self::Result { use crate::schema::data; let connection = &self.0.get()?; debug!("Inserting {:?}...", msg.entry); let insert_entry = models::Entry { identity: msg.entry.hash()?.0, target: msg.entry.target.encode()?, key: msg.entry.key, value: msg.entry.value.to_str()?, }; Ok(diesel::insert_into(data::table) .values(insert_entry) .execute(connection)?) } } impl Hashable for InnerEntry { fn hash(self: &InnerEntry) -> Result { let mut result = Cursor::new(vec![0u8; 0]); result.write(self.target.encode()?.as_slice())?; result.write(self.key.as_bytes())?; result.write(self.value.to_str()?.as_bytes())?; Ok(hash(result.get_ref())) } } #[derive(Debug)] pub struct ConnectionOptions { pub enable_foreign_keys: bool, pub busy_timeout: Option, } impl ConnectionOptions { pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { if self.enable_foreign_keys { conn.execute("PRAGMA foreign_keys = ON;")?; } if let Some(duration) = self.busy_timeout { conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } #[derive(Default)] struct LoggerSink { buffer: Vec, } impl std::io::Write for LoggerSink { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.buffer.extend(buf.iter()); if self.buffer.ends_with(b"\n") { self.flush()?; } Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { use std::str; debug!( "{}", str::from_utf8(self.buffer.as_mut()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? .trim() ); Ok(()) } } pub struct OpenResult { pub pool: DbPool, pub new: bool, } pub fn open_upend>(dirpath: P) -> Result { embed_migrations!("./migrations/upend/"); let database_path: PathBuf = dirpath.as_ref().join("upend.sqlite3"); let new = !database_path.exists(); let manager = ConnectionManager::::new(database_path.to_str().unwrap()); let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { enable_foreign_keys: true, busy_timeout: Some(Duration::from_secs(3)), })) .build(manager) .expect("Failed to create pool."); embedded_migrations::run_with_output( &pool.get().unwrap(), &mut LoggerSink { ..Default::default() }, )?; Ok(OpenResult { pool, new }) }