Skip to content

Commit 72eeacb

Browse files
committed
fdb maelstrom
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 08651ec commit 72eeacb

File tree

6 files changed

+53
-16
lines changed

6 files changed

+53
-16
lines changed

src/persist-client/src/internal/metrics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3012,6 +3012,10 @@ impl Consensus for MetricsConsensus {
30123012
.inc_by(u64::cast_from(deleted));
30133013
Ok(deleted)
30143014
}
3015+
3016+
fn truncate_counts(&self) -> bool {
3017+
self.consensus.truncate_counts()
3018+
}
30153019
}
30163020

30173021
/// A standard set of metrics for an async task. Call [TaskMetrics::instrument_task] to instrument

src/persist/src/cfg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ impl BlobConfig {
213213
/// Config for an implementation of [Consensus].
214214
#[derive(Debug, Clone)]
215215
pub enum ConsensusConfig {
216-
/// Config for [FdbConsensus].
216+
/// Config for FoundationDB.
217217
FoundationDB(FdbConsensusConfig),
218218
/// Config for [PostgresConsensus].
219219
Postgres(PostgresConsensusConfig),

src/persist/src/foundationdb.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
//!
1212
//! We're storing the consensus data in a subspace at `/mz/consensus`. Each key maps to a subspace
1313
//! with the following structure:
14-
//! ./seqno/<key> -> <seqno>
15-
//! ./data/<key>/<seqno> -> <data>
14+
//! * `./seqno/<key> -> <seqno>`
15+
//! * `./data/<key>/<seqno> -> <data>`
1616
1717
use std::io::Write;
1818
use std::sync::OnceLock;
@@ -304,24 +304,27 @@ impl FdbConsensus {
304304
trx.set(&data_seqno_key, &pack(&new.data.as_ref()));
305305
Ok(CaSResult::Committed)
306306
}
307+
307308
async fn scan_trx(
308309
&self,
309310
trx: &Transaction,
310311
data_key: &Subspace,
311312
from: &SeqNo,
312313
limit: &usize,
313-
) -> Result<Vec<VersionedData>, FdbTransactError> {
314+
entries: &mut Vec<VersionedData>,
315+
) -> Result<(), FdbTransactError> {
314316
let mut limit = *limit;
315317
let seqno_start = data_key.pack(&from);
316318
let seqno_end = data_key.pack(&SeqNo::maximum());
317319

318320
let mut range = RangeOption::from(seqno_start..=seqno_end);
319321
range.limit = Some(limit);
320322

321-
let mut entries = Vec::new();
323+
entries.clear();
322324

323325
loop {
324326
let output = trx.get_range(&range, 1, false).await?;
327+
entries.reserve(output.len());
325328
for key_value in &output {
326329
let seqno = data_key.unpack(key_value.key())?;
327330
let value: Vec<u8> = unpack(key_value.value())?;
@@ -342,9 +345,7 @@ impl FdbConsensus {
342345
break;
343346
}
344347
}
345-
346-
entries.sort_by_key(|e| e.seqno);
347-
Ok(entries)
348+
Ok(())
348349
}
349350
async fn truncate_trx(
350351
&self,
@@ -462,15 +463,19 @@ impl Consensus for FdbConsensus {
462463
limit: usize,
463464
) -> Result<Vec<VersionedData>, ExternalError> {
464465
let data_key = self.data.subspace(&key);
465-
let ok = self
466-
.db
466+
let mut entries = Vec::new();
467+
self.db
467468
.transact_boxed(
468-
(&data_key, from, limit),
469-
|trx, (data_key, from, limit)| self.scan_trx(trx, data_key, from, limit).boxed(),
469+
(&data_key, from, limit, &mut entries),
470+
|trx, (data_key, from, limit, entries)| {
471+
self.scan_trx(trx, data_key, from, limit, entries).boxed()
472+
},
470473
TransactOption::default(),
471474
)
472475
.await?;
473-
Ok(ok)
476+
477+
entries.sort_by_key(|e| e.seqno);
478+
Ok(entries)
474479
}
475480

476481
async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {

src/persist/src/location.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ pub trait Consensus: std::fmt::Debug + Send + Sync {
441441
/// data at this key.
442442
async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError>;
443443

444-
/// Returns true if [`truncate`] returns the number of versions deleted.
444+
/// Returns true if [`Self::truncate`] returns the number of versions deleted.
445445
fn truncate_counts(&self) -> bool {
446446
true
447447
}
@@ -508,6 +508,10 @@ impl<A: Consensus + 'static> Consensus for Tasked<A> {
508508
)
509509
.await?
510510
}
511+
512+
fn truncate_counts(&self) -> bool {
513+
self.0.truncate_counts()
514+
}
511515
}
512516

513517
/// Metadata about a particular blob stored by persist

src/persist/src/unreliable.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ impl Consensus for UnreliableConsensus {
228228
.run_op("truncate", || self.consensus.truncate(key, seqno))
229229
.await
230230
}
231+
232+
fn truncate_counts(&self) -> bool {
233+
self.consensus.truncate_counts()
234+
}
231235
}
232236

233237
#[cfg(test)]

test/persist/mzcompose.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,29 @@
1313

1414
import argparse
1515

16+
from materialize import MZ_ROOT
1617
from materialize.mzcompose.composition import (
1718
Composition,
1819
WorkflowArgumentParser,
1920
)
2021
from materialize.mzcompose.service import Service
2122
from materialize.mzcompose.services.cockroach import Cockroach
23+
from materialize.mzcompose.services.foundationdb import FoundationDB
2224
from materialize.mzcompose.services.postgres import PostgresMetadata
2325

2426
SERVICES = [
2527
Cockroach(setup_materialize=True, in_memory=True),
2628
PostgresMetadata(),
29+
FoundationDB(),
2730
Service(
2831
"maelstrom-persist",
29-
{"mzbuild": "maelstrom-persist", "volumes": ["./maelstrom:/store"]},
32+
{
33+
"mzbuild": "maelstrom-persist",
34+
"volumes": [
35+
"./maelstrom:/store",
36+
f"{MZ_ROOT}/misc/foundationdb/fdb.cluster:/etc/foundationdb/fdb.cluster",
37+
],
38+
},
3039
),
3140
]
3241

@@ -50,7 +59,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
5059
parser.add_argument(
5160
"--consensus",
5261
type=str,
53-
choices=["mem", "cockroach", "maelstrom", "postgres"],
62+
choices=["mem", "cockroach", "maelstrom", "postgres", "foundationdb"],
5463
default="maelstrom",
5564
)
5665
parser.add_argument(
@@ -76,6 +85,17 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
7685
"postgres://root@postgres-metadata:26257?options=--search_path=consensus"
7786
)
7887
c.up("postgres-metadata")
88+
elif args.consensus == "foundationdb":
89+
consensus_uri = "foundationdb:"
90+
c.up("foundationdb")
91+
c.run(
92+
"foundationdb",
93+
"-C",
94+
"/etc/foundationdb/fdb.cluster",
95+
"--exec",
96+
"configure new single memory",
97+
entrypoint="fdbcli",
98+
)
7999
else:
80100
# empty consensus uri defaults to Maelstrom consensus implementation
81101
consensus_uri = ""

0 commit comments

Comments
 (0)