// upend/src/routes.rs

use crate::addressing::{Address, Addressable};
use crate::database::constants::{ADDED_ATTR, LABEL_ATTR};
use crate::database::entry::{Entry, EntryValue, InvariantEntry};
use crate::database::hierarchies::{list_roots, resolve_path, UHierPath};
use crate::database::lang::Query;
use crate::database::UpEndDatabase;
use crate::extractors::Extractor;
use crate::filesystem::add_file;
use crate::previews::PreviewStore;
use crate::util::exec::block_background;
use crate::util::hash::{b58_decode, b58_encode, Hashable};
use crate::util::jobs::JobContainer;
use actix_files::NamedFile;
use actix_multipart::Multipart;
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError, ErrorNotFound};
use actix_web::http::header::{CacheControl, CacheDirective, ContentDisposition, DispositionType};
use actix_web::{delete, error, get, post, put, web, Either, Error, HttpResponse};
use actix_web::{http, Responder};
use anyhow::{anyhow, Result};
use futures_util::TryStreamExt;
use log::{debug, info, trace};
use serde::Deserialize;
use serde_json::json;
use std::convert::{TryFrom, TryInto};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};
use std::{collections::HashMap, io};
use tempfile::NamedTempFile;
use uuid::Uuid;
#[cfg(feature = "desktop")]
use is_executable::IsExecutable;
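
/// Shared application state, injected into each handler via `web::Data<State>`.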
#[derive(Clone)]
pub struct State {
    pub upend: Arc<UpEndDatabase>,
    pub vault_name: Option<String>,
    pub job_container: Arc<RwLock<JobContainer>>,
    pub preview_store: Option<Arc<PreviewStore>>,
    pub desktop_enabled: bool,
}
#[derive(Deserialize)]
pub struct RawRequest {
    native: Option<String>,
    inline: Option<String>,
}
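
/// `GET /api/raw/{hash}`: serve the raw object behind a base58-encoded hash.
/// Returns a matching database entry if one exists, otherwise streams the
/// first known file with that hash. With `?native` (and desktop features
/// enabled), the file is opened locally instead; for executables, the parent
/// directory is opened and a warning header is set.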
#[get("/api/raw/{hash}")]
pub async fn get_raw(
    state: web::Data<State>,
    web::Query(query): web::Query<RawRequest>,
    hash: web::Path<String>,
) -> Result<impl Responder, Error> {
    let address =
        Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
            .map_err(ErrorInternalServerError)?;
    if let Address::Hash(hash) = address {
        let hash = Arc::new(hash);
        // First check if there's an entry with this hash
        let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
        let _hash = hash.clone();
        let entry = web::block(move || connection.retrieve_entry(_hash.as_ref()))
            .await
            .map_err(ErrorInternalServerError)?;
        if let Some(entry) = entry {
            return Ok(Either::B(HttpResponse::Ok().json(entry)));
        }
        // Then, check the files
        let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
        let _hash = hash.clone();
        let files = web::block(move || connection.retrieve_file(_hash.as_ref()))
            .await
            .map_err(ErrorInternalServerError)?;
        if let Some(file) = files.get(0) {
            let file_path = state.upend.vault_path.join(&file.path);
            if query.native.is_none() {
                Ok(Either::A(
                    NamedFile::open(file_path)?
                        .set_content_disposition(ContentDisposition {
                            disposition: if query.inline.is_some() {
                                DispositionType::Inline
                            } else {
                                DispositionType::Attachment
                            },
                            parameters: vec![],
                        })
                        .with_header(
                            http::header::CACHE_CONTROL,
                            CacheControl(vec![
                                CacheDirective::MaxAge(2678400),
                                CacheDirective::Extension("immutable".into(), None),
                            ]),
                        ),
                ))
            } else if state.desktop_enabled {
                #[cfg(feature = "desktop")]
                {
                    info!("Opening {:?}...", file_path);
                    let mut response = HttpResponse::NoContent();
                    let path = if !file_path.is_executable() {
                        file_path
                    } else {
                        response
                            .header(
                                http::header::WARNING,
                                "199 - Opening parent directory due to file being executable.",
                            )
                            .header(
                                http::header::ACCESS_CONTROL_EXPOSE_HEADERS,
                                http::header::WARNING.to_string(),
                            );
                        file_path
                            .parent()
                            .ok_or_else(|| {
                                ErrorInternalServerError("No parent to open as fallback.")
                            })?
                            .to_path_buf()
                    };
                    opener::open(path).map_err(error::ErrorServiceUnavailable)?;
                    Ok(Either::B(response.finish()))
                }
                #[cfg(not(feature = "desktop"))]
                unreachable!()
            } else {
                Err(error::ErrorNotImplemented("Desktop features not enabled."))
            }
        } else {
            Err(error::ErrorNotFound("NOT FOUND"))
        }
    } else {
        Err(ErrorBadRequest(
            "Address does not refer to a rawable object.",
        ))
    }
}
#[derive(Deserialize)]
pub struct ThumbRequest {
    mime: Option<String>,
}
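
/// `GET /api/thumb/{hash}`: serve a preview for a hash-addressed object,
/// generating it through the `PreviewStore` if necessary. Falls back to a
/// redirect to `/api/raw/{hash}` when no preview is produced, and responds
/// with an error when previews are not enabled.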
#[get("/api/thumb/{hash}")]
pub async fn get_thumbnail(
    state: web::Data<State>,
    hash: web::Path<String>,
    web::Query(query): web::Query<ThumbRequest>,
) -> Result<Either<NamedFile, HttpResponse>, Error> {
    #[cfg(feature = "previews")]
    if let Some(preview_store) = &state.preview_store {
        let hash = hash.into_inner();
        let address = Address::decode(&b58_decode(&hash).map_err(ErrorInternalServerError)?)
            .map_err(ErrorInternalServerError)?;
        if let Address::Hash(address_hash) = address {
            let preview_store = preview_store.clone();
            let preview_result = web::block(move || {
                preview_store.get(address_hash, query.mime, state.job_container.clone())
            })
            .await?;
            if let Some(preview_path) = preview_result {
                let mut file = NamedFile::open(&preview_path)?.disable_content_disposition();
                if let Some(mime_type) = tree_magic_mini::from_filepath(&preview_path) {
                    if let Ok(mime) = mime_type.parse() {
                        file = file.set_content_type(mime);
                    }
                }
                return Ok(Either::A(file));
            } else {
                return Ok(Either::B(
                    HttpResponse::SeeOther()
                        .header(http::header::LOCATION, format!("../../api/raw/{hash}"))
                        .finish(),
                ));
            }
        } else {
            return Err(ErrorBadRequest(
                "Address does not refer to a previewable object.",
            ));
        }
    }
    Err(error::ErrorNotImplemented("Previews not enabled."))
}
#[post("/api/query")]
pub async fn get_query(state: web::Data<State>, query: String) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let in_query: Query = query.parse().map_err(ErrorBadRequest)?;
    let entries = web::block(move || connection.query(in_query))
        .await
        .map_err(ErrorInternalServerError)?;
    let mut result: HashMap<String, Entry> = HashMap::new();
    for entry in entries {
        result.insert(
            b58_encode(
                entry
                    .address()
                    .map_err(ErrorInternalServerError)?
                    .encode()
                    .map_err(ErrorInternalServerError)?,
            ),
            entry,
        );
    }
    Ok(HttpResponse::Ok().json(&result))
}
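
/// Helper for serializing a list of entries as a map keyed by each entry's
/// base58-encoded address.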
trait EntriesAsHash {
    fn as_hash(&self) -> Result<HashMap<String, &Entry>>;
}
impl EntriesAsHash for Vec<Entry> {
    fn as_hash(&self) -> Result<HashMap<String, &Entry>> {
        let mut result: HashMap<String, &Entry> = HashMap::new();
        for entry in self {
            result.insert(b58_encode(entry.address()?.encode()?), entry);
        }
        Ok(result)
    }
}
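
/// `GET /api/obj/{address_str}`: return an object's entity description along
/// with all entries attached to it.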
#[get("/api/obj/{address_str}")]
pub async fn get_object(
    state: web::Data<State>,
    address: web::Path<Address>,
) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let address = address.into_inner();
    let _address = address.clone();
    let result: Vec<Entry> = web::block(move || connection.retrieve_object(&_address))
        .await
        .map_err(ErrorInternalServerError)?;
    debug!("{:?}", result);
    // TODO: make this automatically derive from `Address` definition
    let (entity_type, entity_content) = match address {
        Address::Hash(_) => ("Hash", None),
        Address::Uuid(_) => ("Uuid", None),
        Address::Attribute(attribute) => ("Attribute", Some(attribute)),
        Address::Url(url) => ("Url", Some(url)),
    };
    Ok(HttpResponse::Ok().json(json!({
        "entity": {
            "t": entity_type,
            "c": entity_content
        },
        "entries": result.as_hash().map_err(ErrorInternalServerError)?
    })))
}
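
/// Accepted JSON payload shapes for `PUT /api/obj`: a single entry, a list of
/// entries, an invariant entry, or a bare address to register as an entity.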
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum InEntry {
    Entry(Entry),
    EntryList(Vec<Entry>),
    Invariant(InvariantEntry),
    Address { entity: InAddress },
}
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum InAddress {
    Address(String),
    Components { t: String, c: Option<String> },
}
impl TryInto<Address> for InAddress {
    type Error = anyhow::Error;

    fn try_into(self) -> Result<Address, Self::Error> {
        Ok(match self {
            InAddress::Address(address) => address.parse()?,
            InAddress::Components { t, c } => {
                // I absolutely cannot handle serde magic right now
                // TODO: make this automatically derive from `Address` definition
                match t.as_str() {
                    "Attribute" => Address::Attribute(c.ok_or(anyhow!("Missing attribute."))?),
                    "Url" => Address::Url(c.ok_or(anyhow!("Missing URL."))?),
                    "Uuid" => match c {
                        Some(c) => c.parse()?,
                        None => Address::Uuid(Uuid::new_v4()),
                    },
                    _ => c.ok_or(anyhow!("Missing address."))?.parse()?,
                }
            }
        })
    }
}
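
/// `PUT /api/obj`: insert data into the database. Accepts either a JSON
/// payload (see `InEntry`) or a multipart file upload, which is hashed,
/// copied into the vault if not already stored, and labelled with its
/// original filename. Responds with the addresses of the inserted entry
/// and/or entity (either may be null).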
#[put("/api/obj")]
pub async fn put_object(
    state: web::Data<State>,
    payload: Either<web::Json<InEntry>, Multipart>,
) -> Result<HttpResponse, Error> {
    let (entry_address, entity_address) = match payload {
        Either::A(in_entry) => {
            let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
            let in_entry = in_entry.into_inner();
            debug!("PUTting {in_entry:?}");
            match in_entry {
                InEntry::Entry(entry) => Ok(web::block::<_, _, anyhow::Error>(move || {
                    Ok((
                        Some(connection.insert_entry(entry.clone())?),
                        Some(entry.entity),
                    ))
                })
                .await
                .map_err(ErrorInternalServerError)?),
                InEntry::EntryList(entries) => {
                    web::block(move || {
                        connection.transaction::<_, anyhow::Error, _>(|| {
                            for entry in entries {
                                connection.insert_entry(entry)?;
                            }
                            Ok(())
                        })
                    })
                    .await
                    .map_err(ErrorInternalServerError)?;
                    Ok((None, None))
                }
                InEntry::Invariant(in_entry) => {
                    let invariant = Entry::try_from(&InvariantEntry {
                        attribute: in_entry.attribute,
                        value: in_entry.value,
                    })
                    .map_err(ErrorInternalServerError)?;
                    Ok(web::block::<_, _, anyhow::Error>(move || {
                        Ok((
                            Some(connection.insert_entry(invariant.clone())?),
                            Some(invariant.entity),
                        ))
                    })
                    .await
                    .map_err(ErrorInternalServerError)?)
                }
                InEntry::Address { entity: in_address } => {
                    let address = in_address.try_into().map_err(ErrorBadRequest)?;
                    // Attributes and URLs get a label entry; URLs additionally
                    // trigger the web extractor in the background.
                    let entries_to_add = match &address {
                        Address::Hash(_) | Address::Uuid(_) => vec![],
                        Address::Attribute(attribute) => vec![Entry {
                            entity: address.clone(),
                            attribute: LABEL_ATTR.to_string(),
                            value: format!("ATTRIBUTE: {attribute}").into(),
                        }],
                        Address::Url(url) => {
                            #[cfg(feature = "extractors-web")]
                            {
                                let _address = address.clone();
                                block_background(move || {
                                    (crate::extractors::web::WebExtractor {}).insert_info(
                                        &_address,
                                        &state.upend.connection()?,
                                        state.job_container.clone(),
                                    )
                                });
                            }
                            vec![Entry {
                                entity: address.clone(),
                                attribute: LABEL_ATTR.to_string(),
                                value: url.clone().into(),
                            }]
                        }
                    };
                    Ok(web::block(move || {
                        connection.transaction::<_, anyhow::Error, _>(|| {
                            if connection.retrieve_object(&address)?.is_empty() {
                                connection.insert_entry(Entry {
                                    entity: address.clone(),
                                    attribute: ADDED_ATTR.to_string(),
                                    value: EntryValue::Number(
                                        SystemTime::now()
                                            .duration_since(UNIX_EPOCH)
                                            .unwrap()
                                            .as_secs()
                                            as f64,
                                    ),
                                })?;
                            }
                            for entry in entries_to_add {
                                connection.insert_entry(entry)?;
                            }
                            Ok((None, Some(address)))
                        })
                    })
                    .await
                    .map_err(ErrorInternalServerError)?)
                }
            }
        }
        Either::B(mut multipart) => {
            if let Some(mut field) = multipart.try_next().await? {
                let content_disposition = field
                    .content_disposition()
                    .ok_or_else(|| HttpResponse::BadRequest().finish())?;
                let filename = content_disposition.get_filename().map(String::from);
                // Stream the uploaded field into a temporary file, then hash it.
                let mut file = NamedTempFile::new()?;
                while let Some(chunk) = field.try_next().await? {
                    file = web::block(move || file.write_all(&chunk).map(|_| file)).await?;
                }
                let path = PathBuf::from(file.path());
                let hash = web::block(move || path.hash()).await?;
                let address = Address::Hash(hash.clone());
                let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
                let _hash = hash.clone();
                let existing_files = web::block(move || connection.retrieve_file(&_hash))
                    .await
                    .map_err(ErrorInternalServerError)?;
                // Only copy the blob into the vault if it isn't already stored there.
                if existing_files.is_empty() {
                    let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?);
                    let final_name = if let Some(ref filename) = filename {
                        format!("{addr_str}_{filename}")
                    } else {
                        addr_str
                    };
                    let final_path = state.upend.vault_path.join(&final_name);
                    let (_, tmp_path) = file.keep().map_err(ErrorInternalServerError)?;
                    let final_path = web::block::<_, _, io::Error>(move || {
                        fs::copy(&tmp_path, &final_path)?;
                        fs::remove_file(tmp_path)?;
                        Ok(final_path)
                    })
                    .await?;
                    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
                    web::block(move || add_file(&connection, &final_path, hash))
                        .await
                        .map_err(ErrorInternalServerError)?;
                }
                // Label the object with the original filename, if one was supplied.
                if let Some(ref filename) = filename {
                    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
                    let _address = address.clone();
                    let _filename = filename.clone();
                    let _ = web::block(move || {
                        upend_insert_val!(&connection, _address, LABEL_ATTR, _filename)
                    })
                    .await;
                }
                Ok((None, Some(address)))
            } else {
                Err(anyhow!("Multipart contains no fields."))
            }
        }
    }
    .map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json([entry_address, entity_address]))
}
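
/// `PUT /api/obj/{address}/{attribute}`: set an attribute's value on an
/// object, removing any existing entries for that attribute first.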
#[put("/api/obj/{address}/{attribute}")]
pub async fn put_object_attribute(
    state: web::Data<State>,
    web::Path((address, attribute)): web::Path<(Address, String)>,
    value: web::Json<EntryValue>,
) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let new_address = web::block(move || {
        connection.transaction::<_, anyhow::Error, _>(|| {
            let existing_attr_entries =
                connection.query(format!("(matches \"{address}\" \"{attribute}\" ?)").parse()?)?;
            for eae in existing_attr_entries {
                let _ = connection.remove_object(eae.address()?)?;
            }
            let new_attr_entry = Entry {
                entity: address,
                attribute,
                value: value.into_inner(),
            };
            connection.insert_entry(new_attr_entry)
        })
    })
    .await
    .map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json(new_address))
}
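
/// `DELETE /api/obj/{address_str}`: remove the given object from the database.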
#[delete("/api/obj/{address_str}")]
pub async fn delete_object(
    state: web::Data<State>,
    address: web::Path<Address>,
) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let _ = web::block(move || connection.remove_object(address.into_inner()))
        .await
        .map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().finish())
}
// #[post("api/obj/{address_str}")]
// pub async fn update_attribute(
// state: web::Data<State>,
// address_str: web::Path<String>,
// mut payload: web::Payload,
// ) -> Result<HttpResponse, Error> {
// let body = load_body(&mut payload)
// .await
// .map_err(error::ErrorBadRequest)?;
// let entry_value = serde_json::from_slice::<EntryValue>(&body).map_err(ErrorBadRequest)?;
// let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
// connection
// .transaction::<_, anyhow::Error, _>(|| {
// let address = Address::decode(&decode(address_str.into_inner())?)?;
// let _ = connection.remove_object(address)?;
// Ok(())
// })
// .map_err(ErrorInternalServerError)?;
// Ok(HttpResponse::Ok().finish())
// }
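
/// `GET /api/all/attributes`: list all attributes present in the database.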
#[get("/api/all/attributes")]
pub async fn get_all_attributes(state: web::Data<State>) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let result = web::block(move || connection.get_all_attributes())
        .await
        .map_err(ErrorInternalServerError)?;
    Ok(HttpResponse::Ok().json(result))
}
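
/// `GET /api/hier/{path}`: resolve a hierarchical path and redirect to the
/// corresponding object, or to `/api/hier_roots` for the empty path.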
#[get("/api/hier/{path:.*}")]
pub async fn list_hier(
    state: web::Data<State>,
    path: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    if path.is_empty() {
        Ok(HttpResponse::MovedPermanently()
            .header(http::header::LOCATION, "../../api/hier_roots")
            .finish())
    } else {
        let upath: UHierPath = path.into_inner().parse().map_err(ErrorBadRequest)?;
        trace!("Listing path \"{}\"", upath);
        // todo: 500 if actual error occurs
        let path = web::block(move || resolve_path(&connection, &upath, false))
            .await
            .map_err(ErrorNotFound)?;
        match path.last() {
            Some(addr) => Ok(HttpResponse::Found()
                .header(http::header::LOCATION, format!("../../api/obj/{}", addr))
                .finish()),
            None => Ok(HttpResponse::NotFound().finish()),
        }
    }
}
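
/// `GET /api/hier_roots`: return the entries of all hierarchy roots, keyed by
/// their base58 address.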
#[get("/api/hier_roots")]
pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let result = web::block(move || {
        list_roots(&connection)?
            .into_iter()
            .map(|root| connection.retrieve_object(&root))
            .collect::<Result<Vec<Vec<Entry>>>>()
    })
    .await
    .map_err(ErrorInternalServerError)?
    .concat();
    Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
}
#[derive(Deserialize)]
pub struct RescanRequest {
    full: Option<String>,
}
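
/// `POST /api/refresh`: trigger a background rescan of the vault. The optional
/// `full` query parameter is forwarded to `rescan_vault` (presumably selecting
/// a full rather than a quick rescan).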
#[post("/api/refresh")]
pub async fn api_refresh(
    state: web::Data<State>,
    web::Query(query): web::Query<RescanRequest>,
) -> Result<HttpResponse, Error> {
    block_background(move || {
        crate::filesystem::rescan_vault(
            state.upend.clone(),
            state.job_container.clone(),
            query.full.is_none(),
            false,
        )
    });
    Ok(HttpResponse::Ok().finish())
}
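
/// `GET /api/store`: summarize the file store: per-blob sizes and paths,
/// plus total blob count and size.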
#[get("/api/store")]
pub async fn store_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
    let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
    let files = web::block(move || connection.retrieve_all_files())
        .await
        .map_err(ErrorInternalServerError)?;
    let mut files_by_hash = HashMap::new();
    for file in &files {
        if !files_by_hash.contains_key(&file.hash) {
            files_by_hash.insert(&file.hash, vec![]);
        }
        files_by_hash.get_mut(&file.hash).unwrap().push(file);
    }
    for paths in files_by_hash.values_mut() {
        paths.sort_unstable_by_key(|f| !f.valid);
    }
    let mut blobs = files_by_hash
        .iter()
        .map(|(hash, files)| {
            json!({
                "hash": hash,
                "size": files[0].size,
                "paths": files.iter().map(|f| json!({
                    "added": f.added,
                    "valid": f.valid,
                    "path": f.path
                })).collect::<serde_json::Value>()
            })
        })
        .collect::<Vec<serde_json::Value>>();
    blobs.sort_unstable_by_key(|f| f["size"].as_u64().unwrap());
    blobs.reverse();
    Ok(HttpResponse::Ok().json(json!({
        "totals": {
            "count": files_by_hash.len(),
            "size": files_by_hash.iter().map(|(_, f)| f[0].size as u64).sum::<u64>()
        },
        "blobs": blobs
    })))
}
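
/// `GET /api/store/{hash}`: list the known file records for a given hash.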
#[get("/api/store/{hash}")]
pub async fn get_file(
    state: web::Data<State>,
    hash: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let address =
        Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
            .map_err(ErrorInternalServerError)?;
    if let Address::Hash(hash) = address {
        let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
        let response = web::block(move || connection.retrieve_file(&hash))
            .await
            .map_err(ErrorInternalServerError)?;
        Ok(HttpResponse::Ok().json(response))
    } else {
        Err(ErrorBadRequest("Address does not refer to a file."))
    }
}
#[derive(Deserialize)]
pub struct JobsRequest {
    full: Option<String>,
}
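
/// `GET /api/jobs`: list tracked jobs; only in-progress jobs are returned
/// unless `?full` is passed.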
#[get("/api/jobs")]
pub async fn get_jobs(
    state: web::Data<State>,
    web::Query(query): web::Query<JobsRequest>,
) -> Result<HttpResponse, Error> {
    let jobs = state.job_container.read().unwrap().get_jobs();
    Ok(HttpResponse::Ok().json(if query.full.is_some() {
        jobs
    } else {
        jobs.into_iter()
            .filter(|(_, j)| matches!(j.state, crate::util::jobs::State::InProgress))
            .collect()
    }))
}
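
/// `GET /api/info`: basic vault metadata (name, location, version, desktop flag).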
#[get("/api/info")]
pub async fn get_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
    Ok(HttpResponse::Ok().json(json!({
        "name": state.vault_name,
        "location": &*state.upend.vault_path,
        "version": crate::common::PKG_VERSION,
        "desktop": state.desktop_enabled
    })))
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;

    #[test]
    fn test_in_address() -> Result<()> {
        let address = Address::Url("https://upendproject.net".into());

        let in_address = InAddress::Address(address.to_string());
        assert_eq!(address, in_address.try_into()?);

        let in_address = InAddress::Components {
            t: "Url".into(),
            c: Some("https://upendproject.net".into()),
        };
        assert_eq!(address, in_address.try_into()?);

        Ok(())
    }
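
    // A small additional sketch covering the component-form conversions above:
    // the "Attribute" arm, and the "Uuid" arm with no content (which mints a
    // fresh UUID). This mirrors the `TryInto<Address>` impl in this file.
    #[test]
    fn test_in_address_components() -> Result<()> {
        let in_address = InAddress::Components {
            t: "Attribute".into(),
            c: Some("TEST".into()),
        };
        let converted: Address = in_address.try_into()?;
        assert_eq!(Address::Attribute("TEST".into()), converted);

        let in_address = InAddress::Components {
            t: "Uuid".into(),
            c: None,
        };
        let converted: Address = in_address.try_into()?;
        assert!(matches!(converted, Address::Uuid(_)));

        Ok(())
    }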
}