Skip to content

Commit a5d2cc6

Browse files
committed
persist: provide a way to disable the isolated runtime
This commit introduces `PersistClientCache::new_for_turmoil`, as a way to use persist in turmoil tests. In particular, this disables the isolated runtime. Tasks that would be spawned there are instead spawned in the current runtime. The method is gated behind a new "turmoil" feature.
1 parent 6e12a0d commit a5d2cc6

File tree

3 files changed

+51
-8
lines changed

3 files changed

+51
-8
lines changed

src/persist-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ tonic-build = "0.12.3"
8383

8484
[features]
8585
default = ["mz-build-tools/default", "workspace-hack"]
86+
turmoil = []
8687

8788
[package.metadata.cargo-udeps.ignore]
8889
normal = ["workspace-hack"]

src/persist-client/src/async_runtime.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,15 @@ impl IsolatedRuntime {
7575
IsolatedRuntime::new(&MetricsRegistry::new(), Some(TEST_THREADS))
7676
}
7777

78+
#[cfg(feature = "turmoil")]
79+
/// Create a no-op shim that spawns tasks on the current tokio runtime.
80+
///
81+
/// This is useful for simulation tests where we don't want to spawn additional threads and/or
82+
/// tokio runtimes.
83+
pub fn new_disabled() -> Self {
84+
IsolatedRuntime { inner: None }
85+
}
86+
7887
/// Spawns a task onto this runtime.
7988
///
8089
/// Note: We purposefully do not use the [`tokio::task::spawn_blocking`] API here, see the doc
@@ -86,10 +95,11 @@ impl IsolatedRuntime {
8695
F: Future + Send + 'static,
8796
F::Output: Send + 'static,
8897
{
89-
self.inner
90-
.as_ref()
91-
.expect("exists until drop")
92-
.spawn_named(name, fut)
98+
if let Some(runtime) = &self.inner {
99+
runtime.spawn_named(name, fut)
100+
} else {
101+
mz_ore::task::spawn(name, fut)
102+
}
93103
}
94104
}
95105

@@ -98,9 +108,8 @@ impl Drop for IsolatedRuntime {
98108
// We don't need to worry about `shutdown_background` leaking
99109
// blocking tasks (i.e., tasks spawned with `spawn_blocking`) because
100110
// the `IsolatedRuntime` wrapper prevents access to `spawn_blocking`.
101-
self.inner
102-
.take()
103-
.expect("cannot drop twice")
104-
.shutdown_background()
111+
if let Some(runtime) = self.inner.take() {
112+
runtime.shutdown_background();
113+
}
105114
}
106115
}

src/persist-client/src/cache.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,39 @@ impl PersistClientCache {
111111
)
112112
}
113113

114+
#[cfg(feature = "turmoil")]
115+
/// Create a [PersistClientCache] for use in turmoil tests.
116+
///
117+
/// Turmoil wants to run all software under test in a single thread, so we disable the
118+
/// (multi-threaded) isolated runtime.
119+
pub fn new_for_turmoil() -> Self {
120+
use crate::rpc::NoopPubSubSender;
121+
122+
let cfg = PersistConfig::new_for_tests();
123+
let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
124+
125+
let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
126+
let _pubsub_receiver_task = mz_ore::task::spawn(|| "noop", async {});
127+
128+
let state_cache = Arc::new(StateCache::new(
129+
&cfg,
130+
Arc::clone(&metrics),
131+
Arc::clone(&pubsub_sender),
132+
));
133+
let isolated_runtime = IsolatedRuntime::new_disabled();
134+
135+
PersistClientCache {
136+
cfg,
137+
metrics,
138+
blob_by_uri: Mutex::new(BTreeMap::new()),
139+
consensus_by_uri: Mutex::new(BTreeMap::new()),
140+
isolated_runtime: Arc::new(isolated_runtime),
141+
state_cache,
142+
pubsub_sender,
143+
_pubsub_receiver_task,
144+
}
145+
}
146+
114147
/// Returns the [PersistConfig] being used by this cache.
115148
pub fn cfg(&self) -> &PersistConfig {
116149
&self.cfg

0 commit comments

Comments
 (0)