Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2189,7 +2189,11 @@ pub fn create_aggregate_expr_and_maybe_filter(
let (name, human_display, e) = match e {
Expr::Alias(Alias { name, .. }) => {
let unaliased = e.clone().unalias_nested().data;
(Some(name.clone()), e.human_display().to_string(), unaliased)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be the core change, and it matches the PR title.

// Use the unaliased expression for human_display so that the
// physical plan EXPLAIN shows the actual aggregate expression,
// not just the alias name
let display = unaliased.human_display().to_string();
(Some(name.clone()), display, unaliased)
}
Expr::AggregateFunction(_) => (
Some(e.schema_name().to_string()),
Expand Down
11 changes: 10 additions & 1 deletion datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,16 @@ impl DisplayAs for AggregateExec {
let a: Vec<String> = self
.aggr_expr
.iter()
.map(|agg| agg.name().to_string())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes sense.

.map(|agg| {
let expr_display = agg.human_display();
let alias = agg.name();
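// Render as `<expression> as <name>` when the human-readable expression is
// non-empty and differs from the output name; otherwise show only the name.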
if !expr_display.is_empty() && expr_display != alias {
format!("{expr_display} as {alias}")
} else {
alias.to_string()
}
})
.collect();
write!(f, ", aggr=[{}]", a.join(", "))?;
if let Some(limit) = self.limit {
Expand Down
12 changes: 6 additions & 6 deletions datafusion/sqllogictest/test_files/agg_func_substitute.slt
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 1) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 1) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

Expand All @@ -63,10 +63,10 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 1) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 1) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

Expand All @@ -81,10 +81,10 @@ logical_plan
03)----TableScan: multiple_ordered_table projection=[a, c]
physical_plan
01)ProjectionExec: expr=[a@0 as a, nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 101) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
03)----SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
04)------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
05)--------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[nth_value(multiple_ordered_table.c, 101) ORDER BY [multiple_ordered_table.c ASC NULLS LAST] as nth_value(multiple_ordered_table.c,Int64(1) + Int64(100)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], file_type=csv, has_header=true

Expand Down
16 changes: 8 additions & 8 deletions datafusion/sqllogictest/test_files/aggregate.slt
Original file line number Diff line number Diff line change
Expand Up @@ -3647,9 +3647,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[min(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]]]
02)--TableScan: aggregate_test_100 projection=[c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]]
01)AggregateExec: mode=Final, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]]
03)----AggregateExec: mode=Partial, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 ASC NULLS LAST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true

Expand All @@ -3660,9 +3660,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[max(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]]]
02)--TableScan: aggregate_test_100 projection=[c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]]
01)AggregateExec: mode=Final, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]]
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(Float64(0)) WITHIN GROUP [aggregate_test_100.c2 DESC NULLS FIRST]]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true

Expand All @@ -3673,9 +3673,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[min(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(aggregate_test_100.c2,Float64(0))]]
02)--TableScan: aggregate_test_100 projection=[c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(0))]
01)AggregateExec: mode=Final, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(0))]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(0))]
03)----AggregateExec: mode=Partial, gby=[], aggr=[min(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(0))]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true

Expand All @@ -3686,9 +3686,9 @@ logical_plan
01)Aggregate: groupBy=[[]], aggr=[[max(CAST(aggregate_test_100.c2 AS Float64)) AS percentile_cont(aggregate_test_100.c2,Float64(1))]]
02)--TableScan: aggregate_test_100 projection=[c2]
physical_plan
01)AggregateExec: mode=Final, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(1))]
01)AggregateExec: mode=Final, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(1))]
02)--CoalescePartitionsExec
03)----AggregateExec: mode=Partial, gby=[], aggr=[percentile_cont(aggregate_test_100.c2,Float64(1))]
03)----AggregateExec: mode=Partial, gby=[], aggr=[max(aggregate_test_100.c2) as percentile_cont(aggregate_test_100.c2,Float64(1))]
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
05)--------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100_with_dates.csv]]}, projection=[c2], file_type=csv, has_header=true

Expand Down
33 changes: 33 additions & 0 deletions datafusion/sqllogictest/test_files/explain.slt
Original file line number Diff line number Diff line change
Expand Up @@ -660,5 +660,38 @@ logical_plan

# unknown mode

# Test that aliased aggregate expressions are visible in physical explain output
# Issue: https://github.com/apache/datafusion/issues/19685
statement ok
create table agg_alias_test (column1 int, column2 int) as values (1, 100), (2, 200), (3, 300);

query TT
EXPLAIN SELECT sum(column1) AS my_sum FROM agg_alias_test;
----
logical_plan
01)Projection: sum(agg_alias_test.column1) AS my_sum
02)--Aggregate: groupBy=[[]], aggr=[[sum(CAST(agg_alias_test.column1 AS Int64))]]
03)----TableScan: agg_alias_test projection=[column1]
physical_plan
01)ProjectionExec: expr=[sum(agg_alias_test.column1)@0 as my_sum]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(agg_alias_test.column1)]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

# Test with filter clause on aggregate
query TT
EXPLAIN SELECT sum(column1) FILTER (WHERE column2 <= 200) AS filtered_sum FROM agg_alias_test;
----
logical_plan
01)Projection: sum(agg_alias_test.column1) FILTER (WHERE agg_alias_test.column2 <= Int64(200)) AS filtered_sum
02)--Aggregate: groupBy=[[]], aggr=[[sum(CAST(agg_alias_test.column1 AS Int64)) FILTER (WHERE agg_alias_test.column2 <= Int32(200)) AS sum(agg_alias_test.column1) FILTER (WHERE agg_alias_test.column2 <= Int64(200))]]
03)----TableScan: agg_alias_test projection=[column1, column2]
physical_plan
01)ProjectionExec: expr=[sum(agg_alias_test.column1) FILTER (WHERE agg_alias_test.column2 <= Int64(200))@0 as filtered_sum]
02)--AggregateExec: mode=Single, gby=[], aggr=[sum(agg_alias_test.column1) FILTER (WHERE agg_alias_test.column2 <= Int32(200)) as sum(agg_alias_test.column1) FILTER (WHERE agg_alias_test.column2 <= Int64(200))]
03)----DataSourceExec: partitions=1, partition_sizes=[1]

statement ok
drop table agg_alias_test;

statement ok
drop table foo;