diff --git a/core/src/compaction/mod.rs b/core/src/compaction/mod.rs index 020eaff..1348e88 100644 --- a/core/src/compaction/mod.rs +++ b/core/src/compaction/mod.rs @@ -15,6 +15,7 @@ */ use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -974,13 +975,16 @@ impl CommitManager { let rewrite_action = if use_starting_sequence_number { // TODO: avoid retry if the snapshot_id is not found if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) { - txn.rewrite_files() + let mut action = txn + .rewrite_files() .set_enable_delete_filter_manager(true) .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) .set_new_data_file_sequence_number(snapshot.sequence_number()) - .set_check_file_existence(true) + .set_check_file_existence(true); + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + action } else { return Err(iceberg::Error::new( ErrorKind::Unexpected, @@ -990,12 +994,17 @@ impl CommitManager { )); } } else { - txn.rewrite_files() + let mut action = txn + .rewrite_files() .set_enable_delete_filter_manager(true) .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) - .set_check_file_existence(true) + .set_check_file_existence(true); + if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) { + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + } + action }; let txn = rewrite_action.apply(txn)?; @@ -1084,12 +1093,15 @@ impl CommitManager { let overwrite_action = if use_starting_sequence_number { // TODO: avoid retry if the snapshot_id is not found if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) { - txn.overwrite_files() + let mut action = txn + .overwrite_files() .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) .set_new_data_file_sequence_number(snapshot.sequence_number()) - .set_check_file_existence(true) + .set_check_file_existence(true); + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + action } else { return Err(iceberg::Error::new( ErrorKind::Unexpected, @@ -1099,11 +1111,16 @@ impl CommitManager { )); } } else { - txn.overwrite_files() + let mut action = txn + .overwrite_files() .add_data_files(data_files) .delete_files(delete_files) .set_target_branch(to_branch.to_owned()) - .set_check_file_existence(true) + .set_check_file_existence(true); + if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) { + action.set_snapshot_properties(custom_snapshot_properties(snapshot)); + } + action }; let txn = overwrite_action.apply(txn)?; @@ -1149,6 +1166,52 @@ impl CommitManager { } } +/// Known Iceberg snapshot summary keys managed by `SnapshotSummaryCollector` +/// and `update_snapshot_summaries`. +/// +/// These keys are auto-computed by iceberg-rust during snapshot production and must +/// NOT be copied from the previous snapshot — doing so would overwrite the correctly +/// recalculated values. Only properties whose keys are *not* in this list (and don't +/// start with `"partitions."`) are considered custom metadata that must be preserved. +const KNOWN_SNAPSHOT_SUMMARY_KEYS: &[&str] = &[ + "added-data-files", + "added-delete-files", + "added-equality-delete-files", + "added-position-delete-files", + "added-files-size", + "added-records", + "added-equality-deletes", + "added-position-deletes", + "deleted-data-files", + "removed-delete-files", + "removed-equality-delete-files", + "removed-position-delete-files", + "removed-files-size", + "deleted-records", + "removed-equality-deletes", + "removed-position-deletes", + "total-data-files", + "total-delete-files", + "total-files-size", + "total-records", + "total-equality-deletes", + "total-position-deletes", + "changed-partition-count", +]; + +/// Extracts non-standard (custom) properties from a snapshot's summary. +fn custom_snapshot_properties(snapshot: &Snapshot) -> HashMap { + snapshot + .summary() + .additional_properties + .iter() + .filter(|(k, _)| { + !KNOWN_SNAPSHOT_SUMMARY_KEYS.contains(&k.as_str()) && !k.starts_with("partitions.") + }) + .map(|(k, v)| (k.clone(), v.clone())) + .collect() +} + /// Compaction plan describing files to rewrite and target commit location. #[derive(Debug, Clone)] pub struct CompactionPlan { @@ -2382,4 +2445,149 @@ mod tests { "plan_compaction() without config should fail" ); } + + /// Appends data files with custom snapshot properties set on the commit. + async fn append_and_commit_with_properties( + table: &Table, + catalog: &C, + data_files: Vec, + properties: HashMap, + ) -> Table { + let transaction = Transaction::new(table); + let append_action = transaction + .fast_append() + .add_data_files(data_files) + .set_snapshot_properties(properties); + let tx = append_action.apply(transaction).unwrap(); + tx.commit(catalog).await.unwrap() + } + + /// Custom snapshot metadata from the previous snapshot should get through compaction + #[tokio::test] + async fn test_custom_snapshot_metadata_preserved_after_compaction() { + let env = create_test_env().await; + + let data_files = write_simple_files(&env.table, &env.warehouse_location, "test", 3).await; + + // Append with custom properties that simulate external system metadata + let mut custom_props = HashMap::new(); + custom_props.insert("pipeline-id".to_owned(), "pipe-42".to_owned()); + custom_props.insert("bobsled.source-table".to_owned(), "events_raw".to_owned()); + custom_props.insert("custom.watermark-ms".to_owned(), "1700000000000".to_owned()); + + let updated_table = append_and_commit_with_properties( + &env.table, + env.catalog.as_ref(), + data_files, + custom_props.clone(), + ) + .await; + + // Verify custom properties exist on the pre-compaction snapshot + let snapshot_before = updated_table + .metadata() + .snapshot_for_ref(MAIN_BRANCH) + .unwrap(); + let summary_before = &snapshot_before.summary().additional_properties; + for (key, value) in &custom_props { + assert_eq!( + summary_before.get(key).unwrap(), + value, + "Custom property '{key}' should be present before compaction" + ); + } + + // Also record the total-records value before compaction + let total_records_before = summary_before.get("total-records").cloned(); + + // Run compaction + let compaction = create_default_compaction(env.catalog.clone(), env.table_ident.clone()); + let result = compaction.compact().await.unwrap().unwrap(); + let final_table = result.table.unwrap(); + + let snapshot_after = final_table + .metadata() + .snapshot_for_ref(MAIN_BRANCH) + .unwrap(); + let summary_after = &snapshot_after.summary().additional_properties; + + // Custom properties must be preserved + for (key, value) in &custom_props { + assert_eq!( + summary_after.get(key).unwrap(), + value, + "Custom property '{key}' must survive compaction" + ); + } + + // total-records must still be correct (recalculated, not blindly copied) + assert_eq!( + summary_after.get("total-records"), + total_records_before.as_ref(), + "total-records must be preserved (no data was added or removed)" + ); + + // Snapshot operation should be "replace" for compaction + assert_eq!( + snapshot_after.summary().operation, + iceberg::spec::Operation::Replace, + "Compaction snapshot operation should be 'replace'" + ); + } + + /// Verifies that `custom_snapshot_properties` correctly filters known keys. + #[test] + fn test_custom_snapshot_properties_filters_known_keys() { + use iceberg::spec::{Operation, Snapshot, Summary}; + + use super::{KNOWN_SNAPSHOT_SUMMARY_KEYS, custom_snapshot_properties}; + + let mut all_properties = HashMap::new(); + + // Add all known keys + for key in KNOWN_SNAPSHOT_SUMMARY_KEYS { + all_properties.insert(key.to_string(), "100".to_owned()); + } + + all_properties.insert( + "partitions.date=2024-01-01".to_owned(), + "added-data-files=1".to_owned(), + ); + + all_properties.insert("pipeline-id".to_owned(), "pipe-42".to_owned()); + all_properties.insert("bobsled.source-table".to_owned(), "events_raw".to_owned()); + + let summary = Summary { + operation: Operation::Append, + additional_properties: all_properties, + }; + + let snapshot = Snapshot::builder() + .with_snapshot_id(1) + .with_timestamp_ms(1000) + .with_sequence_number(1) + .with_schema_id(0) + .with_manifest_list("manifest-list.avro") + .with_summary(summary) + .build(); + + let custom = custom_snapshot_properties(&snapshot); + + // Only custom keys should remain + assert_eq!(custom.len(), 2); + assert_eq!(custom.get("pipeline-id").unwrap(), "pipe-42"); + assert_eq!(custom.get("bobsled.source-table").unwrap(), "events_raw"); + + // Known keys must NOT be present + for key in KNOWN_SNAPSHOT_SUMMARY_KEYS { + assert!( + !custom.contains_key(*key), + "Known key '{key}' must be filtered out" + ); + } + assert!( + !custom.contains_key("partitions.date=2024-01-01"), + "Partition keys must be filtered out" + ); + } }