Skip to content

Commit 7a8fa3c

Browse files
committed
Revert "Merge pull request MaterializeInc#33902 from teskje/persist-defer-register-schema"
This reverts commit d2b1b58, reversing changes made to dd3402f.
1 parent b432b70 commit 7a8fa3c

File tree

9 files changed

+113
-152
lines changed

9 files changed

+113
-152
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
231231
Arc::clone(&pubsub_sender),
232232
));
233233

234+
// We need a PersistClient to open a write handle so we can append an empty batch.
234235
let persist_client = PersistClient::new(
235236
cfg,
236237
blob,
@@ -258,7 +259,21 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
258259
diagnostics,
259260
)
260261
.await?;
261-
write_handle.advance_upper(&Antichain::new()).await;
262+
263+
if !write_handle.upper().is_empty() {
264+
let empty_batch: Vec<(
265+
(crate::cli::inspect::K, crate::cli::inspect::V),
266+
u64,
267+
i64,
268+
)> = vec![];
269+
let lower = write_handle.upper().clone();
270+
let upper = Antichain::new();
271+
272+
let result = write_handle.append(empty_batch, lower, upper).await?;
273+
if let Err(err) = result {
274+
anyhow::bail!("failed to force downgrade upper, {err:?}");
275+
}
276+
}
262277
}
263278

264279
if force_downgrade_since {

src/persist-client/src/internal/apply.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -254,22 +254,6 @@ where
254254
})
255255
}
256256

257-
/// Returns the ID of the given schema, if known at the current state.
258-
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
259-
self.state
260-
.read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
261-
// The common case is that the requested schema is a recent one, so as a minor
262-
// optimization, do this search in reverse order.
263-
let mut schemas = state.collections.schemas.iter().rev();
264-
schemas
265-
.find(|(_, x)| {
266-
K::decode_schema(&x.key) == *key_schema
267-
&& V::decode_schema(&x.val) == *val_schema
268-
})
269-
.map(|(id, _)| *id)
270-
})
271-
}
272-
273257
/// Returns whether the current state's `since` and `upper` are both empty.
274258
///
275259
/// Due to sharing state with other handles, successive reads to this fn or any other may

src/persist-client/src/internal/encoding.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ use crate::{PersistConfig, ShardId, WriterId, cfg};
6565
/// A key and value `Schema` of data written to a batch or shard.
6666
#[derive(Debug)]
6767
pub struct Schemas<K: Codec, V: Codec> {
68+
// TODO: Remove the Option once this finishes rolling out and all shards
69+
// have a registered schema.
6870
/// Id under which this schema is registered in the shard's schema registry,
6971
/// if any.
7072
pub id: Option<SchemaId>,

src/persist-client/src/internal/machine.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -700,11 +700,6 @@ where
700700
self.applier.latest_schema()
701701
}
702702

703-
/// Returns the ID of the given schema, if known at the current state.
704-
pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
705-
self.applier.find_schema(key_schema, val_schema)
706-
}
707-
708703
/// See [crate::PersistClient::compare_and_evolve_schema].
709704
///
710705
/// TODO: Unify this with [Self::register_schema]?

src/persist-client/src/lib.rs

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use differential_dataflow::lattice::Lattice;
2424
use itertools::Itertools;
2525
use mz_build_info::{BuildInfo, build_info};
2626
use mz_dyncfg::ConfigSet;
27-
use mz_ore::instrument;
27+
use mz_ore::{instrument, soft_assert_or_log};
2828
use mz_persist::location::{Blob, Consensus, ExternalError};
2929
use mz_persist_types::schema::SchemaId;
3030
use mz_persist_types::{Codec, Codec64, Opaque};
@@ -490,6 +490,10 @@ impl PersistClient {
490490
///
491491
/// Use this to save latency and a bit of persist traffic if you're just
492492
/// going to immediately drop or expire the [ReadHandle].
493+
///
494+
/// The `_schema` parameter is currently unused, but should be an object
495+
/// that represents the schema of the data in the shard. This will be required
496+
/// in the future.
493497
#[instrument(level = "debug", fields(shard = %shard_id))]
494498
pub async fn open_writer<K, V, T, D>(
495499
&self,
@@ -507,11 +511,23 @@ impl PersistClient {
507511
let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
508512
let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
509513

510-
// We defer registering the schema until write time, to allow opening
511-
// write handles in a "read-only" mode where they don't implicitly
512-
// modify persist state. But it might already be registered, in which
513-
// case we can fetch its ID.
514-
let schema_id = machine.find_schema(&*key_schema, &*val_schema);
514+
// TODO: Because schemas are ordered, as part of the persist schema
515+
// changes work, we probably want to build some way to allow persist
516+
// users to control the order. For example, maybe a
517+
// `PersistClient::compare_and_append_schema(current_schema_id,
518+
// next_schema)`. Presumably this would then be passed in to open_writer
519+
// instead of us implicitly registering it here.
520+
// NB: The overwhelmingly common case is that this schema is already
521+
// registered. In this case, the cmd breaks early and nothing is
522+
// written to (or read from) CRDB.
523+
let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
524+
maintenance.start_performing(&machine, &gc);
525+
soft_assert_or_log!(
526+
schema_id.is_some(),
527+
"unable to register schemas {:?} {:?}",
528+
key_schema,
529+
val_schema,
530+
);
515531

516532
let writer_id = WriterId::new();
517533
let schemas = Schemas {
@@ -1976,6 +1992,7 @@ mod tests {
19761992
#[mz_persist_proc::test(tokio::test)]
19771993
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
19781994
async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
1995+
const EMPTY: &[(((), ()), u64, i64)] = &[];
19791996
let persist_client = new_test_client(&dyncfgs).await;
19801997

19811998
let shard_id = ShardId::new();
@@ -1989,7 +2006,11 @@ mod tests {
19892006
// Advance since and upper to empty, which is a pre-requisite for
19902007
// finalization/tombstoning.
19912008
let () = read.downgrade_since(&Antichain::new()).await;
1992-
let () = write.advance_upper(&Antichain::new()).await;
2009+
let () = write
2010+
.compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
2011+
.await
2012+
.expect("usage should be valid")
2013+
.expect("upper should match");
19932014

19942015
let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
19952016
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
@@ -2026,6 +2047,7 @@ mod tests {
20262047
#[mz_persist_proc::test(tokio::test)]
20272048
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
20282049
async fn finalize_shard(dyncfgs: ConfigUpdates) {
2050+
const EMPTY: &[(((), ()), u64, i64)] = &[];
20292051
const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
20302052
let persist_client = new_test_client(&dyncfgs).await;
20312053

@@ -2047,7 +2069,11 @@ mod tests {
20472069
// Advance since and upper to empty, which is a pre-requisite for
20482070
// finalization/tombstoning.
20492071
let () = read.downgrade_since(&Antichain::new()).await;
2050-
let () = write.advance_upper(&Antichain::new()).await;
2072+
let () = write
2073+
.compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
2074+
.await
2075+
.expect("usage should be valid")
2076+
.expect("upper should match");
20512077

20522078
let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
20532079
.open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())

src/persist-client/src/schema.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ mod tests {
605605
let schema0 = StringsSchema(vec![false]);
606606
let schema1 = StringsSchema(vec![false, true]);
607607

608-
let mut write0 = client
608+
let write0 = client
609609
.open_writer::<Strings, (), u64, i64>(
610610
shard_id,
611611
Arc::new(schema0.clone()),
@@ -614,8 +614,6 @@ mod tests {
614614
)
615615
.await
616616
.unwrap();
617-
618-
write0.ensure_schema_registered().await;
619617
assert_eq!(write0.write_schemas.id.unwrap(), SchemaId(0));
620618

621619
// Not backward compatible (yet... we don't support dropping a column at

src/persist-client/src/write.rs

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -221,31 +221,6 @@ where
221221
self.write_schemas.id
222222
}
223223

224-
/// Registers the write schema, if it isn't already registered.
225-
///
226-
/// # Panics
227-
///
228-
/// This method expects that either the shard doesn't yet have any schema registered, or one of
229-
/// the registered schemas is the same as the write schema. If all registered schemas are
230-
/// different from the write schema, it panics.
231-
pub async fn ensure_schema_registered(&mut self) -> SchemaId {
232-
let Schemas { id, key, val } = &self.write_schemas;
233-
234-
if let Some(id) = id {
235-
return *id;
236-
}
237-
238-
let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
239-
maintenance.start_performing(&self.machine, &self.gc);
240-
241-
let Some(schema_id) = schema_id else {
242-
panic!("unable to register schemas: {key:?} {val:?}");
243-
};
244-
245-
self.write_schemas.id = Some(schema_id);
246-
schema_id
247-
}
248-
249224
/// A cached version of the shard-global `upper` frontier.
250225
///
251226
/// This is the most recent upper discovered by this handle. It is
@@ -281,50 +256,6 @@ where
281256
&self.upper
282257
}
283258

284-
/// Advance the shard's upper by the given frontier.
285-
///
286-
/// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
287-
///
288-
/// In contrast to the various compare-and-append methods, this method does not require the
289-
/// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
290-
/// schema when creating a writer just to advance a shard upper.
291-
pub async fn advance_upper(&mut self, target: &Antichain<T>) {
292-
// We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
293-
// already beyond the target.
294-
let mut lower = self.shared_upper().clone();
295-
296-
while !PartialOrder::less_equal(target, &lower) {
297-
let since = Antichain::from_elem(T::minimum());
298-
let desc = Description::new(lower.clone(), target.clone(), since);
299-
let batch = HollowBatch::empty(desc);
300-
301-
let heartbeat_timestamp = (self.cfg.now)();
302-
let res = self
303-
.machine
304-
.compare_and_append(
305-
&batch,
306-
&self.writer_id,
307-
&self.debug_state,
308-
heartbeat_timestamp,
309-
)
310-
.await;
311-
312-
use CompareAndAppendRes::*;
313-
let new_upper = match res {
314-
Success(_seq_no, maintenance) => {
315-
maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
316-
batch.desc.upper().clone()
317-
}
318-
UpperMismatch(_seq_no, actual_upper) => actual_upper,
319-
InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
320-
InlineBackpressure => unreachable!("batch was empty"),
321-
};
322-
323-
self.upper.clone_from(&new_upper);
324-
lower = new_upper;
325-
}
326-
}
327-
328259
/// Applies `updates` to this shard and downgrades this handle's upper to
329260
/// `upper`.
330261
///
@@ -576,9 +507,6 @@ where
576507
}
577508
}
578509

579-
// Before we append any data, we require a registered write schema.
580-
self.ensure_schema_registered().await;
581-
582510
let lower = expected_upper.clone();
583511
let upper = new_upper;
584512
let since = Antichain::from_elem(T::minimum());

src/storage-client/src/storage_collections.rs

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3248,23 +3248,40 @@ async fn finalize_shards_task<T>(
32483248
Some(shard_id)
32493249
} else {
32503250
debug!(%shard_id, "finalizing shard");
3251-
let finalize = || async move {
3251+
let finalize = || async move {
32523252
// TODO: thread the global ID into the shard finalization WAL
32533253
let diagnostics = Diagnostics::from_purpose("finalizing shards");
32543254

3255-
// We only use the writer to advance the upper, so using a dummy schema is
3256-
// fine.
3255+
let schemas = persist_client.latest_schema::<SourceData, (), T, StorageDiff>(shard_id, diagnostics.clone()).await.expect("codecs have not changed");
3256+
let (key_schema, val_schema) = match schemas {
3257+
Some((_, key_schema, val_schema)) => (key_schema, val_schema),
3258+
None => (RelationDesc::empty(), UnitSchema),
3259+
};
3260+
3261+
let empty_batch: Vec<((SourceData, ()), T, StorageDiff)> = vec![];
32573262
let mut write_handle: WriteHandle<SourceData, (), T, StorageDiff> =
32583263
persist_client
32593264
.open_writer(
32603265
shard_id,
3261-
Arc::new(RelationDesc::empty()),
3262-
Arc::new(UnitSchema),
3266+
Arc::new(key_schema),
3267+
Arc::new(val_schema),
32633268
diagnostics,
32643269
)
32653270
.await
32663271
.expect("invalid persist usage");
3267-
write_handle.advance_upper(&Antichain::new()).await;
3272+
3273+
let upper = write_handle.upper();
3274+
3275+
if !upper.is_empty() {
3276+
let append = write_handle
3277+
.append(empty_batch, upper.clone(), Antichain::new())
3278+
.await?;
3279+
3280+
if let Err(e) = append {
3281+
warn!(%shard_id, "tried to finalize a shard with an advancing upper: {e:?}");
3282+
return Ok(());
3283+
}
3284+
}
32683285
write_handle.expire().await;
32693286

32703287
if force_downgrade_since {
@@ -3300,7 +3317,9 @@ async fn finalize_shards_task<T>(
33003317
.compare_and_downgrade_since(&epoch, (&epoch, &new_since))
33013318
.await;
33023319
if let Err(e) = downgrade {
3303-
warn!("tried to finalize a shard with an advancing epoch: {e:?}");
3320+
warn!(
3321+
"tried to finalize a shard with an advancing epoch: {e:?}"
3322+
);
33043323
return Ok(());
33053324
}
33063325
// Not available now, so finalization is broken.

0 commit comments

Comments
 (0)