refactor: use iceberg-rust rolling writer directly #134
Conversation
Pull request overview
Refactors compaction output writing to rely on iceberg-rust’s upstream rolling writer implementation (after bumping workspace deps), removing the local rolling writer wrapper while keeping deprecated config knobs as compatibility no-ops.
Changes:
- Bump `iceberg-rust` git workspace dependencies to `baaa9c7b2deb3e744db21712e4b6ced5891a6012`.
- Remove the local `RollingIcebergWriter` module and switch DataFusion execution to `RollingFileWriterBuilder` directly (including `target_file_size_bytes` + `max_concurrent_closes` passthrough).
- Update integration test expectations for upstream rolling behavior and deprecate the dynamic size-estimation config fields/setters as no-op shims.
Reviewed changes
Copilot reviewed 7 out of 8 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| integration-tests/src/integration_tests.rs | Adjusts rolling compaction assertions to match upstream rolling behavior by counting outputs per partition bucket. |
| core/src/executor/mod.rs | Stops exporting the removed iceberg_writer module. |
| core/src/executor/iceberg_writer/rolling_iceberg_writer.rs | Deletes the local rolling writer implementation and its unit tests. |
| core/src/executor/iceberg_writer/mod.rs | Removes the now-defunct iceberg_writer module entrypoint. |
| core/src/executor/datafusion/mod.rs | Constructs writers using RollingFileWriterBuilder directly and passes through rolling-related config. |
| core/src/config/mod.rs | Deprecates dynamic size-estimation config fields and builder setters as no-op compatibility shims. |
| Cargo.toml | Updates iceberg-related git dependency revisions. |
| Cargo.lock | Locks updated transitive dependency graph for the new iceberg revision. |
```rust
output_files_per_partition.values().all(|count| *count == 3),
"Compaction should produce exactly 3 files per partition with upstream rolling: {output_files_per_partition:?}"
```
This test’s goal (per the comment) is to ensure rolling within a partition doesn’t panic/error, but asserting an exact == 3 files per partition is likely brittle (minor parquet/iceberg writer changes or different compression can shift rollover boundaries). Consider relaxing this to assert that each bucket produced >1 output file (or a small acceptable range), while still asserting every partition bucket is present.
Suggested change:

```diff
-output_files_per_partition.values().all(|count| *count == 3),
-"Compaction should produce exactly 3 files per partition with upstream rolling: {output_files_per_partition:?}"
+output_files_per_partition.values().all(|count| *count > 1),
+"Compaction should produce more than one file per partition with upstream rolling: {output_files_per_partition:?}"
```
hi @vovacf201 @nagraham @chenzl25 We previously discussed a better size-switching strategy. I have now found that the upstream has exposed the `RollingFileWriterBuilder`. According to my tests, we no longer need to wrap an additional writer externally, and I have retained concurrent close so that throughput is not affected. After this PR is merged we will deprecate two config items, for which I have added the `#[deprecated]` attribute.
nagraham left a comment:
Excellent refactor! I see this is going to give us greater accuracy in file sizes, and it greatly reduces complexity in the code base. This makes me very happy.
The problem before was that the old writer was forced to use Arrow's `RecordBatch::get_array_memory_size()`, which is inaccurate (its own docs admit as much). It doesn't account for Parquet's columnar encoding or its compression, and the writer had to rely on this memory size for the whole file, so we would get wild inaccuracies. For some data sets, we would target 128MB but get files of 20-30MB.
The iceberg-rust rolling writer addresses the problem because the writer exposes a `current_written_size()` function, which returns `inner.bytes_written() + inner.in_progress_size()`:

- `bytes_written`: the ACTUAL compressed/encoded bytes already flushed to the Parquet file.
- `in_progress_size`: an estimate of the size of the data still buffered in the Arrow column writers.
IIRC, that is exactly how the Java library estimates size as well.
Summary
- Bump `iceberg-rust` workspace deps to `baaa9c7b2deb3e744db21712e4b6ced5891a6012`
- Remove the local `RollingIcebergWriter` wrapper and use upstream `RollingFileWriterBuilder` directly
- Pass `target_file_size_bytes` and `max_concurrent_closes` through to the upstream rolling writer
- Deprecate `enable_dynamic_size_estimation` and `size_estimation_smoothing_factor`; both settings are now no-op compatibility shims and their builder setters emit deprecation warnings

Test
`cargo test -p iceberg-compaction-core --lib -- --nocapture`
`cargo test -p iceberg-compaction-integration-tests -- --nocapture`

Note