
Commit 3f0c450

peter-toth and harris233 committed
[SPARK-53094][SQL] Fix CUBE with aggregate containing HAVING clauses
### What changes were proposed in this pull request?

This is an alternative PR to #51810 to fix a regression introduced in Spark 3.2 with #32470.

This PR defers the resolution of not fully resolved `UnresolvedHaving` nodes from `ResolveGroupingAnalytics`:

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveGroupingAnalytics ===

Before:
'Sort ['s DESC NULLS LAST], true
+- 'UnresolvedHaving ('count('product) > 2)
   +- 'Aggregate [cube(Vector(0), Vector(1), product#261, region#262)], [product#261, region#262, sum(amount#263) AS s#264L]
      +- SubqueryAlias t
         +- LocalRelation [product#261, region#262, amount#263]

After:
'Sort ['s DESC NULLS LAST], true
+- 'UnresolvedHaving ('count(tempresolvedcolumn(product#261, product, false)) > 2)
   +- Aggregate [product#269, region#270, spark_grouping_id#268L], [product#269, region#270, sum(amount#263) AS s#264L]
      +- Expand [[product#261, region#262, amount#263, product#266, region#267, 0], [product#261, region#262, amount#263, product#266, null, 1], [product#261, region#262, amount#263, null, region#267, 2], [product#261, region#262, amount#263, null, null, 3]], [product#261, region#262, amount#263, product#269, region#270, spark_grouping_id#268L]
         +- Project [product#261, region#262, amount#263, product#261 AS product#266, region#262 AS region#267]
            +- SubqueryAlias t
               +- LocalRelation [product#261, region#262, amount#263]
```

to `ResolveAggregateFunctions`, which adds the correct aggregate expressions (`count(product#261)`):

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAggregateFunctions ===

Before:
'Sort ['s DESC NULLS LAST], true
+- 'UnresolvedHaving (count(tempresolvedcolumn(product#261, product, false)) > cast(2 as bigint))
   +- Aggregate [product#269, region#270, spark_grouping_id#268L], [product#269, region#270, sum(amount#263) AS s#264L]
      +- Expand [[product#261, region#262, amount#263, product#266, region#267, 0], [product#261, region#262, amount#263, product#266, null, 1], [product#261, region#262, amount#263, null, region#267, 2], [product#261, region#262, amount#263, null, null, 3]], [product#261, region#262, amount#263, product#269, region#270, spark_grouping_id#268L]
         +- Project [product#261, region#262, amount#263, product#261 AS product#266, region#262 AS region#267]
            +- SubqueryAlias t
               +- LocalRelation [product#261, region#262, amount#263]

After:
'Sort ['s DESC NULLS LAST], true
+- Project [product#269, region#270, s#264L]
   +- Filter (count(product)#272L > cast(2 as bigint))
      +- Aggregate [product#269, region#270, spark_grouping_id#268L], [product#269, region#270, sum(amount#263) AS s#264L, count(product#261) AS count(product)#272L]
         +- Expand [[product#261, region#262, amount#263, product#266, region#267, 0], [product#261, region#262, amount#263, product#266, null, 1], [product#261, region#262, amount#263, null, region#267, 2], [product#261, region#262, amount#263, null, null, 3]], [product#261, region#262, amount#263, product#269, region#270, spark_grouping_id#268L]
            +- Project [product#261, region#262, amount#263, product#261 AS product#266, region#262 AS region#267]
               +- SubqueryAlias t
                  +- LocalRelation [product#261, region#262, amount#263]
```

### Why are the changes needed?

Fixes a correctness issue described in #51810.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a correctness issue.

### How was this patch tested?

Added a new UT from #51810.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51820 from peter-toth/SPARK-53094-fix-cube-having.
Lead-authored-by: Peter Toth <[email protected]>
Co-authored-by: harris233 <[email protected]>
Signed-off-by: Peter Toth <[email protected]>
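For reference, the regression can be reproduced with the query from the new unit test. A minimal sketch, assuming a `spark-shell` session where `spark` is the usual `SparkSession`:

```scala
// SPARK-53094 reproduction: CUBE plus a HAVING clause over an aggregate
// (count(product)) that does not appear in the SELECT list. With this fix
// the HAVING condition is resolved by ResolveAggregateFunctions and the
// result below is correct.
val df = spark.sql(
  """SELECT product, region, sum(amount) AS s
    |FROM VALUES
    |  ('a', 'east', 100),
    |  ('b', 'east', 200),
    |  ('a', 'west', 150),
    |  ('b', 'west', 250),
    |  ('a', 'east', 120) AS t(product, region, amount)
    |GROUP BY product, region WITH CUBE
    |HAVING count(product) > 2
    |ORDER BY s DESC""".stripMargin)

df.show()
// Expected (correct) rows, in ORDER BY s DESC order:
//   [null, null, 820], [null, east, 420], [a, null, 370]
```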
1 parent af4e1d9 commit 3f0c450

3 files changed (+21 lines, -1 line)


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 4 additions & 0 deletions
@@ -792,6 +792,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       } else {
         colResolved.havingCondition
       }
+      // `cond` might contain unresolved aggregate functions so defer its resolution to
+      // `ResolveAggregateFunctions` rule if needed.
+      if (!cond.resolved) return colResolved
+
       // Try resolving the condition of the filter as though it is in the aggregate clause
       val (extraAggExprs, Seq(resolvedHavingCond)) =
         ResolveAggregateFunctions.resolveExprsWithAggregate(Seq(cond), aggForResolving)
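The early return follows the analyzer's fixed-point contract: a rule that cannot fully resolve a node leaves it untouched for a later pass, so the `UnresolvedHaving` survives `ResolveGroupingAnalytics` and is picked up by `ResolveAggregateFunctions`. One way to observe the resulting analyzed plan, assuming the `df` from the reproduction sketch above:

```scala
// The analyzed plan now shows a Filter over a count(product)#...L column,
// with the extra count(product#...) aggregate added to the Aggregate below
// the Expand node that implements CUBE.
println(df.queryExecution.analyzed.treeString)
```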

sql/core/src/test/resources/sql-tests/analyzer-results/grouping_set.sql.out

Lines changed: 1 addition & 1 deletion
@@ -116,7 +116,7 @@ FROM (VALUES ('x', 'a', 10), ('y', 'b', 20) ) AS t (c1, c2, c3)
 GROUP BY GROUPING SETS ( ( c1 ), ( c2 ) )
 HAVING GROUPING__ID > 1
 -- !query analysis
-Filter (grouping__id#xL > cast(1 as bigint))
+Filter (GROUPING__ID#xL > cast(1 as bigint))
 +- Aggregate [c1#x, c2#x, spark_grouping_id#xL], [c1#x, c2#x, sum(c3#x) AS sum(c3)#xL, spark_grouping_id#xL AS grouping__id#xL]
    +- Expand [[c1#x, c2#x, c3#x, c1#x, null, 1], [c1#x, c2#x, c3#x, null, c2#x, 2]], [c1#x, c2#x, c3#x, c1#x, c2#x, spark_grouping_id#xL]
       +- Project [c1#x, c2#x, c3#x, c1#x AS c1#x, c2#x AS c2#x]
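This golden-file change is a cosmetic side effect of the deferral: because the `HAVING` condition is now resolved later, the analyzed `Filter` keeps the user-written spelling `GROUPING__ID` rather than the lower-cased alias. A quick way to see it, assuming a local `spark` session (the `SELECT` list below is inferred from the plan, not shown in the hunk):

```scala
// explain(true) prints the analyzed plan, whose Filter condition now
// reads GROUPING__ID exactly as typed in the query.
spark.sql(
  """SELECT c1, c2, sum(c3)
    |FROM (VALUES ('x', 'a', 10), ('y', 'b', 20)) AS t (c1, c2, c3)
    |GROUP BY GROUPING SETS ( ( c1 ), ( c2 ) )
    |HAVING GROUPING__ID > 1""".stripMargin).explain(true)
```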

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 16 additions & 0 deletions
@@ -5057,6 +5057,22 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       }
     }
   }
+
+  test("SPARK-53094: Fix cube-related data quality problem") {
+    val df = sql(
+      """SELECT product, region, sum(amount) AS s
+        |FROM VALUES
+        |  ('a', 'east', 100),
+        |  ('b', 'east', 200),
+        |  ('a', 'west', 150),
+        |  ('b', 'west', 250),
+        |  ('a', 'east', 120) AS t(product, region, amount)
+        |GROUP BY product, region WITH CUBE
+        |HAVING count(product) > 2
+        |ORDER BY s DESC""".stripMargin)
+
+    checkAnswer(df, Seq(Row(null, null, 820), Row(null, "east", 420), Row("a", null, 370)))
+  }
 }
 
 case class Foo(bar: Option[String])
