add "jobs", endpoint to query vault update progress
parent
5096e1eece
commit
ed5ea08335
|
@ -4,6 +4,7 @@ use crate::database::{
|
||||||
DbPool, Entry, EntryQuery, EntryValue, Query, QueryComponent, QueryPart, DATABASE_FILENAME,
|
DbPool, Entry, EntryQuery, EntryValue, Query, QueryComponent, QueryPart, DATABASE_FILENAME,
|
||||||
};
|
};
|
||||||
use crate::hash::Hashable;
|
use crate::hash::Hashable;
|
||||||
|
use crate::jobs::{Job, JobContainer, JobId};
|
||||||
use crate::models;
|
use crate::models;
|
||||||
use crate::models::File;
|
use crate::models::File;
|
||||||
use anyhow::{anyhow, Error, Result};
|
use anyhow::{anyhow, Error, Result};
|
||||||
|
@ -283,8 +284,20 @@ pub fn resolve_path<C: Connection<Backend = Sqlite>>(
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn reimport_directory(pool: DbPool, directory: PathBuf) {
|
pub async fn reimport_directory(
|
||||||
let result = actix_web::web::block(move || _reimport_directory(pool, directory)).await;
|
pool: DbPool,
|
||||||
|
directory: PathBuf,
|
||||||
|
job_container: Arc<RwLock<JobContainer>>,
|
||||||
|
) {
|
||||||
|
let job_id = job_container
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.add_job(Job::new("REIMPORT", "Reimporting vault..."))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let result =
|
||||||
|
actix_web::web::block(move || _reimport_directory(pool, directory, job_container, job_id))
|
||||||
|
.await;
|
||||||
if result.is_err() {
|
if result.is_err() {
|
||||||
let err = result.err().unwrap();
|
let err = result.err().unwrap();
|
||||||
error!("Update did not succeed! {:?}", err);
|
error!("Update did not succeed! {:?}", err);
|
||||||
|
@ -302,6 +315,8 @@ enum UpdatePathOutcome {
|
||||||
fn _reimport_directory<T: AsRef<Path>>(
|
fn _reimport_directory<T: AsRef<Path>>(
|
||||||
pool: DbPool,
|
pool: DbPool,
|
||||||
directory: T,
|
directory: T,
|
||||||
|
job_container: Arc<RwLock<JobContainer>>,
|
||||||
|
job_id: JobId,
|
||||||
) -> Result<Vec<UpdatePathResult>> {
|
) -> Result<Vec<UpdatePathResult>> {
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
|
|
||||||
|
@ -316,15 +331,23 @@ fn _reimport_directory<T: AsRef<Path>>(
|
||||||
let absolute_path = fs::canonicalize(&directory)?;
|
let absolute_path = fs::canonicalize(&directory)?;
|
||||||
let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
|
let existing_files = Arc::new(RwLock::new(retrieve_all_files(&pool.get()?)?));
|
||||||
|
|
||||||
|
let count = RwLock::new(0_usize);
|
||||||
|
let total = path_entries.len() as f32;
|
||||||
let path_results: Vec<UpdatePathResult> = path_entries
|
let path_results: Vec<UpdatePathResult> = path_entries
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|path| {
|
.map(|path| {
|
||||||
Ok(_process_directory_entry(
|
let result = _process_directory_entry(&rw_pool, path, &absolute_path, &existing_files)?;
|
||||||
&rw_pool,
|
|
||||||
path,
|
let mut cnt = count.write().unwrap();
|
||||||
&absolute_path,
|
*cnt += 1;
|
||||||
&existing_files,
|
|
||||||
)?)
|
job_container
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.update_progress(&job_id, *cnt as f32 / total * 100.0)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use serde::{Serialize, Serializer};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
#[derive(Default, Serialize, Clone)]
|
||||||
|
pub struct Job {
|
||||||
|
job_type: JobType,
|
||||||
|
title: String,
|
||||||
|
progress: f32,
|
||||||
|
state: State,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Job {
|
||||||
|
pub fn new<S: AsRef<str>>(job_type: S, title: S) -> Self {
|
||||||
|
return Job {
|
||||||
|
job_type: String::from(job_type.as_ref()),
|
||||||
|
title: String::from(title.as_ref()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type JobType = String;
|
||||||
|
|
||||||
|
#[derive(Serialize, Clone)]
|
||||||
|
pub enum State {
|
||||||
|
InProgress,
|
||||||
|
Done,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for State {
|
||||||
|
fn default() -> Self {
|
||||||
|
State::InProgress
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct JobContainer {
|
||||||
|
jobs: HashMap<JobId, Job>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Hash, PartialEq, Eq)]
|
||||||
|
pub struct JobId {
|
||||||
|
uuid: Uuid,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Uuid> for JobId {
|
||||||
|
fn from(uuid: Uuid) -> Self {
|
||||||
|
JobId { uuid }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Serialize for JobId {
|
||||||
|
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
|
||||||
|
where
|
||||||
|
S: Serializer,
|
||||||
|
{
|
||||||
|
serializer.serialize_str(format!("{}", self.uuid).as_str())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl JobContainer {
|
||||||
|
pub fn add_job(&mut self, job: Job) -> Result<JobId> {
|
||||||
|
let uuid = Uuid::new_v4();
|
||||||
|
self.jobs.insert(JobId::from(uuid), job);
|
||||||
|
Ok(JobId::from(uuid))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_jobs(&self) -> HashMap<JobId, Job> {
|
||||||
|
self.jobs.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update_progress(&mut self, id: &JobId, progress: f32) -> Result<()> {
|
||||||
|
if let Some(job) = self.jobs.get_mut(id) {
|
||||||
|
job.progress = progress;
|
||||||
|
if progress >= 100.0 {
|
||||||
|
job.state = State::Done;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(anyhow!("No such job."))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
12
src/main.rs
12
src/main.rs
|
@ -11,11 +11,13 @@ use actix_web::{middleware, App, HttpServer};
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use clap::{App as ClapApp, Arg};
|
use clap::{App as ClapApp, Arg};
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
mod addressing;
|
mod addressing;
|
||||||
mod database;
|
mod database;
|
||||||
mod filesystem;
|
mod filesystem;
|
||||||
mod hash;
|
mod hash;
|
||||||
|
mod jobs;
|
||||||
mod models;
|
mod models;
|
||||||
mod routes;
|
mod routes;
|
||||||
mod schema;
|
mod schema;
|
||||||
|
@ -59,6 +61,8 @@ fn main() -> Result<()> {
|
||||||
info!("Starting UpEnd {}...", VERSION);
|
info!("Starting UpEnd {}...", VERSION);
|
||||||
let sys = actix::System::new("upend");
|
let sys = actix::System::new("upend");
|
||||||
|
|
||||||
|
let job_container = Arc::new(RwLock::new(jobs::JobContainer::default()));
|
||||||
|
|
||||||
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
|
let vault_path = PathBuf::from(matches.value_of("DIRECTORY").unwrap());
|
||||||
|
|
||||||
let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
|
let open_result = database::open_upend(&vault_path, matches.is_present("REINITIALIZE"))
|
||||||
|
@ -76,6 +80,7 @@ fn main() -> Result<()> {
|
||||||
let state = routes::State {
|
let state = routes::State {
|
||||||
directory: vault_path.clone(),
|
directory: vault_path.clone(),
|
||||||
db_pool: db_pool.clone(),
|
db_pool: db_pool.clone(),
|
||||||
|
job_container: job_container.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start HTTP server
|
// Start HTTP server
|
||||||
|
@ -90,6 +95,7 @@ fn main() -> Result<()> {
|
||||||
.service(routes::delete_object)
|
.service(routes::delete_object)
|
||||||
.service(routes::api_refresh)
|
.service(routes::api_refresh)
|
||||||
.service(routes::list_hier)
|
.service(routes::list_hier)
|
||||||
|
.service(routes::get_jobs)
|
||||||
.service(
|
.service(
|
||||||
actix_files::Files::new(
|
actix_files::Files::new(
|
||||||
"/",
|
"/",
|
||||||
|
@ -103,7 +109,11 @@ fn main() -> Result<()> {
|
||||||
|
|
||||||
if !matches.is_present("NO_INITIAL_UPDATE") {
|
if !matches.is_present("NO_INITIAL_UPDATE") {
|
||||||
info!("Running initial update...");
|
info!("Running initial update...");
|
||||||
actix::spawn(filesystem::reimport_directory(db_pool, vault_path));
|
actix::spawn(filesystem::reimport_directory(
|
||||||
|
db_pool,
|
||||||
|
vault_path,
|
||||||
|
job_container,
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
if !matches.is_present("NO_BROWSER") {
|
if !matches.is_present("NO_BROWSER") {
|
||||||
|
|
|
@ -4,6 +4,7 @@ use crate::database::{
|
||||||
};
|
};
|
||||||
use crate::filesystem::{list_directory, UPath};
|
use crate::filesystem::{list_directory, UPath};
|
||||||
use crate::hash::{decode, encode, Hashable};
|
use crate::hash::{decode, encode, Hashable};
|
||||||
|
use crate::jobs::JobContainer;
|
||||||
use actix_files::NamedFile;
|
use actix_files::NamedFile;
|
||||||
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, ErrorNotFound};
|
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, ErrorNotFound};
|
||||||
use actix_web::{delete, error, get, post, put, web, Error, HttpResponse};
|
use actix_web::{delete, error, get, post, put, web, Error, HttpResponse};
|
||||||
|
@ -14,11 +15,13 @@ use serde::Deserialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct State {
|
pub struct State {
|
||||||
pub directory: PathBuf,
|
pub directory: PathBuf,
|
||||||
pub db_pool: DbPool,
|
pub db_pool: DbPool,
|
||||||
|
pub job_container: Arc<RwLock<JobContainer>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/api/raw/{hash}")]
|
#[get("/api/raw/{hash}")]
|
||||||
|
@ -146,6 +149,16 @@ pub async fn list_hier(
|
||||||
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn api_refresh(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let _pool = state.db_pool.clone();
|
let _pool = state.db_pool.clone();
|
||||||
let _directory = state.directory.clone();
|
let _directory = state.directory.clone();
|
||||||
actix::spawn(crate::filesystem::reimport_directory(_pool, _directory));
|
actix::spawn(crate::filesystem::reimport_directory(
|
||||||
|
_pool,
|
||||||
|
_directory,
|
||||||
|
state.job_container.clone(),
|
||||||
|
));
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[get("/api/jobs")]
|
||||||
|
pub async fn get_jobs(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
|
let jobs = state.job_container.read().unwrap().get_jobs();
|
||||||
|
Ok(HttpResponse::Ok().json(&jobs))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue