upend/db/src/jobs.rs

176 lines
4.3 KiB
Rust

use serde::{Serialize, Serializer};
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use tracing::warn;
use uuid::Uuid;
#[derive(Default, Serialize, Clone)]
pub struct Job {
pub job_type: Option<JobType>,
pub title: String,
pub progress: Option<f32>,
pub state: JobState,
}
pub type JobType = String;
#[derive(Default, Serialize, Clone, Copy, PartialEq)]
pub enum JobState {
#[default]
InProgress,
Done,
Failed,
}
#[derive(Default)]
pub struct JobContainerData {
jobs: HashMap<JobId, Job>,
}
#[derive(Debug, Clone)]
pub enum JobError {
JobInProgessError(String),
JobNotFound,
Unknown(String),
}
impl std::fmt::Display for JobError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
JobError::JobInProgessError(msg) => {
write!(f, "job of type {} is already in progress", self.0)
}
JobError::Unknown(msg) => write!(f, "Unknown: {}", msg),
}
}
}
#[derive(Clone)]
pub struct JobContainer(Arc<RwLock<JobContainerData>>);
impl JobContainer {
pub fn new() -> Self {
JobContainer(Arc::new(RwLock::new(JobContainerData::default())))
}
pub fn add_job<S, IS>(&mut self, job_type: IS, title: S) -> Result<JobHandle, JobError>
where
S: AsRef<str>,
IS: Into<Option<S>>,
{
let jobs = &mut self
.0
.write()
.map_err(|err| {
JobError::Unknown(format!("Couldn't lock job container for writing! {err:?}"))
})?
.jobs;
let job = Job {
job_type: job_type.into().map(|jt| String::from(jt.as_ref())),
title: String::from(title.as_ref()),
..Default::default()
};
if let Some(job_type) = &job.job_type {
if jobs
.iter()
.any(|(_, j)| j.state == JobState::InProgress && j.job_type == job.job_type)
{
return Err(JobError::JobInProgessError(job_type).into());
}
}
let job_id = JobId(Uuid::new_v4());
jobs.insert(job_id, job);
Ok(JobHandle {
job_id,
container: self.0.clone(),
})
}
pub fn get_jobs(&self) -> Result<HashMap<JobId, Job>, JobError> {
let jobs = &self
.0
.read()
.map_err(|err| {
JobError::Unknown(format!("Couldn't lock job container for writing! {err:?}"))
})?
.jobs;
Ok(jobs.clone())
}
}
impl Default for JobContainer {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Hash, PartialEq, Eq, Copy)]
pub struct JobId(Uuid);
impl Serialize for JobId {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
serializer.serialize_str(format!("{}", self.0).as_str())
}
}
pub struct JobHandle {
job_id: JobId,
container: Arc<RwLock<JobContainerData>>,
}
impl JobHandle {
pub fn update_progress(&mut self, progress: f32) -> Result<(), JobError> {
let jobs = &mut self
.container
.write()
.map_err(|err| {
JobError::Unknown(format!("Couldn't lock job container for writing! {err:?}"))
})?
.jobs;
if let Some(job) = jobs.get_mut(&self.job_id) {
job.progress = Some(progress);
if progress >= 100.0 {
job.state = JobState::Done;
}
Ok(())
} else {
Err(JobError::JobNotFound)
}
}
pub fn update_state(&mut self, state: JobState) -> Result<(), JobError> {
let jobs = &mut self
.container
.write()
.map_err(|err| {
JobError::Unknown(format!("Couldn't lock job container for writing! {err:?}"))
})?
.jobs;
if let Some(job) = jobs.get_mut(&self.job_id) {
job.state = state;
Ok(())
} else {
Err(JobError::JobNotFound)
}
}
}
impl Drop for JobHandle {
fn drop(&mut self) {
let update_result = self.update_state(JobState::Failed);
if let Err(err) = update_result {
warn!("Handle dropped, but couldn't set self as failed! {:?}", err);
}
}
}