-
Notifications
You must be signed in to change notification settings - Fork 1.9k
fix(functions-aggregate): drain CORR state vectors for streaming aggregation #19669
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| 2 2 NULL | ||
| 2 3 NULL | ||
| 2 4 NULL | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to add a companion EXPLAIN query to verify that it uses the streaming path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had it at first, and removed it as I found it too verbose. Same with a dedicated unit test in correlation.rs, which seemed out of place and only serving as a "demo" of the bug.
Adding just the EXPLAIN for CORR seems too specific to me here. However, I think it would make a lot of sense to actually have a dedicated .slt that runs EXPLAIN and the actual query for all aggregates.
@martin-g WDYT?
EDIT: pushed new comprehensive tests in commit test: add comprehensive aggregate tests for streaming aggregation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either way is fine as long as there is a way to assert that it behaves the way it is supposed to be.
martin-g
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Which issue does this PR close?
Rationale for this change
This change addresses a failure in the
CORRaggregate function when running in streaming mode. TheCorrelationGroupsAccumulator(introduced in PR #13581) was failing to drain its state vectors duringEmitTo::Firstcalls, causing internal state to persist across emissions. This led to memory leaks, incorrect results for subsequent groups, and "length mismatch" errors because the internal vector sizes diverged from the number of emitted groups.Reproducer
Before:
DataFusion error: Arrow error: Invalid argument error: all columns in a record batch must have the same lengthAfter:
What changes are included in this PR?
This PR is structured into two commits: the first adds a failing test case to demonstrate the issue, and the second implements the fix.
The accumulator now uses
emit_to.take_needed()in bothevaluateandstateto properly consume the emitted portions of the state vectors. Additionally, thesize()implementation has been updated to use vector capacity for more accurate memory accounting.Are these changes tested?
Yes, a new test case in
aggregate.slttriggers streaming aggregation via an ordered subquery. This test previously crashed with an Arrow length mismatch error and now produces correct results.Are there any user-facing changes?
Yes, SQL queries that trigger streaming aggregation using
CORR(typically those with specific ordering requirements) will now succeed instead of failing with a length mismatch error.