Skip to content

Commit 34d7a3c

Browse files
dengzimingcloud-fan
authored andcommitted
[SPARK-52975][SQL] Simplify field names in pushdown join sql
### What changes were proposed in this pull request? When pushing down join SQL, we generated aliases for duplicated names, but the aliases are too long to read and nondeterministic. Before this change: ``` SELECT "ID_bf822dc6_e06d_492c_a489_1e92a6fe84a0","AMOUNT_c9f3fc67_62f8_4ec6_9c3f_b7ee7bafcb5a","ADDRESS_d937a313_3e09_4b97_b91f_b2a47ef5e31d","ID","AMOUNT","ADDRESS" FROM xxxx RelationV2[ID_bf822dc6_e06d_492c_a489_1e92a6fe84a0#18, AMOUNT_c9f3fc67_62f8_4ec6_9c3f_b7ee7bafcb5a#19, ADDRESS_d937a313_3e09_4b97_b91f_b2a47ef5e31d#20, ID#21, AMOUNT#22, ADDRESS#23] join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1 ``` After this change. ``` SELECT "ID","AMOUNT","ADDRESS","ID_1","AMOUNT_1","ADDRESS_1" FROM xxx RelationV2[ID#18, AMOUNT#19, ADDRESS#20, ID_1#21, AMOUNT_1#22, ADDRESS_1#23] join_pushdown_catalog.JOIN_SCHEMA.JOIN_TABLE_1 ``` ### Why are the changes needed? Make code-generated JDBC SQL clearer and deterministic. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests can ensure no side effects are introduced. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Trae. Closes #51686 from dengziming/SPARK-52975. Authored-by: dengziming <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 712ef08 commit 34d7a3c

File tree

5 files changed

+314
-58
lines changed

5 files changed

+314
-58
lines changed

connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/join/MySQLJoinPushdownIntegrationSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class MySQLJoinPushdownIntegrationSuite
4343

4444
override def caseConvert(identifier: String): String = identifier.toUpperCase(Locale.ROOT)
4545

46+
override def remainColumnCase(identifier: String): String = "`" + identifier + "`"
47+
4648
// This method comes from DockerJDBCIntegrationSuite
4749
override def dataPreparation(connection: Connection): Unit = {
4850
super.dataPreparation()

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala

Lines changed: 79 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.datasources.v2
1919

20+
import java.util.Locale
21+
2022
import scala.collection.mutable
2123

2224
import org.apache.spark.internal.LogKeys.{AGGREGATE_FUNCTIONS, COLUMN_NAMES, GROUP_BY_EXPRS, JOIN_CONDITION, JOIN_TYPE, POST_SCAN_FILTERS, PUSHED_FILTERS, RELATION_NAME, RELATION_OUTPUT}
@@ -137,42 +139,11 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
137139
// Cross joins are not supported because they increase the amount of data.
138140
condition.isDefined &&
139141
lBuilder.isOtherSideCompatibleForJoin(rBuilder) =>
140-
val leftSideRequiredColumnNames = getRequiredColumnNames(leftProjections, leftHolder)
141-
val rightSideRequiredColumnNames = getRequiredColumnNames(rightProjections, rightHolder)
142-
143-
// Alias the duplicated columns from left side of the join. We are creating the
144-
// Map[String, Int] to tell how many times each column name has occured within one side.
145-
val leftSideNameCounts: Map[String, Int] =
146-
leftSideRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap
147-
val rightSideNameCounts: Map[String, Int] =
148-
rightSideRequiredColumnNames.groupBy(identity).view.mapValues(_.size).toMap
149-
// It's more performant to call contains on Set than on Seq
150-
val rightSideColumnNamesSet = rightSideRequiredColumnNames.toSet
151-
152-
val leftSideRequiredColumnsWithAliases = leftSideRequiredColumnNames.map { name =>
153-
val aliasName =
154-
if (leftSideNameCounts(name) > 1 || rightSideColumnNamesSet.contains(name)) {
155-
generateJoinOutputAlias(name)
156-
} else {
157-
null
158-
}
159-
160-
new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
161-
}
162-
163-
// Aliasing of duplicated columns in right side is done only if there are duplicates in
164-
// right side only. There won't be a conflict with left side columns because they are
165-
// already aliased.
166-
val rightSideRequiredColumnsWithAliases = rightSideRequiredColumnNames.map { name =>
167-
val aliasName =
168-
if (rightSideNameCounts(name) > 1) {
169-
generateJoinOutputAlias(name)
170-
} else {
171-
null
172-
}
173-
174-
new SupportsPushDownJoin.ColumnWithAlias(name, aliasName)
175-
}
142+
// Process left and right columns in original order
143+
val (leftSideRequiredColumnsWithAliases, rightSideRequiredColumnsWithAliases) =
144+
generateColumnAliasesForDuplicatedName(
145+
getRequiredColumnNames(leftProjections, leftHolder),
146+
getRequiredColumnNames(rightProjections, rightHolder))
176147

177148
// Create the AttributeMap that holds (Attribute -> Attribute with up to date name) mapping.
178149
val pushedJoinOutputMap = AttributeMap[Expression](
@@ -249,11 +220,80 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] with PredicateHelper {
249220
node
250221
}
251222
}
223+
/**
224+
* Generates unique column aliases for join operations to avoid naming conflicts.
225+
* Handles case sensitivity issues across different databases (SQL Server, MySQL, etc.).
226+
*
227+
* @param leftSideRequiredColumnNames Columns from the left side of the join
228+
* @param rightSideRequiredColumnNames Columns from the right side of the join
229+
* @return Tuple of (leftColumnsWithAliases, rightColumnsWithAliases)
230+
*/
231+
private[v2] def generateColumnAliasesForDuplicatedName(
232+
leftSideRequiredColumnNames: Array[String],
233+
rightSideRequiredColumnNames: Array[String]
234+
): (Array[SupportsPushDownJoin.ColumnWithAlias],
235+
Array[SupportsPushDownJoin.ColumnWithAlias]) = {
236+
// Normalize all column names to lowercase for case-insensitive comparison
237+
val normalizeCase: String => String = _.toLowerCase(Locale.ROOT)
238+
239+
// Count occurrences of each column name (case-insensitive)
240+
val allRequiredColumnNames = leftSideRequiredColumnNames ++ rightSideRequiredColumnNames
241+
val allNameCounts: Map[String, Int] =
242+
allRequiredColumnNames.map(normalizeCase)
243+
.groupBy(identity)
244+
.view
245+
.mapValues(_.length)
246+
.toMap
247+
248+
// Track claimed aliases using normalized names.
249+
// Use Set for O(1) lookups when checking existing column names, claim all names
250+
// that appears only once to ensure they have highest priority.
251+
val allClaimedAliases = mutable.Set.from(
252+
allNameCounts.filter(_._2 == 1).keys
253+
)
254+
255+
// Track suffix index for each base column name (starts at 0) to avoid extreme worst
256+
// case of O(n^2) alias generation.
257+
val aliasSuffixIndex = mutable.HashMap[String, Int]().withDefaultValue(0)
258+
259+
def processColumn(originalName: String): SupportsPushDownJoin.ColumnWithAlias = {
260+
val normalizedName = normalizeCase(originalName)
261+
262+
// No alias needed for unique column names
263+
if (allNameCounts(normalizedName) == 1) {
264+
new SupportsPushDownJoin.ColumnWithAlias(originalName, null)
265+
} else {
266+
var attempt = aliasSuffixIndex(normalizedName)
267+
var candidate = if (attempt == 0) originalName else s"${originalName}_$attempt"
268+
var normalizedCandidate = normalizeCase(candidate)
269+
270+
// Find first available unique alias, use original name for the first attempt, then append
271+
// suffix for more attempts.
272+
while (allClaimedAliases.contains(normalizedCandidate)) {
273+
attempt += 1
274+
candidate = s"${originalName}_$attempt"
275+
normalizedCandidate = normalizeCase(candidate)
276+
}
277+
278+
// Update tracking state
279+
aliasSuffixIndex(normalizedName) = attempt + 1
280+
allClaimedAliases.add(normalizedCandidate)
252281

253-
def generateJoinOutputAlias(name: String): String =
254-
s"${name}_${java.util.UUID.randomUUID().toString.replace("-", "_")}"
282+
if (originalName == candidate) {
283+
new SupportsPushDownJoin.ColumnWithAlias(originalName, null)
284+
} else {
285+
new SupportsPushDownJoin.ColumnWithAlias(originalName, candidate)
286+
}
287+
}
288+
}
289+
290+
(
291+
leftSideRequiredColumnNames.map(processColumn),
292+
rightSideRequiredColumnNames.map(processColumn)
293+
)
294+
}
255295

256-
// projections' names are maybe not up to date if the joins have been previously pushed down.
296+
// Projections' names are maybe not up to date if the joins have been previously pushed down.
257297
// For this reason, we need to use pushedJoinOutputMap to get up to date names.
258298
def getRequiredColumnNames(
259299
projections: Seq[NamedExpression],

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourcePushdownTestUtils.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,7 @@ trait DataSourcePushdownTestUtils extends ExplainSuiteHelper {
182182

183183
assert(dfSchema.length == schema.length)
184184
dfSchema.fields.zip(schema.fields).foreach { case (f1, f2) =>
185-
if (f2.name.nonEmpty) {
186-
assert(f1.name == f2.name)
187-
}
185+
assert(f1.name == f2.name)
188186
assert(f1.dataType == f2.dataType)
189187
assert(f1.nullable == f2.nullable)
190188
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.datasources.v2
19+
20+
import java.util.Locale
21+
22+
import org.apache.spark.SparkFunSuite
23+
import org.apache.spark.sql.connector.read.SupportsPushDownJoin.ColumnWithAlias
24+
25+
class DSV2JoinPushDownAliasGenerationSuite extends SparkFunSuite {
26+
27+
private def assertAliases(
28+
leftInput: Array[String],
29+
rightInput: Array[String],
30+
expectedLeft: Array[ColumnWithAlias],
31+
expectedRight: Array[ColumnWithAlias]
32+
): Unit = {
33+
val (actualLeft, actualRight) = V2ScanRelationPushDown
34+
.generateColumnAliasesForDuplicatedName(leftInput, rightInput)
35+
36+
val uniqName: ColumnWithAlias => String = col => {
37+
if (col.alias() == null) col.colName() else col.alias().toLowerCase(Locale.ROOT)
38+
}
39+
// Ensure no duplicate column names after ignoring capitalization
40+
assert((actualLeft ++ actualRight).map(uniqName).distinct.length
41+
== actualLeft.length + actualRight.length)
42+
43+
assert(
44+
actualLeft === expectedLeft,
45+
s"""Left side aliases mismatch.
46+
|Expected: ${expectedLeft.map(_.alias()).mkString(", ")}
47+
|Actual: ${actualLeft.map(_.alias()).mkString(", ")}""".stripMargin
48+
)
49+
50+
assert(
51+
actualRight === expectedRight,
52+
s"""Right side aliases mismatch.
53+
|Expected: ${expectedRight.map(_.alias()).mkString(", ")}
54+
|Actual: ${actualRight.map(_.alias()).mkString(", ")}""".stripMargin
55+
)
56+
}
57+
58+
test("Basic case with no duplicate column names") {
59+
assertAliases(
60+
leftInput = Array("id", "name"),
61+
rightInput = Array("email", "phone"),
62+
expectedLeft = Array(
63+
new ColumnWithAlias("id", null),
64+
new ColumnWithAlias("name", null)
65+
),
66+
expectedRight = Array(
67+
new ColumnWithAlias("email", null),
68+
new ColumnWithAlias("phone", null)
69+
)
70+
)
71+
}
72+
73+
test("Extreme duplication scenarios") {
74+
assertAliases(
75+
leftInput = Array("id", "id", "id"),
76+
rightInput = Array("id", "id"),
77+
expectedLeft = Array(
78+
new ColumnWithAlias("id", null),
79+
new ColumnWithAlias("id", "id_1"),
80+
new ColumnWithAlias("id", "id_2")
81+
),
82+
expectedRight = Array(
83+
new ColumnWithAlias("id", "id_3"),
84+
new ColumnWithAlias("id", "id_4")
85+
)
86+
)
87+
}
88+
89+
test("Exact duplicate column names") {
90+
assertAliases(
91+
leftInput = Array("id", "name"),
92+
rightInput = Array("id", "name"),
93+
expectedLeft = Array(
94+
new ColumnWithAlias("id", null),
95+
new ColumnWithAlias("name", null)
96+
),
97+
expectedRight = Array(
98+
new ColumnWithAlias("id", "id_1"),
99+
new ColumnWithAlias("name", "name_1")
100+
)
101+
)
102+
}
103+
104+
test("Columns with numeric suffixes (id vs id_1)") {
105+
assertAliases(
106+
leftInput = Array("id", "id_1", "name"),
107+
rightInput = Array("id", "name", "value"),
108+
expectedLeft = Array(
109+
new ColumnWithAlias("id", null),
110+
new ColumnWithAlias("id_1", null),
111+
new ColumnWithAlias("name", null)
112+
),
113+
expectedRight = Array(
114+
new ColumnWithAlias("id", "id_2"),
115+
new ColumnWithAlias("name", "name_1"),
116+
new ColumnWithAlias("value", null)
117+
)
118+
)
119+
}
120+
121+
test("Case-sensitive conflicts (ID vs id)") {
122+
assertAliases(
123+
leftInput = Array("ID", "Name"),
124+
rightInput = Array("id", "name"),
125+
expectedLeft = Array(
126+
new ColumnWithAlias("ID", null),
127+
new ColumnWithAlias("Name", null)
128+
),
129+
expectedRight = Array(
130+
new ColumnWithAlias("id", "id_1"),
131+
new ColumnWithAlias("name", "name_1")
132+
)
133+
)
134+
}
135+
136+
test("Mixed case and numeric suffixes") {
137+
assertAliases(
138+
leftInput = Array("UserID", "user_id", "user_id_1"),
139+
rightInput = Array("userId", "USER_ID", "user_id_2"),
140+
expectedLeft = Array(
141+
new ColumnWithAlias("UserID", null),
142+
new ColumnWithAlias("user_id", null),
143+
new ColumnWithAlias("user_id_1", null)
144+
),
145+
expectedRight = Array(
146+
new ColumnWithAlias("userId", "userId_1"),
147+
new ColumnWithAlias("USER_ID", "USER_ID_3"),
148+
new ColumnWithAlias("user_id_2", null)
149+
)
150+
)
151+
}
152+
}

0 commit comments

Comments
 (0)