@@ -41,7 +41,7 @@ use datafusion_expr::utils::{
4141} ;
4242use datafusion_expr:: {
4343 Aggregate , Expr , Filter , GroupingSet , LogicalPlan , LogicalPlanBuilder ,
44- LogicalPlanBuilderOptions , Partitioning ,
44+ LogicalPlanBuilderOptions , Partitioning , SortExpr ,
4545} ;
4646
4747use indexmap:: IndexMap ;
@@ -219,11 +219,13 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
219219 . transpose ( ) ?;
220220
221221 // The outer expressions we will search through for aggregates.
222- // Aggregates may be sourced from the SELECT list or from the HAVING expression .
222+ // Aggregates may be sourced from the SELECT list, HAVING, QUALIFY, or ORDER BY .
223223 let aggr_expr_haystack = select_exprs
224224 . iter ( )
225225 . chain ( having_expr_opt. iter ( ) )
226- . chain ( qualify_expr_opt. iter ( ) ) ;
226+ . chain ( qualify_expr_opt. iter ( ) )
227+ . chain ( order_by_rex. iter ( ) . map ( |s| & s. expr ) ) ;
228+
227229 // All of the aggregate expressions (deduplicated).
228230 let aggr_exprs = find_aggregate_exprs ( aggr_expr_haystack) ;
229231
@@ -233,19 +235,21 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
233235 mut select_exprs_post_aggr,
234236 having_expr_post_aggr,
235237 qualify_expr_post_aggr,
238+ order_by_rex,
236239 ) = if !group_by_exprs. is_empty ( ) || !aggr_exprs. is_empty ( ) {
237240 self . aggregate (
238241 & base_plan,
239242 & select_exprs,
240243 having_expr_opt. as_ref ( ) ,
241244 qualify_expr_opt. as_ref ( ) ,
245+ & order_by_rex,
242246 & group_by_exprs,
243247 & aggr_exprs,
244248 ) ?
245249 } else {
246250 match having_expr_opt {
247251 Some ( having_expr) => return plan_err ! ( "HAVING clause references: {having_expr} must appear in the GROUP BY clause or be used in an aggregate function" ) ,
248- None => ( base_plan. clone ( ) , select_exprs. clone ( ) , having_expr_opt, qualify_expr_opt)
252+ None => ( base_plan. clone ( ) , select_exprs. clone ( ) , having_expr_opt, qualify_expr_opt, order_by_rex )
249253 }
250254 } ;
251255
@@ -872,16 +876,25 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
872876 /// the aggregate
873877 /// * `qualify_expr_post_aggr` - The "qualify" expression rewritten to reference a column from
874878 /// the aggregate
875- #[ allow( clippy:: type_complexity) ]
879+ /// * `order_by_post_aggr` - The ORDER BY expressions rewritten to reference columns from
880+ /// the aggregate
881+ #[ allow( clippy:: type_complexity, clippy:: too_many_arguments) ]
876882 fn aggregate (
877883 & self ,
878884 input : & LogicalPlan ,
879885 select_exprs : & [ Expr ] ,
880886 having_expr_opt : Option < & Expr > ,
881887 qualify_expr_opt : Option < & Expr > ,
888+ order_by_exprs : & [ SortExpr ] ,
882889 group_by_exprs : & [ Expr ] ,
883890 aggr_exprs : & [ Expr ] ,
884- ) -> Result < ( LogicalPlan , Vec < Expr > , Option < Expr > , Option < Expr > ) > {
891+ ) -> Result < (
892+ LogicalPlan ,
893+ Vec < Expr > ,
894+ Option < Expr > ,
895+ Option < Expr > ,
896+ Vec < SortExpr > ,
897+ ) > {
885898 // create the aggregate plan
886899 let options =
887900 LogicalPlanBuilderOptions :: new ( ) . with_add_implicit_group_by_exprs ( true ) ;
@@ -988,11 +1001,23 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
9881001 None
9891002 } ;
9901003
1004+ // Rewrite the ORDER BY expressions to use the columns produced by the
1005+ // aggregation.
1006+ let order_by_post_aggr = order_by_exprs
1007+ . iter ( )
1008+ . map ( |sort_expr| {
1009+ let rewritten_expr =
1010+ rebase_expr ( & sort_expr. expr , & aggr_projection_exprs, input) ?;
1011+ Ok ( sort_expr. with_expr ( rewritten_expr) )
1012+ } )
1013+ . collect :: < Result < Vec < SortExpr > > > ( ) ?;
1014+
9911015 Ok ( (
9921016 plan,
9931017 select_exprs_post_aggr,
9941018 having_expr_post_aggr,
9951019 qualify_expr_post_aggr,
1020+ order_by_post_aggr,
9961021 ) )
9971022 }
9981023
0 commit comments