diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 3abc1d3848931..2a41d3868009f 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -83,6 +83,7 @@ tonic-build = "0.12.3" [features] default = ["mz-build-tools/default", "workspace-hack"] +turmoil = [] [package.metadata.cargo-udeps.ignore] normal = ["workspace-hack"] diff --git a/src/persist-client/src/async_runtime.rs b/src/persist-client/src/async_runtime.rs index 82a7a85df6479..88f41c2d9a5fe 100644 --- a/src/persist-client/src/async_runtime.rs +++ b/src/persist-client/src/async_runtime.rs @@ -75,6 +75,15 @@ impl IsolatedRuntime { IsolatedRuntime::new(&MetricsRegistry::new(), Some(TEST_THREADS)) } + #[cfg(feature = "turmoil")] + /// Create a no-op shim that spawns tasks on the current tokio runtime. + /// + /// This is useful for simulation tests where we don't want to spawn additional threads and/or + /// tokio runtimes. + pub fn new_disabled() -> Self { + IsolatedRuntime { inner: None } + } + /// Spawns a task onto this runtime. /// /// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here, see the doc @@ -86,10 +95,11 @@ impl IsolatedRuntime { F: Future + Send + 'static, F::Output: Send + 'static, { - self.inner - .as_ref() - .expect("exists until drop") - .spawn_named(name, fut) + if let Some(runtime) = &self.inner { + runtime.spawn_named(name, fut) + } else { + mz_ore::task::spawn(name, fut) + } } } @@ -98,9 +108,8 @@ impl Drop for IsolatedRuntime { // We don't need to worry about `shutdown_background` leaking // blocking tasks (i.e., tasks spawned with `spawn_blocking`) because // the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`. - self.inner - .take() - .expect("cannot drop twice") - .shutdown_background() + if let Some(runtime) = self.inner.take() { + runtime.shutdown_background(); + } } } diff --git a/src/persist-client/src/cache.rs b/src/persist-client/src/cache.rs index cc9e14b03320d..b47ee3ce92ffb 100644 --- a/src/persist-client/src/cache.rs +++ b/src/persist-client/src/cache.rs @@ -111,6 +111,39 @@ impl PersistClientCache { ) } + #[cfg(feature = "turmoil")] + /// Create a [PersistClientCache] for use in turmoil tests. + /// + /// Turmoil wants to run all software under test in a single thread, so we disable the + /// (multi-threaded) isolated runtime. + pub fn new_for_turmoil() -> Self { + use crate::rpc::NoopPubSubSender; + + let cfg = PersistConfig::new_for_tests(); + let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new())); + + let pubsub_sender: Arc = Arc::new(NoopPubSubSender); + let _pubsub_receiver_task = mz_ore::task::spawn(|| "noop", async {}); + + let state_cache = Arc::new(StateCache::new( + &cfg, + Arc::clone(&metrics), + Arc::clone(&pubsub_sender), + )); + let isolated_runtime = IsolatedRuntime::new_disabled(); + + PersistClientCache { + cfg, + metrics, + blob_by_uri: Mutex::new(BTreeMap::new()), + consensus_by_uri: Mutex::new(BTreeMap::new()), + isolated_runtime: Arc::new(isolated_runtime), + state_cache, + pubsub_sender, + _pubsub_receiver_task, + } + } + /// Returns the [PersistConfig] being used by this cache. pub fn cfg(&self) -> &PersistConfig { &self.cfg