Skip to content

Commit b7fa87c

Browse files
authored
fix: preserve custom snapshot metadata after compaction (#131)
1 parent 2bb1798 commit b7fa87c

1 file changed

Lines changed: 216 additions & 8 deletions

File tree

core/src/compaction/mod.rs

Lines changed: 216 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
use std::borrow::Cow;
18+
use std::collections::HashMap;
1819
use std::sync::Arc;
1920
use std::time::Duration;
2021

@@ -974,13 +975,16 @@ impl CommitManager {
974975
let rewrite_action = if use_starting_sequence_number {
975976
// TODO: avoid retry if the snapshot_id is not found
976977
if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) {
977-
txn.rewrite_files()
978+
let mut action = txn
979+
.rewrite_files()
978980
.set_enable_delete_filter_manager(true)
979981
.add_data_files(data_files)
980982
.delete_files(delete_files)
981983
.set_target_branch(to_branch.to_owned())
982984
.set_new_data_file_sequence_number(snapshot.sequence_number())
983-
.set_check_file_existence(true)
985+
.set_check_file_existence(true);
986+
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
987+
action
984988
} else {
985989
return Err(iceberg::Error::new(
986990
ErrorKind::Unexpected,
@@ -990,12 +994,17 @@ impl CommitManager {
990994
));
991995
}
992996
} else {
993-
txn.rewrite_files()
997+
let mut action = txn
998+
.rewrite_files()
994999
.set_enable_delete_filter_manager(true)
9951000
.add_data_files(data_files)
9961001
.delete_files(delete_files)
9971002
.set_target_branch(to_branch.to_owned())
998-
.set_check_file_existence(true)
1003+
.set_check_file_existence(true);
1004+
if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) {
1005+
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
1006+
}
1007+
action
9991008
};
10001009

10011010
let txn = rewrite_action.apply(txn)?;
@@ -1084,12 +1093,15 @@ impl CommitManager {
10841093
let overwrite_action = if use_starting_sequence_number {
10851094
// TODO: avoid retry if the snapshot_id is not found
10861095
if let Some(snapshot) = table.metadata().snapshot_by_id(starting_snapshot_id) {
1087-
txn.overwrite_files()
1096+
let mut action = txn
1097+
.overwrite_files()
10881098
.add_data_files(data_files)
10891099
.delete_files(delete_files)
10901100
.set_target_branch(to_branch.to_owned())
10911101
.set_new_data_file_sequence_number(snapshot.sequence_number())
1092-
.set_check_file_existence(true)
1102+
.set_check_file_existence(true);
1103+
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
1104+
action
10931105
} else {
10941106
return Err(iceberg::Error::new(
10951107
ErrorKind::Unexpected,
@@ -1099,11 +1111,16 @@ impl CommitManager {
10991111
));
11001112
}
11011113
} else {
1102-
txn.overwrite_files()
1114+
let mut action = txn
1115+
.overwrite_files()
11031116
.add_data_files(data_files)
11041117
.delete_files(delete_files)
11051118
.set_target_branch(to_branch.to_owned())
1106-
.set_check_file_existence(true)
1119+
.set_check_file_existence(true);
1120+
if let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) {
1121+
action.set_snapshot_properties(custom_snapshot_properties(snapshot));
1122+
}
1123+
action
11071124
};
11081125

11091126
let txn = overwrite_action.apply(txn)?;
@@ -1149,6 +1166,52 @@ impl CommitManager {
11491166
}
11501167
}
11511168

1169+
/// Known Iceberg snapshot summary keys managed by `SnapshotSummaryCollector`
1170+
/// and `update_snapshot_summaries`.
1171+
///
1172+
/// These keys are auto-computed by iceberg-rust during snapshot production and must
1173+
/// NOT be copied from the previous snapshot — doing so would overwrite the correctly
1174+
/// recalculated values. Only properties whose keys are *not* in this list (and don't
1175+
/// start with `"partitions."`) are considered custom metadata that must be preserved.
1176+
const KNOWN_SNAPSHOT_SUMMARY_KEYS: &[&str] = &[
1177+
"added-data-files",
1178+
"added-delete-files",
1179+
"added-equality-delete-files",
1180+
"added-position-delete-files",
1181+
"added-files-size",
1182+
"added-records",
1183+
"added-equality-deletes",
1184+
"added-position-deletes",
1185+
"deleted-data-files",
1186+
"removed-delete-files",
1187+
"removed-equality-delete-files",
1188+
"removed-position-delete-files",
1189+
"removed-files-size",
1190+
"deleted-records",
1191+
"removed-equality-deletes",
1192+
"removed-position-deletes",
1193+
"total-data-files",
1194+
"total-delete-files",
1195+
"total-files-size",
1196+
"total-records",
1197+
"total-equality-deletes",
1198+
"total-position-deletes",
1199+
"changed-partition-count",
1200+
];
1201+
1202+
/// Extracts non-standard (custom) properties from a snapshot's summary.
1203+
fn custom_snapshot_properties(snapshot: &Snapshot) -> HashMap<String, String> {
1204+
snapshot
1205+
.summary()
1206+
.additional_properties
1207+
.iter()
1208+
.filter(|(k, _)| {
1209+
!KNOWN_SNAPSHOT_SUMMARY_KEYS.contains(&k.as_str()) && !k.starts_with("partitions.")
1210+
})
1211+
.map(|(k, v)| (k.clone(), v.clone()))
1212+
.collect()
1213+
}
1214+
11521215
/// Compaction plan describing files to rewrite and target commit location.
11531216
#[derive(Debug, Clone)]
11541217
pub struct CompactionPlan {
@@ -2382,4 +2445,149 @@ mod tests {
23822445
"plan_compaction() without config should fail"
23832446
);
23842447
}
2448+
2449+
/// Appends data files with custom snapshot properties set on the commit.
2450+
async fn append_and_commit_with_properties<C: Catalog>(
2451+
table: &Table,
2452+
catalog: &C,
2453+
data_files: Vec<DataFile>,
2454+
properties: HashMap<String, String>,
2455+
) -> Table {
2456+
let transaction = Transaction::new(table);
2457+
let append_action = transaction
2458+
.fast_append()
2459+
.add_data_files(data_files)
2460+
.set_snapshot_properties(properties);
2461+
let tx = append_action.apply(transaction).unwrap();
2462+
tx.commit(catalog).await.unwrap()
2463+
}
2464+
2465+
/// Custom snapshot metadata from the previous snapshot should get through compaction
2466+
#[tokio::test]
2467+
async fn test_custom_snapshot_metadata_preserved_after_compaction() {
2468+
let env = create_test_env().await;
2469+
2470+
let data_files = write_simple_files(&env.table, &env.warehouse_location, "test", 3).await;
2471+
2472+
// Append with custom properties that simulate external system metadata
2473+
let mut custom_props = HashMap::new();
2474+
custom_props.insert("pipeline-id".to_owned(), "pipe-42".to_owned());
2475+
custom_props.insert("bobsled.source-table".to_owned(), "events_raw".to_owned());
2476+
custom_props.insert("custom.watermark-ms".to_owned(), "1700000000000".to_owned());
2477+
2478+
let updated_table = append_and_commit_with_properties(
2479+
&env.table,
2480+
env.catalog.as_ref(),
2481+
data_files,
2482+
custom_props.clone(),
2483+
)
2484+
.await;
2485+
2486+
// Verify custom properties exist on the pre-compaction snapshot
2487+
let snapshot_before = updated_table
2488+
.metadata()
2489+
.snapshot_for_ref(MAIN_BRANCH)
2490+
.unwrap();
2491+
let summary_before = &snapshot_before.summary().additional_properties;
2492+
for (key, value) in &custom_props {
2493+
assert_eq!(
2494+
summary_before.get(key).unwrap(),
2495+
value,
2496+
"Custom property '{key}' should be present before compaction"
2497+
);
2498+
}
2499+
2500+
// Also record the total-records value before compaction
2501+
let total_records_before = summary_before.get("total-records").cloned();
2502+
2503+
// Run compaction
2504+
let compaction = create_default_compaction(env.catalog.clone(), env.table_ident.clone());
2505+
let result = compaction.compact().await.unwrap().unwrap();
2506+
let final_table = result.table.unwrap();
2507+
2508+
let snapshot_after = final_table
2509+
.metadata()
2510+
.snapshot_for_ref(MAIN_BRANCH)
2511+
.unwrap();
2512+
let summary_after = &snapshot_after.summary().additional_properties;
2513+
2514+
// Custom properties must be preserved
2515+
for (key, value) in &custom_props {
2516+
assert_eq!(
2517+
summary_after.get(key).unwrap(),
2518+
value,
2519+
"Custom property '{key}' must survive compaction"
2520+
);
2521+
}
2522+
2523+
// total-records must still be correct (recalculated, not blindly copied)
2524+
assert_eq!(
2525+
summary_after.get("total-records"),
2526+
total_records_before.as_ref(),
2527+
"total-records must be preserved (no data was added or removed)"
2528+
);
2529+
2530+
// Snapshot operation should be "replace" for compaction
2531+
assert_eq!(
2532+
snapshot_after.summary().operation,
2533+
iceberg::spec::Operation::Replace,
2534+
"Compaction snapshot operation should be 'replace'"
2535+
);
2536+
}
2537+
2538+
/// Verifies that `custom_snapshot_properties` correctly filters known keys.
2539+
#[test]
2540+
fn test_custom_snapshot_properties_filters_known_keys() {
2541+
use iceberg::spec::{Operation, Snapshot, Summary};
2542+
2543+
use super::{KNOWN_SNAPSHOT_SUMMARY_KEYS, custom_snapshot_properties};
2544+
2545+
let mut all_properties = HashMap::new();
2546+
2547+
// Add all known keys
2548+
for key in KNOWN_SNAPSHOT_SUMMARY_KEYS {
2549+
all_properties.insert(key.to_string(), "100".to_owned());
2550+
}
2551+
2552+
all_properties.insert(
2553+
"partitions.date=2024-01-01".to_owned(),
2554+
"added-data-files=1".to_owned(),
2555+
);
2556+
2557+
all_properties.insert("pipeline-id".to_owned(), "pipe-42".to_owned());
2558+
all_properties.insert("bobsled.source-table".to_owned(), "events_raw".to_owned());
2559+
2560+
let summary = Summary {
2561+
operation: Operation::Append,
2562+
additional_properties: all_properties,
2563+
};
2564+
2565+
let snapshot = Snapshot::builder()
2566+
.with_snapshot_id(1)
2567+
.with_timestamp_ms(1000)
2568+
.with_sequence_number(1)
2569+
.with_schema_id(0)
2570+
.with_manifest_list("manifest-list.avro")
2571+
.with_summary(summary)
2572+
.build();
2573+
2574+
let custom = custom_snapshot_properties(&snapshot);
2575+
2576+
// Only custom keys should remain
2577+
assert_eq!(custom.len(), 2);
2578+
assert_eq!(custom.get("pipeline-id").unwrap(), "pipe-42");
2579+
assert_eq!(custom.get("bobsled.source-table").unwrap(), "events_raw");
2580+
2581+
// Known keys must NOT be present
2582+
for key in KNOWN_SNAPSHOT_SUMMARY_KEYS {
2583+
assert!(
2584+
!custom.contains_key(*key),
2585+
"Known key '{key}' must be filtered out"
2586+
);
2587+
}
2588+
assert!(
2589+
!custom.contains_key("partitions.date=2024-01-01"),
2590+
"Partition keys must be filtered out"
2591+
);
2592+
}
23852593
}

0 commit comments

Comments
 (0)