diff --git a/Cargo.lock b/Cargo.lock index 205b90a..6caf91a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3900,6 +3900,7 @@ dependencies = [ "tempfile", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "tree_magic_mini", diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 5eaca96..19d2638 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -18,6 +18,7 @@ futures = "0.3.24" futures-util = "~0.3.12" tokio = { version = "1.38.1", features = ["sync", "fs", "process", "rt"] } async-trait = "0.1.81" +tokio-stream = { version = "0.1", features = ["time"] } clap = { version = "4.2.4", features = ["derive", "env", "color", "string", "cargo"] } log = "0.4" diff --git a/cli/src/routes.rs b/cli/src/routes.rs index 694544f..36c5556 100644 --- a/cli/src/routes.rs +++ b/cli/src/routes.rs @@ -27,7 +27,7 @@ use std::collections::HashMap; use std::convert::{TryFrom, TryInto}; use std::io::Write; use std::sync::{Arc, Mutex}; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tempfile::NamedTempFile; use tracing::{debug, error, info, trace}; use upend_base::addressing::AddressComponents; @@ -1103,7 +1103,9 @@ pub async fn get_jobs_stream( .stream_jobs() .map_err(ErrorInternalServerError)?; - Ok(sse::Sse::from_stream(jobs.map(|jobs_msg| { + let throttled_jobs = tokio_stream::StreamExt::throttle(jobs, Duration::from_millis(200)); + + Ok(sse::Sse::from_stream(throttled_jobs.map(|jobs_msg| { let jobs = jobs_msg.map_err(|err| anyhow!(err))?; let data = sse::Data::new_json(jobs).map_err(|err| anyhow!(err))?; Ok::(sse::Event::Data(data))