refactor Jobs and their handling, really fix infinitely hanging jobs

feat/vaults
Tomáš Mládek 2022-03-02 01:14:23 +01:00
parent 1c316427ab
commit ef3a130855
GPG Key ID: 65E225C8B3E2ED8A
10 changed files with 194 additions and 221 deletions
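
In short: the shared Arc<RwLock<JobContainer>> plus manual JobId bookkeeping is replaced by a cheaply cloneable JobContainer handing out per-job JobHandles, whose Drop implementation marks unfinished jobs as failed. A minimal sketch of the new API (a usage illustration based on util/jobs.rs below, not code from this commit):

use crate::util::jobs::{JobContainer, JobState};

fn demo() -> anyhow::Result<()> {
    // JobContainer::new() wraps its state in Arc<RwLock<...>>, so clones share one job table.
    let mut jobs = JobContainer::new();

    // add_job now returns a JobHandle (carrying its JobId and a reference to the
    // shared container) instead of a bare JobId the caller must thread around.
    let mut handle = jobs.add_job("REIMPORT", "Scanning vault directory...")?;

    handle.update_progress(50.0)?; // reaching 100.0 flips the state to Done automatically
    handle.update_state(JobState::Done)?;

    // If a handle is dropped while its job is still InProgress (panic, early return),
    // Drop marks the job Failed -- this is what un-hangs stuck jobs.
    Ok(())
}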

View File

@ -1,5 +1,5 @@
use crate::addressing::Address;
use crate::database::entry::{InvariantEntry};
use crate::database::entry::InvariantEntry;
pub const TYPE_TYPE_VAL: &str = "TYPE";
pub const TYPE_BASE_ATTR: &str = "TYPE";

View File

@ -30,8 +30,4 @@ table! {
}
}
allow_tables_to_appear_in_same_query!(
data,
files,
meta,
);
allow_tables_to_appear_in_same_query!(data, files, meta,);

View File

@ -7,10 +7,9 @@ use crate::{
UpEndConnection,
},
filesystem::FILE_MIME_KEY,
util::jobs::{Job, JobContainer, State},
util::jobs::{JobContainer, JobState},
};
use anyhow::{anyhow, Result};
use std::sync::{Arc, RwLock};
pub struct ID3Extractor;
@ -19,7 +18,7 @@ impl Extractor for ID3Extractor {
&self,
address: &Address,
connection: &UpEndConnection,
job_container: Arc<RwLock<JobContainer>>,
mut job_container: JobContainer,
) -> Result<Vec<Entry>> {
if let Address::Hash(hash) = address {
let is_audio = connection.retrieve_object(address)?.iter().any(|e| {
@ -38,22 +37,18 @@ impl Extractor for ID3Extractor {
let files = connection.retrieve_file(hash)?;
if let Some(file) = files.get(0) {
let job_id = job_container
.write()
.unwrap()
.add_job(Job::new(
None,
&format!(
"Getting ID3 info from \"{:}\"",
file.path
.components()
.last()
.unwrap()
.as_os_str()
.to_string_lossy()
),
))
.unwrap();
let mut job_handle = job_container.add_job(
None,
&format!(
"Getting ID3 info from \"{:}\"",
file.path
.components()
.last()
.unwrap()
.as_os_str()
.to_string_lossy()
),
)?;
let tags = id3::Tag::read_from_path(&file.path)?;
@ -79,10 +74,7 @@ impl Extractor for ID3Extractor {
})
.collect();
let _ = job_container
.write()
.unwrap()
.update_state(&job_id, State::Done);
let _ = job_handle.update_state(JobState::Done);
Ok(result)
} else {

View File

@ -1,14 +1,14 @@
use crate::{
addressing::Address,
database::{entry::Entry, UpEndConnection, UpEndDatabase},
util::jobs::{Job, JobContainer},
util::jobs::JobContainer,
};
use anyhow::Result;
use log::{info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{
borrow::Borrow,
sync::{Arc, RwLock},
sync::{Arc, Mutex, RwLock},
};
#[cfg(feature = "extractors-web")]
@ -22,7 +22,7 @@ pub trait Extractor {
&self,
address: &Address,
connection: &UpEndConnection,
job_container: Arc<RwLock<JobContainer>>,
job_container: JobContainer,
) -> Result<Vec<Entry>>;
fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
@ -33,7 +33,7 @@ pub trait Extractor {
&self,
address: &Address,
connection: &UpEndConnection,
job_container: Arc<RwLock<JobContainer>>,
job_container: JobContainer,
) -> Result<usize> {
if self.is_needed(address, connection)? {
let entries = self.get(address, connection, job_container)?;
@ -53,57 +53,50 @@ pub trait Extractor {
pub fn extract_all<D: Borrow<UpEndDatabase>>(
db: D,
job_container: Arc<RwLock<JobContainer>>,
mut job_container: JobContainer,
) -> Result<usize> {
info!("Extracting metadata for all addresses.");
let db = db.borrow();
let job_id_result = job_container
.write()
.unwrap()
.add_job(Job::new("EXTRACT_ALL", "Extracting additional metadata..."));
let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
match job_id_result {
Ok(job_id) => {
let all_addresses = db.connection()?.get_all_addresses()?;
let total = all_addresses.len() as f32;
let count = RwLock::new(0_usize);
let all_addresses = db.connection()?.get_all_addresses()?;
let total = all_addresses.len() as f32;
let count = RwLock::new(0_usize);
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let result = all_addresses
.par_iter()
.map(|address| {
let connection = db.connection()?;
let extract_result = extract(address, &connection, job_container.clone());
let result = all_addresses
.par_iter()
.map(|address| {
let connection = db.connection()?;
let extract_result = extract(address, &connection, job_container.clone());
let mut cnt = count.write().unwrap();
*cnt += 1;
let mut cnt = count.write().unwrap();
*cnt += 1;
job_container
.write()
.unwrap()
.update_progress(&job_id, *cnt as f32 / total * 100.0)?;
shared_job_handle
.lock()
.unwrap()
.update_progress(*cnt as f32 / total * 100.0)?;
extract_result
})
.flatten()
.sum();
extract_result
})
.flatten()
.sum();
info!(
"Done extracting metadata; processed {} addresses, added {} entries.",
all_addresses.len(),
result
);
info!(
"Done extracting metadata; processed {} addresses, added {} entries.",
all_addresses.len(),
result
);
Ok(result)
}
Err(err) => Err(err.into()),
}
Ok(result)
}
pub fn extract(
address: &Address,
connection: &UpEndConnection,
job_container: Arc<RwLock<JobContainer>>,
job_container: JobContainer,
) -> Result<usize> {
let mut entry_count = 0;
trace!("Extracting metadata for {address:?}");

View File

@ -2,11 +2,11 @@ use super::Extractor;
use crate::{
addressing::Address,
database::{entry::Entry, UpEndConnection},
util::jobs::{Job, JobContainer, State},
util::jobs::{JobContainer, JobState},
};
use anyhow::anyhow;
use anyhow::Result;
use std::sync::{Arc, RwLock};
use webpage::{Webpage, WebpageOptions};
pub struct WebExtractor;
@ -16,23 +16,17 @@ impl Extractor for WebExtractor {
&self,
address: &Address,
_: &UpEndConnection,
job_container: Arc<RwLock<JobContainer>>,
mut job_container: JobContainer,
) -> anyhow::Result<Vec<Entry>> {
if let Address::Url(url) = address {
let job_id = job_container
.write()
.unwrap()
.add_job(Job::new(None, &format!("Getting info about {url:?}")))
.unwrap();
let mut job_handle =
job_container.add_job(None, &format!("Getting info about {url:?}"))?;
let webpage_url = url.clone();
let webpage_get = Webpage::from_url(&webpage_url, WebpageOptions::default());
if let Ok(webpage) = webpage_get {
let _ = job_container
.write()
.unwrap()
.update_progress(&job_id, 50.0);
let _ = job_handle.update_progress(50.0);
let mut entries = vec![
webpage.html.title.map(|html_title| Entry {
@ -61,18 +55,10 @@ impl Extractor for WebExtractor {
}))
}
let _ = job_container
.write()
.unwrap()
.update_progress(&job_id, 100.0);
let _ = job_handle.update_state(JobState::Done);
return Ok(entries.into_iter().flatten().collect());
}
let _ = job_container
.write()
.unwrap()
.update_state(&job_id, State::Failed);
Err(anyhow!("Failed for unknown reason."))
} else {
Ok(vec![])
@ -95,6 +81,8 @@ impl Extractor for WebExtractor {
#[cfg(test)]
mod test {
use crate::util::jobs::JobContainer;
use super::*;
use anyhow::Result;
use tempfile::TempDir;
@ -104,7 +92,7 @@ mod test {
let temp_dir = TempDir::new().unwrap();
let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?;
let connection = open_result.db.connection()?;
let job_container = Arc::new(RwLock::new(crate::util::jobs::JobContainer::default()));
let job_container = JobContainer::new();
let address = Address::Url("https://upendproject.net".into());
assert!(WebExtractor.is_needed(&address, &connection)?);

View File

@ -17,7 +17,7 @@ use crate::database::hierarchies::{
use crate::database::inner::models;
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
use crate::util::hash::{Hash, Hashable};
use crate::util::jobs::{Job, JobContainer, JobId, State};
use crate::util::jobs::{JobContainer, JobHandle};
use anyhow::{anyhow, Error, Result};
use chrono::prelude::*;
use log::{debug, error, info, warn};
@ -53,39 +53,23 @@ fn initialize_types(connection: &UpEndConnection) -> Result<()> {
pub fn rescan_vault<D: Borrow<UpEndDatabase>>(
db: D,
job_container: Arc<RwLock<JobContainer>>,
mut job_container: JobContainer,
quick_check: bool,
disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
let job_id_result = job_container
.write()
.unwrap()
.add_job(Job::new("REIMPORT", "Reimporting vault..."));
let job_result = job_container.add_job("REIMPORT", "Scanning vault directory...");
match job_id_result {
Ok(job_id) => {
let job_container_rescan = job_container.clone();
let result = rescan_vault_inner(
db,
job_container_rescan,
job_id,
quick_check,
disable_synchronous,
);
match job_result {
Ok(job_handle) => {
let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous);
if let Err(err) = &result {
error!("Update did not succeed! {:?}", err);
job_container
.write()
.unwrap()
.update_state(&job_id, State::Failed)
.unwrap();
}
result
}
Err(err) => Err(err.into()),
Err(err) => Err(err),
}
}
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
@ -115,8 +99,7 @@ pub enum UpdatePathOutcome {
fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
db: D,
job_container: Arc<RwLock<JobContainer>>,
job_id: JobId,
job_handle: JobHandle,
quick_check: bool,
disable_synchronous: bool,
) -> Result<Vec<UpdatePathOutcome>> {
@ -157,6 +140,7 @@ fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
let count = RwLock::new(0_usize);
let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
let total = path_entries.len() as f32;
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let path_outcomes: Vec<UpdatePathOutcome> = path_entries
.into_par_iter()
.map(|path| {
@ -171,11 +155,10 @@ fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
let mut cnt = count.write().unwrap();
*cnt += 1;
job_container
.write()
let _ = shared_job_handle
.lock()
.unwrap()
.update_progress(&job_id, *cnt as f32 / total * 100.0)
.unwrap();
.update_progress(*cnt as f32 / total * 100.0);
match result {
Ok(result) => result,
@ -482,7 +465,7 @@ fn insert_file_with_metadata(
#[cfg(test)]
mod test {
use crate::database::UpEndDatabase;
use crate::util;
use crate::util::jobs::JobContainer;
use super::*;
use std::fs::File;
@ -517,12 +500,8 @@ mod test {
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
let job_id = job_container
.write()
.unwrap()
.add_job(util::jobs::Job::new("RESCAN", "TEST JOB"))
.unwrap();
let mut job_container = JobContainer::new();
let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
// Initial scan
let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true);
@ -536,8 +515,7 @@ mod test {
// Modification-less rescan
let rescan_result =
rescan_vault_inner(&open_result.db, job_container.clone(), job_id, quick, false);
let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();
@ -550,8 +528,7 @@ mod test {
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
let rescan_result =
rescan_vault_inner(&open_result.db, job_container, job_id, quick, false);
let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);
assert!(rescan_result.is_ok());
let rescan_result = rescan_result.unwrap();

View File

@ -12,12 +12,12 @@ use actix_web::{middleware, App, HttpServer};
use anyhow::Result;
use clap::{App as ClapApp, Arg};
use log::{debug, info, warn};
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use crate::{
common::{get_static_dir, PKG_VERSION},
database::UpEndDatabase,
util::exec::block_background,
util::{exec::block_background, jobs::JobContainer},
};
mod addressing;
@ -92,7 +92,7 @@ fn main() -> Result<()> {
info!("Starting UpEnd {}...", PKG_VERSION);
let sys = actix::System::new("upend");
let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
let job_container = JobContainer::new();
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());

View File

@ -1,9 +1,9 @@
use crate::util::hash::Hash;
use crate::util::jobs::{Job, JobContainer, State};
use crate::util::jobs::{JobContainer, JobState};
use crate::{database::UpEndDatabase, util::hash::b58_encode};
use anyhow::{anyhow, Result};
use log::{debug, trace};
use std::sync::RwLock;
use std::{
collections::HashMap,
fs::File,
@ -58,7 +58,7 @@ impl PreviewStore {
&self,
hash: Hash,
mime_type: S,
job_container: Arc<RwLock<JobContainer>>,
mut job_container: JobContainer,
) -> Result<Option<PathBuf>>
where
S: Into<Option<String>>,
@ -74,14 +74,10 @@ impl PreviewStore {
let connection = self.db.connection()?;
let files = connection.retrieve_file(&hash)?;
if let Some(file) = files.get(0) {
let job_id = job_container
.write()
.unwrap()
.add_job(Job::new(
None,
&format!("Creating preview for {:?}", file.path.file_name().unwrap()),
))
.unwrap();
let mut job_handle = job_container.add_job(
None,
&format!("Creating preview for {:?}", file.path.file_name().unwrap()),
)?;
let mime_type = mime_type.into();
@ -108,10 +104,7 @@ impl PreviewStore {
Ok(preview) => {
trace!("Got preview for {hash:?}.");
let _ = job_container
.write()
.unwrap()
.update_state(&job_id, State::Done);
let _ = job_handle.update_state(JobState::Done);
if let Some(data) = preview {
std::fs::create_dir_all(&self.path)?;
@ -122,13 +115,7 @@ impl PreviewStore {
Ok(None)
}
}
Err(err) => {
let _ = job_container
.write()
.unwrap()
.update_state(&job_id, State::Failed);
Err(err)
}
Err(err) => Err(err),
}
} else {
Err(anyhow!("Object not found, or is not a file."))

View File

@ -25,7 +25,7 @@ use std::convert::{TryFrom, TryInto};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, io};
use tempfile::NamedTempFile;
@ -38,7 +38,7 @@ use is_executable::IsExecutable;
pub struct State {
pub upend: Arc<UpEndDatabase>,
pub vault_name: Option<String>,
pub job_container: Arc<RwLock<JobContainer>>,
pub job_container: JobContainer,
pub preview_store: Option<Arc<PreviewStore>>,
pub desktop_enabled: bool,
}
@ -714,12 +714,16 @@ pub async fn get_jobs(
state: web::Data<State>,
web::Query(query): web::Query<JobsRequest>,
) -> Result<HttpResponse, Error> {
let jobs = state.job_container.read().unwrap().get_jobs();
let jobs = state
.job_container
.get_jobs()
.map_err(ErrorInternalServerError)?;
Ok(HttpResponse::Ok().json(if query.full.is_some() {
jobs
} else {
jobs.into_iter()
.filter(|(_, j)| matches!(j.state, crate::util::jobs::State::InProgress))
.filter(|(_, j)| matches!(j.state, crate::util::jobs::JobState::InProgress))
.collect()
}))
}
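
Because JobId serializes as its UUID string (see util/jobs.rs below), the HashMap<JobId, Job> returned here becomes a JSON object keyed by UUIDs. A hedged illustration (assuming serde_json, which actix-web's .json() already pulls in):

use crate::util::jobs::JobContainer;

fn inspect_jobs() -> anyhow::Result<()> {
    let mut container = JobContainer::new();
    // Bind the handle to a name: a bare `_` pattern would drop it immediately,
    // and the job would be marked Failed before we ever list it.
    let _handle = container.add_job("REIMPORT", "Scanning vault directory...")?;

    // get_jobs() clones the job table out from behind the shared lock.
    let serialized = serde_json::to_string_pretty(&container.get_jobs()?)?;
    // Prints something like (UUID invented for illustration):
    // { "8f9d9bfa-5c31-...": { "job_type": "REIMPORT", "title": "Scanning vault directory...",
    //                          "progress": null, "state": "InProgress" } }
    println!("{serialized}");
    Ok(())
}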

View File

@ -1,6 +1,10 @@
use anyhow::{anyhow, Result};
use log::warn;
use serde::{Serialize, Serializer};
use std::collections::HashMap;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use uuid::Uuid;
#[derive(Default, Serialize, Clone)]
@ -8,69 +12,29 @@ pub struct Job {
pub job_type: Option<JobType>,
pub title: String,
pub progress: Option<f32>,
pub state: State,
}
impl Job {
pub fn new<S, IS>(job_type: IS, title: S) -> Self
where
S: AsRef<str>,
IS: Into<Option<S>>,
{
Job {
job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
title: String::from(title.as_ref()),
..Default::default()
}
}
}
impl Drop for Job {
fn drop(&mut self) {
self.state = State::Failed;
}
pub state: JobState,
}
pub type JobType = String;
#[derive(Serialize, Clone, Copy, PartialEq)]
pub enum State {
pub enum JobState {
InProgress,
Done,
Failed,
}
impl Default for State {
impl Default for JobState {
fn default() -> Self {
State::InProgress
JobState::InProgress
}
}
#[derive(Default)]
pub struct JobContainer {
pub struct JobContainerData {
jobs: HashMap<JobId, Job>,
}
#[derive(Clone, Hash, PartialEq, Eq, Copy)]
pub struct JobId {
uuid: Uuid,
}
impl From<Uuid> for JobId {
fn from(uuid: Uuid) -> Self {
JobId { uuid }
}
}
impl Serialize for JobId {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
serializer.serialize_str(format!("{}", self.uuid).as_str())
}
}
#[derive(Debug, Clone)]
pub struct JobInProgessError(String);
@ -82,35 +46,92 @@ impl std::fmt::Display for JobInProgessError {
impl std::error::Error for JobInProgessError {}
#[derive(Clone)]
pub struct JobContainer(Arc<RwLock<JobContainerData>>);
impl JobContainer {
pub fn add_job(&mut self, job: Job) -> Result<JobId, JobInProgessError> {
pub fn new() -> Self {
JobContainer(Arc::new(RwLock::new(JobContainerData::default())))
}
pub fn add_job<S, IS>(&mut self, job_type: IS, title: S) -> Result<JobHandle>
where
S: AsRef<str>,
IS: Into<Option<S>>,
{
let jobs = &mut self
.0
.write()
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
.jobs;
let job = Job {
job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
title: String::from(title.as_ref()),
..Default::default()
};
if let Some(job_type) = &job.job_type {
if self
.jobs
if jobs
.iter()
.any(|(_, j)| j.state == State::InProgress && j.job_type == job.job_type)
.any(|(_, j)| j.state == JobState::InProgress && j.job_type == job.job_type)
{
return Err(JobInProgessError(format!(
"Job of type \"{}\" currently in progress.",
job_type
)));
))
.into());
}
}
let uuid = Uuid::new_v4();
self.jobs.insert(JobId::from(uuid), job);
Ok(JobId::from(uuid))
let job_id = JobId(Uuid::new_v4());
jobs.insert(job_id, job);
Ok(JobHandle {
job_id,
container: self.0.clone(),
})
}
pub fn get_jobs(&self) -> HashMap<JobId, Job> {
self.jobs.clone()
}
pub fn get_jobs(&self) -> Result<HashMap<JobId, Job>> {
let jobs = &self
.0
.read()
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
.jobs;
pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
if let Some(job) = self.jobs.get_mut(id) {
Ok(jobs.clone())
}
}
#[derive(Clone, Hash, PartialEq, Eq, Copy)]
pub struct JobId(Uuid);
impl Serialize for JobId {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
serializer.serialize_str(format!("{}", self.0).as_str())
}
}
pub struct JobHandle {
job_id: JobId,
container: Arc<RwLock<JobContainerData>>,
}
impl JobHandle {
pub fn update_progress(&mut self, progress: f32) -> Result<()> {
let jobs = &mut self
.container
.write()
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
.jobs;
if let Some(job) = jobs.get_mut(&self.job_id) {
job.progress = Some(progress);
if progress >= 100.0 {
job.state = State::Done;
job.state = JobState::Done;
}
Ok(())
} else {
@ -118,8 +139,14 @@ impl JobContainer {
}
}
pub fn update_state(&mut self, id: &JobId, state: State) -> Result<()> {
if let Some(job) = self.jobs.get_mut(id) {
pub fn update_state(&mut self, state: JobState) -> Result<()> {
let jobs = &mut self
.container
.write()
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
.jobs;
if let Some(job) = jobs.get_mut(&self.job_id) {
job.state = state;
Ok(())
} else {
@ -127,3 +154,12 @@ impl JobContainer {
}
}
}
impl Drop for JobHandle {
fn drop(&mut self) {
// Only mark the job as failed if it is still InProgress, so a handle
// dropped after reaching Done doesn't overwrite the finished state.
let in_progress = self.container.read().map_or(false, |data| {
matches!(data.jobs.get(&self.job_id).map(|j| j.state), Some(JobState::InProgress))
});
if in_progress {
if let Err(err) = self.update_state(JobState::Failed) {
warn!("Handle dropped, but couldn't mark the job as failed! {:?}", err);
}
}
}
}
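
A hypothetical test (not part of the commit) illustrating why this Drop impl is what "really fixes infinitely hanging jobs": a handle that goes out of scope without reaching Done leaves its job Failed rather than stuck InProgress forever.

#[cfg(test)]
mod drop_semantics {
    use super::*;

    #[test]
    fn dropped_handle_marks_job_failed() -> anyhow::Result<()> {
        let mut container = JobContainer::new();
        {
            let _handle = container.add_job(None, "doomed job")?;
            // _handle is dropped here without ever reaching Done...
        }
        // ...so the job cannot keep hanging in InProgress:
        let jobs = container.get_jobs()?;
        assert!(jobs.values().all(|job| job.state == JobState::Failed));
        Ok(())
    }
}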