feat: add external blobs via url at /api/blob
parent
f9cfca8fcf
commit
cdb0267ee5
|
@ -3248,6 +3248,7 @@ dependencies = [
|
|||
"actix-web",
|
||||
"actix_derive",
|
||||
"anyhow",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"clap",
|
||||
"diesel",
|
||||
|
|
|
@ -82,6 +82,8 @@ shadow-rs = "0.17"
|
|||
reqwest = { version = "0.11.16", features = ["blocking", "json"] }
|
||||
url = "2"
|
||||
|
||||
bytes = "1.4.0"
|
||||
|
||||
[build-dependencies]
|
||||
shadow-rs = "0.17"
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ use actix_web::{
|
|||
};
|
||||
use anyhow::{anyhow, Result};
|
||||
use futures::channel::oneshot;
|
||||
use futures_util::TryStreamExt;
|
||||
use futures_util::{StreamExt, TryStreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::collections::HashMap;
|
||||
|
@ -520,12 +520,36 @@ pub async fn put_blob(
|
|||
check_auth(&req, &state)?;
|
||||
|
||||
if let Some(mut field) = payload.try_next().await? {
|
||||
let filename = field.content_disposition().get_filename().map(String::from);
|
||||
|
||||
let mut file = NamedTempFile::new()?;
|
||||
while let Some(chunk) = field.try_next().await? {
|
||||
file = web::block(move || file.write_all(&chunk).map(|_| file)).await??;
|
||||
}
|
||||
|
||||
let (filename, fallback_label) = if field.name() == "@url" {
|
||||
let mut url_buffer = String::new();
|
||||
|
||||
let mut stream = field.into_stream();
|
||||
while let Some(chunk_result) = stream.next().await {
|
||||
let chunk = chunk_result?;
|
||||
let chunk_str = String::from_utf8_lossy(&chunk);
|
||||
url_buffer.push_str(&chunk_str);
|
||||
}
|
||||
|
||||
let url = Url::parse(&url_buffer).map_err(ErrorBadRequest)?;
|
||||
let (bytes, filename) = fetch_external(url.clone()).await?;
|
||||
file.write_all(&bytes)?;
|
||||
|
||||
(
|
||||
filename.or_else(|| url.path_segments().and_then(|s| s.last().map(String::from))),
|
||||
url.to_string(),
|
||||
)
|
||||
} else {
|
||||
while let Some(chunk) = field.try_next().await? {
|
||||
file = web::block(move || file.write_all(&chunk).map(|_| file)).await??;
|
||||
}
|
||||
|
||||
(
|
||||
field.content_disposition().get_filename().map(String::from),
|
||||
String::from(field.name()),
|
||||
)
|
||||
};
|
||||
|
||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||
let _store = state.store.clone();
|
||||
|
@ -538,14 +562,18 @@ pub async fn put_blob(
|
|||
|
||||
let address = Address::Hash(hash);
|
||||
|
||||
if let Some(ref filename) = filename {
|
||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||
let _address = address.clone();
|
||||
let _filename = filename.clone();
|
||||
let _ =
|
||||
web::block(move || upend_insert_val!(&connection, _address, LABEL_ATTR, _filename))
|
||||
.await;
|
||||
}
|
||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||
let _address = address.clone();
|
||||
let _filename = filename.clone();
|
||||
let _ = web::block(move || {
|
||||
upend_insert_val!(
|
||||
&connection,
|
||||
_address,
|
||||
LABEL_ATTR,
|
||||
_filename.unwrap_or(fallback_label)
|
||||
)
|
||||
})
|
||||
.await;
|
||||
|
||||
let _address = address.clone();
|
||||
let _job_container = state.job_container.clone();
|
||||
|
@ -657,40 +685,7 @@ pub async fn get_address(
|
|||
)
|
||||
} else if let Some(url) = query.get("url_content") {
|
||||
let url = Url::parse(url).map_err(ErrorBadRequest)?;
|
||||
|
||||
const MAX_SIZE: usize = 128_000_000;
|
||||
|
||||
let client = reqwest::blocking::Client::builder()
|
||||
.user_agent(APP_USER_AGENT.as_str())
|
||||
.build()
|
||||
.map_err(ErrorInternalServerError)?;
|
||||
|
||||
let response = client
|
||||
.get(url)
|
||||
.send()
|
||||
.map_err(ErrorInternalServerError)?
|
||||
.error_for_status()
|
||||
.map_err(ErrorInternalServerError)?;
|
||||
|
||||
if let Some(content_length) = response.headers().get(reqwest::header::CONTENT_LENGTH) {
|
||||
if let Some(content_length) = content_length
|
||||
.to_str()
|
||||
.ok()
|
||||
.and_then(|cl| cl.parse::<usize>().ok())
|
||||
{
|
||||
if content_length > MAX_SIZE {
|
||||
return Err(ErrorBadRequest(format!(
|
||||
"Error: The response is too large ({content_length} > {MAX_SIZE})."
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(ErrorUnprocessableEntity(
|
||||
"Error: Could not ascertain response size.",
|
||||
));
|
||||
}
|
||||
|
||||
let bytes = response.bytes().map_err(ErrorInternalServerError)?;
|
||||
let (bytes, _) = fetch_external(url).await?;
|
||||
let hash_result = hash(&bytes);
|
||||
(Address::Hash(hash_result), false)
|
||||
} else {
|
||||
|
@ -860,6 +855,56 @@ pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
|||
})))
|
||||
}
|
||||
|
||||
const MAX_EXTERNAL_SIZE: usize = 128_000_000;
|
||||
|
||||
#[tracing::instrument(skip(url), fields(url=%url))]
|
||||
async fn fetch_external(url: Url) -> Result<(bytes::Bytes, Option<String>), actix_web::Error> {
|
||||
let client = reqwest::Client::builder()
|
||||
.user_agent(APP_USER_AGENT.as_str())
|
||||
.build()
|
||||
.map_err(ErrorInternalServerError)?;
|
||||
|
||||
debug!("Fetching...");
|
||||
|
||||
let response = client
|
||||
.get(url)
|
||||
.send()
|
||||
.await
|
||||
.map_err(ErrorInternalServerError)?
|
||||
.error_for_status()
|
||||
.map_err(ErrorInternalServerError)?;
|
||||
|
||||
if let Some(content_length) = response.headers().get(reqwest::header::CONTENT_LENGTH) {
|
||||
if let Some(content_length) = content_length
|
||||
.to_str()
|
||||
.ok()
|
||||
.and_then(|cl| cl.parse::<usize>().ok())
|
||||
{
|
||||
if content_length > MAX_EXTERNAL_SIZE {
|
||||
return Err(ErrorBadRequest(format!(
|
||||
"Error: The response is too large ({content_length} > {MAX_EXTERNAL_SIZE})."
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return Err(ErrorUnprocessableEntity(
|
||||
"Error: Could not ascertain response size.",
|
||||
));
|
||||
}
|
||||
|
||||
let filename: Option<String> = response
|
||||
.headers()
|
||||
.get(http::header::CONTENT_DISPOSITION)
|
||||
.and_then(|hv| http::header::ContentDisposition::from_raw(hv).ok())
|
||||
.and_then(|cd| cd.get_filename().map(String::from));
|
||||
debug!("Got filename: {filename:?}");
|
||||
|
||||
let bytes = response.bytes().await.map_err(ErrorInternalServerError)?;
|
||||
debug!("Got {} bytes.", bytes.len());
|
||||
|
||||
Ok((bytes, filename))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
Loading…
Reference in New Issue