Skip to content

Commit 66ac898

Browse files
committed
persist: add turmoil-enabled consensus and blob implementations
This commit introduces `Consensus`/`Blob` implementations that both forward commands over a `turmoil::net::TcpStream` to a server task that's a thin wrapper around the `Mem*` implementations. This is to support persist simulation in turmoil tests, which can now crash the consensus/blob servers, and introduce network faults in the persist communication. The new implementations are gated behind a "turmoil" feature.
1 parent 5cac2e9 commit 66ac898

File tree

8 files changed

+581
-5
lines changed

8 files changed

+581
-5
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/ore/src/bytes.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
2929
use internal::SegmentedReader;
3030
#[cfg(feature = "parquet")]
3131
use parquet::errors::ParquetError;
32+
use serde::{Deserialize, Serialize};
3233
use smallvec::SmallVec;
3334

3435
#[cfg(feature = "parquet")]
@@ -46,7 +47,7 @@ use crate::cast::CastFrom;
4647
/// [`smallvec::SmallVec`] to store our [`Bytes`] segments, and `N` is how many `Bytes` we'll
4748
/// store inline before spilling to the heap. We default `N = 1`, so in the case of a single
4849
/// `Bytes` segment, we avoid one layer of indirection.
49-
#[derive(Clone, Debug, PartialEq, Eq)]
50+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
5051
pub struct SegmentedBytes<const N: usize = 1> {
5152
/// Collection of non-contiguous segments, each segment is guaranteed to be non-empty.
5253
segments: SmallVec<[(Bytes, Padding); N]>,

src/persist/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,12 @@ prost = { version = "0.13.5", features = ["no-recursion-limit"] }
6060
rand = { version = "0.8.5", features = ["small_rng"] }
6161
reqwest = { version = "0.12", features = ["blocking", "json", "default-tls", "charset", "http2"], default-features = false }
6262
serde = { version = "1.0.219", features = ["derive"] }
63+
serde_json = { version = "1.0.145", optional = true }
6364
timely = "0.25.1"
6465
tokio = { version = "1.44.1", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] }
6566
tokio-postgres = { version = "0.7.8" }
6667
tracing = "0.1.37"
68+
turmoil = { version = "0.7.0", optional = true }
6769
url = "2.3.1"
6870
urlencoding = "2.1.3"
6971
uuid = { version = "1.18.1", features = ["v4"] }
@@ -88,6 +90,7 @@ prost-build = "0.13.5"
8890

8991
[features]
9092
default = ["mz-build-tools/default", "workspace-hack"]
93+
turmoil = ["dep:serde_json", "dep:turmoil"]
9194

9295
[package.metadata.cargo-udeps.ignore]
9396
normal = ["workspace-hack", "sha2"]

src/persist/src/cfg.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@ pub enum BlobConfig {
5151
Mem(bool),
5252
/// Config for [AzureBlob].
5353
Azure(AzureBlobConfig),
54+
#[cfg(feature = "turmoil")]
55+
/// Config for [crate::turmoil::TurmoilBlob].
56+
Turmoil(crate::turmoil::BlobConfig),
5457
}
5558

5659
/// Configuration knobs for [Blob].
@@ -77,6 +80,8 @@ impl BlobConfig {
7780
BlobConfig::Mem(tombstone) => {
7881
Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
7982
}
83+
#[cfg(feature = "turmoil")]
84+
BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
8085
}
8186
}
8287

@@ -186,6 +191,11 @@ impl BlobConfig {
186191
}
187192
_ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
188193
},
194+
#[cfg(feature = "turmoil")]
195+
"turmoil" => {
196+
let cfg = crate::turmoil::BlobConfig::new(url);
197+
Ok(BlobConfig::Turmoil(cfg))
198+
}
189199
p => Err(anyhow!(
190200
"unknown persist blob scheme {}: {}",
191201
p,
@@ -216,6 +226,9 @@ pub enum ConsensusConfig {
216226
Postgres(PostgresConsensusConfig),
217227
/// Config for [MemConsensus], only available in testing.
218228
Mem,
229+
#[cfg(feature = "turmoil")]
230+
/// Config for [crate::turmoil::TurmoilConsensus].
231+
Turmoil(crate::turmoil::ConsensusConfig),
219232
}
220233

221234
impl ConsensusConfig {
@@ -226,6 +239,10 @@ impl ConsensusConfig {
226239
Ok(Arc::new(PostgresConsensus::open(config).await?))
227240
}
228241
ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
242+
#[cfg(feature = "turmoil")]
243+
ConsensusConfig::Turmoil(config) => {
244+
Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
245+
}
229246
}
230247
}
231248

@@ -246,6 +263,11 @@ impl ConsensusConfig {
246263
}
247264
Ok(ConsensusConfig::Mem)
248265
}
266+
#[cfg(feature = "turmoil")]
267+
"turmoil" => {
268+
let cfg = crate::turmoil::ConsensusConfig::new(url);
269+
Ok(ConsensusConfig::Turmoil(cfg))
270+
}
249271
p => Err(anyhow!(
250272
"unknown persist consensus scheme {}: {}",
251273
p,

src/persist/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,7 @@ pub mod metrics;
3030
pub mod postgres;
3131
pub mod retry;
3232
pub mod s3;
33+
#[cfg(feature = "turmoil")]
34+
pub mod turmoil;
3335
pub mod unreliable;
3436
pub mod workload;

src/persist/src/location.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ impl From<tokio::task::JoinError> for ExternalError {
334334

335335
/// An abstraction for a single arbitrarily-sized binary blob and an associated
336336
/// version number (sequence number).
337-
#[derive(Debug, Clone, PartialEq)]
337+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
338338
pub struct VersionedData {
339339
/// The sequence number of the data.
340340
pub seqno: SeqNo,
@@ -352,7 +352,7 @@ pub const SCAN_ALL: usize = u64_to_usize(i64::MAX as u64);
352352
pub const CONSENSUS_HEAD_LIVENESS_KEY: &str = "LIVENESS";
353353

354354
/// Return type to indicate whether [Consensus::compare_and_set] succeeded or failed.
355-
#[derive(Debug, PartialEq)]
355+
#[derive(Debug, PartialEq, Serialize, Deserialize)]
356356
pub enum CaSResult {
357357
/// The compare-and-set succeeded and committed new state.
358358
Committed,

src/persist/src/mem.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ impl MemBlobConfig {
149149
}
150150

151151
/// An in-memory implementation of [Blob].
152-
#[derive(Debug)]
152+
#[derive(Clone, Debug)]
153153
pub struct MemBlob {
154154
core: Arc<tokio::sync::Mutex<MemBlobCore>>,
155155
}
@@ -201,7 +201,7 @@ impl Blob for MemBlob {
201201
}
202202

203203
/// An in-memory implementation of [Consensus].
204-
#[derive(Debug)]
204+
#[derive(Clone, Debug)]
205205
pub struct MemConsensus {
206206
// TODO: This was intended to be a tokio::sync::Mutex but that seems to
207207
// regularly deadlock in the `concurrency` test.

0 commit comments

Comments
 (0)