2020-09-12 14:27:45 +02:00
|
|
|
use std::convert::TryFrom;
|
2020-09-07 21:21:54 +02:00
|
|
|
use std::io::{Cursor, Write};
|
|
|
|
use std::path::{Path, PathBuf};
|
|
|
|
use std::time::Duration;
|
|
|
|
|
2020-08-27 00:11:50 +02:00
|
|
|
use actix::prelude::*;
|
|
|
|
use actix_derive::Message;
|
2020-09-07 21:21:54 +02:00
|
|
|
use anyhow::{anyhow, Result};
|
2020-08-27 00:11:50 +02:00
|
|
|
use diesel::prelude::*;
|
|
|
|
use diesel::r2d2::{self, ConnectionManager};
|
|
|
|
use diesel::sqlite::SqliteConnection;
|
2020-09-13 13:20:35 +02:00
|
|
|
use log::debug;
|
2020-09-07 21:21:54 +02:00
|
|
|
|
|
|
|
use crate::addressing::Address;
|
|
|
|
use crate::hash::{decode, encode, hash, Hash, Hashable};
|
|
|
|
use crate::models;
|
2020-09-12 14:43:42 +02:00
|
|
|
use serde::export::Formatter;
|
2020-09-12 15:02:03 +02:00
|
|
|
use std::fs;
|
2020-09-07 21:21:54 +02:00
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
#[derive(Debug, Clone)]
|
2020-09-07 21:21:54 +02:00
|
|
|
pub struct Entry {
|
2020-09-12 14:27:45 +02:00
|
|
|
pub identity: Hash,
|
|
|
|
pub target: Address,
|
|
|
|
pub key: String,
|
|
|
|
pub value: EntryValue,
|
2020-09-07 21:21:54 +02:00
|
|
|
}
|
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
#[derive(Debug, Clone)]
|
2020-09-07 21:21:54 +02:00
|
|
|
pub struct InnerEntry {
|
2020-09-12 14:27:45 +02:00
|
|
|
pub target: Address,
|
|
|
|
pub key: String,
|
|
|
|
pub value: EntryValue,
|
2020-09-07 21:21:54 +02:00
|
|
|
}
|
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
#[derive(Debug, Clone)]
|
2020-09-07 21:21:54 +02:00
|
|
|
pub enum EntryValue {
|
|
|
|
Value(serde_json::Value),
|
|
|
|
Address(Address),
|
|
|
|
Invalid,
|
|
|
|
}
|
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
impl TryFrom<models::Entry> for Entry {
|
|
|
|
type Error = anyhow::Error;
|
|
|
|
|
|
|
|
fn try_from(e: models::Entry) -> Result<Self, Self::Error> {
|
|
|
|
Ok(Entry {
|
|
|
|
identity: Hash(e.identity),
|
|
|
|
target: Address::decode(&e.target)?,
|
|
|
|
key: e.key,
|
|
|
|
value: e.value.parse().unwrap(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-12 14:43:42 +02:00
|
|
|
impl std::fmt::Display for InnerEntry {
|
|
|
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
|
write!(f, "{} | {} | {}", self.target, self.key, self.value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-07 21:21:54 +02:00
|
|
|
impl EntryValue {
|
2020-09-13 13:20:35 +02:00
|
|
|
pub fn to_str(&self) -> Result<String> {
|
2020-09-07 21:21:54 +02:00
|
|
|
let (type_char, content) = match self {
|
|
|
|
EntryValue::Value(value) => ('J', serde_json::to_string(value)?),
|
2020-09-13 13:20:35 +02:00
|
|
|
EntryValue::Address(address) => ('O', address.to_string()),
|
2020-09-07 21:21:54 +02:00
|
|
|
EntryValue::Invalid => return Err(anyhow!("Cannot serialize invalid Entity value.")),
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(format!("{}{}", type_char, content))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-12 14:43:42 +02:00
|
|
|
// unsafe unwraps!
|
|
|
|
impl std::fmt::Display for EntryValue {
|
|
|
|
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
|
|
|
let (entry_type, entry_value) = match self {
|
2020-09-13 13:20:35 +02:00
|
|
|
EntryValue::Address(address) => ("ADDRESS", address.to_string()),
|
2020-09-12 14:43:42 +02:00
|
|
|
EntryValue::Value(value) => ("VALUE", serde_json::to_string(value).unwrap()),
|
|
|
|
EntryValue::Invalid => ("INVALID", "INVALID".to_string()),
|
|
|
|
};
|
|
|
|
write!(f, "{}: {}", entry_type, entry_value)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-07 21:21:54 +02:00
|
|
|
impl std::str::FromStr for EntryValue {
|
|
|
|
type Err = std::convert::Infallible;
|
|
|
|
|
|
|
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
|
|
if s.len() < 2 {
|
|
|
|
Ok(EntryValue::Invalid)
|
|
|
|
} else {
|
|
|
|
let (type_char, content) = s.split_at(1);
|
|
|
|
match (type_char, content) {
|
|
|
|
("J", content) => {
|
|
|
|
let value = serde_json::from_str(content);
|
|
|
|
if value.is_ok() {
|
|
|
|
Ok(EntryValue::Value(value.unwrap()))
|
|
|
|
} else {
|
|
|
|
Ok(EntryValue::Invalid)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
("O", content) => {
|
|
|
|
let addr = decode(content).and_then(|v| Address::decode(&v));
|
|
|
|
if addr.is_ok() {
|
|
|
|
Ok(EntryValue::Address(addr.unwrap()))
|
|
|
|
} else {
|
|
|
|
Ok(EntryValue::Invalid)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
_ => Ok(EntryValue::Invalid),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2020-08-27 00:11:50 +02:00
|
|
|
|
|
|
|
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
|
|
|
|
|
|
|
|
pub struct DbExecutor(pub DbPool);
|
|
|
|
|
|
|
|
impl Actor for DbExecutor {
|
|
|
|
type Context = SyncContext<Self>;
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Message)]
|
|
|
|
#[rtype(result = "Result<usize>")]
|
|
|
|
pub struct InsertFile {
|
2020-09-07 21:21:54 +02:00
|
|
|
pub file: models::NewFile,
|
2020-08-27 00:11:50 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Message)]
|
|
|
|
#[rtype(result = "Result<Option<String>>")]
|
|
|
|
pub struct RetrieveByHash {
|
2020-09-12 14:27:45 +02:00
|
|
|
pub hash: Hash,
|
2020-08-27 00:11:50 +02:00
|
|
|
}
|
|
|
|
|
2020-08-27 01:29:44 +02:00
|
|
|
#[derive(Message)]
|
2020-09-07 21:21:54 +02:00
|
|
|
#[rtype(result = "Result<Vec<models::File>>")]
|
2020-08-27 01:29:44 +02:00
|
|
|
pub struct LookupByFilename {
|
|
|
|
pub query: String,
|
|
|
|
}
|
|
|
|
|
2020-09-07 21:21:54 +02:00
|
|
|
#[derive(Message)]
|
|
|
|
#[rtype(result = "Result<Vec<Entry>>")]
|
2020-09-12 14:27:45 +02:00
|
|
|
pub struct RetrieveObject {
|
|
|
|
pub target: Address,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Message)]
|
|
|
|
#[rtype(result = "Result<Vec<Entry>>")]
|
|
|
|
pub struct QueryEntries {
|
|
|
|
pub target: Option<Address>,
|
|
|
|
pub key: Option<String>,
|
|
|
|
pub value: Option<EntryValue>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for QueryEntries {
|
|
|
|
fn default() -> Self {
|
|
|
|
QueryEntries {
|
|
|
|
target: None,
|
|
|
|
key: None,
|
|
|
|
value: None,
|
|
|
|
}
|
|
|
|
}
|
2020-09-07 21:21:54 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Message)]
|
|
|
|
#[rtype(result = "Result<usize>")]
|
|
|
|
pub struct InsertEntry {
|
|
|
|
pub entry: InnerEntry,
|
|
|
|
}
|
|
|
|
|
2020-08-27 00:11:50 +02:00
|
|
|
impl Handler<InsertFile> for DbExecutor {
|
|
|
|
type Result = Result<usize>;
|
|
|
|
|
|
|
|
fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result {
|
|
|
|
use crate::schema::files;
|
|
|
|
|
|
|
|
let connection = &self.0.get()?;
|
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
debug!(
|
2020-09-13 13:20:35 +02:00
|
|
|
"Inserting {} ({}) @ {}...",
|
2020-09-12 14:27:45 +02:00
|
|
|
&msg.file.path,
|
2020-09-13 13:20:35 +02:00
|
|
|
encode(&msg.file.hash),
|
|
|
|
Address::Hash(Hash((&msg.file.hash).clone()))
|
2020-09-12 14:27:45 +02:00
|
|
|
);
|
2020-08-27 00:11:50 +02:00
|
|
|
|
|
|
|
Ok(diesel::insert_into(files::table)
|
|
|
|
.values(msg.file)
|
|
|
|
.execute(connection)?)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Handler<RetrieveByHash> for DbExecutor {
|
|
|
|
type Result = Result<Option<String>>;
|
|
|
|
|
|
|
|
fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result {
|
|
|
|
use crate::schema::files::dsl::*;
|
|
|
|
|
|
|
|
let connection = &self.0.get()?;
|
|
|
|
|
2020-08-27 01:30:55 +02:00
|
|
|
let matches = files
|
|
|
|
.filter(valid.eq(true))
|
2020-09-12 14:27:45 +02:00
|
|
|
.filter(hash.eq(msg.hash.0))
|
2020-09-07 21:21:54 +02:00
|
|
|
.load::<models::File>(connection)?;
|
2020-08-27 00:11:50 +02:00
|
|
|
|
|
|
|
Ok(matches.get(0).map(|f| f.path.clone()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-27 01:29:44 +02:00
|
|
|
impl Handler<LookupByFilename> for DbExecutor {
|
2020-09-07 21:21:54 +02:00
|
|
|
type Result = Result<Vec<models::File>>;
|
2020-08-27 01:29:44 +02:00
|
|
|
|
|
|
|
fn handle(&mut self, msg: LookupByFilename, _: &mut Self::Context) -> Self::Result {
|
|
|
|
use crate::schema::files::dsl::*;
|
|
|
|
|
|
|
|
let connection = &self.0.get()?;
|
|
|
|
|
|
|
|
let matches = files
|
|
|
|
.filter(path.like(format!("%{}%", msg.query)))
|
2020-08-27 01:30:55 +02:00
|
|
|
.filter(valid.eq(true))
|
2020-09-07 21:21:54 +02:00
|
|
|
.load::<models::File>(connection)?;
|
2020-08-27 01:29:44 +02:00
|
|
|
|
|
|
|
Ok(matches)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
impl Handler<RetrieveObject> for DbExecutor {
|
2020-09-07 21:21:54 +02:00
|
|
|
type Result = Result<Vec<Entry>>;
|
|
|
|
|
2020-09-12 14:27:45 +02:00
|
|
|
fn handle(&mut self, msg: RetrieveObject, _: &mut Self::Context) -> Self::Result {
|
2020-09-07 21:21:54 +02:00
|
|
|
use crate::schema::data::dsl::*;
|
|
|
|
|
|
|
|
let connection = &self.0.get()?;
|
|
|
|
|
|
|
|
let matches = data
|
2020-09-12 14:27:45 +02:00
|
|
|
.filter(target.eq(msg.target.encode()?))
|
|
|
|
.or_filter(
|
|
|
|
value.eq(EntryValue::Address(Address::Hash(Hash(msg.target.encode()?))).to_str()?),
|
|
|
|
)
|
2020-09-07 21:21:54 +02:00
|
|
|
.load::<models::Entry>(connection)?;
|
|
|
|
let entries = matches
|
|
|
|
.into_iter()
|
2020-09-12 14:27:45 +02:00
|
|
|
.map(Entry::try_from)
|
|
|
|
.filter_map(Result::ok)
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
Ok(entries)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Handler<QueryEntries> for DbExecutor {
|
|
|
|
type Result = Result<Vec<Entry>>;
|
|
|
|
|
|
|
|
fn handle(&mut self, msg: QueryEntries, _: &mut Self::Context) -> Self::Result {
|
|
|
|
use crate::schema::data::dsl::*;
|
|
|
|
|
|
|
|
let connection = &self.0.get()?;
|
|
|
|
|
|
|
|
let mut query = data.into_boxed();
|
|
|
|
|
|
|
|
if let Some(q_target) = msg.target {
|
|
|
|
query = query.filter(target.eq(q_target.encode()?));
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(q_key) = msg.key {
|
|
|
|
query = query.filter(key.eq(q_key));
|
|
|
|
}
|
|
|
|
|
|
|
|
if let Some(q_value) = msg.value {
|
|
|
|
query = query.filter(value.eq(q_value.to_str()?));
|
|
|
|
}
|
|
|
|
|
|
|
|
let matches = query.load::<models::Entry>(connection)?;
|
|
|
|
|
|
|
|
let entries = matches
|
|
|
|
.into_iter()
|
|
|
|
.map(Entry::try_from)
|
2020-09-07 21:21:54 +02:00
|
|
|
.filter_map(Result::ok)
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
Ok(entries)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Handler<InsertEntry> for DbExecutor {
|
|
|
|
type Result = Result<usize>;
|
|
|
|
|
|
|
|
fn handle(&mut self, msg: InsertEntry, _: &mut Self::Context) -> Self::Result {
|
|
|
|
use crate::schema::data;
|
|
|
|
|
|
|
|
let connection = &self.0.get()?;
|
|
|
|
|
2020-09-12 15:02:03 +02:00
|
|
|
debug!("Inserting: {}", msg.entry);
|
2020-09-07 21:21:54 +02:00
|
|
|
|
|
|
|
let insert_entry = models::Entry {
|
|
|
|
identity: msg.entry.hash()?.0,
|
|
|
|
target: msg.entry.target.encode()?,
|
|
|
|
key: msg.entry.key,
|
|
|
|
value: msg.entry.value.to_str()?,
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(diesel::insert_into(data::table)
|
|
|
|
.values(insert_entry)
|
|
|
|
.execute(connection)?)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Hashable for InnerEntry {
|
|
|
|
fn hash(self: &InnerEntry) -> Result<Hash> {
|
|
|
|
let mut result = Cursor::new(vec![0u8; 0]);
|
|
|
|
result.write(self.target.encode()?.as_slice())?;
|
|
|
|
result.write(self.key.as_bytes())?;
|
|
|
|
result.write(self.value.to_str()?.as_bytes())?;
|
|
|
|
Ok(hash(result.get_ref()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-27 00:11:50 +02:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct ConnectionOptions {
|
|
|
|
pub enable_foreign_keys: bool,
|
|
|
|
pub busy_timeout: Option<Duration>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ConnectionOptions {
|
|
|
|
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
|
|
|
|
if self.enable_foreign_keys {
|
|
|
|
conn.execute("PRAGMA foreign_keys = ON;")?;
|
|
|
|
}
|
|
|
|
if let Some(duration) = self.busy_timeout {
|
|
|
|
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
|
|
|
|
for ConnectionOptions
|
|
|
|
{
|
|
|
|
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
|
|
|
|
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-27 01:02:28 +02:00
|
|
|
#[derive(Default)]
|
|
|
|
struct LoggerSink {
|
|
|
|
buffer: Vec<u8>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl std::io::Write for LoggerSink {
|
|
|
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
2020-08-28 13:51:22 +02:00
|
|
|
self.buffer.extend(buf.iter());
|
2020-08-27 01:02:28 +02:00
|
|
|
|
|
|
|
if self.buffer.ends_with(b"\n") {
|
|
|
|
self.flush()?;
|
|
|
|
}
|
|
|
|
|
|
|
|
Ok(buf.len())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn flush(&mut self) -> std::io::Result<()> {
|
|
|
|
use std::str;
|
|
|
|
|
|
|
|
debug!(
|
|
|
|
"{}",
|
|
|
|
str::from_utf8(self.buffer.as_mut())
|
|
|
|
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?
|
|
|
|
.trim()
|
|
|
|
);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-08-30 17:13:18 +02:00
|
|
|
pub struct OpenResult {
|
|
|
|
pub pool: DbPool,
|
|
|
|
pub new: bool,
|
|
|
|
}
|
|
|
|
|
2020-09-12 15:02:03 +02:00
|
|
|
const DATABASE_FILENAME: &str = "upend.sqlite3";
|
|
|
|
|
|
|
|
pub fn open_upend<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
|
2020-08-27 00:11:50 +02:00
|
|
|
embed_migrations!("./migrations/upend/");
|
|
|
|
|
2020-09-12 15:02:03 +02:00
|
|
|
let database_path: PathBuf = dirpath.as_ref().join(DATABASE_FILENAME);
|
|
|
|
if reinitialize {
|
|
|
|
let _ = fs::remove_file(&database_path);
|
|
|
|
}
|
2020-08-30 22:11:32 +02:00
|
|
|
let new = !database_path.exists();
|
2020-08-27 00:11:50 +02:00
|
|
|
|
|
|
|
let manager = ConnectionManager::<SqliteConnection>::new(database_path.to_str().unwrap());
|
|
|
|
let pool = r2d2::Pool::builder()
|
|
|
|
.connection_customizer(Box::new(ConnectionOptions {
|
|
|
|
enable_foreign_keys: true,
|
|
|
|
busy_timeout: Some(Duration::from_secs(3)),
|
|
|
|
}))
|
|
|
|
.build(manager)
|
|
|
|
.expect("Failed to create pool.");
|
|
|
|
|
2020-08-27 01:02:28 +02:00
|
|
|
embedded_migrations::run_with_output(
|
|
|
|
&pool.get().unwrap(),
|
|
|
|
&mut LoggerSink {
|
|
|
|
..Default::default()
|
|
|
|
},
|
|
|
|
)?;
|
2020-08-30 17:13:18 +02:00
|
|
|
|
|
|
|
Ok(OpenResult { pool, new })
|
2020-08-27 00:11:50 +02:00
|
|
|
}
|