Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 216 additions & 8 deletions core/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -974,13 +975,16 @@ impl CommitManager {
let rewrite_action = if use_starting_sequence_number {
// TODO: avoid retry if the snapshot_id is not found
if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) {
txn.rewrite_files()
let mut action = txn
.rewrite_files()
.set_enable_delete_filter_manager(true)
.add_data_files(data_files)
.delete_files(delete_files)
.set_target_branch(to_branch.to_owned())
.set_new_data_file_sequence_number(snapshot.sequence_number())
.set_check_file_existence(true)
.set_check_file_existence(true);
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
action
} else {
return Err(iceberg::Error::new(
ErrorKind::Unexpected,
Expand All @@ -990,12 +994,17 @@ impl CommitManager {
));
}
} else {
txn.rewrite_files()
let mut action = txn
.rewrite_files()
.set_enable_delete_filter_manager(true)
.add_data_files(data_files)
.delete_files(delete_files)
.set_target_branch(to_branch.to_owned())
.set_check_file_existence(true)
.set_check_file_existence(true);
if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) {
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
}
action
};

let txn = rewrite_action.apply(txn)?;
Expand Down Expand Up @@ -1084,12 +1093,15 @@ impl CommitManager {
let overwrite_action = if use_starting_sequence_number {
// TODO: avoid retry if the snapshot_id is not found
if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) {
txn.overwrite_files()
let mut action = txn
.overwrite_files()
.add_data_files(data_files)
.delete_files(delete_files)
.set_target_branch(to_branch.to_owned())
.set_new_data_file_sequence_number(snapshot.sequence_number())
.set_check_file_existence(true)
.set_check_file_existence(true);
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
action
} else {
return Err(iceberg::Error::new(
ErrorKind::Unexpected,
Expand All @@ -1099,11 +1111,16 @@ impl CommitManager {
));
}
} else {
txn.overwrite_files()
let mut action = txn
.overwrite_files()
.add_data_files(data_files)
.delete_files(delete_files)
.set_target_branch(to_branch.to_owned())
.set_check_file_existence(true)
.set_check_file_existence(true);
if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) {
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
}
action
};

let txn = overwrite_action.apply(txn)?;
Expand Down Expand Up @@ -1149,6 +1166,52 @@ impl CommitManager {
}
}

/// Snapshot summary keys that describe the metrics of one specific snapshot.
///
/// The first three groups are auto-computed by iceberg-rust
/// (`SnapshotSummaryCollector` and `update_snapshot_summaries`) during snapshot
/// production and must NOT be copied from the previous snapshot — doing so would
/// overwrite the correctly recalculated values.
///
/// The last group holds metric keys defined by the Iceberg table specification
/// (plus the partition-summary flag) that other writers may emit. They describe
/// the operation that produced the *previous* snapshot, so carrying them forward
/// would attach stale statistics to the new snapshot.
///
/// Only properties whose keys are *not* in this list (and don't start with
/// `"partitions."`) are considered custom metadata that must be preserved.
const KNOWN_SNAPSHOT_SUMMARY_KEYS: &[&str] = &[
    // Files and rows added by the snapshot.
    "added-data-files",
    "added-delete-files",
    "added-equality-delete-files",
    "added-position-delete-files",
    "added-files-size",
    "added-records",
    "added-equality-deletes",
    "added-position-deletes",
    // Files and rows removed by the snapshot.
    "deleted-data-files",
    "removed-delete-files",
    "removed-equality-delete-files",
    "removed-position-delete-files",
    "removed-files-size",
    "deleted-records",
    "removed-equality-deletes",
    "removed-position-deletes",
    // Running totals and partition count.
    "total-data-files",
    "total-delete-files",
    "total-files-size",
    "total-records",
    "total-equality-deletes",
    "total-position-deletes",
    "changed-partition-count",
    // Per-snapshot metrics that other writers may record.
    "added-dvs",
    "removed-dvs",
    "deleted-duplicate-files",
    "manifests-created",
    "manifests-kept",
    "manifests-replaced",
    "entries-processed",
    // Flag announcing `partitions.*` entries; those entries are always dropped,
    // so the flag must not be inherited either.
    "partition-summaries-included",
];

/// Returns an owned copy of every custom entry in `snapshot`'s summary.
///
/// An entry is custom when its key is neither listed in
/// [`KNOWN_SNAPSHOT_SUMMARY_KEYS`] nor prefixed with `"partitions."`.
fn custom_snapshot_properties(snapshot: &Snapshot) -> HashMap<String, String> {
    let mut custom = HashMap::new();
    for (key, value) in &snapshot.summary().additional_properties {
        let is_known_metric = KNOWN_SNAPSHOT_SUMMARY_KEYS.contains(&key.as_str());
        let is_partition_summary = key.starts_with("partitions.");
        if is_known_metric || is_partition_summary {
            continue;
        }
        custom.insert(key.clone(), value.clone());
    }
    custom
}

/// Compaction plan describing files to rewrite and target commit location.
#[derive(Debug, Clone)]
pub struct CompactionPlan {
Expand Down Expand Up @@ -2382,4 +2445,149 @@ mod tests {
"plan_compaction() without config should fail"
);
}

/// Fast-appends `data_files` to `table` in a single commit whose snapshot carries
/// `properties`, and returns the table produced by that commit.
///
/// Panics if applying the action or committing to `catalog` fails.
async fn append_and_commit_with_properties<C: Catalog>(
    table: &Table,
    catalog: &C,
    data_files: Vec<DataFile>,
    properties: HashMap<String, String>,
) -> Table {
    let txn = Transaction::new(table);
    let action = txn
        .fast_append()
        .add_data_files(data_files)
        .set_snapshot_properties(properties);
    action
        .apply(txn)
        .unwrap()
        .commit(catalog)
        .await
        .unwrap()
}

/// Custom properties on the branch head snapshot must still be present on the
/// snapshot that compaction commits.
#[tokio::test]
async fn test_custom_snapshot_metadata_preserved_after_compaction() {
    let env = create_test_env().await;

    let data_files = write_simple_files(&env.table, &env.warehouse_location, "test", 3).await;

    // Properties of the kind an external system might attach to its commits.
    let custom_props = HashMap::from([
        ("pipeline-id".to_owned(), "pipe-42".to_owned()),
        ("bobsled.source-table".to_owned(), "events_raw".to_owned()),
        ("custom.watermark-ms".to_owned(), "1700000000000".to_owned()),
    ]);

    let table_before = append_and_commit_with_properties(
        &env.table,
        env.catalog.as_ref(),
        data_files,
        custom_props.clone(),
    )
    .await;

    // Sanity check: the append really stored the custom properties.
    let head_before = table_before
        .metadata()
        .snapshot_for_ref(MAIN_BRANCH)
        .unwrap();
    let props_before = &head_before.summary().additional_properties;
    for (key, value) in &custom_props {
        assert_eq!(
            props_before.get(key).unwrap(),
            value,
            "Custom property '{key}' should be present before compaction"
        );
    }

    // Remember the row count reported before compaction for the comparison below.
    let total_records_before = props_before.get("total-records");

    // Compact and inspect the head of the main branch afterwards.
    let compaction = create_default_compaction(env.catalog.clone(), env.table_ident.clone());
    let outcome = compaction.compact().await.unwrap().unwrap();
    let table_after = outcome.table.unwrap();

    let head_after = table_after
        .metadata()
        .snapshot_for_ref(MAIN_BRANCH)
        .unwrap();
    let props_after = &head_after.summary().additional_properties;

    // Every custom property must have been carried over unchanged.
    for (key, value) in &custom_props {
        assert_eq!(
            props_after.get(key).unwrap(),
            value,
            "Custom property '{key}' must survive compaction"
        );
    }

    // The reported row count must match the pre-compaction value.
    assert_eq!(
        props_after.get("total-records"),
        total_records_before,
        "total-records must be preserved (no data was added or removed)"
    );

    // The compaction commit itself is expected to be a `Replace` operation.
    assert_eq!(
        head_after.summary().operation,
        iceberg::spec::Operation::Replace,
        "Compaction snapshot operation should be 'replace'"
    );
}

/// `custom_snapshot_properties` must drop every known summary key and every
/// `partitions.`-prefixed key while keeping the remaining entries.
#[test]
fn test_custom_snapshot_properties_filters_known_keys() {
    use iceberg::spec::{Operation, Snapshot, Summary};

    use super::{KNOWN_SNAPSHOT_SUMMARY_KEYS, custom_snapshot_properties};

    // Start from a summary that contains every known key ...
    let mut all_properties: HashMap<String, String> = KNOWN_SNAPSHOT_SUMMARY_KEYS
        .iter()
        .map(|key| (key.to_string(), "100".to_owned()))
        .collect();

    // ... one per-partition entry ...
    all_properties.insert(
        "partitions.date=2024-01-01".to_owned(),
        "added-data-files=1".to_owned(),
    );

    // ... and two custom entries that are expected to survive.
    all_properties.insert("pipeline-id".to_owned(), "pipe-42".to_owned());
    all_properties.insert("bobsled.source-table".to_owned(), "events_raw".to_owned());

    let snapshot = Snapshot::builder()
        .with_snapshot_id(1)
        .with_timestamp_ms(1000)
        .with_sequence_number(1)
        .with_schema_id(0)
        .with_manifest_list("manifest-list.avro")
        .with_summary(Summary {
            operation: Operation::Append,
            additional_properties: all_properties,
        })
        .build();

    let custom = custom_snapshot_properties(&snapshot);

    // Exactly the two custom entries remain, with their values intact.
    assert_eq!(custom.len(), 2);
    assert_eq!(custom.get("pipeline-id").unwrap(), "pipe-42");
    assert_eq!(custom.get("bobsled.source-table").unwrap(), "events_raw");

    // None of the known keys may leak through.
    for key in KNOWN_SNAPSHOT_SUMMARY_KEYS.iter().copied() {
        assert!(
            !custom.contains_key(key),
            "Known key '{key}' must be filtered out"
        );
    }
    assert!(
        !custom.contains_key("partitions.date=2024-01-01"),
        "Partition keys must be filtered out"
    );
}
}
Loading