Add MERGE INTO support for DataFusion integration #2201

@wirybeaver

Description

What feature are you trying to implement?

Add support for SQL MERGE INTO (UPSERT) operations in the iceberg-datafusion integration. This enables atomic row-level updates and inserts based on join conditions, essential for CDC pipelines, incremental updates, and data synchronization. I already have a PoC branch.

The key optimization I want to introduce is a Spark-style storage-partitioned join (SPJ). DataFusion does not yet support MERGE INTO in its SQL parser or logical plan, so I am also contributing MERGE INTO support to DataFusion: apache/datafusion#20746.

SQL Example:

MERGE INTO target_table t
USING source_table s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET t.value = s.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (s.id, s.value)
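
To make the semantics of the statement above concrete, here is a minimal in-memory sketch of how each source row is classified against the target on `t.id = s.id`. It is only an illustration, not the PoC's implementation: `Row`, `MergeAction`, and `classify` are hypothetical names, and it assumes `id` is unique in the target table.

```rust
use std::collections::HashSet;

/// A row of the example tables: (id, value). Hypothetical type for illustration.
type Row = (i64, String);

/// Outcome of evaluating the MERGE clauses for one source row.
#[derive(Debug, PartialEq)]
enum MergeAction {
    /// WHEN MATCHED: the target row with this id takes the source value.
    Update { id: i64, value: String },
    /// WHEN NOT MATCHED: the source row is inserted into the target.
    Insert { id: i64, value: String },
}

/// Classify every source row against the target using the join condition
/// `t.id = s.id`. Assumes ids are unique in the target table.
fn classify(target: &[Row], source: &[Row]) -> Vec<MergeAction> {
    // Build the set of join keys present in the target once.
    let target_ids: HashSet<i64> = target.iter().map(|(id, _)| *id).collect();

    source
        .iter()
        .map(|(id, value)| {
            if target_ids.contains(id) {
                MergeAction::Update { id: *id, value: value.clone() }
            } else {
                MergeAction::Insert { id: *id, value: value.clone() }
            }
        })
        .collect()
}
```

With a target holding ids 1 and 2 and a source holding ids 2 and 3, `classify` yields one `Update` for id 2 and one `Insert` for id 3. Target row 1 is untouched because the example statement has no clause for target rows without a source match.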

The following tasks are already completed on the PoC branch. I will raise formal PRs one at a time, since stacked PRs are not supported from a fork.

  • feat(transaction): add RowDelta transaction action for row-level modi… #2203
  • Add IcebergMergeExec with HashJoinExec integration and row classification
  • Add IcebergMergeWriteExec and IcebergMergeCommitExec nodes
  • Implement full MERGE execution logic with file tracking
  • Integrate MERGE INTO into IcebergTableProvider
  • Add comprehensive MERGE INTO integration tests
  • Add partition-aware merge optimization (Spark storage-partitioned join style)
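
The partition-aware item in the list above can be illustrated with a small in-memory sketch. It is not the PoC code: `Row`, `PartitionPlan`, and `plan_partitioned_merge` are hypothetical names. It also assumes that both tables are partitioned the same way and that rows which match on `id` always share a partition value, which is the precondition that lets a storage-partitioned join avoid comparing rows across partitions.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical row carrying its partition value alongside the join key.
#[derive(Debug, Clone, PartialEq)]
struct Row {
    partition: String,
    id: i64,
    value: String,
}

impl Row {
    fn new(partition: &str, id: i64, value: &str) -> Self {
        Row { partition: partition.to_string(), id, value: value.to_string() }
    }
}

/// Merge work for a single partition: ids to update and ids to insert.
#[derive(Debug, PartialEq)]
struct PartitionPlan {
    partition: String,
    updates: Vec<i64>,
    inserts: Vec<i64>,
}

/// Group rows by partition value (ordered, so the output is deterministic).
fn group_by_partition(rows: &[Row]) -> BTreeMap<&str, Vec<&Row>> {
    let mut groups: BTreeMap<&str, Vec<&Row>> = BTreeMap::new();
    for row in rows {
        groups.entry(row.partition.as_str()).or_default().push(row);
    }
    groups
}

/// Plan the merge partition by partition. Only partitions that appear in the
/// source are visited; target partitions with no source rows are skipped.
fn plan_partitioned_merge(target: &[Row], source: &[Row]) -> Vec<PartitionPlan> {
    let target_groups = group_by_partition(target);
    let source_groups = group_by_partition(source);

    let mut plans = Vec::new();
    for (partition, source_rows) in &source_groups {
        // Only target rows in the same partition can match (assumption above).
        let target_ids: HashSet<i64> = target_groups
            .get(partition)
            .map(|rows| rows.iter().map(|r| r.id).collect())
            .unwrap_or_default();

        let mut updates = Vec::new();
        let mut inserts = Vec::new();
        for row in source_rows {
            if target_ids.contains(&row.id) {
                updates.push(row.id);
            } else {
                inserts.push(row.id);
            }
        }
        plans.push(PartitionPlan { partition: partition.to_string(), updates, inserts });
    }
    plans
}
```

The design point is that each partition's join runs against a small, local key set, and partitions the source never touches need no scan or rewrite at all.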

Willingness to contribute

I would be willing to contribute to this feature with guidance from the Iceberg Rust community.

Metadata

Labels: epic