use crate::common::build; use crate::common::REQWEST_CLIENT; use crate::config::UpEndConfig; use crate::extractors; use crate::previews::PreviewStore; use crate::util::exec::block_background; use actix_files::NamedFile; use actix_multipart::Multipart; use actix_web::error::{ ErrorBadRequest, ErrorInternalServerError, ErrorNotFound, ErrorUnauthorized, }; use actix_web::http::header::ContentDisposition; use actix_web::{ delete, error, get, head, post, put, routes, web, Either, Error, HttpResponse, ResponseError, }; use actix_web::{http, Responder}; use actix_web::{ http::header::{CacheControl, CacheDirective, DispositionType}, HttpRequest, }; use anyhow::Result; use futures::channel::oneshot; use futures_util::{StreamExt, TryStreamExt}; use serde::{Deserialize, Serialize}; use serde_json::json; use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::io::Write; use std::sync::{Arc, Mutex}; use std::time::{SystemTime, UNIX_EPOCH}; use tempfile::NamedTempFile; use tracing::{debug, info, trace}; use upend_base::addressing::AddressComponents; use upend_base::addressing::{Address, Addressable}; use upend_base::constants::{ATTR_ADDED, ATTR_LABEL}; use upend_base::entry::{Entry, EntryValue, InvariantEntry}; use upend_base::hash::{b58_decode, b58_encode, sha256hash}; use upend_base::lang::Query; use upend_db::hierarchies::{list_roots, resolve_path, UHierPath}; use upend_db::jobs; use upend_db::stores::UpdateOptions; use upend_db::stores::{Blob, UpStore}; use upend_db::BlobMode; use upend_db::OperationContext; use upend_db::UpEndDatabase; use upend_db::VaultOptions; use url::Url; #[cfg(feature = "desktop")] use is_executable::IsExecutable; use upend_base::error::UpEndError; #[derive(Clone)] pub struct State { pub upend: Arc, pub store: Arc>, pub config: UpEndConfig, pub job_container: jobs::JobContainer, pub preview_store: Option>, pub preview_thread_pool: Option>, pub public: Arc>, } #[derive(Debug, Serialize, Deserialize)] struct JwtClaims { user: String, exp: usize, } #[derive(Deserialize)] pub struct UserPayload { username: String, password: String, } #[derive(Deserialize)] pub struct LoginQueryParams { via: Option, } #[post("/api/auth/login")] pub async fn login( state: web::Data, payload: web::Json, query: web::Query, ) -> Result { let conn = state.upend.connection().map_err(ErrorInternalServerError)?; match conn.authenticate_user(&payload.username, &payload.password) { Ok(()) => { let token = create_token(&payload.username, &state.config.secret)?; match query.via.as_deref() { Some("cookie") => Ok(HttpResponse::NoContent() .append_header((http::header::SET_COOKIE, format!("key={}; Path=/", token))) .finish()), _ => Ok(HttpResponse::Ok().json(json!({ "key": token }))), } } Err(_) => Err(ErrorUnauthorized("Invalid credentials.")), } } #[post("/api/auth/logout")] pub async fn logout() -> Result { Ok(HttpResponse::NoContent() .append_header((http::header::SET_COOKIE, "key=; Path=/; Max-Age=0")) .finish()) } #[post("/api/auth/register")] pub async fn register( req: HttpRequest, state: web::Data, payload: web::Json, ) -> Result { check_auth(&req, &state)?; let conn = state.upend.connection().map_err(ErrorInternalServerError)?; match conn.set_user(&payload.username, &payload.password) { Ok(_) => { *state.public.lock().unwrap() = false; let token = create_token(&payload.username, &state.config.secret)?; Ok(HttpResponse::Ok().json(json!({ "token": token }))) } Err(e) => Err(ErrorInternalServerError(e)), } } #[get("/api/auth/whoami")] pub async fn whoami(req: HttpRequest, state: web::Data) -> Result { let user = check_auth(&req, &state)?; Ok(HttpResponse::Ok().json(json!({ "user": user }))) } fn check_auth(req: &HttpRequest, state: &State) -> Result, actix_web::Error> { if *state.public.lock().unwrap() { return Ok(None); } let header_key = req.headers().get("Authorization").and_then(|value| { value.to_str().ok().and_then(|value| { if value.starts_with("Bearer ") { Some(value.trim_start_matches("Bearer ").to_string()) } else { None } }) }); let cookie_key = req.cookies().ok().and_then(|cookies| { cookies .iter() .find(|c| c.name() == "key") .map(|cookie| cookie.value().to_string()) }); let query_key = req.query_string().split('&').find_map(|pair| { let parts = pair.split('=').collect::>(); match parts[..] { ["auth_key", value] => Some(value.to_string()), _ => None, } }); let key = header_key.or(cookie_key).or(query_key); if let Some(key) = key { let token = jsonwebtoken::decode::( &key, &jsonwebtoken::DecodingKey::from_secret(state.config.secret.as_ref()), &jsonwebtoken::Validation::default(), ); match token { Ok(token) => Ok(Some(token.claims.user)), Err(err) => Err(ErrorUnauthorized(format!("Invalid token: {err:?}"))), } } else { Err(ErrorUnauthorized("Authorization required.")) } } fn create_token(username: &str, secret: &str) -> Result { let claims = JwtClaims { user: username.to_string(), exp: (SystemTime::now() .duration_since(UNIX_EPOCH) .map_err(ErrorInternalServerError)? .as_secs() + 7 * 24 * 60 * 60) as usize, }; jsonwebtoken::encode( &jsonwebtoken::Header::default(), &claims, &jsonwebtoken::EncodingKey::from_secret(secret.as_ref()), ) .map_err(ErrorInternalServerError) } #[derive(Deserialize)] pub struct RawRequest { native: Option, inline: Option, } #[get("/api/raw/{hash}")] pub async fn get_raw( req: HttpRequest, state: web::Data, web::Query(query): web::Query, hash: web::Path, ) -> Result { check_auth(&req, &state)?; let address = Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?) .map_err(ErrorInternalServerError)?; if let Address::Hash(hash) = address { let hash = Arc::new(hash); let _hash = hash.clone(); let _store = state.store.clone(); let blobs = web::block(move || _store.retrieve(_hash.as_ref())) .await? .map_err(ErrorInternalServerError)?; if let Some(blob) = blobs.first() { let file_path = blob.get_file_path(); if query.native.is_none() { return Ok(Either::Left( NamedFile::open(file_path)? .set_content_disposition(ContentDisposition { disposition: if query.inline.is_some() { DispositionType::Inline } else { DispositionType::Attachment }, parameters: vec![], }) .customize() .insert_header(( http::header::CACHE_CONTROL, CacheControl(vec![ CacheDirective::MaxAge(2678400), CacheDirective::Extension("immutable".into(), None), ]), )), )); } else if state.config.desktop_enabled { #[cfg(feature = "desktop")] { info!("Opening {:?}...", file_path); let mut response = HttpResponse::NoContent(); let path = if !file_path.is_executable() || state.config.trust_executables { file_path } else { response .append_header(( http::header::WARNING, "199 - Opening parent directory due to file being executable.", )) .append_header(( http::header::ACCESS_CONTROL_EXPOSE_HEADERS, http::header::WARNING.to_string(), )); file_path.parent().ok_or_else(|| { ErrorInternalServerError("No parent to open as fallback.") })? }; opener::open(path).map_err(error::ErrorServiceUnavailable)?; return Ok(Either::Right(response.finish())); } #[cfg(not(feature = "desktop"))] unreachable!() } else { return Err(error::ErrorNotImplemented("Desktop features not enabled.")); } } let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _hash = hash.clone(); let entry = web::block(move || connection.retrieve_entry(_hash.as_ref())) .await? .map_err(ErrorInternalServerError)?; if let Some(entry) = entry { return Ok(Either::Right(HttpResponse::Ok().json(entry))); } Err(error::ErrorNotFound("NOT FOUND")) } else { Err(ErrorBadRequest( "Address does not refer to a rawable object.", )) } } #[head("/api/raw/{hash}")] pub async fn head_raw( req: HttpRequest, state: web::Data, hash: web::Path, ) -> Result { check_auth(&req, &state)?; let address = Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?) .map_err(ErrorInternalServerError)?; if let Address::Hash(hash) = address { let hash = Arc::new(hash); let _hash = hash.clone(); let _store = state.store.clone(); let blobs = web::block(move || _store.retrieve(_hash.as_ref())) .await? .map_err(ErrorInternalServerError)?; if let Some(blob) = blobs.first() { let file_path = blob.get_file_path(); let mut response = HttpResponse::NoContent(); if let Some(mime_type) = tree_magic_mini::from_filepath(file_path) { if let Ok(mime) = mime_type.parse::() { return Ok(response.content_type(mime).finish()); } } return Ok(response.into()); } Err(error::ErrorNotFound("NOT FOUND")) } else { Err(ErrorBadRequest( "Address does not refer to a rawable object.", )) } } #[get("/api/thumb/{hash}")] pub async fn get_thumbnail( req: HttpRequest, state: web::Data, hash: web::Path, web::Query(query): web::Query>, ) -> Result, Error> { check_auth(&req, &state)?; #[cfg(feature = "previews")] if let Some(preview_store) = &state.preview_store { let hash = hash.into_inner(); let address = Address::decode(&b58_decode(&hash).map_err(ErrorInternalServerError)?) .map_err(ErrorInternalServerError)?; if let Address::Hash(address_hash) = address { let preview_store = preview_store.clone(); let (tx, rx) = oneshot::channel(); let _job_container = state.job_container.clone(); state.preview_thread_pool.as_ref().unwrap().spawn(move || { let result = preview_store.get(address_hash, query, _job_container); tx.send(result).unwrap(); }); let preview_result = rx.await.unwrap().map_err(ErrorInternalServerError)?; if let Some(preview_path) = preview_result { let mut file = NamedFile::open(&preview_path)?.disable_content_disposition(); if let Some(mime_type) = tree_magic_mini::from_filepath(&preview_path) { if let Ok(mime) = mime_type.parse() { file = file.set_content_type(mime); } } return Ok(Either::Left(file)); } else { return Ok(Either::Right( HttpResponse::SeeOther() .append_header((http::header::LOCATION, format!("../../api/raw/{hash}"))) .finish(), )); } } else { return Err(ErrorBadRequest( "Address does not refer to a previewable object.", )); } } Err(error::ErrorNotImplemented("Previews not enabled.")) } #[post("/api/query")] pub async fn get_query( req: HttpRequest, state: web::Data, query: String, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let in_query: Query = query.parse().map_err(ErrorBadRequest)?; let entries = web::block(move || connection.query(in_query)) .await .map_err(ErrorInternalServerError)? .map_err(ErrorInternalServerError)?; let mut result: HashMap = HashMap::new(); for entry in entries { result.insert( b58_encode( entry .address() .map_err(ErrorInternalServerError)? .encode() .map_err(ErrorInternalServerError)?, ), entry, ); } Ok(HttpResponse::Ok().json(&result)) } trait EntriesAsHash { fn as_hash(&self) -> Result>; } impl EntriesAsHash for Vec { fn as_hash(&self) -> Result> { let mut result: HashMap = HashMap::new(); for entry in self { result.insert(b58_encode(entry.address()?.encode()?), entry); } Ok(result) } } #[get("/api/obj/{address_str}")] pub async fn get_object( req: HttpRequest, state: web::Data, address: web::Path
, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let address = address.into_inner(); let _address = address.clone(); let result: Vec = web::block(move || connection.retrieve_object(&_address)) .await? .map_err(ErrorInternalServerError)?; trace!("{:?}", result); Ok(HttpResponse::Ok().json(json!({ "entity": address.as_components(), "entries": result.as_hash().map_err(ErrorInternalServerError)? }))) } #[derive(Debug, Clone, Deserialize)] #[serde(untagged)] pub enum InAddress { Address(String), Components { t: String, c: Option }, } impl TryInto
for InAddress { type Error = anyhow::Error; fn try_into(self) -> Result { Ok(match self { InAddress::Address(address) => address.parse()?, InAddress::Components { t, c } => Address::from_components(AddressComponents { t, c })?, }) } } #[derive(Debug, Clone, Deserialize)] pub struct InEntry { pub entity: Option, pub attribute: String, pub value: EntryValue, } #[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, Deserialize)] #[serde(untagged, deny_unknown_fields)] pub enum PutInput { Entry(InEntry), EntryList(Vec), Address { entity: InAddress }, } #[derive(Deserialize)] pub struct UpdateQuery { provenance: Option, } #[put("/api/obj")] pub async fn put_object( req: HttpRequest, state: web::Data, payload: web::Json, web::Query(query): web::Query, ) -> Result { let user = check_auth(&req, &state)?; let (entry_address, entity_address) = { let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let in_entry = payload.into_inner(); debug!("PUTting {in_entry:?}"); let provenance = query.provenance.clone(); let _user = user.clone(); let process_inentry = move |in_entry: InEntry| -> Result { if let Some(entity) = in_entry.entity { Ok(Entry { entity: entity.try_into()?, attribute: in_entry.attribute.parse()?, value: in_entry.value, provenance: (match &provenance { Some(s) => format!("API {}", s), None => "API".to_string(), }) .trim() .to_string(), timestamp: chrono::Utc::now().naive_utc(), user: _user.clone(), }) } else { Ok(Entry::try_from(&InvariantEntry { attribute: in_entry.attribute.parse()?, value: in_entry.value, })?) } }; match in_entry { PutInput::Entry(in_entry) => { let entry = process_inentry(in_entry).map_err(ErrorBadRequest)?; web::block::<_, Result<_>>(move || { Ok(( Some(connection.insert_entry(entry.clone())?), Some(entry.entity), )) }) .await? .map_err(ErrorInternalServerError)? } PutInput::EntryList(entries) => { web::block(move || { connection.transaction::<_, anyhow::Error, _>(|| { for entry in entries { connection.insert_entry(process_inentry(entry)?)?; } Ok(()) }) }) .await? .map_err(ErrorBadRequest)?; (None, None) } PutInput::Address { entity: in_address } => { let address: Address = in_address.try_into().map_err(ErrorBadRequest)?; let _address = address.clone(); let _job_container = state.job_container.clone(); let _store = state.store.clone(); let _user = user.clone(); block_background::<_, _, anyhow::Error>(move || { let entry_count = extractors::extract( &_address, &connection, _store, _job_container, OperationContext { user: _user, provenance: "API".to_string(), }, ); debug!("Added {entry_count} extracted entries for {_address:?}"); Ok(()) }); let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _user = user.clone(); web::block(move || { connection.transaction::<_, anyhow::Error, _>(|| { if connection.retrieve_object(&address)?.is_empty() { connection.insert_entry(Entry { entity: address.clone(), attribute: ATTR_ADDED.parse().unwrap(), value: EntryValue::Number( SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() as f64, ), provenance: (match &query.provenance { Some(s) => format!("API {}", s), None => "API".to_string(), }) .trim() .to_string(), user: _user, timestamp: chrono::Utc::now().naive_utc(), })?; } Ok((None, Some(address))) }) }) .await? .map_err(ErrorInternalServerError)? } } }; Ok(HttpResponse::Ok().json([entry_address, entity_address])) } #[put("/api/blob")] pub async fn put_blob( req: HttpRequest, state: web::Data, mut payload: Multipart, ) -> Result { let user = check_auth(&req, &state)?; if let Some(mut field) = payload.try_next().await? { let mut file = NamedTempFile::new()?; let (filename, fallback_label) = if field.name() == "@url" { let mut url_buffer = String::new(); let mut stream = field.into_stream(); while let Some(chunk_result) = stream.next().await { let chunk = chunk_result?; let chunk_str = String::from_utf8_lossy(&chunk); url_buffer.push_str(&chunk_str); } let url = Url::parse(&url_buffer).map_err(ErrorBadRequest)?; let _url = url.clone(); let (bytes, filename) = web::block(move || fetch_external(_url)).await??; file.write_all(&bytes)?; ( filename.or_else(|| url.path_segments().and_then(|s| s.last().map(String::from))), url.to_string(), ) } else { while let Some(chunk) = field.try_next().await? { file = web::block(move || file.write_all(&chunk).map(|_| file)).await??; } ( field.content_disposition().get_filename().map(String::from), String::from(field.name()), ) }; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _store = state.store.clone(); let _filename = filename.clone(); let _user = user.clone(); let hash = web::block(move || { let options = connection.get_vault_options()?; _store .store( &connection, Blob::from_filepath(file.path()), _filename, options.blob_mode, OperationContext { user: _user, provenance: "API".to_string(), }, ) .map_err(anyhow::Error::from) }) .await? .map_err(ErrorInternalServerError)?; let address = Address::Hash(hash); let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _address = address.clone(); let _filename = filename.clone(); let _ = web::block(move || { upend_insert_val!( &connection, _address, ATTR_LABEL, _filename.unwrap_or(fallback_label) ) }) .await; let _address = address.clone(); let _job_container = state.job_container.clone(); let _store = state.store.clone(); let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _user = user.clone(); block_background::<_, _, anyhow::Error>(move || { let entry_count = extractors::extract( &_address, &connection, _store, _job_container, OperationContext { user: _user, provenance: "API".to_string(), }, ); debug!("Added {entry_count} extracted entries for {_address:?}"); Ok(()) }); Ok(HttpResponse::Ok().json(address)) } else { Err(ErrorBadRequest("Multipart contains no fields.")) } } #[put("/api/obj/{address}/{attribute}")] pub async fn put_object_attribute( req: HttpRequest, state: web::Data, path: web::Path<(Address, String)>, value: web::Json, web::Query(query): web::Query, ) -> Result { let user = check_auth(&req, &state)?; let (address, attribute) = path.into_inner(); let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let new_address = web::block(move || { connection.transaction::<_, anyhow::Error, _>(|| { let existing_attr_entries = connection.query(format!(r#"(matches @{address} "{attribute}" ?)"#).parse()?)?; for eae in existing_attr_entries { let _ = connection.remove_object(eae.address()?)?; } let new_attr_entry = Entry { entity: address, attribute: attribute.parse()?, value: value.into_inner(), provenance: (match &query.provenance { Some(s) => format!("API {}", s), None => "API".to_string(), }) .trim() .to_string(), user: user.clone(), timestamp: chrono::Utc::now().naive_utc(), }; connection.insert_entry(new_attr_entry) }) }) .await? .map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().json(new_address)) } #[delete("/api/obj/{address_str}")] pub async fn delete_object( req: HttpRequest, state: web::Data, address: web::Path
, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let _ = web::block(move || connection.remove_object(address.into_inner())) .await .map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().finish()) } // #[post("api/obj/{address_str}")] // pub async fn update_attribute( // state: web::Data, // address_str: web::Path, // mut payload: web::Payload, // ) -> Result { // let body = load_body(&mut payload) // .await // .map_err(error::ErrorBadRequest)?; // let entry_value = serde_json::from_slice::(&body).map_err(ErrorBadRequest)?; // let connection = state.upend.connection().map_err(ErrorInternalServerError)?; // connection // .transaction::<_, anyhow::Error, _>(|| { // let address = Address::decode(&decode(address_str.into_inner())?)?; // let _ = connection.remove_object(address)?; // Ok(()) // }) // .map_err(ErrorInternalServerError)?; // Ok(HttpResponse::Ok().finish()) // } #[get("/api/address")] pub async fn get_address( web::Query(query): web::Query>, ) -> Result { let (address, immutable) = if let Some(attribute) = query.get("attribute") { ( Address::Attribute( attribute .parse() .map_err(|e: UpEndError| ErrorBadRequest(e.to_string()))?, ), true, ) } else if let Some(url) = query.get("url") { ( Address::Url(Url::parse(url).map_err(ErrorBadRequest)?), true, ) } else if let Some(url) = query.get("url_content") { let url = Url::parse(url).map_err(ErrorBadRequest)?; let (bytes, _) = web::block(|| fetch_external(url)).await??; let hash_result = sha256hash(&bytes).map_err(ErrorInternalServerError)?; (Address::Hash(hash_result), false) } else if let Some(type_str) = query.get("type") { match type_str.as_str() { "Hash" => (upend_base::constants::TYPE_HASH_ADDRESS.clone(), true), "Uuid" => (upend_base::constants::TYPE_UUID_ADDRESS.clone(), true), "Attribute" => (upend_base::constants::TYPE_ATTRIBUTE_ADDRESS.clone(), true), "Url" => (upend_base::constants::TYPE_URL_ADDRESS.clone(), true), _ => return Err(ErrorBadRequest(format!("Unknown type: {type_str}"))), } } else { return Err(ErrorBadRequest( "Specify one of: `attribute`, `url`, `url_content`, `type`.", )); }; let mut response = HttpResponse::Ok(); if immutable { response.append_header(( http::header::CACHE_CONTROL, CacheControl(vec![ CacheDirective::MaxAge(2678400), CacheDirective::Extension("immutable".into(), None), ]), )); } Ok(response.json(format!("{}", address))) } #[get("/api/all/attributes")] pub async fn get_all_attributes( req: HttpRequest, state: web::Data, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let attributes = web::block(move || connection.get_all_attributes()) .await? .map_err(ErrorInternalServerError)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let result: serde_json::Value = attributes .into_iter() .map(|attribute| { json!({ "name": attribute, "labels": connection .retrieve_object(&Address::Attribute(attribute)) .unwrap_or_else(|_| vec![]) .into_iter() .filter_map(|e| { if e.attribute == ATTR_LABEL { if let EntryValue::String(label) = e.value { Some(label) } else { None } } else { None } }) .collect::>(), }) }) .collect(); Ok(HttpResponse::Ok().json(result)) } #[routes] #[get("/api/hier/{path:.*}")] #[put("/api/hier/{path:.*}")] pub async fn list_hier( state: web::Data, path: web::Path, req: HttpRequest, ) -> Result { let user = check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; if path.is_empty() { Ok(HttpResponse::MovedPermanently() .append_header((http::header::LOCATION, "../../api/hier_roots")) .finish()) } else { let upath: UHierPath = path.into_inner().parse().map_err(ErrorBadRequest)?; trace!(r#"Listing path "{}""#, upath); let create = !req.method().is_safe(); let path = web::block(move || { resolve_path( &connection, &upath, create, OperationContext { user, provenance: "API".to_string(), }, ) }) .await? .map_err(ErrorNotFound)?; match path.last() { Some(addr) => Ok(HttpResponse::Found() .append_header((http::header::LOCATION, format!("../../api/obj/{}", addr))) .finish()), None => Ok(HttpResponse::NotFound().finish()), } } } #[get("/api/hier_roots")] pub async fn list_hier_roots( req: HttpRequest, state: web::Data, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let result = web::block(move || { list_roots(&connection)? .into_iter() .map(|root| connection.retrieve_object(&root)) .collect::>>>() }) .await? .map_err(ErrorInternalServerError)? .concat(); Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?)) } #[derive(Deserialize)] pub struct RescanRequest { initial: Option, tree_mode: Option, } #[post("/api/refresh")] pub async fn api_refresh( req: HttpRequest, state: web::Data, web::Query(query): web::Query, ) -> Result { let user = check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; block_background::<_, _, anyhow::Error>(move || { let _ = state.store.update( &state.upend, state.job_container.clone(), UpdateOptions { initial: query.initial.unwrap_or(false), tree_mode: query.tree_mode.unwrap_or( connection .get_vault_options()? .blob_mode .unwrap_or_default(), ), }, OperationContext { user: user.clone(), provenance: "API".to_string(), }, ); let _ = crate::extractors::extract_all( state.upend.clone(), state.store.clone(), state.job_container.clone(), OperationContext { user: user.clone(), provenance: "API".to_string(), }, ); Ok(()) }); Ok(HttpResponse::Ok().finish()) } #[get("/api/stats/vault")] pub async fn vault_stats(req: HttpRequest, state: web::Data) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().json(connection.get_stats().map_err(ErrorInternalServerError)?)) } #[get("/api/stats/store")] pub async fn store_stats(req: HttpRequest, state: web::Data) -> Result { check_auth(&req, &state)?; Ok(HttpResponse::Ok().json(json!({ "main": state.store.stats().map_err(ErrorInternalServerError)? }))) } #[derive(Deserialize)] pub struct JobsRequest { full: Option, } #[get("/api/jobs")] pub async fn get_jobs( req: HttpRequest, state: web::Data, web::Query(query): web::Query, ) -> Result { check_auth(&req, &state)?; let jobs = state .job_container .get_jobs() .map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().json(if query.full.is_some() { jobs } else { jobs.into_iter() .filter(|(_, j)| matches!(j.state, jobs::JobState::InProgress)) .collect() })) } #[get("/api/info")] pub async fn get_info(state: web::Data) -> Result { Ok(HttpResponse::Ok().json(json!({ "name": state.config.vault_name, // "location": &*state.store.path, "version": format!( "{} (base: {}, db: {}, cli: {})", crate::common::get_version(), upend_base::common::build::PKG_VERSION, upend_db::common::build::PKG_VERSION, build::PKG_VERSION ), "desktop": state.config.desktop_enabled, "public": *state.public.lock().unwrap(), }))) } #[get("/api/options")] pub async fn get_options(req: HttpRequest, state: web::Data) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().json( connection .get_vault_options() .map_err(ErrorInternalServerError)?, )) } #[put("/api/options")] pub async fn put_options( req: HttpRequest, state: web::Data, payload: web::Json, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let options = payload.into_inner(); web::block(move || connection.set_vault_options(options)) .await .map_err(ErrorInternalServerError)? .map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().finish()) } #[get("/api/migration/user-entries")] pub async fn get_user_entries( req: HttpRequest, state: web::Data, ) -> Result { check_auth(&req, &state)?; let connection = state.upend.connection().map_err(ErrorInternalServerError)?; let result = web::block(move || connection.get_explicit_entries()) .await? .map_err(ErrorInternalServerError)?; Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?)) } #[derive(Debug)] enum ExternalFetchError { Status(anyhow::Error), TooLarge((usize, usize)), UnknownSize, Other(anyhow::Error), } impl std::fmt::Display for ExternalFetchError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, "{}", match self { ExternalFetchError::Status(err) => format!("Received non-OK (2XX) status code: {}", err), ExternalFetchError::TooLarge((size, max_size)) => format!("The response is too large ({size} > {max_size})."), ExternalFetchError::UnknownSize => "Could not ascertain response size.".into(), ExternalFetchError::Other(err) => format!("Unknown error: {}", err), } ) } } impl ResponseError for ExternalFetchError { fn status_code(&self) -> http::StatusCode { match self { ExternalFetchError::Status(_) => http::StatusCode::UNPROCESSABLE_ENTITY, ExternalFetchError::TooLarge(_) => http::StatusCode::PAYLOAD_TOO_LARGE, ExternalFetchError::UnknownSize => http::StatusCode::UNPROCESSABLE_ENTITY, ExternalFetchError::Other(_) => http::StatusCode::INTERNAL_SERVER_ERROR, } } } const MAX_EXTERNAL_SIZE: usize = 128_000_000; #[tracing::instrument(skip(url), fields(url = % url))] fn fetch_external(url: Url) -> Result<(bytes::Bytes, Option), ExternalFetchError> { debug!("Fetching..."); let response = REQWEST_CLIENT .get(url) .send() .map_err(|err| ExternalFetchError::Other(err.into()))? .error_for_status() .map_err(|err| ExternalFetchError::Status(err.into()))?; if let Some(content_length) = response.headers().get(reqwest::header::CONTENT_LENGTH) { if let Some(content_length) = content_length .to_str() .ok() .and_then(|cl| cl.parse::().ok()) { if content_length > MAX_EXTERNAL_SIZE { return Err(ExternalFetchError::TooLarge(( content_length, MAX_EXTERNAL_SIZE, ))); } } } else { return Err(ExternalFetchError::UnknownSize); } let filename: Option = response .headers() .get(http::header::CONTENT_DISPOSITION) .and_then(|hv| http::header::ContentDisposition::from_raw(hv).ok()) .and_then(|cd| cd.get_filename().map(String::from)); debug!("Got filename: {filename:?}"); let bytes = response .bytes() .map_err(|err| ExternalFetchError::Other(err.into()))?; debug!("Got {} bytes.", bytes.len()); Ok((bytes, filename)) } #[cfg(test)] mod tests { use std::fs::File; use super::*; use anyhow::Result; use tempfile::TempDir; use upend_base::hash::UpMultihash; #[test] fn test_in_address() -> Result<()> { let address = Address::Url(Url::parse("https://upend.dev").unwrap()); let in_address = InAddress::Address(address.to_string()); assert_eq!(address, in_address.try_into()?); let in_address = InAddress::Components { t: "Url".into(), c: Some("https://upend.dev".into()), }; assert_eq!(address, in_address.try_into()?); Ok(()) } #[actix_web::test] async fn test_get_info() { let app = actix_web::test::init_service(crate::serve::get_app::>( false, vec![], get_state(), )) .await; let req = actix_web::test::TestRequest::get() .uri("/api/info") .to_request(); #[derive(Deserialize)] struct VaultInfo { name: Option, desktop: bool, } let info: VaultInfo = actix_web::test::call_and_read_body_json(&app, req).await; assert_eq!(info.name, Some("TEST VAULT".to_string())); assert!(!info.desktop); } #[actix_web::test] async fn test_get_hier() { let app = actix_web::test::init_service(crate::serve::get_app::>( false, vec![], get_state(), )) .await; let req = actix_web::test::TestRequest::get() .uri("/api/hier/") .to_request(); let result = actix_web::test::call_service(&app, req).await; assert_eq!(result.status(), http::StatusCode::MOVED_PERMANENTLY); assert_eq!( result .headers() .get(http::header::LOCATION) .unwrap() .to_str() .unwrap(), "../../api/hier_roots" ); let req = actix_web::test::TestRequest::get() .uri("/api/hier_roots") .to_request(); let roots: HashMap = actix_web::test::call_and_read_body_json(&app, req).await; let mut labels = roots.values().filter(|v| v.attribute == ATTR_LABEL); assert_eq!(labels.next().unwrap().value, "NATIVE".into()); assert!(labels.next().is_none()); let req = actix_web::test::TestRequest::get() .uri("/api/hier/NATIVE/hello-world.txt") .to_request(); let result = actix_web::test::call_service(&app, req).await; assert_eq!( result.status(), http::StatusCode::FOUND, "expected redirect, got {:}", result.status() ); assert_eq!( result .headers() .get(http::header::LOCATION) .unwrap() .to_str() .unwrap(), "../../api/obj/zb2rhkD35pyMqGBbkb9B2o1CuTXsqWxxtTfm87etbv4K8rGBz" ); } #[actix_web::test] async fn test_obj_entity_info() { let app = actix_web::test::init_service(crate::serve::get_app::>( false, vec![], get_state(), )) .await; let digest = UpMultihash::from_sha256([1, 2, 3, 4, 5]).unwrap(); let digest_str = b58_encode(digest.to_bytes()); let address = Address::Hash(digest); let req = actix_web::test::TestRequest::get() .uri(&format!("/api/obj/{}", address)) .to_request(); let result: serde_json::Value = actix_web::test::call_and_read_body_json(&app, req).await; assert_eq!(result["entity"]["t"], "Hash"); assert_eq!(result["entity"]["c"], digest_str); let address = Address::Attribute("TEST".parse().unwrap()); let req = actix_web::test::TestRequest::get() .uri(&format!("/api/obj/{}", address)) .to_request(); let result: serde_json::Value = actix_web::test::call_and_read_body_json(&app, req).await; assert_eq!(result["entity"]["t"], "Attribute"); assert_eq!(result["entity"]["c"], "TEST"); let address = Address::Url("https://upend.dev/".parse().unwrap()); let req = actix_web::test::TestRequest::get() .uri(&format!("/api/obj/{}", address)) .to_request(); let result: serde_json::Value = actix_web::test::call_and_read_body_json(&app, req).await; assert_eq!(result["entity"]["t"], "Url"); assert_eq!(result["entity"]["c"], "https://upend.dev/"); let uuid = uuid::Uuid::new_v4(); let address = Address::Uuid(uuid); let req = actix_web::test::TestRequest::get() .uri(&format!("/api/obj/{}", address)) .to_request(); let result: serde_json::Value = actix_web::test::call_and_read_body_json(&app, req).await; assert_eq!(result["entity"]["t"], "Uuid"); assert_eq!(result["entity"]["c"], uuid.to_string()); } fn get_state() -> State { // Prepare temporary filesystem structure let temp_dir = TempDir::new().unwrap(); let file_path = temp_dir.path().join("my-temporary-note.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Brian was here. Briefly.").unwrap(); let file_path = temp_dir.path().join("hello-world.txt"); let mut tmp_file = File::create(file_path).unwrap(); writeln!(tmp_file, "Hello, World!").unwrap(); let file_path = temp_dir.path().join("empty"); File::create(file_path).unwrap(); // Initialize database let open_result = UpEndDatabase::open(&temp_dir, true).unwrap(); let upend = Arc::new(open_result.db); let store = Arc::new(Box::new( upend_db::stores::fs::FsStore::from_path(temp_dir.path()).unwrap(), ) as Box); let job_container = jobs::JobContainer::new(); store .update( &upend, job_container.clone(), UpdateOptions { initial: true, tree_mode: upend_db::BlobMode::default(), }, OperationContext::default(), ) .unwrap(); State { upend, store, config: UpEndConfig { vault_name: Some("TEST VAULT".to_string()), desktop_enabled: false, trust_executables: false, secret: "secret".to_string(), }, job_container, preview_store: None, preview_thread_pool: None, public: Arc::new(Mutex::new(true)), } } }