From 63c9a8d18cf29ed9918ad5f12a6885072084a363 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Wed, 18 Mar 2026 16:21:48 +0000 Subject: [PATCH 1/2] fix: preserve custom snapshot metadata after compaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary Custom/non-standard snapshot metadata properties (e.g. pipeline-id, bobsled.source-table) were silently dropped after compaction Read previous snapshot's non-standard properties and pass them through via set_snapshot_properties() on both RewriteFilesAction and OverwriteFilesAction Standard Iceberg summary keys (total-records, added-data-files, etc.) are still correctly recalculated by iceberg-rust — only custom keys are preserved Changes Added custom_snapshot_properties() helper and KNOWN_SNAPSHOT_SUMMARY_KEYS constant Modified CommitManager::rewrite_files() and CommitManager::overwrite_files() to inject custom properties Added end-to-end test and unit test for the filtering logic --- core/src/compaction/mod.rs | 244 ++++++++++++++++++++++++++++++++++--- 1 file changed, 227 insertions(+), 17 deletions(-) diff --git a/core/src/compaction/mod.rs b/core/src/compaction/mod.rs index 020eaff..6cd8396 100644 --- a/core/src/compaction/mod.rs +++ b/core/src/compaction/mod.rs @@ -15,6 +15,7 @@ */ use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -974,13 +975,16 @@ impl CommitManager { let rewrite_action = if use_starting_sequence_number { // TODO: avoid retry if the snapshot_id is not found if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) { - txn.rewrite_files() + let mut action = txn + .rewrite_files() .set_enable_delete_filter_manager(true) .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) .set_new_data_file_sequence_number(snapshot.sequence_number()) - .set_check_file_existence(true) + .set_check_file_existence(true); + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + action } else { return Err(iceberg::Error::new( ErrorKind::Unexpected, @@ -990,12 +994,17 @@ impl CommitManager { )); } } else { - txn.rewrite_files() + let mut action = txn + .rewrite_files() .set_enable_delete_filter_manager(true) .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) - .set_check_file_existence(true) + .set_check_file_existence(true); + if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) { + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + } + action }; let txn = rewrite_action.apply(txn)?; @@ -1084,12 +1093,15 @@ impl CommitManager { let overwrite_action = if use_starting_sequence_number { // TODO: avoid retry if the snapshot_id is not found if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) { - txn.overwrite_files() + let mut action = txn + .overwrite_files() .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) .set_new_data_file_sequence_number(snapshot.sequence_number()) - .set_check_file_existence(true) + .set_check_file_existence(true); + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + action } else { return Err(iceberg::Error::new( ErrorKind::Unexpected, @@ -1099,11 +1111,16 @@ impl CommitManager { )); } } else { - txn.overwrite_files() + let mut action = txn + .overwrite_files() .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) - .set_check_file_existence(true) + .set_check_file_existence(true); + if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) { + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + } + action }; let txn = overwrite_action.apply(txn)?; @@ -1149,6 +1166,52 @@ impl CommitManager { } } +/// Known Iceberg snapshot summary keys managed by `SnapshotSummaryCollector` +/// and `update_snapshot_summaries`. +/// +/// These keys are auto-computed by iceberg-rust during snapshot production and must +/// NOT be copied from the previous snapshot — doing so would overwrite the correctly +/// recalculated values. Only properties whose keys are *not* in this list (and don't +/// start with `"partitions."`) are considered custom metadata that must be preserved. +const KNOWN_SNAPSHOT_SUMMARY_KEYS: &[&str] = &[ + "added-data-files", + "added-delete-files", + "added-equality-delete-files", + "added-position-delete-files", + "added-files-size", + "added-records", + "added-equality-deletes", + "added-position-deletes", + "deleted-data-files", + "removed-delete-files", + "removed-equality-delete-files", + "removed-position-delete-files", + "removed-files-size", + "deleted-records", + "removed-equality-deletes", + "removed-position-deletes", + "total-data-files", + "total-delete-files", + "total-files-size", + "total-records", + "total-equality-deletes", + "total-position-deletes", + "changed-partition-count", +]; + +/// Extracts non-standard (custom) properties from a snapshot's summary. +fn custom_snapshot_properties(snapshot: &Snapshot) -> HashMap { + snapshot + .summary() + .additional_properties + .iter() + .filter(|(k, _)| { + !KNOWN_SNAPSHOT_SUMMARY_KEYS.contains(&k.as_str()) && !k.starts_with("partitions.") + }) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + /// Compaction plan describing files to rewrite and target commit location. #[derive(Debug, Clone)] pub struct CompactionPlan { @@ -1452,11 +1515,14 @@ mod tests { // Convert iceberg schema to arrow schema to ensure field ID consistency let arrow_schema = schema_to_arrow_schema(iceberg_schema).unwrap(); - RecordBatch::try_new(Arc::new(arrow_schema), vec![ - Arc::new(id_array), - Arc::new(name_array), - Arc::new(pos_array), - ]) + RecordBatch::try_new( + Arc::new(arrow_schema), + vec![ + Arc::new(id_array), + Arc::new(name_array), + Arc::new(pos_array), + ], + ) .unwrap() } @@ -1467,10 +1533,10 @@ mod tests { // Convert iceberg schema to arrow schema to ensure field ID consistency let arrow_schema = schema_to_arrow_schema(iceberg_schema).unwrap(); - RecordBatch::try_new(Arc::new(arrow_schema), vec![ - Arc::new(id_array), - Arc::new(name_array), - ]) + RecordBatch::try_new( + Arc::new(arrow_schema), + vec![Arc::new(id_array), Arc::new(name_array)], + ) .unwrap() } @@ -2382,4 +2448,148 @@ mod tests { "plan_compaction() without config should fail" ); } + + /// Appends data files with custom snapshot properties set on the commit. + async fn append_and_commit_with_properties( + table: &Table, + catalog: &C, + data_files: Vec, + properties: HashMap, + ) -> Table { + let transaction = Transaction::new(table); + let append_action = transaction + .fast_append() + .add_data_files(data_files) + .set_snapshot_properties(properties); + let tx = append_action.apply(transaction).unwrap(); + tx.commit(catalog).await.unwrap() + } + + /// Custom snapshot metadata from the previous snapshot should get through compaction + #[tokio::test] + async fn test_custom_snapshot_metadata_preserved_after_compaction() { + let env = create_test_env().await; + + let data_files = write_simple_files(&env.table, &env.warehouse_location, "test", 3).await; + + // Append with custom properties that simulate external system metadata + let mut custom_props = HashMap::new(); + custom_props.insert("pipeline-id".to_owned(), "pipe-42".to_owned()); + custom_props.insert("bobsled.source-table".to_owned(), "events_raw".to_owned()); + custom_props.insert("custom.watermark-ms".to_owned(), "1700000000000".to_owned()); + + let updated_table = append_and_commit_with_properties( + &env.table, + env.catalog.as_ref(), + data_files, + custom_props.clone(), + ) + .await; + + // Verify custom properties exist on the pre-compaction snapshot + let snapshot_before = updated_table + .metadata() + .snapshot_for_ref(MAIN_BRANCH) + .unwrap(); + let summary_before = &snapshot_before.summary().additional_properties; + for (key, value) in &custom_props { + assert_eq!( + summary_before.get(key).unwrap(), + value, + "Custom property '{key}' should be present before compaction" + ); + } + + // Also record the total-records value before compaction + let total_records_before = summary_before.get("total-records").cloned(); + + // Run compaction + let compaction = create_default_compaction(env.catalog.clone(), env.table_ident.clone()); + let result = compaction.compact().await.unwrap().unwrap(); + let final_table = result.table.unwrap(); + + let snapshot_after = final_table + .metadata() + .snapshot_for_ref(MAIN_BRANCH) + .unwrap(); + let summary_after = &snapshot_after.summary().additional_properties; + + // Custom properties must be preserved + for (key, value) in &custom_props { + assert_eq!( + summary_after.get(key).unwrap(), + value, + "Custom property '{key}' must survive compaction" + ); + } + + // total-records must still be correct (recalculated, not blindly copied) + assert_eq!( + summary_after.get("total-records"), + total_records_before.as_ref(), + "total-records must be preserved (no data was added or removed)" + ); + + // Snapshot operation should be "replace" for compaction + assert_eq!( + snapshot_after.summary().operation, + iceberg::spec::Operation::Replace, + "Compaction snapshot operation should be 'replace'" + ); + } + + /// Verifies that `custom_snapshot_properties` correctly filters known keys. + #[test] + fn test_custom_snapshot_properties_filters_known_keys() { + use super::{KNOWN_SNAPSHOT_SUMMARY_KEYS, custom_snapshot_properties}; + use iceberg::spec::{Operation, Snapshot, Summary}; + + let mut all_properties = HashMap::new(); + + // Add all known keys + for key in KNOWN_SNAPSHOT_SUMMARY_KEYS { + all_properties.insert(key.to_string(), "100".to_owned()); + } + + all_properties.insert( + "partitions.date=2024-01-01".to_owned(), + "added-data-files=1".to_owned(), + ); + + all_properties.insert("pipeline-id".to_owned(), "pipe-42".to_owned()); + all_properties.insert("bobsled.source-table".to_owned(), "events_raw".to_owned()); + + let summary = Summary { + operation: Operation::Append, + additional_properties: all_properties, + }; + + let snapshot = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(1000) + .with_sequence_number(1) + .with_schema_id(0) + .with_manifest_list("manifest-list.avro") + .with_summary(summary) + .build(); + + let custom = custom_snapshot_properties(&snapshot); + + // Only custom keys should remain + assert_eq!(custom.len(), 2); + assert_eq!(custom.get("pipeline-id").unwrap(), "pipe-42"); + assert_eq!(custom.get("bobsled.source-table").unwrap(), "events_raw"); + + // Known keys must NOT be present + for key in KNOWN_SNAPSHOT_SUMMARY_KEYS { + assert!( + !custom.contains_key(*key), + "Known key '{key}' must be filtered out" + ); + } + assert!( + !custom.contains_key("partitions.date=2024-01-01"), + "Partition keys must be filtered out" + ); + } } From cde15879e747bbe1274e659ccb1069ba0d3fe095 Mon Sep 17 00:00:00 2001 From: volodymyr Date: Wed, 18 Mar 2026 16:30:14 +0000 Subject: [PATCH 2/2] fix: preserve custom snapshot metadata after compaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary Custom/non-standard snapshot metadata properties (e.g. pipeline-id, bobsled.source-table) were silently dropped after compaction Read previous snapshot's non-standard properties and pass them through via set_snapshot_properties() on both RewriteFilesAction and OverwriteFilesAction Standard Iceberg summary keys (total-records, added-data-files, etc.) are still correctly recalculated by iceberg-rust — only custom keys are preserved Changes Added custom_snapshot_properties() helper and KNOWN_SNAPSHOT_SUMMARY_KEYS constant Modified CommitManager::rewrite_files() and CommitManager::overwrite_files() to inject custom properties Added end-to-end test and unit test for the filtering logic --- core/src/compaction/mod.rs | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/src/compaction/mod.rs b/core/src/compaction/mod.rs index 6cd8396..1348e88 100644 --- a/core/src/compaction/mod.rs +++ b/core/src/compaction/mod.rs @@ -1515,14 +1515,11 @@ mod tests { // Convert iceberg schema to arrow schema to ensure field ID consistency let arrow_schema = schema_to_arrow_schema(iceberg_schema).unwrap(); - RecordBatch::try_new( - Arc::new(arrow_schema), - vec![ - Arc::new(id_array), - Arc::new(name_array), - Arc::new(pos_array), - ], - ) + RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(id_array), + Arc::new(name_array), + Arc::new(pos_array), + ]) .unwrap() } @@ -1533,10 +1530,10 @@ mod tests { // Convert iceberg schema to arrow schema to ensure field ID consistency let arrow_schema = schema_to_arrow_schema(iceberg_schema).unwrap(); - RecordBatch::try_new( - Arc::new(arrow_schema), - vec![Arc::new(id_array), Arc::new(name_array)], - ) + RecordBatch::try_new(Arc::new(arrow_schema), vec![ + Arc::new(id_array), + Arc::new(name_array), + ]) .unwrap() } @@ -2541,9 +2538,10 @@ mod tests { /// Verifies that `custom_snapshot_properties` correctly filters known keys. #[test] fn test_custom_snapshot_properties_filters_known_keys() { - use super::{KNOWN_SNAPSHOT_SUMMARY_KEYS, custom_snapshot_properties}; use iceberg::spec::{Operation, Snapshot, Summary}; + use super::{KNOWN_SNAPSHOT_SUMMARY_KEYS, custom_snapshot_properties}; + let mut all_properties = HashMap::new(); // Add all known keys