add "jobs", endpoint to query vault update progress
parent
5096e1eece
commit
ed5ea08335
|
@ -4,6 +4,7 @@ use crate::database::{
|
|||
DbPool, Entry, EntryQuery, EntryValue, Query, QueryComponent, QueryPart, DATABASE_FILENAME,
|
||||
};
|
||||
use crate::hash::Hashable;
|
||||
use crate::jobs::{Job, JobContainer, JobId};
|
||||
use crate::models;
|
||||
use crate::models::File;
|
||||
use anyhow::{anyhow, Error, Result};
|
||||
|
@ -283,8 +284,20 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
|||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
|
||||
let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
|
||||
pub async fn reimport_directory(
|
||||
pool: DbPool,
|
||||
directory: PathBuf,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
) {
|
||||
let job_id = job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.add_job(Job::new("REIMPORT", "Reimporting vault..."))
|
||||
.unwrap();
|
||||
|
||||
let result =
|
||||
actix_web::web::block(move || _reimport_directory(pool, directory, job_container, job_id))
|
||||
.await;
|
||||
if result.is_err() {
|
||||
let err = result.err().unwrap();
|
||||
error!("Update did not succeed! {:?}", err);
|
||||
|
@ -302,6 +315,8 @@ enum UpdatePathOutcome {
|
|||
fn _reimport_directory<T: AsRef<Path>>(
|
||||
pool: DbPool,
|
||||
directory: T,
|
||||
job_container: Arc<RwLock<JobContainer>>,
|
||||
job_id: JobId,
|
||||
) -> Result<Vec<UpdatePathResult>> {
|
||||
let start = Instant::now();
|
||||
|
||||
|
@ -316,15 +331,23 @@ fn _reimport_directory<T: AsRef<Path>>(
|
|||
let absolute_path = fs::canonicalize(&directory)?;
|
||||
let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
|
||||
|
||||
let count = RwLock::new(0_usize);
|
||||
let total = path_entries.len() as f32;
|
||||
let path_results: Vec<UpdatePathResult> = path_entries
|
||||
.into_par_iter()
|
||||
.map(|path| {
|
||||
Ok(_process_directory_entry(
|
||||
&rw_pool,
|
||||
path,
|
||||
&absolute_path,
|
||||
&existing_files,
|
||||
)?)
|
||||
let result = _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;
|
||||
|
||||
let mut cnt = count.write().unwrap();
|
||||
*cnt += 1;
|
||||
|
||||
job_container
|
||||
.write()
|
||||
.unwrap()
|
||||
.update_progress(&job_id, *cnt as f32 / total * 100.0)
|
||||
.unwrap();
|
||||
|
||||
Ok(result)
|
||||
})
|
||||
.collect();
|
||||
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use serde::{Serialize, Serializer};
|
||||
use std::collections::HashMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Default, Serialize, Clone)]
|
||||
pub struct Job {
|
||||
job_type: JobType,
|
||||
title: String,
|
||||
progress: f32,
|
||||
state: State,
|
||||
}
|
||||
|
||||
impl Job {
|
||||
pub fn new<S: AsRef<str>>(job_type: S, title: S) -> Self {
|
||||
return Job {
|
||||
job_type: String::from(job_type.as_ref()),
|
||||
title: String::from(title.as_ref()),
|
||||
..Default::default()
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub type JobType = String;
|
||||
|
||||
#[derive(Serialize, Clone)]
|
||||
pub enum State {
|
||||
InProgress,
|
||||
Done,
|
||||
}
|
||||
|
||||
impl Default for State {
|
||||
fn default() -> Self {
|
||||
State::InProgress
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct JobContainer {
|
||||
jobs: HashMap<JobId, Job>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Hash, PartialEq, Eq)]
|
||||
pub struct JobId {
|
||||
uuid: Uuid,
|
||||
}
|
||||
|
||||
impl From<Uuid> for JobId {
|
||||
fn from(uuid: Uuid) -> Self {
|
||||
JobId { uuid }
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for JobId {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_str(format!("{}", self.uuid).as_str())
|
||||
}
|
||||
}
|
||||
|
||||
impl JobContainer {
|
||||
pub fn add_job(&mut self, job: Job) -> Result<JobId> {
|
||||
let uuid = Uuid::new_v4();
|
||||
self.jobs.insert(JobId::from(uuid), job);
|
||||
Ok(JobId::from(uuid))
|
||||
}
|
||||
|
||||
pub fn get_jobs(&self) -> HashMap<JobId, Job> {
|
||||
self.jobs.clone()
|
||||
}
|
||||
|
||||
pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
|
||||
if let Some(job) = self.jobs.get_mut(id) {
|
||||
job.progress = progress;
|
||||
if progress >= 100.0 {
|
||||
job.state = State::Done;
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
Err(anyhow!("No such job."))
|
||||
}
|
||||
}
|
||||
}
|
12
src/main.rs
12
src/main.rs
|
@ -11,11 +11,13 @@ use actix_web::{middleware, App, HttpServer};
|
|||
use anyhow::Result;
|
||||
use clap::{App as ClapApp, Arg};
|
||||
use log::{info, warn};
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
mod addressing;
|
||||
mod database;
|
||||
mod filesystem;
|
||||
mod hash;
|
||||
mod jobs;
|
||||
mod models;
|
||||
mod routes;
|
||||
mod schema;
|
||||
|
@ -59,6 +61,8 @@ fn main() -> Result<()> {
|
|||
info!("Starting UpEnd {}...", VERSION);
|
||||
let sys = actix::System::new("upend");
|
||||
|
||||
let job_container = Arc::new(RwLock::new(jobs::JobContainer::default()));
|
||||
|
||||
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
|
||||
|
||||
let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
|
||||
|
@ -76,6 +80,7 @@ fn main() -> Result<()> {
|
|||
let state = routes::State {
|
||||
directory: vault_path.clone(),
|
||||
db_pool: db_pool.clone(),
|
||||
job_container: job_container.clone(),
|
||||
};
|
||||
|
||||
// Start HTTP server
|
||||
|
@ -90,6 +95,7 @@ fn main() -> Result<()> {
|
|||
.service(routes::delete_object)
|
||||
.service(routes::api_refresh)
|
||||
.service(routes::list_hier)
|
||||
.service(routes::get_jobs)
|
||||
.service(
|
||||
actix_files::Files::new(
|
||||
"/",
|
||||
|
@ -103,7 +109,11 @@ fn main() -> Result<()> {
|
|||
|
||||
if !matches.is_present("NO_INITIAL_UPDATE") {
|
||||
info!("Running initial update...");
|
||||
actix::spawn(filesystem::reimport_directory(db_pool, vault_path));
|
||||
actix::spawn(filesystem::reimport_directory(
|
||||
db_pool,
|
||||
vault_path,
|
||||
job_container,
|
||||
));
|
||||
}
|
||||
|
||||
if !matches.is_present("NO_BROWSER") {
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::database::{
|
|||
};
|
||||
use crate::filesystem::{list_directory, UPath};
|
||||
use crate::hash::{decode, encode, Hashable};
|
||||
use crate::jobs::JobContainer;
|
||||
use actix_files::NamedFile;
|
||||
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, ErrorNotFound};
|
||||
use actix_web::{delete, error, get, post, put, web, Error, HttpResponse};
|
||||
|
@ -14,11 +15,13 @@ use serde::Deserialize;
|
|||
use std::collections::HashMap;
|
||||
use std::convert::TryFrom;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct State {
|
||||
pub directory: PathBuf,
|
||||
pub db_pool: DbPool,
|
||||
pub job_container: Arc<RwLock<JobContainer>>,
|
||||
}
|
||||
|
||||
#[get("/api/raw/{hash}")]
|
||||
|
@ -146,6 +149,16 @@ pub async fn list_hier(
|
|||
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||
let _pool = state.db_pool.clone();
|
||||
let _directory = state.directory.clone();
|
||||
actix::spawn(crate::filesystem::reimport_directory(_pool, _directory));
|
||||
actix::spawn(crate::filesystem::reimport_directory(
|
||||
_pool,
|
||||
_directory,
|
||||
state.job_container.clone(),
|
||||
));
|
||||
Ok(HttpResponse::Ok().finish())
|
||||
}
|
||||
|
||||
#[get("/api/jobs")]
|
||||
pub async fn get_jobs(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||
let jobs = state.job_container.read().unwrap().get_jobs();
|
||||
Ok(HttpResponse::Ok().json(&jobs))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue