diff --git a/base/src/entry.rs b/base/src/entry.rs
index 1c9c6ed..a246aff 100644
--- a/base/src/entry.rs
+++ b/base/src/entry.rs
@@ -49,6 +49,7 @@ pub struct Entry {
     pub attribute: Attribute,
     pub value: EntryValue,
     pub provenance: String,
+    pub user: Option<String>,
     pub timestamp: NaiveDateTime,
 }
 
@@ -81,6 +82,7 @@ impl TryFrom<&InvariantEntry> for Entry {
             attribute: invariant.attribute.clone(),
             value: invariant.value.clone(),
             provenance: "INVARIANT".to_string(),
+            user: None,
             timestamp: NaiveDateTime::from_timestamp_opt(0, 0).unwrap(),
         })
     }
diff --git a/cli/src/extractors/audio.rs b/cli/src/extractors/audio.rs
index df0eb57..a421414 100644
--- a/cli/src/extractors/audio.rs
+++ b/cli/src/extractors/audio.rs
@@ -13,7 +13,7 @@ use upend_db::stores::Blob;
 use upend_db::{
     jobs::{JobContainer, JobState},
     stores::{fs::FILE_MIME_KEY, UpStore},
-    BlobMode, UpEndConnection,
+    BlobMode, OperationContext, UpEndConnection,
 };
 
 lazy_static! {
@@ -26,6 +26,7 @@ lazy_static! {
         attribute: ATTR_LABEL.parse().unwrap(),
         value: "ID3".into(),
         provenance: "INVARIANT".to_string(),
+        user: None,
         timestamp: chrono::Utc::now().naive_utc(),
     };
 }
@@ -39,6 +40,7 @@ impl Extractor for ID3Extractor {
         connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let files = store.retrieve(hash)?;
@@ -72,14 +74,16 @@ impl Extractor for ID3Extractor {
                         "TYER" | "TBPM" => EntryValue::guess_from(text),
                         _ => text.clone().into(),
                     },
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
                 Entry {
                     entity: Address::Attribute(format!("ID3_{}", frame.id()).parse()?),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: format!("ID3: {}", frame.name()).into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
             ]);
@@ -97,12 +101,14 @@ impl Extractor for ID3Extractor {
                     Blob::from_filepath(&tmp_path),
                     None,
                     Some(BlobMode::StoreOnly),
+                    context.clone(),
                 )?;
                 result.push(Entry {
                     entity: address.clone(),
                     attribute: "ID3_PICTURE".parse()?,
                     value: EntryValue::Address(Address::Hash(hash)),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 });
                 has_pictures = true;
@@ -112,7 +118,8 @@ impl Extractor for ID3Extractor {
                     entity: Address::Attribute("ID3_PICTURE".parse()?),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: "ID3 Embedded Image".into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 })
             }
@@ -126,7 +133,8 @@ impl Extractor for ID3Extractor {
                         entity: Address::Attribute(e.attribute.clone()),
                         attribute: ATTR_OF.parse().unwrap(),
                         value: EntryValue::Address(ID3_TYPE_INVARIANT.entity().unwrap()),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     })
                     .collect::<Vec<Entry>>(),
@@ -138,7 +146,8 @@ impl Extractor for ID3Extractor {
                     entity: address.clone(),
                     attribute: ATTR_IN.parse().unwrap(),
                     value: EntryValue::Address(ID3_TYPE_INVARIANT.entity().unwrap()),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
             ]);
diff --git a/cli/src/extractors/exif.rs b/cli/src/extractors/exif.rs
index 8ddf074..9c73825 100644
--- a/cli/src/extractors/exif.rs
+++ b/cli/src/extractors/exif.rs
@@ -12,7 +12,7 @@ use upend_base::{
 use upend_db::{
     jobs::{JobContainer, JobState},
     stores::{fs::FILE_MIME_KEY, UpStore},
-    UpEndConnection,
+    OperationContext, UpEndConnection,
 };
 
 pub struct ExifExtractor;
@@ -31,6 +31,7 @@ lazy_static! {
         value: "EXIF".into(),
         provenance: "INVARIANT".to_string(),
         timestamp: chrono::Utc::now().naive_utc(),
+        user: None
     };
 }
 
@@ -41,6 +42,7 @@ impl Extractor for ExifExtractor {
         _connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
    ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let files = store.retrieve(hash)?;
@@ -86,14 +88,16 @@ impl Extractor for ExifExtractor {
                             EntryValue::guess_from(format!("{}", field.display_value()))
                         }
                     },
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
                 Entry {
                     entity: Address::Attribute(attribute),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: format!("EXIF: {}", tag_description).into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
             ]);
@@ -109,7 +113,8 @@ impl Extractor for ExifExtractor {
                         entity: Address::Attribute(e.attribute.clone()),
                         attribute: ATTR_OF.parse().unwrap(),
                         value: EntryValue::Address(EXIF_TYPE_INVARIANT.entity().unwrap()),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     })
                     .collect::<Vec<Entry>>(),
@@ -123,7 +128,8 @@ impl Extractor for ExifExtractor {
                     entity: address.clone(),
                     attribute: ATTR_IN.parse().unwrap(),
                     value: EntryValue::Address(EXIF_TYPE_INVARIANT.entity().unwrap()),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
             ]);
diff --git a/cli/src/extractors/media.rs b/cli/src/extractors/media.rs
index 8e7d4ad..0f15e5f 100644
--- a/cli/src/extractors/media.rs
+++ b/cli/src/extractors/media.rs
@@ -12,7 +12,7 @@ use upend_base::{
 use upend_db::{
     jobs::{JobContainer, JobState},
     stores::{fs::FILE_MIME_KEY, UpStore},
-    UpEndConnection,
+    OperationContext, UpEndConnection,
 };
 
 const DURATION_KEY: &str = "MEDIA_DURATION";
@@ -28,6 +28,7 @@ lazy_static! {
         value: "Multimedia".into(),
         provenance: "INVARIANT".to_string(),
         timestamp: chrono::Utc::now().naive_utc(),
+        user: None,
     };
     pub static ref DURATION_OF_MEDIA: Entry = Entry {
         entity: Address::Attribute(DURATION_KEY.parse().unwrap()),
@@ -35,6 +36,7 @@ lazy_static! {
         value: EntryValue::Address(MEDIA_TYPE_INVARIANT.entity().unwrap()),
         provenance: "INVARIANT".to_string(),
         timestamp: chrono::Utc::now().naive_utc(),
+        user: None,
     };
 }
 
@@ -47,6 +49,7 @@ impl Extractor for MediaExtractor {
         _connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let files = store.retrieve(hash)?;
@@ -95,7 +98,8 @@ impl Extractor for MediaExtractor {
                 entity: address.clone(),
                 attribute: DURATION_KEY.parse().unwrap(),
                 value: EntryValue::Number(duration),
-                provenance: "SYSTEM EXTRACTOR".to_string(),
+                provenance: context.provenance.clone() + "EXTRACTOR",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             },
             (&MEDIA_TYPE_INVARIANT as &InvariantEntry)
@@ -107,7 +111,8 @@ impl Extractor for MediaExtractor {
                 entity: address.clone(),
                 attribute: ATTR_IN.parse().unwrap(),
                 value: EntryValue::Address(MEDIA_TYPE_INVARIANT.entity().unwrap()),
-                provenance: "SYSTEM EXTRACTOR".to_string(),
+                provenance: context.provenance.clone() + "EXTRACTOR",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             },
         ];
diff --git a/cli/src/extractors/mod.rs b/cli/src/extractors/mod.rs
index 1b19bda..d5d28eb 100644
--- a/cli/src/extractors/mod.rs
+++ b/cli/src/extractors/mod.rs
@@ -6,7 +6,9 @@ use std::{
 };
 use tracing::{debug, info, trace};
 use upend_base::{addressing::Address, entry::Entry};
-use upend_db::{jobs::JobContainer, stores::UpStore, UpEndConnection, UpEndDatabase};
+use upend_db::{
+    jobs::JobContainer, stores::UpStore, OperationContext, UpEndConnection, UpEndDatabase,
+};
 
 #[cfg(feature = "extractors-web")]
 pub mod web;
@@ -27,6 +29,7 @@ pub trait Extractor {
         connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>>;
 
     fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
@@ -39,9 +42,10 @@ pub trait Extractor {
         connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<usize> {
         if self.is_needed(address, connection)? {
-            let entries = self.get(address, connection, store, job_container)?;
+            let entries = self.get(address, connection, store, job_container, context)?;
 
             trace!("For \"{address}\", got: {entries:?}");
             connection.transaction(|| {
@@ -62,6 +66,7 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
     db: D,
     store: Arc<Box<dyn UpStore + Send + Sync>>,
     mut job_container: JobContainer,
+    context: OperationContext,
 ) -> Result<usize> {
     info!("Extracting metadata for all addresses.");
@@ -77,7 +82,13 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
         .par_iter()
         .map(|address| {
             let connection = db.connection()?;
-            let entry_count = extract(address, &connection, store.clone(), job_container.clone());
+            let entry_count = extract(
+                address,
+                &connection,
+                store.clone(),
+                job_container.clone(),
+                context.clone(),
+            );
 
             let mut cnt = count.write().unwrap();
             *cnt += 1;
@@ -107,6 +118,7 @@ pub fn extract(
     connection: &UpEndConnection,
     store: Arc<Box<dyn UpStore + Send + Sync>>,
     job_container: JobContainer,
+    context: OperationContext,
 ) -> usize {
     let mut entry_count = 0;
     trace!("Extracting metadata for {address:?}");
@@ -118,6 +130,7 @@ pub fn extract(
             connection,
             store.clone(),
             job_container.clone(),
+            context.clone(),
         );
 
         match extract_result {
@@ -133,6 +146,7 @@ pub fn extract(
             connection,
             store.clone(),
             job_container.clone(),
+            context.clone(),
         );
 
         match extract_result {
@@ -148,6 +162,7 @@ pub fn extract(
             connection,
             store.clone(),
             job_container.clone(),
+            context.clone(),
         );
 
         match extract_result {
@@ -158,8 +173,13 @@ pub fn extract(
 
     #[cfg(feature = "extractors-media")]
     {
-        let extract_result =
-            media::MediaExtractor.insert_info(address, connection, store.clone(), job_container);
+        let extract_result = media::MediaExtractor.insert_info(
+            address,
+            connection,
+            store.clone(),
+            job_container,
+            context.clone(),
+        );
 
         match extract_result {
             Ok(count) => entry_count += count,
diff --git a/cli/src/extractors/web.rs b/cli/src/extractors/web.rs
index 8c72ade..a664e30 100644
--- a/cli/src/extractors/web.rs
+++ b/cli/src/extractors/web.rs
@@ -14,7 +14,7 @@ use upend_base::entry::EntryValue;
 use upend_db::jobs::JobContainer;
 use upend_db::jobs::JobState;
 use upend_db::stores::UpStore;
-use upend_db::UpEndConnection;
+use upend_db::{OperationContext, UpEndConnection};
 use webpage::HTML;
 
 pub struct WebExtractor;
@@ -26,6 +26,7 @@ impl Extractor for WebExtractor {
         _connection: &UpEndConnection,
         _store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Url(url) = address {
             let mut job_handle =
@@ -42,21 +43,24 @@ impl Extractor for WebExtractor {
                     entity: address.clone(),
                     attribute: "HTML_TITLE".parse().unwrap(),
                     value: html_title.clone().into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }),
                 html.title.map(|html_title| Entry {
                     entity: address.clone(),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: html_title.into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }),
                 html.description.map(|html_desc| Entry {
                     entity: address.clone(),
                     attribute: "HTML_DESCRIPTION".parse().unwrap(),
                     value: html_desc.into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }),
             ];
@@ -67,7 +71,8 @@ impl Extractor for WebExtractor {
                         entity: address.clone(),
                         attribute: ATTR_LABEL.parse()?,
                         value: value.clone().into(),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     }));
                 }
@@ -76,7 +81,8 @@ impl Extractor for WebExtractor {
                         entity: address.clone(),
                         attribute: attribute.parse()?,
                         value: value.into(),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     }));
                 }
@@ -85,7 +91,8 @@ impl Extractor for WebExtractor {
                         entity: address.clone(),
                         attribute: "OG_IMAGE".parse()?,
                         value: image.url.into(),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     }))
                 }
@@ -101,7 +108,8 @@ impl Extractor for WebExtractor {
                             entity: Address::Attribute(e.attribute.clone()),
                             attribute: ATTR_OF.parse().unwrap(),
                             value: EntryValue::Address(TYPE_URL_ADDRESS.clone()),
-                            provenance: "SYSTEM EXTRACTOR".to_string(),
+                            provenance: context.provenance.clone() + "EXTRACTOR",
+                            user: context.user.clone(),
                             timestamp: chrono::Utc::now().naive_utc(),
                         },
                         e,
@@ -149,7 +157,13 @@ mod test {
         let address = Address::Url(Url::parse("https://upend.dev").unwrap());
 
         assert!(WebExtractor.is_needed(&address, &connection)?);
 
-        WebExtractor.insert_info(&address, &connection, store, job_container)?;
+        WebExtractor.insert_info(
+            &address,
+            &connection,
+            store,
+            job_container,
+            OperationContext::default(),
+        )?;
 
         assert!(!WebExtractor.is_needed(&address, &connection)?);
diff --git a/cli/src/main.rs b/cli/src/main.rs
index 8683e99..2167751 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -26,7 +26,7 @@ use upend_base::hash::{sha256hash, UpMultihash};
 use upend_db::jobs::JobContainer;
 use upend_db::stores::fs::FsStore;
 use upend_db::stores::UpStore;
-use upend_db::{BlobMode, UpEndDatabase};
+use upend_db::{BlobMode, OperationContext, UpEndDatabase};
 
 use crate::util::exec::block_background;
@@ -467,8 +467,14 @@ async fn main() -> Result<()> {
                             initial: false,
                             tree_mode,
                         },
+                        OperationContext::default(),
+                    );
+                    let _ = extractors::extract_all(
+                        upend,
+                        state.store,
+                        job_container,
+                        OperationContext::default(),
                     );
-                    let _ = extractors::extract_all(upend, state.store, job_container);
                     Ok(())
                 });
             }
diff --git a/cli/src/routes.rs b/cli/src/routes.rs
index 23fffe1..2ebc252 100644
--- a/cli/src/routes.rs
+++ b/cli/src/routes.rs
@@ -41,6 +41,7 @@ use upend_db::jobs;
 use upend_db::stores::UpdateOptions;
 use upend_db::stores::{Blob, UpStore};
 use upend_db::BlobMode;
+use upend_db::OperationContext;
 use upend_db::UpEndDatabase;
 use upend_db::VaultOptions;
 use url::Url;
@@ -491,7 +492,7 @@ pub async fn put_object(
     payload: web::Json,
     web::Query(query): web::Query,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;
 
     let (entry_address, entity_address) = {
         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -500,6 +501,7 @@ pub async fn put_object(
             debug!("PUTting {in_entry:?}");
 
             let provenance = query.provenance.clone();
+            let _user = user.clone();
             let process_inentry = move |in_entry: InEntry| -> Result<Entry> {
                 if let Some(entity) = in_entry.entity {
                     Ok(Entry {
@@ -513,6 +515,7 @@ pub async fn put_object(
                             .trim()
                             .to_string(),
                         timestamp: chrono::Utc::now().naive_utc(),
+                        user: _user.clone(),
                     })
                 } else {
                     Ok(Entry::try_from(&InvariantEntry {
@@ -554,15 +557,25 @@ pub async fn put_object(
             let _address = address.clone();
             let _job_container = state.job_container.clone();
             let _store = state.store.clone();
+            let _user = user.clone();
             block_background::<_, _, anyhow::Error>(move || {
-                let entry_count =
-                    extractors::extract(&_address, &connection, _store, _job_container);
+                let entry_count = extractors::extract(
+                    &_address,
+                    &connection,
+                    _store,
+                    _job_container,
+                    OperationContext {
+                        user: _user,
+                        provenance: "API".to_string(),
+                    },
+                );
 
                 debug!("Added {entry_count} extracted entries for {_address:?}");
                 Ok(())
             });
 
             let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+            let _user = user.clone();
             web::block(move || {
                 connection.transaction::<_, anyhow::Error, _>(|| {
                     if connection.retrieve_object(&address)?.is_empty() {
@@ -581,6 +594,7 @@ pub async fn put_object(
                             })
                             .trim()
                             .to_string(),
+                            user: _user,
                             timestamp: chrono::Utc::now().naive_utc(),
                         })?;
                     }
@@ -603,7 +617,7 @@ pub async fn put_blob(
     state: web::Data,
     mut payload: Multipart,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;
 
     if let Some(mut field) = payload.try_next().await? {
         let mut file = NamedTempFile::new()?;
@@ -642,6 +656,7 @@ pub async fn put_blob(
         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
         let _store = state.store.clone();
         let _filename = filename.clone();
+        let _user = user.clone();
         let hash = web::block(move || {
             let options = connection.get_vault_options()?;
             _store
@@ -650,6 +665,10 @@ pub async fn put_blob(
                     Blob::from_filepath(file.path()),
                     _filename,
                     options.blob_mode,
+                    OperationContext {
+                        user: _user,
+                        provenance: "API".to_string(),
+                    },
                 )
                 .map_err(anyhow::Error::from)
         })
@@ -675,8 +694,18 @@ pub async fn put_blob(
         let _job_container = state.job_container.clone();
         let _store = state.store.clone();
         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+        let _user = user.clone();
         block_background::<_, _, anyhow::Error>(move || {
-            let entry_count = extractors::extract(&_address, &connection, _store, _job_container);
+            let entry_count = extractors::extract(
+                &_address,
+                &connection,
+                _store,
+                _job_container,
+                OperationContext {
+                    user: _user,
+                    provenance: "API".to_string(),
+                },
+            );
             debug!("Added {entry_count} extracted entries for {_address:?}");
             Ok(())
         });
@@ -694,7 +723,7 @@ pub async fn put_object_attribute(
     value: web::Json,
     web::Query(query): web::Query,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;
 
     let (address, attribute) = path.into_inner();
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -717,6 +746,7 @@ pub async fn put_object_attribute(
             })
             .trim()
             .to_string(),
+        user: user.clone(),
         timestamp: chrono::Utc::now().naive_utc(),
     };
 
@@ -869,7 +899,7 @@ pub async fn list_hier(
     path: web::Path,
     req: HttpRequest,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;
 
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
     if path.is_empty() {
@@ -881,9 +911,19 @@ pub async fn list_hier(
         trace!(r#"Listing path "{}""#, upath);
 
         let create = !req.method().is_safe();
-        let path = web::block(move || resolve_path(&connection, &upath, create))
-            .await?
-            .map_err(ErrorNotFound)?;
+        let path = web::block(move || {
+            resolve_path(
+                &connection,
+                &upath,
+                create,
+                OperationContext {
+                    user,
+                    provenance: "API".to_string(),
+                },
+            )
+        })
+        .await?
+        .map_err(ErrorNotFound)?;
         match path.last() {
             Some(addr) => Ok(HttpResponse::Found()
                 .append_header((http::header::LOCATION, format!("../../api/obj/{}", addr)))
@@ -926,7 +966,7 @@ pub async fn api_refresh(
     state: web::Data,
     web::Query(query): web::Query,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;
 
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
 
@@ -943,11 +983,19 @@ pub async fn api_refresh(
                     .unwrap_or_default(),
                 ),
             },
+            OperationContext {
+                user: user.clone(),
+                provenance: "API".to_string(),
+            },
         );
         let _ = crate::extractors::extract_all(
             state.upend.clone(),
             state.store.clone(),
             state.job_container.clone(),
+            OperationContext {
+                user: user.clone(),
+                provenance: "API".to_string(),
+            },
         );
         Ok(())
     });
@@ -1321,6 +1369,7 @@ mod tests {
                 initial: true,
                 tree_mode: upend_db::BlobMode::default(),
             },
+            OperationContext::default(),
         )
         .unwrap();
diff --git a/db/migrations/upend/02_users_entries/down.sql b/db/migrations/upend/02_users_entries/down.sql
new file mode 100644
index 0000000..a812629
--- /dev/null
+++ b/db/migrations/upend/02_users_entries/down.sql
@@ -0,0 +1,2 @@
+ALTER TABLE data
+    DROP COLUMN user;
diff --git a/db/migrations/upend/02_users_entries/up.sql b/db/migrations/upend/02_users_entries/up.sql
new file mode 100644
index 0000000..7c26e87
--- /dev/null
+++ b/db/migrations/upend/02_users_entries/up.sql
@@ -0,0 +1,2 @@
+ALTER TABLE data
+    ADD COLUMN user VARCHAR;
diff --git a/db/src/entry.rs b/db/src/entry.rs
index 89d7814..591cf24 100644
--- a/db/src/entry.rs
+++ b/db/src/entry.rs
@@ -14,6 +14,7 @@ impl TryFrom<&models::Entry> for Entry {
                 attribute: e.attribute.parse()?,
                 value: value_str.parse().unwrap(),
                 provenance: e.provenance.clone(),
+                user: e.user.clone(),
                 timestamp: e.timestamp,
             })
         } else if let Some(value_num) = e.value_num {
@@ -22,6 +23,7 @@ impl TryFrom<&models::Entry> for Entry {
                 attribute: e.attribute.parse()?,
                 value: EntryValue::Number(value_num),
                 provenance: e.provenance.clone(),
+                user: e.user.clone(),
                 timestamp: e.timestamp,
             })
         } else {
@@ -30,6 +32,7 @@ impl TryFrom<&models::Entry> for Entry {
                 attribute: e.attribute.parse()?,
                 value: EntryValue::Number(f64::NAN),
                 provenance: e.provenance.clone(),
+                user: e.user.clone(),
                 timestamp: e.timestamp,
             })
         }
@@ -53,6 +56,7 @@ impl TryFrom<&Entry> for models::Entry {
             value_num: None,
             immutable: false,
             provenance: e.provenance.clone(),
+            user: e.user.clone(),
             timestamp: e.timestamp,
         };
diff --git a/db/src/hierarchies.rs b/db/src/hierarchies.rs
index 01714f7..d34cc9e 100644
--- a/db/src/hierarchies.rs
+++ b/db/src/hierarchies.rs
@@ -6,6 +6,7 @@ use lru::LruCache;
 use tracing::trace;
 use uuid::Uuid;
 
+use crate::OperationContext;
 use upend_base::addressing::Address;
 use upend_base::constants::ATTR_LABEL;
 use upend_base::constants::{ATTR_IN, HIER_ROOT_ADDR, HIER_ROOT_INVARIANT};
@@ -91,6 +92,7 @@ pub fn fetch_or_create_dir(
     parent: Option<Address>,
     directory: UNode,
     create: bool,
+    context: OperationContext,
 ) -> Result<Address> {
     match parent.clone() {
         Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory),
@@ -137,7 +139,8 @@ pub fn fetch_or_create_dir(
             entity: new_directory_address.clone(),
             attribute: ATTR_LABEL.parse().unwrap(),
             value: directory.to_string().into(),
-            provenance: "SYSTEM FS".to_string(),
+            provenance: context.provenance.clone() + "HIER",
+            user: context.user.clone(),
             timestamp: chrono::Utc::now().naive_utc(),
         };
         connection.insert_entry(directory_entry)?;
@@ -147,7 +150,8 @@ pub fn fetch_or_create_dir(
                 entity: new_directory_address.clone(),
                 attribute: ATTR_IN.parse().unwrap(),
                 value: parent.into(),
-                provenance: "SYSTEM FS".to_string(),
+                provenance: context.provenance.clone() + "HIER",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             }
         } else {
@@ -155,7 +159,8 @@ pub fn fetch_or_create_dir(
                 entity: new_directory_address.clone(),
                 attribute: ATTR_IN.parse().unwrap(),
                 value: HIER_ROOT_ADDR.clone().into(),
-                provenance: "SYSTEM FS".to_string(),
+                provenance: context.provenance.clone() + "HIER",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             }
         })?;
@@ -177,6 +182,7 @@ pub fn resolve_path(
     connection: &UpEndConnection,
     path: &UHierPath,
     create: bool,
+    context: OperationContext,
 ) -> Result<Vec<Address>> {
     let mut result: Vec<Address> = vec![];
     let mut path_stack = path.0.to_vec();
@@ -188,6 +194,7 @@ pub fn resolve_path(
             result.last().cloned(),
             path_stack.pop().unwrap(),
             create,
+            context.clone(),
         )?;
         result.push(dir_address);
     }
@@ -201,6 +208,7 @@ pub fn resolve_path_cached(
     connection: &UpEndConnection,
     path: &UHierPath,
     create: bool,
+    context: OperationContext,
     cache: &Arc<Mutex<ResolveCache>>,
 ) -> Result<Vec<Address>> {
     let mut result: Vec<Address> = vec![];
@@ -216,7 +224,7 @@ pub fn resolve_path_cached(
             result.push(address.clone());
         } else {
             drop(cache_lock);
-            let address = fetch_or_create_dir(connection, parent, node, create)?;
+            let address = fetch_or_create_dir(connection, parent, node, create, context.clone())?;
             result.push(address.clone());
             cache.lock().unwrap().put(key, address);
         }
@@ -286,11 +294,23 @@ mod tests {
         let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
         let connection = open_result.db.connection().unwrap();
 
-        let foo_result = fetch_or_create_dir(&connection, None, UNode("foo".to_string()), true);
+        let foo_result = fetch_or_create_dir(
+            &connection,
+            None,
+            UNode("foo".to_string()),
+            true,
+            OperationContext::default(),
+        );
         assert!(foo_result.is_ok());
         let foo_result = foo_result.unwrap();
 
-        let bar_result = fetch_or_create_dir(&connection, None, UNode("bar".to_string()), true);
+        let bar_result = fetch_or_create_dir(
+            &connection,
+            None,
+            UNode("bar".to_string()),
+            true,
+            OperationContext::default(),
+        );
         assert!(bar_result.is_ok());
         let bar_result = bar_result.unwrap();
 
@@ -299,6 +319,7 @@ mod tests {
             Some(bar_result.clone()),
             UNode("baz".to_string()),
             true,
+            OperationContext::default(),
         );
         assert!(baz_result.is_ok());
         let baz_result = baz_result.unwrap();
@@ -306,7 +327,12 @@ mod tests {
         let roots = list_roots(&connection);
         assert_eq!(roots.unwrap(), [foo_result, bar_result.clone()]);
 
-        let resolve_result = resolve_path(&connection, &"bar/baz".parse().unwrap(), false);
+        let resolve_result = resolve_path(
+            &connection,
+            &"bar/baz".parse().unwrap(),
+            false,
+            OperationContext::default(),
+        );
         assert!(resolve_result.is_ok());
 
         assert_eq!(
@@ -314,10 +340,20 @@ mod tests {
             vec![bar_result.clone(), baz_result.clone()]
         );
 
-        let resolve_result = resolve_path(&connection, &"bar/baz/bax".parse().unwrap(), false);
+        let resolve_result = resolve_path(
+            &connection,
+            &"bar/baz/bax".parse().unwrap(),
+            false,
+            OperationContext::default(),
+        );
         assert!(resolve_result.is_err());
 
-        let resolve_result = resolve_path(&connection, &"bar/baz/bax".parse().unwrap(), true);
+        let resolve_result = resolve_path(
+            &connection,
+            &"bar/baz/bax".parse().unwrap(),
+            true,
+            OperationContext::default(),
+        );
         assert!(resolve_result.is_ok());
 
         let bax_result = fetch_or_create_dir(
@@ -325,6 +361,7 @@ mod tests {
             Some(baz_result.clone()),
             UNode("bax".to_string()),
             false,
+            OperationContext::default(),
         );
         assert!(bax_result.is_ok());
         let bax_result = bax_result.unwrap();
diff --git a/db/src/inner/models.rs b/db/src/inner/models.rs
index 4d82b52..2c495c7 100644
--- a/db/src/inner/models.rs
+++ b/db/src/inner/models.rs
@@ -13,6 +13,7 @@ pub struct Entry {
     pub value_num: Option<f64>,
     pub immutable: bool,
     pub provenance: String,
+    pub user: Option<String>,
     pub timestamp: NaiveDateTime,
 }
diff --git a/db/src/inner/schema.rs b/db/src/inner/schema.rs
index d8f53b0..b681da0 100644
--- a/db/src/inner/schema.rs
+++ b/db/src/inner/schema.rs
@@ -8,6 +8,7 @@ table! {
         value_num -> Nullable<Double>,
         immutable -> Bool,
         provenance -> Text,
+        user -> Nullable<Text>,
         timestamp -> Timestamp,
     }
 }
diff --git a/db/src/lib.rs b/db/src/lib.rs
index 9c3a2a5..8a1eb06 100644
--- a/db/src/lib.rs
+++ b/db/src/lib.rs
@@ -117,7 +117,7 @@ pub const DATABASE_FILENAME: &str = "upend.sqlite3";
 impl UpEndDatabase {
     pub fn open<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
-        embed_migrations!("./migrations/upend/");
+        embed_migrations!("./migrations/upend");
 
         let upend_path = dirpath.as_ref().join(UPEND_SUBDIR);
 
@@ -703,3 +703,18 @@ impl std::str::FromStr for BlobMode {
         }
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct OperationContext {
+    pub user: Option<String>,
+    pub provenance: String,
+}
+
+impl Default for OperationContext {
+    fn default() -> Self {
+        Self {
+            user: None,
+            provenance: "SYSTEM".to_string(),
+        }
+    }
+}
diff --git a/db/src/macros.rs b/db/src/macros.rs
index 44dab6e..2b60f87 100644
--- a/db/src/macros.rs
+++ b/db/src/macros.rs
@@ -6,6 +6,7 @@ macro_rules! upend_insert_val {
             attribute: $attribute.parse().unwrap(),
             value: upend_base::entry::EntryValue::String(String::from($value)),
             provenance: "SYSTEM INIT".to_string(),
+            user: None,
             timestamp: chrono::Utc::now().naive_utc(),
         })
     }};
@@ -19,6 +20,7 @@ macro_rules! upend_insert_addr {
             attribute: $attribute.parse().unwrap(),
             value: upend_base::entry::EntryValue::Address($addr.clone()),
             provenance: "SYSTEM INIT".to_string(),
+            user: None,
             timestamp: chrono::Utc::now().naive_utc(),
         })
     }};
diff --git a/db/src/stores/fs/mod.rs b/db/src/stores/fs/mod.rs
index 3f3e613..d3c0359 100644
--- a/db/src/stores/fs/mod.rs
+++ b/db/src/stores/fs/mod.rs
@@ -5,7 +5,8 @@ use crate::hierarchies::{resolve_path, resolve_path_cached, ResolveCache, UHierP
 use crate::jobs::{JobContainer, JobHandle};
 use crate::util::hash_at_path;
 use crate::{
-    BlobMode, ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR,
+    BlobMode, ConnectionOptions, LoggingHandler, OperationContext, UpEndConnection, UpEndDatabase,
+    UPEND_SUBDIR,
 };
 use anyhow::{anyhow, Result};
 use chrono::prelude::*;
@@ -95,6 +96,7 @@ impl FsStore {
         db: D,
         job_handle: JobHandle,
         options: UpdateOptions,
+        context: OperationContext,
     ) -> Result<Vec<UpMultihash>> {
         let start = Instant::now();
         info!("Vault rescan started.");
@@ -153,6 +155,7 @@ impl FsStore {
                     &existing_files,
                     &resolve_cache,
                     quick_check,
+                    context.clone(),
                 );
 
                 let mut cnt = count.write().unwrap();
@@ -249,6 +252,7 @@ impl FsStore {
         existing_files: &Arc<RwLock<Vec<db::File>>>,
         resolve_cache: &Arc<Mutex<ResolveCache>>,
         quick_check: bool,
+        context: OperationContext,
     ) -> Result {
         trace!("Processing: {:?}", path);
@@ -366,6 +370,7 @@ impl FsStore {
                     size,
                     mtime,
                     Some(resolve_cache),
+                    context,
                 )
                 .map(|_| {
                     info!("Added: {:?}", path);
@@ -422,6 +427,7 @@ impl FsStore {
         size: i64,
         mtime: Option<NaiveDateTime>,
         resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
+        context: OperationContext,
     ) -> Result {
         let normalized_path = self.normalize_path(path)?;
         let new_file = db::NewFile {
@@ -444,6 +450,7 @@ impl FsStore {
             value: (size as f64).into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         };
 
         let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
@@ -453,6 +460,7 @@ impl FsStore {
             value: mime_type.into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         });
 
         let added_entry = Entry {
@@ -465,6 +473,7 @@ impl FsStore {
                 .into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         };
 
         let components = normalized_path.components().collect::<Vec<_>>();
@@ -488,13 +497,16 @@ impl FsStore {
                 .into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         };
         let label_entry_addr = connection.insert_entry(label_entry)?;
 
         if let Some(upath) = upath {
             let resolved_path = match resolve_cache {
-                Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
-                None => resolve_path(connection, &upath, true)?,
+                Some(cache) => {
+                    resolve_path_cached(connection, &upath, true, context.clone(), cache)?
+                }
+                None => resolve_path(connection, &upath, true, context.clone())?,
             };
             let parent_dir = resolved_path.last().unwrap();
 
@@ -504,6 +516,7 @@ impl FsStore {
                 value: parent_dir.clone().into(),
                 provenance: "SYSTEM INIT".to_string(),
                 timestamp: chrono::Utc::now().naive_utc(),
+                user: context.user.clone(),
             };
             let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
 
@@ -513,6 +526,7 @@ impl FsStore {
                 value: label_entry_addr.into(),
                 provenance: "SYSTEM INIT".to_string(),
                 timestamp: chrono::Utc::now().naive_utc(),
+                user: context.user.clone(),
             };
             connection.insert_entry(alias_entry)?;
         }
@@ -651,6 +665,7 @@ impl UpStore for FsStore {
         blob: Blob,
         name_hint: Option<String>,
         blob_mode: Option<BlobMode>,
+        context: OperationContext,
     ) -> Result<UpMultihash, StoreError> {
         let file_path = blob.get_file_path();
         let hash = hash_at_path(file_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
@@ -704,6 +719,7 @@ impl UpStore for FsStore {
                     size,
                     mtime,
                     None,
+                    context,
                 )
                 .map_err(|e| StoreError::Unknown(e.to_string()))?;
         }
@@ -716,6 +732,7 @@ impl UpStore for FsStore {
         db: &UpEndDatabase,
         mut job_container: JobContainer,
         options: UpdateOptions,
+        context: OperationContext,
     ) -> Result<Vec<UpMultihash>, StoreError> {
         trace!(
             "Running a vault update of {:?}, options = {:?}.",
@@ -726,7 +743,7 @@ impl UpStore for FsStore {
 
         match job_result {
             Ok(job_handle) => {
-                let result = self.rescan_vault(db, job_handle, options);
+                let result = self.rescan_vault(db, job_handle, options, context);
 
                 if let Err(err) = &result {
                     error!("Update did not succeed! {:?}", err);
@@ -836,6 +853,7 @@ mod test {
                 initial: true,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
     }
@@ -882,6 +900,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
 
@@ -902,6 +921,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
 
@@ -925,6 +945,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
 
@@ -977,6 +998,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
 
@@ -1079,6 +1101,7 @@ mod test {
                 initial: true,
                 tree_mode,
             },
+            OperationContext::default(),
         )
         .unwrap();
 
@@ -1089,7 +1112,7 @@ mod test {
         paths.iter().for_each(|path| {
             let upath: UHierPath = path.parse().unwrap();
             assert!(
-                resolve_path(&connection, &upath, false).is_ok(),
+                resolve_path(&connection, &upath, false, OperationContext::default()).is_ok(),
                 "Failed: {}",
                 upath
             );
diff --git a/db/src/stores/mod.rs b/db/src/stores/mod.rs
index 39d2893..5b01c60 100644
--- a/db/src/stores/mod.rs
+++ b/db/src/stores/mod.rs
@@ -1,6 +1,7 @@
 use std::path::{Path, PathBuf};
 
 use super::{UpEndConnection, UpEndDatabase};
+use crate::OperationContext;
 use crate::{jobs::JobContainer, BlobMode};
 use upend_base::hash::UpMultihash;
 
@@ -61,12 +62,14 @@ pub trait UpStore {
         blob: Blob,
         name_hint: Option<String>,
         blob_mode: Option<BlobMode>,
+        context: OperationContext,
     ) -> Result<UpMultihash, StoreError>;
     fn update(
         &self,
         database: &UpEndDatabase,
         job_container: JobContainer,
         options: UpdateOptions,
+        context: OperationContext,
     ) -> Result<Vec<UpMultihash>, StoreError>;
     fn stats(&self) -> Result<StoreStats, StoreError>;
 }
diff --git a/sdks/js/package.json b/sdks/js/package.json
index 85e19ac..3831b79 100644
--- a/sdks/js/package.json
+++ b/sdks/js/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@upnd/upend",
-  "version": "0.4.1",
+  "version": "0.5.0",
   "description": "Client library to interact with the UpEnd system.",
   "main": "dist/index.js",
   "types": "dist/index.d.ts",
diff --git a/sdks/js/src/index.ts b/sdks/js/src/index.ts
index e57144b..5155b3b 100644
--- a/sdks/js/src/index.ts
+++ b/sdks/js/src/index.ts
@@ -121,6 +121,7 @@ export class UpEntry extends UpObject implements IEntry {
   attribute: string;
   value: IValue;
   provenance: string;
+  user: string;
   timestamp: string;
 
   constructor(address: string, entry: IEntry, listing: UpListing) {
@@ -130,6 +131,7 @@ export class UpEntry extends UpObject implements IEntry {
     this.attribute = entry.attribute;
     this.value = entry.value;
     this.provenance = entry.provenance;
+    this.user = entry.user;
     this.timestamp = entry.timestamp;
   }
 
diff --git a/sdks/js/src/types.ts b/sdks/js/src/types.ts
index 78ed224..bf6e2cb 100644
--- a/sdks/js/src/types.ts
+++ b/sdks/js/src/types.ts
@@ -18,6 +18,8 @@ export interface IEntry {
   value: IValue;
   /** The origin or provenance of the data entry (e.g. SYSTEM or USER API...) */
   provenance: string;
+  /** The user who created the data entry. */
+  user: string;
   /** The timestamp when the data entry was created in RFC 3339 format. */
   timestamp: string;
 }
diff --git a/webui/src/lib/components/Inspect.svelte b/webui/src/lib/components/Inspect.svelte
index 082dda6..bd71b35 100644
--- a/webui/src/lib/components/Inspect.svelte
+++ b/webui/src/lib/components/Inspect.svelte
@@ -513,13 +513,13 @@
         {$i18n.t('Attributes')}
         {$i18n.t('Backlinks')}
diff --git a/webui/src/lib/components/widgets/EntryList.svelte b/webui/src/lib/components/widgets/EntryList.svelte
index 785b32d..fc44556 100644
--- a/webui/src/lib/components/widgets/EntryList.svelte
+++ b/webui/src/lib/components/widgets/EntryList.svelte
@@ -34,6 +34,7 @@
   const TIMESTAMP_COL = 'timestamp';
   const PROVENANCE_COL = 'provenance';
+  const USER_COL = 'user';
   const ENTITY_COL = 'entity';
   const ATTR_COL = 'attribute';
   const VALUE_COL = 'value';
@@ -188,6 +189,7 @@ const COLUMN_LABELS: { [key: string]: string } = {
     timestamp: $i18n.t('Added at'),
     provenance: $i18n.t('Provenance'),
+    user: $i18n.t('User'),
     entity: $i18n.t('Entity'),
     attribute: $i18n.t('Attribute'),
     value: $i18n.t('Value')
   };
@@ -243,6 +245,16 @@
         {:else if column == PROVENANCE_COL}
           <td>
             {entry.provenance}
           </td>
+        {:else if column == USER_COL}
+          <td>
+            {#if entry.user}
+              {entry.user}
+            {:else}
+              <span>
+                {$i18n.t('unset')}
+              </span>
+            {/if}
+          </td>
         {:else if column == ENTITY_COL}