diff --git a/Cargo.lock b/Cargo.lock index 8f5df819a2..05cb4ee41f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3276,6 +3276,7 @@ dependencies = [ "arrow-buffer", "arrow-cast", "arrow-ord", + "arrow-row", "arrow-schema", "arrow-select", "arrow-string", diff --git a/Cargo.toml b/Cargo.toml index 6eba22459c..7776451116 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ arrow-array = "57.1" arrow-buffer = "57.1" arrow-cast = "57.1" arrow-ord = "57.1" +arrow-row = "57.1" arrow-schema = "57.1" arrow-select = "57.1" arrow-string = "57.1" diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 871643b36e..1610913e36 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -716,6 +716,13 @@ impl Catalog for RestCatalog { }) .build()?; + println!( + "Create table request: {:?}", + request + .body() + .map(|b| String::from_utf8_lossy(b.as_bytes().unwrap_or(&[]))) + ); + let http_response = context.client.query_catalog(request).await?; let response = match http_response.status() { diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d6d931c86d..c2d6951ef4 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -49,6 +49,7 @@ arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-cast = { workspace = true } arrow-ord = { workspace = true } +arrow-row = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..fd4d514e3a 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -90,10 +90,19 @@ impl TransactionAction for FastAppendAction { self.key_metadata.clone(), self.snapshot_properties.clone(), self.added_data_files.clone(), + vec![], // fast append doesn't support delete files ); - // validate added files - snapshot_producer.validate_added_data_files()?; + // validate added files - ensure they are Data content type + for data_file in &self.added_data_files { + if data_file.content_type() != crate::spec::DataContentType::Data { + return Err(crate::Error::new( + crate::ErrorKind::DataInvalid, + "Only data content type is allowed for fast append", + )); + } + } + snapshot_producer.validate_added_data_files(&self.added_data_files)?; // Checks duplicate files if self.check_duplicate { diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index cb2ff7cf37..38df7d74d3 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -54,6 +54,7 @@ mod action; pub use action::*; mod append; +mod row_delta; mod snapshot; mod sort_order; mod update_location; @@ -71,6 +72,7 @@ use crate::spec::TableProperties; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; +use crate::transaction::row_delta::RowDeltaAction; use crate::transaction::sort_order::ReplaceSortOrderAction; use crate::transaction::update_location::UpdateLocationAction; use crate::transaction::update_properties::UpdatePropertiesAction; @@ -141,6 +143,16 @@ impl Transaction { FastAppendAction::new() } + /// Creates a row delta action for row-level changes. 
+ /// + /// Use this action for: + /// - CDC (Change Data Capture) ingestion + /// - Upsert operations + /// - Adding delete files (position or equality deletes) + pub fn row_delta(&self) -> RowDeltaAction { + RowDeltaAction::new() + } + /// Creates replace sort order action. pub fn replace_sort_order(&self) -> ReplaceSortOrderAction { ReplaceSortOrderAction::new() diff --git a/crates/iceberg/src/transaction/row_delta.rs b/crates/iceberg/src/transaction/row_delta.rs new file mode 100644 index 0000000000..89a9701396 --- /dev/null +++ b/crates/iceberg/src/transaction/row_delta.rs @@ -0,0 +1,432 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Row delta transaction for encoding row-level changes to Iceberg tables. +//! +//! This module provides the `RowDeltaAction` which enables atomic application of row-level +//! modifications including inserts, updates, and deletes. It supports both position deletes +//! (for specific row locations) and equality deletes (for rows matching equality conditions). +//! +//! This is the appropriate transaction type for CDC (Change Data Capture) ingestion, +//! upsert operations, and row-level deletions. + +use std::collections::HashMap; +use std::sync::Arc; + +use async_trait::async_trait; +use uuid::Uuid; + +use crate::error::Result; +use crate::spec::{DataContentType, DataFile, ManifestEntry, ManifestFile, Operation}; +use crate::table::Table; +use crate::transaction::snapshot::{ + DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer, +}; +use crate::transaction::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind}; + +/// RowDeltaAction is a transaction action for encoding row-level changes to a table. 
+/// +/// This action supports: +/// - Adding new data files +/// - Adding delete files (both position and equality deletes) +/// +/// This is the appropriate action to use for: +/// - CDC (Change Data Capture) ingestion +/// - Upsert operations +/// - Row-level deletions +/// +/// # Example +/// ```ignore +/// use iceberg::transaction::Transaction; +/// +/// let tx = Transaction::new(&table); +/// let action = tx.row_delta() +/// .add_data_files(new_data_files) +/// .add_delete_files(equality_delete_files); +/// let tx = action.apply(tx).unwrap(); +/// let table = tx.commit(&catalog).await.unwrap(); +/// ``` +pub struct RowDeltaAction { + check_duplicate: bool, + // below are properties used to create SnapshotProducer when commit + commit_uuid: Option, + key_metadata: Option>, + snapshot_properties: HashMap, + added_data_files: Vec, + added_delete_files: Vec, +} + +impl RowDeltaAction { + pub(crate) fn new() -> Self { + Self { + check_duplicate: true, + commit_uuid: None, + key_metadata: None, + snapshot_properties: HashMap::default(), + added_data_files: vec![], + added_delete_files: vec![], + } + } + + /// Set whether to check duplicate files + pub fn with_check_duplicate(mut self, v: bool) -> Self { + self.check_duplicate = v; + self + } + + /// Add data files to the snapshot. + pub fn add_data_files(mut self, data_files: impl IntoIterator) -> Self { + self.added_data_files.extend(data_files); + self + } + + /// Add delete files to the snapshot. + /// + /// Delete files can be either position deletes or equality deletes. + /// The content type of each file will be validated. + pub fn add_delete_files(mut self, delete_files: impl IntoIterator) -> Self { + self.added_delete_files.extend(delete_files); + self + } + + /// Set commit UUID for the snapshot. + pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self { + self.commit_uuid = Some(commit_uuid); + self + } + + /// Set key metadata for manifest files. + pub fn set_key_metadata(mut self, key_metadata: Vec) -> Self { + self.key_metadata = Some(key_metadata); + self + } + + /// Set snapshot summary properties. + pub fn set_snapshot_properties(mut self, snapshot_properties: HashMap) -> Self { + self.snapshot_properties = snapshot_properties; + self + } + + /// Validate that delete files have appropriate content types + fn validate_delete_files(delete_files: &[DataFile]) -> Result<()> { + for delete_file in delete_files { + match delete_file.content_type() { + DataContentType::PositionDeletes | DataContentType::EqualityDeletes => { + // Valid delete file types + } + DataContentType::Data => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "File {} has content type Data but was added as a delete file. 
Use add_data_files() instead.", + delete_file.file_path() + ), + )); + } + } + + // Additional validation for equality deletes + if delete_file.content_type() == DataContentType::EqualityDeletes + && delete_file.equality_ids().is_none() + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Equality delete file {} must have equality_ids set", + delete_file.file_path() + ), + )); + } + } + Ok(()) + } +} + +#[async_trait] +impl TransactionAction for RowDeltaAction { + async fn commit(self: Arc, table: &Table) -> Result { + // Validate delete files have correct content types + Self::validate_delete_files(&self.added_delete_files)?; + + let snapshot_producer = SnapshotProducer::new( + table, + self.commit_uuid.unwrap_or_else(Uuid::now_v7), + self.key_metadata.clone(), + self.snapshot_properties.clone(), + self.added_data_files.clone(), + self.added_delete_files.clone(), + ); + + // Validate added data files (partition specs, etc.) + if !self.added_data_files.is_empty() { + snapshot_producer.validate_added_data_files(&self.added_data_files)?; + } + + // Validate added delete files (partition specs, etc.) + if !self.added_delete_files.is_empty() { + snapshot_producer.validate_added_data_files(&self.added_delete_files)?; + } + + // Check duplicate files + if self.check_duplicate { + snapshot_producer.validate_duplicate_files().await?; + } + + snapshot_producer + .commit(RowDeltaOperation, DefaultManifestProcess) + .await + } +} + +struct RowDeltaOperation; + +impl SnapshotProduceOperation for RowDeltaOperation { + fn operation(&self) -> Operation { + Operation::Append + } + + async fn delete_entries( + &self, + _snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + Ok(vec![]) + } + + async fn existing_manifest( + &self, + snapshot_produce: &SnapshotProducer<'_>, + ) -> Result> { + let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else { + return Ok(vec![]); + }; + + let manifest_list = snapshot + .load_manifest_list( + snapshot_produce.table.file_io(), + &snapshot_produce.table.metadata_ref(), + ) + .await?; + + // Include all existing manifests with added or existing files + Ok(manifest_list + .entries() + .iter() + .filter(|entry| entry.has_added_files() || entry.has_existing_files()) + .cloned() + .collect()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::sync::Arc; + + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, Struct, + }; + use crate::transaction::tests::make_v2_minimal_table; + use crate::transaction::{Transaction, TransactionAction}; + use crate::{TableRequirement, TableUpdate}; + + #[tokio::test] + async fn test_row_delta_with_data_and_deletes() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/data-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(); + + let delete_file = DataFileBuilder::default() + .content(DataContentType::EqualityDeletes) + .file_path("test/delete-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(50) + .record_count(5) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .equality_ids(Some(vec![1])) 
// Assuming field id 1 is the key + .build() + .unwrap(); + + let action = tx + .row_delta() + .add_data_files(vec![data_file.clone()]) + .add_delete_files(vec![delete_file.clone()]); + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + let requirements = action_commit.take_requirements(); + + // Check updates and requirements + assert!( + matches!((&updates[0],&updates[1]), (TableUpdate::AddSnapshot { snapshot },TableUpdate::SetSnapshotRef { reference,ref_name }) if snapshot.snapshot_id() == reference.snapshot_id && ref_name == MAIN_BRANCH) + ); + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: table.metadata().current_snapshot_id + } + ], + requirements + ); + + // Check manifest list + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + let manifest_list = new_snapshot + .load_manifest_list(table.file_io(), table.metadata()) + .await + .unwrap(); + + // Should have 2 manifests: one for data, one for deletes + assert_eq!(2, manifest_list.entries().len()); + } + + #[tokio::test] + async fn test_row_delta_rejects_data_file_as_delete() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/data-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(); + + // Try to add a data file as a delete file - should fail + let action = tx.row_delta().add_delete_files(vec![data_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!( + err.to_string() + .contains("has content type Data but was added as a delete file") + ); + } + + #[tokio::test] + async fn test_row_delta_rejects_equality_delete_without_ids() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let delete_file = DataFileBuilder::default() + .content(DataContentType::EqualityDeletes) + .file_path("test/delete-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(50) + .record_count(5) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + // Missing equality_ids! 
+ .build() + .unwrap(); + + let action = tx.row_delta().add_delete_files(vec![delete_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("must have equality_ids set")); + } + + #[tokio::test] + async fn test_row_delta_with_only_deletes() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let delete_file = DataFileBuilder::default() + .content(DataContentType::PositionDeletes) + .file_path("test/delete-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(50) + .record_count(5) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(); + + let action = tx.row_delta().add_delete_files(vec![delete_file]); + + let result = Arc::new(action).commit(&table).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_row_delta_with_snapshot_properties() { + let table = make_v2_minimal_table(); + let tx = Transaction::new(&table); + + let mut snapshot_properties = HashMap::new(); + snapshot_properties.insert("custom-key".to_string(), "custom-value".to_string()); + + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path("test/data-1.parquet".to_string()) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(10) + .partition_spec_id(table.metadata().default_partition_spec_id()) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build() + .unwrap(); + + let action = tx + .row_delta() + .set_snapshot_properties(snapshot_properties) + .add_data_files(vec![data_file]); + + let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); + let updates = action_commit.take_updates(); + + let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + snapshot + } else { + unreachable!() + }; + assert_eq!( + new_snapshot + .summary() + .additional_properties + .get("custom-key") + .unwrap(), + "custom-value" + ); + } +} diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..15c1d5a032 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -114,6 +114,7 @@ pub(crate) struct SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, // A counter used to generate unique manifest file names. // It starts from 0 and increments for each new manifest file. // Note: This counter is limited to the range of (0..u64::MAX). @@ -127,6 +128,7 @@ impl<'a> SnapshotProducer<'a> { key_metadata: Option>, snapshot_properties: HashMap, added_data_files: Vec, + added_delete_files: Vec, ) -> Self { Self { table, @@ -135,18 +137,13 @@ impl<'a> SnapshotProducer<'a> { key_metadata, snapshot_properties, added_data_files, + added_delete_files, manifest_counter: (0..), } } - pub(crate) fn validate_added_data_files(&self) -> Result<()> { - for data_file in &self.added_data_files { - if data_file.content_type() != crate::spec::DataContentType::Data { - return Err(Error::new( - ErrorKind::DataInvalid, - "Only data content type is allowed for fast append", - )); - } + pub(crate) fn validate_added_data_files(&self, added_data_files: &[DataFile]) -> Result<()> { + for data_file in added_data_files { // Check if the data file partition spec id matches the table default partition spec id. 
if self.table.metadata().default_partition_spec_id() != data_file.partition_spec_id { return Err(Error::new( @@ -319,6 +316,37 @@ impl<'a> SnapshotProducer<'a> { writer.write_manifest_file().await } + // Write manifest file for added delete files and return the ManifestFile for ManifestList. + async fn write_added_delete_manifest(&mut self) -> Result { + let added_delete_files = std::mem::take(&mut self.added_delete_files); + if added_delete_files.is_empty() { + return Err(Error::new( + ErrorKind::PreconditionFailed, + "No added delete files found when write an added delete manifest file", + )); + } + + let snapshot_id = self.snapshot_id; + let format_version = self.table.metadata().format_version(); + let manifest_entries = added_delete_files.into_iter().map(|delete_file| { + let builder = ManifestEntry::builder() + .status(crate::spec::ManifestStatus::Added) + .data_file(delete_file); + if format_version == FormatVersion::V1 { + builder.snapshot_id(snapshot_id).build() + } else { + // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when + // commit failed. + builder.build() + } + }); + let mut writer = self.new_manifest_writer(ManifestContentType::Deletes)?; + for entry in manifest_entries { + writer.add_entry(entry)?; + } + writer.write_manifest_file().await + } + async fn manifest_file( &mut self, snapshot_produce_operation: &OP, @@ -329,24 +357,30 @@ impl<'a> SnapshotProducer<'a> { // TODO: Allowing snapshot property setup with no added data files is a workaround. // We should clean it up after all necessary actions are supported. // For details, please refer to https://github.com/apache/iceberg-rust/issues/1548 - if self.added_data_files.is_empty() && self.snapshot_properties.is_empty() { + if self.added_data_files.is_empty() + && self.added_delete_files.is_empty() + && self.snapshot_properties.is_empty() + { return Err(Error::new( ErrorKind::PreconditionFailed, - "No added data files or added snapshot properties found when write a manifest file", + "No added data files, delete files, or snapshot properties found when write a manifest file", )); } let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?; let mut manifest_files = existing_manifests; - // Process added entries. + // Process added data entries. if !self.added_data_files.is_empty() { let added_manifest = self.write_added_manifest().await?; manifest_files.push(added_manifest); } - // # TODO - // Support process delete entries. + // Process added delete entries. 
+ if !self.added_delete_files.is_empty() { + let added_delete_manifest = self.write_added_delete_manifest().await?; + manifest_files.push(added_delete_manifest); + } let manifest_files = manifest_process.process_manifests(self, manifest_files); Ok(manifest_files) @@ -383,6 +417,14 @@ impl<'a> SnapshotProducer<'a> { ); } + for delete_file in &self.added_delete_files { + summary_collector.add_file( + delete_file, + table_metadata.current_schema().clone(), + table_metadata.default_partition_spec().clone(), + ); + } + let previous_snapshot = table_metadata .snapshot_by_id(self.snapshot_id) .and_then(|snapshot| snapshot.parent_snapshot_id()) diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs index 02dcda4164..30901e5fc1 100644 --- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs +++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs @@ -132,6 +132,10 @@ where fn current_written_size(&self) -> usize { self.inner.as_ref().unwrap().current_written_size() } + + fn current_schema(&self) -> crate::spec::SchemaRef { + self.inner.as_ref().unwrap().current_schema() + } } #[cfg(test)] @@ -170,18 +174,21 @@ mod test { let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); - let schema = Schema::builder() - .with_schema_id(3) - .with_fields(vec![ - NestedField::required(3, "foo", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(4, "bar", Type::Primitive(PrimitiveType::String)).into(), - ]) - .build()?; + let schema = Arc::new( + Schema::builder() + .with_schema_id(3) + .with_fields(vec![ + NestedField::required(3, "foo", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(4, "bar", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?, + ); - let pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)); + let pw = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( pw, + schema, file_io.clone(), location_gen, file_name_gen, @@ -268,6 +275,7 @@ mod test { let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema_ref.clone(), file_io.clone(), location_gen, file_name_gen, diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs index 9bc2bf9840..dd6f71da99 100644 --- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs +++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs @@ -60,7 +60,7 @@ where } /// Config for `EqualityDeleteWriter`. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct EqualityDeleteWriterConfig { // Field ids used to determine row equality in equality delete files. 
equality_ids: Vec, @@ -424,14 +424,15 @@ mod test { let equality_config = EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap(); let delete_schema = - arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap(); + Arc::new(arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap()); let projector = equality_config.projector.clone(); // prepare writer let pb = - ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(delete_schema)); + ParquetWriterBuilder::new(WriterProperties::builder().build(), delete_schema.clone()); let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( pb, + delete_schema, file_io.clone(), location_gen, file_name_gen, @@ -593,12 +594,13 @@ mod test { let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]; let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone()).unwrap(); let delete_arrow_schema = config.projected_arrow_schema_ref().clone(); - let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap(); + let delete_schema = Arc::new(arrow_schema_to_schema(&delete_arrow_schema).unwrap()); let pb = - ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(delete_schema)); + ParquetWriterBuilder::new(WriterProperties::builder().build(), delete_schema.clone()); let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( pb, + delete_schema, file_io.clone(), location_gen, file_name_gen, diff --git a/crates/iceberg/src/writer/base_writer/mod.rs b/crates/iceberg/src/writer/base_writer/mod.rs index 37ab97eb6d..5a23931f25 100644 --- a/crates/iceberg/src/writer/base_writer/mod.rs +++ b/crates/iceberg/src/writer/base_writer/mod.rs @@ -19,3 +19,4 @@ pub mod data_file_writer; pub mod equality_delete_writer; +pub mod position_delete_writer; diff --git a/crates/iceberg/src/writer/base_writer/position_delete_writer.rs b/crates/iceberg/src/writer/base_writer/position_delete_writer.rs new file mode 100644 index 0000000000..ad724fb2e4 --- /dev/null +++ b/crates/iceberg/src/writer/base_writer/position_delete_writer.rs @@ -0,0 +1,466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Position delete writer for encoding row-level deletions in Iceberg tables. +//! +//! Position deletes identify rows to delete by their file path and row position within that file. +//! This is more efficient than equality deletes when the exact location of rows to delete is known, +//! such as during CDC processing or when tracking rows written within the same transaction. 
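For orientation, the batches this writer consumes use only the fixed two-column layout returned by `PositionDeleteWriterConfig::arrow_schema()` defined below. A minimal sketch (the helper name, file paths, and row positions are illustrative; it mirrors the unit tests at the end of this file):

```rust
use std::sync::Arc;

use arrow_array::{Int64Array, RecordBatch, StringArray};

/// Build a position-delete batch marking row 7 of file1.parquet and row 42 of
/// file2.parquet as deleted. Positions are 0-indexed row numbers within each file.
fn example_position_delete_batch() -> crate::Result<RecordBatch> {
    let schema = PositionDeleteWriterConfig::arrow_schema();
    let file_paths = StringArray::from(vec![
        "s3://bucket/data/file1.parquet",
        "s3://bucket/data/file2.parquet",
    ]);
    let positions = Int64Array::from(vec![7, 42]);
    Ok(RecordBatch::try_new(schema, vec![
        Arc::new(file_paths),
        Arc::new(positions),
    ])?)
}
```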
+ +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::spec::{DataFile, PartitionKey, Struct}; +use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; +use crate::writer::file_writer::rolling_writer::{RollingFileWriter, RollingFileWriterBuilder}; +use crate::writer::file_writer::FileWriterBuilder; +use crate::writer::{IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Builder for `PositionDeleteWriter`. +#[derive(Debug)] +pub struct PositionDeleteFileWriterBuilder< + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +> { + inner: RollingFileWriterBuilder, + config: PositionDeleteWriterConfig, +} + +impl PositionDeleteFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + /// Create a new `PositionDeleteFileWriterBuilder` using a `RollingFileWriterBuilder`. + pub fn new( + inner: RollingFileWriterBuilder, + config: PositionDeleteWriterConfig, + ) -> Self { + Self { inner, config } + } +} + +/// Config for `PositionDeleteWriter`. +#[derive(Clone, Debug)] +pub struct PositionDeleteWriterConfig { + partition_value: Struct, + partition_spec_id: i32, + referenced_data_file: Option, +} + +impl PositionDeleteWriterConfig { + /// Create a new `PositionDeleteWriterConfig`. + /// + /// # Arguments + /// * `partition_value` - The partition value for the delete file + /// * `partition_spec_id` - The partition spec ID + /// * `referenced_data_file` - Optional path to the data file being deleted from + pub fn new( + partition_value: Option, + partition_spec_id: i32, + referenced_data_file: Option, + ) -> Self { + Self { + partition_value: partition_value.unwrap_or(Struct::empty()), + partition_spec_id, + referenced_data_file, + } + } + + /// Returns the Arrow schema for position delete files. + /// + /// Position delete files have a fixed schema: + /// - file_path: string (field id 2147483546) + /// - pos: long (field id 2147483545) + pub fn arrow_schema() -> ArrowSchemaRef { + Arc::new(ArrowSchema::new(vec![ + Field::new("file_path", DataType::Utf8, false).with_metadata( + [( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2147483546".to_string(), + )] + .into_iter() + .collect(), + ), + Field::new("pos", DataType::Int64, false).with_metadata( + [( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2147483545".to_string(), + )] + .into_iter() + .collect(), + ), + ])) + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for PositionDeleteFileWriterBuilder +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + type R = PositionDeleteFileWriter; + + async fn build(&self, partition_key: Option) -> Result { + Ok(PositionDeleteFileWriter { + inner: Some(self.inner.build()), + partition_value: self.config.partition_value.clone(), + partition_spec_id: self.config.partition_spec_id, + referenced_data_file: self.config.referenced_data_file.clone(), + partition_key, + }) + } +} + +/// Writer used to write position delete files. +/// +/// Position delete files mark specific rows in data files as deleted +/// by their position (row number). 
The schema is fixed with two columns: +/// - file_path: The path to the data file +/// - pos: The row position (0-indexed) in that file +#[derive(Debug)] +pub struct PositionDeleteFileWriter +{ + inner: Option>, + partition_value: Struct, + partition_spec_id: i32, + referenced_data_file: Option, + partition_key: Option, +} + +#[async_trait::async_trait] +impl IcebergWriter for PositionDeleteFileWriter +where + B: FileWriterBuilder, + L: LocationGenerator, + F: FileNameGenerator, +{ + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + // Validate the batch has the correct schema + let expected_schema = PositionDeleteWriterConfig::arrow_schema(); + if batch.schema().fields() != expected_schema.fields() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Position delete batch has invalid schema. Expected: {:?}, Got: {:?}", + expected_schema.fields(), + batch.schema().fields() + ), + )); + } + + if let Some(writer) = self.inner.as_mut() { + writer.write(&self.partition_key, &batch).await + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Position delete inner writer has been closed.", + )) + } + } + + async fn close(&mut self) -> Result> { + if let Some(writer) = self.inner.take() { + writer + .close() + .await? + .into_iter() + .map(|mut res| { + res.content(crate::spec::DataContentType::PositionDeletes); + res.partition(self.partition_value.clone()); + res.partition_spec_id(self.partition_spec_id); + if let Some(ref data_file) = self.referenced_data_file { + res.referenced_data_file(Some(data_file.clone())); + } + // Position deletes must have null sort_order_id (default is None) + res.build().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!("Failed to build position delete data file: {e}"), + ) + }) + }) + .collect() + } else { + Err(Error::new( + ErrorKind::Unexpected, + "Position delete inner writer has been closed.", + )) + } + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray}; + use arrow_select::concat::concat_batches; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::arrow::arrow_schema_to_schema; + use crate::io::{FileIO, FileIOBuilder, LocalFsStorageFactory}; + use crate::spec::DataFileFormat; + use crate::writer::IcebergWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + + async fn check_parquet_position_delete_file( + file_io: &FileIO, + data_file: &DataFile, + expected_batch: &RecordBatch, + ) { + assert_eq!(data_file.file_format, DataFileFormat::Parquet); + assert_eq!( + data_file.content, + crate::spec::DataContentType::PositionDeletes + ); + + // Read the written file + let input_file = file_io.new_input(data_file.file_path.clone()).unwrap(); + let input_content = input_file.read().await.unwrap(); + let reader_builder = + ParquetRecordBatchReaderBuilder::try_new(input_content.clone()).unwrap(); + let metadata = reader_builder.metadata().clone(); + + // Check data + let reader = reader_builder.build().unwrap(); + let batches = reader.map(|batch| batch.unwrap()).collect::>(); + let actual = concat_batches(&expected_batch.schema(), &batches).unwrap(); + assert_eq!(*expected_batch, actual); + + // Check metadata + 
assert_eq!( + data_file.record_count, + metadata + .row_groups() + .iter() + .map(|group| group.num_rows()) + .sum::() as u64 + ); + assert_eq!(data_file.file_size_in_bytes, input_content.len() as u64); + + // Position deletes must have null sort_order_id + assert!(data_file.sort_order_id.is_none()); + } + + #[tokio::test] + async fn test_position_delete_writer() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + // Get the position delete schema + let arrow_schema = PositionDeleteWriterConfig::arrow_schema(); + let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?); + + // Create writer + let config = PositionDeleteWriterConfig::new(None, 0, None); + let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, + schema, + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config) + .build(None) + .await?; + + // Create test data - delete rows 5, 10, 15 from a file + let file_paths = StringArray::from(vec![ + "s3://bucket/data/file1.parquet", + "s3://bucket/data/file1.parquet", + "s3://bucket/data/file1.parquet", + ]); + let positions = Int64Array::from(vec![5, 10, 15]); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(file_paths), + Arc::new(positions), + ])?; + + // Write + writer.write(batch.clone()).await?; + let data_files = writer.close().await?; + + assert_eq!(data_files.len(), 1); + let data_file = &data_files[0]; + + // Verify + check_parquet_position_delete_file(&file_io, data_file, &batch).await; + + Ok(()) + } + + #[tokio::test] + async fn test_position_delete_writer_with_referenced_file() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let arrow_schema = PositionDeleteWriterConfig::arrow_schema(); + let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?); + + // Create writer with referenced data file + let referenced_file = "s3://bucket/data/file1.parquet".to_string(); + let config = PositionDeleteWriterConfig::new(None, 0, Some(referenced_file.clone())); + let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, + schema, + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config) + .build(None) + .await?; + + // Create test data + let file_paths = StringArray::from(vec![referenced_file.as_str()]); + let positions = Int64Array::from(vec![42]); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(file_paths), + Arc::new(positions), + ])?; + + writer.write(batch.clone()).await?; + let data_files = writer.close().await?; + + assert_eq!(data_files.len(), 1); + let data_file = &data_files[0]; + + 
// Verify referenced_data_file is set + assert_eq!(data_file.referenced_data_file, Some(referenced_file)); + + Ok(()) + } + + #[tokio::test] + async fn test_position_delete_writer_invalid_schema() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let arrow_schema = PositionDeleteWriterConfig::arrow_schema(); + let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?); + + let config = PositionDeleteWriterConfig::new(None, 0, None); + let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, + schema, + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config) + .build(None) + .await?; + + // Try to write batch with wrong schema (missing pos field) + let wrong_schema = Arc::new(ArrowSchema::new(vec![Field::new( + "wrong_field", + DataType::Int32, + false, + )])); + let wrong_batch = + RecordBatch::try_new(wrong_schema, vec![Arc::new(Int32Array::from(vec![1]))])?; + + let result = writer.write(wrong_batch).await; + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("invalid schema")); + + Ok(()) + } + + #[tokio::test] + async fn test_position_delete_multiple_files() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let arrow_schema = PositionDeleteWriterConfig::arrow_schema(); + let schema = Arc::new(arrow_schema_to_schema(&arrow_schema)?); + + let config = PositionDeleteWriterConfig::new(None, 0, None); + let pb = ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + pb, + schema, + file_io.clone(), + location_gen, + file_name_gen, + ); + let mut writer = PositionDeleteFileWriterBuilder::new(rolling_writer_builder, config) + .build(None) + .await?; + + // Delete rows from multiple data files + let file_paths = StringArray::from(vec![ + "s3://bucket/data/file1.parquet", + "s3://bucket/data/file1.parquet", + "s3://bucket/data/file2.parquet", + "s3://bucket/data/file2.parquet", + "s3://bucket/data/file3.parquet", + ]); + let positions = Int64Array::from(vec![0, 10, 5, 15, 100]); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![ + Arc::new(file_paths), + Arc::new(positions), + ])?; + + writer.write(batch.clone()).await?; + let data_files = writer.close().await?; + + assert_eq!(data_files.len(), 1); + check_parquet_position_delete_file(&file_io, &data_files[0], &batch).await; + + Ok(()) + } +} diff --git a/crates/iceberg/src/writer/combined_writer/delta_writer.rs b/crates/iceberg/src/writer/combined_writer/delta_writer.rs new file mode 100644 index 0000000000..81eb25353e --- /dev/null +++ b/crates/iceberg/src/writer/combined_writer/delta_writer.rs @@ -0,0 +1,1026 @@ +// Licensed to the Apache Software Foundation (ASF) 
under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Delta writers handle row-level changes by combining data file and delete file writers. +//! +//! The delta writer has three sub-writers: +//! - A data file writer for new and updated rows. +//! - A position delete file writer for deletions of existing rows (that have been written within this writer) +//! - An equality delete file writer for deletions of rows based on equality conditions (for rows that may exist in other data files). +//! +//! # Input Data Format +//! +//! The `DeltaWriter` expects input data as Arrow `RecordBatch` with a specific structure: +//! +//! **Required Schema:** +//! - All data columns from your table schema (in order) +//! - A final column containing operation indicators as `Int32Array`: +//! - [`OP_INSERT`] (`1`) = Insert/Update (write to data file) +//! - [`OP_DELETE`] (`-1`) = Delete (write to delete file) +//! +//! **Example Schema:** +//! ```text +//! ┌─────────────┬──────────────┬──────────────┬──────┐ +//! │ id (Int32) │ name (Utf8) │ value (Int32)│ _ops │ +//! ├─────────────┼──────────────┼──────────────┼──────┤ +//! │ 1 │ "Alice" │ 100 │ 1 │ <- Insert +//! │ 2 │ "Bob" │ 200 │ 1 │ <- Insert +//! │ 1 │ "Alice" │ 150 │ -1 │ <- Delete +//! │ 3 │ "Charlie" │ 300 │ 1 │ <- Insert +//! └─────────────┴──────────────┴──────────────┴──────┘ +//! ``` +//! +//! # Unique Columns (Row Identity) +//! +//! The writer uses `unique_cols` (specified as Iceberg field IDs) to uniquely identify rows. +//! These columns form a composite key used for: +//! - Tracking rows written in this session (for position deletes) +//! - Generating equality delete predicates (for rows outside this session) +//! +//! Typically, this would be your table's primary key columns. +//! +//! # Memory Management +//! +//! The writer tracks recently written rows to enable efficient position deletes. +//! The `max_seen_rows` parameter controls this behavior: +//! +//! - **Default (100,000)**: Track up to 100K recently written rows +//! - Deletes for tracked rows → Position deletes (most efficient) +//! - Deletes for older/evicted rows → Equality deletes +//! +//! - **Custom value**: Adjust based on your workload +//! - Higher = more position deletes, more memory usage +//! - Lower = more equality deletes, less memory usage +//! +//! - **Zero (0)**: Disable row tracking completely +//! - All deletes → Equality deletes +//! - No memory overhead, but slower reads +//! +//! # How It Works +//! +//! When you call `write()` with a batch: +//! +//! 1. The batch is partitioned by the operations column +//! 2. For each partition: +//! - **Insert batches** (`ops = OP_INSERT`): +//! - Written to data file writer +//! - Row positions recorded in memory (up to `max_seen_rows`) +//! - **Delete batches** (`ops = OP_DELETE`): +//! 
- If row exists in tracked positions → Position delete file +//! - If row is unknown or evicted → Equality delete file +//! +//! 3. On `close()`, all three writers are closed and their data files are returned +//! +//! # Example Usage +//! +//! ```ignore +//! use arrow_array::{Int32Array, RecordBatch, StringArray}; +//! use iceberg::writer::DeltaWriterBuilder; +//! +//! // Build a delta writer with unique columns [field_id: 1] (the "id" column) +//! let delta_writer = DeltaWriterBuilder::new( +//! data_writer_builder, +//! pos_delete_writer_builder, +//! eq_delete_writer_builder, +//! vec![1], // field IDs of unique columns +//! ) +//! .with_max_seen_rows(50_000) // Track 50K rows +//! .build(None) +//! .await?; +//! +//! // Create a batch with inserts and deletes +//! let batch = RecordBatch::try_new( +//! schema.clone(), +//! vec![ +//! Arc::new(Int32Array::from(vec![1, 2, 1])), // id +//! Arc::new(StringArray::from(vec!["Alice", "Bob", "Alice"])), // name +//! Arc::new(Int32Array::from(vec![100, 200, -100])), // value +//! Arc::new(Int32Array::from(vec![OP_INSERT, OP_INSERT, OP_DELETE])), // ops +//! ], +//! )?; +//! +//! delta_writer.write(batch).await?; +//! let data_files = delta_writer.close().await?; +//! ``` + +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; + +use arrow_array::builder::BooleanBuilder; +use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; +use arrow_ord::partition::partition; +use arrow_row::{OwnedRow, RowConverter, Rows, SortField}; +use arrow_select::filter::filter_record_batch; +use itertools::Itertools; + +use crate::arrow::record_batch_projector::RecordBatchProjector; +use crate::spec::{DataFile, PartitionKey}; +use crate::writer::base_writer::position_delete_writer::PositionDeleteWriterConfig; +use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder}; +use crate::{Error, ErrorKind, Result}; + +/// Default maximum number of rows to track for position deletes. +/// This limits memory usage for large streaming workloads. +pub const DEFAULT_MAX_SEEN_ROWS: usize = 100_000; + +/// Operation marker for insert/update operations in the DeltaWriter input. +/// When the operations column contains this value, the row is written to a data file. +pub const OP_INSERT: i32 = 1; + +/// Operation marker for delete operations in the DeltaWriter input. +/// When the operations column contains this value, the row is written to a delete file. +pub const OP_DELETE: i32 = -1; + +/// A builder for `DeltaWriter`. +#[derive(Clone, Debug)] +pub struct DeltaWriterBuilder { + data_writer_builder: DWB, + pos_delete_writer_builder: PDWB, + eq_delete_writer_builder: EDWB, + unique_cols: Vec, + max_seen_rows: usize, +} + +impl DeltaWriterBuilder { + /// Creates a new `DeltaWriterBuilder`. + pub fn new( + data_writer_builder: DWB, + pos_delete_writer_builder: PDWB, + eq_delete_writer_builder: EDWB, + unique_cols: Vec, + ) -> Self { + Self { + data_writer_builder, + pos_delete_writer_builder, + eq_delete_writer_builder, + unique_cols, + max_seen_rows: DEFAULT_MAX_SEEN_ROWS, + } + } + + /// Sets the maximum number of rows to track for position deletes. + /// + /// When this limit is reached, the oldest tracked rows are evicted. + /// Deletes for evicted rows will use equality deletes instead of + /// position deletes. Default is [`DEFAULT_MAX_SEEN_ROWS`]. + /// + /// Set to `0` to disable row tracking entirely, causing all deletes + /// to use equality deletes. This eliminates memory overhead but may + /// reduce read performance. 
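For instance (a sketch; the three sub-writer builders and the key field id are assumed to be set up as in the module-level example above), a delete-heavy pipeline can opt out of tracking so that every delete is written as an equality delete:

```rust
let delta_writer = DeltaWriterBuilder::new(
    data_writer_builder,
    pos_delete_writer_builder,
    eq_delete_writer_builder,
    vec![1], // field id of the unique "id" column
)
.with_max_seen_rows(0) // disable row tracking: all deletes become equality deletes
.build(None)
.await?;
```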
+ pub fn with_max_seen_rows(mut self, max_seen_rows: usize) -> Self { + self.max_seen_rows = max_seen_rows; + self + } +} + +#[async_trait::async_trait] +impl IcebergWriterBuilder for DeltaWriterBuilder +where + DWB: IcebergWriterBuilder, + PDWB: IcebergWriterBuilder, + EDWB: IcebergWriterBuilder, + DWB::R: CurrentFileStatus, +{ + type R = DeltaWriter; + async fn build(&self, partition_key: Option) -> Result { + let data_writer = self + .data_writer_builder + .build(partition_key.clone()) + .await?; + let pos_delete_writer = self + .pos_delete_writer_builder + .build(partition_key.clone()) + .await?; + let eq_delete_writer = self.eq_delete_writer_builder.build(partition_key).await?; + DeltaWriter::try_new( + data_writer, + pos_delete_writer, + eq_delete_writer, + self.unique_cols.clone(), + self.max_seen_rows, + ) + } +} + +/// Position information of a row in a data file. +pub struct Position { + row_index: i64, + file_path: String, +} + +/// A writer that handles row-level changes by combining data file and delete file writers. +pub struct DeltaWriter { + /// The data file writer for new and updated rows. + pub data_writer: DW, + /// The position delete file writer for deletions of existing rows (that have been written within + /// this writer). + pub pos_delete_writer: PDW, + /// The equality delete file writer for deletions of rows based on equality conditions (for rows + /// that may exist in other data files). + pub eq_delete_writer: EDW, + /// The list of unique columns used for equality deletes. + pub unique_cols: Vec, + /// A map of rows (projected to unique columns) to their corresponding position information. + pub seen_rows: HashMap, + /// Tracks insertion order for seen_rows to enable FIFO eviction. + seen_rows_order: VecDeque, + /// Maximum number of rows to track for position deletes. + max_seen_rows: usize, + /// A projector to project the record batch to the unique columns. + pub(crate) projector: RecordBatchProjector, + /// A converter to convert the projected columns to rows for easy comparison. + pub(crate) row_convertor: RowConverter, +} + +impl DeltaWriter +where + DW: IcebergWriter + CurrentFileStatus, + PDW: IcebergWriter, + EDW: IcebergWriter, +{ + fn try_new( + data_writer: DW, + pos_delete_writer: PDW, + eq_delete_writer: EDW, + unique_cols: Vec, + max_seen_rows: usize, + ) -> Result { + let projector = RecordBatchProjector::from_iceberg_schema( + data_writer.current_schema(), + &unique_cols, + )?; + + let row_convertor = RowConverter::new( + projector + .projected_schema_ref() + .fields() + .iter() + .map(|f| SortField::new(f.data_type().clone())) + .collect(), + )?; + + Ok(Self { + data_writer, + pos_delete_writer, + eq_delete_writer, + unique_cols, + seen_rows: HashMap::new(), + seen_rows_order: VecDeque::new(), + max_seen_rows, + projector, + row_convertor, + }) + } + + async fn insert(&mut self, batch: RecordBatch) -> Result<()> { + let batch_num_rows = batch.num_rows(); + + // Write first to ensure the data is persisted before updating our tracking state. + // This prevents inconsistent state if the write fails. + // Note: We must write before calling current_file_path() because the underlying + // writer may not have created the file yet (lazy initialization). 
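// For example (illustrative numbers): if current_row_num() returns 23 after a
// 3-row batch is written to "data-00000.parquet", then start_row_index = 23 - 3 = 20
// and the rows of this batch are tracked at positions 20, 21 and 22 of that file.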
+ self.data_writer.write(batch.clone()).await?; + + // Skip row tracking if disabled (max_seen_rows == 0) + if self.max_seen_rows == 0 { + return Ok(()); + } + + let rows = self.extract_unique_column_rows(&batch)?; + + // Get file path and calculate start_row_index after successful write + let file_path = self.data_writer.current_file_path(); + let end_row_num = self.data_writer.current_row_num(); + let start_row_index = end_row_num - batch_num_rows; + + // Record positions for each row in this batch + for (i, row) in rows.iter().enumerate() { + let owned_row = row.owned(); + self.seen_rows.insert(owned_row.clone(), Position { + row_index: start_row_index as i64 + i as i64, + file_path: file_path.clone(), + }); + self.seen_rows_order.push_back(owned_row); + } + + // Evict oldest entries if we exceed the limit + self.evict_oldest_seen_rows(); + + Ok(()) + } + + /// Evicts the oldest entries from seen_rows when the limit is exceeded. + /// Entries that were already deleted are skipped (stale entries in the order queue). + fn evict_oldest_seen_rows(&mut self) { + while self.seen_rows.len() > self.max_seen_rows { + if let Some(old_row) = self.seen_rows_order.pop_front() { + // Only count as eviction if the row still exists (wasn't already deleted) + self.seen_rows.remove(&old_row); + } else { + // Queue is empty but HashMap still has entries - this shouldn't happen + // in normal operation, but break to avoid infinite loop + break; + } + } + } + + async fn delete(&mut self, batch: RecordBatch) -> Result<()> { + // If row tracking is disabled, write all deletes as equality deletes + if self.max_seen_rows == 0 { + self.eq_delete_writer + .write(batch) + .await + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?; + return Ok(()); + } + + let rows = self.extract_unique_column_rows(&batch)?; + let mut file_array = vec![]; + let mut row_index_array = vec![]; + // Build a boolean array to track which rows need equality deletes. 
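// (Illustration: if this delete batch contains ids [2, 9] and only id 2 was written
// and is still tracked by this writer, id 2 is routed to a position delete while
// id 9 falls through to an equality delete.)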
+ // True = row not seen before, needs equality delete + // False = row was seen, already handled via position delete + let mut needs_equality_delete = BooleanBuilder::new(); + + for row in rows.iter() { + if let Some(pos) = self.seen_rows.remove(&row.owned()) { + // Row was previously inserted, use position delete + row_index_array.push(pos.row_index); + file_array.push(pos.file_path.clone()); + needs_equality_delete.append_value(false); + } else { + // Row not seen before, use equality delete + needs_equality_delete.append_value(true); + } + } + + // Write position deletes for rows that were previously inserted + let file_array: ArrayRef = Arc::new(StringArray::from(file_array)); + let row_index_array: ArrayRef = Arc::new(arrow_array::Int64Array::from(row_index_array)); + + let position_batch = + RecordBatch::try_new(PositionDeleteWriterConfig::arrow_schema(), vec![ + file_array, + row_index_array, + ])?; + + if position_batch.num_rows() > 0 { + self.pos_delete_writer + .write(position_batch) + .await + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?; + } + + // Write equality deletes for rows that were not previously inserted + let eq_batch = filter_record_batch(&batch, &needs_equality_delete.finish()) + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?; + + if eq_batch.num_rows() > 0 { + self.eq_delete_writer + .write(eq_batch) + .await + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}")))?; + } + + Ok(()) + } + + fn extract_unique_column_rows(&mut self, batch: &RecordBatch) -> Result { + self.row_convertor + .convert_columns(&self.projector.project_column(batch.columns())?) + .map_err(|e| Error::new(ErrorKind::Unexpected, format!("{e}"))) + } +} + +#[async_trait::async_trait] +impl IcebergWriter for DeltaWriter +where + DW: IcebergWriter + CurrentFileStatus, + PDW: IcebergWriter, + EDW: IcebergWriter, +{ + async fn write(&mut self, batch: RecordBatch) -> Result<()> { + // Treat the last row as an op indicator +1 for insert, -1 for delete + let ops = batch + .column(batch.num_columns() - 1) + .as_any() + .downcast_ref::() + .ok_or(Error::new( + ErrorKind::Unexpected, + "Failed to downcast ops column", + ))?; + + let partition = + partition(&[batch.column(batch.num_columns() - 1).clone()]).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to partition batch: {e}"), + ) + })?; + + for range in partition.ranges() { + let batch = batch + .project(&(0..batch.num_columns() - 1).collect_vec()) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + format!("Failed to project batch columns: {e}"), + ) + })? 
+ .slice(range.start, range.end - range.start); + match ops.value(range.start) { + OP_INSERT => self.insert(batch).await?, + OP_DELETE => self.delete(batch).await?, + op => { + return Err(Error::new( + ErrorKind::Unexpected, + format!("Ops column must be {OP_INSERT} (insert) or {OP_DELETE} (delete), not {op}"), + )); + } + } + } + + Ok(()) + } + + async fn close(&mut self) -> Result> { + let data_files = self.data_writer.close().await?; + let pos_delete_files = self.pos_delete_writer.close().await?; + let eq_delete_files = self.eq_delete_writer.close().await?; + + Ok(data_files + .into_iter() + .chain(pos_delete_files) + .chain(eq_delete_files) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + mod delta_writer_tests { + use std::collections::HashMap; + + use arrow_array::{Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::file::properties::WriterProperties; + use tempfile::TempDir; + + use super::*; + use crate::arrow::arrow_schema_to_schema; + use crate::io::{FileIOBuilder, LocalFsStorageFactory}; + use crate::spec::{ + DataFileFormat, NestedField, PrimitiveType, Schema as IcebergSchema, Type, + }; + use crate::writer::IcebergWriterBuilder; + use crate::writer::base_writer::data_file_writer::DataFileWriterBuilder; + use crate::writer::base_writer::equality_delete_writer::{ + EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig, + }; + use crate::writer::base_writer::position_delete_writer::PositionDeleteFileWriterBuilder; + use crate::writer::file_writer::ParquetWriterBuilder; + use crate::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, + }; + use crate::writer::file_writer::rolling_writer::RollingFileWriterBuilder; + + fn create_iceberg_schema() -> Arc { + Arc::new( + IcebergSchema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ) + } + + fn create_test_batch_with_ops( + ids: Vec, + names: Vec>, + ops: Vec, + ) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + Field::new("op", DataType::Int32, false), + ])); + + let id_array: ArrayRef = Arc::new(Int32Array::from(ids)); + let name_array: ArrayRef = Arc::new(StringArray::from(names)); + let op_array: ArrayRef = Arc::new(Int32Array::from(ops)); + + RecordBatch::try_new(schema, vec![id_array, name_array, op_array]).unwrap() + } + + #[tokio::test] + async fn test_delta_writer_insert_only() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let schema = create_iceberg_schema(); + + // Create data writer + let data_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/data", + temp_dir.path().to_str().unwrap() + )); + let data_file_name_gen = + DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet); + let data_parquet_writer = + 
ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + data_parquet_writer, + schema.clone(), + file_io.clone(), + data_location_gen, + data_file_name_gen, + ); + let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder); + + // Create position delete writer + let pos_delete_schema = Arc::new(arrow_schema_to_schema( + &PositionDeleteWriterConfig::arrow_schema(), + )?); + let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/pos_delete", + temp_dir.path().to_str().unwrap() + )); + let pos_delete_file_name_gen = DefaultFileNameGenerator::new( + "pos_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let pos_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + pos_delete_schema.clone(), + ); + let pos_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + pos_delete_parquet_writer, + pos_delete_schema, + file_io.clone(), + pos_delete_location_gen, + pos_delete_file_name_gen, + ); + let pos_delete_writer = PositionDeleteFileWriterBuilder::new( + pos_delete_rolling_writer_builder, + PositionDeleteWriterConfig::new(None, 0, None), + ); + + // Create equality delete writer + let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?; + let eq_delete_schema = Arc::new(arrow_schema_to_schema( + eq_delete_config.projected_arrow_schema_ref(), + )?); + let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/eq_delete", + temp_dir.path().to_str().unwrap() + )); + let eq_delete_file_name_gen = DefaultFileNameGenerator::new( + "eq_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let eq_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + eq_delete_schema.clone(), + ); + let eq_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + eq_delete_parquet_writer, + eq_delete_schema, + file_io.clone(), + eq_delete_location_gen, + eq_delete_file_name_gen, + ); + let eq_delete_writer = EqualityDeleteFileWriterBuilder::new( + eq_delete_rolling_writer_builder, + eq_delete_config, + ); + + // Create delta writer + let data_writer_instance = data_writer.build(None).await?; + let pos_delete_writer_instance = pos_delete_writer.build(None).await?; + let eq_delete_writer_instance = eq_delete_writer.build(None).await?; + let mut delta_writer = DeltaWriter::try_new( + data_writer_instance, + pos_delete_writer_instance, + eq_delete_writer_instance, + vec![1], // unique on id column + DEFAULT_MAX_SEEN_ROWS, + )?; + + // Write batch with only inserts + let batch = create_test_batch_with_ops( + vec![1, 2, 3], + vec![Some("Alice"), Some("Bob"), Some("Charlie")], + vec![OP_INSERT, OP_INSERT, OP_INSERT], // all inserts + ); + + delta_writer.write(batch).await?; + let data_files = delta_writer.close().await?; + + // Should have 1 data file, 0 delete files + assert_eq!(data_files.len(), 1); + assert_eq!(data_files[0].content, crate::spec::DataContentType::Data); + assert_eq!(data_files[0].record_count, 3); + + // Read back and verify + let input_file = file_io.new_input(data_files[0].file_path.clone())?; + let content = input_file.read().await?; + let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?; + let batches: Vec<_> = reader.map(|b| b.unwrap()).collect(); + assert_eq!(batches.len(), 1); + 
assert_eq!(batches[0].num_rows(), 3); + + Ok(()) + } + + #[tokio::test] + async fn test_delta_writer_insert_then_position_delete() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let schema = create_iceberg_schema(); + + // Create writers (same setup as above) + let data_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/data", + temp_dir.path().to_str().unwrap() + )); + let data_file_name_gen = + DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet); + let data_parquet_writer = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + data_parquet_writer, + schema.clone(), + file_io.clone(), + data_location_gen, + data_file_name_gen, + ); + let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder); + + let pos_delete_schema = Arc::new(arrow_schema_to_schema( + &PositionDeleteWriterConfig::arrow_schema(), + )?); + let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/pos_delete", + temp_dir.path().to_str().unwrap() + )); + let pos_delete_file_name_gen = DefaultFileNameGenerator::new( + "pos_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let pos_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + pos_delete_schema.clone(), + ); + let pos_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + pos_delete_parquet_writer, + pos_delete_schema, + file_io.clone(), + pos_delete_location_gen, + pos_delete_file_name_gen, + ); + let pos_delete_writer = PositionDeleteFileWriterBuilder::new( + pos_delete_rolling_writer_builder, + PositionDeleteWriterConfig::new(None, 0, None), + ); + + let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?; + let eq_delete_schema = Arc::new(arrow_schema_to_schema( + eq_delete_config.projected_arrow_schema_ref(), + )?); + let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/eq_delete", + temp_dir.path().to_str().unwrap() + )); + let eq_delete_file_name_gen = DefaultFileNameGenerator::new( + "eq_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let eq_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + eq_delete_schema.clone(), + ); + let eq_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + eq_delete_parquet_writer, + eq_delete_schema, + file_io.clone(), + eq_delete_location_gen, + eq_delete_file_name_gen, + ); + let eq_delete_writer = EqualityDeleteFileWriterBuilder::new( + eq_delete_rolling_writer_builder, + eq_delete_config, + ); + + let data_writer_instance = data_writer.build(None).await?; + let pos_delete_writer_instance = pos_delete_writer.build(None).await?; + let eq_delete_writer_instance = eq_delete_writer.build(None).await?; + let mut delta_writer = DeltaWriter::try_new( + data_writer_instance, + pos_delete_writer_instance, + eq_delete_writer_instance, + vec![1], + DEFAULT_MAX_SEEN_ROWS, + )?; + + // First, insert some rows + let insert_batch = create_test_batch_with_ops( + vec![1, 2, 3], + vec![Some("Alice"), Some("Bob"), Some("Charlie")], + vec![OP_INSERT, OP_INSERT, OP_INSERT], + ); + delta_writer.write(insert_batch).await?; + + // Now delete rows that were just inserted (should create position deletes) + let 
delete_batch = + create_test_batch_with_ops(vec![1, 2], vec![Some("Alice"), Some("Bob")], vec![ + -1, -1, + ]); + delta_writer.write(delete_batch).await?; + + let data_files = delta_writer.close().await?; + + // Should have 1 data file + 1 position delete file + assert_eq!(data_files.len(), 2); + + let data_file = data_files + .iter() + .find(|f| f.content == crate::spec::DataContentType::Data) + .unwrap(); + let pos_delete_file = data_files + .iter() + .find(|f| f.content == crate::spec::DataContentType::PositionDeletes) + .unwrap(); + + assert_eq!(data_file.record_count, 3); + assert_eq!(pos_delete_file.record_count, 2); + + // Verify position delete file content + let input_file = file_io.new_input(pos_delete_file.file_path.clone())?; + let content = input_file.read().await?; + let reader = ParquetRecordBatchReaderBuilder::try_new(content)?.build()?; + let batches: Vec<_> = reader.map(|b| b.unwrap()).collect(); + assert_eq!(batches[0].num_rows(), 2); + + Ok(()) + } + + #[tokio::test] + async fn test_delta_writer_equality_delete() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let schema = create_iceberg_schema(); + + // Create writers + let data_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/data", + temp_dir.path().to_str().unwrap() + )); + let data_file_name_gen = + DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet); + let data_parquet_writer = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + data_parquet_writer, + schema.clone(), + file_io.clone(), + data_location_gen, + data_file_name_gen, + ); + let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder); + + let pos_delete_schema = Arc::new(arrow_schema_to_schema( + &PositionDeleteWriterConfig::arrow_schema(), + )?); + let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/pos_delete", + temp_dir.path().to_str().unwrap() + )); + let pos_delete_file_name_gen = DefaultFileNameGenerator::new( + "pos_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let pos_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + pos_delete_schema.clone(), + ); + let pos_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + pos_delete_parquet_writer, + pos_delete_schema, + file_io.clone(), + pos_delete_location_gen, + pos_delete_file_name_gen, + ); + let pos_delete_writer = PositionDeleteFileWriterBuilder::new( + pos_delete_rolling_writer_builder, + PositionDeleteWriterConfig::new(None, 0, None), + ); + + let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?; + let eq_delete_schema = Arc::new(arrow_schema_to_schema( + eq_delete_config.projected_arrow_schema_ref(), + )?); + let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/eq_delete", + temp_dir.path().to_str().unwrap() + )); + let eq_delete_file_name_gen = DefaultFileNameGenerator::new( + "eq_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let eq_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + eq_delete_schema.clone(), + ); + let eq_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + eq_delete_parquet_writer, + eq_delete_schema, + 
file_io.clone(), + eq_delete_location_gen, + eq_delete_file_name_gen, + ); + let eq_delete_writer = EqualityDeleteFileWriterBuilder::new( + eq_delete_rolling_writer_builder, + eq_delete_config, + ); + + let data_writer_instance = data_writer.build(None).await?; + let pos_delete_writer_instance = pos_delete_writer.build(None).await?; + let eq_delete_writer_instance = eq_delete_writer.build(None).await?; + let mut delta_writer = DeltaWriter::try_new( + data_writer_instance, + pos_delete_writer_instance, + eq_delete_writer_instance, + vec![1], + DEFAULT_MAX_SEEN_ROWS, + )?; + + // Delete rows that were never inserted (should create equality deletes) + let delete_batch = create_test_batch_with_ops( + vec![99, 100], + vec![Some("X"), Some("Y")], + vec![OP_DELETE, OP_DELETE], + ); + delta_writer.write(delete_batch).await?; + + let data_files = delta_writer.close().await?; + + // Should have only 1 equality delete file + assert_eq!(data_files.len(), 1); + assert_eq!( + data_files[0].content, + crate::spec::DataContentType::EqualityDeletes + ); + assert_eq!(data_files[0].record_count, 2); + + Ok(()) + } + + #[tokio::test] + async fn test_delta_writer_invalid_op() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIOBuilder::new(Arc::new(LocalFsStorageFactory)).build(); + let schema = create_iceberg_schema(); + + // Create writers + let data_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/data", + temp_dir.path().to_str().unwrap() + )); + let data_file_name_gen = + DefaultFileNameGenerator::new("data".to_string(), None, DataFileFormat::Parquet); + let data_parquet_writer = + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); + let data_rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( + data_parquet_writer, + schema.clone(), + file_io.clone(), + data_location_gen, + data_file_name_gen, + ); + let data_writer = DataFileWriterBuilder::new(data_rolling_writer_builder); + + let pos_delete_schema = Arc::new(arrow_schema_to_schema( + &PositionDeleteWriterConfig::arrow_schema(), + )?); + let pos_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/pos_delete", + temp_dir.path().to_str().unwrap() + )); + let pos_delete_file_name_gen = DefaultFileNameGenerator::new( + "pos_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let pos_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + pos_delete_schema.clone(), + ); + let pos_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + pos_delete_parquet_writer, + pos_delete_schema, + file_io.clone(), + pos_delete_location_gen, + pos_delete_file_name_gen, + ); + let pos_delete_writer = PositionDeleteFileWriterBuilder::new( + pos_delete_rolling_writer_builder, + PositionDeleteWriterConfig::new(None, 0, None), + ); + + let eq_delete_config = EqualityDeleteWriterConfig::new(vec![1], schema.clone())?; + let eq_delete_schema = Arc::new(arrow_schema_to_schema( + eq_delete_config.projected_arrow_schema_ref(), + )?); + let eq_delete_location_gen = DefaultLocationGenerator::with_data_location(format!( + "{}/eq_delete", + temp_dir.path().to_str().unwrap() + )); + let eq_delete_file_name_gen = DefaultFileNameGenerator::new( + "eq_delete".to_string(), + None, + DataFileFormat::Parquet, + ); + let eq_delete_parquet_writer = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + eq_delete_schema.clone(), + ); + let 
eq_delete_rolling_writer_builder = + RollingFileWriterBuilder::new_with_default_file_size( + eq_delete_parquet_writer, + eq_delete_schema, + file_io.clone(), + eq_delete_location_gen, + eq_delete_file_name_gen, + ); + let eq_delete_writer = EqualityDeleteFileWriterBuilder::new( + eq_delete_rolling_writer_builder, + eq_delete_config, + ); + + let data_writer_instance = data_writer.build(None).await?; + let pos_delete_writer_instance = pos_delete_writer.build(None).await?; + let eq_delete_writer_instance = eq_delete_writer.build(None).await?; + let mut delta_writer = DeltaWriter::try_new( + data_writer_instance, + pos_delete_writer_instance, + eq_delete_writer_instance, + vec![1], + DEFAULT_MAX_SEEN_ROWS, + )?; + + // Invalid operation code + let batch = create_test_batch_with_ops(vec![1], vec![Some("Alice")], vec![99]); + + let result = delta_writer.write(batch).await; + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("Ops column must be 1 (insert) or -1 (delete)") + ); + + Ok(()) + } + } +} diff --git a/crates/iceberg/src/writer/combined_writer/mod.rs b/crates/iceberg/src/writer/combined_writer/mod.rs new file mode 100644 index 0000000000..46f0f3fc6b --- /dev/null +++ b/crates/iceberg/src/writer/combined_writer/mod.rs @@ -0,0 +1,4 @@ +//! Combined writers compose multiple base writers into a single writer that can handle +//! more complex writing scenarios, such as row-level changes involving both data files and delete files. + +pub mod delta_writer; diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 0984d8fc64..1964cb0e04 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -570,6 +570,10 @@ impl CurrentFileStatus for ParquetWriter { 0 } } + + fn current_schema(&self) -> SchemaRef { + self.schema.clone() + } } /// AsyncFileWriter is a wrapper of FileWrite to make it compatible with tokio::io::AsyncWrite. 
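Note on the new combined writer's calling convention (a minimal, hedged sketch for review context, not part of the patch): `DeltaWriter::write` expects batches whose trailing column is an `i32` op indicator (`OP_INSERT` = 1, `OP_DELETE` = -1) and whose remaining columns match the table schema; deletes of rows previously inserted through the same writer become position deletes, all other deletes become equality deletes on the configured unique columns. The helper below mirrors the tests' `create_test_batch_with_ops`; the helper name, column names (`id`, `name`, `op`), and field ids are illustrative.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

/// Build an input batch for DeltaWriter: table columns first, then a trailing
/// `op` column tagging each row with +1 (insert) or -1 (delete). The writer
/// strips the op column before writing data or delete files.
fn op_tagged_batch(ids: Vec<i32>, names: Vec<Option<&str>>, ops: Vec<i32>) -> RecordBatch {
    // Field-id metadata as required by the Iceberg parquet writers.
    let field_id = |id: &str| HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())]);
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false).with_metadata(field_id("1")),
        Field::new("name", DataType::Utf8, true).with_metadata(field_id("2")),
        Field::new("op", DataType::Int32, false), // op indicator, not part of the table schema
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(ids)),
        Arc::new(StringArray::from(names)),
        Arc::new(Int32Array::from(ops)),
    ];
    RecordBatch::try_new(schema, columns).expect("schema and columns match")
}

// Usage sketch (writer construction elided; see the tests above):
//   delta_writer.write(op_tagged_batch(vec![1, 2], vec![Some("a"), Some("b")], vec![1, 1])).await?;
//   delta_writer.write(op_tagged_batch(vec![1], vec![Some("a")], vec![-1])).await?; // position delete
//   let files = delta_writer.close().await?; // data + delete files, suitable for RowDeltaAction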
diff --git a/crates/iceberg/src/writer/file_writer/rolling_writer.rs b/crates/iceberg/src/writer/file_writer/rolling_writer.rs index b86f6a2ea7..8353dc08fa 100644 --- a/crates/iceberg/src/writer/file_writer/rolling_writer.rs +++ b/crates/iceberg/src/writer/file_writer/rolling_writer.rs @@ -20,7 +20,7 @@ use std::fmt::{Debug, Formatter}; use arrow_array::RecordBatch; use crate::io::{FileIO, OutputFile}; -use crate::spec::{DataFileBuilder, PartitionKey, TableProperties}; +use crate::spec::{DataFileBuilder, PartitionKey, TableProperties, SchemaRef}; use crate::writer::CurrentFileStatus; use crate::writer::file_writer::location_generator::{FileNameGenerator, LocationGenerator}; use crate::writer::file_writer::{FileWriter, FileWriterBuilder}; @@ -34,6 +34,7 @@ pub struct RollingFileWriterBuilder< F: FileNameGenerator, > { inner_builder: B, + schema: SchemaRef, target_file_size: usize, file_io: FileIO, location_generator: L, @@ -51,6 +52,7 @@ where /// # Parameters /// /// * `inner_builder` - The builder for the underlying file writer + /// * `schema` - The schema for the data being written /// * `target_file_size` - The target file size in bytes that triggers rollover /// * `file_io` - The file IO interface for creating output files /// * `location_generator` - Generator for file locations @@ -61,6 +63,7 @@ where /// A new `RollingFileWriterBuilder` instance pub fn new( inner_builder: B, + schema: SchemaRef, target_file_size: usize, file_io: FileIO, location_generator: L, @@ -68,6 +71,7 @@ where ) -> Self { Self { inner_builder, + schema, target_file_size, file_io, location_generator, @@ -80,6 +84,7 @@ where /// # Parameters /// /// * `inner_builder` - The builder for the underlying file writer + /// * `schema` - The schema for the data being written /// * `file_io` - The file IO interface for creating output files /// * `location_generator` - Generator for file locations /// * `file_name_generator` - Generator for file names @@ -89,12 +94,14 @@ where /// A new `RollingFileWriterBuilder` instance with default target file size pub fn new_with_default_file_size( inner_builder: B, + schema: SchemaRef, file_io: FileIO, location_generator: L, file_name_generator: F, ) -> Self { Self { inner_builder, + schema, target_file_size: TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, file_io, location_generator, @@ -107,6 +114,7 @@ where RollingFileWriter { inner: None, inner_builder: self.inner_builder.clone(), + schema: self.schema.clone(), target_file_size: self.target_file_size, data_file_builders: vec![], file_io: self.file_io.clone(), @@ -125,6 +133,7 @@ where pub struct RollingFileWriter { inner: Option, inner_builder: B, + schema: SchemaRef, target_file_size: usize, data_file_builders: Vec, file_io: FileIO, @@ -247,11 +256,21 @@ impl CurrentFi } fn current_row_num(&self) -> usize { - self.inner.as_ref().unwrap().current_row_num() + self.inner + .as_ref() + .map(|inner| inner.current_row_num()) + .unwrap_or(0) } fn current_written_size(&self) -> usize { - self.inner.as_ref().unwrap().current_written_size() + self.inner + .as_ref() + .map(|inner| inner.current_written_size()) + .unwrap_or(0) + } + + fn current_schema(&self) -> SchemaRef { + self.schema.clone() } } @@ -312,15 +331,16 @@ mod tests { DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // Create schema - let schema = make_test_schema()?; + let schema = Arc::new(make_test_schema()?); // Create writer builders let parquet_writer_builder = - 
ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)); + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); // Set a large target size so no rolling occurs let rolling_file_writer_builder = RollingFileWriterBuilder::new( parquet_writer_builder, + schema, 1024 * 1024, file_io.clone(), location_gen, @@ -370,15 +390,16 @@ mod tests { DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); // Create schema - let schema = make_test_schema()?; + let schema = Arc::new(make_test_schema()?); // Create writer builders let parquet_writer_builder = - ParquetWriterBuilder::new(WriterProperties::builder().build(), Arc::new(schema)); + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); // Set a very small target size to trigger rolling let rolling_writer_builder = RollingFileWriterBuilder::new( parquet_writer_builder, + schema, 1024, file_io, location_gen, diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs index d475230685..0f665f8e78 100644 --- a/crates/iceberg/src/writer/mod.rs +++ b/crates/iceberg/src/writer/mod.rs @@ -86,14 +86,16 @@ //! ); //! //! // Create a parquet file writer builder. The parameter can get from table. +//! let schema = table.metadata().current_schema().clone(); //! let parquet_writer_builder = ParquetWriterBuilder::new( //! WriterProperties::default(), -//! table.metadata().current_schema().clone(), +//! schema.clone(), //! ); //! //! // Create a rolling file writer using parquet file writer builder. //! let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( //! parquet_writer_builder, +//! schema, //! table.file_io().clone(), //! location_generator.clone(), //! file_name_generator.clone(), @@ -215,14 +217,16 @@ //! ); //! //! // Create a parquet file writer builder. The parameter can get from table. +//! let schema = table.metadata().current_schema().clone(); //! let parquet_writer_builder = ParquetWriterBuilder::new( //! WriterProperties::default(), -//! table.metadata().current_schema().clone(), +//! schema.clone(), //! ); //! //! // Create a rolling file writer //! let rolling_file_writer_builder = RollingFileWriterBuilder::new( //! parquet_writer_builder, +//! schema, //! 512 * 1024 * 1024, //! table.file_io().clone(), //! location_generator.clone(), @@ -274,9 +278,10 @@ //! # let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?; //! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); //! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet); -//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone()); +//! # let schema = table.metadata().current_schema().clone(); +//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema.clone()); //! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( -//! # parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator); +//! # parquet_writer_builder, schema, table.file_io().clone(), location_generator, file_name_generator); //! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); //! //! // Wrap the data file writer with FanoutWriter for partitioning @@ -338,9 +343,10 @@ //! 
# let table = catalog.load_table(&TableIdent::from_strs(["hello", "world"])?).await?; //! # let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); //! # let file_name_generator = DefaultFileNameGenerator::new("test".to_string(), None, iceberg::spec::DataFileFormat::Parquet); -//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), table.metadata().current_schema().clone()); +//! # let schema = table.metadata().current_schema().clone(); +//! # let parquet_writer_builder = ParquetWriterBuilder::new(WriterProperties::default(), schema.clone()); //! # let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( -//! # parquet_writer_builder, table.file_io().clone(), location_generator, file_name_generator); +//! # parquet_writer_builder, schema, table.file_io().clone(), location_generator, file_name_generator); //! # let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder); //! //! // Wrap the data file writer with ClusteredWriter for sorted partitioning @@ -385,13 +391,14 @@ //! ``` pub mod base_writer; +pub mod combined_writer; pub mod file_writer; pub mod partitioning; use arrow_array::RecordBatch; use crate::Result; -use crate::spec::{DataFile, PartitionKey}; +use crate::spec::{DataFile, PartitionKey, SchemaRef}; type DefaultInput = RecordBatch; type DefaultOutput = Vec; @@ -426,6 +433,8 @@ pub trait CurrentFileStatus { fn current_row_num(&self) -> usize; /// Get the current file written size. fn current_written_size(&self) -> usize; + /// Get the current schema used by the writer. + fn current_schema(&self) -> SchemaRef; } #[cfg(test)] diff --git a/crates/iceberg/src/writer/partitioning/clustered_writer.rs b/crates/iceberg/src/writer/partitioning/clustered_writer.rs index e2e0452045..d130aef799 100644 --- a/crates/iceberg/src/writer/partitioning/clustered_writer.rs +++ b/crates/iceberg/src/writer/partitioning/clustered_writer.rs @@ -198,6 +198,7 @@ mod tests { // Create rolling file writer builder let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema.clone(), file_io.clone(), location_gen, file_name_gen, @@ -316,6 +317,7 @@ mod tests { // Create rolling file writer builder let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema.clone(), file_io.clone(), location_gen, file_name_gen, @@ -449,6 +451,7 @@ mod tests { // Create rolling file writer builder let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema.clone(), file_io.clone(), location_gen, file_name_gen, diff --git a/crates/iceberg/src/writer/partitioning/fanout_writer.rs b/crates/iceberg/src/writer/partitioning/fanout_writer.rs index 5348fea157..b7e3457207 100644 --- a/crates/iceberg/src/writer/partitioning/fanout_writer.rs +++ b/crates/iceberg/src/writer/partitioning/fanout_writer.rs @@ -174,6 +174,7 @@ mod tests { // Create rolling file writer builder let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema.clone(), file_io.clone(), location_gen, file_name_gen, @@ -289,6 +290,7 @@ mod tests { // Create rolling file writer builder let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema.clone(), file_io.clone(), location_gen, file_name_gen, diff --git a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs 
b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs index de127e0c9d..f2f4424d8f 100644 --- a/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs +++ b/crates/iceberg/src/writer/partitioning/unpartitioned_writer.rs @@ -162,6 +162,7 @@ mod tests { ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema, file_io, location_gen, file_name_gen, diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs index dc3030519f..4e6254dfce 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -83,6 +83,7 @@ async fn test_append_data_file_conflict() { ); let rolling_file_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + table.metadata().current_schema().clone(), table.file_io().clone(), location_generator.clone(), file_name_generator.clone(), diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 0dea150d31..0e19aca463 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -226,9 +226,10 @@ impl ExecutionPlan for IcebergWriteExec { } // Create data file writer builder + let current_schema = self.table.metadata().current_schema().clone(); let parquet_file_writer_builder = ParquetWriterBuilder::new_with_match_mode( WriterProperties::default(), - self.table.metadata().current_schema().clone(), + current_schema.clone(), FieldMatchMode::Name, ); let target_file_size = table_props.write_target_file_size_bytes; @@ -242,6 +243,7 @@ impl ExecutionPlan for IcebergWriteExec { DefaultFileNameGenerator::new(Uuid::now_v7().to_string(), None, file_format); let rolling_writer_builder = RollingFileWriterBuilder::new( parquet_file_writer_builder, + current_schema, target_file_size, file_io, location_generator, diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs index 4ac5323bb8..f0c7e27369 100644 --- a/crates/integrations/datafusion/src/task_writer.rs +++ b/crates/integrations/datafusion/src/task_writer.rs @@ -356,9 +356,10 @@ mod tests { let file_name_gen = DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); let parquet_writer_builder = - ParquetWriterBuilder::new(WriterProperties::builder().build(), schema); + ParquetWriterBuilder::new(WriterProperties::builder().build(), schema.clone()); let rolling_writer_builder = RollingFileWriterBuilder::new_with_default_file_size( parquet_writer_builder, + schema, file_io, location_gen, file_name_gen,