refactor(backend): put all extractors into ExtractorManager

Tomáš Mládek 2024-07-29 13:44:04 +02:00
parent 22f9b6b447
commit baa4c02014
3 changed files with 187 additions and 218 deletions
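
In short: the free functions `extract` and `extract_all`, each carrying a hardcoded `#[cfg]` block per extractor, become methods on an `ExtractorManager` that registers every enabled extractor once and dispatches through a single loop. A condensed before/after of the call pattern as it appears in the hunks below (a sketch, not standalone code):

    // Before: free function; per-extractor dispatch hardcoded inside.
    let n = extractors::extract(&address, &connection, store, job_container, context);

    // After: one registry, built once, shared via Arc; dispatch is data-driven.
    let manager = Arc::new(extractors::ExtractorManager::new());
    let n = manager.extract(&address, &connection, store, job_container, context);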

View file

@@ -25,7 +25,7 @@ pub mod media;
 #[cfg(feature = "extractors-external")]
 pub mod external;
 
-pub trait Extractor {
+pub trait Extractor: Send + Sync {
     fn get(
         &self,
         address: &Address,
@@ -35,6 +35,10 @@ pub trait Extractor {
         context: OperationContext,
     ) -> Result<ExtractorGetResult>;
 
+    fn is_applicable(&self, _address: &Address) -> bool {
+        true
+    }
+
     fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
         Ok(true)
     }
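
The new `is_applicable` hook gives every extractor a cheap pre-filter that needs no database connection; its default body returns `true`, so existing implementors compile and behave as before. A self-contained toy mirror of that contract (the `Applicable` trait, the `HttpOnly` type, and string addresses are all hypothetical stand-ins; the real trait takes `&Address` and has further required methods):

    // Toy illustration of the default-method opt-in behind `is_applicable`.
    trait Applicable {
        // Default: applicable to every address, preserving the old behavior.
        fn is_applicable(&self, _address: &str) -> bool {
            true
        }
    }

    struct HttpOnly;

    impl Applicable for HttpOnly {
        // Hypothetical override: only bother with web-like addresses.
        fn is_applicable(&self, address: &str) -> bool {
            address.starts_with("http")
        }
    }
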
@@ -93,194 +97,145 @@ impl From<Vec<Entry>> for ExtractorGetResult {
     }
 }
 
-#[tracing::instrument(name = "Extract all metadata", skip_all)]
-pub fn extract_all<D: Borrow<UpEndDatabase>>(
-    db: D,
-    store: Arc<Box<dyn UpStore + Send + Sync>>,
-    mut job_container: JobContainer,
-    context: OperationContext,
-) -> Result<usize> {
-    info!("Extracting metadata for all addresses.");
-    let db = db.borrow();
-    let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
-    let all_addresses = db.connection()?.get_all_addresses()?;
-    let total = all_addresses.len() as f32;
-    let count = RwLock::new(0_usize);
-    let shared_job_handle = Arc::new(Mutex::new(job_handle));
-
-    let result = all_addresses
-        .par_iter()
-        .map(|address| {
-            let connection = db.connection()?;
-            let entry_count = extract(
-                address,
-                &connection,
-                store.clone(),
-                job_container.clone(),
-                context.clone(),
-            );
-
-            let mut cnt = count.write().unwrap();
-            *cnt += 1;
-            shared_job_handle
-                .lock()
-                .unwrap()
-                .update_progress(*cnt as f32 / total * 100.0)?;
-
-            anyhow::Ok(entry_count)
-        })
-        .flatten()
-        .sum();
-
-    info!(
-        "Done extracting metadata; processed {} addresses, added {} entries.",
-        all_addresses.len(),
-        result
-    );
-
-    Ok(result)
-}
-
-#[tracing::instrument(skip(connection, store, job_container))]
-pub fn extract(
-    address: &Address,
-    connection: &UpEndConnection,
-    store: Arc<Box<dyn UpStore + Send + Sync>>,
-    job_container: JobContainer,
-    context: OperationContext,
-) -> usize {
-    let mut entry_count = 0;
-    let mut all_inserted = vec![];
-    trace!("Extracting metadata for {address:?}");
-
-    #[cfg(feature = "extractors-web")]
-    {
-        let extract_result = web::WebExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("web: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-audio")]
-    {
-        let extract_result = audio::ID3Extractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("audio: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-exif")]
-    {
-        let extract_result = exif::ExifExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("photo: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-media")]
-    {
-        let extract_result = media::MediaExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("media: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-external")]
-    {
-        let extract_result = external::MonolithExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("external monolith: {}", err),
-        }
-
-        let extract_result = external::YtDlpExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("external yt-dlp: {}", err),
-        }
-    }
-
-    trace!(
-        "Extracting metadata for {address:?} - got {entry_count} entries, inserted {} new blobs.",
-        all_inserted.len()
-    );
-
-    for inserted in all_inserted {
-        extract(
-            &inserted,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-    }
-
-    entry_count
-}
+pub struct ExtractorManager {
+    extractors: Vec<(&'static str, Box<dyn Extractor>)>,
+}
+
+impl ExtractorManager {
+    pub fn new() -> Self {
+        let mut extractors: Vec<(&str, Box<dyn Extractor>)> = vec![];
+
+        #[cfg(feature = "extractors-web")]
+        {
+            extractors.push(("web", Box::new(web::WebExtractor)));
+        }
+
+        #[cfg(feature = "extractors-audio")]
+        {
+            extractors.push(("audio", Box::new(audio::ID3Extractor)));
+        }
+
+        #[cfg(feature = "extractors-exif")]
+        {
+            extractors.push(("exif", Box::new(exif::ExifExtractor)));
+        }
+
+        #[cfg(feature = "extractors-media")]
+        {
+            extractors.push(("media", Box::new(media::MediaExtractor)));
+        }
+
+        #[cfg(feature = "extractors-external")]
+        {
+            extractors.push(("external", Box::new(external::MonolithExtractor)));
+            extractors.push(("external", Box::new(external::YtDlpExtractor)));
+        }
+
+        Self { extractors }
+    }
+
+    #[tracing::instrument(skip(self, connection, store, job_container))]
+    pub fn extract(
+        &self,
+        address: &Address,
+        connection: &UpEndConnection,
+        store: Arc<Box<dyn UpStore + Send + Sync>>,
+        job_container: JobContainer,
+        context: OperationContext,
+    ) -> usize {
+        let mut entry_count = 0;
+        let mut all_inserted = vec![];
+        trace!("Extracting metadata for {address:?}");
+
+        for (name, extractor) in &self.extractors {
+            if extractor.is_applicable(address) {
+                trace!("Extracting with {name}");
+                let extract_result = extractor.insert_info(
+                    address,
+                    connection,
+                    store.clone(),
+                    job_container.clone(),
+                    context.clone(),
+                );
+
+                match extract_result {
+                    Ok(ExtractorResult { count, inserted }) => {
+                        entry_count += count;
+                        all_inserted.extend(inserted);
+                    }
+                    Err(err) => debug!("{name}: {err}"),
+                }
+            }
+        }
+
+        trace!(
+            "Extracting metadata for {address:?} - got {entry_count} entries, inserted {} new blobs.",
+            all_inserted.len()
+        );
+
+        for inserted in all_inserted {
+            self.extract(
+                &inserted,
+                connection,
+                store.clone(),
+                job_container.clone(),
+                context.clone(),
+            );
+        }
+
+        entry_count
+    }
+
+    #[tracing::instrument(name = "Extract all metadata", skip_all)]
+    pub fn extract_all<D: Borrow<UpEndDatabase>>(
+        &self,
+        db: D,
+        store: Arc<Box<dyn UpStore + Send + Sync>>,
+        mut job_container: JobContainer,
+        context: OperationContext,
+    ) -> Result<usize> {
+        info!("Extracting metadata for all addresses.");
+        let db = db.borrow();
+        let job_handle =
+            job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
+        let all_addresses = db.connection()?.get_all_addresses()?;
+        let total = all_addresses.len() as f32;
+        let count = RwLock::new(0_usize);
+        let shared_job_handle = Arc::new(Mutex::new(job_handle));
+
+        let result = all_addresses
+            .par_iter()
+            .map(|address| {
+                let connection = db.connection()?;
+                let entry_count = self.extract(
+                    address,
+                    &connection,
+                    store.clone(),
+                    job_container.clone(),
+                    context.clone(),
+                );
+
+                let mut cnt = count.write().unwrap();
+                *cnt += 1;
+                shared_job_handle
+                    .lock()
+                    .unwrap()
+                    .update_progress(*cnt as f32 / total * 100.0)?;
+
+                anyhow::Ok(entry_count)
+            })
+            .flatten()
+            .sum();
+
+        info!(
+            "Done extracting metadata; processed {} addresses, added {} entries.",
+            all_addresses.len(),
+            result
+        );
+
+        Ok(result)
+    }
+}
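
Worth noting: the `Send + Sync` bound added to `Extractor` in the first hunk is what makes this type shareable. `extract_all` fans out over rayon's `par_iter`, and `main()` (next file) hands a single `Arc<ExtractorManager>` to several background jobs; both need the manager to cross thread boundaries. A minimal compile-time check of that property (a sketch, not part of the commit):

    // `dyn Extractor` now carries Send + Sync as supertraits, so every
    // `Box<dyn Extractor>` in the registry is Send + Sync, and therefore
    // so is ExtractorManager itself.
    fn assert_send_sync<T: Send + Sync>() {}

    fn _extractor_manager_is_thread_safe() {
        assert_send_sync::<ExtractorManager>();
    }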

View file

@@ -441,6 +441,8 @@ async fn main() -> Result<()> {
             debug!("No preview path exists, continuing...");
         }
     }
 
+    let extractor_manager = Arc::new(extractors::ExtractorManager::new());
+
     let mut bind: SocketAddr = args.bind.parse().expect("Incorrect bind format.");
@@ -458,6 +460,7 @@ async fn main() -> Result<()> {
         upend: upend.clone(),
         store: store.clone(),
         job_container: job_container.clone(),
+        extractor_manager: extractor_manager.clone(),
         preview_store,
         config: UpEndConfig {
             vault_name: Some(args.vault_name.unwrap_or_else(|| {
@@ -506,6 +509,7 @@ async fn main() -> Result<()> {
         let upend = upend.clone();
         let store = store.clone();
         let job_container = job_container.clone();
+        let extractor_manager = extractor_manager.clone();
         block_background::<_, _, anyhow::Error>(move || {
             let connection: upend_db::UpEndConnection = upend.connection()?;
@@ -531,7 +535,7 @@ async fn main() -> Result<()> {
                 },
                 OperationContext::default(),
             );
-            let _ = extractors::extract_all(
+            let _ = extractor_manager.extract_all(
                 upend,
                 store,
                 job_container,
@@ -549,6 +553,7 @@ async fn main() -> Result<()> {
         let upend = upend.clone();
         let store = store.clone();
         let job_container = job_container.clone();
+        let extractor_manager = extractor_manager.clone();
         block_background::<_, _, anyhow::Error>(move || {
             info!("Running periodic vault update.");
             let connection = upend.connection()?;
@@ -559,10 +564,10 @@ async fn main() -> Result<()> {
                 upend_db::stores::UpdateOptions {
                     initial: false,
                     tree_mode,
                 },
                 OperationContext::default(),
             );
-            let _ = extractors::extract_all(
+            let _ = extractor_manager.extract_all(
                 upend,
                 store,
                 job_container,
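
The same shape recurs at each scheduling site in this file: clone the `Arc` (and the other handles) outside the `move` closure, so the closure owns its own reference-counted handle while the originals remain available for the next job. Condensed (names from the surrounding hunks; the trailing `OperationContext::default()` argument is assumed from its use elsewhere in this file, as the hunks cut off mid-call):

    let extractor_manager = extractor_manager.clone();
    block_background::<_, _, anyhow::Error>(move || {
        // The clone is moved into the job; dropping it afterwards only
        // decrements the reference count.
        let _ = extractor_manager.extract_all(
            upend,
            store,
            job_container,
            OperationContext::default(),
        );
        Ok(())
    });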

View file

@@ -1,7 +1,7 @@
 use crate::common::build;
 use crate::common::REQWEST_CLIENT;
 use crate::config::UpEndConfig;
-use crate::extractors;
+use crate::extractors::ExtractorManager;
 use crate::previews::PreviewStore;
 use crate::util::exec::block_background;
 use actix_files::NamedFile;
@@ -56,6 +56,7 @@ pub struct State {
     pub store: Arc<Box<dyn UpStore + Sync + Send>>,
     pub config: UpEndConfig,
     pub job_container: jobs::JobContainer,
+    pub extractor_manager: Arc<ExtractorManager>,
     pub preview_store: Arc<PreviewStore>,
     pub public: Arc<Mutex<bool>>,
 }
@@ -567,25 +568,28 @@ pub async fn put_object(
         PutInput::Address { entity: in_address } => {
             let address: Address = in_address.try_into().map_err(ErrorBadRequest)?;
 
-            let _address = address.clone();
-            let _job_container = state.job_container.clone();
-            let _store = state.store.clone();
-            let _user = user.clone();
-            block_background::<_, _, anyhow::Error>(move || {
-                let entry_count = extractors::extract(
-                    &_address,
-                    &connection,
-                    _store,
-                    _job_container,
-                    OperationContext {
-                        user: _user,
-                        provenance: "API".to_string(),
-                    },
-                );
-
-                debug!("Added {entry_count} extracted entries for {_address:?}");
-                Ok(())
-            });
+            {
+                let address = address.clone();
+                let job_container = state.job_container.clone();
+                let store = state.store.clone();
+                let extractors = state.extractor_manager.clone();
+                let user = user.clone();
+                block_background::<_, _, anyhow::Error>(move || {
+                    let entry_count = extractors.extract(
+                        &address,
+                        &connection,
+                        store,
+                        job_container,
+                        OperationContext {
+                            user,
+                            provenance: "API".to_string(),
+                        },
+                    );
+
+                    debug!("Added {entry_count} extracted entries for {address:?}");
+                    Ok(())
+                });
+            }
 
             let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
             let _user = user.clone();
@@ -703,25 +707,28 @@ pub async fn put_blob(
                 })
                 .await;
 
-            let _address = address.clone();
-            let _job_container = state.job_container.clone();
-            let _store = state.store.clone();
-            let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
-            let _user = user.clone();
-            block_background::<_, _, anyhow::Error>(move || {
-                let entry_count = extractors::extract(
-                    &_address,
-                    &connection,
-                    _store,
-                    _job_container,
-                    OperationContext {
-                        user: _user,
-                        provenance: "API".to_string(),
-                    },
-                );
-                debug!("Added {entry_count} extracted entries for {_address:?}");
-                Ok(())
-            });
+            {
+                let address = address.clone();
+                let job_container = state.job_container.clone();
+                let store = state.store.clone();
+                let extractors = state.extractor_manager.clone();
+                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+                let user = user.clone();
+                block_background::<_, _, anyhow::Error>(move || {
+                    let entry_count = extractors.extract(
+                        &address,
+                        &connection,
+                        store,
+                        job_container,
+                        OperationContext {
+                            user,
+                            provenance: "API".to_string(),
+                        },
+                    );
+                    debug!("Added {entry_count} extracted entries for {address:?}");
+                    Ok(())
+                });
+            }
 
             Ok(HttpResponse::Ok().json(address))
         } else {
             Err(ErrorBadRequest("Multipart contains no fields."))
@@ -982,6 +989,7 @@ pub async fn api_refresh(
     let user = check_auth(&req, &state)?;
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+    let extractors = state.extractor_manager.clone();
 
     block_background::<_, _, anyhow::Error>(move || {
         let _ = state.store.update(
@@ -1001,7 +1009,7 @@ pub async fn api_refresh(
                 provenance: "API".to_string(),
             },
         );
-        let _ = crate::extractors::extract_all(
+        let _ = extractors.extract_all(
            state.upend.clone(),
            state.store.clone(),
            state.job_container.clone(),
@@ -1221,7 +1229,7 @@ mod tests {
     use std::fs::File;
 
     use super::*;
     use anyhow::Result;
     use tempfile::TempDir;
     use upend_base::hash::UpMultihash;
@@ -1415,6 +1423,7 @@ mod tests {
                 secret: "secret".to_string(),
             },
             job_container,
+            extractor_manager: Arc::new(ExtractorManager::new()),
             preview_store: Arc::new(PreviewStore::new("", store)),
             public: Arc::new(Mutex::new(true)),
         }