add "jobs", endpoint to query vault update progress

feat/vaults
Tomáš Mládek 2021-02-20 17:36:19 +01:00
parent 5096e1eece
commit ed5ea08335
4 changed files with 141 additions and 10 deletions

src/filesystem.rs

@@ -4,6 +4,7 @@ use crate::database::{
     DbPool, Entry, EntryQuery, EntryValue, Query, QueryComponent, QueryPart, DATABASE_FILENAME,
 };
 use crate::hash::Hashable;
+use crate::jobs::{Job, JobContainer, JobId};
 use crate::models;
 use crate::models::File;
 use anyhow::{anyhow, Error, Result};
@@ -283,8 +284,20 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
     Ok(result)
 }
 
-pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
-    let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
+pub async fn reimport_directory(
+    pool: DbPool,
+    directory: PathBuf,
+    job_container: Arc<RwLock<JobContainer>>,
+) {
+    let job_id = job_container
+        .write()
+        .unwrap()
+        .add_job(Job::new("REIMPORT", "Reimporting vault..."))
+        .unwrap();
+
+    let result =
+        actix_web::web::block(move || _reimport_directory(pool, directory, job_container, job_id))
+            .await;
     if result.is_err() {
         let err = result.err().unwrap();
         error!("Update did not succeed! {:?}", err);
@@ -302,6 +315,8 @@ enum UpdatePathOutcome {
 fn _reimport_directory<T: AsRef<Path>>(
     pool: DbPool,
     directory: T,
+    job_container: Arc<RwLock<JobContainer>>,
+    job_id: JobId,
 ) -> Result<Vec<UpdatePathResult>> {
     let start = Instant::now();
@@ -316,15 +331,23 @@ fn _reimport_directory<T: AsRef<Path>>(
     let absolute_path = fs::canonicalize(&directory)?;
     let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
 
+    let count = RwLock::new(0_usize);
+    let total = path_entries.len() as f32;
     let path_results: Vec<UpdatePathResult> = path_entries
         .into_par_iter()
         .map(|path| {
-            Ok(_process_directory_entry(
-                &rw_pool,
-                path,
-                &absolute_path,
-                &existing_files,
-            )?)
+            let result = _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;
+
+            let mut cnt = count.write().unwrap();
+            *cnt += 1;
+
+            job_container
+                .write()
+                .unwrap()
+                .update_progress(&job_id, *cnt as f32 / total * 100.0)
+                .unwrap();
+
+            Ok(result)
         })
         .collect();
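The progress percentage above is computed from a counter shared across rayon's worker threads, so it lives behind a lock. A standalone sketch of the same pattern, illustrative only and not part of the commit (the rayon dependency is implied by the existing .into_par_iter() call):

use rayon::prelude::*;
use std::sync::RwLock;

fn main() {
    let items: Vec<u32> = (0..100).collect();
    let total = items.len() as f32;
    let count = RwLock::new(0_usize);

    items.into_par_iter().for_each(|_item| {
        // ... per-item work would happen here ...
        let mut cnt = count.write().unwrap();
        *cnt += 1;
        // Holding the write guard keeps the increment and the derived
        // percentage consistent across worker threads.
        let progress = *cnt as f32 / total * 100.0;
        assert!((0.0..=100.0).contains(&progress));
    });
}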

src/jobs.rs (new file, 85 additions)

@@ -0,0 +1,85 @@
+use anyhow::{anyhow, Result};
+use serde::{Serialize, Serializer};
+use std::collections::HashMap;
+use uuid::Uuid;
+
+#[derive(Default, Serialize, Clone)]
+pub struct Job {
+    job_type: JobType,
+    title: String,
+    progress: f32,
+    state: State,
+}
+
+impl Job {
+    pub fn new<S: AsRef<str>>(job_type: S, title: S) -> Self {
+        return Job {
+            job_type: String::from(job_type.as_ref()),
+            title: String::from(title.as_ref()),
+            ..Default::default()
+        };
+    }
+}
+
+pub type JobType = String;
+
+#[derive(Serialize, Clone)]
+pub enum State {
+    InProgress,
+    Done,
+}
+
+impl Default for State {
+    fn default() -> Self {
+        State::InProgress
+    }
+}
+
+#[derive(Default)]
+pub struct JobContainer {
+    jobs: HashMap<JobId, Job>,
+}
+
+#[derive(Clone, Hash, PartialEq, Eq)]
+pub struct JobId {
+    uuid: Uuid,
+}
+
+impl From<Uuid> for JobId {
+    fn from(uuid: Uuid) -> Self {
+        JobId { uuid }
+    }
+}
+
+impl Serialize for JobId {
+    fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
+    where
+        S: Serializer,
+    {
+        serializer.serialize_str(format!("{}", self.uuid).as_str())
+    }
+}
+
+impl JobContainer {
+    pub fn add_job(&mut self, job: Job) -> Result<JobId> {
+        let uuid = Uuid::new_v4();
+        self.jobs.insert(JobId::from(uuid), job);
+        Ok(JobId::from(uuid))
+    }
+
+    pub fn get_jobs(&self) -> HashMap<JobId, Job> {
+        self.jobs.clone()
+    }
+
+    pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
+        if let Some(job) = self.jobs.get_mut(id) {
+            job.progress = progress;
+            if progress >= 100.0 {
+                job.state = State::Done;
+            }
+            Ok(())
+        } else {
+            Err(anyhow!("No such job."))
+        }
+    }
+}
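
Taken together, the container is just a UUID-keyed map of jobs. A minimal usage sketch, not part of the commit (assumes code living in the same crate, with serde_json available as a dependency; the "DEMO" job and fn name are hypothetical):

use crate::jobs::{Job, JobContainer};

fn demo() {
    let mut container = JobContainer::default();
    let job_id = container
        .add_job(Job::new("DEMO", "Demonstration job..."))
        .unwrap();
    container.update_progress(&job_id, 50.0).unwrap();

    // Since JobId serializes as its UUID string, the map serializes to a
    // UUID-keyed JSON object, e.g.:
    // {"<uuid>":{"job_type":"DEMO","title":"Demonstration job...","progress":50.0,"state":"InProgress"}}
    println!("{}", serde_json::to_string(&container.get_jobs()).unwrap());
}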

src/main.rs

@@ -11,11 +11,13 @@ use actix_web::{middleware, App, HttpServer};
 use anyhow::Result;
 use clap::{App as ClapApp, Arg};
 use log::{info, warn};
+use std::sync::{Arc, RwLock};
 
 mod addressing;
 mod database;
 mod filesystem;
 mod hash;
+mod jobs;
 mod models;
 mod routes;
 mod schema;
@@ -59,6 +61,8 @@ fn main() -> Result<()> {
     info!("Starting UpEnd {}...", VERSION);
     let sys = actix::System::new("upend");
 
+    let job_container = Arc::new(RwLock::new(jobs::JobContainer::default()));
+
     let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
     let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
@@ -76,6 +80,7 @@
     let state = routes::State {
         directory: vault_path.clone(),
         db_pool: db_pool.clone(),
+        job_container: job_container.clone(),
     };
 
     // Start HTTP server
@@ -90,6 +95,7 @@
             .service(routes::delete_object)
             .service(routes::api_refresh)
             .service(routes::list_hier)
+            .service(routes::get_jobs)
             .service(
                 actix_files::Files::new(
                     "/",
@@ -103,7 +109,11 @@
     if !matches.is_present("NO_INITIAL_UPDATE") {
         info!("Running initial update...");
-        actix::spawn(filesystem::reimport_directory(db_pool, vault_path));
+        actix::spawn(filesystem::reimport_directory(
+            db_pool,
+            vault_path,
+            job_container,
+        ));
     }
 
     if !matches.is_present("NO_BROWSER") {

src/routes.rs

@@ -4,6 +4,7 @@ use crate::database::{
 };
 use crate::filesystem::{list_directory, UPath};
 use crate::hash::{decode, encode, Hashable};
+use crate::jobs::JobContainer;
 use actix_files::NamedFile;
 use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, ErrorNotFound};
 use actix_web::{delete, error, get, post, put, web, Error, HttpResponse};
@@ -14,11 +15,13 @@ use serde::Deserialize;
 use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::path::PathBuf;
+use std::sync::{Arc, RwLock};
 
 #[derive(Clone)]
 pub struct State {
     pub directory: PathBuf,
     pub db_pool: DbPool,
+    pub job_container: Arc<RwLock<JobContainer>>,
 }
 
 #[get("/api/raw/{hash}")]
@@ -146,6 +149,16 @@
 pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
     let _pool = state.db_pool.clone();
     let _directory = state.directory.clone();
-    actix::spawn(crate::filesystem::reimport_directory(_pool, _directory));
+    actix::spawn(crate::filesystem::reimport_directory(
+        _pool,
+        _directory,
+        state.job_container.clone(),
+    ));
     Ok(HttpResponse::Ok().finish())
 }
+
+#[get("/api/jobs")]
+pub async fn get_jobs(state: web::Data<State>) -> Result<HttpResponse, Error> {
+    let jobs = state.job_container.read().unwrap().get_jobs();
+    Ok(HttpResponse::Ok().json(&jobs))
+}
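
A client can poll the new endpoint to drive a progress indicator. A minimal sketch of such a poller, not part of the commit (it assumes the reqwest and serde_json crates, and the address below is a hypothetical placeholder for wherever the UpEnd server actually listens):

use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder address; substitute the server's real host and port.
    let jobs: HashMap<String, serde_json::Value> =
        reqwest::blocking::get("http://localhost:8080/api/jobs")?.json()?;
    for (id, job) in &jobs {
        println!("{}: {} ({}%)", id, job["title"], job["progress"]);
    }
    Ok(())
}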