From ed5ea083351397eb02d3af8226d2799899009595 Mon Sep 17 00:00:00 2001
From: Tomáš Mládek
Date: Sat, 20 Feb 2021 17:36:19 +0100
Subject: [PATCH] add "jobs" endpoint to query vault update progress

---
 src/filesystem.rs | 39 +++++++++++++++++-----
 src/jobs.rs       | 85 +++++++++++++++++++++++++++++++++++++++++++++++
 src/main.rs       | 12 ++++++-
 src/routes.rs     | 15 ++++++++-
 4 files changed, 141 insertions(+), 10 deletions(-)
 create mode 100644 src/jobs.rs

diff --git a/src/filesystem.rs b/src/filesystem.rs
index 828629d..947ec72 100644
--- a/src/filesystem.rs
+++ b/src/filesystem.rs
@@ -4,6 +4,7 @@ use crate::database::{
     DbPool, Entry, EntryQuery, EntryValue, Query, QueryComponent, QueryPart, DATABASE_FILENAME,
 };
 use crate::hash::Hashable;
+use crate::jobs::{Job, JobContainer, JobId};
 use crate::models;
 use crate::models::File;
 use anyhow::{anyhow, Error, Result};
@@ -283,8 +284,20 @@ pub fn resolve_path<P: AsRef<Path>>(
     Ok(result)
 }

-pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
-    let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
+pub async fn reimport_directory(
+    pool: DbPool,
+    directory: PathBuf,
+    job_container: Arc<RwLock<JobContainer>>,
+) {
+    let job_id = job_container
+        .write()
+        .unwrap()
+        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
+        .unwrap();
+
+    let result =
+        actix_web::web::block(move || _reimport_directory(pool, directory, job_container, job_id))
+            .await;
     if result.is_err() {
         let err = result.err().unwrap();
         error!("Update did not succeed! {:?}", err);
@@ -302,6 +315,8 @@ enum UpdatePathOutcome {
 fn _reimport_directory<T: AsRef<Path>>(
     pool: DbPool,
     directory: T,
+    job_container: Arc<RwLock<JobContainer>>,
+    job_id: JobId,
 ) -> Result<Vec<UpdatePathOutcome>> {
     let start = Instant::now();

@@ -316,15 +331,23 @@ fn _reimport_directory<T: AsRef<Path>>(
     let absolute_path = fs::canonicalize(&directory)?;
     let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));

+    let count = RwLock::new(0_usize);
+    let total = path_entries.len() as f32;
     let path_results: Vec<_> = path_entries
         .into_par_iter()
         .map(|path| {
-            Ok(_process_directory_entry(
-                &rw_pool,
-                path,
-                &absolute_path,
-                &existing_files,
-            )?)
+            let result = _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;
+
+            let mut cnt = count.write().unwrap();
+            *cnt += 1;
+
+            job_container
+                .write()
+                .unwrap()
+                .update_progress(&job_id, *cnt as f32 / total * 100.0)
+                .unwrap();
+
+            Ok(result)
         })
         .collect();

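Note (editor's sketch, not part of the patch): the hunk above is the heart of the change. Each rayon worker bumps a shared counter behind an RwLock after processing one directory entry, then publishes the running percentage through the JobContainer. The same pattern in a minimal, self-contained form, with a plain closure standing in for JobContainer::update_progress (the names items, report, and the main wrapper are illustrative, not from the patch):

    use rayon::prelude::*;
    use std::sync::RwLock;

    fn main() {
        let items: Vec<u32> = (0..1000).collect();
        let total = items.len() as f32;
        let count = RwLock::new(0_usize);
        // Stand-in for JobContainer::update_progress.
        let report = |pct: f32| println!("progress: {:.1}%", pct);

        let _results: Vec<u32> = items
            .into_par_iter()
            .map(|n| {
                let result = n * 2; // stand-in for _process_directory_entry
                // Bump the shared counter, then publish the percentage,
                // mirroring the closure in _reimport_directory above.
                let mut cnt = count.write().unwrap();
                *cnt += 1;
                report(*cnt as f32 / total * 100.0);
                result
            })
            .collect();
    }

Every entry takes the write lock just to increment, so an AtomicUsize with fetch_add would do the same job lock-free; at one update per file the contention hardly matters, though.
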
diff --git a/src/jobs.rs b/src/jobs.rs
new file mode 100644
index 0000000..5112a58
--- /dev/null
+++ b/src/jobs.rs
@@ -0,0 +1,85 @@
+use anyhow::{anyhow, Result};
+use serde::{Serialize, Serializer};
+use std::collections::HashMap;
+use uuid::Uuid;
+
+#[derive(Default, Serialize, Clone)]
+pub struct Job {
+    job_type: JobType,
+    title: String,
+    progress: f32,
+    state: State,
+}
+
+impl Job {
+    pub fn new<S: AsRef<str>>(job_type: S, title: S) -> Self {
+        return Job {
+            job_type: String::from(job_type.as_ref()),
+            title: String::from(title.as_ref()),
+            ..Default::default()
+        };
+    }
+}
+
+pub type JobType = String;
+
+#[derive(Serialize, Clone)]
+pub enum State {
+    InProgress,
+    Done,
+}
+
+impl Default for State {
+    fn default() -> Self {
+        State::InProgress
+    }
+}
+
+#[derive(Default)]
+pub struct JobContainer {
+    jobs: HashMap<JobId, Job>,
+}
+
+#[derive(Clone, Hash, PartialEq, Eq)]
+pub struct JobId {
+    uuid: Uuid,
+}
+
+impl From<Uuid> for JobId {
+    fn from(uuid: Uuid) -> Self {
+        JobId { uuid }
+    }
+}
+
+impl Serialize for JobId {
+    fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(format!("{}", self.uuid).as_str())
+    }
+}
+
+impl JobContainer {
+    pub fn add_job(&mut self, job: Job) -> Result<JobId> {
+        let uuid = Uuid::new_v4();
+        self.jobs.insert(JobId::from(uuid), job);
+        Ok(JobId::from(uuid))
+    }
+
+    pub fn get_jobs(&self) -> HashMap<JobId, Job> {
+        self.jobs.clone()
+    }
+
+    pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
+        if let Some(job) = self.jobs.get_mut(id) {
+            job.progress = progress;
+            if progress >= 100.0 {
+                job.state = State::Done;
+            }
+            Ok(())
+        } else {
+            Err(anyhow!("No such job."))
+        }
+    }
+}
diff --git a/src/main.rs b/src/main.rs
index 9928ec9..49d549a 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -11,11 +11,13 @@ use actix_web::{middleware, App, HttpServer};
 use anyhow::Result;
 use clap::{App as ClapApp, Arg};
 use log::{info, warn};
+use std::sync::{Arc, RwLock};

 mod addressing;
 mod database;
 mod filesystem;
 mod hash;
+mod jobs;
 mod models;
 mod routes;
 mod schema;
@@ -59,6 +61,8 @@ fn main() -> Result<()> {
     info!("Starting UpEnd {}...", VERSION);
     let sys = actix::System::new("upend");

+    let job_container = Arc::new(RwLock::new(jobs::JobContainer::default()));
+
     let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());

     let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
@@ -76,6 +80,7 @@ fn main() -> Result<()> {
     let state = routes::State {
         directory: vault_path.clone(),
         db_pool: db_pool.clone(),
+        job_container: job_container.clone(),
     };

     // Start HTTP server
@@ -90,6 +95,7 @@ fn main() -> Result<()> {
             .service(routes::delete_object)
             .service(routes::api_refresh)
             .service(routes::list_hier)
+            .service(routes::get_jobs)
             .service(
                 actix_files::Files::new(
                     "/",
@@ -103,7 +109,11 @@ fn main() -> Result<()> {

     if !matches.is_present("NO_INITIAL_UPDATE") {
         info!("Running initial update...");
-        actix::spawn(filesystem::reimport_directory(db_pool, vault_path));
+        actix::spawn(filesystem::reimport_directory(
+            db_pool,
+            vault_path,
+            job_container,
+        ));
     }

     if !matches.is_present("NO_BROWSER") {
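Note (editor's sketch, not part of the patch): taken together, src/jobs.rs and the wiring in main.rs give the container the following lifecycle. The demo wrapper is an illustrative name; error handling uses anyhow, as in jobs.rs itself:

    use crate::jobs::{Job, JobContainer};
    use std::sync::{Arc, RwLock};

    fn demo() -> anyhow::Result<()> {
        // Shared, thread-safe container, as constructed in main().
        let container = Arc::new(RwLock::new(JobContainer::default()));

        // add_job hands back the JobId used for all later updates.
        let job_id = container
            .write()
            .unwrap()
            .add_job(Job::new("REIMPORT", "Reimporting vault..."))?;

        // progress is a percentage; update_progress flips the state
        // from InProgress to Done once it reaches 100.0.
        container.write().unwrap().update_progress(&job_id, 50.0)?;
        container.write().unwrap().update_progress(&job_id, 100.0)?;

        // get_jobs clones the whole map, which is what /api/jobs serializes.
        let _snapshot = container.read().unwrap().get_jobs();
        Ok(())
    }
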
diff --git a/src/routes.rs b/src/routes.rs
index 6633972..b686bf3 100644
--- a/src/routes.rs
+++ b/src/routes.rs
@@ -4,6 +4,7 @@ use crate::database::{
 };
 use crate::filesystem::{list_directory, UPath};
 use crate::hash::{decode, encode, Hashable};
+use crate::jobs::JobContainer;
 use actix_files::NamedFile;
 use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, ErrorNotFound};
 use actix_web::{delete, error, get, post, put, web, Error, HttpResponse};
@@ -14,11 +15,13 @@ use serde::Deserialize;
 use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::path::PathBuf;
+use std::sync::{Arc, RwLock};

 #[derive(Clone)]
 pub struct State {
     pub directory: PathBuf,
     pub db_pool: DbPool,
+    pub job_container: Arc<RwLock<JobContainer>>,
 }

 #[get("/api/raw/{hash}")]
@@ -146,6 +149,16 @@ pub async fn list_hier(
 pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
     let _pool = state.db_pool.clone();
     let _directory = state.directory.clone();
-    actix::spawn(crate::filesystem::reimport_directory(_pool, _directory));
+    actix::spawn(crate::filesystem::reimport_directory(
+        _pool,
+        _directory,
+        state.job_container.clone(),
+    ));
     Ok(HttpResponse::Ok().finish())
 }
+
+#[get("/api/jobs")]
+pub async fn get_jobs(state: web::Data<State>) -> Result<HttpResponse, Error> {
+    let jobs = state.job_container.read().unwrap().get_jobs();
+    Ok(HttpResponse::Ok().json(&jobs))
+}
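Note (not part of the patch): given the Serialize impls in src/jobs.rs, GET /api/jobs should answer with a JSON object keyed by job UUID, along these lines (UUID and numbers are illustrative):

    {"1db6c9b0-...": {"job_type": "REIMPORT", "title": "Reimporting vault...", "progress": 42.0, "state": "InProgress"}}

The hand-written Serialize for JobId is what makes this work: serde_json maps need string-like keys, so a derived struct key would fail at serialization time, while rendering JobId as its UUID string keeps HashMap<JobId, Job> directly serializable.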