-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Assertion fail in external sort #15469
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes an assertion failure in the external sort by refactoring the state management of in-memory batches. It removes the in_mem_batches_sorted flag, uses a new locally scoped globally_sorted_batches vector to hold sorted batches, and updates the associated spill logic accordingly.
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
File | Description |
---|---|
datafusion/physical-plan/src/sorts/sort.rs | Refactors ExternalSorter to remove the redundant in_mem_batches_sorted flag and improve spill logic using a new globally_sorted_batches vector |
datafusion/core/tests/memory_limit/mod.rs | Adds a new test to verify external sort behavior with a zero merge reservation |
Comments suppressed due to low confidence (1)
datafusion/physical-plan/src/sorts/sort.rs:476
- [nitpick] Now that this function operates on a passed-in globally_sorted_batches vector rather than an instance field, a name like 'organize_sorted_batches' might more clearly convey its purpose.
fn organize_stringview_arrays(
in_progress_file.append_batch(&batch)?; | ||
} | ||
|
||
assert!(globally_sorted_batches.is_empty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps we can get rid of assert and return Err
? if the code regressed the users can face the app crash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, addressed in 1f4eabb
@@ -216,10 +216,8 @@ struct ExternalSorter { | |||
// STATE BUFFERS: | |||
// Fields that hold intermediate data during sorting | |||
// ======================================================================== | |||
/// Potentially unsorted in memory buffer | |||
/// Unsorted input batches stored in the memory buffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removing this field is an improvement 👍 thanks @2010YOUY01 and @comphead
Given there are still three fields whose relationship is tied, I wonder if it would improve the code if we encoded that relationship in an actual rust enum
-- for example
enum SortState {
/// All data is in memory
Memory {
batches: Vec<RecordBatch>,
},
/// intermediate data is spilling to disk
Spilling {
batches: Vec<RecordBatch>,
in_progress_spill_file: InProgressSpillFile,
}
...
}
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏼 will try it.
/// the `globally_sorted_batches` (also its memory reservation) afterwards. | ||
async fn consume_and_spill_append( | ||
&mut self, | ||
globally_sorted_batches: &mut Vec<RecordBatch>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for the name here -- much better
Which issue does this PR close?
related to #15372
Rationale for this change
One external sort query will panic due to an assertion failure
Reproducer
Compile and run
datafusion-cli
commit 46f4024For background knowledge for external sort, check the doc in
datafusion/datafusion/physical-plan/src/sorts/sort.rs
Line 80 in 46f4024
Now a single field
in_mem_batches
is used to represent buffered data in different stage: during different time, it can be interpreted as either unordered input batch, or globally sorted batch, a state flagself.in_mem_batches_sorted
is used to represent its sorted-ness and do extra sanity checks.This approach has poor understandability and is also error-prone, and actually triggered one edge case assertion failure.
Note the execution logic is not wrong, the issue is this flag is set too late.
To also address #15372, this PR did a small refactor to make the related logic more understandable:
What changes are included in this PR?
refactor to let
ExternalSorter.in_mem_batches
to only hold unsorted input batches (instead of representing both sorted and unsorted batches during different execution stage, as it's doing before the PR)Are these changes tested?
A unit test was added to cover the edge case that previously triggered an assertion failure.
Are there any user-facing changes?
No.