// upend/cli/src/routes.rs
//
// 1392 lines
// 46 KiB
// Rust

use crate::common::build;
use crate::common::REQWEST_CLIENT;
use crate::config::UpEndConfig;
use crate::extractors;
use crate::previews::PreviewStore;
use crate::util::exec::block_background;
use actix_files::NamedFile;
use actix_multipart::Multipart;
use actix_web::error::{
ErrorBadRequest, ErrorInternalServerError, ErrorNotFound, ErrorUnauthorized,
};
use actix_web::http::header::ContentDisposition;
use actix_web::{
delete, error, get, head, post, put, routes, web, Either, Error, HttpResponse, ResponseError,
};
use actix_web::{http, Responder};
use actix_web::{
http::header::{CacheControl, CacheDirective, DispositionType},
HttpRequest,
};
use anyhow::Result;
use futures::channel::oneshot;
use futures_util::{StreamExt, TryStreamExt};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::io::Write;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use tempfile::NamedTempFile;
use tracing::{debug, info, trace};
use upend_base::addressing::AddressComponents;
use upend_base::addressing::{Address, Addressable};
use upend_base::constants::{ATTR_ADDED, ATTR_LABEL};
use upend_base::entry::{Entry, EntryValue, InvariantEntry};
use upend_base::hash::{b58_decode, b58_encode, sha256hash};
use upend_base::lang::Query;
use upend_db::hierarchies::{list_roots, resolve_path, UHierPath};
use upend_db::jobs;
use upend_db::stores::UpdateOptions;
use upend_db::stores::{Blob, UpStore};
use upend_db::BlobMode;
use upend_db::OperationContext;
use upend_db::UpEndDatabase;
use upend_db::VaultOptions;
use url::Url;
#[cfg(feature = "desktop")]
use is_executable::IsExecutable;
use upend_base::error::UpEndError;
/// Shared application state made available to the request handlers in this
/// file through `web::Data<State>`.
#[derive(Clone)]
pub struct State {
/// Database handle; handlers obtain per-request connections via `connection()`.
pub upend: Arc<UpEndDatabase>,
/// Blob store used to retrieve, store and rescan file contents.
pub store: Arc<Box<dyn UpStore + Sync + Send>>,
/// Server configuration (vault name, JWT secret, desktop-related flags).
pub config: UpEndConfig,
/// Container tracking background jobs; exposed through `/api/jobs`.
pub job_container: jobs::JobContainer,
/// Preview store, if any; `get_thumbnail` answers "not implemented" without it.
pub preview_store: Option<Arc<PreviewStore>>,
/// Thread pool on which preview generation is spawned.
pub preview_thread_pool: Option<Arc<rayon::ThreadPool>>,
/// While `true`, `check_auth` lets every request through without a token.
/// Set to `false` once a user is registered via `/api/auth/register`.
pub public: Arc<Mutex<bool>>,
}
/// Claims carried inside the JWT issued by `create_token` and checked by `check_auth`.
#[derive(Debug, Serialize, Deserialize)]
struct JwtClaims {
/// Name of the authenticated user.
user: String,
/// Expiry time, in seconds since the Unix epoch.
exp: usize,
}
/// JSON body accepted by the login and register endpoints.
#[derive(Deserialize)]
pub struct UserPayload {
/// Name of the user to authenticate or create.
username: String,
/// Password of that user.
password: String,
}
/// Query-string parameters accepted by `/api/auth/login`.
#[derive(Deserialize)]
pub struct LoginQueryParams {
/// When set to `"cookie"`, the token is returned in a `Set-Cookie` header
/// instead of the JSON body.
via: Option<String>,
}
/// Authenticates a user and hands out a freshly signed token.
///
/// With `?via=cookie` the token is delivered as a `key` cookie on a
/// `204 No Content` response; otherwise it is returned as `{"key": ...}`.
/// Failed authentication yields `401 Unauthorized`.
#[post("/api/auth/login")]
pub async fn login(
    state: web::Data<State>,
    payload: web::Json<UserPayload>,
    query: web::Query<LoginQueryParams>,
) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;

    let authenticated = connection.authenticate_user(&payload.username, &payload.password);
    if authenticated.is_err() {
        return Err(ErrorUnauthorized("Invalid credentials."));
    }

    let token = create_token(&payload.username, &state.config.secret)?;
    let response = if query.via.as_deref() == Some("cookie") {
        HttpResponse::NoContent()
            .append_header((http::header::SET_COOKIE, format!("key={}; Path=/", token)))
            .finish()
    } else {
        HttpResponse::Ok().json(json!({ "key": token }))
    };
    Ok(response)
}
/// Logs the client out by expiring the `key` cookie.
#[post("/api/auth/logout")]
pub async fn logout() -> Result<HttpResponse, Error> {
    let mut builder = HttpResponse::NoContent();
    // An empty value with `Max-Age=0` makes the browser drop the cookie.
    builder.append_header((http::header::SET_COOKIE, "key=; Path=/; Max-Age=0"));
    Ok(builder.finish())
}
/// Creates or updates a user and returns a token for it as `{"token": ...}`.
///
/// The caller must pass `check_auth`. Once a user has been stored, the
/// instance is switched out of public mode.
#[post("/api/auth/register")]
pub async fn register(
    req: HttpRequest,
    state: web::Data<State>,
    payload: web::Json<UserPayload>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    connection
        .set_user(&payload.username, &payload.password)
        .map_err(ErrorInternalServerError)?;

    // From now on, requests have to be authenticated.
    *state.public.lock().unwrap() = false;

    let token = create_token(&payload.username, &state.config.secret)?;
    Ok(HttpResponse::Ok().json(json!({ "token": token })))
}
/// Reports the user the request is authenticated as (`null` in public mode).
#[get("/api/auth/whoami")]
pub async fn whoami(req: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, Error> {
    check_auth(&req, &state).map(|user| HttpResponse::Ok().json(json!({ "user": user })))
}
/// Verifies that a request is allowed to proceed and identifies its user.
///
/// Returns `Ok(None)` while the instance is public. Otherwise a JWT is read
/// from the `Authorization: Bearer <token>` header or, failing that, from the
/// `key` cookie, and `Ok(Some(user))` is returned once it validates.
///
/// # Errors
///
/// * `400 Bad Request` if the `Authorization` header is not valid text.
/// * `401 Unauthorized` if the header is not a bearer token, no token is
///   present at all, or the token fails validation.
fn check_auth(req: &HttpRequest, state: &State) -> Result<Option<String>, actix_web::Error> {
    if *state.public.lock().unwrap() {
        return Ok(None);
    }

    let key = if let Some(value) = req.headers().get("Authorization") {
        let value = value.to_str().map_err(|err| {
            ErrorBadRequest(format!("Invalid value in Authorization header: {err:?}"))
        })?;
        // `strip_prefix` removes the scheme exactly once; `trim_start_matches`
        // would also swallow a token that itself begins with "Bearer ".
        let token = value
            .strip_prefix("Bearer ")
            .ok_or_else(|| ErrorUnauthorized("Invalid token type."))?;
        Some(token.to_string())
    } else if let Ok(cookies) = req.cookies() {
        cookies
            .iter()
            .find(|c| c.name() == "key")
            .map(|cookie| cookie.value().to_string())
    } else {
        None
    };

    let key = key.ok_or_else(|| ErrorUnauthorized("Authorization required."))?;

    jsonwebtoken::decode::<JwtClaims>(
        &key,
        &jsonwebtoken::DecodingKey::from_secret(state.config.secret.as_ref()),
        &jsonwebtoken::Validation::default(),
    )
    .map(|token| Some(token.claims.user))
    .map_err(|err| ErrorUnauthorized(format!("Invalid token: {err:?}")))
}
/// Issues a signed JWT for `username` that expires seven days from now.
///
/// # Errors
///
/// Returns an internal server error if the system clock is before the Unix
/// epoch or if encoding the token fails.
fn create_token(username: &str, secret: &str) -> Result<String, Error> {
    const TOKEN_LIFETIME_SECS: u64 = 7 * 24 * 60 * 60;

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(ErrorInternalServerError)?;
    let expires_at = since_epoch.as_secs() + TOKEN_LIFETIME_SECS;

    let claims = JwtClaims {
        user: username.to_string(),
        exp: expires_at as usize,
    };

    let header = jsonwebtoken::Header::default();
    let signing_key = jsonwebtoken::EncodingKey::from_secret(secret.as_ref());
    jsonwebtoken::encode(&header, &claims, &signing_key).map_err(ErrorInternalServerError)
}
/// Query-string parameters accepted by `GET /api/raw/{hash}`.
/// Only the presence of each parameter matters; its value is ignored.
#[derive(Deserialize)]
pub struct RawRequest {
/// If present, open the file with the host's default application instead of serving it.
native: Option<String>,
/// If present, serve the file with an `inline` content disposition rather than `attachment`.
inline: Option<String>,
}
#[get("/api/raw/{hash}")]
pub async fn get_raw(
req: HttpRequest,
state: web::Data<State>,
web::Query(query): web::Query<RawRequest>,
hash: web::Path<String>,
) -> Result<impl Responder, Error> {
check_auth(&req, &state)?;
let address =
Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
.map_err(ErrorInternalServerError)?;
if let Address::Hash(hash) = address {
let hash = Arc::new(hash);
let _hash = hash.clone();
let _store = state.store.clone();
let blobs = web::block(move || _store.retrieve(_hash.as_ref()))
.await?
.map_err(ErrorInternalServerError)?;
if let Some(blob) = blobs.first() {
let file_path = blob.get_file_path();
if query.native.is_none() {
return Ok(Either::Left(
NamedFile::open(file_path)?
.set_content_disposition(ContentDisposition {
disposition: if query.inline.is_some() {
DispositionType::Inline
} else {
DispositionType::Attachment
},
parameters: vec![],
})
.customize()
.insert_header((
http::header::CACHE_CONTROL,
CacheControl(vec![
CacheDirective::MaxAge(2678400),
CacheDirective::Extension("immutable".into(), None),
]),
)),
));
} else if state.config.desktop_enabled {
#[cfg(feature = "desktop")]
{
info!("Opening {:?}...", file_path);
let mut response = HttpResponse::NoContent();
let path = if !file_path.is_executable() || state.config.trust_executables {
file_path
} else {
response
.append_header((
http::header::WARNING,
"199 - Opening parent directory due to file being executable.",
))
.append_header((
http::header::ACCESS_CONTROL_EXPOSE_HEADERS,
http::header::WARNING.to_string(),
));
file_path.parent().ok_or_else(|| {
ErrorInternalServerError("No parent to open as fallback.")
})?
};
opener::open(path).map_err(error::ErrorServiceUnavailable)?;
return Ok(Either::Right(response.finish()));
}
#[cfg(not(feature = "desktop"))]
unreachable!()
} else {
return Err(error::ErrorNotImplemented("Desktop features not enabled."));
}
}
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _hash = hash.clone();
let entry = web::block(move || connection.retrieve_entry(_hash.as_ref()))
.await?
.map_err(ErrorInternalServerError)?;
if let Some(entry) = entry {
return Ok(Either::Right(HttpResponse::Ok().json(entry)));
}
Err(error::ErrorNotFound("NOT FOUND"))
} else {
Err(ErrorBadRequest(
"Address does not refer to a rawable object.",
))
}
}
#[head("/api/raw/{hash}")]
pub async fn head_raw(
req: HttpRequest,
state: web::Data<State>,
hash: web::Path<String>,
) -> Result<HttpResponse, Error> {
check_auth(&req, &state)?;
let address =
Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
.map_err(ErrorInternalServerError)?;
if let Address::Hash(hash) = address {
let hash = Arc::new(hash);
let _hash = hash.clone();
let _store = state.store.clone();
let blobs = web::block(move || _store.retrieve(_hash.as_ref()))
.await?
.map_err(ErrorInternalServerError)?;
if let Some(blob) = blobs.first() {
let file_path = blob.get_file_path();
let mut response = HttpResponse::NoContent();
if let Some(mime_type) = tree_magic_mini::from_filepath(file_path) {
if let Ok(mime) = mime_type.parse::<mime::Mime>() {
return Ok(response.content_type(mime).finish());
}
}
return Ok(response.into());
}
Err(error::ErrorNotFound("NOT FOUND"))
} else {
Err(ErrorBadRequest(
"Address does not refer to a rawable object.",
))
}
}
/// Serves a preview (thumbnail) for the blob behind a base58-encoded hash.
///
/// The preview is generated on the dedicated preview thread pool. If the
/// preview store yields no preview, the client is redirected (`303`) to the
/// raw file. The query parameters are forwarded to the preview store as-is.
///
/// # Errors
///
/// * `400 Bad Request` if the address cannot be decoded or is not a hash.
/// * `500 Internal Server Error` if preview generation fails or its worker
///   goes away before reporting a result.
/// * `501 Not Implemented` if previews are not compiled in or not configured.
#[get("/api/thumb/{hash}")]
pub async fn get_thumbnail(
    req: HttpRequest,
    state: web::Data<State>,
    hash: web::Path<String>,
    web::Query(query): web::Query<HashMap<String, String>>,
) -> Result<Either<NamedFile, HttpResponse>, Error> {
    check_auth(&req, &state)?;

    #[cfg(feature = "previews")]
    if let Some(preview_store) = &state.preview_store {
        let hash = hash.into_inner();
        // The address comes straight from the URL, so failing to decode it
        // is a client error rather than a server fault.
        let address = Address::decode(&b58_decode(&hash).map_err(ErrorBadRequest)?)
            .map_err(ErrorBadRequest)?;

        if let Address::Hash(address_hash) = address {
            let thread_pool = state.preview_thread_pool.as_ref().ok_or_else(|| {
                ErrorInternalServerError("Preview thread pool is not available.")
            })?;

            let preview_store = preview_store.clone();
            let (tx, rx) = oneshot::channel();
            let _job_container = state.job_container.clone();
            thread_pool.spawn(move || {
                let result = preview_store.get(address_hash, query, _job_container);
                // The receiver is gone if the request was dropped in the
                // meantime; nobody is left to care, so don't panic the worker.
                let _ = tx.send(result);
            });

            let preview_result = rx
                .await
                .map_err(ErrorInternalServerError)?
                .map_err(ErrorInternalServerError)?;

            if let Some(preview_path) = preview_result {
                let mut file = NamedFile::open(&preview_path)?.disable_content_disposition();
                if let Some(mime_type) = tree_magic_mini::from_filepath(&preview_path) {
                    if let Ok(mime) = mime_type.parse() {
                        file = file.set_content_type(mime);
                    }
                }
                return Ok(Either::Left(file));
            } else {
                return Ok(Either::Right(
                    HttpResponse::SeeOther()
                        .append_header((http::header::LOCATION, format!("../../api/raw/{hash}")))
                        .finish(),
                ));
            }
        } else {
            return Err(ErrorBadRequest(
                "Address does not refer to a previewable object.",
            ));
        }
    }

    Err(error::ErrorNotImplemented("Previews not enabled."))
}
/// Runs the query given in the request body and returns the matching
/// entries as a JSON object keyed by each entry's base58-encoded address.
///
/// A body that does not parse as a query yields `400 Bad Request`.
#[post("/api/query")]
pub async fn get_query(
    req: HttpRequest,
    state: web::Data<State>,
    query: String,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let in_query: Query = query.parse().map_err(ErrorBadRequest)?;

    let entries = web::block(move || connection.query(in_query))
        .await
        .map_err(ErrorInternalServerError)?
        .map_err(ErrorInternalServerError)?;

    let mut result: HashMap<String, Entry> = HashMap::new();
    for entry in entries {
        let address = entry.address().map_err(ErrorInternalServerError)?;
        let encoded = address.encode().map_err(ErrorInternalServerError)?;
        result.insert(b58_encode(encoded), entry);
    }

    Ok(HttpResponse::Ok().json(&result))
}
/// Helper for presenting a collection of entries as an address-keyed map.
trait EntriesAsHash {
/// Returns the entries keyed by their base58-encoded address.
fn as_hash(&self) -> Result<HashMap<String, &Entry>>;
}
impl EntriesAsHash for Vec<Entry> {
    /// Maps every entry's base58-encoded address to a reference to the entry.
    /// Fails if an address cannot be computed or encoded.
    fn as_hash(&self) -> Result<HashMap<String, &Entry>> {
        let mut by_address: HashMap<String, &Entry> = HashMap::new();
        for entry in self.iter() {
            let key = b58_encode(entry.address()?.encode()?);
            by_address.insert(key, entry);
        }
        Ok(by_address)
    }
}
/// Returns the address components of an object together with all entries
/// retrieved for it, keyed by entry address.
#[get("/api/obj/{address_str}")]
pub async fn get_object(
    req: HttpRequest,
    state: web::Data<State>,
    address: web::Path<Address>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let address = address.into_inner();

    // The blocking closure needs its own copy; the original is used for the response.
    let lookup_address = address.clone();
    let entries: Vec<Entry> = web::block(move || connection.retrieve_object(&lookup_address))
        .await?
        .map_err(ErrorInternalServerError)?;
    trace!("{:?}", entries);

    let body = json!({
        "entity": address.as_components(),
        "entries": entries.as_hash().map_err(ErrorInternalServerError)?
    });
    Ok(HttpResponse::Ok().json(body))
}
/// An address as accepted from API clients. Deserialized untagged, so either
/// representation may be sent.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum InAddress {
/// The address in its string form, parsed into an `Address`.
Address(String),
/// The address as components, passed to `Address::from_components`.
Components { t: String, c: Option<String> },
}
impl TryInto<Address> for InAddress {
    type Error = anyhow::Error;

    /// Converts the client-supplied representation into an `Address`,
    /// failing if the string or the components are not valid.
    fn try_into(self) -> Result<Address, Self::Error> {
        match self {
            InAddress::Address(address) => Ok(address.parse()?),
            InAddress::Components { t, c } => {
                let components = AddressComponents { t, c };
                Ok(Address::from_components(components)?)
            }
        }
    }
}
/// An entry as submitted by API clients to `PUT /api/obj`.
#[derive(Debug, Clone, Deserialize)]
pub struct InEntry {
/// Entity the entry is about; when absent, `put_object` builds the entry
/// from an `InvariantEntry` instead.
pub entity: Option<InAddress>,
/// Attribute name, parsed when the entry is processed.
pub attribute: String,
/// Value of the entry.
pub value: EntryValue,
}
/// The request body shapes accepted by `PUT /api/obj`. Deserialized
/// untagged; unknown fields are rejected.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged, deny_unknown_fields)]
pub enum PutInput {
/// A single entry to insert.
Entry(InEntry),
/// Several entries, inserted within one transaction.
EntryList(Vec<InEntry>),
/// A bare address to register and run extractors on.
Address { entity: InAddress },
}
/// Query-string parameters accepted by the entry-writing endpoints.
#[derive(Deserialize)]
pub struct UpdateQuery {
/// Optional suffix appended to the `"API"` provenance recorded on written entries.
provenance: Option<String>,
}
/// Handles `PUT /api/obj` for the three body shapes of `PutInput`.
///
/// Responds with a two-element JSON array `[entry_address, entity_address]`;
/// either element is `null` where it does not apply (both for entry lists,
/// the entry address for bare addresses).
#[put("/api/obj")]
pub async fn put_object(
req: HttpRequest,
state: web::Data<State>,
payload: web::Json<PutInput>,
web::Query(query): web::Query<UpdateQuery>,
) -> Result<HttpResponse, Error> {
let user = check_auth(&req, &state)?;
let (entry_address, entity_address) = {
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let in_entry = payload.into_inner();
debug!("PUTting {in_entry:?}");
let provenance = query.provenance.clone();
let _user = user.clone();
// Converts a client-supplied entry into a database `Entry`. Entries with
// an entity are stamped with provenance, user and the current time;
// entries without one are built from an `InvariantEntry`.
let process_inentry = move |in_entry: InEntry| -> Result<Entry> {
if let Some(entity) = in_entry.entity {
Ok(Entry {
entity: entity.try_into()?,
attribute: in_entry.attribute.parse()?,
value: in_entry.value,
provenance: (match &provenance {
Some(s) => format!("API {}", s),
None => "API".to_string(),
})
.trim()
.to_string(),
timestamp: chrono::Utc::now().naive_utc(),
user: _user.clone(),
})
} else {
Ok(Entry::try_from(&InvariantEntry {
attribute: in_entry.attribute.parse()?,
value: in_entry.value,
})?)
}
};
match in_entry {
// Single entry: a conversion failure is the client's fault (400),
// an insertion failure is reported as a server error (500).
PutInput::Entry(in_entry) => {
let entry = process_inentry(in_entry).map_err(ErrorBadRequest)?;
web::block::<_, Result<_>>(move || {
Ok((
Some(connection.insert_entry(entry.clone())?),
Some(entry.entity),
))
})
.await?
.map_err(ErrorInternalServerError)?
}
// Entry list: converted and inserted inside a single transaction.
// NOTE(review): both conversion and insertion failures surface as 400 here.
PutInput::EntryList(entries) => {
web::block(move || {
connection.transaction::<_, anyhow::Error, _>(|| {
for entry in entries {
connection.insert_entry(process_inentry(entry)?)?;
}
Ok(())
})
})
.await?
.map_err(ErrorBadRequest)?;
(None, None)
}
// Bare address: start extraction in the background, then record an
// `ATTR_ADDED` entry if nothing is stored for the address yet.
PutInput::Address { entity: in_address } => {
let address: Address = in_address.try_into().map_err(ErrorBadRequest)?;
let _address = address.clone();
let _job_container = state.job_container.clone();
let _store = state.store.clone();
let _user = user.clone();
block_background::<_, _, anyhow::Error>(move || {
let entry_count = extractors::extract(
&_address,
&connection,
_store,
_job_container,
OperationContext {
user: _user,
provenance: "API".to_string(),
},
);
debug!("Added {entry_count} extracted entries for {_address:?}");
Ok(())
});
// The first connection was moved into the background task above.
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _user = user.clone();
web::block(move || {
connection.transaction::<_, anyhow::Error, _>(|| {
if connection.retrieve_object(&address)?.is_empty() {
connection.insert_entry(Entry {
entity: address.clone(),
attribute: ATTR_ADDED.parse().unwrap(),
// Current time in whole seconds since the Unix epoch.
value: EntryValue::Number(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs() as f64,
),
provenance: (match &query.provenance {
Some(s) => format!("API {}", s),
None => "API".to_string(),
})
.trim()
.to_string(),
user: _user,
timestamp: chrono::Utc::now().naive_utc(),
})?;
}
Ok((None, Some(address)))
})
})
.await?
.map_err(ErrorInternalServerError)?
}
}
};
Ok(HttpResponse::Ok().json([entry_address, entity_address]))
}
/// Handles `PUT /api/blob`: stores the first multipart field as a blob and
/// responds with its hash address as JSON.
///
/// A field named `@url` is treated as a URL whose content is fetched and
/// stored; any other field is treated as the file content itself. After
/// storing, a label entry is inserted and extractors are started in the
/// background. A multipart body without fields yields `400 Bad Request`.
#[put("/api/blob")]
pub async fn put_blob(
req: HttpRequest,
state: web::Data<State>,
mut payload: Multipart,
) -> Result<HttpResponse, Error> {
let user = check_auth(&req, &state)?;
// Only the first field of the multipart body is considered.
if let Some(mut field) = payload.try_next().await? {
let mut file = NamedTempFile::new()?;
let (filename, fallback_label) = if field.name() == "@url" {
// The field body is the URL to fetch; collect it in full first.
let mut url_buffer = String::new();
let mut stream = field.into_stream();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result?;
let chunk_str = String::from_utf8_lossy(&chunk);
url_buffer.push_str(&chunk_str);
}
let url = Url::parse(&url_buffer).map_err(ErrorBadRequest)?;
let _url = url.clone();
let (bytes, filename) = web::block(move || fetch_external(_url)).await??;
file.write_all(&bytes)?;
(
// Prefer the filename reported by the fetch, else the last URL path segment.
filename.or_else(|| url.path_segments().and_then(|s| s.last().map(String::from))),
url.to_string(),
)
} else {
// Write the upload to the temporary file chunk by chunk; the file is
// moved into the blocking task and handed back after each write.
while let Some(chunk) = field.try_next().await? {
file = web::block(move || file.write_all(&chunk).map(|_| file)).await??;
}
(
field.content_disposition().get_filename().map(String::from),
String::from(field.name()),
)
};
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _store = state.store.clone();
let _filename = filename.clone();
let _user = user.clone();
// Store the temporary file as a blob using the vault's configured blob mode.
let hash = web::block(move || {
let options = connection.get_vault_options()?;
_store
.store(
&connection,
Blob::from_filepath(file.path()),
_filename,
options.blob_mode,
OperationContext {
user: _user,
provenance: "API".to_string(),
},
)
.map_err(anyhow::Error::from)
})
.await?
.map_err(ErrorInternalServerError)?;
let address = Address::Hash(hash);
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _address = address.clone();
let _filename = filename.clone();
// Label the blob with its filename or the fallback label; the outcome is
// deliberately ignored, so a failure here does not fail the upload.
let _ = web::block(move || {
upend_insert_val!(
&connection,
_address,
ATTR_LABEL,
_filename.unwrap_or(fallback_label)
)
})
.await;
let _address = address.clone();
let _job_container = state.job_container.clone();
let _store = state.store.clone();
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let _user = user.clone();
// Run extractors on the new blob without delaying the response.
block_background::<_, _, anyhow::Error>(move || {
let entry_count = extractors::extract(
&_address,
&connection,
_store,
_job_container,
OperationContext {
user: _user,
provenance: "API".to_string(),
},
);
debug!("Added {entry_count} extracted entries for {_address:?}");
Ok(())
});
Ok(HttpResponse::Ok().json(address))
} else {
Err(ErrorBadRequest("Multipart contains no fields."))
}
}
/// Replaces the value of one attribute on an object.
///
/// Within a single transaction, all entries matching the address/attribute
/// pair are removed and one new entry carrying the submitted value is
/// inserted. Responds with the result of that insertion as JSON.
#[put("/api/obj/{address}/{attribute}")]
pub async fn put_object_attribute(
    req: HttpRequest,
    state: web::Data<State>,
    path: web::Path<(Address, String)>,
    value: web::Json<EntryValue>,
    web::Query(query): web::Query<UpdateQuery>,
) -> Result<HttpResponse, Error> {
    let user = check_auth(&req, &state)?;
    let (address, attribute) = path.into_inner();
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;

    let value = value.into_inner();
    let provenance = query
        .provenance
        .as_ref()
        .map_or_else(|| "API".to_string(), |s| format!("API {}", s))
        .trim()
        .to_string();

    let new_address = web::block(move || {
        connection.transaction::<_, anyhow::Error, _>(|| {
            // NOTE(review): `attribute` comes from the URL and is interpolated
            // into the query text unescaped; confirm the query parser cannot be
            // confused by a `"` in the attribute name.
            let stale_entries =
                connection.query(format!(r#"(matches @{address} "{attribute}" ?)"#).parse()?)?;

            for stale in stale_entries {
                let _ = connection.remove_object(stale.address()?)?;
            }

            connection.insert_entry(Entry {
                entity: address,
                attribute: attribute.parse()?,
                value,
                provenance,
                user,
                timestamp: chrono::Utc::now().naive_utc(),
            })
        })
    })
    .await?
    .map_err(ErrorInternalServerError)?;

    Ok(HttpResponse::Ok().json(new_address))
}
/// Removes the object at the given address.
///
/// # Errors
///
/// Responds with `500 Internal Server Error` if the removal itself fails.
/// Previously only failures of the blocking task were reported, and a failed
/// removal was still answered with `200 OK`.
#[delete("/api/obj/{address_str}")]
pub async fn delete_object(
    req: HttpRequest,
    state: web::Data<State>,
    address: web::Path<Address>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;

    // First `?`: the blocking task itself failed; `map_err`: the removal failed.
    let _ = web::block(move || connection.remove_object(address.into_inner()))
        .await?
        .map_err(ErrorInternalServerError)?;

    Ok(HttpResponse::Ok().finish())
}
// #[post("api/obj/{address_str}")]
// pub async fn update_attribute(
// state: web::Data<State>,
// address_str: web::Path<String>,
// mut payload: web::Payload,
// ) -> Result<HttpResponse, Error> {
// let body = load_body(&mut payload)
// .await
// .map_err(error::ErrorBadRequest)?;
// let entry_value = serde_json::from_slice::<EntryValue>(&body).map_err(ErrorBadRequest)?;
// let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
// connection
// .transaction::<_, anyhow::Error, _>(|| {
// let address = Address::decode(&decode(address_str.into_inner())?)?;
// let _ = connection.remove_object(address)?;
// Ok(())
// })
// .map_err(ErrorInternalServerError)?;
// Ok(HttpResponse::Ok().finish())
// }
/// Computes the address for one of the supported inputs and returns its
/// string form as JSON.
///
/// Exactly one of these query parameters is honoured, checked in this order:
/// `attribute`, `url`, `url_content` (the URL is fetched and its content
/// hashed), `type` (one of `Hash`, `Uuid`, `Attribute`, `Url`). All results
/// except `url_content` are served with immutable caching headers.
///
/// NOTE(review): this handler performs no `check_auth`, yet `url_content`
/// makes the server fetch an arbitrary URL — confirm this is intended.
#[get("/api/address")]
pub async fn get_address(
    web::Query(query): web::Query<HashMap<String, String>>,
) -> Result<HttpResponse, Error> {
    let (address, immutable) = if let Some(attribute) = query.get("attribute") {
        (
            Address::Attribute(
                attribute
                    .parse()
                    .map_err(|e: UpEndError| ErrorBadRequest(e.to_string()))?,
            ),
            true,
        )
    } else if let Some(raw_url) = query.get("url") {
        let url = Url::parse(raw_url).map_err(ErrorBadRequest)?;
        (Address::Url(url), true)
    } else if let Some(raw_url) = query.get("url_content") {
        let url = Url::parse(raw_url).map_err(ErrorBadRequest)?;
        let (bytes, _) = web::block(|| fetch_external(url)).await??;
        let digest = sha256hash(&bytes).map_err(ErrorInternalServerError)?;
        // Remote content may change, so this result must not be cached as immutable.
        (Address::Hash(digest), false)
    } else if let Some(type_str) = query.get("type") {
        let type_address = match type_str.as_str() {
            "Hash" => upend_base::constants::TYPE_HASH_ADDRESS.clone(),
            "Uuid" => upend_base::constants::TYPE_UUID_ADDRESS.clone(),
            "Attribute" => upend_base::constants::TYPE_ATTRIBUTE_ADDRESS.clone(),
            "Url" => upend_base::constants::TYPE_URL_ADDRESS.clone(),
            _ => return Err(ErrorBadRequest(format!("Unknown type: {type_str}"))),
        };
        (type_address, true)
    } else {
        return Err(ErrorBadRequest(
            "Specify one of: `attribute`, `url`, `url_content`, `type`.",
        ));
    };

    let mut response = HttpResponse::Ok();
    if immutable {
        let cache_control = CacheControl(vec![
            CacheDirective::MaxAge(2678400),
            CacheDirective::Extension("immutable".into(), None),
        ]);
        response.append_header((http::header::CACHE_CONTROL, cache_control));
    }

    let body = format!("{}", address);
    Ok(response.json(body))
}
/// Lists every attribute together with the string labels attached to it.
///
/// Returns a JSON array of `{"name": ..., "labels": [...]}` objects. If an
/// attribute's entries cannot be retrieved, its label list is empty.
#[get("/api/all/attributes")]
pub async fn get_all_attributes(
    req: HttpRequest,
    state: web::Data<State>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let attributes = web::block(move || connection.get_all_attributes())
        .await?
        .map_err(ErrorInternalServerError)?;

    // The first connection was moved into the blocking task above.
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let result: serde_json::Value = attributes
        .into_iter()
        .map(|attribute| {
            json!({
                "name": attribute,
                "labels": connection
                    .retrieve_object(&Address::Attribute(attribute))
                    .unwrap_or_default()
                    .into_iter()
                    .filter(|e| e.attribute == ATTR_LABEL)
                    .filter_map(|e| match e.value {
                        EntryValue::String(label) => Some(label),
                        _ => None,
                    })
                    .collect::<Vec<String>>(),
            })
        })
        .collect();

    Ok(HttpResponse::Ok().json(result))
}
/// Resolves a hierarchy path and redirects to the object it points at.
///
/// An empty path redirects permanently to the hierarchy roots. With `GET`
/// the path is only looked up; with `PUT` (any non-safe method) missing
/// path elements are created. Resolution failures yield `404 Not Found`.
#[routes]
#[get("/api/hier/{path:.*}")]
#[put("/api/hier/{path:.*}")]
pub async fn list_hier(
    state: web::Data<State>,
    path: web::Path<String>,
    req: HttpRequest,
) -> Result<HttpResponse, Error> {
    let user = check_auth(&req, &state)?;
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;

    if path.is_empty() {
        return Ok(HttpResponse::MovedPermanently()
            .append_header((http::header::LOCATION, "../../api/hier_roots"))
            .finish());
    }

    let upath: UHierPath = path.into_inner().parse().map_err(ErrorBadRequest)?;
    trace!(r#"Listing path "{}""#, upath);

    // Only methods that are not "safe" are allowed to create missing nodes.
    let create = !req.method().is_safe();
    let resolved = web::block(move || {
        resolve_path(
            &connection,
            &upath,
            create,
            OperationContext {
                user,
                provenance: "API".to_string(),
            },
        )
    })
    .await?
    .map_err(ErrorNotFound)?;

    let response = match resolved.last() {
        Some(addr) => HttpResponse::Found()
            .append_header((http::header::LOCATION, format!("../../api/obj/{}", addr)))
            .finish(),
        None => HttpResponse::NotFound().finish(),
    };
    Ok(response)
}
#[get("/api/hier_roots")]
pub async fn list_hier_roots(
req: HttpRequest,
state: web::Data<State>,
) -> Result<HttpResponse, Error> {
check_auth(&req, &state)?;
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
let result = web::block(move || {
list_roots(&connection)?
.into_iter()
.map(|root| connection.retrieve_object(&root))
.collect::<Result<Vec<Vec<Entry>>>>()
})
.await?
.map_err(ErrorInternalServerError)?
.concat();
Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
}
/// Query-string parameters accepted by `POST /api/refresh`.
#[derive(Deserialize)]
pub struct RescanRequest {
/// Passed through as `UpdateOptions::initial`; defaults to `false`.
initial: Option<bool>,
/// Blob mode for the rescan; falls back to the vault's configured mode.
tree_mode: Option<BlobMode>,
}
/// Starts a background rescan of the store followed by extraction over the
/// whole vault, and responds with `200 OK` right away.
///
/// The blob mode comes from the `tree_mode` query parameter; the vault's
/// configured mode (or its default) is looked up only when the parameter is
/// absent. Results of the rescan and the extraction are not reported.
#[post("/api/refresh")]
pub async fn api_refresh(
    req: HttpRequest,
    state: web::Data<State>,
    web::Query(query): web::Query<RescanRequest>,
) -> Result<HttpResponse, Error> {
    let user = check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;

    block_background::<_, _, anyhow::Error>(move || {
        // Consult the database only when the request did not pick a mode, so
        // an explicit `tree_mode` neither costs a query nor can fail on one.
        let tree_mode = match query.tree_mode {
            Some(mode) => mode,
            None => connection
                .get_vault_options()?
                .blob_mode
                .unwrap_or_default(),
        };

        let _ = state.store.update(
            &state.upend,
            state.job_container.clone(),
            UpdateOptions {
                initial: query.initial.unwrap_or(false),
                tree_mode,
            },
            OperationContext {
                user: user.clone(),
                provenance: "API".to_string(),
            },
        );
        let _ = crate::extractors::extract_all(
            state.upend.clone(),
            state.store.clone(),
            state.job_container.clone(),
            OperationContext {
                user: user.clone(),
                provenance: "API".to_string(),
            },
        );
        Ok(())
    });

    Ok(HttpResponse::Ok().finish())
}
/// Returns the database statistics as JSON.
#[get("/api/stats/vault")]
pub async fn vault_stats(req: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let stats = connection.get_stats().map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json(stats))
}
#[get("/api/stats/store")]
pub async fn store_stats(req: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, Error> {
check_auth(&req, &state)?;
Ok(HttpResponse::Ok().json(json!({
"main": state.store.stats().map_err(ErrorInternalServerError)?
})))
}
/// Query-string parameters accepted by `GET /api/jobs`.
#[derive(Deserialize)]
pub struct JobsRequest {
/// If present (whatever its value), all jobs are listed rather than only those in progress.
full: Option<String>,
}
/// Lists background jobs: only those in progress by default, all of them
/// when the `full` query parameter is present.
#[get("/api/jobs")]
pub async fn get_jobs(
    req: HttpRequest,
    state: web::Data<State>,
    web::Query(query): web::Query<JobsRequest>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let all_jobs = state
        .job_container
        .get_jobs()
        .map_err(ErrorInternalServerError)?;

    let listed = if query.full.is_some() {
        all_jobs
    } else {
        all_jobs
            .into_iter()
            .filter(|(_, job)| matches!(job.state, jobs::JobState::InProgress))
            .collect()
    };

    Ok(HttpResponse::Ok().json(listed))
}
#[get("/api/info")]
pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
Ok(HttpResponse::Ok().json(json!({
"name": state.config.vault_name,
// "location": &*state.store.path,
"version": format!(
"{} (base: {}, db: {}, cli: {})",
crate::common::get_version(),
upend_base::common::build::PKG_VERSION,
upend_db::common::build::PKG_VERSION,
build::PKG_VERSION
),
"desktop": state.config.desktop_enabled,
"public": *state.public.lock().unwrap(),
})))
}
/// Returns the vault options as JSON.
#[get("/api/options")]
pub async fn get_options(req: HttpRequest, state: web::Data<State>) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let options = connection
        .get_vault_options()
        .map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json(options))
}
/// Replaces the vault options with the ones given in the JSON body.
#[put("/api/options")]
pub async fn put_options(
    req: HttpRequest,
    state: web::Data<State>,
    payload: web::Json<VaultOptions>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let options = payload.into_inner();

    // Outer error: the blocking task failed; inner error: saving the options failed.
    let outcome = web::block(move || connection.set_vault_options(options))
        .await
        .map_err(ErrorInternalServerError)?;
    outcome.map_err(ErrorInternalServerError)?;

    Ok(HttpResponse::Ok().finish())
}
/// Returns the entries reported by `get_explicit_entries`, keyed by entry address.
#[get("/api/migration/user-entries")]
pub async fn get_user_entries(
    req: HttpRequest,
    state: web::Data<State>,
) -> Result<HttpResponse, Error> {
    check_auth(&req, &state)?;

    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let entries = web::block(move || connection.get_explicit_entries())
        .await?
        .map_err(ErrorInternalServerError)?;

    let by_address = entries.as_hash().map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json(by_address))
}
/// Reasons why fetching an external URL via `fetch_external` can fail.
#[derive(Debug)]
enum ExternalFetchError {
/// The remote server answered with an error status.
Status(anyhow::Error),
/// The response exceeds the size limit; holds `(actual size, maximum size)`.
TooLarge((usize, usize)),
/// The response size could not be determined up front.
UnknownSize,
/// Any other failure while sending the request or reading the body.
Other(anyhow::Error),
}
impl std::fmt::Display for ExternalFetchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
ExternalFetchError::Status(err) =>
format!("Received non-OK (2XX) status code: {}", err),
ExternalFetchError::TooLarge((size, max_size)) =>
format!("The response is too large ({size} > {max_size})."),
ExternalFetchError::UnknownSize => "Could not ascertain response size.".into(),
ExternalFetchError::Other(err) => format!("Unknown error: {}", err),
}
)
}
}
impl ResponseError for ExternalFetchError {
    /// Maps each failure to the HTTP status reported to the client.
    fn status_code(&self) -> http::StatusCode {
        match self {
            Self::Status(_) | Self::UnknownSize => http::StatusCode::UNPROCESSABLE_ENTITY,
            Self::TooLarge(_) => http::StatusCode::PAYLOAD_TOO_LARGE,
            Self::Other(_) => http::StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
}
/// Largest external response, in bytes, that `fetch_external` accepts.
const MAX_EXTERNAL_SIZE: usize = 128_000_000;
/// Downloads the content at `url`, returning its bytes and, if the server
/// supplied one via `Content-Disposition`, a filename.
///
/// This call blocks; callers in this file run it through `web::block`.
///
/// # Errors
///
/// * [`ExternalFetchError::Status`] if the server responds with an error status.
/// * [`ExternalFetchError::UnknownSize`] if `Content-Length` is missing or
///   cannot be parsed as a number.
/// * [`ExternalFetchError::TooLarge`] if the announced or actual body size
///   exceeds [`MAX_EXTERNAL_SIZE`].
/// * [`ExternalFetchError::Other`] for any other request or read failure.
#[tracing::instrument(skip(url), fields(url = % url))]
fn fetch_external(url: Url) -> Result<(bytes::Bytes, Option<String>), ExternalFetchError> {
    debug!("Fetching...");
    let response = REQWEST_CLIENT
        .get(url)
        .send()
        .map_err(|err| ExternalFetchError::Other(err.into()))?
        .error_for_status()
        .map_err(|err| ExternalFetchError::Status(err.into()))?;

    // A header that is present but unparseable tells us as little about the
    // size as a missing one, so it must not slip past the limit.
    let content_length = response
        .headers()
        .get(reqwest::header::CONTENT_LENGTH)
        .and_then(|value| value.to_str().ok())
        .and_then(|value| value.parse::<usize>().ok())
        .ok_or(ExternalFetchError::UnknownSize)?;
    if content_length > MAX_EXTERNAL_SIZE {
        return Err(ExternalFetchError::TooLarge((
            content_length,
            MAX_EXTERNAL_SIZE,
        )));
    }

    let filename: Option<String> = response
        .headers()
        .get(http::header::CONTENT_DISPOSITION)
        .and_then(|hv| http::header::ContentDisposition::from_raw(hv).ok())
        .and_then(|cd| cd.get_filename().map(String::from));
    debug!("Got filename: {filename:?}");

    let bytes = response
        .bytes()
        .map_err(|err| ExternalFetchError::Other(err.into()))?;
    debug!("Got {} bytes.", bytes.len());

    // Enforce the limit on what was actually received, too, in case the body
    // ends up larger than the header announced.
    if bytes.len() > MAX_EXTERNAL_SIZE {
        return Err(ExternalFetchError::TooLarge((
            bytes.len(),
            MAX_EXTERNAL_SIZE,
        )));
    }

    Ok((bytes, filename))
}
#[cfg(test)]
mod tests {
use std::fs::File;
use super::*;
use anyhow::Result;
use tempfile::TempDir;
use upend_base::hash::UpMultihash;
/// Both `InAddress` representations must convert to the same `Address`.
#[test]
fn test_in_address() -> Result<()> {
    let expected = Address::Url(Url::parse("https://upend.dev").unwrap());

    let from_string: Address = InAddress::Address(expected.to_string()).try_into()?;
    assert_eq!(expected, from_string);

    let from_components: Address = InAddress::Components {
        t: "Url".into(),
        c: Some("https://upend.dev".into()),
    }
    .try_into()?;
    assert_eq!(expected, from_components);

    Ok(())
}
/// `/api/info` must report the configured vault name and desktop flag.
#[actix_web::test]
async fn test_get_info() {
    // Only the fields under test are deserialized.
    #[derive(Deserialize)]
    struct VaultInfo {
        name: Option<String>,
        desktop: bool,
    }

    let service = crate::serve::get_app::<Vec<String>>(false, vec![], get_state());
    let app = actix_web::test::init_service(service).await;

    let request = actix_web::test::TestRequest::get()
        .uri("/api/info")
        .to_request();
    let info: VaultInfo = actix_web::test::call_and_read_body_json(&app, request).await;

    assert_eq!(info.name, Some("TEST VAULT".to_string()));
    assert!(!info.desktop);
}
/// Exercises the hierarchy endpoints against the test vault: the empty-path
/// redirect, the listing of roots, and the resolution of a file path.
#[actix_web::test]
async fn test_get_hier() {
let app = actix_web::test::init_service(crate::serve::get_app::<Vec<String>>(
false,
vec![],
get_state(),
))
.await;
// An empty hierarchy path redirects to the roots listing.
let req = actix_web::test::TestRequest::get()
.uri("/api/hier/")
.to_request();
let result = actix_web::test::call_service(&app, req).await;
assert_eq!(result.status(), http::StatusCode::MOVED_PERMANENTLY);
assert_eq!(
result
.headers()
.get(http::header::LOCATION)
.unwrap()
.to_str()
.unwrap(),
"../../api/hier_roots"
);
// The test vault has exactly one labelled root, "NATIVE".
let req = actix_web::test::TestRequest::get()
.uri("/api/hier_roots")
.to_request();
let roots: HashMap<String, Entry> =
actix_web::test::call_and_read_body_json(&app, req).await;
let mut labels = roots.values().filter(|v| v.attribute == ATTR_LABEL);
assert_eq!(labels.next().unwrap().value, "NATIVE".into());
assert!(labels.next().is_none());
// Resolving a known file redirects to its object address.
let req = actix_web::test::TestRequest::get()
.uri("/api/hier/NATIVE/hello-world.txt")
.to_request();
let result = actix_web::test::call_service(&app, req).await;
assert_eq!(
result.status(),
http::StatusCode::FOUND,
"expected redirect, got {:}",
result.status()
);
assert_eq!(
result
.headers()
.get(http::header::LOCATION)
.unwrap()
.to_str()
.unwrap(),
"../../api/obj/zb2rhkD35pyMqGBbkb9B2o1CuTXsqWxxtTfm87etbv4K8rGBz"
);
}
/// `/api/obj/{address}` must echo the address back as `{t, c}` components
/// for every address kind.
#[actix_web::test]
async fn test_obj_entity_info() {
    let service = crate::serve::get_app::<Vec<String>>(false, vec![], get_state());
    let app = actix_web::test::init_service(service).await;

    let digest = UpMultihash::from_sha256([1, 2, 3, 4, 5]).unwrap();
    let digest_str = b58_encode(digest.to_bytes());
    let uuid = uuid::Uuid::new_v4();

    // (address to request, expected "t", expected "c")
    let cases = vec![
        (Address::Hash(digest), "Hash", digest_str),
        (
            Address::Attribute("TEST".parse().unwrap()),
            "Attribute",
            "TEST".to_string(),
        ),
        (
            Address::Url("https://upend.dev/".parse().unwrap()),
            "Url",
            "https://upend.dev/".to_string(),
        ),
        (Address::Uuid(uuid), "Uuid", uuid.to_string()),
    ];

    for (address, expected_type, expected_content) in cases {
        let req = actix_web::test::TestRequest::get()
            .uri(&format!("/api/obj/{}", address))
            .to_request();
        let result: serde_json::Value = actix_web::test::call_and_read_body_json(&app, req).await;

        assert_eq!(result["entity"]["t"], expected_type);
        assert_eq!(result["entity"]["c"], expected_content);
    }
}
/// Builds a `State` for tests: a temporary directory holding three small
/// files, a database opened on that directory, and a filesystem store that
/// has been scanned once. The resulting instance is public, so the tests
/// need no authentication.
fn get_state() -> State {
// Prepare temporary filesystem structure
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("my-temporary-note.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Brian was here. Briefly.").unwrap();
// `test_get_hier` asserts a hard-coded address for this file, so its
// content must not change.
let file_path = temp_dir.path().join("hello-world.txt");
let mut tmp_file = File::create(file_path).unwrap();
writeln!(tmp_file, "Hello, World!").unwrap();
// An empty file as well.
let file_path = temp_dir.path().join("empty");
File::create(file_path).unwrap();
// Initialize database
let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
let upend = Arc::new(open_result.db);
let store = Arc::new(Box::new(
upend_db::stores::fs::FsStore::from_path(temp_dir.path()).unwrap(),
) as Box<dyn UpStore + Send + Sync>);
let job_container = jobs::JobContainer::new();
// Run an initial scan so that the files above are known to the vault.
store
.update(
&upend,
job_container.clone(),
UpdateOptions {
initial: true,
tree_mode: upend_db::BlobMode::default(),
},
OperationContext::default(),
)
.unwrap();
State {
upend,
store,
config: UpEndConfig {
vault_name: Some("TEST VAULT".to_string()),
desktop_enabled: false,
trust_executables: false,
secret: "secret".to_string(),
},
job_container,
preview_store: None,
preview_thread_pool: None,
// Public mode: `check_auth` lets every request through.
public: Arc::new(Mutex::new(true)),
}
}
}