[SPARK-52975][SQL] Simplify field names in pushdown join sql #51686
Conversation
@dengziming this way, if you have left side column COL and right side columns COL, COL_0, the alias generator will generate COL_0 for the left COL, which would conflict with COL_0 from the right side.
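The collision described above can be reproduced with a small standalone sketch. This is a hypothetical, simplified alias generator (not the actual Spark code); it shows how suffixing a duplicated name while only checking one side's columns can clash with a real column on the other side.

```scala
object AliasCollisionDemo {
  // Naive strategy: append an increasing "_<n>" suffix until the name is
  // free among the names already `taken` -- ignoring the other side entirely.
  def naiveAlias(name: String, taken: Set[String]): String = {
    var i = 0
    var candidate = s"${name}_$i"
    while (taken.contains(candidate)) {
      i += 1
      candidate = s"${name}_$i"
    }
    candidate
  }

  def main(args: Array[String]): Unit = {
    val leftColumns = Set("COL")
    val rightColumns = Set("COL", "COL_0")
    // Renaming the left-side COL without knowing the right side's names
    // yields COL_0, which clashes with the right side's real COL_0 column.
    val alias = naiveAlias("COL", leftColumns)
    println(alias)                        // COL_0
    println(rightColumns.contains(alias)) // true -> conflict
  }
}
```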
Good catch @PetarVasiljevic-DB, let me think of another way.
...re/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
LGTM, the generated text is much clearer, and more importantly, it is deterministic now. Thanks for the change!
By the way, could we move generateColumnAliasesForDuplicatedName under pushdownJoin? Or above, it doesn't really matter; I just find it too big to have as a nested method.
...re/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala
…l [0,1,2,3,0,-1,-2,-3]
@@ -657,7 +657,7 @@ trait JDBCV2JoinPushdownIntegrationSuiteBase
    withSQLConf(SQLConf.DATA_SOURCE_V2_JOIN_PUSHDOWN.key -> "true") {
      val df = sql(sqlQuery)
      val row = df.collect()(0)
-     assert(row == Row(0, 1, 2, 3, 0, -1, -2, -3))
+     assert(row.toString == Row(0, 1, 2, 3, 0, -1, -2, -3).toString)
It seems that Oracle will use DecimalType, so we can't compare Row directly.
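The comparison issue can be illustrated without a database. Below is a plain-Scala stand-in for Spark's Row (not the actual Spark class) showing why the test compares Row.toString instead of the Row values: Oracle reads integer columns back as DecimalType, so the collected row holds java.math.BigDecimal values, which are not equal to Ints even when they represent the same number.

```scala
// Simplified stand-in for org.apache.spark.sql.Row, for illustration only.
final case class Row(values: Any*) {
  override def toString: String = values.mkString("[", ",", "]")
}

object RowCompareDemo {
  def main(args: Array[String]): Unit = {
    val expected  = Row(0, 1)
    // What a JDBC read from Oracle might yield: DecimalType values.
    val collected = Row(new java.math.BigDecimal(0), new java.math.BigDecimal(1))

    println(expected == collected)                   // false: Int != BigDecimal
    println(expected.toString == collected.toString) // true: "[0,1]" == "[0,1]"
  }
}
```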
Hello @cloud-fan
test("Test complex duplicate column name alias") {
  sql(s"create table $catalogAndNamespace.t1(id int, id_1 int, id_2 int, id_1_1 int)")
  sql(s"create table $catalogAndNamespace.t2(id int, id_1 int, id_2 int, id_2_1 int)")
Can we create them in def tablePreparation?
// Count occurrences of each column name across both sides to identify duplicates.
val allRequiredColumnNames = leftSideRequiredColumnNames ++ rightSideRequiredColumnNames
val allNameCounts: Map[String, Int] =
  allRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap
Shall we consider case sensitivity? If the left side has col and the right side has COL, do we need to generate an alias?
After some investigation, I don't think it's necessary. If our SQL is select * from a(id,sid) join b(id,Sid), we can push down 2 versions of SQL to the database:
select id, sid, id_1, Sid from (select id, sid from a) join (select id as id_1, Sid from b)
select id, sid, id_1, sid_1 from (select id, sid from a) join (select id as id_1, Sid as sid_1 from b)
I added this to my test case to show that version 1 also works, and version 2 doesn't make the SQL clearer.
Is it possible we will hit AMBIGUOUS_REFERENCE in version 1?
The generated SQL is being processed by the underlying database, so we assume all dialects are case sensitive?
No, I thought they were case sensitive at first, but I tested locally and found that SQL Server is not case sensitive, so I have updated this PR. Please review my latest commit and my latest comment here: #51686 (comment)
  allRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap
// Use Set for O(1) lookups when checking existing column names; claim all names
// that appear only once to ensure they have the highest priority.
val allClaimedAliases = mutable.HashSet.empty ++ allNameCounts.filter(_._2 == 1).keySet
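The two fragments above (counting occurrences, then claiming unique names) can be put together in a condensed, hypothetical sketch. The names and shape here are illustrative, not the exact Spark code: unique names keep themselves and are reserved up front, so a generated "_&lt;n&gt;" alias for a duplicate can never shadow a real, unambiguous column like COL_0.

```scala
import scala.collection.mutable

object AliasGenSketch {
  // Returns (originalName, alias) pairs for all columns, left then right.
  def generateAliases(left: Seq[String], right: Seq[String]): Seq[(String, String)] = {
    val all = left ++ right
    // Count how often each name occurs across both sides.
    val counts = all.groupBy(identity).view.mapValues(_.size).toMap
    // Claim every name that appears exactly once: those columns keep their
    // names, and no generated alias may collide with them.
    val claimed = mutable.HashSet.empty[String] ++ counts.filter(_._2 == 1).keySet
    all.map { name =>
      if (counts(name) == 1) name -> name
      else {
        // Duplicated name: pick the first unclaimed "name_<i>" suffix.
        var i = 0
        while (claimed.contains(s"${name}_$i")) i += 1
        val alias = s"${name}_$i"
        claimed += alias
        name -> alias
      }
    }
  }
}
```

For the collision case discussed earlier, left = Seq("COL") and right = Seq("COL", "COL_0"): COL_0 is unique and claimed first, so the duplicated COLs receive COL_1 and COL_2, avoiding the clash.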
- val allClaimedAliases = mutable.HashSet.empty ++ allNameCounts.filter(_._2 == 1).keySet
+ val allClaimedAliases = allNameCounts.filter(_._2 == 1).keySet.to[mutable.Set]
@cloud-fan, your idea is worth considering. SQL Server reports "Ambiguous column name 'sid'" when running my test, so we need to generate different aliases if 2 columns are equal ignoring case. Please review my latest commit, cc @PetarVasiljevic-DB
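The case-insensitive fix can be sketched as a variant of the same idea. This is a hypothetical illustration, not the PR's actual code: claimed names are tracked by their lower-cased form, so "sid" and "Sid" cannot both be emitted unaliased against a case-insensitive database like SQL Server.

```scala
import scala.collection.mutable

object CaseInsensitiveAliasSketch {
  // Deduplicates column names, treating names that differ only in case as
  // conflicting. Returns one output name per input, in order.
  def dedupe(names: Seq[String]): Seq[String] = {
    val claimed = mutable.HashSet.empty[String]
    names.map { name =>
      // HashSet.add returns true only if the lower-cased key was free.
      if (claimed.add(name.toLowerCase)) name
      else {
        var i = 0
        while (!claimed.add(s"${name.toLowerCase}_$i")) i += 1
        // Preserve the original casing in the visible alias.
        s"${name}_$i"
      }
    }
  }
}
```

With the example from the discussion, dedupe(Seq("id", "sid", "id", "Sid")) yields Seq("id", "sid", "id_0", "Sid_0"): the second "id" and the case-conflicting "Sid" both get suffixed.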
...c/test/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDownSuite.scala
...re/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.connector.read.SupportsPushDownJoin.ColumnWithAlias

class V2ScanRelationPushDownSuite extends SparkFunSuite {
- class V2ScanRelationPushDownSuite extends SparkFunSuite {
+ class DSV2JoinPushDownAliasGenerationSuite extends SparkFunSuite {
Done!
...ore/src/test/scala/org/apache/spark/sql/jdbc/v2/JDBCV2JoinPushdownIntegrationSuiteBase.scala
@cloud-fan comments resolved.
thanks, merging to master!
Hi, @dengziming and @cloud-fan .
This seems to break the non-ANSI GitHub CI job. Could you take a look at the failure?
https://github.com/apache/spark/actions/workflows/build_non_ansi.yml

[info] - scan with filter push-down with date time functions *** FAILED *** (531 milliseconds)
[info] List(Filter (month(cast(DATE1#3188 as date)) = 5)
[info] +- RelationV2[NAME#3187, DATE1#3188] oracle.SYSTEM.DATETIME
[info] ) was not empty (DataSourcePushdownTestUtils.scala:44)
I will take a look right now.
Thank you, @dengziming.
What changes were proposed in this pull request?
When pushing down join SQL, we generated aliases for duplicated names, but the aliases were too long to read and nondeterministic.
Before this change:
After this change:
Why are the changes needed?
Make code-generated JDBC SQL clearer and deterministic.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests ensure no side effects are introduced.
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Trae.