// upend/src/extractors/mod.rs
use crate::{
    addressing::Address,
    database::{entry::Entry, UpEndConnection, UpEndDatabase},
    util::jobs::JobContainer,
};
use anyhow::Result;
use log::{info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::{
    borrow::Borrow,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Mutex, RwLock,
    },
};
// Individual extractor implementations, each gated behind a Cargo feature so
// builds can opt out of their dependencies.
#[cfg(feature = "extractors-web")]
pub mod web;

#[cfg(feature = "extractors-audio")]
pub mod audio;
/// A source of derived metadata: given an address, produce entries to store.
pub trait Extractor {
    /// Extract metadata entries for `address`.
    ///
    /// Implementations may use `connection` for lookups and `job_container`
    /// to report progress of long-running extraction.
    fn get(
        &self,
        address: &Address,
        connection: &UpEndConnection,
        job_container: JobContainer,
    ) -> Result<Vec<Entry>>;

    /// Whether extraction should run for `address` at all.
    ///
    /// Defaults to `true`; implementors may override to skip addresses that
    /// are irrelevant to them or already processed.
    fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
        Ok(true)
    }

    /// Run the extractor (when [`Extractor::is_needed`] allows) and persist
    /// the resulting entries in a single transaction.
    ///
    /// Returns the number of entries inserted; `0` when extraction was
    /// skipped. Note that `get` runs *outside* the transaction, so only the
    /// inserts are atomic.
    fn insert_info(
        &self,
        address: &Address,
        connection: &UpEndConnection,
        job_container: JobContainer,
    ) -> Result<usize> {
        if self.is_needed(address, connection)? {
            let entries = self.get(address, connection, job_container)?;

            connection.transaction(|| {
                let len = entries.len();
                for entry in entries {
                    connection.insert_entry(entry)?;
                }
                Ok(len)
            })
        } else {
            Ok(0)
        }
    }
}
2022-03-02 01:14:46 +01:00
pub fn extract_all<D: Borrow<UpEndDatabase>>(
db: D,
mut job_container: JobContainer,
2022-03-02 01:14:46 +01:00
) -> Result<usize> {
info!("Extracting metadata for all addresses.");
let db = db.borrow();
let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
let all_addresses = db.connection()?.get_all_addresses()?;
let total = all_addresses.len() as f32;
let count = RwLock::new(0_usize);
let shared_job_handle = Arc::new(Mutex::new(job_handle));
let result = all_addresses
.par_iter()
.map(|address| {
let connection = db.connection()?;
let extract_result = extract(address, &connection, job_container.clone());
let mut cnt = count.write().unwrap();
*cnt += 1;
shared_job_handle
.lock()
.unwrap()
.update_progress(*cnt as f32 / total * 100.0)?;
extract_result
})
.flatten()
.sum();
info!(
"Done extracting metadata; processed {} addresses, added {} entries.",
all_addresses.len(),
result
);
Ok(result)
2022-03-02 01:14:46 +01:00
}
pub fn extract(
address: &Address,
connection: &UpEndConnection,
job_container: JobContainer,
) -> Result<usize> {
let mut entry_count = 0;
2022-03-02 01:14:46 +01:00
trace!("Extracting metadata for {address:?}");
#[cfg(feature = "extractors-web")]
{
entry_count += web::WebExtractor.insert_info(address, connection, job_container.clone())?;
}
#[cfg(feature = "extractors-audio")]
{
entry_count += audio::ID3Extractor.insert_info(address, connection, job_container)?;
}
2022-03-02 01:14:46 +01:00
trace!("Extracting metadata for {address:?} - got {entry_count} entries.");
Ok(entry_count)
}