refactor Jobs and their handling, really fix infinitely hanging jobs
parent
1c316427ab
commit
ef3a130855
|
@ -1,5 +1,5 @@
|
|||
use crate::addressing::Address;
|
||||
use crate::database::entry::{InvariantEntry};
|
||||
use crate::database::entry::InvariantEntry;
|
||||
|
||||
pub const TYPE_TYPE_VAL: &str = "TYPE";
|
||||
pub const TYPE_BASE_ATTR: &str = "TYPE";
|
||||
|
|
|
@ -30,8 +30,4 @@ table! {
|
|||
}
|
||||
}
|
||||
|
||||
allow_tables_to_appear_in_same_query!(
|
||||
data,
|
||||
files,
|
||||
meta,
|
||||
);
|
||||
allow_tables_to_appear_in_same_query!(data, files, meta,);
|
||||
|
|
|
@ -7,10 +7,9 @@ use crate::{
|
|||
UpEndConnection,
|
||||
},
|
||||
filesystem::FILE_MIME_KEY,
|
||||
util::jobs::{Job, JobContainer, State},
|
||||
util::jobs::{JobContainer, JobState},
|
||||
};
|
||||
use anyhow::{anyhow, Result};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
pub struct ID3Extractor;
|
||||
|
||||
|
@ -19,7 +18,7 @@ impl Extractor for ID3Extractor {
|
|||
&self,
|
||||
address: &Address,
|
||||
connection: &UpEndConnection,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
mut job_container: JobContainer,
|
||||
) -> Result<Vec<Entry>> {
|
||||
if let Address::Hash(hash) = address {
|
||||
let is_audio = connection.retrieve_object(address)?.iter().any(|e| {
|
||||
|
@ -38,22 +37,18 @@ impl Extractor for ID3Extractor {
|
|||
let files = connection.retrieve_file(hash)?;
|
||||
|
||||
if let Some(file) = files.get(0) {
|
||||
let job_id = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(Job::new(
|
||||
None,
|
||||
&format!(
|
||||
"Getting ID3 info from \"{:}\"",
|
||||
file.path
|
||||
.components()
|
||||
.last()
|
||||
.unwrap()
|
||||
.as_os_str()
|
||||
.to_string_lossy()
|
||||
),
|
||||
))
|
||||
.unwrap();
|
||||
let mut job_handle = job_container.add_job(
|
||||
None,
|
||||
&format!(
|
||||
"Getting ID3 info from \"{:}\"",
|
||||
file.path
|
||||
.components()
|
||||
.last()
|
||||
.unwrap()
|
||||
.as_os_str()
|
||||
.to_string_lossy()
|
||||
),
|
||||
)?;
|
||||
|
||||
let tags = id3::Tag::read_from_path(&file.path)?;
|
||||
|
||||
|
@ -79,10 +74,7 @@ impl Extractor for ID3Extractor {
|
|||
})
|
||||
.collect();
|
||||
|
||||
let _ = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_state(&job_id, State::Done);
|
||||
let _ = job_handle.update_state(JobState::Done);
|
||||
|
||||
Ok(result)
|
||||
} else {
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
use crate::{
|
||||
addressing::Address,
|
||||
database::{entry::Entry, UpEndConnection, UpEndDatabase},
|
||||
util::jobs::{Job, JobContainer},
|
||||
util::jobs::JobContainer,
|
||||
};
|
||||
use anyhow::Result;
|
||||
use log::{info, trace};
|
||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
sync::{Arc, RwLock},
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
};
|
||||
|
||||
#[cfg(feature = "extractors-web")]
|
||||
|
@ -22,7 +22,7 @@ pub trait Extractor {
|
|||
&self,
|
||||
address: &Address,
|
||||
connection: &UpEndConnection,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
job_container: JobContainer,
|
||||
) -> Result<Vec<Entry>>;
|
||||
|
||||
fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
|
||||
|
@ -33,7 +33,7 @@ pub trait Extractor {
|
|||
&self,
|
||||
address: &Address,
|
||||
connection: &UpEndConnection,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
job_container: JobContainer,
|
||||
) -> Result<usize> {
|
||||
if self.is_needed(address, connection)? {
|
||||
let entries = self.get(address, connection, job_container)?;
|
||||
|
@ -53,57 +53,50 @@ pub trait Extractor {
|
|||
|
||||
pub fn extract_all<D: Borrow<UpEndDatabase>>(
|
||||
db: D,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
mut job_container: JobContainer,
|
||||
) -> Result<usize> {
|
||||
info!("Extracting metadata for all addresses.");
|
||||
|
||||
let db = db.borrow();
|
||||
let job_id_result = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(Job::new("EXTRACT_ALL", "Extracting additional metadata..."));
|
||||
let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
|
||||
|
||||
match job_id_result {
|
||||
Ok(job_id) => {
|
||||
let all_addresses = db.connection()?.get_all_addresses()?;
|
||||
let total = all_addresses.len() as f32;
|
||||
let count = RwLock::new(0_usize);
|
||||
let all_addresses = db.connection()?.get_all_addresses()?;
|
||||
let total = all_addresses.len() as f32;
|
||||
let count = RwLock::new(0_usize);
|
||||
let shared_job_handle = Arc::new(Mutex::new(job_handle));
|
||||
|
||||
let result = all_addresses
|
||||
.par_iter()
|
||||
.map(|address| {
|
||||
let connection = db.connection()?;
|
||||
let extract_result = extract(address, &connection, job_container.clone());
|
||||
let result = all_addresses
|
||||
.par_iter()
|
||||
.map(|address| {
|
||||
let connection = db.connection()?;
|
||||
let extract_result = extract(address, &connection, job_container.clone());
|
||||
|
||||
let mut cnt = count.write().unwrap();
|
||||
*cnt += 1;
|
||||
let mut cnt = count.write().unwrap();
|
||||
*cnt += 1;
|
||||
|
||||
job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_progress(&job_id, *cnt as f32 / total * 100.0)?;
|
||||
shared_job_handle
|
||||
.lock()
|
||||
.unwrap()
|
||||
.update_progress(*cnt as f32 / total * 100.0)?;
|
||||
|
||||
extract_result
|
||||
})
|
||||
.flatten()
|
||||
.sum();
|
||||
extract_result
|
||||
})
|
||||
.flatten()
|
||||
.sum();
|
||||
|
||||
info!(
|
||||
"Done extracting metadata; processed {} addresses, added {} entries.",
|
||||
all_addresses.len(),
|
||||
result
|
||||
);
|
||||
info!(
|
||||
"Done extracting metadata; processed {} addresses, added {} entries.",
|
||||
all_addresses.len(),
|
||||
result
|
||||
);
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn extract(
|
||||
address: &Address,
|
||||
connection: &UpEndConnection,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
job_container: JobContainer,
|
||||
) -> Result<usize> {
|
||||
let mut entry_count = 0;
|
||||
trace!("Extracting metadata for {address:?}");
|
||||
|
|
|
@ -2,11 +2,11 @@ use super::Extractor;
|
|||
use crate::{
|
||||
addressing::Address,
|
||||
database::{entry::Entry, UpEndConnection},
|
||||
util::jobs::{Job, JobContainer, State},
|
||||
util::jobs::{JobContainer, JobState},
|
||||
};
|
||||
use anyhow::anyhow;
|
||||
use anyhow::Result;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use webpage::{Webpage, WebpageOptions};
|
||||
|
||||
pub struct WebExtractor;
|
||||
|
@ -16,23 +16,17 @@ impl Extractor for WebExtractor {
|
|||
&self,
|
||||
address: &Address,
|
||||
_: &UpEndConnection,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
mut job_container: JobContainer,
|
||||
) -> anyhow::Result<Vec<Entry>> {
|
||||
if let Address::Url(url) = address {
|
||||
let job_id = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(Job::new(None, &format!("Getting info about {url:?}")))
|
||||
.unwrap();
|
||||
let mut job_handle =
|
||||
job_container.add_job(None, &format!("Getting info about {url:?}"))?;
|
||||
|
||||
let webpage_url = url.clone();
|
||||
let webpage_get = Webpage::from_url(&webpage_url, WebpageOptions::default());
|
||||
|
||||
if let Ok(webpage) = webpage_get {
|
||||
let _ = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_progress(&job_id, 50.0);
|
||||
let _ = job_handle.update_progress(50.0);
|
||||
|
||||
let mut entries = vec![
|
||||
webpage.html.title.map(|html_title| Entry {
|
||||
|
@ -61,18 +55,10 @@ impl Extractor for WebExtractor {
|
|||
}))
|
||||
}
|
||||
|
||||
let _ = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_progress(&job_id, 100.0);
|
||||
let _ = job_handle.update_state(JobState::Done);
|
||||
|
||||
return Ok(entries.into_iter().flatten().collect());
|
||||
}
|
||||
|
||||
let _ = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_state(&job_id, State::Failed);
|
||||
Err(anyhow!("Failed for unknown reason."))
|
||||
} else {
|
||||
Ok(vec![])
|
||||
|
@ -95,6 +81,8 @@ impl Extractor for WebExtractor {
|
|||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::util::jobs::JobContainer;
|
||||
|
||||
use super::*;
|
||||
use anyhow::Result;
|
||||
use tempfile::TempDir;
|
||||
|
@ -104,7 +92,7 @@ mod test {
|
|||
let temp_dir = TempDir::new().unwrap();
|
||||
let open_result = crate::database::UpEndDatabase::open(&temp_dir, None, true)?;
|
||||
let connection = open_result.db.connection()?;
|
||||
let job_container = Arc::new(RwLock::new(crate::util::jobs::JobContainer::default()));
|
||||
let job_container = JobContainer::new();
|
||||
|
||||
let address = Address::Url("https://upendproject.net".into());
|
||||
assert!(WebExtractor.is_needed(&address, &connection)?);
|
||||
|
|
|
@ -17,7 +17,7 @@ use crate::database::hierarchies::{
|
|||
use crate::database::inner::models;
|
||||
use crate::database::{UpEndConnection, UpEndDatabase, UPEND_SUBDIR};
|
||||
use crate::util::hash::{Hash, Hashable};
|
||||
use crate::util::jobs::{Job, JobContainer, JobId, State};
|
||||
use crate::util::jobs::{JobContainer, JobHandle};
|
||||
use anyhow::{anyhow, Error, Result};
|
||||
use chrono::prelude::*;
|
||||
use log::{debug, error, info, warn};
|
||||
|
@ -53,39 +53,23 @@ fn initialize_types(connection: &UpEndConnection) -> Result<()> {
|
|||
|
||||
pub fn rescan_vault<D: Borrow<UpEndDatabase>>(
|
||||
db: D,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
mut job_container: JobContainer,
|
||||
quick_check: bool,
|
||||
disable_synchronous: bool,
|
||||
) -> Result<Vec<UpdatePathOutcome>> {
|
||||
let job_id_result = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(Job::new("REIMPORT", "Reimporting vault..."));
|
||||
let job_result = job_container.add_job("REIMPORT", "Scaning vault directory...");
|
||||
|
||||
match job_id_result {
|
||||
Ok(job_id) => {
|
||||
let job_container_rescan = job_container.clone();
|
||||
let result = rescan_vault_inner(
|
||||
db,
|
||||
job_container_rescan,
|
||||
job_id,
|
||||
quick_check,
|
||||
disable_synchronous,
|
||||
);
|
||||
match job_result {
|
||||
Ok(job_handle) => {
|
||||
let result = rescan_vault_inner(db, job_handle, quick_check, disable_synchronous);
|
||||
|
||||
if let Err(err) = &result {
|
||||
error!("Update did not succeed! {:?}", err);
|
||||
|
||||
job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_state(&job_id, State::Failed)
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
Err(err) => Err(err.into()),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
struct PragmaSynchronousGuard<'a>(&'a UpEndConnection);
|
||||
|
@ -115,8 +99,7 @@ pub enum UpdatePathOutcome {
|
|||
|
||||
fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
|
||||
db: D,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
job_id: JobId,
|
||||
job_handle: JobHandle,
|
||||
quick_check: bool,
|
||||
disable_synchronous: bool,
|
||||
) -> Result<Vec<UpdatePathOutcome>> {
|
||||
|
@ -157,6 +140,7 @@ fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
|
|||
let count = RwLock::new(0_usize);
|
||||
let resolve_cache = Arc::new(Mutex::new(LruCache::new(256)));
|
||||
let total = path_entries.len() as f32;
|
||||
let shared_job_handle = Arc::new(Mutex::new(job_handle));
|
||||
let path_outcomes: Vec<UpdatePathOutcome> = path_entries
|
||||
.into_par_iter()
|
||||
.map(|path| {
|
||||
|
@ -171,11 +155,10 @@ fn rescan_vault_inner<D: Borrow<UpEndDatabase>>(
|
|||
let mut cnt = count.write().unwrap();
|
||||
*cnt += 1;
|
||||
|
||||
job_container
|
||||
.write()
|
||||
let _ = shared_job_handle
|
||||
.lock()
|
||||
.unwrap()
|
||||
.update_progress(&job_id, *cnt as f32 / total * 100.0)
|
||||
.unwrap();
|
||||
.update_progress(*cnt as f32 / total * 100.0);
|
||||
|
||||
match result {
|
||||
Ok(result) => result,
|
||||
|
@ -482,7 +465,7 @@ fn insert_file_with_metadata(
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::database::UpEndDatabase;
|
||||
use crate::util;
|
||||
use crate::util::jobs::JobContainer;
|
||||
|
||||
use super::*;
|
||||
use std::fs::File;
|
||||
|
@ -517,12 +500,8 @@ mod test {
|
|||
// Initialize database
|
||||
|
||||
let open_result = UpEndDatabase::open(&temp_dir, None, true).unwrap();
|
||||
let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
|
||||
let job_id = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(util::jobs::Job::new("RESCAN", "TEST JOB"))
|
||||
.unwrap();
|
||||
let mut job_container = JobContainer::new();
|
||||
let _job = job_container.add_job("RESCAN", "TEST JOB").unwrap();
|
||||
|
||||
// Initial scan
|
||||
let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, true);
|
||||
|
@ -536,8 +515,7 @@ mod test {
|
|||
|
||||
// Modification-less rescan
|
||||
|
||||
let rescan_result =
|
||||
rescan_vault_inner(&open_result.db, job_container.clone(), job_id, quick, false);
|
||||
let rescan_result = rescan_vault(&open_result.db, job_container.clone(), quick, false);
|
||||
|
||||
assert!(rescan_result.is_ok());
|
||||
let rescan_result = rescan_result.unwrap();
|
||||
|
@ -550,8 +528,7 @@ mod test {
|
|||
|
||||
std::fs::remove_file(temp_dir.path().join("hello-world.txt")).unwrap();
|
||||
|
||||
let rescan_result =
|
||||
rescan_vault_inner(&open_result.db, job_container, job_id, quick, false);
|
||||
let rescan_result = rescan_vault(&open_result.db, job_container, quick, false);
|
||||
|
||||
assert!(rescan_result.is_ok());
|
||||
let rescan_result = rescan_result.unwrap();
|
||||
|
|
|
@ -12,12 +12,12 @@ use actix_web::{middleware, App, HttpServer};
|
|||
use anyhow::Result;
|
||||
use clap::{App as ClapApp, Arg};
|
||||
use log::{debug, info, warn};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
common::{get_static_dir, PKG_VERSION},
|
||||
database::UpEndDatabase,
|
||||
util::exec::block_background,
|
||||
util::{exec::block_background, jobs::JobContainer},
|
||||
};
|
||||
|
||||
mod addressing;
|
||||
|
@ -92,7 +92,7 @@ fn main() -> Result<()> {
|
|||
info!("Starting UpEnd {}...", PKG_VERSION);
|
||||
let sys = actix::System::new("upend");
|
||||
|
||||
let job_container = Arc::new(RwLock::new(util::jobs::JobContainer::default()));
|
||||
let job_container = JobContainer::new();
|
||||
|
||||
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
|
||||
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use crate::util::hash::Hash;
|
||||
use crate::util::jobs::{Job, JobContainer, State};
|
||||
use crate::util::jobs::{JobContainer, JobState};
|
||||
use crate::{database::UpEndDatabase, util::hash::b58_encode};
|
||||
use anyhow::{anyhow, Result};
|
||||
use log::{debug, trace};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs::File,
|
||||
|
@ -58,7 +58,7 @@ impl PreviewStore {
|
|||
&self,
|
||||
hash: Hash,
|
||||
mime_type: S,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
mut job_container: JobContainer,
|
||||
) -> Result<Option<PathBuf>>
|
||||
where
|
||||
S: Into<Option<String>>,
|
||||
|
@ -74,14 +74,10 @@ impl PreviewStore {
|
|||
let connection = self.db.connection()?;
|
||||
let files = connection.retrieve_file(&hash)?;
|
||||
if let Some(file) = files.get(0) {
|
||||
let job_id = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(Job::new(
|
||||
None,
|
||||
&format!("Creating preview for {:?}", file.path.file_name().unwrap()),
|
||||
))
|
||||
.unwrap();
|
||||
let mut job_handle = job_container.add_job(
|
||||
None,
|
||||
&format!("Creating preview for {:?}", file.path.file_name().unwrap()),
|
||||
)?;
|
||||
|
||||
let mime_type = mime_type.into();
|
||||
|
||||
|
@ -108,10 +104,7 @@ impl PreviewStore {
|
|||
Ok(preview) => {
|
||||
trace!("Got preview for {hash:?}.");
|
||||
|
||||
let _ = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_state(&job_id, State::Done);
|
||||
let _ = job_handle.update_state(JobState::Done);
|
||||
|
||||
if let Some(data) = preview {
|
||||
std::fs::create_dir_all(&self.path)?;
|
||||
|
@ -122,13 +115,7 @@ impl PreviewStore {
|
|||
Ok(None)
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
let _ = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_state(&job_id, State::Failed);
|
||||
Err(err)
|
||||
}
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
} else {
|
||||
Err(anyhow!("Object not found, or is not a file."))
|
||||
|
|
|
@ -25,7 +25,7 @@ use std::convert::{TryFrom, TryInto};
|
|||
use std::fs;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::Arc;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use std::{collections::HashMap, io};
|
||||
use tempfile::NamedTempFile;
|
||||
|
@ -38,7 +38,7 @@ use is_executable::IsExecutable;
|
|||
pub struct State {
|
||||
pub upend: Arc<UpEndDatabase>,
|
||||
pub vault_name: Option<String>,
|
||||
pub job_container: Arc<RwLock<JobContainer>>,
|
||||
pub job_container: JobContainer,
|
||||
pub preview_store: Option<Arc<PreviewStore>>,
|
||||
pub desktop_enabled: bool,
|
||||
}
|
||||
|
@ -714,12 +714,16 @@ pub async fn get_jobs(
|
|||
state: web::Data<State>,
|
||||
web::Query(query): web::Query<JobsRequest>,
|
||||
) -> Result<HttpResponse, Error> {
|
||||
let jobs = state.job_container.read().unwrap().get_jobs();
|
||||
let jobs = state
|
||||
.job_container
|
||||
.get_jobs()
|
||||
.map_err(ErrorInternalServerError)?;
|
||||
|
||||
Ok(HttpResponse::Ok().json(if query.full.is_some() {
|
||||
jobs
|
||||
} else {
|
||||
jobs.into_iter()
|
||||
.filter(|(_, j)| matches!(j.state, crate::util::jobs::State::InProgress))
|
||||
.filter(|(_, j)| matches!(j.state, crate::util::jobs::JobState::InProgress))
|
||||
.collect()
|
||||
}))
|
||||
}
|
||||
|
|
160
src/util/jobs.rs
160
src/util/jobs.rs
|
@ -1,6 +1,10 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use log::warn;
|
||||
use serde::{Serialize, Serializer};
|
||||
use std::collections::HashMap;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Default, Serialize, Clone)]
|
||||
|
@ -8,69 +12,29 @@ pub struct Job {
|
|||
pub job_type: Option<JobType>,
|
||||
pub title: String,
|
||||
pub progress: Option<f32>,
|
||||
pub state: State,
|
||||
}
|
||||
|
||||
impl Job {
|
||||
pub fn new<S, IS>(job_type: IS, title: S) -> Self
|
||||
where
|
||||
S: AsRef<str>,
|
||||
IS: Into<Option<S>>,
|
||||
{
|
||||
Job {
|
||||
job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
|
||||
title: String::from(title.as_ref()),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Job {
|
||||
fn drop(&mut self) {
|
||||
self.state = State::Failed;
|
||||
}
|
||||
pub state: JobState,
|
||||
}
|
||||
|
||||
pub type JobType = String;
|
||||
|
||||
#[derive(Serialize, Clone, Copy, PartialEq)]
|
||||
pub enum State {
|
||||
pub enum JobState {
|
||||
InProgress,
|
||||
Done,
|
||||
Failed,
|
||||
}
|
||||
|
||||
impl Default for State {
|
||||
impl Default for JobState {
|
||||
fn default() -> Self {
|
||||
State::InProgress
|
||||
JobState::InProgress
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct JobContainer {
|
||||
pub struct JobContainerData {
|
||||
jobs: HashMap<JobId, Job>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Hash, PartialEq, Eq, Copy)]
|
||||
pub struct JobId {
|
||||
uuid: Uuid,
|
||||
}
|
||||
|
||||
impl From<Uuid> for JobId {
|
||||
fn from(uuid: Uuid) -> Self {
|
||||
JobId { uuid }
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for JobId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_str(format!("{}", self.uuid).as_str())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct JobInProgessError(String);
|
||||
|
||||
|
@ -82,35 +46,92 @@ impl std::fmt::Display for JobInProgessError {
|
|||
|
||||
impl std::error::Error for JobInProgessError {}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct JobContainer(Arc<RwLock<JobContainerData>>);
|
||||
|
||||
impl JobContainer {
|
||||
pub fn add_job(&mut self, job: Job) -> Result<JobId, JobInProgessError> {
|
||||
pub fn new() -> Self {
|
||||
JobContainer(Arc::new(RwLock::new(JobContainerData::default())))
|
||||
}
|
||||
|
||||
pub fn add_job<S, IS>(&mut self, job_type: IS, title: S) -> Result<JobHandle>
|
||||
where
|
||||
S: AsRef<str>,
|
||||
IS: Into<Option<S>>,
|
||||
{
|
||||
let jobs = &mut self
|
||||
.0
|
||||
.write()
|
||||
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
|
||||
.jobs;
|
||||
|
||||
let job = Job {
|
||||
job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
|
||||
title: String::from(title.as_ref()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if let Some(job_type) = &job.job_type {
|
||||
if self
|
||||
.jobs
|
||||
if jobs
|
||||
.iter()
|
||||
.any(|(_, j)| j.state == State::InProgress && j.job_type == job.job_type)
|
||||
.any(|(_, j)| j.state == JobState::InProgress && j.job_type == job.job_type)
|
||||
{
|
||||
return Err(JobInProgessError(format!(
|
||||
"Job of type \"{}\" currently in progress.",
|
||||
job_type
|
||||
)));
|
||||
))
|
||||
.into());
|
||||
}
|
||||
}
|
||||
|
||||
let uuid = Uuid::new_v4();
|
||||
self.jobs.insert(JobId::from(uuid), job);
|
||||
Ok(JobId::from(uuid))
|
||||
let job_id = JobId(Uuid::new_v4());
|
||||
jobs.insert(job_id, job);
|
||||
Ok(JobHandle {
|
||||
job_id,
|
||||
container: self.0.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_jobs(&self) -> HashMap<JobId, Job> {
|
||||
self.jobs.clone()
|
||||
}
|
||||
pub fn get_jobs(&self) -> Result<HashMap<JobId, Job>> {
|
||||
let jobs = &self
|
||||
.0
|
||||
.read()
|
||||
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
|
||||
.jobs;
|
||||
|
||||
pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
|
||||
if let Some(job) = self.jobs.get_mut(id) {
|
||||
Ok(jobs.clone())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Hash, PartialEq, Eq, Copy)]
|
||||
pub struct JobId(Uuid);
|
||||
|
||||
impl Serialize for JobId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_str(format!("{}", self.0).as_str())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct JobHandle {
|
||||
job_id: JobId,
|
||||
container: Arc<RwLock<JobContainerData>>,
|
||||
}
|
||||
|
||||
impl JobHandle {
|
||||
pub fn update_progress(&mut self, progress: f32) -> Result<()> {
|
||||
let jobs = &mut self
|
||||
.container
|
||||
.write()
|
||||
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
|
||||
.jobs;
|
||||
|
||||
if let Some(job) = jobs.get_mut(&self.job_id) {
|
||||
job.progress = Some(progress);
|
||||
if progress >= 100.0 {
|
||||
job.state = State::Done;
|
||||
job.state = JobState::Done;
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
|
@ -118,8 +139,14 @@ impl JobContainer {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn update_state(&mut self, id: &JobId, state: State) -> Result<()> {
|
||||
if let Some(job) = self.jobs.get_mut(id) {
|
||||
pub fn update_state(&mut self, state: JobState) -> Result<()> {
|
||||
let jobs = &mut self
|
||||
.container
|
||||
.write()
|
||||
.map_err(|err| anyhow!("Couldn't lock job container for writing! {err:?}"))?
|
||||
.jobs;
|
||||
|
||||
if let Some(job) = jobs.get_mut(&self.job_id) {
|
||||
job.state = state;
|
||||
Ok(())
|
||||
} else {
|
||||
|
@ -127,3 +154,12 @@ impl JobContainer {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for JobHandle {
|
||||
fn drop(&mut self) {
|
||||
let update_result = self.update_state(JobState::Failed);
|
||||
if let Err(err) = update_result {
|
||||
warn!("Handle dropped, but couldn't set self as failed! {:?}", err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue