diff --git a/Cargo.lock b/Cargo.lock index fb16c0e1f59cf..7f3ed7de3c3b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6592,6 +6592,7 @@ dependencies = [ "turborepo-microfrontends", "turborepo-repository", "turborepo-scm", + "turborepo-signals", "turborepo-telemetry", "turborepo-ui", "turborepo-unescape", @@ -6744,6 +6745,14 @@ dependencies = [ "which", ] +[[package]] +name = "turborepo-signals" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", +] + [[package]] name = "turborepo-telemetry" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0d4af5ddad8b8..c321cab3cdeaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ turborepo-repository = { path = "crates/turborepo-repository" } turborepo-ui = { path = "crates/turborepo-ui" } turborepo-unescape = { path = "crates/turborepo-unescape" } turborepo-scm = { path = "crates/turborepo-scm" } +turborepo-signals = { path = "crates/turborepo-signals" } wax = { path = "crates/turborepo-wax" } turborepo-vercel-api = { path = "crates/turborepo-vercel-api" } turborepo-vercel-api-mock = { path = "crates/turborepo-vercel-api-mock" } diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index cfbfdb70d9345..ff87f6cd1a1ba 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -139,6 +139,7 @@ turborepo-lockfiles = { workspace = true } turborepo-microfrontends = { workspace = true } turborepo-repository = { path = "../turborepo-repository" } turborepo-scm = { workspace = true } +turborepo-signals = { workspace = true } turborepo-telemetry = { path = "../turborepo-telemetry" } turborepo-ui = { workspace = true } turborepo-unescape = { workspace = true } diff --git a/crates/turborepo-lib/src/cli/error.rs b/crates/turborepo-lib/src/cli/error.rs index 319a0461043bd..2407671e22d6a 100644 --- a/crates/turborepo-lib/src/cli/error.rs +++ b/crates/turborepo-lib/src/cli/error.rs @@ -4,17 +4,17 @@ use itertools::Itertools; use miette::Diagnostic; use thiserror::Error; use turborepo_repository::package_graph; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::{color, BOLD, GREY}; use crate::{ - commands::{bin, generate, link, login, ls, prune, run::get_signal, CommandBase}, + commands::{bin, generate, link, login, ls, prune, CommandBase}, daemon::DaemonError, query, rewrite_json::RewriteError, run, run::{builder::RunBuilder, watch}, - signal::SignalHandler, }; #[derive(Debug, Error, Diagnostic)] @@ -78,7 +78,7 @@ pub async fn print_potential_tasks( base: CommandBase, telemetry: CommandEventBuilder, ) -> Result<(), Error> { - let signal = get_signal()?; + let signal = get_signal().map_err(run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let color_config = base.color_config; diff --git a/crates/turborepo-lib/src/commands/boundaries.rs b/crates/turborepo-lib/src/commands/boundaries.rs index b5bf517290d1b..03211975078be 100644 --- a/crates/turborepo-lib/src/commands/boundaries.rs +++ b/crates/turborepo-lib/src/commands/boundaries.rs @@ -1,14 +1,10 @@ +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; -use crate::{ - cli, - commands::{run::get_signal, CommandBase}, - run::builder::RunBuilder, - signal::SignalHandler, -}; +use crate::{cli, commands::CommandBase, run::builder::RunBuilder}; pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let run = RunBuilder::new(base)? diff --git a/crates/turborepo-lib/src/commands/ls.rs b/crates/turborepo-lib/src/commands/ls.rs index 9e05ac49438ad..a1c94f6bd874d 100644 --- a/crates/turborepo-lib/src/commands/ls.rs +++ b/crates/turborepo-lib/src/commands/ls.rs @@ -5,15 +5,15 @@ use serde::Serialize; use thiserror::Error; use turbopath::AnchoredSystemPath; use turborepo_repository::package_graph::{PackageName, PackageNode}; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::{color, cprint, cprintln, ColorConfig, BOLD, BOLD_GREEN, GREY}; use crate::{ cli, cli::OutputFormat, - commands::{run::get_signal, CommandBase}, + commands::CommandBase, run::{builder::RunBuilder, Run}, - signal::SignalHandler, }; #[derive(Debug, Error, Diagnostic)] @@ -115,7 +115,7 @@ pub async fn run( telemetry: CommandEventBuilder, output: Option, ) -> Result<(), cli::Error> { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)?; diff --git a/crates/turborepo-lib/src/commands/query.rs b/crates/turborepo-lib/src/commands/query.rs index 6b896d519406a..fd505dbb8641b 100644 --- a/crates/turborepo-lib/src/commands/query.rs +++ b/crates/turborepo-lib/src/commands/query.rs @@ -5,14 +5,14 @@ use camino::Utf8Path; use miette::{Diagnostic, Report, SourceSpan}; use thiserror::Error; use turbopath::AbsoluteSystemPathBuf; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use crate::{ - commands::{run::get_signal, CommandBase}, + commands::CommandBase, query, query::{Error, RepositoryQuery}, run::builder::RunBuilder, - signal::SignalHandler, }; #[derive(Debug, Diagnostic, Error)] diff --git a/crates/turborepo-lib/src/commands/run.rs b/crates/turborepo-lib/src/commands/run.rs index b3b6e5dada7c9..174d9d93f0b3f 100644 --- a/crates/turborepo-lib/src/commands/run.rs +++ b/crates/turborepo-lib/src/commands/run.rs @@ -1,39 +1,14 @@ -use std::{future::Future, sync::Arc}; +use std::sync::Arc; use tracing::error; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; -use crate::{commands::CommandBase, run, run::builder::RunBuilder, signal::SignalHandler}; - -#[cfg(windows)] -pub fn get_signal() -> Result>, run::Error> { - let mut ctrl_c = tokio::signal::windows::ctrl_c().map_err(run::Error::SignalHandler)?; - Ok(async move { ctrl_c.recv().await }) -} - -#[cfg(not(windows))] -pub fn get_signal() -> Result>, run::Error> { - use tokio::signal::unix; - let mut sigint = - unix::signal(unix::SignalKind::interrupt()).map_err(run::Error::SignalHandler)?; - let mut sigterm = - unix::signal(unix::SignalKind::terminate()).map_err(run::Error::SignalHandler)?; - - Ok(async move { - tokio::select! { - res = sigint.recv() => { - res - } - res = sigterm.recv() => { - res - } - } - }) -} +use crate::{commands::CommandBase, run, run::builder::RunBuilder}; pub async fn run(base: CommandBase, telemetry: CommandEventBuilder) -> Result { - let signal = get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); let run_builder = RunBuilder::new(base)?; diff --git a/crates/turborepo-lib/src/lib.rs b/crates/turborepo-lib/src/lib.rs index 17448af06168b..6e10258aebc0b 100644 --- a/crates/turborepo-lib/src/lib.rs +++ b/crates/turborepo-lib/src/lib.rs @@ -34,7 +34,6 @@ mod query; mod rewrite_json; mod run; mod shim; -mod signal; mod task_graph; mod task_hash; mod tracing; diff --git a/crates/turborepo-lib/src/query/mod.rs b/crates/turborepo-lib/src/query/mod.rs index c7e3278ab2efa..5d03350ee932e 100644 --- a/crates/turborepo-lib/src/query/mod.rs +++ b/crates/turborepo-lib/src/query/mod.rs @@ -24,12 +24,12 @@ use tokio::select; use turbo_trace::TraceError; use turbopath::AbsoluteSystemPathBuf; use turborepo_repository::{change_mapper::AllPackageChangeReason, package_graph::PackageName}; +use turborepo_signals::SignalHandler; use crate::{ get_version, query::{file::File, task::RepositoryTask}, run::{builder::RunBuilder, Run}, - signal::SignalHandler, }; #[derive(Error, Debug, miette::Diagnostic)] diff --git a/crates/turborepo-lib/src/run/builder.rs b/crates/turborepo-lib/src/run/builder.rs index f1773daa87785..ee3c0aa81dc58 100644 --- a/crates/turborepo-lib/src/run/builder.rs +++ b/crates/turborepo-lib/src/run/builder.rs @@ -20,6 +20,7 @@ use turborepo_repository::{ package_json::PackageJson, }; use turborepo_scm::SCM; +use turborepo_signals::{SignalHandler, SignalSubscriber}; use turborepo_telemetry::events::{ command::CommandEventBuilder, generic::{DaemonInitStatus, GenericEventBuilder}, @@ -46,7 +47,6 @@ use crate::{ process::ProcessManager, run::{scope, task_access::TaskAccess, task_id::TaskName, Error, Run, RunCache}, shim::TurboState, - signal::{SignalHandler, SignalSubscriber}, turbo_json::{TurboJson, TurboJsonLoader, UIMode}, DaemonConnector, }; diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 964cefb892760..6eba4e3996031 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -22,8 +22,9 @@ use std::{ pub use cache::{CacheOutput, ConfigCache, Error as CacheError, RunCache, TaskCache}; use chrono::{DateTime, Local}; +use futures::StreamExt; use rayon::iter::ParallelBridge; -use tokio::{select, task::JoinHandle}; +use tokio::{pin, select, task::JoinHandle}; use tracing::{debug, instrument}; use turbopath::{AbsoluteSystemPath, AbsoluteSystemPathBuf}; use turborepo_api_client::{APIAuth, APIClient}; @@ -31,6 +32,7 @@ use turborepo_ci::Vendor; use turborepo_env::EnvironmentVariableMap; use turborepo_repository::package_graph::{PackageGraph, PackageName, PackageNode}; use turborepo_scm::SCM; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::generic::GenericEventBuilder; use turborepo_ui::{ cprint, cprintln, sender::UISender, tui, tui::TuiSender, wui::sender::WebUISender, ColorConfig, @@ -45,7 +47,6 @@ use crate::{ opts::Opts, process::ProcessManager, run::{global_hash::get_global_hash_inputs, summary::RunTracker, task_access::TaskAccess}, - signal::SignalHandler, task_graph::Visitor, task_hash::{get_external_deps_hash, get_internal_deps_hash, PackageInputsHashes}, turbo_json::{TurboJson, TurboJsonLoader, UIMode}, @@ -336,8 +337,9 @@ impl Run { }; let interrupt = async { - if let Ok(fut) = crate::commands::run::get_signal() { - fut.await; + if let Ok(fut) = get_signal() { + pin!(fut); + fut.next().await; } else { tracing::warn!("could not register ctrl-c handler"); // wait forever diff --git a/crates/turborepo-lib/src/run/watch.rs b/crates/turborepo-lib/src/run/watch.rs index 8f4513272d6f2..172b638af8753 100644 --- a/crates/turborepo-lib/src/run/watch.rs +++ b/crates/turborepo-lib/src/run/watch.rs @@ -10,15 +10,15 @@ use thiserror::Error; use tokio::{select, sync::Notify, task::JoinHandle}; use tracing::{instrument, trace, warn}; use turborepo_repository::package_graph::PackageName; +use turborepo_signals::{listeners::get_signal, SignalHandler}; use turborepo_telemetry::events::command::CommandEventBuilder; use turborepo_ui::sender::UISender; use crate::{ - commands::{self, CommandBase}, + commands::CommandBase, daemon::{proto, DaemonConnectorError, DaemonError}, get_version, opts, run::{self, builder::RunBuilder, scope::target_selector::InvalidSelectorError, Run}, - signal::SignalHandler, turbo_json::CONFIG_FILE, DaemonConnector, DaemonPaths, }; @@ -115,7 +115,7 @@ impl WatchClient { experimental_write_cache: bool, telemetry: CommandEventBuilder, ) -> Result { - let signal = commands::run::get_signal()?; + let signal = get_signal().map_err(crate::run::Error::SignalHandler)?; let handler = SignalHandler::new(signal); if base.opts.repo_opts.root_turbo_json_path != base.repo_root.join_component(CONFIG_FILE) { diff --git a/crates/turborepo-signals/Cargo.toml b/crates/turborepo-signals/Cargo.toml new file mode 100644 index 0000000000000..ad04f1bb3ad12 --- /dev/null +++ b/crates/turborepo-signals/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "turborepo-signals" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[dependencies] +futures = "0.3.30" +tokio = { workspace = true, features = ["full", "time"] } + +[dev-dependencies] + +[lints] +workspace = true diff --git a/crates/turborepo-lib/src/signal.rs b/crates/turborepo-signals/src/lib.rs similarity index 79% rename from crates/turborepo-lib/src/signal.rs rename to crates/turborepo-signals/src/lib.rs index 37cd735ef2fc2..d70eb3e2b6ea1 100644 --- a/crates/turborepo-lib/src/signal.rs +++ b/crates/turborepo-signals/src/lib.rs @@ -1,11 +1,22 @@ +#![deny(clippy::all)] +#![feature(assert_matches)] + +//! A crate for registering listeners for a given signal + +pub mod listeners; +pub mod signals; + use std::{ fmt::Debug, - future::Future, sync::{Arc, Mutex}, }; -use futures::{stream::FuturesUnordered, StreamExt}; -use tokio::sync::{mpsc, oneshot}; +use futures::{stream::FuturesUnordered, Stream, StreamExt}; +use signals::Signal; +use tokio::{ + pin, + sync::{mpsc, oneshot}, +}; /// SignalHandler provides a mechanism to subscribe to a future and get alerted /// whenever the future completes or the handler gets a close message. @@ -17,29 +28,32 @@ pub struct SignalHandler { #[derive(Debug, Default)] struct HandlerState { - subscribers: Vec>>, + subscribers: Vec>>, is_closing: bool, } -pub struct SignalSubscriber(oneshot::Receiver>); +pub struct SignalSubscriber(oneshot::Receiver>); /// SubscriberGuard should be kept until a subscriber is done processing the /// signal -pub struct SubscriberGuard(oneshot::Sender<()>); +pub struct SubscriberGuard { + _guard: oneshot::Sender, +} impl SignalHandler { /// Construct a new SignalHandler that will alert any subscribers when /// `signal_source` completes or `close` is called on it. - pub fn new(signal_source: impl Future> + Send + 'static) -> Self { + pub fn new(signal_source: impl Stream> + Send + 'static) -> Self { // think about channel size let state = Arc::new(Mutex::new(HandlerState::default())); let worker_state = state.clone(); let (close, mut rx) = mpsc::channel::<()>(1); tokio::spawn(async move { + pin!(signal_source); tokio::select! { // We don't care if we get a signal or if we are unable to receive signals // Either way we start the shutdown. - _ = signal_source => {}, + _ = signal_source.next() => {}, // We don't care if a close message was sent or if all handlers are dropped. // Either way start the shutdown process. _ = rx.recv() => {} @@ -108,16 +122,16 @@ impl SignalHandler { impl SignalSubscriber { /// Wait until signal is received by the signal handler pub async fn listen(self) -> SubscriberGuard { - let callback = self + let _guard = self .0 .await .expect("signal handler worker thread exited without alerting subscribers"); - SubscriberGuard(callback) + SubscriberGuard { _guard } } } impl HandlerState { - fn add_subscriber(&mut self) -> Option>> { + fn add_subscriber(&mut self) -> Option>> { (!self.is_closing).then(|| { let (tx, rx) = oneshot::channel(); self.subscribers.push(tx); @@ -130,15 +144,25 @@ impl HandlerState { mod test { use std::{assert_matches::assert_matches, time::Duration}; + use futures::stream; + use super::*; + #[cfg(windows)] + const DEFAULT_SIGNAL: Signal = Signal::CtrlC; + #[cfg(not(windows))] + const DEFAULT_SIGNAL: Signal = Signal::Interrupt; + #[tokio::test] async fn test_subscribers_triggered_from_signal() { let (tx, rx) = oneshot::channel(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); let subscriber = handler.subscribe().unwrap(); // Send mocked SIGINT - tx.send(()).unwrap(); + tx.send(DEFAULT_SIGNAL).unwrap(); let (done, mut is_done) = oneshot::channel(); let handler2 = handler.clone(); @@ -161,7 +185,10 @@ mod test { #[tokio::test] async fn test_subscribers_triggered_from_close() { let (_tx, rx) = oneshot::channel::<()>(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); let subscriber = handler.subscribe().unwrap(); let (close_done, mut is_close_done) = oneshot::channel(); @@ -184,7 +211,10 @@ mod test { #[tokio::test] async fn test_close_idempotent() { let (_tx, rx) = oneshot::channel::<()>(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); handler.close().await; handler.close().await; } @@ -192,11 +222,14 @@ mod test { #[tokio::test] async fn test_subscribe_after_close() { let (tx, rx) = oneshot::channel(); - let handler = SignalHandler::new(async move { rx.await.ok() }); + let handler = SignalHandler::new(stream::once(async move { + rx.await.ok(); + Some(DEFAULT_SIGNAL) + })); let subscriber = handler.subscribe().unwrap(); // Send SIGINT - tx.send(()).unwrap(); + tx.send(DEFAULT_SIGNAL).unwrap(); // Do a quick yield to give the worker a chance to read the sigint tokio::task::yield_now().await; assert!( diff --git a/crates/turborepo-signals/src/listeners.rs b/crates/turborepo-signals/src/listeners.rs new file mode 100644 index 0000000000000..66e208ca7f402 --- /dev/null +++ b/crates/turborepo-signals/src/listeners.rs @@ -0,0 +1,33 @@ +use futures::{stream, Stream}; + +use crate::signals::Signal; + +#[cfg(windows)] +/// A listener for Windows Console Ctrl-C events +pub fn get_signal() -> Result>, std::io::Error> { + let mut ctrl_c = tokio::signal::windows::ctrl_c()?; + Ok(stream::once(async move { + ctrl_c.recv().await.map(|_| Signal::CtrlC) + })) +} + +#[cfg(not(windows))] +/// A listener for commong Unix signals that require special handling +/// +/// Currently listens for SIGINT and SIGTERM +pub fn get_signal() -> Result>, std::io::Error> { + use tokio::signal::unix; + let mut sigint = unix::signal(unix::SignalKind::interrupt())?; + let mut sigterm = unix::signal(unix::SignalKind::terminate())?; + + Ok(stream::once(async move { + tokio::select! { + res = sigint.recv() => { + res.map(|_| Signal::Interrupt) + } + res = sigterm.recv() => { + res.map(|_| Signal::Terminate) + } + } + })) +} diff --git a/crates/turborepo-signals/src/signals.rs b/crates/turborepo-signals/src/signals.rs new file mode 100644 index 0000000000000..ae6a0d71f71a1 --- /dev/null +++ b/crates/turborepo-signals/src/signals.rs @@ -0,0 +1,10 @@ +/// A collection of signals that are caught by the listeners +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Signal { + #[cfg(windows)] + CtrlC, + #[cfg(not(windows))] + Interrupt, + #[cfg(not(windows))] + Terminate, +}