core/src/executor/datafusion/mod.rs
        location_generator,
        sort_order,
    } = request;
    println!("Starting rewrite_files with {:?} input files", sort_order);
Should remove this debug code.
Pull request overview
This pull request adds support for handling sort orders during Iceberg table compaction. The implementation propagates table sort order information through the compaction pipeline, applies sorting via DataFusion's SortExec, and ensures the sort order ID is set correctly in output data files.
Changes:
- Added `sort_order` field to `RewriteFilesRequest` and `DataFusionTaskContext` to propagate sort order information
- Implemented sort order application in the DataFusion processor using physical sort expressions
- Updated `build_iceberg_data_file_writer` to accept and set `sort_order_id` on output files
- Added a comprehensive integration test for sort order functionality
- Updated the iceberg-rust dependency to revision c98834afac75ce3ecf0428fff3628f5d5a740e43
Reviewed changes
Copilot reviewed 5 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| core/src/executor/mod.rs | Added sort_order field to RewriteFilesRequest struct |
| core/src/executor/datafusion/mod.rs | Extract sort_order_id and pass it to data file writer, updated builder |
| core/src/executor/datafusion/datafusion_processor.rs | Implemented sort expression creation from Iceberg sort order and applied SortExec |
| core/src/compaction/mod.rs | Populate sort_order in RewriteFilesRequest and added integration test |
| Cargo.toml | Updated iceberg-rust dependencies to new revision |
| Cargo.lock | Updated dependency lock entries |
    // Create sort expressions based on the Iceberg sort order if available
    let sort_exprs = if let Some((_, sort_order)) = sort_order {
        // Use Iceberg sort order to create physical sort expressions
        let mut exprs = Vec::new();
        for sort_field in &sort_order.fields {
            // Find the column name from the field id
            if let Some(field) = input_schema.field_by_id(sort_field.source_id) {
                // Find the column index in the physical schema
                if let Ok(column_index) = schema.index_of(&field.name) {
                    let sort_options = SortOptions {
                        descending: matches!(
                            sort_field.direction,
                            iceberg::spec::SortDirection::Descending
                        ),
                        nulls_first: matches!(
                            sort_field.null_order,
                            iceberg::spec::NullOrder::First
                        ),
                    };

                    exprs.push(PhysicalSortExpr {
                        expr: Arc::new(Column::new(&field.name, column_index)),
                        options: sort_options,
                    });
                }
            }
        }
The sort order implementation only handles Identity transforms. Sort fields with other transforms (e.g., bucket, truncate, year, month, day, hour) are silently ignored because the code only extracts the source field name, not the transformed column. This means sort orders with transforms won't be properly applied during compaction. Consider adding validation to reject non-Identity transforms or implementing proper transform handling.
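The suggested validation could be sketched in std-only Rust as follows; note that `Transform` here is a hypothetical stand-in for `iceberg::spec::Transform`, trimmed to a few variants for illustration:

```rust
/// Hypothetical stand-in for iceberg::spec::Transform (illustration only).
#[allow(dead_code)]
#[derive(Debug)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
}

/// Reject sort orders that use anything other than Identity transforms,
/// instead of silently sorting by the raw source column.
fn validate_identity_only(transforms: &[Transform]) -> Result<(), String> {
    for t in transforms {
        if !matches!(t, Transform::Identity) {
            return Err(format!("unsupported sort transform: {:?}", t));
        }
    }
    Ok(())
}

fn main() {
    assert!(validate_identity_only(&[Transform::Identity]).is_ok());
    assert!(validate_identity_only(&[Transform::Identity, Transform::Bucket(16)]).is_err());
    println!("ok");
}
```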
    let mut stats = RewriteFilesStat::default();
    stats.record_input(&file_group);
    let sort_order_id = sort_order.clone().map(|(id, _)| id as i32);
Casting sort_order_id from i64 to i32 can cause data truncation if the sort order ID exceeds the i32 range. While unlikely in practice, this could lead to incorrect sort_order_id values in the output files. Consider using TryFrom to handle overflow gracefully, or verify that Iceberg sort order IDs are guaranteed to fit in i32.
Suggested change:

    let sort_order_id = sort_order.clone().and_then(|(id, _)| i32::try_from(id).ok());
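The difference between `as` and `i32::try_from` can be shown with plain std Rust, no Iceberg types involved: `as` silently wraps on overflow, while `try_from` makes the overflow observable:

```rust
/// Convert a (possibly out-of-range) i64 sort order id to the i32 the
/// writer expects, returning None on overflow instead of wrapping.
fn to_sort_order_id(id: i64) -> Option<i32> {
    i32::try_from(id).ok()
}

fn main() {
    let too_big = i64::from(i32::MAX) + 1;
    // `as` silently wraps: 2147483648 becomes i32::MIN.
    assert_eq!(too_big as i32, i32::MIN);
    // `try_from` surfaces the overflow as None.
    assert_eq!(to_sort_order_id(too_big), None);
    assert_eq!(to_sort_order_id(42), Some(42));
    println!("ok");
}
```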
    let plan_to_execute = match sort_exprs {
        Some(exprs) if !exprs.is_empty() => {
            if let Some(lex_ordering) = LexOrdering::new(exprs) {
                Arc::new(
                    SortExec::new(lex_ordering, plan_to_execute)
                        .with_preserve_partitioning(true),
                )
            } else {
                plan_to_execute
            }
        }
        _ => plan_to_execute,
    };
The sort operation is applied after repartitioning with with_preserve_partitioning(true), which means each partition is sorted independently. This could result in data files where the overall data is not globally sorted, only locally sorted within each partition. If the goal is to produce globally sorted output files, the sort should be applied before repartitioning, or a global sort should be used instead.
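The reviewer's point can be illustrated with a std-only sketch, with plain `Vec`s standing in for DataFusion partitions: sorting each partition independently does not make the concatenated output globally sorted:

```rust
/// Sort each partition independently (what preserve_partitioning does),
/// then concatenate in partition order.
fn sort_per_partition(mut parts: Vec<Vec<i32>>) -> Vec<i32> {
    for p in &mut parts {
        p.sort();
    }
    parts.into_iter().flatten().collect()
}

fn is_sorted(v: &[i32]) -> bool {
    v.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    let parts = vec![vec![3, 1], vec![2, 0]];
    let locally_sorted = sort_per_partition(parts);
    // Each partition is sorted ([1, 3] and [0, 2]) but the whole output is not.
    assert_eq!(locally_sorted, vec![1, 3, 0, 2]);
    assert!(!is_sorted(&locally_sorted));
    println!("ok");
}
```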
core/src/executor/datafusion/mod.rs
    let builder =
        DataFileWriterBuilder::new(rolling_writer_builder).sort_order_id(sort_order_id);
    builder
Unnecessary intermediate variable assignment. The builder can be returned directly from the expression without assigning it to a variable first.
Suggested change:

    DataFileWriterBuilder::new(rolling_writer_builder).sort_order_id(sort_order_id)
    let df = ctx
        .sql("SELECT * FROM compacted_table ORDER BY id")
The test adds an explicit ORDER BY clause in the SQL query, which means the test is verifying the final sorted result rather than testing whether the compaction process itself properly sorts the data. The test should read the data without an ORDER BY to verify that the compaction output is already sorted according to the table's sort order.
    let table = env.catalog.load_table(&table_ident).await.unwrap();

    let data_files = write_simple_files(&env.table, &env.warehouse_location, "test", 3).await;
The test writes files to env.table instead of the newly created table with sort order. This means the data files are being written to the wrong table (test_table instead of test_table_order). The test should use table which was loaded at line 1884, or write the files directly to the table with the sort order.
Suggested change:

    let data_files = write_simple_files(&table, &env.warehouse_location, "test", 3).await;
core/src/compaction/mod.rs
    for (_, batch) in batches.iter().enumerate() {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .iter()
            .enumerate()
            .for_each(|(i, value)| {
                assert_eq!(value.unwrap(), target_id[i]);
            });
    }
The test validation logic has a bug: it iterates through batches and tries to assert each batch against the full target_id array, but uses the index from within each batch. If there are multiple batches, or a batch with fewer than 9 rows, the assertion will either panic (index out of bounds) or incorrectly validate the data. The loop should track a running index across all batches, or flatten all batches first before validating.
Suggested change:

    let mut row_index = 0;
    for batch in batches.iter() {
        batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap()
            .iter()
            .for_each(|value| {
                assert_eq!(value.unwrap(), target_id[row_index]);
                row_index += 1;
            });
    }
    assert_eq!(row_index, target_id.len());
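The "flatten all batches first" alternative mentioned in the review comment can be sketched with plain `Vec<i32>` batches standing in for Arrow record batches:

```rust
/// Flatten all batches into one sequence, then compare against the
/// expected values once, avoiding per-batch index bookkeeping.
fn validate_flattened(batches: &[Vec<i32>], expected: &[i32]) -> bool {
    let flat: Vec<i32> = batches.iter().flatten().copied().collect();
    flat.as_slice() == expected
}

fn main() {
    // Two batches of sizes 2 and 1, matching the three expected values.
    assert!(validate_flattened(&[vec![9, 8], vec![7]], &[9, 8, 7]));
    // A missing row is caught even though every per-batch index is in range.
    assert!(!validate_flattened(&[vec![9], vec![8]], &[9, 8, 7]));
    println!("ok");
}
```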
# Conflicts:
#	Cargo.lock
#	Cargo.toml
#	core/src/compaction/mod.rs
#	core/src/executor/datafusion/datafusion_processor.rs
#	core/src/executor/datafusion/mod.rs
#	core/src/executor/mod.rs
Please post the related PR in RisingWave when it is ready.
chenzl25 left a comment:
Review Comments
1. Non-Identity transforms are silently ignored (blocking)
In datafusion_processor.rs:172-194, when building sort expressions, the code only uses the source_id to find the column name but completely ignores sort_field.transform. If the sort order uses a non-Identity transform (e.g., bucket(16, col) or truncate(10, col)), the current code will sort by the raw column value, producing incorrect results.
Suggestion:
- At minimum, check if the transform is `Identity` and skip (or return an error) for non-Identity transforms
- Support for complex transforms can be added later
2. Test does not actually verify sort order (needs fix)
In compaction/mod.rs, the test query uses SELECT * FROM compacted_table ORDER BY id. The ORDER BY clause causes DataFusion to re-sort the results, so the test would pass even if compaction did not apply any sorting at all.
The query should be changed to SELECT * FROM compacted_table (without ORDER BY), and then verify that the returned data is already sorted. This would actually prove that the sort was applied during compaction.
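What the reviewer asks for amounts to a monotonicity check on the values as returned, with no re-sorting. In std-only Rust, with the ids already extracted from the batches, the check could look like:

```rust
/// True if the ids are already in non-decreasing order, i.e. the
/// compaction output is sorted without help from an ORDER BY clause.
fn is_non_decreasing(ids: &[i32]) -> bool {
    ids.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    // Already-sorted output passes, including duplicates.
    assert!(is_non_decreasing(&[1, 2, 2, 3]));
    // Unsorted output fails, which an ORDER BY query would have hidden.
    assert!(!is_non_decreasing(&[1, 3, 2]));
    println!("ok");
}
```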
3. rebuild_data_file_with_sort_order_id is fragile (suggestion)
In executor/datafusion/mod.rs:236-270, the entire DataFile is manually reconstructed just to set sort_order_id. If upstream iceberg-rust adds new fields to DataFile, this function won't produce a compile error (since the builder has default values) and could silently lose metadata.
Suggestions:
- Check if iceberg-rust's `DataFile` has a `with_sort_order_id()` or similar method that can be used directly
- If not, at least add a comment noting that this function must be kept in sync with upstream `DataFile` fields
Minor (non-blocking):
- The `sort_order` field uses `Option<(i64, SortOrderRef)>`; a small struct or type alias would improve readability
- `SortExec::new(...).with_preserve_partitioning(true)` is correct (each partition is sorted independently), but a comment explaining why would be helpful
This pull request adds support for handling sort order during table compaction, ensuring that data files produced by compaction are properly sorted according to the table's sort order specification. The changes propagate sort order information through the compaction and execution pipeline, update the DataFusion-based processing to apply the sort order, and introduce comprehensive tests to verify the new functionality.
Sort Order Support in Compaction:
- The `Compaction` struct and its initialization now capture the table's default sort order, making it available for downstream processing.
- `RewriteFilesRequest` and related execution contexts (`DataFusionTaskContext`, `DataFusionTaskContextBuilder`) have been updated to carry sort order information throughout the compaction process. [1] [2] [3] [4] [5] [6] [7] [8]

Applying Sort Order in DataFusion Processing:
- The `build_iceberg_data_file_writer` function and related writer builders have been updated to accept and propagate the sort order ID, ensuring metadata correctness in output files. [1] [2] [3]

Testing and Validation:
These changes ensure that the compaction process respects table sort order, improving query performance and correctness for sorted tables.