Skip to content

Commit e0b4e8d

Browse files
authored
feat: Implement partition_statistics API for SortMergeJoinExec (#19567)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Part of #15873. ## Rationale for this change - The goal is to replace the old `statistics` API with the new `partition_statistics` <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? - implemented the partition_statistics API for SortMergeJoinExec - Removed the special case that returned unknown statistics for specific partitions. - Added `test_partition_statistics` to verify both aggregate and specific partition statistics work correctly <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ## Are these changes tested? - Addes new tests <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
1 parent 7e04974 commit e0b4e8d

File tree

2 files changed

+84
-5
lines changed

2 files changed

+84
-5
lines changed

datafusion/physical-plan/src/joins/sort_merge_join/exec.rs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -524,15 +524,20 @@ impl ExecutionPlan for SortMergeJoinExec {
524524
}
525525

526526
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
527-
if partition.is_some() {
528-
return Ok(Statistics::new_unknown(&self.schema()));
529-
}
527+
// SortMergeJoinExec uses symmetric hash partitioning where both left and right
528+
// inputs are hash-partitioned on the join keys. This means partition `i` of the
529+
// left input is joined with partition `i` of the right input.
530+
//
531+
// Therefore, partition-specific statistics can be computed by getting the
532+
// partition-specific statistics from both children and combining them via
533+
// `estimate_join_statistics`.
534+
//
530535
// TODO stats: it is not possible in general to know the output size of joins
531536
// There are some special cases though, for example:
532537
// - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
533538
estimate_join_statistics(
534-
self.left.partition_statistics(None)?,
535-
self.right.partition_statistics(None)?,
539+
self.left.partition_statistics(partition)?,
540+
self.right.partition_statistics(partition)?,
536541
&self.on,
537542
&self.join_type,
538543
&self.schema,

datafusion/physical-plan/src/joins/sort_merge_join/tests.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3030,6 +3030,80 @@ async fn test_anti_join_filtered_mask() -> Result<()> {
30303030
Ok(())
30313031
}
30323032

3033+
#[test]
3034+
fn test_partition_statistics() -> Result<()> {
3035+
use crate::ExecutionPlan;
3036+
use datafusion_common::stats::Precision;
3037+
3038+
let left = build_table(
3039+
("a1", &vec![1, 2, 3]),
3040+
("b1", &vec![4, 5, 5]),
3041+
("c1", &vec![7, 8, 9]),
3042+
);
3043+
let right = build_table(
3044+
("a2", &vec![10, 20, 30]),
3045+
("b1", &vec![4, 5, 6]),
3046+
("c2", &vec![70, 80, 90]),
3047+
);
3048+
3049+
let on = vec![(
3050+
Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
3051+
Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
3052+
)];
3053+
3054+
// Test different join types to ensure partition_statistics works correctly for all
3055+
let join_types = vec![
3056+
(Inner, 6), // left cols + right cols
3057+
(Left, 6), // left cols + right cols
3058+
(Right, 6), // left cols + right cols
3059+
(Full, 6), // left cols + right cols
3060+
(LeftSemi, 3), // only left cols
3061+
(LeftAnti, 3), // only left cols
3062+
(RightSemi, 3), // only right cols
3063+
(RightAnti, 3), // only right cols
3064+
];
3065+
3066+
for (join_type, expected_cols) in join_types {
3067+
let join_exec =
3068+
join(Arc::clone(&left), Arc::clone(&right), on.clone(), join_type)?;
3069+
3070+
// Test aggregate statistics (partition = None)
3071+
// Should return meaningful statistics computed from both inputs
3072+
let stats = join_exec.partition_statistics(None)?;
3073+
assert_eq!(
3074+
stats.column_statistics.len(),
3075+
expected_cols,
3076+
"Aggregate stats column count failed for {join_type:?}"
3077+
);
3078+
// Verify that aggregate statistics have a meaningful num_rows (not Absent)
3079+
assert!(
3080+
!matches!(stats.num_rows, Precision::Absent),
3081+
"Aggregate stats should have meaningful num_rows for {join_type:?}, got {:?}",
3082+
stats.num_rows
3083+
);
3084+
3085+
// Test partition-specific statistics (partition = Some(0))
3086+
// The implementation correctly passes `partition` to children.
3087+
// Since the child TestMemoryExec returns unknown stats for specific partitions,
3088+
// the join output will also have Absent num_rows. This is expected behavior
3089+
// as the statistics depend on what the children can provide.
3090+
let partition_stats = join_exec.partition_statistics(Some(0))?;
3091+
assert_eq!(
3092+
partition_stats.column_statistics.len(),
3093+
expected_cols,
3094+
"Partition stats column count failed for {join_type:?}"
3095+
);
3096+
// When children return unknown stats, the join's partition stats will be Absent
3097+
assert!(
3098+
matches!(partition_stats.num_rows, Precision::Absent),
3099+
"Partition stats should have Absent num_rows when children return unknown for {join_type:?}, got {:?}",
3100+
partition_stats.num_rows
3101+
);
3102+
}
3103+
3104+
Ok(())
3105+
}
3106+
30333107
/// Returns the column names on the schema
30343108
fn columns(schema: &Schema) -> Vec<String> {
30353109
schema.fields().iter().map(|f| f.name().clone()).collect()

0 commit comments

Comments
 (0)