Skip to content

Commit eaaabd3

Browse files
committed
Add data_sequence_number to ReplaceDataFilesAction
1 parent 71aaa7a commit eaaabd3

File tree

2 files changed

+84
-13
lines changed

2 files changed

+84
-13
lines changed

crates/iceberg/src/transaction/replace_data_files.rs

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ pub struct ReplaceDataFilesAction {
3838
files_to_delete: Vec<DataFile>,
3939
files_to_add: Vec<DataFile>,
4040
validate_from_snapshot_id: Option<i64>,
41+
data_sequence_number: Option<i64>,
4142
}
4243

4344
impl ReplaceDataFilesAction {
@@ -49,6 +50,7 @@ impl ReplaceDataFilesAction {
4950
files_to_delete: vec![],
5051
files_to_add: vec![],
5152
validate_from_snapshot_id: None,
53+
data_sequence_number: None,
5254
}
5355
}
5456

@@ -87,6 +89,12 @@ impl ReplaceDataFilesAction {
8789
self.validate_from_snapshot_id = Some(snapshot_id);
8890
self
8991
}
92+
93+
/// Sets an explicit data sequence number for the files added by this action
/// (for handling concurrent equality deletes).
///
/// The value is stored as `Some(seq_num)`; when the action is committed it is
/// forwarded to the snapshot producer via `with_data_sequence_number`. If this
/// method is never called the field stays `None` and nothing is forwarded.
///
/// Consumes and returns the action so calls can be chained builder-style.
94+
pub fn data_sequence_number(mut self, seq_num: i64) -> Self {
95+
self.data_sequence_number = Some(seq_num);
96+
self
97+
}
9098
}
9199

92100
#[async_trait]
@@ -111,14 +119,18 @@ impl TransactionAction for ReplaceDataFilesAction {
111119
Self::validate_files_exist(table, snapshot_id, &self.files_to_delete).await?;
112120
}
113121

114-
let snapshot_producer = SnapshotProducer::new(
122+
let mut snapshot_producer = SnapshotProducer::new(
115123
table,
116124
self.commit_uuid.unwrap_or_else(Uuid::now_v7),
117125
self.key_metadata.clone(),
118126
self.snapshot_properties.clone(),
119127
self.files_to_add.clone(),
120128
);
121129

130+
if let Some(seq_num) = self.data_sequence_number {
131+
snapshot_producer = snapshot_producer.with_data_sequence_number(seq_num);
132+
}
133+
122134
snapshot_producer.validate_added_data_files()?;
123135

124136
let replace_op = ReplaceOperation {
@@ -501,4 +513,45 @@ mod integration_tests {
501513
let result = tx.commit(&catalog).await;
502514
assert!(result.is_err());
503515
}
516+
517+
// Verifies that a data sequence number configured on `replace_data_files()` ends up on
// the manifest entry written for the newly added (compacted) file.
#[tokio::test]
518+
async fn test_data_sequence_number() {
519+
let catalog = new_memory_catalog().await;
520+
let table = make_v3_minimal_table_in_catalog(&catalog).await;
521+
522+
let file1 = create_file("data/file1.parquet", 100);
523+
let tx = Transaction::new(&table);
524+
let action = tx.fast_append().add_data_files(vec![file1.clone()]);
525+
let tx = action.apply(tx).unwrap();
526+
let table = tx.commit(&catalog).await.unwrap();
527+
528+
let original_seq = table.metadata().current_snapshot().unwrap().sequence_number();
529+
530+
// Replace with custom sequence number
531+
let compacted = create_file("data/compacted.parquet", 100);
532+
let tx = Transaction::new(&table);
533+
let action = tx
534+
.replace_data_files()
535+
.data_sequence_number(original_seq)
536+
.delete_files(vec![file1])
537+
.add_files(vec![compacted]);
538+
let tx = action.apply(tx).unwrap();
539+
let table = tx.commit(&catalog).await.unwrap();
540+
541+
// Verify the new manifest entry has the custom sequence number
542+
let snapshot = table.metadata().current_snapshot().unwrap();
543+
let manifest_list = snapshot
544+
.load_manifest_list(table.file_io(), table.metadata())
545+
.await
546+
.unwrap();
547+
// Track whether the compacted file was actually seen; without this the test would
// pass vacuously if no manifest entry matched the path below.
let mut found_compacted = false;
548+
for entry in manifest_list.entries() {
549+
let manifest = entry.load_manifest(table.file_io()).await.unwrap();
550+
for e in manifest.entries() {
551+
if e.file_path() == "data/compacted.parquet" {
552+
assert_eq!(e.sequence_number(), Some(original_seq));
found_compacted = true;
553+
}
554+
}
555+
}
assert!(
    found_compacted,
    "compacted file was not found in any manifest of the new snapshot"
);
556+
}
504557
}

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> {
118118
// It starts from 0 and increments for each new manifest file.
119119
// Note: This counter is limited to the range of (0..u64::MAX).
120120
manifest_counter: RangeFrom<u64>,
121+
// Custom data sequence number for compaction operations.
122+
data_sequence_number: Option<i64>,
121123
}
122124

123125
impl<'a> SnapshotProducer<'a> {
@@ -136,9 +138,15 @@ impl<'a> SnapshotProducer<'a> {
136138
snapshot_properties,
137139
added_data_files,
138140
manifest_counter: (0..),
141+
data_sequence_number: None,
139142
}
140143
}
141144

145+
/// Stores `seq_num` as an explicit data sequence number on this producer and
/// returns the producer for chaining.
///
/// A freshly constructed producer has this field set to `None`.
///
/// NOTE(review): in the manifest-entry construction visible in this change, the
/// stored value is applied through `.sequence_number(seq_num)` only when the table
/// format version is not `FormatVersion::V1`; the V1 arm ignores it. Confirm that
/// silently dropping the value for V1 tables is intended.
pub(crate) fn with_data_sequence_number(mut self, seq_num: i64) -> Self {
146+
self.data_sequence_number = Some(seq_num);
147+
self
148+
}
149+
142150
pub(crate) fn validate_added_data_files(&self) -> Result<()> {
143151
for data_file in &self.added_data_files {
144152
if data_file.content_type() != crate::spec::DataContentType::Data {
@@ -300,18 +308,28 @@ impl<'a> SnapshotProducer<'a> {
300308

301309
let snapshot_id = self.snapshot_id;
302310
let format_version = self.table.metadata().format_version();
303-
let manifest_entries = added_data_files.into_iter().map(|data_file| {
304-
let builder = ManifestEntry::builder()
305-
.status(crate::spec::ManifestStatus::Added)
306-
.data_file(data_file);
307-
if format_version == FormatVersion::V1 {
308-
builder.snapshot_id(snapshot_id).build()
309-
} else {
310-
// For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
311-
// commit failed.
312-
builder.build()
313-
}
314-
});
311+
let data_sequence_number = self.data_sequence_number;
312+
let manifest_entries: Vec<_> = added_data_files
313+
.into_iter()
314+
.map(|data_file| {
315+
match (format_version, data_sequence_number) {
316+
(FormatVersion::V1, _) => ManifestEntry::builder()
317+
.status(crate::spec::ManifestStatus::Added)
318+
.snapshot_id(snapshot_id)
319+
.data_file(data_file)
320+
.build(),
321+
(_, Some(seq_num)) => ManifestEntry::builder()
322+
.status(crate::spec::ManifestStatus::Added)
323+
.sequence_number(seq_num)
324+
.data_file(data_file)
325+
.build(),
326+
(_, None) => ManifestEntry::builder()
327+
.status(crate::spec::ManifestStatus::Added)
328+
.data_file(data_file)
329+
.build(),
330+
}
331+
})
332+
.collect();
315333
let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
316334
for entry in manifest_entries {
317335
writer.add_entry(entry)?;

0 commit comments

Comments
 (0)