Skip to content

Conversation

kosiew
Copy link
Contributor

@kosiew kosiew commented Sep 11, 2025

Which issue does this PR close?

This is part of a series of smaller PRs to reimplement #17090

Rationale for this change

Dynamic filter pushdown previously assumed the probe side (right) for inner joins and was overly conservative or unsafe for non-inner joins because the optimizer lacked clear metadata about which join input must be preserved in output. This caused missed pruning opportunities and required ad-hoc logic in join implementations.

This change introduces explicit preservation metadata on JoinType and a dynamic_filter_side() helper so join implementations (currently HashJoinExec) and optimizer/rewriters can determine which input can safely accept dynamic filters. With this information the physical operator can:

  • Create and attach dynamic filters to the correct input side.
  • Collect and report bounds from the build/probe side correctly according to which side is eligible.
  • Avoid incorrectly pruning rows when joins preserve both sides (e.g. FULL).

Overall this enables safer and more effective dynamic filter pushdown across a wider set of join types.

What changes are included in this PR?

High level summary of code changes (by module):

  • datafusion/common/src/join_type.rs

    • Add preserves, preserves_left, preserves_right helpers and small LEFT_PRESERVING / RIGHT_PRESERVING const arrays.
    • Add dynamic_filter_side() which returns the JoinSide eligible for receiving a dynamic filter (or None if both sides must be preserved).
    • Add unit tests for swap/support methods, preservation helpers, and dynamic_filter_side truth table.
  • datafusion/core/tests/physical_optimizer/filter_pushdown/

    • Add utility helpers (build_scan, build_hash_join, build_topk, sort_expr, etc.) to construct small plans in tests.
    • Add many tests exercising dynamic filter pushdown across join shapes: inner join, full join, nested joins, handling of child pushdown results, reporting of bounds, and integration snapshots.
    • Refactor several existing tests to use new helpers and reduce duplication.
  • datafusion/physical-plan/src/joins/hash_join/exec.rs

    • Add join_exprs_for_side(...) helper and change create_dynamic_filter(...) to accept an explicit JoinSide argument.
    • Use join_type.dynamic_filter_side() to determine where to attach dynamic filters and whether to enable bounds accumulation for a particular side.
    • Wire dynamic filter creation into gather_filters_for_pushdown_with_side(...) so the correct child receives the DynamicFilterPhysicalExpr.
    • Make handle_child_pushdown_result_with_side(...) respect dynamic_filter_side() and construct an updated HashJoinExec node when a dynamic filter is received from the expected child.
    • Add logic to initialize probe-side accumulators when the dynamic filter targets the left side.
    • Add unit/integration tests for behavior and error cases (e.g. requesting dynamic filter creation with JoinSide::None).
  • datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs

    • Rename on_right -> join_exprs and document it as "join expressions on the side receiving the dynamic filter".
    • Adjust construction and predicate generation to use the provided join_exprs.
    • Add a synchronization test for the SharedBoundsAccumulator ensuring updates are applied only after all partitions reported.
  • datafusion/physical-plan/src/joins/hash_join/stream.rs

    • Add ProbeSideBoundsAccumulator to accumulate min/max for probe-side when dynamic filters target the left side.
    • Thread probe_bounds_accumulators and probe_side_row_count through HashJoinStream and update them while scanning probe batches.
    • Report probe-side bounds to the shared accumulator when the probe stream is exhausted and the join expects dynamic filters for the left side.
    • Adjust state transitions so bounds are only collected/reported for the relevant side.
  • Tests & snapshots

    • Multiple new tests and snapshots to assert the plan contains DynamicFilterPhysicalExpr where expected, verify metrics reflect pruning and verify that FULL joins do not incorrectly prune rows.

Are these changes tested?

Yes. This PR adds and updates tests at multiple levels:

  • Unit tests in join_type.rs verifying the preserves* helpers and dynamic_filter_side() truth table.
  • Unit tests in shared_bounds.rs and hash_join modules verifying synchronization, dynamic filter creation errors, and accumulation/reporting behavior.
  • Integration-style tests under core/tests/physical_optimizer/filter_pushdown that run parts of the physical plan with FilterPushdown::new_post_optimization() and assert both the optimized plan (snapshots) and the runtime results (record batches and scan metrics).

All new behavior is covered by tests that both assert the plan contains DynamicFilterPhysicalExpr (or not) and that runtime metrics / output rows are correct (including assertions that FULL joins preserve rows and are not pruned).

Are there any user-facing changes?

  • No user-visible SQL surface changes.
  • The changes are internal to query planning/execution and aim to make dynamic filter pushdown more effective and correct across additional join types.

API compatibility:

  • Public crate types (JoinType) gained extra helper methods but no breaking changes to existing variants or serialization. This should be backward compatible for downstream code that enumerates JoinType variants.

Implementation notes & rationale

  • dynamic_filter_side() encodes conservative semantics: if a join preserves both sides (Full) it returns None (no dynamic filters). If exactly one side is preserved, the opposite side is eligible to receive a dynamic filter. When neither side is preserved (e.g., Inner, semis, antis), a default probe-side preference is used (right by default), but LeftSemi/LeftAnti prefer left and RightSemi/RightAnti prefer right.

  • HashJoinExec uses dynamic_filter_side() to decide three things:

    1. Where to create and attach a DynamicFilterPhysicalExpr (left or right child description) during filter gather.
    2. Which side should accumulate bounds at runtime (only collect when a dynamic filter is enabled and the join expects the other side to be pruned).
    3. Where to accept a reported dynamic filter from a child during handle_child_pushdown_result.
  • SharedBoundsAccumulator was generalized to accept join_exprs that represent the join key expressions for the side receiving the dynamic filter. This removes implicit "right-side" assumptions and makes the accumulator symmetric.

  • ProbeSideBoundsAccumulator mirrors the build-side accumulator behavior to support collecting min/max values when the join expects dynamic filters on the left side.

  • Tests intentionally keep small, in-memory scans and snapshot assertions so reviewers can quickly inspect the expected plan strings and runtime outputs.

Suggested reviewers / areas to focus review on

  • hash_join::exec.rs — verify the logic selecting which side to attach dynamic filters and the new gather_filters_for_pushdown_with_side / handle_child_pushdown_result_with_side functions.
  • shared_bounds.rs and stream.rs — correctness of bounds accumulation and partition synchronization across both sides.
  • Tests in core/tests/physical_optimizer/filter_pushdown — ensure that the new helpers and snapshots represent the intended behavior and are not overly brittle.

* **Refactored `create_dynamic_filter`** for better readability and
  clearer join side handling.
* **Improved dynamic filter side tests** for correctness across various
  join types.
* **Cleaned up join key null filtering and filter pushdown logic**,
  removing unused functions and clarifying join preservation behavior.
* **Enhanced `right_side_dynamic_filter_test`** with better clarity and
  extended support for more join types.
* **Upgraded `HashJoinExec`** to:

  * Clearly specify dynamic filter sides.
  * Improve documentation.
  * Support dynamic filter pushdown based on join preservation metadata.
* **Refactored `JoinType`** to:

  * Add helper methods for unmatched row preservation.
  * Improve clarity in dynamic filter logic.
  * Support new method signatures used in `HashJoinExec`.
* **Consolidated and expanded dynamic filter tests**:

  * Merged truth table test cases.
  * Added comprehensive tests for `dynamic_filter_side`.
* **Shelved unrelated changes** to maintain focus on dynamic filter and
  join logic improvements.
* Implemented dynamic filter pushdown in `HashJoinExec` to collect and
  apply bounds from both join sides.
* Added probe expressions and improved bounds accumulation logic,
  eliminating unnecessary futures for partition-bounds reporting.
* Updated inner join logic to prefer the right side for filter pushdown
  and added support for full join pushdown.

JoinType API Improvements

* Added methods to `JoinType` for dynamic filter pushdown checks and
  unmatched-row preservation.
* Introduced swap functionality with comprehensive truth-table and unit
  tests.

Refactoring & Test Coverage

* Refactored dynamic filter handling in `HashJoinExec` for clarity and
  performance.
* Removed unused dynamic filter side and related expressions.
* Expanded test coverage with join-type truth tables, full join
  scenarios, and `JoinType` swap tests.

---

These changes collectively enhance performance and correctness of
dynamic filter pushdown across different join types, while simplifying
related code and ensuring robust test coverage.
Improves partition bounds reporting and synchronization mechanisms
within the dynamic filter logic of `HashJoinExec`. This refactor
increases robustness and reliability during query execution.

Clarify Dynamic Filter Side Logic--

Adds explanatory comments to document the reasoning behind child
selection logic based on the dynamic filter side in `HashJoinExec`,
aiding future maintenance and readability.
…onsExec and improve batch creation in build_int32_scan
…sAccumulator

- Adjust dynamic filter pushdown conditions to ensure correct join side handling.
- Refactor join expressions to clarify which side receives the dynamic filter.
- Improve bounds reporting logic in HashJoinStream based on join type.
…titionsExec and CoalesceBatchesExec for improved partitioning and batch handling
@github-actions github-actions bot added core Core DataFusion crate common Related to common crate physical-plan Changes to the physical-plan crate labels Sep 11, 2025
@kosiew kosiew force-pushed the df-join-metadata-16973 branch from 51b0bad to f5f1fc3 Compare September 11, 2025 13:58
@kosiew kosiew marked this pull request as ready for review September 11, 2025 15:25
@kosiew kosiew changed the title Draft Add JoinType preservation helpers and dynamic_filter_side; enable dynamic filter pushdown in HashJoinExec Add JoinType preservation helpers and dynamic_filter_side; enable dynamic filter pushdown in HashJoinExec Sep 11, 2025
/// left input, for [`JoinType::RightSemi`] and [`JoinType::RightAnti`] it
/// is the right input, and for other joins the right input is used by
/// default as joins typically treat the right as the probe side.
pub fn dynamic_filter_side(self) -> JoinSide {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think dynamic filters are logically a property of the join type -- the join type specifies where predicates in general can be pushed through

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helper is really just reusing the existing preserves_left/right semantics to figure out which side can legally accept a pushed predicate, so it doesn’t need to live on JoinType as a dynamic-filter–specific API. I’ll move that logic back into the dynamic-filter optimizer (or, alternatively, expose a more neutrally named helper that just reports the non-preserving side) and let the rule decide what to do with it there

@@ -74,6 +74,12 @@ pub enum JoinType {
RightMark,
}

const LEFT_PRESERVING: &[JoinType] =
&[JoinType::Left, JoinType::Full, JoinType::LeftMark];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about Semi and Anti joins?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semi and Anti joins purposely aren’t listed in LEFT_PRESERVING: they only return the subset of left rows that either match (Semi) or don’t match (Anti), so they don’t preserve all left input rows. The right-side analogue has the same behaviour, which is why only the outer/mark variants appear in those preservation tables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semi and Anti joins purposely aren’t listed in LEFT_PRESERVING: they only return the subset of left rows that either match (Semi) or don’t match (Anti), so they don’t preserve all left input rows. The right-side analogue has the same behaviour, which is why only the outer/mark variants appear in those preservation tables.

You are right. Sorry about that. Given I think this logic is trying to figure out when to push down predicates, I was thinking it should also be considering SEMI and ANTI joins.

For example, I was thinking about these two joins

a LEFT JOIN b ON (...)
WHERE a.x < 5 AND b.y < 10
A ANTI JOIN b ON (...)
WHERE a.x < 5 AND b.y < 10

In both cases, I believe it is correct to push the predicate on a.x below the join on the a side. However, also in both cases I don't think it is correct to push the b.y predicate below the join:

For the LEFT JOIN pushing b.y can re-introduce rows from a that should be filtered after the join. Same thing for ANTI JOIN

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for double-checking! In this file the LEFT_PRESERVING array is meant to capture only the join variants that are guaranteed to emit every left row at least once—i.e. the outer and mark joins. Semi/anti joins intentionally aren’t listed because they drop rows from their respective inputs, so they don’t satisfy that preservation property.

That distinction matters a few lines later when we decide which child can safely receive a dynamic filter. If we marked LeftSemi/LeftAnti as left-preserving, dynamic_filter_pushdown_side would classify them the same way as a left outer join and start attaching the dynamic filter to the right input—the exact misbehaviour you’re warning about for predicates that reference b.y. Because they remain non-preserving, those join types fall through to the (false, false) arm and we keep the dynamic filter on the left side only, which means predicates like a.x < 5 are still pushed while b.y < 10 is not.

I’ll add a short comment in the code/tests to spell this out so it’s harder to miss in the future.

@kosiew kosiew marked this pull request as draft September 18, 2025 10:51
@kosiew kosiew force-pushed the df-join-metadata-16973 branch from 4334082 to 238a57e Compare September 18, 2025 13:28
@kosiew kosiew marked this pull request as ready for review September 18, 2025 15:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
common Related to common crate core Core DataFusion crate physical-plan Changes to the physical-plan crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants