use crate::models::NewFile; use actix::prelude::*; use actix_derive::Message; use anyhow::Result; use diesel::prelude::*; use diesel::r2d2::{self, ConnectionManager}; use diesel::sqlite::SqliteConnection; use log::debug; use std::path::{Path, PathBuf}; use std::time::Duration; pub type DbPool = r2d2::Pool>; pub struct DbExecutor(pub DbPool); impl Actor for DbExecutor { type Context = SyncContext; } #[derive(Message)] #[rtype(result = "Result")] pub struct InsertFile { pub file: NewFile, } #[derive(Message)] #[rtype(result = "Result>")] pub struct RetrieveByHash { pub hash: String, } #[derive(Message)] #[rtype(result = "Result>")] pub struct LookupByFilename { pub query: String, } impl Handler for DbExecutor { type Result = Result; fn handle(&mut self, msg: InsertFile, _: &mut Self::Context) -> Self::Result { use crate::schema::files; let connection = &self.0.get()?; debug!("Inserting {:?}...", msg.file); Ok(diesel::insert_into(files::table) .values(msg.file) .execute(connection)?) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: RetrieveByHash, _: &mut Self::Context) -> Self::Result { use crate::models::File; use crate::schema::files::dsl::*; let connection = &self.0.get()?; let matches = files .filter(hash.eq(msg.hash)) .filter(valid.eq(true)) .load::(connection)?; Ok(matches.get(0).map(|f| f.path.clone())) } } impl Handler for DbExecutor { type Result = Result>; fn handle(&mut self, msg: LookupByFilename, _: &mut Self::Context) -> Self::Result { use crate::models::File; use crate::schema::files::dsl::*; let connection = &self.0.get()?; let matches = files .filter(path.like(format!("%{}%", msg.query))) .filter(valid.eq(true)) .load::(connection)?; Ok(matches) } } #[derive(Debug)] pub struct ConnectionOptions { pub enable_foreign_keys: bool, pub busy_timeout: Option, } impl ConnectionOptions { pub fn apply(&self, conn: &SqliteConnection) -> QueryResult<()> { if self.enable_foreign_keys { conn.execute("PRAGMA foreign_keys = ON;")?; } if let Some(duration) = self.busy_timeout { conn.execute(&format!("PRAGMA busy_timeout = {};", duration.as_millis()))?; } Ok(()) } } impl diesel::r2d2::CustomizeConnection for ConnectionOptions { fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> { self.apply(conn).map_err(diesel::r2d2::Error::QueryError) } } #[derive(Default)] struct LoggerSink { buffer: Vec, } impl std::io::Write for LoggerSink { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.buffer.extend(buf.iter()); if self.buffer.ends_with(b"\n") { self.flush()?; } Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { use std::str; debug!( "{}", str::from_utf8(self.buffer.as_mut()) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))? .trim() ); Ok(()) } } pub struct OpenResult { pub pool: DbPool, pub new: bool, } pub fn open_upend>(dirpath: P) -> Result { embed_migrations!("./migrations/upend/"); let database_path: PathBuf = dirpath.as_ref().join("upend.sqlite3"); let new = !database_path.exists(); let manager = ConnectionManager::::new(database_path.to_str().unwrap()); let pool = r2d2::Pool::builder() .connection_customizer(Box::new(ConnectionOptions { enable_foreign_keys: true, busy_timeout: Some(Duration::from_secs(3)), })) .build(manager) .expect("Failed to create pool."); embedded_migrations::run_with_output( &pool.get().unwrap(), &mut LoggerSink { ..Default::default() }, )?; Ok(OpenResult { pool, new }) } // extern crate xdg; // pub fn open_config() -> Result> { // embed_migrations!("./migrations/config/"); // let dirpath = xdg::BaseDirectories::with_prefix("upend") // .unwrap() // .place_config_file("config.sqlite3") // .expect("Could not create config file?!"); // let database_url = dirpath.join("base.sqlite3"); // let connection = SqliteConnection::establish(database_url.to_str().unwrap())?; // embedded_migrations::run_with_output(&connection, &mut std::io::stdout())?; // Ok(connection) // }