add web::block to all db actions in route handlers
This commit is contained in:
parent
0e1b1765ae
commit
d03998915c
3 changed files with 89 additions and 68 deletions
|
@ -279,11 +279,11 @@ impl UpEndConnection {
|
||||||
.to_path_buf())
|
.to_path_buf())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn retrieve_entry(&self, hash: Hash) -> Result<Option<Entry>> {
|
pub fn retrieve_entry(&self, hash: &Hash) -> Result<Option<Entry>> {
|
||||||
use crate::database::inner::schema::data::dsl::*;
|
use crate::database::inner::schema::data::dsl::*;
|
||||||
|
|
||||||
let entry = data
|
let entry = data
|
||||||
.filter(identity.eq(Address::Hash(hash).encode()?))
|
.filter(identity.eq(Address::Hash(hash.clone()).encode()?))
|
||||||
.load::<models::Entry>(&self.conn)?;
|
.load::<models::Entry>(&self.conn)?;
|
||||||
|
|
||||||
match entry.len() {
|
match entry.len() {
|
||||||
|
|
|
@ -185,7 +185,7 @@ fn main() -> Result<()> {
|
||||||
.service(routes::api_refresh)
|
.service(routes::api_refresh)
|
||||||
.service(routes::list_hier)
|
.service(routes::list_hier)
|
||||||
.service(routes::list_hier_roots)
|
.service(routes::list_hier_roots)
|
||||||
.service(routes::latest_files)
|
.service(routes::store_info)
|
||||||
.service(routes::get_file)
|
.service(routes::get_file)
|
||||||
.service(routes::get_jobs)
|
.service(routes::get_jobs)
|
||||||
.service(routes::get_info);
|
.service(routes::get_info);
|
||||||
|
|
151
src/routes.rs
151
src/routes.rs
|
@ -58,19 +58,23 @@ pub async fn get_raw(
|
||||||
Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
|
Address::decode(&b58_decode(hash.into_inner()).map_err(ErrorInternalServerError)?)
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
if let Address::Hash(hash) = address {
|
if let Address::Hash(hash) = address {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let hash = Arc::new(hash);
|
||||||
|
|
||||||
// First check if there's an entry with this hash
|
// First check if there's an entry with this hash
|
||||||
let entry = connection
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
.retrieve_entry(hash.clone())
|
let _hash = hash.clone();
|
||||||
|
let entry = web::block(move || connection.retrieve_entry(_hash.as_ref()))
|
||||||
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
if let Some(entry) = entry {
|
if let Some(entry) = entry {
|
||||||
return Ok(Either::B(HttpResponse::Ok().json(entry)));
|
return Ok(Either::B(HttpResponse::Ok().json(entry)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then, check the files
|
// Then, check the files
|
||||||
let files = connection
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
.retrieve_file(&hash)
|
let _hash = hash.clone();
|
||||||
|
let files = web::block(move || connection.retrieve_file(_hash.as_ref()))
|
||||||
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
if let Some(file) = files.get(0) {
|
if let Some(file) = files.get(0) {
|
||||||
let file_path = state.upend.vault_path.join(&file.path);
|
let file_path = state.upend.vault_path.join(&file.path);
|
||||||
|
@ -190,8 +194,8 @@ pub async fn get_query(state: web::Data<State>, query: String) -> Result<HttpRes
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let in_query: Query = query.parse().map_err(ErrorBadRequest)?;
|
let in_query: Query = query.parse().map_err(ErrorBadRequest)?;
|
||||||
let entries = connection
|
let entries = web::block(move || connection.query(in_query))
|
||||||
.query(in_query)
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
let mut result: HashMap<String, Entry> = HashMap::new();
|
let mut result: HashMap<String, Entry> = HashMap::new();
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
|
@ -232,8 +236,10 @@ pub async fn get_object(
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let address = address.into_inner();
|
let address = address.into_inner();
|
||||||
let result: Vec<Entry> = connection
|
|
||||||
.retrieve_object(&address)
|
let _address = address.clone();
|
||||||
|
let result: Vec<Entry> = web::block(move || connection.retrieve_object(&_address))
|
||||||
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
debug!("{:?}", result);
|
debug!("{:?}", result);
|
||||||
|
@ -307,23 +313,25 @@ pub async fn put_object(
|
||||||
debug!("PUTting {in_entry:?}");
|
debug!("PUTting {in_entry:?}");
|
||||||
|
|
||||||
match in_entry {
|
match in_entry {
|
||||||
InEntry::Entry(entry) => Ok((
|
InEntry::Entry(entry) => Ok(web::block::<_, _, anyhow::Error>(move || {
|
||||||
Some(
|
Ok((
|
||||||
connection
|
Some(connection.insert_entry(entry.clone())?),
|
||||||
.insert_entry(entry.clone())
|
Some(entry.entity),
|
||||||
.map_err(ErrorInternalServerError)?,
|
))
|
||||||
),
|
})
|
||||||
Some(entry.entity),
|
.await
|
||||||
)),
|
.map_err(ErrorInternalServerError)?),
|
||||||
InEntry::EntryList(entries) => {
|
InEntry::EntryList(entries) => {
|
||||||
connection
|
web::block(move || {
|
||||||
.transaction::<_, anyhow::Error, _>(|| {
|
connection.transaction::<_, anyhow::Error, _>(|| {
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
connection.insert_entry(entry)?;
|
connection.insert_entry(entry)?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.map_err(ErrorInternalServerError)?;
|
})
|
||||||
|
.await
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
Ok((None, None))
|
Ok((None, None))
|
||||||
}
|
}
|
||||||
InEntry::Invariant(in_entry) => {
|
InEntry::Invariant(in_entry) => {
|
||||||
|
@ -333,14 +341,14 @@ pub async fn put_object(
|
||||||
})
|
})
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok((
|
Ok(web::block::<_, _, anyhow::Error>(move || {
|
||||||
Some(
|
Ok((
|
||||||
connection
|
Some(connection.insert_entry(invariant.clone())?),
|
||||||
.insert_entry(invariant.clone())
|
Some(invariant.entity),
|
||||||
.map_err(ErrorInternalServerError)?,
|
))
|
||||||
),
|
})
|
||||||
Some(invariant.entity),
|
.await
|
||||||
))
|
.map_err(ErrorInternalServerError)?)
|
||||||
}
|
}
|
||||||
InEntry::Address { entity: in_address } => {
|
InEntry::Address { entity: in_address } => {
|
||||||
let address = in_address.try_into().map_err(ErrorBadRequest)?;
|
let address = in_address.try_into().map_err(ErrorBadRequest)?;
|
||||||
|
@ -369,8 +377,8 @@ pub async fn put_object(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
connection
|
Ok(web::block(move || {
|
||||||
.transaction::<_, anyhow::Error, _>(|| {
|
connection.transaction::<_, anyhow::Error, _>(|| {
|
||||||
if connection.retrieve_object(&address)?.is_empty() {
|
if connection.retrieve_object(&address)?.is_empty() {
|
||||||
connection.insert_entry(Entry {
|
connection.insert_entry(Entry {
|
||||||
entity: address.clone(),
|
entity: address.clone(),
|
||||||
|
@ -389,11 +397,11 @@ pub async fn put_object(
|
||||||
connection.insert_entry(entry)?;
|
connection.insert_entry(entry)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok((None, Some(address)))
|
||||||
})
|
})
|
||||||
.map_err(ErrorInternalServerError)?;
|
})
|
||||||
|
.await
|
||||||
Ok((None, Some(address)))
|
.map_err(ErrorInternalServerError)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -403,7 +411,7 @@ pub async fn put_object(
|
||||||
.content_disposition()
|
.content_disposition()
|
||||||
.ok_or_else(|| HttpResponse::BadRequest().finish())?;
|
.ok_or_else(|| HttpResponse::BadRequest().finish())?;
|
||||||
|
|
||||||
let filename = content_disposition.get_filename();
|
let filename = content_disposition.get_filename().map(String::from);
|
||||||
|
|
||||||
let mut file = NamedTempFile::new()?;
|
let mut file = NamedTempFile::new()?;
|
||||||
while let Some(chunk) = field.try_next().await? {
|
while let Some(chunk) = field.try_next().await? {
|
||||||
|
@ -415,13 +423,14 @@ pub async fn put_object(
|
||||||
|
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let existing_files = connection
|
let _hash = hash.clone();
|
||||||
.retrieve_file(&hash)
|
let existing_files = web::block(move || connection.retrieve_file(&_hash))
|
||||||
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
if existing_files.is_empty() {
|
if existing_files.is_empty() {
|
||||||
let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?);
|
let addr_str = b58_encode(address.encode().map_err(ErrorInternalServerError)?);
|
||||||
let final_name = if let Some(filename) = filename {
|
let final_name = if let Some(ref filename) = filename {
|
||||||
format!("{addr_str}_{filename}")
|
format!("{addr_str}_{filename}")
|
||||||
} else {
|
} else {
|
||||||
addr_str
|
addr_str
|
||||||
|
@ -437,11 +446,20 @@ pub async fn put_object(
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
add_file(&connection, &final_path, hash).map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
web::block(move || add_file(&connection, &final_path, hash))
|
||||||
|
.await
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(filename) = filename {
|
if let Some(ref filename) = filename {
|
||||||
let _ = upend_insert_val!(&connection, address, LABEL_ATTR, filename);
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
let _address = address.clone();
|
||||||
|
let _filename = filename.clone();
|
||||||
|
let _ = web::block(move || {
|
||||||
|
upend_insert_val!(&connection, _address, LABEL_ATTR, _filename)
|
||||||
|
})
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((None, Some(address)))
|
Ok((None, Some(address)))
|
||||||
|
@ -463,8 +481,8 @@ pub async fn put_object_attribute(
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let new_address = connection
|
let new_address = web::block(move || {
|
||||||
.transaction::<_, anyhow::Error, _>(|| {
|
connection.transaction::<_, anyhow::Error, _>(|| {
|
||||||
let existing_attr_entries =
|
let existing_attr_entries =
|
||||||
connection.query(format!("(matches \"{address}\" \"{attribute}\" ?)").parse()?)?;
|
connection.query(format!("(matches \"{address}\" \"{attribute}\" ?)").parse()?)?;
|
||||||
|
|
||||||
|
@ -480,7 +498,9 @@ pub async fn put_object_attribute(
|
||||||
|
|
||||||
connection.insert_entry(new_attr_entry)
|
connection.insert_entry(new_attr_entry)
|
||||||
})
|
})
|
||||||
.map_err(ErrorInternalServerError)?;
|
})
|
||||||
|
.await
|
||||||
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(new_address))
|
Ok(HttpResponse::Ok().json(new_address))
|
||||||
}
|
}
|
||||||
|
@ -488,14 +508,11 @@ pub async fn put_object_attribute(
|
||||||
#[delete("/api/obj/{address_str}")]
|
#[delete("/api/obj/{address_str}")]
|
||||||
pub async fn delete_object(
|
pub async fn delete_object(
|
||||||
state: web::Data<State>,
|
state: web::Data<State>,
|
||||||
address_str: web::Path<String>,
|
address: web::Path<Address>,
|
||||||
) -> Result<HttpResponse, Error> {
|
) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let _ = connection
|
let _ = web::block(move || connection.remove_object(address.into_inner()))
|
||||||
.remove_object(
|
.await
|
||||||
Address::decode(&b58_decode(address_str.into_inner()).map_err(ErrorBadRequest)?)
|
|
||||||
.map_err(ErrorInternalServerError)?,
|
|
||||||
)
|
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().finish())
|
Ok(HttpResponse::Ok().finish())
|
||||||
|
@ -529,8 +546,8 @@ pub async fn delete_object(
|
||||||
#[get("/api/all/attributes")]
|
#[get("/api/all/attributes")]
|
||||||
pub async fn get_all_attributes(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn get_all_attributes(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let result = connection
|
let result = web::block(move || connection.get_all_attributes())
|
||||||
.get_all_attributes()
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
Ok(HttpResponse::Ok().json(result))
|
Ok(HttpResponse::Ok().json(result))
|
||||||
}
|
}
|
||||||
|
@ -550,7 +567,9 @@ pub async fn list_hier(
|
||||||
trace!("Listing path \"{}\"", upath);
|
trace!("Listing path \"{}\"", upath);
|
||||||
|
|
||||||
// todo: 500 if actual error occurs
|
// todo: 500 if actual error occurs
|
||||||
let path = resolve_path(&connection, &upath, false).map_err(ErrorNotFound)?;
|
let path = web::block(move || resolve_path(&connection, &upath, false))
|
||||||
|
.await
|
||||||
|
.map_err(ErrorNotFound)?;
|
||||||
match path.last() {
|
match path.last() {
|
||||||
Some(addr) => Ok(HttpResponse::Found()
|
Some(addr) => Ok(HttpResponse::Found()
|
||||||
.header(http::header::LOCATION, format!("../../api/obj/{}", addr))
|
.header(http::header::LOCATION, format!("../../api/obj/{}", addr))
|
||||||
|
@ -564,13 +583,15 @@ pub async fn list_hier(
|
||||||
pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn list_hier_roots(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
let result = list_roots(&connection)
|
let result = web::block(move || {
|
||||||
.map_err(ErrorInternalServerError)?
|
list_roots(&connection)?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|root| connection.retrieve_object(&root))
|
.map(|root| connection.retrieve_object(&root))
|
||||||
.collect::<Result<Vec<Vec<Entry>>>>()
|
.collect::<Result<Vec<Vec<Entry>>>>()
|
||||||
.map_err(ErrorInternalServerError)?
|
})
|
||||||
.concat();
|
.await
|
||||||
|
.map_err(ErrorInternalServerError)?
|
||||||
|
.concat();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
|
Ok(HttpResponse::Ok().json(result.as_hash().map_err(ErrorInternalServerError)?))
|
||||||
}
|
}
|
||||||
|
@ -595,10 +616,10 @@ pub async fn api_refresh(
|
||||||
}
|
}
|
||||||
|
|
||||||
#[get("/api/store")]
|
#[get("/api/store")]
|
||||||
pub async fn latest_files(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
pub async fn store_info(state: web::Data<State>) -> Result<HttpResponse, Error> {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let files = connection
|
let files = web::block(move || connection.retrieve_all_files())
|
||||||
.retrieve_all_files()
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
let mut files_by_hash = HashMap::new();
|
let mut files_by_hash = HashMap::new();
|
||||||
for file in &files {
|
for file in &files {
|
||||||
|
@ -650,8 +671,8 @@ pub async fn get_file(
|
||||||
|
|
||||||
if let Address::Hash(hash) = address {
|
if let Address::Hash(hash) = address {
|
||||||
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
|
||||||
let response = connection
|
let response = web::block(move || connection.retrieve_file(&hash))
|
||||||
.retrieve_file(&hash)
|
.await
|
||||||
.map_err(ErrorInternalServerError)?;
|
.map_err(ErrorInternalServerError)?;
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(response))
|
Ok(HttpResponse::Ok().json(response))
|
||||||
|
|
Loading…
Reference in a new issue