feat: add `user` to every Entry

(very ugly, lots of clones)
feat/plugins-backend
Tomáš Mládek 2024-03-31 00:31:32 +01:00
parent 05ee557d1a
commit 196447da0f
23 changed files with 277 additions and 60 deletions

View File

@@ -49,6 +49,7 @@ pub struct Entry {
     pub attribute: Attribute,
     pub value: EntryValue,
     pub provenance: String,
+    pub user: Option<String>,
     pub timestamp: NaiveDateTime,
 }
@@ -81,6 +82,7 @@ impl TryFrom<&InvariantEntry> for Entry {
             attribute: invariant.attribute.clone(),
             value: invariant.value.clone(),
             provenance: "INVARIANT".to_string(),
+            user: None,
             timestamp: NaiveDateTime::from_timestamp_opt(0, 0).unwrap(),
         })
     }

View File

@@ -13,7 +13,7 @@ use upend_db::stores::Blob;
 use upend_db::{
     jobs::{JobContainer, JobState},
     stores::{fs::FILE_MIME_KEY, UpStore},
-    BlobMode, UpEndConnection,
+    BlobMode, OperationContext, UpEndConnection,
 };

 lazy_static! {
@@ -26,6 +26,7 @@ lazy_static! {
         attribute: ATTR_LABEL.parse().unwrap(),
         value: "ID3".into(),
         provenance: "INVARIANT".to_string(),
+        user: None,
         timestamp: chrono::Utc::now().naive_utc(),
     };
 }
@@ -39,6 +40,7 @@ impl Extractor for ID3Extractor {
         connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let files = store.retrieve(hash)?;
@@ -72,14 +74,16 @@ impl Extractor for ID3Extractor {
                         "TYER" | "TBPM" => EntryValue::guess_from(text),
                         _ => text.clone().into(),
                     },
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
                 Entry {
                     entity: Address::Attribute(format!("ID3_{}", frame.id()).parse()?),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: format!("ID3: {}", frame.name()).into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
             ]);
@@ -97,12 +101,14 @@ impl Extractor for ID3Extractor {
                     Blob::from_filepath(&tmp_path),
                     None,
                     Some(BlobMode::StoreOnly),
+                    context.clone(),
                 )?;
                 result.push(Entry {
                     entity: address.clone(),
                     attribute: "ID3_PICTURE".parse()?,
                     value: EntryValue::Address(Address::Hash(hash)),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 });
                 has_pictures = true;
@@ -112,7 +118,8 @@ impl Extractor for ID3Extractor {
                     entity: Address::Attribute("ID3_PICTURE".parse()?),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: "ID3 Embedded Image".into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 })
             }
@@ -126,7 +133,8 @@ impl Extractor for ID3Extractor {
                         entity: Address::Attribute(e.attribute.clone()),
                         attribute: ATTR_OF.parse().unwrap(),
                         value: EntryValue::Address(ID3_TYPE_INVARIANT.entity().unwrap()),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     })
                     .collect::<Vec<Entry>>(),
@@ -138,7 +146,8 @@ impl Extractor for ID3Extractor {
                 entity: address.clone(),
                 attribute: ATTR_IN.parse().unwrap(),
                 value: EntryValue::Address(ID3_TYPE_INVARIANT.entity().unwrap()),
-                provenance: "SYSTEM EXTRACTOR".to_string(),
+                provenance: context.provenance.clone() + "EXTRACTOR",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             },
         ]);

View File

@@ -12,7 +12,7 @@ use upend_base::{
 use upend_db::{
     jobs::{JobContainer, JobState},
     stores::{fs::FILE_MIME_KEY, UpStore},
-    UpEndConnection,
+    OperationContext, UpEndConnection,
 };

 pub struct ExifExtractor;
@@ -31,6 +31,7 @@ lazy_static! {
         value: "EXIF".into(),
         provenance: "INVARIANT".to_string(),
         timestamp: chrono::Utc::now().naive_utc(),
+        user: None
     };
 }
@@ -41,6 +42,7 @@ impl Extractor for ExifExtractor {
         _connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let files = store.retrieve(hash)?;
@@ -86,14 +88,16 @@ impl Extractor for ExifExtractor {
                             EntryValue::guess_from(format!("{}", field.display_value()))
                         }
                     },
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
                 Entry {
                     entity: Address::Attribute(attribute),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: format!("EXIF: {}", tag_description).into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 },
             ]);
@@ -109,7 +113,8 @@ impl Extractor for ExifExtractor {
                         entity: Address::Attribute(e.attribute.clone()),
                         attribute: ATTR_OF.parse().unwrap(),
                         value: EntryValue::Address(EXIF_TYPE_INVARIANT.entity().unwrap()),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     })
                     .collect::<Vec<Entry>>(),
@@ -123,7 +128,8 @@ impl Extractor for ExifExtractor {
                 entity: address.clone(),
                 attribute: ATTR_IN.parse().unwrap(),
                 value: EntryValue::Address(EXIF_TYPE_INVARIANT.entity().unwrap()),
-                provenance: "SYSTEM EXTRACTOR".to_string(),
+                provenance: context.provenance.clone() + "EXTRACTOR",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             },
         ]);

View File

@@ -12,7 +12,7 @@ use upend_base::{
 use upend_db::{
     jobs::{JobContainer, JobState},
     stores::{fs::FILE_MIME_KEY, UpStore},
-    UpEndConnection,
+    OperationContext, UpEndConnection,
 };

 const DURATION_KEY: &str = "MEDIA_DURATION";
@@ -28,6 +28,7 @@ lazy_static! {
         value: "Multimedia".into(),
         provenance: "INVARIANT".to_string(),
         timestamp: chrono::Utc::now().naive_utc(),
+        user: None,
     };
     pub static ref DURATION_OF_MEDIA: Entry = Entry {
         entity: Address::Attribute(DURATION_KEY.parse().unwrap()),
@@ -35,6 +36,7 @@ lazy_static! {
         value: EntryValue::Address(MEDIA_TYPE_INVARIANT.entity().unwrap()),
         provenance: "INVARIANT".to_string(),
         timestamp: chrono::Utc::now().naive_utc(),
+        user: None,
     };
 }
@@ -47,6 +49,7 @@ impl Extractor for MediaExtractor {
         _connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Hash(hash) = address {
             let files = store.retrieve(hash)?;
@@ -95,7 +98,8 @@ impl Extractor for MediaExtractor {
                 entity: address.clone(),
                 attribute: DURATION_KEY.parse().unwrap(),
                 value: EntryValue::Number(duration),
-                provenance: "SYSTEM EXTRACTOR".to_string(),
+                provenance: context.provenance.clone() + "EXTRACTOR",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             },
             (&MEDIA_TYPE_INVARIANT as &InvariantEntry)
@@ -107,7 +111,8 @@ impl Extractor for MediaExtractor {
                 entity: address.clone(),
                 attribute: ATTR_IN.parse().unwrap(),
                 value: EntryValue::Address(MEDIA_TYPE_INVARIANT.entity().unwrap()),
-                provenance: "SYSTEM EXTRACTOR".to_string(),
+                provenance: context.provenance.clone() + "EXTRACTOR",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             },
         ];

View File

@@ -6,7 +6,9 @@ use std::{
 };
 use tracing::{debug, info, trace};
 use upend_base::{addressing::Address, entry::Entry};
-use upend_db::{jobs::JobContainer, stores::UpStore, UpEndConnection, UpEndDatabase};
+use upend_db::{
+    jobs::JobContainer, stores::UpStore, OperationContext, UpEndConnection, UpEndDatabase,
+};

 #[cfg(feature = "extractors-web")]
 pub mod web;
@@ -27,6 +29,7 @@ pub trait Extractor {
         connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>>;

     fn is_needed(&self, _address: &Address, _connection: &UpEndConnection) -> Result<bool> {
@@ -39,9 +42,10 @@ pub trait Extractor {
         connection: &UpEndConnection,
         store: Arc<Box<dyn UpStore + Send + Sync>>,
         job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<usize> {
         if self.is_needed(address, connection)? {
-            let entries = self.get(address, connection, store, job_container)?;
+            let entries = self.get(address, connection, store, job_container, context)?;
             trace!("For \"{address}\", got: {entries:?}");

             connection.transaction(|| {
@@ -62,6 +66,7 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
     db: D,
     store: Arc<Box<dyn UpStore + Send + Sync>>,
     mut job_container: JobContainer,
+    context: OperationContext,
 ) -> Result<usize> {
     info!("Extracting metadata for all addresses.");
@@ -77,7 +82,13 @@ pub fn extract_all<D: Borrow<UpEndDatabase>>(
         .par_iter()
         .map(|address| {
             let connection = db.connection()?;
-            let entry_count = extract(address, &connection, store.clone(), job_container.clone());
+            let entry_count = extract(
+                address,
+                &connection,
+                store.clone(),
+                job_container.clone(),
+                context.clone(),
+            );

             let mut cnt = count.write().unwrap();
             *cnt += 1;
@@ -107,6 +118,7 @@ pub fn extract(
     connection: &UpEndConnection,
     store: Arc<Box<dyn UpStore + Send + Sync>>,
     job_container: JobContainer,
+    context: OperationContext,
 ) -> usize {
     let mut entry_count = 0;
     trace!("Extracting metadata for {address:?}");
@@ -118,6 +130,7 @@ pub fn extract(
             connection,
             store.clone(),
             job_container.clone(),
+            context.clone(),
         );

         match extract_result {
@@ -133,6 +146,7 @@ pub fn extract(
             connection,
             store.clone(),
             job_container.clone(),
+            context.clone(),
         );

         match extract_result {
@@ -148,6 +162,7 @@ pub fn extract(
             connection,
             store.clone(),
             job_container.clone(),
+            context.clone(),
         );

         match extract_result {
@@ -158,8 +173,13 @@ pub fn extract(
     #[cfg(feature = "extractors-media")]
     {
-        let extract_result =
-            media::MediaExtractor.insert_info(address, connection, store.clone(), job_container);
+        let extract_result = media::MediaExtractor.insert_info(
+            address,
+            connection,
+            store.clone(),
+            job_container,
+            context.clone(),
+        );

         match extract_result {
             Ok(count) => entry_count += count,
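
For orientation, here is a minimal sketch of what an implementor of the revised `Extractor` trait looks like after this change. It is not part of the diff: `DummyExtractor` and the `DUMMY` attribute are hypothetical, and the leading `&self, address: &Address` parameters of `get` are inferred from the `self.get(address, ...)` call site above, since the hunk cuts them off.

use std::sync::Arc;

use anyhow::Result;
use upend_base::{addressing::Address, entry::Entry};
use upend_db::{jobs::JobContainer, stores::UpStore, OperationContext, UpEndConnection};

struct DummyExtractor;

impl Extractor for DummyExtractor {
    fn get(
        &self,
        address: &Address,
        _connection: &UpEndConnection,
        _store: Arc<Box<dyn UpStore + Send + Sync>>,
        _job_container: JobContainer,
        context: OperationContext,
    ) -> Result<Vec<Entry>> {
        // Every produced Entry is now stamped with the caller's context
        // instead of the former hardcoded "SYSTEM EXTRACTOR" provenance.
        Ok(vec![Entry {
            entity: address.clone(),
            attribute: "DUMMY".parse()?,
            value: "hello".into(),
            provenance: context.provenance.clone() + "EXTRACTOR",
            user: context.user.clone(),
            timestamp: chrono::Utc::now().naive_utc(),
        }])
    }
}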

View File

@@ -14,7 +14,7 @@ use upend_base::entry::EntryValue;
 use upend_db::jobs::JobContainer;
 use upend_db::jobs::JobState;
 use upend_db::stores::UpStore;
-use upend_db::UpEndConnection;
+use upend_db::{OperationContext, UpEndConnection};
 use webpage::HTML;

 pub struct WebExtractor;
@@ -26,6 +26,7 @@ impl Extractor for WebExtractor {
         _connection: &UpEndConnection,
         _store: Arc<Box<dyn UpStore + Send + Sync>>,
         mut job_container: JobContainer,
+        context: OperationContext,
     ) -> Result<Vec<Entry>> {
         if let Address::Url(url) = address {
             let mut job_handle =
@@ -42,21 +43,24 @@ impl Extractor for WebExtractor {
                     entity: address.clone(),
                     attribute: "HTML_TITLE".parse().unwrap(),
                     value: html_title.clone().into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }),
                 html.title.map(|html_title| Entry {
                     entity: address.clone(),
                     attribute: ATTR_LABEL.parse().unwrap(),
                     value: html_title.into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }),
                 html.description.map(|html_desc| Entry {
                     entity: address.clone(),
                     attribute: "HTML_DESCRIPTION".parse().unwrap(),
                     value: html_desc.into(),
-                    provenance: "SYSTEM EXTRACTOR".to_string(),
+                    provenance: context.provenance.clone() + "EXTRACTOR",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }),
             ];
@@ -67,7 +71,8 @@ impl Extractor for WebExtractor {
                         entity: address.clone(),
                         attribute: ATTR_LABEL.parse()?,
                         value: value.clone().into(),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     }));
                 }
@@ -76,7 +81,8 @@ impl Extractor for WebExtractor {
                         entity: address.clone(),
                         attribute: attribute.parse()?,
                         value: value.into(),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     }));
                 }
@@ -85,7 +91,8 @@ impl Extractor for WebExtractor {
                         entity: address.clone(),
                         attribute: "OG_IMAGE".parse()?,
                         value: image.url.into(),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     }))
                 }
@@ -101,7 +108,8 @@ impl Extractor for WebExtractor {
                         entity: Address::Attribute(e.attribute.clone()),
                         attribute: ATTR_OF.parse().unwrap(),
                         value: EntryValue::Address(TYPE_URL_ADDRESS.clone()),
-                        provenance: "SYSTEM EXTRACTOR".to_string(),
+                        provenance: context.provenance.clone() + "EXTRACTOR",
+                        user: context.user.clone(),
                         timestamp: chrono::Utc::now().naive_utc(),
                     },
                     e,
@@ -149,7 +157,13 @@ mod test {
         let address = Address::Url(Url::parse("https://upend.dev").unwrap());

         assert!(WebExtractor.is_needed(&address, &connection)?);

-        WebExtractor.insert_info(&address, &connection, store, job_container)?;
+        WebExtractor.insert_info(
+            &address,
+            &connection,
+            store,
+            job_container,
+            OperationContext::default(),
+        )?;

         assert!(!WebExtractor.is_needed(&address, &connection)?);

View File

@@ -26,7 +26,7 @@ use upend_base::hash::{sha256hash, UpMultihash};
 use upend_db::jobs::JobContainer;
 use upend_db::stores::fs::FsStore;
 use upend_db::stores::UpStore;
-use upend_db::{BlobMode, UpEndDatabase};
+use upend_db::{BlobMode, OperationContext, UpEndDatabase};

 use crate::util::exec::block_background;
@@ -467,8 +467,14 @@ async fn main() -> Result<()> {
                     initial: false,
                     tree_mode,
                 },
+                OperationContext::default(),
             );
-            let _ = extractors::extract_all(upend, state.store, job_container);
+            let _ = extractors::extract_all(
+                upend,
+                state.store,
+                job_container,
+                OperationContext::default(),
+            );
             Ok(())
         });
     }

View File

@@ -41,6 +41,7 @@ use upend_db::jobs;
 use upend_db::stores::UpdateOptions;
 use upend_db::stores::{Blob, UpStore};
 use upend_db::BlobMode;
+use upend_db::OperationContext;
 use upend_db::UpEndDatabase;
 use upend_db::VaultOptions;
 use url::Url;
@@ -491,7 +492,7 @@ pub async fn put_object(
     payload: web::Json<PutInput>,
     web::Query(query): web::Query<UpdateQuery>,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;

     let (entry_address, entity_address) = {
         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -500,6 +501,7 @@ pub async fn put_object(
         debug!("PUTting {in_entry:?}");

         let provenance = query.provenance.clone();
+        let _user = user.clone();
         let process_inentry = move |in_entry: InEntry| -> Result<Entry> {
             if let Some(entity) = in_entry.entity {
                 Ok(Entry {
@@ -513,6 +515,7 @@ pub async fn put_object(
                         .trim()
                         .to_string(),
                     timestamp: chrono::Utc::now().naive_utc(),
+                    user: _user.clone(),
                 })
             } else {
                 Ok(Entry::try_from(&InvariantEntry {
@@ -554,15 +557,25 @@ pub async fn put_object(
         let _address = address.clone();
         let _job_container = state.job_container.clone();
         let _store = state.store.clone();
+        let _user = user.clone();
         block_background::<_, _, anyhow::Error>(move || {
-            let entry_count =
-                extractors::extract(&_address, &connection, _store, _job_container);
+            let entry_count = extractors::extract(
+                &_address,
+                &connection,
+                _store,
+                _job_container,
+                OperationContext {
+                    user: _user,
+                    provenance: "API".to_string(),
+                },
+            );

             debug!("Added {entry_count} extracted entries for {_address:?}");
             Ok(())
         });

         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+        let _user = user.clone();
         web::block(move || {
             connection.transaction::<_, anyhow::Error, _>(|| {
                 if connection.retrieve_object(&address)?.is_empty() {
@@ -581,6 +594,7 @@ pub async fn put_object(
                         })
                         .trim()
                         .to_string(),
+                    user: _user,
                     timestamp: chrono::Utc::now().naive_utc(),
                 })?;
             }
@@ -603,7 +617,7 @@ pub async fn put_blob(
     state: web::Data<State>,
     mut payload: Multipart,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;

     if let Some(mut field) = payload.try_next().await? {
         let mut file = NamedTempFile::new()?;
@@ -642,6 +656,7 @@ pub async fn put_blob(
         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
         let _store = state.store.clone();
         let _filename = filename.clone();
+        let _user = user.clone();
         let hash = web::block(move || {
             let options = connection.get_vault_options()?;
             _store
@@ -650,6 +665,10 @@ pub async fn put_blob(
                     Blob::from_filepath(file.path()),
                     _filename,
                     options.blob_mode,
+                    OperationContext {
+                        user: _user,
+                        provenance: "API".to_string(),
+                    },
                 )
                 .map_err(anyhow::Error::from)
         })
@@ -675,8 +694,18 @@ pub async fn put_blob(
         let _job_container = state.job_container.clone();
         let _store = state.store.clone();
         let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
+        let _user = user.clone();
        block_background::<_, _, anyhow::Error>(move || {
-            let entry_count = extractors::extract(&_address, &connection, _store, _job_container);
+            let entry_count = extractors::extract(
+                &_address,
+                &connection,
+                _store,
+                _job_container,
+                OperationContext {
+                    user: _user,
+                    provenance: "API".to_string(),
+                },
+            );

             debug!("Added {entry_count} extracted entries for {_address:?}");
             Ok(())
         });
@@ -694,7 +723,7 @@ pub async fn put_object_attribute(
     value: web::Json<EntryValue>,
     web::Query(query): web::Query<UpdateQuery>,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;

     let (address, attribute) = path.into_inner();
     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -717,6 +746,7 @@ pub async fn put_object_attribute(
                 })
                 .trim()
                 .to_string(),
+            user: user.clone(),
             timestamp: chrono::Utc::now().naive_utc(),
         };
@@ -869,7 +899,7 @@ pub async fn list_hier(
     path: web::Path<String>,
     req: HttpRequest,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;

     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
     if path.is_empty() {
@@ -881,9 +911,19 @@ pub async fn list_hier(
         trace!(r#"Listing path "{}""#, upath);

         let create = !req.method().is_safe();
-        let path = web::block(move || resolve_path(&connection, &upath, create))
-            .await?
-            .map_err(ErrorNotFound)?;
+        let path = web::block(move || {
+            resolve_path(
+                &connection,
+                &upath,
+                create,
+                OperationContext {
+                    user,
+                    provenance: "API".to_string(),
+                },
+            )
+        })
+        .await?
+        .map_err(ErrorNotFound)?;
         match path.last() {
             Some(addr) => Ok(HttpResponse::Found()
                 .append_header((http::header::LOCATION, format!("../../api/obj/{}", addr)))
@@ -926,7 +966,7 @@ pub async fn api_refresh(
     state: web::Data<State>,
     web::Query(query): web::Query<RescanRequest>,
 ) -> Result<HttpResponse, Error> {
-    check_auth(&req, &state)?;
+    let user = check_auth(&req, &state)?;

     let connection = state.upend.connection().map_err(ErrorInternalServerError)?;
@@ -943,11 +983,19 @@ pub async fn api_refresh(
                     .unwrap_or_default(),
             ),
         },
+        OperationContext {
+            user: user.clone(),
+            provenance: "API".to_string(),
+        },
     );
     let _ = crate::extractors::extract_all(
         state.upend.clone(),
         state.store.clone(),
         state.job_container.clone(),
+        OperationContext {
+            user: user.clone(),
+            provenance: "API".to_string(),
+        },
     );
     Ok(())
 });
@@ -1321,6 +1369,7 @@ mod tests {
             initial: true,
            tree_mode: upend_db::BlobMode::default(),
         },
+        OperationContext::default(),
     )
     .unwrap();

View File

@@ -0,0 +1,2 @@
+ALTER TABLE data
+DROP COLUMN user;

View File

@@ -0,0 +1,2 @@
+ALTER TABLE data
+ADD COLUMN user VARCHAR;

View File

@@ -14,6 +14,7 @@ impl TryFrom<&models::Entry> for Entry {
                 attribute: e.attribute.parse()?,
                 value: value_str.parse().unwrap(),
                 provenance: e.provenance.clone(),
+                user: e.user.clone(),
                 timestamp: e.timestamp,
             })
         } else if let Some(value_num) = e.value_num {
@@ -22,6 +23,7 @@ impl TryFrom<&models::Entry> for Entry {
                 attribute: e.attribute.parse()?,
                 value: EntryValue::Number(value_num),
                 provenance: e.provenance.clone(),
+                user: e.user.clone(),
                 timestamp: e.timestamp,
             })
         } else {
@@ -30,6 +32,7 @@ impl TryFrom<&models::Entry> for Entry {
                 attribute: e.attribute.parse()?,
                 value: EntryValue::Number(f64::NAN),
                 provenance: e.provenance.clone(),
+                user: e.user.clone(),
                 timestamp: e.timestamp,
             })
         }
@@ -53,6 +56,7 @@ impl TryFrom<&Entry> for models::Entry {
             value_num: None,
             immutable: false,
             provenance: e.provenance.clone(),
+            user: e.user.clone(),
             timestamp: e.timestamp,
         };

View File

@@ -6,6 +6,7 @@ use lru::LruCache;
 use tracing::trace;
 use uuid::Uuid;

+use crate::OperationContext;
 use upend_base::addressing::Address;
 use upend_base::constants::ATTR_LABEL;
 use upend_base::constants::{ATTR_IN, HIER_ROOT_ADDR, HIER_ROOT_INVARIANT};
@@ -91,6 +92,7 @@ pub fn fetch_or_create_dir(
     parent: Option<Address>,
     directory: UNode,
     create: bool,
+    context: OperationContext,
 ) -> Result<Address> {
     match parent.clone() {
         Some(address) => trace!("FETCHING/CREATING {}/{:#}", address, directory),
@@ -137,7 +139,8 @@ pub fn fetch_or_create_dir(
                 entity: new_directory_address.clone(),
                 attribute: ATTR_LABEL.parse().unwrap(),
                 value: directory.to_string().into(),
-                provenance: "SYSTEM FS".to_string(),
+                provenance: context.provenance.clone() + "HIER",
+                user: context.user.clone(),
                 timestamp: chrono::Utc::now().naive_utc(),
             };
             connection.insert_entry(directory_entry)?;
@@ -147,7 +150,8 @@ pub fn fetch_or_create_dir(
                     entity: new_directory_address.clone(),
                     attribute: ATTR_IN.parse().unwrap(),
                     value: parent.into(),
-                    provenance: "SYSTEM FS".to_string(),
+                    provenance: context.provenance.clone() + "HIER",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }
             } else {
@@ -155,7 +159,8 @@ pub fn fetch_or_create_dir(
                     entity: new_directory_address.clone(),
                     attribute: ATTR_IN.parse().unwrap(),
                     value: HIER_ROOT_ADDR.clone().into(),
-                    provenance: "SYSTEM FS".to_string(),
+                    provenance: context.provenance.clone() + "HIER",
+                    user: context.user.clone(),
                     timestamp: chrono::Utc::now().naive_utc(),
                 }
             })?;
@@ -177,6 +182,7 @@ pub fn resolve_path(
     connection: &UpEndConnection,
     path: &UHierPath,
     create: bool,
+    context: OperationContext,
 ) -> Result<Vec<Address>> {
     let mut result: Vec<Address> = vec![];
     let mut path_stack = path.0.to_vec();
@@ -188,6 +194,7 @@ pub fn resolve_path(
             result.last().cloned(),
             path_stack.pop().unwrap(),
             create,
+            context.clone(),
         )?;
         result.push(dir_address);
     }
@@ -201,6 +208,7 @@ pub fn resolve_path_cached(
     connection: &UpEndConnection,
     path: &UHierPath,
     create: bool,
+    context: OperationContext,
     cache: &Arc<Mutex<ResolveCache>>,
 ) -> Result<Vec<Address>> {
     let mut result: Vec<Address> = vec![];
@@ -216,7 +224,7 @@ pub fn resolve_path_cached(
             result.push(address.clone());
         } else {
             drop(cache_lock);
-            let address = fetch_or_create_dir(connection, parent, node, create)?;
+            let address = fetch_or_create_dir(connection, parent, node, create, context.clone())?;
             result.push(address.clone());
             cache.lock().unwrap().put(key, address);
         }
@@ -286,11 +294,23 @@ mod tests {
         let open_result = UpEndDatabase::open(&temp_dir, true).unwrap();
         let connection = open_result.db.connection().unwrap();

-        let foo_result = fetch_or_create_dir(&connection, None, UNode("foo".to_string()), true);
+        let foo_result = fetch_or_create_dir(
+            &connection,
+            None,
+            UNode("foo".to_string()),
+            true,
+            OperationContext::default(),
+        );
         assert!(foo_result.is_ok());
         let foo_result = foo_result.unwrap();

-        let bar_result = fetch_or_create_dir(&connection, None, UNode("bar".to_string()), true);
+        let bar_result = fetch_or_create_dir(
+            &connection,
+            None,
+            UNode("bar".to_string()),
+            true,
+            OperationContext::default(),
+        );
         assert!(bar_result.is_ok());
         let bar_result = bar_result.unwrap();
@@ -299,6 +319,7 @@ mod tests {
             Some(bar_result.clone()),
             UNode("baz".to_string()),
             true,
+            OperationContext::default(),
         );
         assert!(baz_result.is_ok());
         let baz_result = baz_result.unwrap();
@@ -306,7 +327,12 @@ mod tests {
         let roots = list_roots(&connection);
         assert_eq!(roots.unwrap(), [foo_result, bar_result.clone()]);

-        let resolve_result = resolve_path(&connection, &"bar/baz".parse().unwrap(), false);
+        let resolve_result = resolve_path(
+            &connection,
+            &"bar/baz".parse().unwrap(),
+            false,
+            OperationContext::default(),
+        );
         assert!(resolve_result.is_ok());
         assert_eq!(
@@ -314,10 +340,20 @@ mod tests {
             vec![bar_result.clone(), baz_result.clone()]
         );

-        let resolve_result = resolve_path(&connection, &"bar/baz/bax".parse().unwrap(), false);
+        let resolve_result = resolve_path(
+            &connection,
+            &"bar/baz/bax".parse().unwrap(),
+            false,
+            OperationContext::default(),
+        );
         assert!(resolve_result.is_err());

-        let resolve_result = resolve_path(&connection, &"bar/baz/bax".parse().unwrap(), true);
+        let resolve_result = resolve_path(
+            &connection,
+            &"bar/baz/bax".parse().unwrap(),
+            true,
+            OperationContext::default(),
+        );
         assert!(resolve_result.is_ok());

         let bax_result = fetch_or_create_dir(
@@ -325,6 +361,7 @@ mod tests {
             Some(baz_result.clone()),
             UNode("bax".to_string()),
             false,
+            OperationContext::default(),
         );
         assert!(bax_result.is_ok());
         let bax_result = bax_result.unwrap();

View File

@@ -13,6 +13,7 @@ pub struct Entry {
     pub value_num: Option<f64>,
     pub immutable: bool,
     pub provenance: String,
+    pub user: Option<String>,
     pub timestamp: NaiveDateTime,
 }

View File

@@ -8,6 +8,7 @@ table! {
         value_num -> Nullable<Double>,
         immutable -> Bool,
         provenance -> Text,
+        user -> Nullable<Text>,
         timestamp -> Timestamp,
     }
 }

View File

@ -117,7 +117,7 @@ pub const DATABASE_FILENAME: &str = "upend.sqlite3";
impl UpEndDatabase { impl UpEndDatabase {
pub fn open<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> { pub fn open<P: AsRef<Path>>(dirpath: P, reinitialize: bool) -> Result<OpenResult> {
embed_migrations!("./migrations/upend/"); embed_migrations!("./migrations/upend");
let upend_path = dirpath.as_ref().join(UPEND_SUBDIR); let upend_path = dirpath.as_ref().join(UPEND_SUBDIR);
@ -703,3 +703,18 @@ impl std::str::FromStr for BlobMode {
} }
} }
} }
#[derive(Debug, Clone)]
pub struct OperationContext {
pub user: Option<String>,
pub provenance: String,
}
impl Default for OperationContext {
fn default() -> Self {
Self {
user: None,
provenance: "SYSTEM".to_string(),
}
}
}
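
As a usage note (not part of the diff): `OperationContext::default()` is what system-initiated operations pass, while API handlers build one from the user returned by `check_auth`. The following is a self-contained sketch of the two patterns seen throughout this commit; the user name is hypothetical. Note that the extractors' concatenation (`context.provenance.clone() + "EXTRACTOR"`) yields "SYSTEMEXTRACTOR" or "APIEXTRACTOR" with no separator, unlike the previous "SYSTEM EXTRACTOR" literal.

#[derive(Debug, Clone)]
pub struct OperationContext {
    pub user: Option<String>,
    pub provenance: String,
}

impl Default for OperationContext {
    fn default() -> Self {
        Self {
            user: None,
            provenance: "SYSTEM".to_string(),
        }
    }
}

fn main() {
    // System-initiated work (startup rescans, extract_all) uses the default:
    let system = OperationContext::default();
    assert_eq!(system.provenance.clone() + "EXTRACTOR", "SYSTEMEXTRACTOR");

    // API handlers thread the authenticated user through instead:
    let api = OperationContext {
        user: Some("alice".to_string()), // hypothetical authenticated user
        provenance: "API".to_string(),
    };
    assert_eq!(api.provenance.clone() + "EXTRACTOR", "APIEXTRACTOR");
}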

View File

@@ -6,6 +6,7 @@ macro_rules! upend_insert_val {
             attribute: $attribute.parse().unwrap(),
             value: upend_base::entry::EntryValue::String(String::from($value)),
             provenance: "SYSTEM INIT".to_string(),
+            user: None,
             timestamp: chrono::Utc::now().naive_utc(),
         })
     }};
@@ -19,6 +20,7 @@ macro_rules! upend_insert_addr {
             attribute: $attribute.parse().unwrap(),
             value: upend_base::entry::EntryValue::Address($addr.clone()),
             provenance: "SYSTEM INIT".to_string(),
+            user: None,
             timestamp: chrono::Utc::now().naive_utc(),
         })
     }};

View File

@@ -5,7 +5,8 @@ use crate::hierarchies::{resolve_path, resolve_path_cached, ResolveCache, UHierP
 use crate::jobs::{JobContainer, JobHandle};
 use crate::util::hash_at_path;
 use crate::{
-    BlobMode, ConnectionOptions, LoggingHandler, UpEndConnection, UpEndDatabase, UPEND_SUBDIR,
+    BlobMode, ConnectionOptions, LoggingHandler, OperationContext, UpEndConnection, UpEndDatabase,
+    UPEND_SUBDIR,
 };
 use anyhow::{anyhow, Result};
 use chrono::prelude::*;
@@ -95,6 +96,7 @@ impl FsStore {
         db: D,
         job_handle: JobHandle,
         options: UpdateOptions,
+        context: OperationContext,
     ) -> Result<Vec<UpdatePathOutcome>> {
         let start = Instant::now();
         info!("Vault rescan started.");
@@ -153,6 +155,7 @@ impl FsStore {
                     &existing_files,
                     &resolve_cache,
                     quick_check,
+                    context.clone(),
                 );

                 let mut cnt = count.write().unwrap();
@@ -249,6 +252,7 @@ impl FsStore {
         existing_files: &Arc<RwLock<Vec<db::File>>>,
         resolve_cache: &Arc<Mutex<ResolveCache>>,
         quick_check: bool,
+        context: OperationContext,
     ) -> Result<UpdatePathOutcome> {
         trace!("Processing: {:?}", path);
@@ -366,6 +370,7 @@ impl FsStore {
                 size,
                 mtime,
                 Some(resolve_cache),
+                context,
             )
             .map(|_| {
                 info!("Added: {:?}", path);
@@ -422,6 +427,7 @@ impl FsStore {
         size: i64,
         mtime: Option<NaiveDateTime>,
         resolve_cache: Option<&Arc<Mutex<ResolveCache>>>,
+        context: OperationContext,
     ) -> Result<Address> {
         let normalized_path = self.normalize_path(path)?;
         let new_file = db::NewFile {
@@ -444,6 +450,7 @@ impl FsStore {
             value: (size as f64).into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         };

         let mime_type = tree_magic_mini::from_filepath(path).map(|s| s.to_string());
@@ -453,6 +460,7 @@ impl FsStore {
             value: mime_type.into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         });

         let added_entry = Entry {
@@ -465,6 +473,7 @@ impl FsStore {
                 .into(),
             provenance: "SYSTEM INIT".to_string(),
             timestamp: chrono::Utc::now().naive_utc(),
+            user: context.user.clone(),
         };

         let components = normalized_path.components().collect::<Vec<Component>>();
@@ -488,13 +497,16 @@ impl FsStore {
                 .into(),
                 provenance: "SYSTEM INIT".to_string(),
                 timestamp: chrono::Utc::now().naive_utc(),
+                user: context.user.clone(),
             };
             let label_entry_addr = connection.insert_entry(label_entry)?;

             if let Some(upath) = upath {
                 let resolved_path = match resolve_cache {
-                    Some(cache) => resolve_path_cached(connection, &upath, true, cache)?,
-                    None => resolve_path(connection, &upath, true)?,
+                    Some(cache) => {
+                        resolve_path_cached(connection, &upath, true, context.clone(), cache)?
+                    }
+                    None => resolve_path(connection, &upath, true, context.clone())?,
                 };
                 let parent_dir = resolved_path.last().unwrap();
@@ -504,6 +516,7 @@ impl FsStore {
                     value: parent_dir.clone().into(),
                     provenance: "SYSTEM INIT".to_string(),
                     timestamp: chrono::Utc::now().naive_utc(),
+                    user: context.user.clone(),
                 };
                 let dir_has_entry_addr = connection.insert_entry(dir_has_entry)?;
@@ -513,6 +526,7 @@ impl FsStore {
                     value: label_entry_addr.into(),
                     provenance: "SYSTEM INIT".to_string(),
                     timestamp: chrono::Utc::now().naive_utc(),
+                    user: context.user.clone(),
                 };
                 connection.insert_entry(alias_entry)?;
             }
@@ -651,6 +665,7 @@ impl UpStore for FsStore {
         blob: Blob,
         name_hint: Option<String>,
         blob_mode: Option<BlobMode>,
+        context: OperationContext,
     ) -> Result<UpMultihash, super::StoreError> {
         let file_path = blob.get_file_path();
         let hash = hash_at_path(file_path).map_err(|e| StoreError::Unknown(e.to_string()))?;
@@ -704,6 +719,7 @@ impl UpStore for FsStore {
                     size,
                     mtime,
                     None,
+                    context,
                 )
                 .map_err(|e| StoreError::Unknown(e.to_string()))?;
         }
@@ -716,6 +732,7 @@ impl UpStore for FsStore {
         db: &UpEndDatabase,
         mut job_container: JobContainer,
         options: UpdateOptions,
+        context: OperationContext,
     ) -> Result<Vec<UpdatePathOutcome>, StoreError> {
         trace!(
             "Running a vault update of {:?}, options = {:?}.",
@@ -726,7 +743,7 @@ impl UpStore for FsStore {
         match job_result {
             Ok(job_handle) => {
-                let result = self.rescan_vault(db, job_handle, options);
+                let result = self.rescan_vault(db, job_handle, options, context);

                 if let Err(err) = &result {
                     error!("Update did not succeed! {:?}", err);
@@ -836,6 +853,7 @@ mod test {
                 initial: true,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
     }
@@ -882,6 +900,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
@@ -902,6 +921,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
@@ -925,6 +945,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
@@ -977,6 +998,7 @@ mod test {
                 initial: quick,
                 tree_mode: BlobMode::default(),
             },
+            OperationContext::default(),
         );
         assert!(rescan_result.is_ok());
@@ -1079,6 +1101,7 @@ mod test {
                 initial: true,
                 tree_mode,
             },
+            OperationContext::default(),
         )
         .unwrap();
@@ -1089,7 +1112,7 @@ mod test {
         paths.iter().for_each(|path| {
             let upath: UHierPath = path.parse().unwrap();
             assert!(
-                resolve_path(&connection, &upath, false).is_ok(),
+                resolve_path(&connection, &upath, false, OperationContext::default()).is_ok(),
                 "Failed: {}",
                 upath
             );

View File

@@ -1,6 +1,7 @@
 use std::path::{Path, PathBuf};

 use super::{UpEndConnection, UpEndDatabase};
+use crate::OperationContext;
 use crate::{jobs::JobContainer, BlobMode};
 use upend_base::hash::UpMultihash;
@@ -61,12 +62,14 @@ pub trait UpStore {
         blob: Blob,
         name_hint: Option<String>,
         blob_mode: Option<BlobMode>,
+        context: OperationContext,
     ) -> Result<UpMultihash>;

     fn update(
         &self,
         database: &UpEndDatabase,
         job_container: JobContainer,
         options: UpdateOptions,
+        context: OperationContext,
     ) -> Result<Vec<UpdatePathOutcome>>;

     fn stats(&self) -> Result<serde_json::Value>;
 }

View File

@ -1,6 +1,6 @@
{ {
"name": "@upnd/upend", "name": "@upnd/upend",
"version": "0.4.1", "version": "0.5.0",
"description": "Client library to interact with the UpEnd system.", "description": "Client library to interact with the UpEnd system.",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",

View File

@@ -121,6 +121,7 @@ export class UpEntry extends UpObject implements IEntry {
   attribute: string;
   value: IValue;
   provenance: string;
+  user: string;
   timestamp: string;

   constructor(address: string, entry: IEntry, listing: UpListing) {
@@ -130,6 +131,7 @@ export class UpEntry extends UpObject implements IEntry {
     this.attribute = entry.attribute;
     this.value = entry.value;
     this.provenance = entry.provenance;
+    this.user = entry.user;
     this.timestamp = entry.timestamp;
   }

View File

@@ -18,6 +18,8 @@ export interface IEntry {
   value: IValue;
   /** The origin or provenance of the data entry (e.g. SYSTEM or USER API...) */
   provenance: string;
+  /** The user who created the data entry. */
+  user: string;
   /** The timestamp when the data entry was created in RFC 3339 format. */
   timestamp: string;
 }

View File

@@ -513,13 +513,13 @@
   <h2>{$i18n.t('Attributes')}</h2>
   <EntryList
     entries={$entity?.attributes || []}
-    columns={detail ? 'timestamp, provenance, attribute, value' : 'attribute, value'}
+    columns={detail ? 'timestamp, user, provenance, attribute, value' : 'attribute, value'}
     on:change={onChange}
   />
   <h2>{$i18n.t('Backlinks')}</h2>
   <EntryList
     entries={$entity?.backlinks || []}
-    columns={detail ? 'timestamp, provenance, entity, attribute' : 'entity, attribute'}
+    columns={detail ? 'timestamp, user, provenance, entity, attribute' : 'entity, attribute'}
     on:change={onChange}
   />
 </div>

View File

@@ -34,6 +34,7 @@
   const TIMESTAMP_COL = 'timestamp';
   const PROVENANCE_COL = 'provenance';
+  const USER_COL = 'user';
   const ENTITY_COL = 'entity';
   const ATTR_COL = 'attribute';
   const VALUE_COL = 'value';
@@ -188,6 +189,7 @@
   const COLUMN_LABELS: { [key: string]: string } = {
     timestamp: $i18n.t('Added at'),
     provenance: $i18n.t('Provenance'),
+    user: $i18n.t('User'),
     entity: $i18n.t('Entity'),
     attribute: $i18n.t('Attribute'),
     value: $i18n.t('Value')
@@ -243,6 +245,16 @@
           </div>
         {:else if column == PROVENANCE_COL}
           <div class="cell">{entry.provenance}</div>
+        {:else if column == USER_COL}
+          <div class="cell">
+            {#if entry.user}
+              {entry.user}
+            {:else}
+              <div class="unset">
+                {$i18n.t('unset')}
+              </div>
+            {/if}
+          </div>
         {:else if column == ENTITY_COL}
           <div class="cell entity mark-entity">
             <UpObject