fix(backend): also extract blobs added as result of extracting (e.g. yt-dlp downloads)
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed

This commit is contained in:
Tomáš Mládek 2024-07-02 21:59:20 +02:00
parent e027e399e5
commit 127af810d2
8 changed files with 127 additions and 59 deletions

View file

@ -1,7 +1,7 @@
use std::io::Write;
use std::sync::Arc;
use super::Extractor;
use super::{Extractor, ExtractorGetResult};
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use upend_base::{
@ -40,7 +40,7 @@ impl Extractor for ID3Extractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>> {
) -> Result<ExtractorGetResult> {
if let Address::Hash(hash) = address {
let files = store.retrieve(hash)?;
@ -154,12 +154,12 @@ impl Extractor for ID3Extractor {
let _ = job_handle.update_state(JobState::Done);
Ok(result)
Ok(result.into())
} else {
Err(anyhow!("Couldn't find file for {hash:?}!"))
}
} else {
Ok(vec![])
Ok(Default::default())
}
}

View file

@ -1,6 +1,6 @@
use std::sync::Arc;
use super::Extractor;
use super::{Extractor, ExtractorGetResult};
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use upend_base::entry::Attribute;
@ -43,7 +43,7 @@ impl Extractor for ExifExtractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>> {
) -> Result<ExtractorGetResult> {
if let Address::Hash(hash) = address {
let files = store.retrieve(hash)?;
@ -137,12 +137,12 @@ impl Extractor for ExifExtractor {
let _ = job_handle.update_state(JobState::Done);
Ok(result)
Ok(result.into())
} else {
Err(anyhow!("Couldn't find file for {hash:?}!"))
}
} else {
Ok(vec![])
Ok(Default::default())
}
}

View file

@ -1,5 +1,5 @@
use crate::extractors::external::{process, ExternalCommand, ExternalCommandError};
use crate::extractors::Extractor;
use crate::extractors::{Extractor, ExtractorGetResult};
use anyhow::Result;
use regex::Regex;
use std::process::Command;
@ -22,7 +22,7 @@ impl Extractor for MonolithExtractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>> {
) -> Result<ExtractorGetResult> {
if let Address::Url(url) = address {
debug!("Archiving {} with `monolith`", url.as_str());
let mut job_handle =
@ -47,26 +47,29 @@ impl Extractor for MonolithExtractor {
)?;
debug!("Stored {} as {:?}", url.as_str(), stored);
job_handle.update_progress(100.0)?;
Ok(vec![
Entry {
entity: address.clone(),
attribute: "WM_ARCHIVED".parse().unwrap(),
value: Address::Hash(stored).into(),
provenance: context.provenance.clone() + "EXTRACTOR monolith",
user: context.user.clone(),
timestamp: chrono::Utc::now().naive_utc(),
},
Entry {
entity: Address::Attribute("WM_ARCHIVED".parse().unwrap()),
attribute: ATTR_LABEL.parse().unwrap(),
value: "Webpage Archived (monolith)".into(),
provenance: context.provenance.clone() + "EXTRACTOR monolith",
user: context.user.clone(),
timestamp: chrono::Utc::now().naive_utc(),
},
])
Ok(ExtractorGetResult {
entries: vec![
Entry {
entity: address.clone(),
attribute: "WM_ARCHIVED".parse().unwrap(),
value: Address::Hash(stored.clone()).into(),
provenance: context.provenance.clone() + "EXTRACTOR monolith",
user: context.user.clone(),
timestamp: chrono::Utc::now().naive_utc(),
},
Entry {
entity: Address::Attribute("WM_ARCHIVED".parse().unwrap()),
attribute: ATTR_LABEL.parse().unwrap(),
value: "Webpage Archived (monolith)".into(),
provenance: context.provenance.clone() + "EXTRACTOR monolith",
user: context.user.clone(),
timestamp: chrono::Utc::now().naive_utc(),
},
],
stored: vec![Address::Hash(stored)],
})
} else {
Ok(vec![])
Ok(Default::default())
}
}

View file

@ -1,5 +1,5 @@
use crate::extractors::external::{process, ExternalCommand, ExternalCommandError};
use crate::extractors::Extractor;
use crate::extractors::{Extractor, ExtractorGetResult};
use anyhow::Result;
use regex::Regex;
use std::io::{BufReader, Read};
@ -23,7 +23,7 @@ impl Extractor for YtDlpExtractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>> {
) -> Result<ExtractorGetResult> {
if let Address::Url(url) = address {
debug!("Getting {} with `yt-dlp`", url.as_str());
let mut job_handle =
@ -106,7 +106,7 @@ impl Extractor for YtDlpExtractor {
));
}
let mut result = vec![];
let mut entries = vec![];
let files = std::fs::read_dir(temp_dir.path())?.collect::<Result<Vec<_>, _>>()?;
@ -133,7 +133,7 @@ impl Extractor for YtDlpExtractor {
for (key, label) in KNOWN_METADATA {
if let Some(value) = json_data.get(key) {
let attribute: Attribute = format!("YTDL_META_{}", key).parse().unwrap();
result.extend([
entries.extend([
Entry {
entity: address.clone(),
attribute: attribute.clone(),
@ -178,11 +178,11 @@ impl Extractor for YtDlpExtractor {
context.clone(),
)?;
result.extend([
entries.extend([
Entry {
entity: address.clone(),
attribute: "YTDLD".parse().unwrap(),
value: Address::Hash(stored).into(),
value: Address::Hash(stored.clone()).into(),
provenance: context.provenance.clone() + "EXTRACTOR yt-dlp",
user: context.user.clone(),
timestamp: chrono::Utc::now().naive_utc(),
@ -199,9 +199,12 @@ impl Extractor for YtDlpExtractor {
job_handle.update_progress(100.0)?;
Ok(result)
Ok(ExtractorGetResult {
entries,
stored: vec![Address::Hash(stored)],
})
} else {
Ok(vec![])
Ok(Default::default())
}
}

View file

@ -1,6 +1,6 @@
use std::{process::Command, sync::Arc};
use super::Extractor;
use super::{Extractor, ExtractorGetResult};
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use tracing::{debug, trace};
@ -50,7 +50,7 @@ impl Extractor for MediaExtractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>> {
) -> Result<ExtractorGetResult> {
if let Address::Hash(hash) = address {
let files = store.retrieve(hash)?;
@ -118,12 +118,12 @@ impl Extractor for MediaExtractor {
];
let _ = job_handle.update_state(JobState::Done);
Ok(result)
Ok(result.into())
} else {
Err(anyhow!("Couldn't find file for {hash:?}!"))
}
} else {
Ok(vec![])
Ok(Default::default())
}
}

View file

@ -33,7 +33,7 @@ pub trait Extractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>>;
) -> Result<ExtractorGetResult>;
fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
Ok(true)
@ -46,9 +46,12 @@ pub trait Extractor {
store: Arc<Box<dyn UpStore + Send + Sync>>,
job_container: JobContainer,
context: OperationContext,
) -> Result<usize> {
) -> Result<ExtractorResult> {
if self.is_needed(address, connection)? {
let entries = self.get(address, connection, store, job_container, context)?;
let ExtractorGetResult {
entries,
stored: inserted,
} = self.get(address, connection, store, job_container, context)?;
trace!("For \"{address}\", got: {entries:?}");
connection.transaction(|| {
@ -56,10 +59,36 @@ pub trait Extractor {
for entry in entries {
connection.insert_entry(entry)?;
}
Ok(len)
Ok(ExtractorResult {
count: len,
inserted,
})
})
} else {
Ok(0)
Ok(ExtractorResult {
count: 0,
inserted: vec![],
})
}
}
}
/// Result of a single extractor `get` call: the metadata entries it
/// produced, plus the addresses of any blobs it newly stored.
#[derive(Default)]
pub struct ExtractorGetResult {
    // Metadata entries produced for the extracted address.
    pub entries: Vec<Entry>,
    // Addresses of blobs stored as a side effect of extraction (e.g.
    // yt-dlp downloads, monolith archives); the `extract` driver
    // recursively runs extraction on each of these afterwards.
    pub stored: Vec<Address>,
}
/// Result of the insert step in the trait's default method: how many
/// entries were written, and which newly stored blob addresses were
/// carried through (surfaced to `extract` as `inserted`).
pub struct ExtractorResult {
    // Number of entries inserted into the database in the transaction.
    pub count: usize,
    // Blob addresses forwarded from `ExtractorGetResult::stored`.
    pub inserted: Vec<Address>,
}
/// Backward-compatible conversion for extractors that only yield
/// entries and store no new blobs (`stored` stays empty), so existing
/// `Ok(result.into())` / `Vec<Entry>` call sites keep working.
impl From<Vec<Entry>> for ExtractorGetResult {
    fn from(entries: Vec<Entry>) -> Self {
        ExtractorGetResult {
            entries,
            stored: vec![],
        }
    }
}
@ -124,6 +153,7 @@ pub fn extract(
context: OperationContext,
) -> usize {
let mut entry_count = 0;
let mut all_inserted = vec![];
trace!("Extracting metadata for {address:?}");
#[cfg(feature = "extractors-web")]
@ -137,7 +167,10 @@ pub fn extract(
);
match extract_result {
Ok(count) => entry_count += count,
Ok(ExtractorResult { count, inserted }) => {
entry_count += count;
all_inserted.extend(inserted);
}
Err(err) => debug!("web: {}", err),
}
}
@ -153,7 +186,10 @@ pub fn extract(
);
match extract_result {
Ok(count) => entry_count += count,
Ok(ExtractorResult { count, inserted }) => {
entry_count += count;
all_inserted.extend(inserted);
}
Err(err) => debug!("audio: {}", err),
}
}
@ -169,7 +205,10 @@ pub fn extract(
);
match extract_result {
Ok(count) => entry_count += count,
Ok(ExtractorResult { count, inserted }) => {
entry_count += count;
all_inserted.extend(inserted);
}
Err(err) => debug!("photo: {}", err),
}
}
@ -185,7 +224,10 @@ pub fn extract(
);
match extract_result {
Ok(count) => entry_count += count,
Ok(ExtractorResult { count, inserted }) => {
entry_count += count;
all_inserted.extend(inserted);
}
Err(err) => debug!("media: {}", err),
}
}
@ -201,7 +243,10 @@ pub fn extract(
);
match extract_result {
Ok(count) => entry_count += count,
Ok(ExtractorResult { count, inserted }) => {
entry_count += count;
all_inserted.extend(inserted);
}
Err(err) => debug!("external monolith: {}", err),
}
@ -209,17 +254,33 @@ pub fn extract(
address,
connection,
store.clone(),
job_container,
job_container.clone(),
context.clone(),
);
match extract_result {
Ok(count) => entry_count += count,
Ok(ExtractorResult { count, inserted }) => {
entry_count += count;
all_inserted.extend(inserted);
}
Err(err) => debug!("external yt-dlp: {}", err),
}
}
trace!("Extracting metadata for {address:?} - got {entry_count} entries.");
trace!(
"Extracting metadata for {address:?} - got {entry_count} entries, inserted {} new blobs.",
all_inserted.len()
);
for inserted in all_inserted {
extract(
&inserted,
connection,
store.clone(),
job_container.clone(),
context.clone(),
);
}
entry_count
}

View file

@ -1,6 +1,6 @@
use std::sync::Arc;
use super::Extractor;
use super::{Extractor, ExtractorGetResult};
use crate::common::REQWEST_CLIENT;
use anyhow::anyhow;
use anyhow::Result;
@ -27,7 +27,7 @@ impl Extractor for WebExtractor {
_store: Arc<Box<dyn UpStore + Send + Sync>>,
mut job_container: JobContainer,
context: OperationContext,
) -> Result<Vec<Entry>> {
) -> Result<ExtractorGetResult> {
if let Address::Url(url) = address {
let mut job_handle =
job_container.add_job(None, &format!("Getting info about {url:?}"))?;
@ -137,11 +137,12 @@ impl Extractor for WebExtractor {
}
vec![e]
})
.collect());
.collect::<Vec<Entry>>()
.into());
}
Err(anyhow!("Failed for unknown reason."))
} else {
Ok(vec![])
Ok(Default::default())
}
}

View file

@ -563,7 +563,7 @@ async fn main() -> Result<()> {
let upend = upend.clone();
let store = store.clone();
let job_container = job_container.clone();
let _ = block_background::<_, _, anyhow::Error>(move || {
block_background::<_, _, anyhow::Error>(move || {
info!("Running periodic vault update.");
let connection = upend.connection()?;
let tree_mode = connection.get_vault_options()?.blob_mode.unwrap_or_default();