Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
7e894be
Dynamic Filter and Join Handling Refactor Summary
kosiew Sep 10, 2025
37c95a4
Dynamic Filter Pushdown Enhancements
kosiew Sep 10, 2025
02f2d00
Refactor Dynamic Filter Handling in `HashJoinExec`
kosiew Sep 11, 2025
24d2fea
Refactor dynamic filter side determination in `JoinType` for clarity …
kosiew Sep 11, 2025
5116377
Enhance dynamic filter handling in HashJoinExec to support left and r…
kosiew Sep 11, 2025
1f4a413
Remove unnecessary JoinType imports from dynamic filter pushdown tests
kosiew Sep 11, 2025
b3b7a30
Refactor dynamic filter tests to use a helper function for scan creation
kosiew Sep 11, 2025
190270d
Refactor dynamic filter pushdown tests: remove unused CoalescePartiti…
kosiew Sep 11, 2025
459abf4
Enhance dynamic filter pushdown logic in HashJoinExec and SharedBound…
kosiew Sep 11, 2025
36d1a61
Refactor dynamic filter handling in HashJoinExec: enforce specified j…
kosiew Sep 11, 2025
1c40186
Implement `preserves` method in `JoinType` to enhance join side prese…
kosiew Sep 11, 2025
581cbcd
Add test for dynamic filter pushdown in partitioned HashJoinExec
kosiew Sep 11, 2025
12ac262
Add tests for dynamic filter pushdown in right and left mark joins in…
kosiew Sep 11, 2025
50e8e17
Revert "Add tests for dynamic filter pushdown in right and left mark …
kosiew Sep 11, 2025
7dcdfdc
Add LEFT_PRESERVING and RIGHT_PRESERVING constants for join type pres…
kosiew Sep 11, 2025
f744fc3
Add preservation checks for LeftAnti and RightAnti join types
kosiew Sep 11, 2025
5755a1a
Enhance documentation for dynamic_filter_side to clarify behavior wit…
kosiew Sep 11, 2025
500a32b
Refactor imports in filter_pushdown tests for clarity and consistency
kosiew Sep 11, 2025
9764539
Remove unused dynamic filter references from HashJoinExec and HashJoi…
kosiew Sep 11, 2025
d9384f8
Refactor dynamic filter expression handling in HashJoinExec for clari…
kosiew Sep 11, 2025
29c6d63
Enhance dynamic filter pushdown in HashJoinExec by adding CoalescePar…
kosiew Sep 11, 2025
d2722c1
Restore main test_hashjoin_dynamic_filter_pushdown_partitioned
kosiew Sep 11, 2025
2bf7dc4
Add test for nested hash join dynamic filter pushdown
kosiew Sep 11, 2025
4d14bc1
Add comments for tests
kosiew Sep 11, 2025
b4f9701
Implement dynamic filter pushdown support for HashJoinExec
kosiew Sep 11, 2025
411a459
Refactor filter pushdown tests to utilize helper functions for buildi…
kosiew Sep 11, 2025
81145aa
Enhance dynamic filter selection logic for JoinType to accurately ref…
kosiew Sep 11, 2025
434ad72
Refactor dynamic_filter_side logic to improve join type handling and …
kosiew Sep 11, 2025
688ba1f
Fix dynamic filter creation to return an error for unspecified join s…
kosiew Sep 11, 2025
b814c22
Clarify documentation for dynamic filter pushdown eligibility in Join…
kosiew Sep 11, 2025
5c499d8
Add probe-side bounds accumulation for dynamic filter pushdown in Has…
kosiew Sep 11, 2025
c353d5e
Add test for dynamic filter bounds accumulation in HashJoinExec
kosiew Sep 11, 2025
5a51266
Add comment to explain waits_for_all_partitions_before_updating
kosiew Sep 11, 2025
ea03b6d
Implement dynamic filter handling for FULL joins and add test for ski…
kosiew Sep 11, 2025
b6dc5ea
Fix import path for MaxAccumulator and MinAccumulator in HashJoinStream
kosiew Sep 11, 2025
0782214
Merge branch 'main' into df-join-metadata-16973
kosiew Sep 11, 2025
61c33b5
Fix clippy error
kosiew Sep 11, 2025
fcca244
Clarify comment on dynamic filter pushdown eligibility in JoinType im…
kosiew Sep 11, 2025
f5f1fc3
Fix clippy error
kosiew Sep 11, 2025
39390be
Refactor TestMemoryExec initialization to use schema.clone() for clarity
kosiew Sep 12, 2025
6cf11d5
Fix clippy error
kosiew Sep 13, 2025
5cfb95c
Merge branch 'main' into df-join-metadata-16973
kosiew Sep 18, 2025
d67f5db
Add dynamic filter side preference tests for join types
kosiew Sep 18, 2025
238a57e
Refactor dynamic filter side logic in join types and update related t…
kosiew Sep 18, 2025
990d940
Clarify dynamic filter behavior for semi/anti joins in pushdown logic
kosiew Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions datafusion/common/src/join_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ pub enum JoinType {
RightMark,
}

// Semi/anti joins intentionally omitted: they only emit subsets of the left input and thus do
// not "preserve" every row for dynamic filter purposes.
const LEFT_PRESERVING: &[JoinType] =
&[JoinType::Left, JoinType::Full, JoinType::LeftMark];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about Semi and Anti joins?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semi and Anti joins purposely aren’t listed in LEFT_PRESERVING: they only return the subset of left rows that either match (Semi) or don’t match (Anti), so they don’t preserve all left input rows. The right-side analogue has the same behaviour, which is why only the outer/mark variants appear in those preservation tables.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semi and Anti joins purposely aren’t listed in LEFT_PRESERVING: they only return the subset of left rows that either match (Semi) or don’t match (Anti), so they don’t preserve all left input rows. The right-side analogue has the same behaviour, which is why only the outer/mark variants appear in those preservation tables.

You are right. Sorry about that. Given I think this logic is trying to figure out when to push down predicates, I was thinking it should also be considering SEMI and ANTI joins.

For example, I was thinking about these two joins

a LEFT JOIN b ON (...)
WHERE a.x < 5 AND b.y < 10
A ANTI JOIN b ON (...)
WHERE a.x < 5 AND b.y < 10

In both cases, I believe it is correct to push the predicate on a.x below the join on the a side. However, also in both cases I don't think it is correct to push the b.y predicate below the join:

For the LEFT JOIN pushing b.y can re-introduce rows from a that should be filtered after the join. Same thing for ANTI JOIN

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for double-checking! In this file the LEFT_PRESERVING array is meant to capture only the join variants that are guaranteed to emit every left row at least once—i.e. the outer and mark joins. Semi/anti joins intentionally aren’t listed because they drop rows from their respective inputs, so they don’t satisfy that preservation property.

That distinction matters a few lines later when we decide which child can safely receive a dynamic filter. If we marked LeftSemi/LeftAnti as left-preserving, dynamic_filter_pushdown_side would classify them the same way as a left outer join and start attaching the dynamic filter to the right input—the exact misbehaviour you’re warning about for predicates that reference b.y. Because they remain non-preserving, those join types fall through to the (false, false) arm and we keep the dynamic filter on the left side only, which means predicates like a.x < 5 are still pushed while b.y < 10 is not.

I’ll add a short comment in the code/tests to spell this out so it’s harder to miss in the future.


// Symmetric rationale applies on the right: semi/anti joins do not preserve all right rows.
const RIGHT_PRESERVING: &[JoinType] =
&[JoinType::Right, JoinType::Full, JoinType::RightMark];

impl JoinType {
pub fn is_outer(self) -> bool {
self == JoinType::Left || self == JoinType::Right || self == JoinType::Full
Expand Down Expand Up @@ -111,6 +120,31 @@ impl JoinType {
| JoinType::RightAnti
)
}

/// Returns true if this join type preserves all rows from the specified `side`.
pub fn preserves(self, side: JoinSide) -> bool {
match side {
JoinSide::Left => LEFT_PRESERVING.contains(&self),
JoinSide::Right => RIGHT_PRESERVING.contains(&self),
JoinSide::None => false,
}
}

/// Returns true if this join type preserves all rows from its left input.
///
/// For [`JoinType::Left`], [`JoinType::Full`], and [`JoinType::LeftMark`] joins
/// every row from the left side will appear in the output at least once.
pub fn preserves_left(self) -> bool {
self.preserves(JoinSide::Left)
}

/// Returns true if this join type preserves all rows from its right input.
///
/// For [`JoinType::Right`], [`JoinType::Full`], and [`JoinType::RightMark`] joins
/// every row from the right side will appear in the output at least once.
pub fn preserves_right(self) -> bool {
self.preserves(JoinSide::Right)
}
}

impl Display for JoinType {
Expand Down Expand Up @@ -194,3 +228,57 @@ impl JoinSide {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_join_type_swap() {
assert_eq!(JoinType::Inner.swap(), JoinType::Inner);
assert_eq!(JoinType::Left.swap(), JoinType::Right);
assert_eq!(JoinType::Right.swap(), JoinType::Left);
assert_eq!(JoinType::Full.swap(), JoinType::Full);
assert_eq!(JoinType::LeftSemi.swap(), JoinType::RightSemi);
assert_eq!(JoinType::RightSemi.swap(), JoinType::LeftSemi);
assert_eq!(JoinType::LeftAnti.swap(), JoinType::RightAnti);
assert_eq!(JoinType::RightAnti.swap(), JoinType::LeftAnti);
assert_eq!(JoinType::LeftMark.swap(), JoinType::RightMark);
assert_eq!(JoinType::RightMark.swap(), JoinType::LeftMark);
}

#[test]
fn test_join_type_supports_swap() {
use JoinType::*;
let supported = [
Inner, Left, Right, Full, LeftSemi, RightSemi, LeftAnti, RightAnti,
];
for jt in supported {
assert!(jt.supports_swap(), "{jt:?} should support swap");
}
let not_supported = [LeftMark, RightMark];
for jt in not_supported {
assert!(!jt.supports_swap(), "{jt:?} should not support swap");
}
}

#[test]
fn test_preserves_sides() {
use JoinSide::*;

assert!(JoinType::Left.preserves(Left));
assert!(JoinType::Full.preserves(Left));
assert!(JoinType::LeftMark.preserves(Left));
assert!(!JoinType::LeftSemi.preserves(Left));

assert!(JoinType::Right.preserves(Right));
assert!(JoinType::Full.preserves(Right));
assert!(JoinType::RightMark.preserves(Right));
assert!(!JoinType::RightSemi.preserves(Right));

assert!(!JoinType::LeftAnti.preserves(Left));
assert!(!JoinType::LeftAnti.preserves(Right));
assert!(!JoinType::RightAnti.preserves(Left));
assert!(!JoinType::RightAnti.preserves(Right));
}
}
Loading