upend/cli/src/main.rs

490 lines
16 KiB
Rust

#[macro_use]
extern crate upend;
use actix_cors::Cors;
use actix_web::{middleware, App, HttpServer};
use anyhow::Result;
use clap::{Args, Parser, Subcommand, ValueEnum};
use filebuffer::FileBuffer;
use rand::{thread_rng, Rng};
use regex::Captures;
use regex::Regex;
use reqwest::Url;
use serde_json::json;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tracing::trace;
use tracing::{debug, error, info, warn};
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use upend::addressing::Address;
use upend::database::entry::EntryValue;
use upend::util::hash::hash;
use upend::{
common::{build, get_static_dir},
config::UpEndConfig,
database::{
stores::{fs::FsStore, UpStore},
UpEndDatabase,
},
util::jobs::JobContainer,
};
use crate::util::exec::block_background;
mod routes;
mod util;
mod extractors;
mod previews;
// Top-level command-line interface; clap derives the argument parser from this struct.
// Note: plain `//` comments are used on clap-derived items because `///` doc comments
// would be turned into `--help` / about text by the derive.
#[derive(Debug, Parser)]
#[command(name = "upend", author, version)]
struct Cli {
// The subcommand selected on the command line; `main` dispatches on its variant.
#[command(subcommand)]
command: Commands,
}
// Subcommands of the `upend` binary. The `///` comments below are rendered by clap as
// `--help` text, so they must describe what `main` actually does with each argument.
#[derive(Debug, Subcommand)]
enum Commands {
    /// Perform a query against an UpEnd server instance.
    Query {
        /// URL of the UpEnd instance to query.
        #[arg(short, long, default_value = "http://localhost:8093")]
        url: Url,
        /// The query itself, in L-expression format; prefix a filepath by `@=` to insert its hash in its place.
        query: String,
        /// Output format
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Insert an entry into an UpEnd server instance.
    Insert {
        /// URL of the UpEnd instance to insert into.
        // The default mirrors `Query`; the field stays an `Option` so existing callers that
        // fall back on `None` keep working.
        #[arg(short, long, default_value = "http://localhost:8093")]
        url: Option<Url>,
        /// The address of the entity; prefix a filepath by `=` to insert its hash, or pass a URL (starting with `http`) to use its address.
        entity: String,
        /// The attribute of the entry.
        attribute: String,
        /// The value; its type will be heuristically determined.
        value: String,
        /// Output format
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Get the address of a file, or of a SHA-256 digest.
    Address {
        /// Type of input to be addressed
        _type: AddressType,
        /// Path to a file, or the hex digest printed by `sha256sum`.
        input: String,
        /// Output format
        #[arg(short, long, default_value = "tsv")]
        format: OutputFormat,
    },
    /// Start an UpEnd server instance.
    Serve(ServeArgs),
}
// Output format selected with `--format` on the client subcommands.
// `query` prints `Json`/`Raw` as the server's response body verbatim and `Tsv` as one
// tab-separated row per entry; `address` prints `Json` as a quoted string and `Tsv`/`Raw`
// as the bare address; `insert` accepts the flag but does not use it.
// The `///` comments on the variants are clap help text, hence plain `//` here.
#[derive(Copy, Clone, PartialEq, Eq, Debug, ValueEnum)]
enum OutputFormat {
/// JSON
Json,
/// Tab Separated Values
Tsv,
/// Raw, as received from the server
Raw,
}
// How `upend address` interprets its `input` argument.
// `File` reads and hashes the file at that path; `Sha256sum` decodes a hex digest into a
// hash address without touching the filesystem.
// The `///` comments on the variants are clap help text, hence plain `//` here.
#[derive(Copy, Clone, PartialEq, Eq, Debug, ValueEnum)]
enum AddressType {
/// Hash a file and output its address.
File,
/// Compute an address from the output of `sha256sum`
Sha256sum,
}
// Arguments of `upend serve`. The `///` comments are clap help text; implementation notes
// are kept in `//` comments so they do not leak into `--help`.
#[derive(Debug, Args)]
struct ServeArgs {
    /// Directory to serve a vault from.
    #[arg()]
    directory: PathBuf,
    /// Address and port to bind the Web interface on.
    // Parsed as a `SocketAddr` by the serve command; if the port is taken, the following
    // port numbers are tried.
    #[arg(long, default_value = "127.0.0.1:8093")]
    bind: String,
    /// Path to blob store (defaults to the vault directory).
    #[arg(long)]
    store_path: Option<PathBuf>,
    /// Do not open a web browser with the UI.
    #[arg(long)]
    no_browser: bool,
    /// Disable desktop features (web browser, native file opening).
    #[arg(long)]
    no_desktop: bool,
    /// Trust the vault, and open local executable files.
    #[arg(long)]
    trust_executables: bool,
    /// Do not serve the web UI.
    #[arg(long)]
    no_ui: bool,
    /// Do not run a database update on start.
    #[arg(long)]
    no_initial_update: bool,
    /// Clean up temporary files (e.g. previews) on start.
    #[arg(long)]
    clean: bool,
    /// Delete and reinitialize the database if it already exists.
    #[arg(long)]
    reinitialize: bool,
    /// Name of the vault.
    // When absent, the last component of `directory` is used.
    #[arg(long)]
    vault_name: Option<String>,
    /// Secret to use for authentication.
    // When absent, a random 32-character alphanumeric secret is generated at startup.
    #[arg(long, env = "UPEND_SECRET")]
    secret: Option<String>,
    /// Authentication key users must supply.
    #[arg(long, env = "UPEND_KEY")]
    key: Option<String>,
    /// Allowed host/domain name the API can serve.
    // Each value is compared verbatim against the request's `Origin` header by the CORS
    // middleware; a value of `*` allows any origin.
    #[arg(long)]
    allow_host: Vec<String>,
}
fn main() -> Result<()> {
let args = Cli::parse();
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy(),
)
.init();
match args.command {
Commands::Query { url, query, format } => {
let re = Regex::new(r#"@(="([^"]+)"|=([^ ]+))"#).unwrap();
let query = re
.replace_all(&query, |caps: &Captures| {
if let Some(filepath_match) = caps.get(2).or_else(|| caps.get(3)) {
let address = hash_path(filepath_match.as_str()).unwrap();
format!("@{}", address)
} else {
panic!("Error preprocessing query. Captures: {:?}", caps)
}
})
.to_string();
debug!("Final query: {}", query);
let api_url = url.join("/api/query")?;
debug!("Querying \"{}\"", api_url);
let client = reqwest::blocking::Client::new();
let response = client.post(api_url).body(query).send()?;
response.error_for_status_ref()?;
match format {
OutputFormat::Json | OutputFormat::Raw => println!("{}", response.text()?),
OutputFormat::Tsv => {
eprintln!(
"entity\tattribute\tvalue\ttimestamp\tprovenance"
);
response
.json::<HashMap<String, serde_json::Value>>()?
.iter()
.for_each(|(_, entry)| {
println!(
"{}\t{}\t{}\t{}\t{}",
entry.get("entity").and_then(|e| e.as_str()).unwrap(),
entry.get("attribute").and_then(|a| a.as_str()).unwrap(),
entry
.get("value")
.and_then(|v| v.get("c"))
.unwrap(),
entry.get("timestamp").and_then(|t| t.as_str()).unwrap(),
entry.get("provenance").and_then(|p| p.as_str()).unwrap(),
)
})
}
}
Ok(())
}
Commands::Insert {
url,
entity,
attribute,
value,
format: _,
} => {
let url = url.unwrap_or("http://localhost:8093".parse().unwrap());
let api_url = url.join("/api/obj")?;
let entity = match entity {
entity if entity.starts_with('=') => hash_path(&entity[1..])?.to_string(),
entity if entity.starts_with("http") => Address::Url(entity.parse()?).to_string(),
_ => entity,
};
let value = EntryValue::guess_from(value);
let body = json!({
"entity": entity,
"attribute": attribute,
"value": value
});
debug!("Inserting {:?} at \"{}\"", body, api_url);
let client = reqwest::blocking::Client::new();
let response = client.put(api_url).json(&body).send()?;
match response.error_for_status_ref() {
Ok(_) => {
let data: Vec<String> = response.json()?;
Ok(println!("{}", data[0]))
}
Err(err) => {
error!("{}", response.text()?);
Err(err.into())
}
}
}
Commands::Address {
_type,
input,
format,
} => {
let address = match _type {
AddressType::File => hash_path(&input)?,
AddressType::Sha256sum => {
let digest = multibase::Base::Base16Lower.decode(input)?;
Address::Hash(upend::util::hash::Hash(digest))
}
};
match format {
OutputFormat::Json => Ok(println!("\"{}\"", address)),
OutputFormat::Tsv | OutputFormat::Raw => Ok(println!("{}", address)),
}
}
Commands::Serve(args) => {
info!("Starting UpEnd {}...", build::PKG_VERSION);
let sys = actix::System::new("upend");
let job_container = JobContainer::new();
let vault_path = args.directory;
let open_result = UpEndDatabase::open(&vault_path, args.reinitialize)
.expect("failed to open database!");
let upend = Arc::new(open_result.db);
let store = Arc::new(Box::new(
FsStore::from_path(args.store_path.unwrap_or_else(|| vault_path.clone())).unwrap(),
) as Box<dyn UpStore + Send + Sync>);
let ui_path = get_static_dir("webui");
if ui_path.is_err() {
warn!(
"Couldn't locate Web UI directory ({:?}), disabling...",
ui_path
);
}
let ui_enabled = ui_path.is_ok() && !args.no_ui;
let browser_enabled = !args.no_desktop && ui_enabled && !args.no_browser;
let preview_path = upend.path.join("previews");
#[cfg(feature = "previews")]
let preview_store = Some(Arc::new(crate::previews::PreviewStore::new(
preview_path.clone(),
store.clone(),
)));
#[cfg(feature = "previews")]
let preview_pool = Some(Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get() / 2)
.build()
.unwrap(),
));
if args.clean {
info!("Cleaning temporary directories...");
if preview_path.exists() {
std::fs::remove_dir_all(&preview_path).unwrap();
debug!("Removed {preview_path:?}");
} else {
debug!("No preview path exists, continuing...");
}
}
#[cfg(not(feature = "previews"))]
let preview_store = None;
#[cfg(not(feature = "previews"))]
let preview_pool = None;
let mut bind: SocketAddr = args.bind.parse().expect("Incorrect bind format.");
let secret = args.secret.unwrap_or_else(|| {
warn!("No secret supplied, generating one at random.");
thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(32)
.map(char::from)
.collect()
});
let state = routes::State {
upend: upend.clone(),
store,
job_container: job_container.clone(),
preview_store,
preview_pool,
config: UpEndConfig {
vault_name: Some(args.vault_name.unwrap_or_else(|| {
vault_path
.iter()
.last()
.unwrap()
.to_string_lossy()
.into_owned()
})),
desktop_enabled: !args.no_desktop,
trust_executables: args.trust_executables,
key: args.key,
secret,
},
};
// Start HTTP server
let mut cnt = 0;
let ui_path = ui_path.ok();
let server = loop {
let state = state.clone();
let ui_path = ui_path.clone();
let allowed_origins = args.allow_host.clone();
let server = HttpServer::new(move || {
let allowed_origins = allowed_origins.clone();
let cors = Cors::default()
.allowed_origin("http://localhost")
.allowed_origin("http://127.0.0.1")
.allowed_origin_fn(|origin, _req_head| {
origin.as_bytes().starts_with(b"http://localhost:")
|| origin.as_bytes().starts_with(b"http://127.0.0.1:")
})
.allowed_origin_fn(move |origin, _req_head| {
allowed_origins.iter().any(|allowed_origin| {
*allowed_origin == "*" || origin == allowed_origin
})
})
.allow_any_method();
let app = App::new()
.wrap(cors)
.app_data(actix_web::web::PayloadConfig::new(4_294_967_296))
.data(state.clone())
.wrap(middleware::Logger::default().exclude("/api/jobs"))
.service(routes::login)
.service(routes::get_raw)
.service(routes::get_thumbnail)
.service(routes::get_query)
.service(routes::get_object)
.service(routes::put_object)
.service(routes::put_blob)
.service(routes::put_object_attribute)
.service(routes::delete_object)
.service(routes::get_address)
.service(routes::get_all_attributes)
.service(routes::api_refresh)
.service(routes::list_hier)
.service(routes::list_hier_roots)
.service(routes::store_info)
.service(routes::get_jobs)
.service(routes::get_info);
if ui_enabled {
if let Some(ui_path) = &ui_path {
return app.service(
actix_files::Files::new("/", ui_path).index_file("index.html"),
);
}
}
app
});
let bind_result = server.bind(&bind);
if let Ok(server) = bind_result {
break server;
} else {
warn!("Failed to bind at {:?}, trying next port number...", bind);
bind.set_port(bind.port() + 1);
}
if cnt > 32 {
panic!("Couldn't start server.")
} else {
cnt += 1;
}
};
info!("Starting server at: {}", &bind);
server.run();
if !args.no_initial_update {
info!("Running initial update...");
let initial = open_result.new;
block_background::<_, _, anyhow::Error>(move || {
let _ = state.store.update(&upend, job_container.clone(), initial);
let _ = extractors::extract_all(upend, state.store, job_container);
Ok(())
})
}
#[cfg(feature = "desktop")]
{
if browser_enabled && ui_enabled {
let ui_result = webbrowser::open(&format!("http://localhost:{}", bind.port()));
if ui_result.is_err() {
warn!("Could not open UI in browser!");
}
}
}
Ok(sys.run()?)
}
}
}
fn hash_path<P: AsRef<Path>>(filepath: P) -> Result<Address> {
let filepath = filepath.as_ref();
debug!("Hashing {:?}...", filepath);
let fbuffer = FileBuffer::open(filepath)?;
let digest = hash(&fbuffer);
trace!("Finished hashing {:?}...", filepath);
Ok(Address::Hash(digest))
}