Skip to content

[SPARK-52565] [SQL] Enforce ordinal resolution before other sort order expressions #51268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,49 +24,68 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
/**
* A virtual rule to resolve [[UnresolvedAttribute]] in [[Sort]]. It's only used by the real
* rule `ResolveReferences`. The column resolution order for [[Sort]] is:
* 1. Resolves the column to [[AttributeReference]] with the output of the child plan. This
* 1. Checks whether there are [[UnresolvedOrdinal]]s in the sort order list. In case there are
* delay the resolution until we resolve all the ordinals. Without this check, we proceed to
* resolve the following query correctly:
* {{{ SELECT col1 FROM VALUES(1, 2) ORDER BY 2, col2; }}}
* That's because we add missing input in `ResolveReferencesInSort` to the underlying operator
* and then successfully resolve the ordinal because at that point there are two elements below.
* 2. Resolves the column to [[AttributeReference]] with the output of the child plan. This
* includes metadata columns as well.
* 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
* 3. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
* `SELECT col, current_date FROM t`.
* 3. If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to
* 4. If the child plan is Aggregate or Filter(_, Aggregate), resolves the column to
* [[TempResolvedColumn]] with the output of Aggregate's child plan.
* This is to allow Sort to host grouping expressions and aggregate functions, which can
* be pushed down to the Aggregate later. For example,
* `SELECT max(a) FROM t GROUP BY b HAVING max(a) > 1 ORDER BY min(a)`.
* 4. Resolves the column to [[AttributeReference]] with the output of a descendant plan node.
* 5. Resolves the column to [[AttributeReference]] with the output of a descendant plan node.
* Spark will propagate the missing attributes from the descendant plan node to the Sort node.
* This is to allow users to ORDER BY columns that are not in the SELECT clause, which is
* widely supported in other SQL dialects. For example, `SELECT a FROM t ORDER BY b`.
* 5. If the order by expressions only have one single unresolved column named ALL, expanded it to
* 6. If the order by expressions only have one single unresolved column named ALL, expanded it to
* include all columns in the SELECT list. This is to support SQL pattern like
* `SELECT col1, col2 FROM t ORDER BY ALL`. This should also support specifying asc/desc, and
* nulls first/last.
* 6. Resolves the column to outer references with the outer plan if we are resolving subquery
* 7. Resolves the column to outer references with the outer plan if we are resolving subquery
* expressions.
*
* Note, 3 and 4 are actually orthogonal. If the child plan is Aggregate, 4 can only resolve columns
* as the grouping columns, which is completely covered by 3.
* Note, 4 and 5 are actually orthogonal. If the child plan is Aggregate, 5 can only resolve columns
* as the grouping columns, which is completely covered by 4.
*/
class ResolveReferencesInSort(val catalogManager: CatalogManager)
extends SQLConfHelper with ColumnResolutionHelper {

def apply(s: Sort): LogicalPlan = {
val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child))
val resolvedWithAgg = s.child match {
case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_, agg))
case _ => resolvedBasic.map(resolveColWithAgg(_, s.child))
}
val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
val resolvedFinal = orderByAllResolved
.map(e => resolveColsLastResort(e).asInstanceOf[SortOrder])
if (s.child.output == newChild.output) {
s.copy(order = resolvedFinal)
if (resolvedOrdinals(s)) {
s
} else {
// Add missing attributes and then project them away.
val newSort = s.copy(order = resolvedFinal, child = newChild)
Project(s.child.output, newSort)
val resolvedBasic = s.order.map(resolveExpressionByPlanOutput(_, s.child))
val resolvedWithAgg = s.child match {
case Filter(_, agg: Aggregate) => resolvedBasic.map(resolveColWithAgg(_, agg))
case _ => resolvedBasic.map(resolveColWithAgg(_, s.child))
}
val (missingAttrResolved, newChild) = resolveExprsAndAddMissingAttrs(resolvedWithAgg, s.child)
val orderByAllResolved = resolveOrderByAll(
s.global, newChild, missingAttrResolved.map(_.asInstanceOf[SortOrder]))
val resolvedFinal = orderByAllResolved
.map(e => resolveColsLastResort(e).asInstanceOf[SortOrder])
if (s.child.output == newChild.output) {
s.copy(order = resolvedFinal)
} else {
// Add missing attributes and then project them away.
val newSort = s.copy(order = resolvedFinal, child = newChild)
Project(s.child.output, newSort)
}
}
}

private def resolvedOrdinals(sort: Sort): Boolean = {
sort.order.exists { sortOrder =>
sortOrder.child match {
case _: UnresolvedOrdinal => true
case _ => false
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,153 @@ Sort [a#x DESC NULLS LAST], false
+- LocalRelation [a#x, b#x]


-- !query
SELECT a FROM data ORDER BY 2, b
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "2",
"size" : "1"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 29,
"stopIndex" : 29,
"fragment" : "2"
} ]
}


-- !query
SELECT a FROM data ORDER BY b, 2
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "2",
"size" : "1"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 32,
"stopIndex" : 32,
"fragment" : "2"
} ]
}


-- !query
SELECT a FROM data ORDER BY 'b', 2
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "2",
"size" : "1"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 34,
"stopIndex" : 34,
"fragment" : "2"
} ]
}


-- !query
SELECT a FROM data ORDER BY `b`, 2
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "2",
"size" : "1"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 34,
"stopIndex" : 34,
"fragment" : "2"
} ]
}


-- !query
SELECT a FROM data ORDER BY a, 2
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "2",
"size" : "1"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 32,
"stopIndex" : 32,
"fragment" : "2"
} ]
}


-- !query
SELECT a FROM data ORDER BY b, 3
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "3",
"size" : "1"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 32,
"stopIndex" : 32,
"fragment" : "3"
} ]
}


-- !query
SELECT a, a + 1 FROM data ORDER BY b, 3
-- !query analysis
org.apache.spark.sql.AnalysisException
{
"errorClass" : "ORDER_BY_POS_OUT_OF_RANGE",
"sqlState" : "42805",
"messageParameters" : {
"index" : "3",
"size" : "2"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 39,
"stopIndex" : 39,
"fragment" : "3"
} ]
}


-- !query
set spark.sql.orderByOrdinal=false
-- !query analysis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ select * from data order by 3;
-- sort by ordinal
select * from data sort by 1 desc;

-- SPARK-52565: Enforce ordinal resolution before other sort order expressions
SELECT a FROM data ORDER BY 2, b;
SELECT a FROM data ORDER BY b, 2;
SELECT a FROM data ORDER BY 'b', 2;
SELECT a FROM data ORDER BY `b`, 2;
SELECT a FROM data ORDER BY a, 2;
SELECT a FROM data ORDER BY b, 3;
SELECT a, a + 1 FROM data ORDER BY b, 3;

-- turn off order by ordinal
set spark.sql.orderByOrdinal=false;

Expand Down
Loading