diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 1de006de74..e13165b1a0 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -58,7 +58,7 @@ use crate::io::{FileIO, FileMetadata, FileRead}; use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; -use crate::utils::available_parallelism; +use crate::util::available_parallelism; use crate::{Error, ErrorKind}; /// Default gap between byte ranges below which they are coalesced into a diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 8b345deb6e..3515cbdd6f 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -93,9 +93,10 @@ mod runtime; pub mod arrow; pub(crate) mod delete_file_index; pub mod test_utils; -mod utils; pub mod writer; mod delete_vector; pub mod metadata_columns; pub mod puffin; +/// Utility functions and modules. +pub mod util; diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e52b3bdeae..4a1e27bdc1 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -40,7 +40,7 @@ use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; -use crate::utils::available_parallelism; +use crate::util::available_parallelism; use crate::{Error, ErrorKind, Result}; /// A stream of arrow [`RecordBatch`]es. @@ -683,6 +683,39 @@ pub mod tests { } } + /// Creates a fixture with 5 snapshots chained as: + /// S1 (root) -> S2 -> S3 -> S4 -> S5 (current) + /// Useful for testing snapshot history traversal. + pub fn new_with_deep_history() -> Self { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().join("table1"); + let table_metadata1_location = table_location.join("metadata/v1.json"); + + let file_io = FileIO::new_with_fs(); + + let table_metadata = { + let json_str = fs::read_to_string(format!( + "{}/testdata/example_table_metadata_v2_deep_history.json", + env!("CARGO_MANIFEST_DIR") + )) + .unwrap(); + serde_json::from_str::(&json_str).unwrap() + }; + + let table = Table::builder() + .metadata(table_metadata) + .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) + .file_io(file_io.clone()) + .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .build() + .unwrap(); + + Self { + table_location: table_location.to_str().unwrap().to_string(), + table, + } + } + pub fn new_unpartitioned() -> Self { let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().join("table1"); diff --git a/crates/iceberg/src/utils.rs b/crates/iceberg/src/util/mod.rs similarity index 96% rename from crates/iceberg/src/utils.rs rename to crates/iceberg/src/util/mod.rs index 00d3e69bd3..28eda66d49 100644 --- a/crates/iceberg/src/utils.rs +++ b/crates/iceberg/src/util/mod.rs @@ -17,6 +17,9 @@ use std::num::NonZeroUsize; +/// Utilities for working with snapshots. +pub mod snapshot; + // Use a default value of 1 as the safest option. // See https://doc.rust-lang.org/std/thread/fn.available_parallelism.html#limitations // for more details. diff --git a/crates/iceberg/src/util/snapshot.rs b/crates/iceberg/src/util/snapshot.rs new file mode 100644 index 0000000000..98997ae815 --- /dev/null +++ b/crates/iceberg/src/util/snapshot.rs @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::{SnapshotRef, TableMetadataRef}; + +struct Ancestors { + next: Option, + get_snapshot: Box Option + Send>, +} + +impl Iterator for Ancestors { + type Item = SnapshotRef; + + fn next(&mut self) -> Option { + let snapshot = self.next.take()?; + self.next = snapshot + .parent_snapshot_id() + .and_then(|id| (self.get_snapshot)(id)); + Some(snapshot) + } +} + +/// Iterate starting from `snapshot_id` (inclusive) to the root snapshot. +pub fn ancestors_of( + table_metadata: &TableMetadataRef, + snapshot_id: i64, +) -> impl Iterator + Send { + let initial = table_metadata.snapshot_by_id(snapshot_id).cloned(); + let table_metadata = table_metadata.clone(); + Ancestors { + next: initial, + get_snapshot: Box::new(move |id| table_metadata.snapshot_by_id(id).cloned()), + } +} + +/// Iterate starting from `latest_snapshot_id` (inclusive) to `oldest_snapshot_id` (exclusive). +pub fn ancestors_between( + table_metadata: &TableMetadataRef, + latest_snapshot_id: i64, + oldest_snapshot_id: Option, +) -> impl Iterator + Send { + ancestors_of(table_metadata, latest_snapshot_id).take_while(move |snapshot| { + oldest_snapshot_id + .map(|id| snapshot.snapshot_id() != id) + .unwrap_or(true) + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::scan::tests::TableTestFixture; + + // Five snapshots chained as: S1 (root) -> S2 -> S3 -> S4 -> S5 (current) + const S1: i64 = 3051729675574597004; + const S2: i64 = 3055729675574597004; + const S3: i64 = 3056729675574597004; + const S4: i64 = 3057729675574597004; + const S5: i64 = 3059729675574597004; + + fn metadata() -> TableMetadataRef { + let fixture = TableTestFixture::new_with_deep_history(); + std::sync::Arc::new(fixture.table.metadata().clone()) + } + + // --- ancestors_of --- + + #[test] + fn test_ancestors_of_nonexistent_snapshot_returns_empty() { + let meta = metadata(); + let ids: Vec = ancestors_of(&meta, 999).map(|s| s.snapshot_id()).collect(); + assert!(ids.is_empty()); + } + + #[test] + fn test_ancestors_of_root_returns_only_root() { + let meta = metadata(); + let ids: Vec = ancestors_of(&meta, S1).map(|s| s.snapshot_id()).collect(); + assert_eq!(ids, vec![S1]); + } + + #[test] + fn test_ancestors_of_leaf_returns_full_chain() { + let meta = metadata(); + let ids: Vec = ancestors_of(&meta, S5).map(|s| s.snapshot_id()).collect(); + assert_eq!(ids, vec![S5, S4, S3, S2, S1]); + } + + #[test] + fn test_ancestors_of_mid_chain_returns_partial_chain() { + let meta = metadata(); + let ids: Vec = ancestors_of(&meta, S3).map(|s| s.snapshot_id()).collect(); + assert_eq!(ids, vec![S3, S2, S1]); + } + + #[test] + fn test_ancestors_of_second_snapshot() { + let meta = metadata(); + let ids: Vec = ancestors_of(&meta, S2).map(|s| s.snapshot_id()).collect(); + assert_eq!(ids, vec![S2, S1]); + } + + // --- ancestors_between --- + + #[test] + fn test_ancestors_between_same_id_returns_empty() { + let meta = metadata(); + let ids: Vec = ancestors_between(&meta, S3, Some(S3)) + .map(|s| s.snapshot_id()) + .collect(); + assert!(ids.is_empty()); + } + + #[test] + fn test_ancestors_between_no_oldest_returns_all_ancestors() { + let meta = metadata(); + let ids: Vec = ancestors_between(&meta, S5, None) + .map(|s| s.snapshot_id()) + .collect(); + assert_eq!(ids, vec![S5, S4, S3, S2, S1]); + } + + #[test] + fn test_ancestors_between_excludes_oldest_snapshot() { + let meta = metadata(); + // S5 down to (but not including) S2 + let ids: Vec = ancestors_between(&meta, S5, Some(S2)) + .map(|s| s.snapshot_id()) + .collect(); + assert_eq!(ids, vec![S5, S4, S3]); + } + + #[test] + fn test_ancestors_between_adjacent_snapshots() { + let meta = metadata(); + // S3 down to (but not including) S2 — only S3 itself + let ids: Vec = ancestors_between(&meta, S3, Some(S2)) + .map(|s| s.snapshot_id()) + .collect(); + assert_eq!(ids, vec![S3]); + } + + #[test] + fn test_ancestors_between_leaf_and_root() { + let meta = metadata(); + // S5 down to (but not including) S1 + let ids: Vec = ancestors_between(&meta, S5, Some(S1)) + .map(|s| s.snapshot_id()) + .collect(); + assert_eq!(ids, vec![S5, S4, S3, S2]); + } + + #[test] + fn test_ancestors_between_nonexistent_oldest_returns_full_chain() { + let meta = metadata(); + // oldest_snapshot_id doesn't exist in the chain, so take_while never stops + let ids: Vec = ancestors_between(&meta, S5, Some(999)) + .map(|s| s.snapshot_id()) + .collect(); + assert_eq!(ids, vec![S5, S4, S3, S2, S1]); + } + + #[test] + fn test_ancestors_between_nonexistent_latest_returns_empty() { + let meta = metadata(); + let ids: Vec = ancestors_between(&meta, 999, Some(S1)) + .map(|s| s.snapshot_id()) + .collect(); + assert!(ids.is_empty()); + } +} diff --git a/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json b/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json new file mode 100644 index 0000000000..a354958697 --- /dev/null +++ b/crates/iceberg/testdata/example_table_metadata_v2_deep_history.json @@ -0,0 +1,104 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + {"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000} + ] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, + {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"} + ] + } + ], + "properties": {}, + "current-snapshot-id": 3059729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": {"operation": "append"}, + "manifest-list": "s3://bucket/metadata/snap-3051729675574597004.avro" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "s3://bucket/metadata/snap-3055729675574597004.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3056729675574597004, + "parent-snapshot-id": 3055729675574597004, + "timestamp-ms": 1575100955770, + "sequence-number": 2, + "summary": {"operation": "append"}, + "manifest-list": "s3://bucket/metadata/snap-3056729675574597004.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3057729675574597004, + "parent-snapshot-id": 3056729675574597004, + "timestamp-ms": 1595100955770, + "sequence-number": 3, + "summary": {"operation": "overwrite"}, + "manifest-list": "s3://bucket/metadata/snap-3057729675574597004.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3059729675574597004, + "parent-snapshot-id": 3057729675574597004, + "timestamp-ms": 1602638573590, + "sequence-number": 4, + "summary": {"operation": "append"}, + "manifest-list": "s3://bucket/metadata/snap-3059729675574597004.avro", + "schema-id": 1 + } + ], + "snapshot-log": [ + {"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770}, + {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770}, + {"snapshot-id": 3056729675574597004, "timestamp-ms": 1575100955770}, + {"snapshot-id": 3057729675574597004, "timestamp-ms": 1595100955770}, + {"snapshot-id": 3059729675574597004, "timestamp-ms": 1602638573590} + ], + "metadata-log": [], + "refs": {"main": {"snapshot-id": 3059729675574597004, "type": "branch"}} +}