Compute Dynamic Filters only when a consumer supports them #19546
I like this! The advantages over #19387 are:
- No API change / breaking changes
- Less code churn for us and users
- Complexity is contained within dynamic filters and even there within producers
- Should work for distributed systems (whatever is broadcasting updates to filters will also need to hold onto a reference to the dynamic filter)
This also means that if we run into issues with this approach it's easy to back out of 😄
Is there any way we can add a test showing that if there are no downstream consumers we don't compute the filters?
I added should still return
datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
adriangb left a comment
Maybe append the positive case (the probe side does support pushdown, so `is_used` is true) to the same test, just to prove the point? It could even be a loop to avoid code duplication.
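A std-only sketch of the suggested loop, assuming a simplified model rather than DataFusion's real types: `DynamicFilter`, `ProbeScan`, `push_down`, and `check_both_cases` are hypothetical, and `is_used` is modeled as the PR describes, via `Arc::strong_count` on shared state where the producer's own handle counts as one reference.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`: every clone shares `inner`.
#[derive(Clone)]
struct DynamicFilter {
    inner: Arc<()>, // real filter state elided; only the reference count matters here
}

impl DynamicFilter {
    fn new() -> Self {
        Self { inner: Arc::new(()) }
    }

    /// Assumption: the producer owns one handle, so a count above 1 means a consumer exists.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }
}

/// Hypothetical probe-side scan that retains the filter only if it supports pushdown.
struct ProbeScan {
    supports_pushdown: bool,
    filter: Option<DynamicFilter>,
}

impl ProbeScan {
    fn push_down(&mut self, filter: &DynamicFilter) {
        if self.supports_pushdown {
            self.filter = Some(filter.clone());
        }
    }
}

/// Covers the negative and positive cases in one loop instead of two copies of the test.
fn check_both_cases() {
    for supports_pushdown in [false, true] {
        let filter = DynamicFilter::new();
        let mut scan = ProbeScan { supports_pushdown, filter: None };
        scan.push_down(&filter);
        // `scan` is still alive here, so a retained handle is still counted.
        assert_eq!(filter.is_used(), supports_pushdown, "supports_pushdown = {supports_pushdown}");
    }
}
```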
```rust
// Hold a consumer-side handle for the rest of the test so `is_used()` reports true.
let _consumer = Arc::clone(&dynamic_filter)
    .with_new_children(vec![])
    .unwrap();
```
I had to add a consumer in these tests; otherwise `is_used` returns false, no filters are computed, and `wait_complete` never returns. I will add an `is_used` check inside `wait_complete` as well. I can't imagine this ever happening (unless we call `wait_complete` on a probe node that does not accept dynamic filters, which would be wrong usage), but it's worth adding just in case.
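A blocking, std-only sketch of the proposed guard, assuming `wait_complete` waits on a completion flag that the producer sets. The real method is async and its internals may differ; `DynamicFilter`, `mark_complete`, and the `bool` return value are hypothetical. Returning early when `is_used()` is false avoids waiting on a producer that skipped the computation.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical filter handle: clones share the completion flag and its condvar.
#[derive(Clone)]
struct DynamicFilter {
    inner: Arc<(Mutex<bool>, Condvar)>,
}

impl DynamicFilter {
    fn new() -> Self {
        Self { inner: Arc::new((Mutex::new(false), Condvar::new())) }
    }

    /// Assumption: the producer owns one handle, so a count above 1 means a consumer exists.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }

    /// Called by the producer once the final filter has been published.
    fn mark_complete(&self) {
        let (done, cvar) = &*self.inner;
        *done.lock().unwrap() = true;
        cvar.notify_all();
    }

    /// Blocks until `mark_complete` runs. Returns `false` right away if no consumer
    /// holds the filter, because the producer would never complete it.
    fn wait_complete(&self) -> bool {
        if !self.is_used() {
            return false;
        }
        let (done, cvar) = &*self.inner;
        let mut guard = done.lock().unwrap();
        while !*guard {
            guard = cvar.wait(guard).unwrap();
        }
        true
    }
}
```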
@LiaCastaneda thank you! Maybe a nice follow-up would be to split up the…
By "computed independently" and "removing the barrier", do you mean computing and emitting each filter for each partition progressively?
Nice! I will take a look.
Which issue does this PR close?
Closes #17527
Rationale for this change
Currently, DataFusion computes bounds for every query that contains a `HashJoinExec` node whenever the `enable_dynamic_filter_pushdown` option is set to true (the default), even when nothing will read them. It makes sense to compute these bounds only when we explicitly know there is a consumer that will use them.
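A minimal std-only sketch of the idea, assuming a simplified model: `DynamicFilter` stands in for `DynamicFilterPhysicalExpr`, the bounds are a single `(min, max)` pair over `i64` join keys, and `finish_build_side` is a hypothetical hook for the moment the build side has been collected. None of these names are DataFusion API.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`; clones share the bounds.
#[derive(Clone)]
struct DynamicFilter {
    inner: Arc<Mutex<Option<(i64, i64)>>>,
}

impl DynamicFilter {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(None)) }
    }

    /// Assumption: the producer owns one handle, so a count above 1 means a consumer exists.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }

    fn bounds(&self) -> Option<(i64, i64)> {
        *self.inner.lock().unwrap()
    }
}

/// Hypothetical hook run once the build side is collected.
/// Returns `true` if the bounds were computed and published.
fn finish_build_side(filter: &DynamicFilter, build_keys: &[i64]) -> bool {
    // Nobody will read the filter, so skip the min/max pass entirely.
    if !filter.is_used() {
        return false;
    }
    let min = build_keys.iter().copied().min();
    let max = build_keys.iter().copied().max();
    if let (Some(min), Some(max)) = (min, max) {
        *filter.inner.lock().unwrap() = Some((min, max));
    }
    true
}
```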
What changes are included in this PR?
As suggested in #17527 (comment), this PR adds an `is_used()` method to `DynamicFilterPhysicalExpr` that uses `Arc::strong_count()` to check whether any consumer is holding a reference to the filter.
During filter pushdown, consumers that accept the filter and use it later during execution must retain an `Arc` reference to it; scan nodes such as `ParquetSource` are one example.
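A std-only sketch under a similar simplified model (hypothetical `DynamicFilter` and `ScanSource`, `i64` bounds) of why retaining the reference matters: the consumer's clone is what makes `is_used()` true, and dropping the consumer, for example when the plan is dropped, makes it false again.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `DynamicFilterPhysicalExpr`; clones share the bounds.
#[derive(Clone)]
struct DynamicFilter {
    inner: Arc<Mutex<Option<(i64, i64)>>>,
}

impl DynamicFilter {
    fn new() -> Self {
        Self { inner: Arc::new(Mutex::new(None)) }
    }

    /// Counts the handles sharing `inner`; the producer's own handle is one of them.
    fn is_used(&self) -> bool {
        Arc::strong_count(&self.inner) > 1
    }

    fn update(&self, min: i64, max: i64) {
        *self.inner.lock().unwrap() = Some((min, max));
    }

    fn current(&self) -> Option<(i64, i64)> {
        *self.inner.lock().unwrap()
    }
}

/// Hypothetical scan source that accepted the filter during pushdown and keeps
/// its handle so it can read the latest bounds at execution time.
struct ScanSource {
    predicate: Option<DynamicFilter>,
}

impl ScanSource {
    fn keeps(&self, value: i64) -> bool {
        match self.predicate.as_ref().and_then(|f| f.current()) {
            Some((min, max)) => (min..=max).contains(&value),
            None => true, // no bounds published yet: keep every row
        }
    }
}
```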
Are these changes tested?
I added a unit test (`test_is_used`) in `dynamic_filters.rs` that verifies the `Arc` reference-counting behavior.
Existing integration tests in `datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs` validate the end-to-end behavior: they verify that dynamic filters are computed and filled when consumers are present.
Are there any user-facing changes?
A new `is_used()` method on `DynamicFilterPhysicalExpr`.