// upend/cli/src/main.rs
//
// 563 lines
// 18 KiB
// Rust

#[macro_use]
extern crate upend_db;
use crate::common::{REQWEST_ASYNC_CLIENT, WEBUI_PATH};
use crate::config::UpEndConfig;
use actix_web::HttpServer;
use anyhow::Result;
use clap::{Args, CommandFactory, FromArgMatches, Parser, Subcommand, ValueEnum};
use filebuffer::FileBuffer;
use rand::{thread_rng, Rng};
use regex::Captures;
use regex::Regex;
use reqwest::Url;
use serde_json::json;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tracing::trace;
use tracing::{debug, error, info, warn};
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use upend_base::addressing::Address;
use upend_base::entry::EntryValue;
use upend_base::hash::{sha256hash, UpMultihash};
use upend_db::jobs::JobContainer;
use upend_db::stores::fs::FsStore;
use upend_db::stores::UpStore;
use upend_db::{BlobMode, OperationContext, UpEndDatabase};
use crate::util::exec::block_background;
mod common;
mod config;
mod routes;
mod serve;
mod util;
mod extractors; // TODO REMOVE
mod plugins;
mod previews; // TODO REMOVE
// Top-level command-line interface of the `upend` binary.
// `main` builds the clap command from this type (adding the version string at runtime),
// parses the process arguments into it and dispatches on `command`.
#[derive(Debug, Parser)]
#[command(name = "upend", author)]
struct Cli {
// The subcommand selected by the user; see `Commands`.
#[command(subcommand)]
command: Commands,
}
// Subcommands of the `upend` binary. The `///` comments below double as the help text
// clap shows to the user, so they are written from the user's point of view.
#[derive(Debug, Subcommand)]
enum Commands {
    /// Perform a query against an UpEnd server instance.
    Query {
        /// URL of the UpEnd instance to query.
        #[arg(short, long, default_value = "http://localhost:8093")]
        url: Url,
        /// The query itself, in L-expression format; prefix a filepath by `@=` to insert its hash in its place.
        query: String,
        /// Output format
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Get the entries of an entity, optionally restricted to a single attribute.
    Get {
        /// URL of the UpEnd instance to query.
        #[arg(short, long, default_value = "http://localhost:8093")]
        url: Url,
        /// The address of the entity; prefix a filepath by `=` to insert its hash.
        entity: String,
        /// The attribute to get the value(s) of. Optional.
        attribute: Option<String>,
        /// Output format
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Insert an entry into an UpEnd server instance.
    Insert {
        /// URL of the UpEnd instance to insert into.
        #[arg(short, long, default_value = "http://localhost:8093")]
        url: Url,
        /// The address of the entity; prefix a filepath by `=` to insert its hash.
        entity: String,
        /// The attribute of the entry.
        attribute: String,
        /// The value; its type will be heuristically determined.
        value: String,
        /// Output format
        // Accepted for symmetry with the other subcommands; `main` currently ignores it.
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Get the address of a file, attribute, or URL.
    Address {
        /// Type of input to be addressed
        _type: AddressType,
        /// Path to a file, hash...
        input: String,
        /// Output format
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Start an UpEnd server instance.
    Serve(ServeArgs),
}
// Output format selectable with `--format` on the client subcommands.
// For server responses, `print_response_entries` prints the body verbatim for both
// `Json` and `Raw`, and renders one tab-separated row per entry for `Tsv`.
// For the `address` subcommand, `Json` wraps the address in double quotes while
// `Tsv` and `Raw` print it bare.
#[derive(Copy, Clone, PartialEq, Eq, Debug, ValueEnum)]
enum OutputFormat {
/// JSON
Json,
/// Tab Separated Values
Tsv,
/// Raw, as received from the server
Raw,
}
// Kind of input accepted by the `address` subcommand.
#[derive(Copy, Clone, PartialEq, Eq, Debug, ValueEnum)]
enum AddressType {
// The input is a path; `main` hashes the file's contents via `hash_path`.
/// Hash a file and output its address.
File,
// The input is a digest; `main` decodes it as lowercase base16 and wraps it in a hash address.
/// Compute an address from the output of `sha256sum`
Sha256sum,
}
// Arguments of the `serve` subcommand. The `///` comments are the user-facing help text;
// the `//` notes describe how `main` consumes each value.
#[derive(Debug, Args)]
struct ServeArgs {
// Also used as the database location and, unless `store_path` is given, as the blob store path.
/// Directory to serve a vault from.
#[arg()]
directory: PathBuf,
// Parsed into a `SocketAddr` in `main`; if binding fails, successive port numbers are tried.
/// Address and port to bind the Web interface on.
#[arg(long, default_value = "127.0.0.1:8093")]
bind: String,
// When absent, `main` falls back to `directory`.
/// Path to blob store ($VAULT_PATH by default).
#[arg(long)]
store_path: Option<PathBuf>,
// The browser is only opened when desktop features and the web UI are enabled as well.
/// Do not open a web browser with the UI.
#[arg(long)]
no_browser: bool,
/// Disable desktop features (web browser, native file opening).
#[arg(long, env = "UPEND_NO_DESKTOP")]
no_desktop: bool,
/// Trust the vault, and open local executable files.
#[arg(long)]
trust_executables: bool,
// Even without this flag the UI is disabled (with a warning) if its directory cannot be found.
/// Do not serve the web UI.
#[arg(long)]
no_ui: bool,
/// Do not run a database update on start.
#[arg(long)]
no_initial_update: bool,
// When given, the mode is also stored in the vault options before the update runs.
/// Which mode to use for rescanning the vault.
#[arg(long)]
rescan_mode: Option<BlobMode>,
// Currently this removes the `previews` directory inside the database path.
/// Clean up temporary files (e.g. previews) on start.
#[arg(long)]
clean: bool,
/// Delete and initialize database, if it exists already.
#[arg(long)]
reinitialize: bool,
// When absent, `main` uses the last component of `directory`.
/// Name of the vault.
#[arg(long, env = "UPEND_VAULT_NAME")]
vault_name: Option<String>,
// When absent, `main` generates a random 32-character alphanumeric secret and logs a warning.
/// Secret to use for authentication.
#[arg(long, env = "UPEND_SECRET")]
secret: Option<String>,
// Passed to `serve::get_app` as the list of allowed origins.
/// Allowed host/domain name the API can serve.
#[arg(long, env = "UPEND_ALLOW_HOST")]
allow_host: Vec<String>,
}
#[actix_web::main]
async fn main() -> Result<()> {
let command = Cli::command().version(crate::common::get_version());
let args = Cli::from_arg_matches(&command.get_matches())?;
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();
match args.command {
Commands::Query { url, query, format } => {
let re = Regex::new(r#"@(="([^"]+)"|=([^ ]+))"#).unwrap();
let query = re
.replace_all(&query, |caps: &Captures| {
if let Some(filepath_match) = caps.get(2).or_else(|| caps.get(3)) {
let address = hash_path(filepath_match.as_str()).unwrap();
format!("@{}", address)
} else {
panic!("Error preprocessing query. Captures: {:?}", caps)
}
})
.to_string();
let api_url = url.join("/api/query")?;
debug!("Querying \"{}\": {}", api_url, query);
let response = REQWEST_ASYNC_CLIENT
.post(api_url)
.body(query)
.send()
.await?;
response.error_for_status_ref()?;
print_response_entries(response, format).await?;
Ok(())
}
Commands::Get {
url,
entity,
attribute,
format,
} => {
let response = if let Some(attribute) = attribute {
let api_url = url.join("/api/query")?;
let entity = match entity {
entity if entity.starts_with('=') => hash_path(&entity[1..])?.to_string(),
entity if entity.starts_with("http") => {
Address::Url(entity.parse()?).to_string()
}
_ => entity,
};
let query = format!("(matches @{} \"{}\" ?)", entity, attribute);
debug!("Querying \"{}\": {}", api_url, query);
REQWEST_ASYNC_CLIENT
.post(api_url)
.body(query)
.send()
.await?
} else {
let entity = match entity {
entity if entity.starts_with('=') => hash_path(&entity[1..])?.to_string(),
_ => todo!("Only GETting blobs (files) is implemented."),
};
let api_url = url.join(&format!("/api/obj/{entity}"))?;
debug!("Getting object \"{}\" from {}", entity, api_url);
REQWEST_ASYNC_CLIENT.get(api_url).send().await?
};
response.error_for_status_ref()?;
print_response_entries(response, format).await?;
Ok(())
}
Commands::Insert {
url,
entity,
attribute,
value,
format: _,
} => {
let api_url = url.join("/api/obj")?;
let entity = match entity {
entity if entity.starts_with('=') => hash_path(&entity[1..])?.to_string(),
entity if entity.starts_with("http") => Address::Url(entity.parse()?).to_string(),
_ => entity,
};
let value = EntryValue::guess_from(value);
let body = json!({
"entity": entity,
"attribute": attribute,
"value": value
});
debug!("Inserting {:?} at \"{}\"", body, api_url);
let response = REQWEST_ASYNC_CLIENT.put(api_url).json(&body).send().await?;
match response.error_for_status_ref() {
Ok(_) => {
let data: Vec<String> = response.json().await?;
Ok(println!("{}", data[0]))
}
Err(err) => {
error!("{}", response.text().await?);
Err(err.into())
}
}
}
Commands::Address {
_type,
input,
format,
} => {
let address = match _type {
AddressType::File => hash_path(&input)?,
AddressType::Sha256sum => {
let digest = multibase::Base::Base16Lower.decode(input)?;
Address::Hash(UpMultihash::from_sha256(digest).unwrap())
}
};
match format {
OutputFormat::Json => Ok(println!("\"{}\"", address)),
OutputFormat::Tsv | OutputFormat::Raw => Ok(println!("{}", address)),
}
}
Commands::Serve(args) => {
info!("Starting UpEnd {}...", common::build::PKG_VERSION);
let term_now = Arc::new(std::sync::atomic::AtomicBool::new(false));
for sig in signal_hook::consts::TERM_SIGNALS {
signal_hook::flag::register_conditional_shutdown(*sig, 1, Arc::clone(&term_now))?;
signal_hook::flag::register(*sig, Arc::clone(&term_now))?;
}
let job_container = JobContainer::new();
let vault_path = args.directory;
let open_result = UpEndDatabase::open(&vault_path, args.reinitialize)
.expect("failed to open database!");
let upend = Arc::new(open_result.db);
let store = Arc::new(Box::new(
FsStore::from_path(args.store_path.unwrap_or_else(|| vault_path.clone())).unwrap(),
) as Box<dyn UpStore + Send + Sync>);
let webui_enabled = if args.no_ui {
false
} else {
let exists = WEBUI_PATH.exists();
if !exists {
warn!(
"Couldn't locate Web UI directory ({:?}), disabling...",
*WEBUI_PATH
);
}
exists
};
let browser_enabled = !args.no_desktop && webui_enabled && !args.no_browser;
let preview_path = upend.path.join("previews");
#[cfg(feature = "previews")]
let preview_store = Some(Arc::new(crate::previews::PreviewStore::new(
preview_path.clone(),
store.clone(),
)));
#[cfg(feature = "previews")]
let preview_thread_pool = Some(Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get() / 2)
.build()
.unwrap(),
));
if args.clean {
info!("Cleaning temporary directories...");
if preview_path.exists() {
std::fs::remove_dir_all(&preview_path).unwrap();
debug!("Removed {preview_path:?}");
} else {
debug!("No preview path exists, continuing...");
}
}
#[cfg(not(feature = "previews"))]
let preview_store = None;
#[cfg(not(feature = "previews"))]
let preview_thread_pool = None;
let mut bind: SocketAddr = args.bind.parse().expect("Incorrect bind format.");
let secret = args.secret.unwrap_or_else(|| {
warn!("No secret supplied, generating one at random.");
thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(32)
.map(char::from)
.collect()
});
let plugins = crate::plugins::Plugins::init(&get_resource_path("plugins")?)?;
let state = routes::State {
upend: upend.clone(),
store,
job_container: job_container.clone(),
preview_store,
preview_thread_pool,
plugins: plugins.into(),
config: UpEndConfig {
vault_name: Some(args.vault_name.unwrap_or_else(|| {
vault_path
.iter()
.last()
.unwrap()
.to_string_lossy()
.into_owned()
})),
desktop_enabled: !args.no_desktop,
trust_executables: args.trust_executables,
secret,
},
public: Arc::new(Mutex::new(upend.connection()?.get_users()?.is_empty())),
};
// Start HTTP server
let mut cnt = 0;
let server = loop {
let state = state.clone();
let allowed_origins = args.allow_host.clone();
let server = HttpServer::new(move || {
serve::get_app(webui_enabled, allowed_origins.clone(), state.clone())
});
let bind_result = server.bind(&bind);
if let Ok(server) = bind_result {
break server;
} else {
warn!("Failed to bind at {:?}, trying next port number...", bind);
bind.set_port(bind.port() + 1);
}
if cnt > 32 {
panic!("Couldn't start server.")
} else {
cnt += 1;
}
};
if !args.no_initial_update && (!open_result.new || args.rescan_mode.is_some()) {
info!("Running update...");
block_background::<_, _, anyhow::Error>(move || {
let connection: upend_db::UpEndConnection = upend.connection()?;
let tree_mode = if let Some(rescan_mode) = args.rescan_mode {
connection.set_vault_options(upend_db::VaultOptions {
blob_mode: Some(rescan_mode.clone()),
})?;
rescan_mode
} else {
connection
.get_vault_options()
.unwrap()
.blob_mode
.unwrap_or_default()
};
let _ = state.store.update(
&upend,
job_container.clone(),
upend_db::stores::UpdateOptions {
initial: false,
tree_mode,
},
OperationContext::default(),
);
let _ = extractors::extract_all(
upend,
state.store,
job_container,
OperationContext::default(),
);
Ok(())
});
}
#[cfg(feature = "desktop")]
{
if browser_enabled {
let ui_result = webbrowser::open(&format!("http://localhost:{}", bind.port()));
if ui_result.is_err() {
warn!("Could not open UI in browser!");
}
}
}
info!("Starting server at: {}", &bind);
server.run().await?;
Ok(())
}
}
}
/// Entries as deserialized from a server response: a map from string keys to JSON values.
type Entries = HashMap<String, serde_json::Value>;

/// Prints the entries carried by `response` to standard output in the requested `format`.
///
/// * `Json` / `Raw`: the response body is printed verbatim.
/// * `Tsv`: the body is parsed as JSON and every entry becomes one tab-separated row
///   (entity, attribute, value, timestamp, provenance). The column header is written to
///   standard error, and nothing at all is printed when there are no entries.
///
/// # Errors
/// Fails if the body cannot be read or, for `Tsv`, cannot be deserialized.
async fn print_response_entries(response: reqwest::Response, format: OutputFormat) -> Result<()> {
    /// Body shape used when the request path contains `/obj/`: entries sit under `entries`.
    #[derive(serde::Deserialize)]
    struct ObjResponse {
        entries: Entries,
    }

    /// Reads `key` from `entry` as a string, substituting a placeholder when it is
    /// missing or not a string.
    fn text_field<'a>(entry: &'a serde_json::Value, key: &str) -> &'a str {
        entry
            .get(key)
            .and_then(|field| field.as_str())
            .unwrap_or("???")
    }

    match format {
        OutputFormat::Json | OutputFormat::Raw => {
            println!("{}", response.text().await?);
            return Ok(());
        }
        OutputFormat::Tsv => {}
    }

    let is_object_response = response.url().path().contains("/obj/");
    let entries: Entries = if is_object_response {
        response.json::<ObjResponse>().await?.entries
    } else {
        response.json::<Entries>().await?
    };

    if entries.is_empty() {
        return Ok(());
    }

    eprintln!("entity\tattribute\tvalue\ttimestamp\tprovenance");
    for (_, entry) in entries {
        // The value column shows the `c` member of `value`, rendered as JSON.
        let value = match entry.get("value").and_then(|v| v.get("c")) {
            Some(content) => format!("{content}"),
            None => "???".to_string(),
        };
        println!(
            "{}\t{}\t{}\t{}\t{}",
            text_field(&entry, "entity"),
            text_field(&entry, "attribute"),
            value,
            text_field(&entry, "timestamp"),
            text_field(&entry, "provenance"),
        );
    }

    Ok(())
}
/// Computes the hash address of the file at `filepath`.
///
/// The file is opened through `FileBuffer` and its contents are passed to `sha256hash`.
///
/// # Errors
/// Fails if the file cannot be opened or if hashing reports an error.
fn hash_path<P: AsRef<Path>>(filepath: P) -> Result<Address> {
    let path = filepath.as_ref();

    debug!("Hashing {:?}...", path);
    let contents = FileBuffer::open(path)?;
    let address = Address::Hash(sha256hash(&contents)?);
    trace!("Finished hashing {:?}...", path);

    Ok(address)
}