diff --git a/Cargo.lock b/Cargo.lock index fe2c15a..64a539f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2884,7 +2884,6 @@ dependencies = [ "actix-web", "actix_derive", "anyhow", - "async-trait", "bs58", "built", "chrono", diff --git a/Cargo.toml b/Cargo.toml index 1b4cd19..07bf4af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,7 +69,6 @@ image = { version = "0.23.14", optional = true } webp = { version = "0.2.0", optional = true } webpage = { version = "1.4.0", optional = true } -async-trait = "0.1.52" [build-dependencies] built = "0.5.1" diff --git a/src/extractors/mod.rs b/src/extractors/mod.rs index 73dde18..8628242 100644 --- a/src/extractors/mod.rs +++ b/src/extractors/mod.rs @@ -4,45 +4,28 @@ use crate::{ util::jobs::JobContainer, }; use anyhow::Result; -use async_trait::async_trait; use std::sync::{Arc, RwLock}; #[cfg(feature = "extractors-web")] pub mod web; -#[async_trait] pub trait Extractor { - async fn get( - &self, - address: Address, - job_container: Arc>, - ) -> Result>; + fn get(&self, address: Address, job_container: Arc>) + -> Result>; - async fn insert_info( + fn insert_info( &self, address: Address, connection: UpEndConnection, job_container: Arc>, ) -> Result<()> { - let entries = self.get(address, job_container).await?; + let entries = self.get(address, job_container)?; - Ok(actix_web::web::block::<_, _, anyhow::Error>(move || { - connection.transaction(|| { - for entry in entries { - connection.insert_entry(entry)?; - } - Ok(()) - }) + connection.transaction(|| { + for entry in entries { + connection.insert_entry(entry)?; + } + Ok(()) }) - .await?) - } - - async fn insert_info_fnf( - &self, - address: Address, - connection: UpEndConnection, - job_container: Arc>, - ) { - let _ = self.insert_info(address, connection, job_container).await; } } diff --git a/src/extractors/web.rs b/src/extractors/web.rs index f573dc9..ae9808c 100644 --- a/src/extractors/web.rs +++ b/src/extractors/web.rs @@ -4,17 +4,14 @@ use crate::{ database::entry::Entry, util::jobs::{Job, JobContainer, State}, }; -use actix_web::web; use anyhow::anyhow; -use async_trait::async_trait; use std::sync::{Arc, RwLock}; use webpage::{Webpage, WebpageOptions}; pub struct WebExtractor; -#[async_trait] impl Extractor for WebExtractor { - async fn get( + fn get( &self, address: Address, job_container: Arc>, @@ -27,9 +24,7 @@ impl Extractor for WebExtractor { .unwrap(); let webpage_url = url.clone(); - let webpage_get = - web::block(move || Webpage::from_url(&webpage_url, WebpageOptions::default())) - .await; + let webpage_get = Webpage::from_url(&webpage_url, WebpageOptions::default()); if let Ok(webpage) = webpage_get { let _ = job_container @@ -37,7 +32,7 @@ impl Extractor for WebExtractor { .unwrap() .update_progress(&job_id, 50.0); - let address = Address::Url(url.clone()); + let address = Address::Url(url); let mut entries = vec![ webpage.html.title.map(|html_title| Entry { entity: address.clone(), @@ -79,7 +74,7 @@ impl Extractor for WebExtractor { .update_state(&job_id, State::Failed); Err(anyhow!("Failed for unknown reason.")) } else { - Err(anyhow!("Can only extract info for URLs.")) + Ok(vec![]) } } } diff --git a/src/filesystem.rs b/src/filesystem.rs index f7dc084..5883739 100644 --- a/src/filesystem.rs +++ b/src/filesystem.rs @@ -51,40 +51,41 @@ fn initialize_types(connection: &UpEndConnection) -> Result<()> { Ok(()) } -pub async fn rescan_vault( - db: Arc, +pub fn rescan_vault>( + db: D, job_container: Arc>, quick_check: bool, disable_synchronous: bool, -) { - let add_result = job_container +) -> Result> { + let job_id_result = job_container .write() .unwrap() .add_job(Job::new("REIMPORT", "Reimporting vault...")); - if let Ok(job_id) = add_result { - let job_container_rescan = job_container.clone(); - let result = actix_web::web::block(move || { - rescan_vault_inner( + match job_id_result { + Ok(job_id) => { + let job_container_rescan = job_container.clone(); + let result = rescan_vault_inner( db, job_container_rescan, job_id, quick_check, disable_synchronous, - ) - }) - .await; + ); - if result.is_err() { - let err = result.err().unwrap(); - error!("Update did not succeed! {:?}", err); + if let Err(err) = &result { + error!("Update did not succeed! {:?}", err); - job_container - .write() - .unwrap() - .update_state(&job_id, State::Failed) - .unwrap(); + job_container + .write() + .unwrap() + .update_state(&job_id, State::Failed) + .unwrap(); + } + + result } + Err(err) => Err(err.into()), } } struct PragmaSynchronousGuard<'a>(&'a UpEndConnection); @@ -105,7 +106,7 @@ impl Drop for PragmaSynchronousGuard<'_> { type UpdatePathResult = Result; #[derive(Debug)] -enum UpdatePathOutcome { +pub enum UpdatePathOutcome { Added(PathBuf), Unchanged(PathBuf), Removed(PathBuf), @@ -524,8 +525,7 @@ mod test { .unwrap(); // Initial scan - let rescan_result = - rescan_vault_inner(&open_result.db, job_container.clone(), job_id, quick, true); + let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true); assert!(rescan_result.is_ok()); let rescan_result = rescan_result.unwrap(); diff --git a/src/main.rs b/src/main.rs index a6bbb2a..3477302 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use std::sync::{Arc, RwLock}; use crate::{ common::{get_static_dir, PKG_VERSION}, database::UpEndDatabase, + util::exec::block_background, }; mod addressing; @@ -218,9 +219,9 @@ fn main() -> Result<()> { if !matches.is_present("NO_INITIAL_UPDATE") { info!("Running initial update..."); if open_result.new { - actix::spawn(filesystem::rescan_vault(upend, job_container, false, true)); + block_background(|| filesystem::rescan_vault(upend, job_container, false, true)); } else { - actix::spawn(filesystem::rescan_vault(upend, job_container, true, false)); + block_background(|| filesystem::rescan_vault(upend, job_container, true, false)); } } diff --git a/src/routes.rs b/src/routes.rs index 6570e66..494235d 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -7,6 +7,7 @@ use crate::database::UpEndDatabase; use crate::extractors::Extractor; use crate::filesystem::add_file; use crate::previews::PreviewStore; +use crate::util::exec::block_background; use crate::util::hash::{b58_decode, b58_encode, Hashable}; use crate::util::jobs::JobContainer; use actix_files::NamedFile; @@ -362,13 +363,16 @@ pub async fn put_object( }], Address::Url(url) => { #[cfg(feature = "extractors-web")] - actix::spawn( - (crate::extractors::web::WebExtractor {}).insert_info_fnf( - address.clone(), - state.upend.connection().map_err(ErrorInternalServerError)?, - state.job_container.clone(), - ), - ); + { + let _address = address.clone(); + block_background(move || { + (crate::extractors::web::WebExtractor {}).insert_info( + _address, + state.upend.connection()?, + state.job_container.clone(), + ) + }); + } vec![Entry { entity: address.clone(), attribute: LABEL_ATTR.to_string(), @@ -606,12 +610,14 @@ pub async fn api_refresh( state: web::Data, web::Query(query): web::Query, ) -> Result { - actix::spawn(crate::filesystem::rescan_vault( - state.upend.clone(), - state.job_container.clone(), - query.full.is_none(), - false, - )); + block_background(move || { + crate::filesystem::rescan_vault( + state.upend.clone(), + state.job_container.clone(), + query.full.is_none(), + false, + ) + }); Ok(HttpResponse::Ok().finish()) } diff --git a/src/util/exec.rs b/src/util/exec.rs new file mode 100644 index 0000000..667bd44 --- /dev/null +++ b/src/util/exec.rs @@ -0,0 +1,18 @@ +use actix_web::web; + +pub fn block_background(f: F) +where + F: FnOnce() -> Result + Send + 'static, + I: Send + 'static, + E: Send + std::fmt::Debug + 'static, +{ + async fn inner(f: F) + where + F: FnOnce() -> Result + Send + 'static, + I: Send + 'static, + E: Send + std::fmt::Debug + 'static, + { + let _ = web::block(f).await; + } + actix::spawn(inner(f)); +} diff --git a/src/util/jobs.rs b/src/util/jobs.rs index cf95c3e..ddcaf33 100644 --- a/src/util/jobs.rs +++ b/src/util/jobs.rs @@ -74,6 +74,14 @@ impl Serialize for JobId { #[derive(Debug, Clone)] pub struct JobInProgessError(String); +impl std::fmt::Display for JobInProgessError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "job of type {} is already in progress", self.0) + } +} + +impl std::error::Error for JobInProgessError {} + impl JobContainer { pub fn add_job(&mut self, job: Job) -> Result { if let Some(job_type) = &job.job_type { diff --git a/src/util/mod.rs b/src/util/mod.rs index 8bb5b6a..4035250 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,3 +1,4 @@ +pub mod exec; pub mod hash; pub mod jobs;