From ef3a13085580bd43e5d234a3be280a46466a0b84 Mon Sep 17 00:00:00 2001
From: Tomáš Mládek
Date: Wed, 2 Mar 2022 01:14:23 +0100
Subject: [PATCH] refactor Jobs and their handling, really fix infinitely
 hanging jobs

---
 src/database/constants.rs    |   2 +-
 src/database/inner/schema.rs |   6 +-
 src/extractors/audio.rs      |  38 ++++-----
 src/extractors/mod.rs        |  71 +++++++---------
 src/extractors/web.rs        |  32 +++----
 src/filesystem.rs            |  57 ++++---------
 src/main.rs                  |   6 +-
 src/previews/mod.rs          |  31 ++-----
 src/routes.rs                |  12 ++-
 src/util/jobs.rs             | 160 +++++++++++++++++++++--------------
 10 files changed, 194 insertions(+), 221 deletions(-)

diff --git a/src/database/constants.rs b/src/database/constants.rs
index 155b50c..637c5e4 100644
--- a/src/database/constants.rs
+++ b/src/database/constants.rs
@@ -1,5 +1,5 @@
 use crate::addressing::Address;
-use crate::database::entry::{InvariantEntry};
+use crate::database::entry::InvariantEntry;
 
 pub const TYPE_TYPE_VAL: &str = "TYPE";
 pub const TYPE_BASE_ATTR: &str = "TYPE";
diff --git a/src/database/inner/schema.rs b/src/database/inner/schema.rs
index e9e85e8..7221c31 100644
--- a/src/database/inner/schema.rs
+++ b/src/database/inner/schema.rs
@@ -30,8 +30,4 @@ table! {
     }
 }
 
-allow_tables_to_appear_in_same_query!(
-    data,
-    files,
-    meta,
-);
+allow_tables_to_appear_in_same_query!(data, files, meta,);
diff --git a/src/extractors/audio.rs b/src/extractors/audio.rs
index 67425ab..8cac3bf 100644
--- a/src/extractors/audio.rs
+++ b/src/extractors/audio.rs
@@ -7,10 +7,9 @@ use crate::{
         UpEndConnection,
     },
     filesystem::FILE_MIME_KEY,
-    util::jobs::{Job, JobContainer, State},
+    util::jobs::{JobContainer, JobState},
 };
 use anyhow::{anyhow, Result};
-use std::sync::{Arc, RwLock};
 
 pub struct ID3Extractor;
 
@@ -19,7 +18,7 @@ impl Extractor for ID3Extractor {
         &self,
         address: &Address,
         connection: &UpEndConnection,
-        job_container: Arc<RwLock<JobContainer>>,
+        mut job_container: JobContainer,
     ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let is_audio = connection.retrieve_object(address)?.iter().any(|e| {
@@ -38,22 +37,18 @@ impl Extractor for ID3Extractor {
             let files = connection.retrieve_file(hash)?;
 
             if let Some(file) = files.get(0) {
-                let job_id = job_container
-                    .write()
-                    .unwrap()
-                    .add_job(Job::new(
-                        None,
-                        &format!(
-                            "Getting ID3 info from \"{:}\"",
-                            file.path
-                                .components()
-                                .last()
-                                .unwrap()
-                                .as_os_str()
-                                .to_string_lossy()
-                        ),
-                    ))
-                    .unwrap();
+                let mut job_handle = job_container.add_job(
+                    None,
+                    &format!(
+                        "Getting ID3 info from \"{:}\"",
+                        file.path
+                            .components()
+                            .last()
+                            .unwrap()
+                            .as_os_str()
+                            .to_string_lossy()
+                    ),
+                )?;
 
                 let tags = id3::Tag::read_from_path(&file.path)?;
 
@@ -79,10 +74,7 @@ impl Extractor for ID3Extractor {
                     })
                     .collect();
 
-                let _ = job_container
-                    .write()
-                    .unwrap()
-                    .update_state(&job_id, State::Done);
+                let _ = job_handle.update_state(JobState::Done);
 
                 Ok(result)
             } else {
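
The audio extractor above shows the shape of the refactor that the rest of this patch rolls out: instead of threading an Arc<RwLock<JobContainer>> plus a JobId through every call site, callers hold a cloneable JobContainer and get back a JobHandle scoped to a single job. A minimal sketch of the new calling convention follows; the process_file function is illustrative and not part of this patch:

    use anyhow::Result;

    use crate::util::jobs::{JobContainer, JobState};

    // Hypothetical caller, for illustration only.
    fn process_file(mut job_container: JobContainer) -> Result<()> {
        // add_job now returns a handle bound to the new job, so no JobId
        // has to be passed around; `?` propagates the "job of this type
        // already in progress" error as an anyhow::Error.
        let mut job_handle = job_container.add_job(None, "Processing one file")?;

        // ... do the actual work, reporting progress along the way ...
        job_handle.update_progress(50.0)?;

        // Mark success explicitly; a handle that is dropped without this
        // is recorded as Failed (see the Drop impl in src/util/jobs.rs).
        job_handle.update_state(JobState::Done)?;
        Ok(())
    }
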
diff --git a/src/extractors/mod.rs b/src/extractors/mod.rs
index 06a5b14..47c12d1 100644
--- a/src/extractors/mod.rs
+++ b/src/extractors/mod.rs
@@ -1,14 +1,14 @@
 use crate::{
     addressing::Address,
     database::{entry::Entry, UpEndConnection, UpEndDatabase},
-    util::jobs::{Job, JobContainer},
+    util::jobs::JobContainer,
 };
 use anyhow::Result;
 use log::{info, trace};
 use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
 use std::{
     borrow::Borrow,
-    sync::{Arc, RwLock},
+    sync::{Arc, Mutex, RwLock},
 };
 
 #[cfg(feature = "extractors-web")]
@@ -22,7 +22,7 @@ pub trait Extractor {
         &self,
         address: &Address,
         connection: &UpEndConnection,
-        job_container: Arc<RwLock<JobContainer>>,
+        job_container: JobContainer,
     ) -> Result<Vec<Entry>>;
 
     fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
@@ -33,7 +33,7 @@ pub trait Extractor {
         &self,
         address: &Address,
         connection: &UpEndConnection,
-        job_container: Arc<RwLock<JobContainer>>,
+        job_container: JobContainer,
     ) -> Result<usize> {
         if self.is_needed(address, connection)? {
             let entries = self.get(address, connection, job_container)?;
@@ -53,57 +53,50 @@ pub trait Extractor {
 
 pub fn extract_all<D: Borrow<UpEndDatabase>>(
     db: D,
-    job_container: Arc<RwLock<JobContainer>>,
+    mut job_container: JobContainer,
 ) -> Result<usize> {
     info!("Extracting metadata for all addresses.");
 
     let db = db.borrow();
 
-    let job_id_result = job_container
-        .write()
-        .unwrap()
-        .add_job(Job::new("EXTRACT_ALL", "Extracting additional metadata..."));
+    let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
 
-    match job_id_result {
-        Ok(job_id) => {
-            let all_addresses = db.connection()?.get_all_addresses()?;
-            let total = all_addresses.len() as f32;
-            let count = RwLock::new(0_usize);
+    let all_addresses = db.connection()?.get_all_addresses()?;
+    let total = all_addresses.len() as f32;
+    let count = RwLock::new(0_usize);
+    let shared_job_handle = Arc::new(Mutex::new(job_handle));
 
-            let result = all_addresses
-                .par_iter()
-                .map(|address| {
-                    let connection = db.connection()?;
-                    let extract_result = extract(address, &connection, job_container.clone());
+    let result = all_addresses
+        .par_iter()
+        .map(|address| {
+            let connection = db.connection()?;
+            let extract_result = extract(address, &connection, job_container.clone());
 
-                    let mut cnt = count.write().unwrap();
-                    *cnt += 1;
+            let mut cnt = count.write().unwrap();
+            *cnt += 1;
 
-                    job_container
-                        .write()
-                        .unwrap()
-                        .update_progress(&job_id, *cnt as f32 / total * 100.0)?;
+            shared_job_handle
+                .lock()
+                .unwrap()
+                .update_progress(*cnt as f32 / total * 100.0)?;
 
-                    extract_result
-                })
-                .flatten()
-                .sum();
+            extract_result
+        })
+        .flatten()
+        .sum();
 
-            info!(
-                "Done extracting metadata; processed {} addresses, added {} entries.",
-                all_addresses.len(),
-                result
-            );
+    info!(
+        "Done extracting metadata; processed {} addresses, added {} entries.",
+        all_addresses.len(),
+        result
+    );
 
-            Ok(result)
-        }
-        Err(err) => Err(err.into()),
-    }
+    Ok(result)
 }
 
 pub fn extract(
     address: &Address,
     connection: &UpEndConnection,
-    job_container: Arc<RwLock<JobContainer>>,
+    job_container: JobContainer,
 ) -> Result<usize> {
     let mut entry_count = 0;
     trace!("Extracting metadata for {address:?}");
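
extract_all above also demonstrates the threading model of the new API: the JobContainer itself is cloned freely into each parallel task, while the single JobHandle for the umbrella job, which is not cloneable, is shared between rayon workers behind an Arc<Mutex<...>>. A condensed sketch of that progress-reporting pattern, with an illustrative item type standing in for the real addresses:

    use std::sync::{Arc, Mutex, RwLock};

    use rayon::iter::{IntoParallelRefIterator, ParallelIterator};

    use crate::util::jobs::JobHandle;

    // Illustrative stand-in for the par_iter loop in extract_all.
    fn report_progress(items: &[u32], job_handle: JobHandle) {
        let total = items.len() as f32;
        let count = RwLock::new(0_usize);
        let shared_job_handle = Arc::new(Mutex::new(job_handle));

        items.par_iter().for_each(|_item| {
            // ... per-item work would happen here ...
            let mut cnt = count.write().unwrap();
            *cnt += 1;
            // A failed progress update is deliberately not fatal here.
            let _ = shared_job_handle
                .lock()
                .unwrap()
                .update_progress(*cnt as f32 / total * 100.0);
        });
    }
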
diff --git a/src/extractors/web.rs b/src/extractors/web.rs
index a1422fa..c683676 100644
--- a/src/extractors/web.rs
+++ b/src/extractors/web.rs
@@ -2,11 +2,11 @@ use super::Extractor;
 use crate::{
     addressing::Address,
     database::{entry::Entry, UpEndConnection},
-    util::jobs::{Job, JobContainer, State},
+    util::jobs::{JobContainer, JobState},
 };
 use anyhow::anyhow;
 use anyhow::Result;
-use std::sync::{Arc, RwLock};
+
 use webpage::{Webpage, WebpageOptions};
 
 pub struct WebExtractor;
@@ -16,23 +16,17 @@ impl Extractor for WebExtractor {
         &self,
         address: &Address,
         _: &UpEndConnection,
-        job_container: Arc<RwLock<JobContainer>>,
+        mut job_container: JobContainer,
     ) -> anyhow::Result<Vec<Entry>> {
         if let Address::Url(url) = address {
-            let job_id = job_container
-                .write()
-                .unwrap()
-                .add_job(Job::new(None, &format!("Getting info about {url:?}")))
-                .unwrap();
+            let mut job_handle =
+                job_container.add_job(None, &format!("Getting info about {url:?}"))?;
 
             let webpage_url = url.clone();
             let webpage_get = Webpage::from_url(&webpage_url, WebpageOptions::default());
 
             if let Ok(webpage) = webpage_get {
-                let _ = job_container
-                    .write()
-                    .unwrap()
-                    .update_progress(&job_id, 50.0);
+                let _ = job_handle.update_progress(50.0);
 
                 let mut entries = vec![
                     webpage.html.title.map(|html_title| Entry {
@@ -61,18 +55,10 @@ impl Extractor for WebExtractor {
                     }))
                 }
 
-                let _ = job_container
-                    .write()
-                    .unwrap()
-                    .update_progress(&job_id, 100.0);
+                let _ = job_handle.update_state(JobState::Done);
 
                 return Ok(entries.into_iter().flatten().collect());
             }
-
-            let _ = job_container
-                .write()
-                .unwrap()
-                .update_state(&job_id, State::Failed);
 
             Err(anyhow!("Failed for unknown reason."))
         } else {
             Ok(vec![])
@@ -95,6 +81,8 @@ impl Extractor for WebExtractor {
 
 #[cfg(test)]
 mod test {
+    use crate::util::jobs::JobContainer;
+
     use super::*;
     use anyhow::Result;
     use tempfile::TempDir;
@@ -104,7 +92,7 @@ mod test {
         let temp_dir = TempDir::new().unwrap();
         let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?;
         let connection = open_result.db.connection()?;
-        let job_container = Arc::new(RwLock::new(crate::util::jobs::JobContainer::default()));
+        let job_container = JobContainer::new();
 
         let address = Address::Url("https://upendproject.net".into());
         assert!(WebExtractor.is_needed(&address, &connection)?);
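
Worth noting in WebExtractor::get above is what was deleted without replacement: the explicit update_state(&job_id, State::Failed) on the error path. Failure handling now lives in one place, the Drop impl of JobHandle at the end of this patch, so any early return marks the job Failed automatically. The mechanism is ordinary RAII, as in this self-contained sketch (the Handle type is illustrative, not the patch's own):

    struct Handle;

    impl Drop for Handle {
        fn drop(&mut self) {
            // JobHandle::drop does the equivalent of this: it sets the
            // job's state to Failed, so an abandoned job can no longer
            // hang at "in progress" forever.
            println!("dropped without explicit completion -> marked Failed");
        }
    }

    fn fallible() -> std::io::Result<()> {
        let _handle = Handle;
        // An early `?` return drops _handle and thereby records the
        // failure; no per-branch cleanup code is needed.
        let _entries = std::fs::read_dir("/nonexistent/path")?;
        Ok(())
    }
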
diff --git a/src/filesystem.rs b/src/filesystem.rs
index 4ea0ca8..27ebf96 100644
--- a/src/filesystem.rs
+++ b/src/filesystem.rs
@@ -17,7 +17,7 @@ use crate::database::hierarchies::{
 use crate::database::inner::models;
 use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
 use crate::util::hash::{Hash, Hashable};
-use crate::util::jobs::{Job, JobContainer, JobId, State};
+use crate::util::jobs::{JobContainer, JobHandle};
 use anyhow::{anyhow, Error, Result};
 use chrono::prelude::*;
 use log::{debug, error, info, warn};
@@ -53,39 +53,23 @@ fn initialize_types(connection: &UpEndConnection) -> Result<()> {
 
 pub fn rescan_vault<D: Borrow<UpEndDatabase>>(
     db: D,
-    job_container: Arc<RwLock<JobContainer>>,
+    mut job_container: JobContainer,
     quick_check: bool,
     disable_synchronous: bool,
 ) -> Result<Vec<UpdatePathOutcome>> {
-    let job_id_result = job_container
-        .write()
-        .unwrap()
-        .add_job(Job::new("REIMPORT", "Reimporting vault..."));
+    let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
 
-    match job_id_result {
-        Ok(job_id) => {
-            let job_container_rescan = job_container.clone();
-            let result = rescan_vault_inner(
-                db,
-                job_container_rescan,
-                job_id,
-                quick_check,
-                disable_synchronous,
-            );
+    match job_result {
+        Ok(job_handle) => {
+            let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous);
 
             if let Err(err) = &result {
                 error!("Update did not succeed! {:?}", err);
-
-                job_container
-                    .write()
-                    .unwrap()
-                    .update_state(&job_id, State::Failed)
-                    .unwrap();
             }
 
             result
         }
-        Err(err) => Err(err.into()),
+        Err(err) => Err(err),
     }
 }
 
 struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
@@ -115,8 +99,7 @@ pub enum UpdatePathOutcome {
 
 fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
     db: D,
-    job_container: Arc<RwLock<JobContainer>>,
-    job_id: JobId,
+    job_handle: JobHandle,
     quick_check: bool,
     disable_synchronous: bool,
 ) -> Result<Vec<UpdatePathOutcome>> {
@@ -157,6 +140,7 @@ fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
     let count = RwLock::new(0_usize);
     let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
     let total = path_entries.len() as f32;
+    let shared_job_handle = Arc::new(Mutex::new(job_handle));
     let path_outcomes: Vec<UpdatePathOutcome> = path_entries
         .into_par_iter()
         .map(|path| {
@@ -171,11 +155,10 @@ fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
             let mut cnt = count.write().unwrap();
             *cnt += 1;
 
-            job_container
-                .write()
+            let _ = shared_job_handle
+                .lock()
                 .unwrap()
-                .update_progress(&job_id, *cnt as f32 / total * 100.0)
-                .unwrap();
+                .update_progress(*cnt as f32 / total * 100.0);
 
             match result {
                 Ok(result) => result,
@@ -482,7 +465,7 @@ fn insert_file_with_metadata(
 #[cfg(test)]
 mod test {
     use crate::database::UpEndDatabase;
-    use crate::util;
+    use crate::util::jobs::JobContainer;
 
     use super::*;
     use std::fs::File;
@@ -517,12 +500,8 @@ mod test {
 
         // Initialize database
         let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
-        let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
-        let job_id = job_container
-            .write()
-            .unwrap()
-            .add_job(util::jobs::Job::new("RESCAN", "TEST JOB"))
-            .unwrap();
+        let mut job_container = JobContainer::new();
+        let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
 
         // Initial scan
         let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true);
@@ -536,8 +515,7 @@ mod test {
 
         // Modification-less rescan
 
-        let rescan_result =
-            rescan_vault_inner(&open_result.db, job_container.clone(), job_id, quick, false);
+        let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false);
 
         assert!(rescan_result.is_ok());
         let rescan_result = rescan_result.unwrap();
@@ -550,8 +528,7 @@ mod test {
 
         std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
 
-        let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);
+        let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);
 
         assert!(rescan_result.is_ok());
         let rescan_result = rescan_result.unwrap();
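
The same Drop-based bookkeeping is what shrinks rescan_vault above: the JobHandle moves by value into rescan_vault_inner, so when the inner function bails out with `?`, the handle is dropped there and the job records itself as Failed; the manual update_state(&job_id, State::Failed) block in the caller became redundant. A simplified stand-in for the inner function (not the patch's own code):

    use anyhow::Result;

    use crate::util::jobs::JobHandle;

    fn rescan_inner_sketch(vault: &std::path::Path, mut job_handle: JobHandle) -> Result<()> {
        // An early `?` exit drops job_handle right here, and its Drop
        // impl records the job as Failed without help from the caller.
        let entries: Vec<_> = std::fs::read_dir(vault)?.collect();

        for (i, _entry) in entries.iter().enumerate() {
            // ... process one path ...
            job_handle.update_progress((i + 1) as f32 / entries.len().max(1) as f32 * 100.0)?;
        }
        Ok(())
    }
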
diff --git a/src/main.rs b/src/main.rs
index f4e79e5..f17d145 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -12,12 +12,12 @@ use actix_web::{middleware, App, HttpServer};
 use anyhow::Result;
 use clap::{App as ClapApp, Arg};
 use log::{debug, info, warn};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 
 use crate::{
     common::{get_static_dir, PKG_VERSION},
     database::UpEndDatabase,
-    util::exec::block_background,
+    util::{exec::block_background, jobs::JobContainer},
 };
 
 mod addressing;
@@ -92,7 +92,7 @@ fn main() -> Result<()> {
     info!("Starting UpEnd {}...", PKG_VERSION);
     let sys = actix::System::new("upend");
 
-    let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
+    let job_container = JobContainer::new();
 
     let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
diff --git a/src/previews/mod.rs b/src/previews/mod.rs
index 7789c5a..02d9c18 100644
--- a/src/previews/mod.rs
+++ b/src/previews/mod.rs
@@ -1,9 +1,9 @@
 use crate::util::hash::Hash;
-use crate::util::jobs::{Job, JobContainer, State};
+use crate::util::jobs::{JobContainer, JobState};
 use crate::{database::UpEndDatabase, util::hash::b58_encode};
 use anyhow::{anyhow, Result};
 use log::{debug, trace};
-use std::sync::RwLock;
+
 use std::{
     collections::HashMap,
     fs::File,
@@ -58,7 +58,7 @@ impl PreviewStore {
         &self,
         hash: Hash,
         mime_type: S,
-        job_container: Arc<RwLock<JobContainer>>,
+        mut job_container: JobContainer,
     ) -> Result<Option<PathBuf>>
     where
         S: Into<Option<String>>,
@@ -74,14 +74,10 @@ impl PreviewStore {
             let connection = self.db.connection()?;
             let files = connection.retrieve_file(&hash)?;
             if let Some(file) = files.get(0) {
-                let job_id = job_container
-                    .write()
-                    .unwrap()
-                    .add_job(Job::new(
-                        None,
-                        &format!("Creating preview for {:?}", file.path.file_name().unwrap()),
-                    ))
-                    .unwrap();
+                let mut job_handle = job_container.add_job(
+                    None,
+                    &format!("Creating preview for {:?}", file.path.file_name().unwrap()),
+                )?;
 
                 let mime_type = mime_type.into();
 
@@ -108,10 +104,7 @@ impl PreviewStore {
                     Ok(preview) => {
                         trace!("Got preview for {hash:?}.");
 
-                        let _ = job_container
-                            .write()
-                            .unwrap()
-                            .update_state(&job_id, State::Done);
+                        let _ = job_handle.update_state(JobState::Done);
 
                         if let Some(data) = preview {
                             std::fs::create_dir_all(&self.path)?;
@@ -122,13 +115,7 @@ impl PreviewStore {
                             Ok(None)
                         }
                     }
-                    Err(err) => {
-                        let _ = job_container
-                            .write()
-                            .unwrap()
-                            .update_state(&job_id, State::Failed);
-                        Err(err)
-                    }
+                    Err(err) => Err(err),
                 }
             } else {
                 Err(anyhow!("Object not found, or is not a file."))
diff --git a/src/routes.rs b/src/routes.rs
index 670f5ee..8d0a5ee 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -25,7 +25,7 @@ use std::convert::{TryFrom, TryInto};
 use std::fs;
 use std::io::Write;
 use std::path::PathBuf;
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{collections::HashMap, io};
 use tempfile::NamedTempFile;
@@ -38,7 +38,7 @@ use is_executable::IsExecutable;
 pub struct State {
     pub upend: Arc<UpEndDatabase>,
     pub vault_name: Option<String>,
-    pub job_container: Arc<RwLock<JobContainer>>,
+    pub job_container: JobContainer,
     pub preview_store: Option<Arc<PreviewStore>>,
     pub desktop_enabled: bool,
 }
@@ -714,12 +714,16 @@ pub async fn get_jobs(
     state: web::Data<State>,
     web::Query(query): web::Query,
 ) -> Result<HttpResponse, Error> {
-    let jobs = state.job_container.read().unwrap().get_jobs();
+    let jobs = state
+        .job_container
+        .get_jobs()
+        .map_err(ErrorInternalServerError)?;
+
     Ok(HttpResponse::Ok().json(if query.full.is_some() {
         jobs
     } else {
         jobs.into_iter()
-            .filter(|(_, j)| matches!(j.state, crate::util::jobs::State::InProgress))
+            .filter(|(_, j)| matches!(j.state, crate::util::jobs::JobState::InProgress))
             .collect()
     }))
 }
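
routes.rs above can keep a plain JobContainer in the application State because the container is now itself the shared handle: as the src/util/jobs.rs diff below shows, it is a newtype around Arc<RwLock<JobContainerData>>, so cloning is cheap and every clone operates on the same job table. An illustrative sketch of that sharing:

    use crate::util::jobs::JobContainer;

    fn share_container() -> anyhow::Result<()> {
        let job_container = JobContainer::new();
        let mut for_worker = job_container.clone(); // cheap Arc clone

        let _handle = for_worker.add_job(None, "Background work...")?;

        // The job added through the clone is visible through the original.
        assert_eq!(job_container.get_jobs()?.len(), 1);
        Ok(())
    }
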
diff --git a/src/util/jobs.rs b/src/util/jobs.rs
index ddcaf33..18bce61 100644
--- a/src/util/jobs.rs
+++ b/src/util/jobs.rs
@@ -1,6 +1,10 @@
 use anyhow::{anyhow, Result};
+use log::warn;
 use serde::{Serialize, Serializer};
-use std::collections::HashMap;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
 use uuid::Uuid;
 
 #[derive(Default, Serialize, Clone)]
@@ -8,69 +12,29 @@ pub struct Job {
     pub job_type: Option<JobType>,
     pub title: String,
     pub progress: Option<f32>,
-    pub state: State,
-}
-
-impl Job {
-    pub fn new<S, IS>(job_type: IS, title: S) -> Self
-    where
-        S: AsRef<str>,
-        IS: Into<Option<S>>,
-    {
-        Job {
-            job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
-            title: String::from(title.as_ref()),
-            ..Default::default()
-        }
-    }
-}
-
-impl Drop for Job {
-    fn drop(&mut self) {
-        self.state = State::Failed;
-    }
+    pub state: JobState,
 }
 
 pub type JobType = String;
 
 #[derive(Serialize, Clone, Copy, PartialEq)]
-pub enum State {
+pub enum JobState {
     InProgress,
     Done,
     Failed,
 }
 
-impl Default for State {
+impl Default for JobState {
     fn default() -> Self {
-        State::InProgress
+        JobState::InProgress
     }
 }
 
 #[derive(Default)]
-pub struct JobContainer {
+pub struct JobContainerData {
     jobs: HashMap<JobId, Job>,
 }
 
-#[derive(Clone, Hash, PartialEq, Eq, Copy)]
-pub struct JobId {
-    uuid: Uuid,
-}
-
-impl From<Uuid> for JobId {
-    fn from(uuid: Uuid) -> Self {
-        JobId { uuid }
-    }
-}
-
-impl Serialize for JobId {
-    fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
-    where
-        S: Serializer,
-    {
-        serializer.serialize_str(format!("{}", self.uuid).as_str())
-    }
-}
-
 #[derive(Debug, Clone)]
 pub struct JobInProgessError(String);
 
@@ -82,35 +46,92 @@ impl std::fmt::Display for JobInProgessError {
 
 impl std::error::Error for JobInProgessError {}
 
+#[derive(Clone)]
+pub struct JobContainer(Arc<RwLock<JobContainerData>>);
+
 impl JobContainer {
-    pub fn add_job(&mut self, job: Job) -> Result<JobId, JobInProgessError> {
+    pub fn new() -> Self {
+        JobContainer(Arc::new(RwLock::new(JobContainerData::default())))
+    }
+
+    pub fn add_job<IS, S>(&mut self, job_type: IS, title: S) -> Result<JobHandle>
+    where
+        S: AsRef<str>,
+        IS: Into<Option<S>>,
+    {
+        let jobs = &mut self
+            .0
+            .write()
+            .map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
+            .jobs;
+
+        let job = Job {
+            job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
+            title: String::from(title.as_ref()),
+            ..Default::default()
+        };
+
         if let Some(job_type) = &job.job_type {
-            if self
-                .jobs
+            if jobs
                 .iter()
-                .any(|(_, j)| j.state == State::InProgress && j.job_type == job.job_type)
+                .any(|(_, j)| j.state == JobState::InProgress && j.job_type == job.job_type)
             {
                 return Err(JobInProgessError(format!(
                     "Job of type \"{}\" currently in progress.",
                     job_type
-                )));
+                ))
+                .into());
             }
         }
 
-        let uuid = Uuid::new_v4();
-        self.jobs.insert(JobId::from(uuid), job);
-        Ok(JobId::from(uuid))
+        let job_id = JobId(Uuid::new_v4());
+        jobs.insert(job_id, job);
+        Ok(JobHandle {
+            job_id,
+            container: self.0.clone(),
+        })
     }
 
-    pub fn get_jobs(&self) -> HashMap<JobId, Job> {
-        self.jobs.clone()
-    }
+    pub fn get_jobs(&self) -> Result<HashMap<JobId, Job>> {
+        let jobs = &self
+            .0
+            .read()
+            .map_err(|err| anyhow!("Couldn't lock job container for reading! {err:?}"))?
+            .jobs;
 
-    pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
-        if let Some(job) = self.jobs.get_mut(id) {
+        Ok(jobs.clone())
+    }
+}
+
+#[derive(Clone, Hash, PartialEq, Eq, Copy)]
+pub struct JobId(Uuid);
+
+impl Serialize for JobId {
+    fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(format!("{}", self.0).as_str())
+    }
+}
+
+pub struct JobHandle {
+    job_id: JobId,
+    container: Arc<RwLock<JobContainerData>>,
+}
+
+impl JobHandle {
+    pub fn update_progress(&mut self, progress: f32) -> Result<()> {
+        let jobs = &mut self
+            .container
+            .write()
+            .map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
+            .jobs;
+
+        if let Some(job) = jobs.get_mut(&self.job_id) {
             job.progress = Some(progress);
             if progress >= 100.0 {
-                job.state = State::Done;
+                job.state = JobState::Done;
             }
             Ok(())
         } else {
@@ -118,8 +139,14 @@ impl JobContainer {
         }
     }
 
-    pub fn update_state(&mut self, id: &JobId, state: State) -> Result<()> {
-        if let Some(job) = self.jobs.get_mut(id) {
+    pub fn update_state(&mut self, state: JobState) -> Result<()> {
+        let jobs = &mut self
+            .container
+            .write()
+            .map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
+            .jobs;
+
+        if let Some(job) = jobs.get_mut(&self.job_id) {
             job.state = state;
             Ok(())
         } else {
@@ -127,3 +154,12 @@ impl JobContainer {
         }
     }
 }
+
+impl Drop for JobHandle {
+    fn drop(&mut self) {
+        let update_result = self.update_state(JobState::Failed);
+        if let Err(err) = update_result {
+            warn!("Handle dropped, but couldn't set self as failed! {:?}", err);
+        }
+    }
+}
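
Taken together, this is how the patch delivers on "really fix infinitely hanging jobs": a job whose handle goes away without completing no longer sits at InProgress forever, because dropping the JobHandle records Failed, and the default filter in the get_jobs route then hides it. A small end-to-end sketch of that behavior, assuming the crate-internal paths used above:

    use crate::util::jobs::{JobContainer, JobState};

    fn abandoned_job_is_failed() -> anyhow::Result<()> {
        let mut job_container = JobContainer::new();

        let handle = job_container.add_job("DEMO", "A job that will be abandoned")?;
        drop(handle); // Drop calls update_state(JobState::Failed)

        // The job is still listed, but as Failed instead of InProgress.
        let jobs = job_container.get_jobs()?;
        assert!(jobs.values().all(|job| matches!(job.state, JobState::Failed)));
        Ok(())
    }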