initial commit

feat/vaults
Tomáš Mládek 2020-08-27 00:11:50 +02:00
commit 48cf5889e7
15 changed files with 2490 additions and 0 deletions

.env Normal file (+1)

@@ -0,0 +1 @@
DATABASE_URL=upend.sqlite3

.gitignore vendored Normal file (+4)

@@ -0,0 +1,4 @@
/target
**/*.rs.bk
upend.sqlite3

Cargo.lock generated Normal file (+2064)

(Diff suppressed because it is too large.)

Cargo.toml Normal file (+32)

@@ -0,0 +1,32 @@
[package]
name = "upend-rust"
version = "0.1.0"
authors = ["Tomáš Mládek <t@mldk.cz>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = "2.33.0"
log = "0.4"
env_logger = "0.7.1"
anyhow = "1.0"
diesel = { version = "1.4.4", features=["sqlite", "r2d2"] }
diesel_migrations = "1.4.0"
actix = "0.9.0"
actix-web = "2.0"
actix-files = "0.2.2"
actix-rt = "1.0.0"
actix_derive = "0.3.2"
filebuffer = "0.4.0"
walkdir = "2"
tiny-keccak = { version = "2.0", features = ["k12"] }
bs58 = "0.3.1"
dotenv = "0.15.0"
xdg = "^2.1"

diesel.toml Normal file (+5)

@@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/schema.rs"

migrations/…/down.sql Normal file (+2)

@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE vaults;

migrations/…/up.sql Normal file (+5)

@@ -0,0 +1,5 @@
-- Your SQL goes here
CREATE TABLE vaults (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path VARCHAR NOT NULL
)

migrations/…/down.sql Normal file (+2)

@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
DROP TABLE files;

migrations/…/up.sql Normal file (+9)

@@ -0,0 +1,9 @@
CREATE TABLE files (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash VARCHAR NOT NULL,
path VARCHAR NOT NULL,
size BIGINT NOT NULL,
valid BOOLEAN NOT NULL DEFAULT TRUE
);
CREATE INDEX files_hash ON files(hash);

src/database.rs Normal file (+122)

@@ -0,0 +1,122 @@
use crate::models::NewFile;
use actix::prelude::*;
use actix_derive::Message;
use anyhow::Result;
use diesel::prelude::*;
use diesel::r2d2::{self, ConnectionManager};
use diesel::sqlite::SqliteConnection;
use log::debug;
use std::error::Error;
use std::path::Path;
use std::time::Duration;
pub type DbPool = r2d2::Pool<ConnectionManager<SqliteConnection>>;
pub struct DbExecutor(pub DbPool);
impl Actor for DbExecutor {
type Context = SyncContext<Self>;
}
#[derive(Message)]
#[rtype(result = "Result<usize>")]
pub struct InsertFile {
pub file: NewFile,
}
#[derive(Message)]
#[rtype(result = "Result<Option<String>>")]
pub struct RetrieveByHash {
pub hash: String,
}
impl Handler<InsertFile> for DbExecutor {
type Result = Result<usize>;
fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result {
use crate::schema::files;
let connection = &self.0.get()?;
debug!("Inserting {:?}...", msg.file);
Ok(diesel::insert_into(files::table)
.values(msg.file)
.execute(connection)?)
}
}
impl Handler<RetrieveByHash> for DbExecutor {
type Result = Result<Option<String>>;
fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result {
use crate::models::File;
use crate::schema::files::dsl::*;
let connection = &self.0.get()?;
let matches = files.filter(hash.eq(msg.hash)).load::<File>(connection)?;
Ok(matches.get(0).map(|f| f.path.clone()))
}
}
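// SQLite settings (PRAGMAs) applied by the r2d2 pool each time it opens a new connection.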
#[derive(Debug)]
pub struct ConnectionOptions {
pub enable_foreign_keys: bool,
pub busy_timeout: Option<Duration>,
}
impl ConnectionOptions {
pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> {
if self.enable_foreign_keys {
conn.execute("PRAGMA foreign_keys = ON;")?;
}
if let Some(duration) = self.busy_timeout {
conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?;
}
Ok(())
}
}
impl diesel::r2d2::CustomizeConnection<SqliteConnection, diesel::r2d2::Error>
for ConnectionOptions
{
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
self.apply(conn).map_err(diesel::r2d2::Error::QueryError)
}
}
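// Opens (or creates) the vault database inside `dirpath` and runs any pending embedded migrations.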
pub fn open_upend<P: AsRef<Path>>(dirpath: P) -> Result<DbPool, Box<dyn Error>> {
embed_migrations!("./migrations/upend/");
let database_path = dirpath.as_ref().join("upend.sqlite3");
let manager = ConnectionManager::<SqliteConnection>::new(database_path.to_str().unwrap());
let pool = r2d2::Pool::builder()
.connection_customizer(Box::new(ConnectionOptions {
enable_foreign_keys: true,
busy_timeout: Some(Duration::from_secs(3)),
}))
.build(manager)
.expect("Failed to create pool.");
embedded_migrations::run_with_output(&pool.get().unwrap(), &mut std::io::stdout())?;
Ok(pool)
}
// extern crate xdg;
// pub fn open_config() -> Result<SqliteConnection, Box<dyn Error>> {
// embed_migrations!("./migrations/config/");
// let dirpath = xdg::BaseDirectories::with_prefix("upend")
// .unwrap()
// .place_config_file("config.sqlite3")
// .expect("Could not create config file?!");
// let database_url = dirpath.join("base.sqlite3");
// let connection = SqliteConnection::establish(database_url.to_str().unwrap())?;
// embedded_migrations::run_with_output(&connection, &mut std::io::stdout())?;
// Ok(connection)
// }
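
A hypothetical usage sketch, not part of this commit: driving the InsertFile and RetrieveByHash messages above from async code, assuming an Addr<DbExecutor> started via SyncArbiter as in main.rs. The function name and file values are placeholders.

// Hypothetical sketch only; it would need to live in this crate to see the types.
async fn db_roundtrip(db: actix::Addr<DbExecutor>) -> anyhow::Result<()> {
    // Insert a placeholder record.
    db.send(InsertFile {
        file: crate::models::NewFile {
            hash: "example-hash".to_string(),
            path: "/tmp/example".to_string(),
            size: 0,
        },
    })
    .await
    .expect("actor mailbox closed")?;
    // Look it back up by hash, mirroring what the /raw/{hash} endpoint does.
    let found = db
        .send(RetrieveByHash { hash: "example-hash".to_string() })
        .await
        .expect("actor mailbox closed")?;
    if let Some(stored_path) = found {
        println!("stored at {}", stored_path);
    }
    Ok(())
}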

src/dataops.rs Normal file (+111)

@@ -0,0 +1,111 @@
use crate::models::NewFile;
use anyhow::Result;
use filebuffer::FileBuffer;
use log::info;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use tiny_keccak::{Hasher, KangarooTwelve};
use walkdir::WalkDir;
use actix::prelude::*;
// use rayon::prelude::*;
// pub struct VaultUpdater(
// pub Addr<crate::database::DbExecutor>,
// pub Addr<HasherWorker>,
// );
// impl Actor for VaultUpdater {
// type Context = Context<Self>;
// }
// struct UpdateDirectory<'a> {
// path: &'a Path,
// }
// impl Message for UpdateDirectory<'_> {
// type Result = Result<(), Box<dyn error::Error>>;
// }
// impl Handler<UpdateDirectory<'_>> for VaultUpdater {
// type Result = Result<(), Box<dyn error::Error>>;
// fn handle(&mut self, msg: UpdateDirectory, _: &mut Self::Context) -> Self::Result {
// update_directory(msg.path, &self.0, &self.1).await
// }
// }
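// Walks a directory tree, hashes every regular file via the hasher actor, and records each one in the database.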
pub async fn update_directory<T: AsRef<Path>>(
directory: T,
db_executor: &Addr<crate::database::DbExecutor>,
hasher_worker: &Addr<HasherWorker>,
) -> Result<()> {
for entry in WalkDir::new(&directory)
.into_iter()
.filter_map(|e| e.ok())
.filter(|e| e.path().is_file())
{
info!("Processing: {}", entry.path().display());
let metadata = fs::metadata(entry.path())?;
let size = metadata.len() as i64;
if size < 0 {
panic!("File {} too large?!", entry.path().display());
}
let msg = ComputeHash {
path: entry.path().to_path_buf(),
};
let digest = hasher_worker.send(msg).await;
let new_file = NewFile {
path: entry
.path()
.to_str()
.expect("path not valid unicode?!")
.to_string(),
hash: digest.unwrap().unwrap(),
size: size,
};
let _insert_result = db_executor
.send(crate::database::InsertFile { file: new_file })
.await?;
}
info!("Finished updating {}.", directory.as_ref().display());
Ok(())
}
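// CPU-bound hashing is kept in its own actor so main.rs can run it on a dedicated SyncArbiter thread pool.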
pub struct HasherWorker;
impl Actor for HasherWorker {
type Context = SyncContext<Self>;
}
#[derive(Message)]
#[rtype(result = "Result<String, io::Error>")]
struct ComputeHash {
path: PathBuf,
}
impl Handler<ComputeHash> for HasherWorker {
type Result = Result<String, io::Error>;
fn handle(&mut self, msg: ComputeHash, _: &mut Self::Context) -> Self::Result {
compute_digest(msg.path)
}
}
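// Memory-maps the file and hashes it with KangarooTwelve, returning the digest as a base58 string.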
pub fn compute_digest<P: AsRef<Path>>(filepath: P) -> Result<String, io::Error> {
let fbuffer = FileBuffer::open(&filepath)?;
let mut k12 = KangarooTwelve::new(b"");
k12.update(&fbuffer);
let mut result = [0u8; 256 / 8];
k12.finalize(&mut result);
Ok(bs58::encode(&result).into_string())
}
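
A hypothetical sketch, not part of this commit: compute_digest is deterministic, so hashing the same bytes twice yields the same base58-encoded KangarooTwelve digest. The temp-file name and contents are placeholders.

// Hypothetical sketch only.
fn digest_demo() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("upend-digest-demo.txt");
    std::fs::write(&path, b"hello upend")?;
    let first = compute_digest(&path)?;
    let second = compute_digest(&path)?;
    assert_eq!(first, second); // same contents, same digest
    println!("k12/bs58 digest: {}", first);
    Ok(())
}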

src/main.rs Normal file (+106)

@@ -0,0 +1,106 @@
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
use actix::prelude::*;
use actix_files::NamedFile;
use actix_web::{error, get, middleware, post, web, App, Error, HttpResponse, HttpServer};
use clap::{App as ClapApp, Arg};
use std::env;
mod database;
mod dataops;
mod models;
mod schema;
use env_logger;
use log::info;
use std::fs;
use std::path::PathBuf;
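// Shared application state handed to every actix-web worker: the vault directory plus actor addresses.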
struct State {
directory: PathBuf,
db: Addr<database::DbExecutor>,
hasher: Addr<dataops::HasherWorker>,
}
#[get("/raw/{hash}")]
async fn get_raw(state: web::Data<State>, hash: web::Path<String>) -> Result<NamedFile, Error> {
let response = state
.db
.send(database::RetrieveByHash {
hash: hash.into_inner(),
})
.await?;
match response {
Ok(result) => match result {
Some(path) => Ok(NamedFile::open(path)?),
None => Err(error::ErrorNotFound("NOT FOUND")),
},
Err(e) => Err(error::ErrorInternalServerError(e)),
}
}
#[post("/api/refresh")]
async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
dataops::update_directory(&state.directory, &state.db, &state.hasher)
.await
.map_err(|_| error::ErrorInternalServerError("UPDATE ERROR"))?;
Ok(HttpResponse::Ok().finish())
}
const VERSION: &'static str = env!("CARGO_PKG_VERSION");
fn main() -> std::io::Result<()> {
let env = env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info");
env_logger::init_from_env(env);
let app = ClapApp::new("upend")
.version(VERSION)
.author("Tomáš Mládek <t@mldk.cz>")
.arg(Arg::with_name("DIRECTORY").required(true).index(1))
.arg(
Arg::with_name("BIND")
.long("bind")
.default_value("127.0.0.1:8093")
.help("address and port to bind the Web interface on")
.required(true),
);
let matches = app.get_matches();
info!("Starting UpEnd {}...", VERSION);
let dirname = matches.value_of("DIRECTORY").unwrap();
let path = PathBuf::from(dirname);
let _ = fs::remove_file(format!("{}/upend.sqlite3", dirname)); // TODO REMOVE!!!
let db_pool = database::open_upend(&dirname).expect("failed to open database!");
let sys = actix::System::new("upend");
// let connection = db_pool.get().expect("Could not get SQL connection.");
let db_addr = SyncArbiter::start(3, move || database::DbExecutor(db_pool.clone()));
let hash_addr = SyncArbiter::start(4, move || dataops::HasherWorker);
let bind = matches.value_of("BIND").unwrap();
info!("Starting server at: {}", &bind);
// Start HTTP server
HttpServer::new(move || {
App::new()
.data(State {
directory: path.clone(),
db: db_addr.clone(),
hasher: hash_addr.clone(),
})
.wrap(middleware::Logger::default())
.service(get_raw)
.service(api_refresh)
})
.bind(&bind)?
.run();
sys.run()
}

src/models.rs Normal file (+18)

@@ -0,0 +1,18 @@
use super::schema::files;
#[derive(Queryable)]
pub struct File {
pub id: i32,
pub hash: String,
pub path: String,
pub size: i64,
pub valid: bool,
}
#[derive(Insertable, Debug)]
#[table_name = "files"]
pub struct NewFile {
pub hash: String,
pub path: String,
pub size: i64,
}

src/routes.rs Normal file (+0, empty)

src/schema.rs Normal file (+9)

@@ -0,0 +1,9 @@
table! {
files (id) {
id -> Integer,
hash -> Text,
path -> Text,
size -> BigInt,
valid -> Bool,
}
}
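
A hypothetical sketch, not part of this commit: a direct Diesel query against this schema using the File model, for example listing the paths of rows whose valid flag has been cleared. The function name is a placeholder, and `conn` is assumed to come from the pool built in database::open_upend.

// Hypothetical sketch only.
fn list_invalid_paths(conn: &diesel::sqlite::SqliteConnection) -> anyhow::Result<Vec<String>> {
    use diesel::prelude::*;
    use crate::schema::files::dsl::*;

    let rows = files
        .filter(valid.eq(false))
        .load::<crate::models::File>(conn)?;
    Ok(rows.into_iter().map(|f| f.path).collect())
}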