extract all metadata on vault rescan

feat/vaults
Tomáš Mládek 2022-03-02 01:14:46 +01:00
parent e8af838020
commit 1c316427ab
No known key found for this signature in database
GPG Key ID: 65E225C8B3E2ED8A
4 changed files with 93 additions and 14 deletions

View File

@ -393,6 +393,21 @@ impl UpEndConnection {
}
}
// #[deprecated]
/// Return every distinct entity address present in the vault's `data` table.
///
/// Raw entity byte strings that fail to decode into an [`Address`] are
/// silently skipped rather than aborting the whole listing.
pub fn get_all_addresses(&self) -> Result<Vec<Address>> {
    use crate::database::inner::schema::data::dsl::*;

    // Pull the distinct raw entity blobs first, then decode in a second pass.
    let raw_entities = data
        .select(entity)
        .distinct()
        .load::<Vec<u8>>(&self.conn)?;

    let addresses: Vec<Address> = raw_entities
        .iter()
        .filter_map(|bytes| Address::decode(bytes).ok())
        .collect();

    Ok(addresses)
}
// #[deprecated]
pub fn get_all_attributes(&self) -> Result<Vec<String>> {
use crate::database::inner::schema::data::dsl::*;

View File

@ -1,10 +1,15 @@
use crate::{
addressing::Address,
database::{entry::Entry, UpEndConnection},
util::jobs::JobContainer,
database::{entry::Entry, UpEndConnection, UpEndDatabase},
util::jobs::{Job, JobContainer},
};
use anyhow::Result;
use std::sync::{Arc, RwLock};
use log::{info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{
borrow::Borrow,
sync::{Arc, RwLock},
};
#[cfg(feature = "extractors-web")]
pub mod web;
@ -46,12 +51,62 @@ pub trait Extractor {
}
}
pub fn extract_all(
/// Run metadata extraction over every address currently stored in the vault,
/// in parallel, reporting overall progress as a single "EXTRACT_ALL" job.
///
/// Returns the total number of entries added by all successful extractions.
/// Accepts anything that borrows as an [`UpEndDatabase`] so callers can pass
/// either an owned handle or a shared reference (e.g. an `Arc`).
pub fn extract_all<D: Borrow<UpEndDatabase>>(
    db: D,
    job_container: Arc<RwLock<JobContainer>>,
) -> Result<usize> {
    info!("Extracting metadata for all addresses.");
    let db = db.borrow();
    // Register a job up front so the UI can show progress for the whole pass.
    let job_id_result = job_container
        .write()
        .unwrap()
        .add_job(Job::new("EXTRACT_ALL", "Extracting additional metadata..."));
    match job_id_result {
        Ok(job_id) => {
            let all_addresses = db.connection()?.get_all_addresses()?;
            let total = all_addresses.len() as f32;
            // Shared counter for progress reporting across rayon worker threads.
            let count = RwLock::new(0_usize);
            let result = all_addresses
                .par_iter()
                .map(|address| {
                    // Each parallel task gets its own connection; diesel
                    // connections are not shareable across threads.
                    let connection = db.connection()?;
                    let extract_result = extract(address, &connection, job_container.clone());
                    let mut cnt = count.write().unwrap();
                    *cnt += 1;
                    job_container
                        .write()
                        .unwrap()
                        .update_progress(&job_id, *cnt as f32 / total * 100.0)?;
                    extract_result
                })
                // NOTE(review): flatten() over an iterator of Result discards
                // the Err items, so per-address extraction failures (and failed
                // progress updates) are silently dropped from the sum — only
                // successful extraction counts contribute. Presumably a
                // deliberate best-effort policy; confirm errors are logged
                // inside extract() itself.
                .flatten()
                .sum();
            info!(
                "Done extracting metadata; processed {} addresses, added {} entries.",
                all_addresses.len(),
                result
            );
            Ok(result)
        }
        // Could not register the job at all — propagate instead of extracting
        // with no progress reporting.
        Err(err) => Err(err.into()),
    }
}
pub fn extract(
address: &Address,
connection: &UpEndConnection,
job_container: Arc<RwLock<JobContainer>>,
) -> Result<usize> {
let mut entry_count = 0;
trace!("Extracting metadata for {address:?}");
#[cfg(feature = "extractors-web")]
{
@ -63,5 +118,7 @@ pub fn extract_all(
entry_count += audio::ID3Extractor.insert_info(address, connection, job_container)?;
}
trace!("Extracting metadata for {address:?} - got {entry_count} entries.");
Ok(entry_count)
}

View File

@ -218,11 +218,16 @@ fn main() -> Result<()> {
if !matches.is_present("NO_INITIAL_UPDATE") {
info!("Running initial update...");
if open_result.new {
block_background(|| filesystem::rescan_vault(upend, job_container, false, true));
} else {
block_background(|| filesystem::rescan_vault(upend, job_container, true, false));
}
let new = open_result.new;
block_background::<_, _, anyhow::Error>(move || {
let _ = if new {
filesystem::rescan_vault(upend.clone(), job_container.clone(), false, true)
} else {
filesystem::rescan_vault(upend.clone(), job_container.clone(), true, false)
};
let _ = extractors::extract_all(upend, job_container);
Ok(())
})
}
#[cfg(feature = "desktop")]

View File

@ -372,7 +372,7 @@ pub async fn put_object(
let _job_container = state.job_container.clone();
block_background::<_, _, anyhow::Error>(move || {
let extract_result =
extractors::extract_all(&_address, &connection, _job_container);
extractors::extract(&_address, &connection, _job_container);
if let Ok(entry_count) = extract_result {
debug!("Added {entry_count} extracted entries for {_address:?}");
} else {
@ -472,7 +472,7 @@ pub async fn put_object(
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
block_background::<_, _, anyhow::Error>(move || {
let extract_result =
extractors::extract_all(&_address, &connection, _job_container);
extractors::extract(&_address, &connection, _job_container);
if let Ok(entry_count) = extract_result {
debug!("Added {entry_count} extracted entries for {_address:?}");
} else {
@ -625,13 +625,15 @@ pub async fn api_refresh(
state: web::Data<State>,
web::Query(query): web::Query<RescanRequest>,
) -> Result<HttpResponse, Error> {
block_background(move || {
crate::filesystem::rescan_vault(
block_background::<_, _, anyhow::Error>(move || {
let _ = crate::filesystem::rescan_vault(
state.upend.clone(),
state.job_container.clone(),
query.full.is_none(),
false,
)
);
let _ = crate::extractors::extract_all(state.upend.clone(), state.job_container.clone());
Ok(())
});
Ok(HttpResponse::Ok().finish())
}