diff --git a/cli/src/extractors/mod.rs b/cli/src/extractors/mod.rs
index 9bab169..85eb36f 100644
--- a/cli/src/extractors/mod.rs
+++ b/cli/src/extractors/mod.rs
@@ -25,7 +25,7 @@ pub mod media;
 #[cfg(feature = "extractors-external")]
 pub mod external;
 
-pub trait Extractor {
+pub trait Extractor: Send + Sync {
     fn get(
         &self,
         address: &Address,
@@ -35,6 +35,10 @@ pub trait Extractor {
         context: OperationContext,
     ) -> Result<ExtractorGetResult>;
 
+    fn is_applicable(&self, _address: &Address) -> bool {
+        true
+    }
+
     fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
         Ok(true)
     }
@@ -93,194 +97,145 @@ impl From<Vec<Entry>> for ExtractorGetResult {
     }
 }
 
-#[tracing::instrument(name = "Extract all metadata", skip_all)]
-pub fn extract_all<D: Borrow<UpEndDatabase>>(
-    db: D,
-    store: Arc<Box<dyn UpStore + Send + Sync>>,
-    mut job_container: JobContainer,
-    context: OperationContext,
-) -> Result<usize> {
-    info!("Extracting metadata for all addresses.");
-
-    let db = db.borrow();
-    let job_handle = job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
-
-    let all_addresses = db.connection()?.get_all_addresses()?;
-    let total = all_addresses.len() as f32;
-    let count = RwLock::new(0_usize);
-    let shared_job_handle = Arc::new(Mutex::new(job_handle));
-
-    let result = all_addresses
-        .par_iter()
-        .map(|address| {
-            let connection = db.connection()?;
-            let entry_count = extract(
-                address,
-                &connection,
-                store.clone(),
-                job_container.clone(),
-                context.clone(),
-            );
-
-            let mut cnt = count.write().unwrap();
-            *cnt += 1;
-
-            shared_job_handle
-                .lock()
-                .unwrap()
-                .update_progress(*cnt as f32 / total * 100.0)?;
-
-            anyhow::Ok(entry_count)
-        })
-        .flatten()
-        .sum();
-
-    info!(
-        "Done extracting metadata; processed {} addresses, added {} entries.",
-        all_addresses.len(),
-        result
-    );
-
-    Ok(result)
+pub struct ExtractorManager {
+    extractors: Vec<(&'static str, Box<dyn Extractor>)>,
 }
 
-#[tracing::instrument(skip(connection, store, job_container))]
-pub fn extract(
-    address: &Address,
-    connection: &UpEndConnection,
-    store: Arc<Box<dyn UpStore + Send + Sync>>,
-    job_container: JobContainer,
-    context: OperationContext,
-) -> usize {
-    let mut entry_count = 0;
-    let mut all_inserted = vec![];
-    trace!("Extracting metadata for {address:?}");
+impl ExtractorManager {
+    pub fn new() -> Self {
+        let mut extractors: Vec<(&str, Box<dyn Extractor>)> = vec![];
 
-    #[cfg(feature = "extractors-web")]
-    {
-        let extract_result = web::WebExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("web: {}", err),
+        #[cfg(feature = "extractors-web")]
+        {
+            extractors.push(("web", Box::new(web::WebExtractor)));
         }
+
+        #[cfg(feature = "extractors-audio")]
+        {
+            extractors.push(("audio", Box::new(audio::ID3Extractor)));
         }
+
+        #[cfg(feature = "extractors-exif")]
+        {
+            extractors.push(("exif", Box::new(exif::ExifExtractor)));
         }
+
+        #[cfg(feature = "extractors-media")]
+        {
+            extractors.push(("media", Box::new(media::MediaExtractor)));
         }
+
+        #[cfg(feature = "extractors-external")]
+        {
+            extractors.push(("external", Box::new(external::MonolithExtractor)));
+            extractors.push(("external", Box::new(external::YtDlpExtractor)));
         }
+
+        Self { extractors }
     }
 
-    #[cfg(feature = "extractors-audio")]
-    {
-        let extract_result = audio::ID3Extractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
+    #[tracing::instrument(skip(self, connection, store, job_container))]
+    pub fn extract(
+        &self,
+        address: &Address,
+        connection: &UpEndConnection,
+        store: Arc<Box<dyn UpStore + Send + Sync>>,
+        job_container: JobContainer,
+        context: OperationContext,
+    ) -> usize {
+        let mut entry_count = 0;
+        let mut all_inserted = vec![];
+        trace!("Extracting metadata for {address:?}");
 
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
+        for (name, extractor) in &self.extractors {
+            if extractor.is_applicable(address) {
+                trace!("Extracting with {name}");
+                let extract_result = extractor.insert_info(
+                    address,
+                    connection,
+                    store.clone(),
+                    job_container.clone(),
+                    context.clone(),
+                );
+
+                match extract_result {
+                    Ok(ExtractorResult { count, inserted }) => {
+                        entry_count += count;
+                        all_inserted.extend(inserted);
+                    }
+                    Err(err) => debug!("{name}: {err}"),
+                }
+            }
         }
-            Err(err) => debug!("audio: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-exif")]
-    {
-        let extract_result = exif::ExifExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("photo: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-media")]
-    {
-        let extract_result = media::MediaExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("media: {}", err),
-        }
-    }
-
-    #[cfg(feature = "extractors-external")]
-    {
-        let extract_result = external::MonolithExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("external monolith: {}", err),
         }
 
-        let extract_result = external::YtDlpExtractor.insert_info(
-            address,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
-
-        match extract_result {
-            Ok(ExtractorResult { count, inserted }) => {
-                entry_count += count;
-                all_inserted.extend(inserted);
-            }
-            Err(err) => debug!("external yt-dlp: {}", err),
-        }
-    }
-
-    trace!(
+        trace!(
         "Extracting metadata for {address:?} - got {entry_count} entries, inserted {} new blobs.",
         all_inserted.len()
     );
 
-    for inserted in all_inserted {
-        extract(
-            &inserted,
-            connection,
-            store.clone(),
-            job_container.clone(),
-            context.clone(),
-        );
+        for inserted in all_inserted {
+            self.extract(
+                &inserted,
+                connection,
+                store.clone(),
+                job_container.clone(),
+                context.clone(),
+            );
+        }
+
+        entry_count
     }
 
-    entry_count
+    #[tracing::instrument(name = "Extract all metadata", skip_all)]
+    pub fn extract_all<D: Borrow<UpEndDatabase>>(
+        &self,
+        db: D,
+        store: Arc<Box<dyn UpStore + Send + Sync>>,
+        mut job_container: JobContainer,
+        context: OperationContext,
+    ) -> Result<usize> {
+        info!("Extracting metadata for all addresses.");
+
+        let db = db.borrow();
+        let job_handle =
+            job_container.add_job("EXTRACT_ALL", "Extracting additional metadata...")?;
+
+        let all_addresses = db.connection()?.get_all_addresses()?;
+        let total = all_addresses.len() as f32;
+        let count = RwLock::new(0_usize);
+        let shared_job_handle = Arc::new(Mutex::new(job_handle));
+
+        let result = all_addresses
+            .par_iter()
+            .map(|address| {
+                let connection = db.connection()?;
+                let entry_count = self.extract(
+                    address,
+                    &connection,
+                    store.clone(),
+                    job_container.clone(),
+                    context.clone(),
+                );
+
+                let mut cnt = count.write().unwrap();
+                *cnt += 1;
+
+                shared_job_handle
+                    .lock()
+                    .unwrap()
+                    .update_progress(*cnt as f32 / total * 100.0)?;
+
+                anyhow::Ok(entry_count)
+            })
+            .flatten()
+            .sum();
+
+        info!(
+            "Done extracting metadata; processed {} addresses, added {} entries.",
+            all_addresses.len(),
+            result
+        );
+
+        Ok(result)
+    }
 }
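This first file is the heart of the refactor: the five hand-rolled `#[cfg]` blocks in `extract()` collapse into a single loop over trait objects registered once in `ExtractorManager::new()`, and the new `is_applicable()` hook lets an extractor decline an address before any work is done. A self-contained sketch of the pattern (the types and signatures below are simplified stand-ins, not the crate's actual `Address`/`UpEndConnection` ones):

```rust
// Registry-of-trait-objects pattern, reduced to its essentials.
trait Extractor: Send + Sync {
    // Cheap pre-filter; defaults to accepting everything, like the
    // default method added to the real trait.
    fn is_applicable(&self, _address: &str) -> bool {
        true
    }
    fn extract(&self, address: &str) -> Result<usize, String>;
}

struct WebExtractor;
impl Extractor for WebExtractor {
    fn is_applicable(&self, address: &str) -> bool {
        address.starts_with("http")
    }
    fn extract(&self, _address: &str) -> Result<usize, String> {
        Ok(1)
    }
}

struct Manager {
    extractors: Vec<(&'static str, Box<dyn Extractor>)>,
}

impl Manager {
    fn extract(&self, address: &str) -> usize {
        let mut entry_count = 0;
        for (name, extractor) in &self.extractors {
            if extractor.is_applicable(address) {
                match extractor.extract(address) {
                    Ok(count) => entry_count += count,
                    // One failing extractor is logged and skipped,
                    // never fatal to the others.
                    Err(err) => eprintln!("{name}: {err}"),
                }
            }
        }
        entry_count
    }
}

fn main() {
    let manager = Manager {
        extractors: vec![("web", Box::new(WebExtractor))],
    };
    assert_eq!(manager.extract("https://example.com"), 1);
    assert_eq!(manager.extract("sha256:1220abcd"), 0);
}
```

Adding a new extractor now means one `push` in `new()` instead of another copy of the `insert_info`/`match` boilerplate.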
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 333e735..4e6b1ee 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -441,6 +441,8 @@ async fn main() -> Result<()> {
             debug!("No preview path exists, continuing...");
         }
     }
+
+    let extractor_manager = Arc::new(extractors::ExtractorManager::new());
 
     let mut bind: SocketAddr = args.bind.parse().expect("Incorrect bind format.");
@@ -458,6 +460,7 @@
         upend: upend.clone(),
         store: store.clone(),
         job_container: job_container.clone(),
+        extractor_manager: extractor_manager.clone(),
         preview_store,
         config: UpEndConfig {
             vault_name: Some(args.vault_name.unwrap_or_else(|| {
@@ -506,6 +509,7 @@
             let upend = upend.clone();
             let store = store.clone();
             let job_container = job_container.clone();
+            let extractor_manager = extractor_manager.clone();
 
             block_background::<_, _, anyhow::Error>(move || {
                 let connection: upend_db::UpEndConnection = upend.connection()?;
@@ -531,7 +535,7 @@
                     },
                     OperationContext::default(),
                 );
-                let _ = extractors::extract_all(
+                let _ = extractor_manager.extract_all(
                     upend,
                     store,
                     job_container,
@@ -549,6 +553,7 @@
                 let upend = upend.clone();
                 let store = store.clone();
                 let job_container = job_container.clone();
+                let extractor_manager = extractor_manager.clone();
                 block_background::<_, _, anyhow::Error>(move || {
                     info!("Running periodic vault update.");
                     let connection = upend.connection()?;
@@ -559,10 +564,10 @@
                         upend_db::stores::UpdateOptions {
                             initial: false,
                             tree_mode,
-                        },
+                        },
                         OperationContext::default(),
                     );
-                    let _ = extractors::extract_all(
+                    let _ = extractor_manager.extract_all(
                         upend,
                         store,
                         job_container,
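In `main.rs` the manager is built exactly once, wrapped in an `Arc`, and cloned into the initial-scan and periodic-update closures. This is also why the trait gained the `Send + Sync` bounds: an `Arc` only crosses a thread boundary if its contents do. A minimal sketch of the constraint (stand-in types again, not the crate's):

```rust
use std::sync::Arc;
use std::thread;

// Without `Send + Sync` here, `Box<dyn Extractor>` -- and therefore
// `Arc<Manager>` -- would not be `Send`, and the spawn below would
// fail to compile.
trait Extractor: Send + Sync {}

struct Manager {
    extractors: Vec<Box<dyn Extractor>>,
}

fn main() {
    let manager = Arc::new(Manager { extractors: vec![] });

    // Mirrors the block_background calls: each background job takes
    // its own clone of the shared manager.
    let background = manager.clone();
    let handle = thread::spawn(move || background.extractors.len());

    assert_eq!(handle.join().unwrap(), 0);
}
```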
diff --git a/cli/src/routes.rs b/cli/src/routes.rs
index 106c7df..065cf6d 100644
--- a/cli/src/routes.rs
+++ b/cli/src/routes.rs
@@ -1,7 +1,7 @@
 use crate::common::build;
 use crate::common::REQWEST_CLIENT;
 use crate::config::UpEndConfig;
-use crate::extractors;
+use crate::extractors::ExtractorManager;
 use crate::previews::PreviewStore;
 use crate::util::exec::block_background;
 use actix_files::NamedFile;
@@ -56,6 +56,7 @@ pub struct State {
     pub store: Arc<Box<dyn UpStore + Send + Sync>>,
     pub config: UpEndConfig,
     pub job_container: jobs::JobContainer,
+    pub extractor_manager: Arc<ExtractorManager>,
     pub preview_store: Arc<PreviewStore>,
     pub public: Arc<Mutex<bool>>,
 }
@@ -567,25 +568,28 @@ pub async fn put_object(
         PutInput::Address { entity: in_address } => {
             let address: Address = in_address.try_into().map_err(ErrorBadRequest)?;
 
-            let _address = address.clone();
-            let _job_container = state.job_container.clone();
-            let _store = state.store.clone();
-            let _user = user.clone();
-            block_background::<_, _, anyhow::Error>(move || {
-                let entry_count = extractors::extract(
-                    &_address,
-                    &connection,
-                    _store,
-                    _job_container,
-                    OperationContext {
-                        user: _user,
-                        provenance: "API".to_string(),
-                    },
-                );
+            {
+                let address = address.clone();
+                let job_container = state.job_container.clone();
+                let store = state.store.clone();
+                let extractors = state.extractor_manager.clone();
+                let user = user.clone();
+                block_background::<_, _, anyhow::Error>(move || {
+                    let entry_count = extractors.extract(
+                        &address,
+                        &connection,
+                        store,
+                        job_container,
+                        OperationContext {
+                            user,
+                            provenance: "API".to_string(),
+                        },
+                    );
 
-                debug!("Added {entry_count} extracted entries for {_address:?}");
-                Ok(())
-            });
+                    debug!("Added {entry_count} extracted entries for {address:?}");
+                    Ok(())
+                });
+            }
 
             let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
             let _user = user.clone();
@@ -703,25 +707,28 @@ pub async fn put_blob(
             })
             .await;
 
-            let _address = address.clone();
-            let _job_container = state.job_container.clone();
-            let _store = state.store.clone();
-            let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
-            let _user = user.clone();
-            block_background::<_, _, anyhow::Error>(move || {
-                let entry_count = extractors::extract(
-                    &_address,
-                    &connection,
-                    _store,
-                    _job_container,
-                    OperationContext {
-                        user: _user,
-                        provenance: "API".to_string(),
-                    },
-                );
-                debug!("Added {entry_count} extracted entries for {_address:?}");
-                Ok(())
-            });
+            {
+                let address = address.clone();
+                let job_container = state.job_container.clone();
+                let store = state.store.clone();
+                let extractors = state.extractor_manager.clone();
+                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+                let user = user.clone();
+                block_background::<_, _, anyhow::Error>(move || {
+                    let entry_count = extractors.extract(
+                        &address,
+                        &connection,
+                        store,
+                        job_container,
+                        OperationContext {
+                            user,
+                            provenance: "API".to_string(),
+                        },
+                    );
+                    debug!("Added {entry_count} extracted entries for {address:?}");
+                    Ok(())
+                });
+            }
 
             Ok(HttpResponse::Ok().json(address))
         } else {
             Err(ErrorBadRequest("Multipart contains no fields."))
         }
@@ -982,6 +989,7 @@ pub async fn api_refresh(
     let user = check_auth(&req, &state)?;
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
 
+    let extractors = state.extractor_manager.clone();
     block_background::<_, _, anyhow::Error>(move || {
         let _ = state.store.update(
@@ -1001,7 +1009,7 @@
                 provenance: "API".to_string(),
             },
         );
-        let _ = crate::extractors::extract_all(
+        let _ = extractors.extract_all(
             state.upend.clone(),
             state.store.clone(),
             state.job_container.clone(),
@@ -1221,7 +1229,7 @@ mod tests {
     use std::fs::File;
 
     use super::*;
-
+    use anyhow::Result;
     use tempfile::TempDir;
     use upend_base::hash::UpMultihash;
@@ -1415,6 +1423,7 @@
                 secret: "secret".to_string(),
             },
             job_container,
+            extractor_manager: Arc::new(ExtractorManager::new()),
             preview_store: Arc::new(PreviewStore::new("", store)),
             public: Arc::new(Mutex::new(true)),
         }
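The `routes.rs` changes are mechanical by comparison: handlers reach the shared manager through the new `State` field, the test `State` gains a matching `extractor_manager`, and the underscore-prefixed shadow clones (`_address`, `_store`, ...) give way to a scoped block, so the clones destined for the background closure never leak into the rest of the handler. The scoping pattern in isolation (hypothetical names, not the handler's real ones):

```rust
use std::sync::Arc;
use std::thread;

fn main() {
    let store = Arc::new(String::from("vault store"));

    // Before: `let _store = store.clone();` sat in the enclosing scope
    // until the end of the function. After: a block keeps the clone
    // local, and the move closure consumes it.
    let handle = {
        let store = store.clone();
        thread::spawn(move || format!("background job using {store}"))
    };

    // The original binding is untouched and still usable here.
    assert_eq!(handle.join().unwrap(), "background job using vault store");
    assert_eq!(store.as_str(), "vault store");
}
```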